9374c667a6bfe35030e23cd1624281688feff9da
1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # Tester of Nit engines on an MPI cluster
24 # Any processor, worker or controller
26 # All data and methods within this class are shared by the controller and the
28 abstract class Processor
31 # Controller rank is always 0
32 var controller_rank
: Rank = 0.rank
34 # Rank on this processor
35 fun rank
: Rank is abstract
37 # Where to store data for transfer between nodes
39 # Require: `buffer.length % 4 == 0`
40 var buffer
= new CIntArray(1024)
42 # Run in verbose mode, display more text
47 # OpenMPI sends a SIGTERM to all nodes upon receiving a SIGTERM or SIGINT
48 # on the first process.
49 handle_signal
(sigterm
, true)
52 # Tag of a new task packet of size `tasks_per_packet`
53 var task_tag
: Tag = 0.tag
55 # Tag to return a set of `Result` through `buffer`
56 var result_tag
: Tag = 1.tag
58 # Tag to notify `Worker` when to quit
59 var quit_tag
: Tag = 2.tag
61 # Tag to request more work from the `Controller` by a `Worker`
62 var need_work_tag
: Tag = 4.tag
64 # Tag to notify `Controller` that the sender `Worker` is done
65 var done_tag
: Tag = 5.tag
67 # Number of tasks within each task assignment sent with `task_tag`
68 var tasks_per_packet
= 1
70 # Run the main logic of this node
73 # Engines targeted by this execution
74 var engines
: Array[String] is noinit
76 # All known engines, used to detect errors in `engines`
77 var all_engines
: Array[String] = ["nitg-s", "nitg-sg", "nitg-g", "nitg-e", "niti", "emscripten"]
79 # Programs to test in this execution
80 var test_programs
: Array[String] is noinit
82 # Root of the temporary directory
83 var tmp_dir
= "/dev/shm/"
86 var ccache_dir
= "/dev/shm/nit_ccache"
88 # Read command line options
91 var opt_ctx
= new OptionContext
92 var opt_engines
= new OptionString(
93 "Engines to test, separated with commas ({all_engines.join(", ")} or all)",
95 var opt_help
= new OptionBool("Print this help message", "--help", "-h")
96 var opt_verbose
= new OptionCount(
97 "Be verbose, repeat to increase verbose level (max with -vvv)",
99 var opt_cleanup
= new OptionBool(
100 "Clean up all nitester files (and do not run tests)",
103 opt_ctx
.add_option
(opt_engines
, opt_help
, opt_verbose
, opt_cleanup
)
107 if opt_help
.value
then opt_ctx
.usage_error
null
110 verbose
= opt_verbose
.value
113 if opt_cleanup
.value
then
114 assert tmp_dir
.file_exists
115 for file
in tmp_dir
.files
do if file
.has_prefix
("nit") then
116 var full_path
= tmp_dir
/ file
117 if full_path
== ccache_dir
then continue
119 assert full_path
.file_exists
121 var stat
= full_path
.file_lstat
125 full_path
.file_delete
134 var rest
= opt_ctx
.rest
135 if rest
.is_empty
then opt_ctx
.usage_error
"This tool needs at least one test_program.nit"
138 # gather and check engines
139 var engines_str
= opt_engines
.value
141 if engines_str
== null then
145 engines
= engines_str
.split
(',')
147 if engines
.has
("all") then
149 engines
= all_engines
153 # check validity of targeted engines
154 var unknown_engines
= new Array[String]
155 for engine
in engines
do if not all_engines
.has
(engine
) then unknown_engines
.add engine
157 if not unknown_engines
.is_empty
then
158 opt_ctx
.usage_error
"Unknown engines: {unknown_engines.join(", ")} (expected one or most of {all_engines.join(", ")})"
160 self.engines
= engines
163 # All tasks to be performed
164 var tasks
= new Array[Task]
166 # Gather and register all tasks
169 # At this point we are in our local nit
170 var skip_path
= "tests/turing.skip"
172 if skip_path
.file_exists
then
173 var skip_file
= new IFStream.open
(skip_path
)
174 skip
= skip_file
.read_lines
177 skip
= new Array[String]
180 for prog
in test_programs
do for engine
in engines
do
183 for s
in skip
do if not s
.is_empty
and prog
.has
(s
) then
184 if verbose
> 0 and rank
== 0 then print
"Skipping test '{prog}' because of '{s}' in turing.skip"
188 tasks
.add
new Task(engine
, prog
)
193 # Single controller to dispatch tasks, gather results and produce stats
197 redef fun rank
do return controller_rank
199 # Id as `Int` of the next task to distribute
202 redef fun receive_signal
(signal
)
219 # Cumulated results from workers
220 var results
= new ResultSet
222 # Maintain communication with workers to distribute tasks and receive results
225 var at_work
= new Array[Rank]
228 for r
in [1..comm_world
.size
[ do
229 var sent
= send_task_to
(r
.rank
)
233 mpi
.send_empty
(r
.rank
, quit_tag
, comm_world
)
237 var status
= new Status
238 # await results and send new tasks
239 while not at_work
.is_empty
do
242 # Double probe to avoid a bug with some implementations of MPI
243 mpi
.probe
(new Rank.any
, new Tag.any
, comm_world
, status
)
244 mpi
.probe
(new Rank.any
, new Tag.any
, comm_world
, status
)
246 if status
.tag
== result_tag
then
247 # Receive results from a worker
248 var count
= status
.count
(new DataType.int
)
249 mpi
.recv_into
(buffer
, 0, count
, status
.source
, status
.tag
, comm_world
)
251 # Parse results from C array to `Result` instances
253 # Each result is on 4 ints: task id, arg, alt and result.
255 # See the comments where the data is produced in `Worker::work_on_tasks` for more information
256 assert count
% 4 == 0
257 for t
in (count
/4).times
do
260 var task_id
= buffer
[tt
]
261 var arg
= buffer
[tt
+1]
262 var alt
= buffer
[tt
+2]
263 var res
= buffer
[tt
+3]
265 var result
= new Result(tasks
[task_id
], arg
, alt
)
267 if res
== 1 then result
.ok
= true
268 if res
== 2 then result
.ok_empty
= true
269 if res
== 3 then result
.no_sav
= true
270 if res
== 4 then result
.fixme
= true
271 if res
== 5 then result
.fail
= true
272 if res
== 6 then result
.soso
= true
273 if res
== 7 then result
.skip
= true
274 if res
== 8 then result
.todo
= true
275 if res
== 9 then result
.skip_exec
= true
276 if res
== 0 then result
.unknown
= true
280 if verbose
> 0 and results
.length
% 25 == 0 then print_short_results
283 else if status
.tag
== need_work_tag
then
284 # A worker needs more work
285 mpi
.recv_empty
(status
.source
, status
.tag
, comm_world
)
286 var sent
= send_task_to
(status
.source
)
289 mpi
.send_empty
(status
.source
, quit_tag
, comm_world
)
291 else if status
.tag
== done_tag
then
292 # A worker is done and will quit
293 mpi
.recv_empty
(status
.source
, status
.tag
, comm_world
)
294 at_work
.remove
(status
.source
)
296 if verbose
> 1 then print
"worker {status.source} is done ({at_work.length} still at work)"
298 print
"Unexpected tag {status.tag}"
306 # Send a packet of tasks to worker at `rank`
307 fun send_task_to
(rank
: Rank): Bool
309 if next_task_id
>= tasks
.length
then return false
311 buffer
[0] = next_task_id
312 next_task_id
+= tasks_per_packet
314 mpi
.send_from
(buffer
, 0, 1, rank
, task_tag
, comm_world
)
316 if verbose
> 1 then print
"sent tasks [{buffer[0]}..{next_task_id}[ to worker {rank}"
320 # Display the accumulated results received from workers
324 print
"* {results.length} total"
325 print
"* {results.oks.length + results.ok_empties.length} oks & 0ks"
326 print
"* {results.fails.length} fails"
327 print
"* {results.no_savs.length} no savs"
328 print
"* {results.fixmes.length} fixmes"
329 print
"* {results.sosos.length} sosos"
330 print
"* {results.skips.length} skips"
331 print
"* {results.todos.length} todos"
332 print
"* {results.skip_execs.length} skip execs"
333 print
"* {results.unknowns.length} unknowns (bug in tests.sh or nitester)"
336 fun print_short_results
do print
"oks & fails / total: {results.oks.length + results.ok_empties.length} " +
337 "& {results.fails.length} / {results.length}"
339 # Abnormally shut down the running tests
342 print
"Shutting down"
343 mpi
.send_empty
(new Rank.any
, quit_tag
, comm_world
)
347 # A worker node which actually executes the tests
351 # The `Rank` of `self`
354 # Compilation directory
355 var comp_dir
= "/dev/shm/nit_compile{rank}" is lazy
357 # Output file directory
358 var out_dir
= "/dev/shm/nit_out{rank}" is lazy
360 # Directory to store the xml files produced for Jenkins
361 var xml_dir
= "~/jenkins_xml/"
363 # Output file of the `tests.sh` script
364 var tests_sh_out
= "/dev/shm/nit_local_out{rank}" is lazy
366 # Source Nit repository, must already be updated and built with `make` before execution
367 var nit_source_dir
= "~/nit"
369 # Compiled `Regex` to detect the argument of an execution
370 var re_arg
: Regex = "arg [0-9]+".to_re
372 # Compiled `Regex` to detect the alternative of an execution
373 var re_alt
: Regex = "_alt[0-9]+".to_re
384 # Setup the testing environment
386 # Clone the nit repository.
389 if verbose
> 0 then sys
.system
"hostname"
392 # Clean up the testing environment
394 # Delete all temporary files, except `ccache_dir`.
397 if comp_dir
.file_exists
then comp_dir
.rmdir
398 if out_dir
.file_exists
then out_dir
.rmdir
399 if tests_sh_out
.file_exists
then tests_sh_out
.file_delete
402 # Single C `int` to hold the next task id received from the `Controller`
403 var task_buffer
= new CIntArray(1)
405 # Manage communication with the `Controller` and execute dispatched `Task`s
408 var status
= new Status
412 # We double probe to prevent a bug where a single probe does not receive the
414 mpi
.probe
(controller_rank
, new Tag.any
, comm_world
, status
)
415 mpi
.probe
(controller_rank
, new Tag.any
, comm_world
, status
)
417 if status
.tag
== task_tag
then
418 # Receive tasks to execute
419 mpi
.recv_into
(task_buffer
, 0, 1, status
.source
, status
.tag
, comm_world
)
420 var first_id
= task_buffer
[0]
421 for task_id
in [first_id
.. first_id
+ tasks_per_packet
[ do
423 # If id is over all known tasks, stop right here
424 if task_id
>= tasks
.length
then break
425 var task
= tasks
[task_id
]
427 # Command line to execute test
428 var cmd
= "XMLDIR={xml_dir} ERRLIST={out_dir}/errlist TMPDIR={out_dir} " +
429 "CCACHE_DIR={ccache_dir} CCACHE_TEMPDIR={ccache_dir} CCACHE_BASEDIR={comp_dir} " +
430 "./tests.sh --compdir {comp_dir} --outdir {out_dir} " +
431 " --node --engine {task.engine} {task.test_program} > {tests_sh_out}"
436 # Test results were written to file, read them
437 var fstream
= new IFStream.open
(tests_sh_out
)
438 var content
= fstream
.read_all
441 # Parse result and prepare them for sending
443 # The structure is composed of 4 ints for each result.
447 # 4. test result as int
448 var c
= results_count
449 for line
in content
.split
('\n') do if not line
.is_empty
then
454 var arg_match
= line
.search
(re_arg
)
456 if arg_match
!= null then arg
= arg_match
.to_s
.substring_from
(4).to_i
459 var alt_match
= line
.search
(re_alt
)
461 if alt_match
!= null then alt
= alt_match
.to_s
.substring_from
(4).to_i
465 if line
.has
("[ok]") then res
= 1
466 if line
.has
("[0k]") then res
= 2
467 if line
.has
("[=== no sav ===]") then res
= 3
468 if line
.has
("[fixme]") then res
= 4
469 if line
.has
("[======= fail") then res
= 5
470 if line
.has
("[======= soso") then res
= 6
471 if line
.has
("[skip]") then res
= 7
472 if line
.has
("[todo]") then res
= 8
473 if line
.has
("[skip exec]") then res
= 9
477 if verbose
> 1 then print
"Unknown result: '{line}'"
483 if verbose
> 2 then print
"tests.sh output line: {line}"
485 # If result buffer is full, send to `Controller`
486 if c
*4 == buffer
.length
then
492 self.results_count
= c
495 mpi
.send_empty
(controller_rank
, need_work_tag
, comm_world
)
496 else if status
.tag
== quit_tag
then
497 # Notification from the `Controller` to quit
498 mpi
.recv_empty
(status
.source
, status
.tag
, comm_world
)
500 # Send remaining results
503 # Notify `Controller` that `self` is done and will quit
504 mpi
.send_empty
(controller_rank
, done_tag
, comm_world
)
507 print
"Unexpected tag {status.tag}"
514 # Total results listed in `buffer` and ready to send
515 var results_count
= 0
517 # Send all results in `buffer` to the `Controller`
520 if results_count
> 0 then
521 if verbose
> 1 then print
"sending {results_count} results"
522 mpi
.send_from
(buffer
, 0, results_count
*4, controller_rank
, result_tag
, comm_world
)
527 redef fun receive_signal
(signal
)
535 # A single test task, on a `test_program` with an `engine`
537 # Note that a task may involve more than one program to test considering the
538 # alts and args for the `test_program`.
540 # Engine to test executing `test_program`
543 # Program to execute with `engine`
544 var test_program
: String
546 redef fun to_s
do return "{engine} {test_program}"
551 # There may be more than one result per `Task`.
553 # `Task` associated to `self`
556 # Argument index of the execution resulting in `self`
559 # Alternative index of the execution resulting in `self`
562 # Is `self` result an _ok_?
565 # Is `self` result an _0k_?
568 # Is `self` result a _no sav_?
571 # Is `self` result a _fixme_?
574 # Is `self` result a _fail_?
577 # Is `self` result a _soso_?
580 # Has `self` been skipped?
586 # Has the execution of `self` been skipped?
587 var skip_exec
= false
589 # Is `self` an unknown result, probably an error
595 if no_sav
then err
= "no sav"
596 if ok
then err
= "ok"
597 if ok_empty
then err
= "0k"
598 if fixme
then err
= "fixme"
599 if fail
then err
= "fail"
600 if soso
then err
= "soso"
601 if skip
then err
= "skip"
602 if todo
then err
= "todo"
603 if skip_exec
then err
= "skip_exec"
605 return "{task} arg{arg} alt{alt} => {err}"
609 # A global and sorted collection of `Result`
611 super HashSet[Result]
613 var no_savs
= new HashSet[Result]
614 var oks
= new HashSet[Result]
615 var ok_empties
= new HashSet[Result]
616 var fixmes
= new HashSet[Result]
617 var fails
= new HashSet[Result]
618 var sosos
= new HashSet[Result]
619 var skips
= new HashSet[Result]
620 var todos
= new HashSet[Result]
621 var skip_execs
= new HashSet[Result]
622 var unknowns
= new HashSet[Result]
625 var per_engines
= new HashMap[String, Result]
627 redef fun add
(result
)
629 if result
.no_sav
then no_savs
.add result
630 if result
.ok
then oks
.add result
631 if result
.ok_empty
then ok_empties
.add result
632 if result
.fixme
then fixmes
.add result
633 if result
.fail
then fails
.add result
634 if result
.soso
then sosos
.add result
635 if result
.skip
then skips
.add result
636 if result
.todo
then todos
.add result
637 if result
.skip_exec
then skip_execs
.add result
638 if result
.unknown
then unknowns
.add result
643 redef fun remove
(r
) do abort
645 redef fun clear
do abort
648 redef class OptionContext
650 # Print usage with a possible error `message`
651 private fun usage_error
(message
: nullable String)
654 if message
!= null then
655 print
"Error: {message}"
659 if comm_world
.rank
== 0 then
660 print
"Usage: mpirun nitester [Options] test_program.nit [other_test.nit [...]]"
669 # On `Worker` nodes, prefix all prints with `rank/comm_world.size`
670 redef fun print
(msg
: Object)
672 if comm_world
.rank
!= 0.rank
then
673 super "{comm_world.rank}/{comm_world.size}: {msg}"
677 # Running MPI instance
678 fun mpi
: MPI do return once
new MPI
684 var rank
= comm_world
.rank
686 var processor
: Processor
687 if rank
== 0.rank
then
688 # If rank == 0, this is the `Controller`
689 processor
= new Controller
692 processor
= new Worker(rank
)