1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # Tester of Nit engines on an MPI cluster
24 # Any processor, worker or controller
26 # All data and methods within this class are shared by the controller and the
28 abstract class Processor
31 # Controller rank is always 0
32 var controller_rank
: Rank = 0.rank
34 # Where to store data for transfer between nodes
36 # Require: `buffer.length % 4 == 0`
37 var buffer
= new CIntArray(1024)
39 # Run in verbose mode, display more text
44 # OpenMPI sends a SIGTERM to all nodes upon receiving a SIGTERM or SIGINT
45 # on the first process.
46 handle_signal
(sigterm
, true)
49 # Tag of a new task packet of size `tasks_per_packet`
50 var task_tag
: Tag = 0.tag
52 # Tag to return a set of `Result` through `buffer`
53 var result_tag
: Tag = 1.tag
55 # Tag to notify `Worker` when to quit
56 var quit_tag
: Tag = 2.tag
58 # Tag to request more work from the `Controller` by a `Worker`
59 var need_work_tag
: Tag = 4.tag
61 # Tag to notify `Controller` that the sender `Worker` is done
62 var done_tag
: Tag = 5.tag
64 # Number of tasks within each task assignation with `task_tag`
65 var tasks_per_packet
= 1
67 # Run the main logic of this node
70 # Engines targeted by this execution
71 var engines
: Array[String] is noinit
73 # All known engines, used to detect errors in `engines`
74 var all_engines
: Array[String] = ["nitg-s", "nitg-sg", "nitg-g", "nitg-e", "niti", "emscripten"]
76 # Programs to test in this execution
77 var test_programs
: Array[String] is noinit
79 # Root of the temporary directory
80 var tmp_dir
= "/dev/shm/"
83 var ccache_dir
= "/dev/shm/nit_ccache"
85 # Read command line options
88 var opt_ctx
= new OptionContext
89 var opt_engines
= new OptionString(
90 "Engines to test, separated with commas ({all_engines.join(", ")} or all)",
92 var opt_help
= new OptionBool("Print this help message", "--help", "-h")
93 var opt_verbose
= new OptionCount(
94 "Be verbose, repeat to increase verbose level (max with -vvv)",
96 var opt_cleanup
= new OptionBool(
97 "Clean up all nitester files (and do not run tests)",
100 opt_ctx
.add_option
(opt_engines
, opt_help
, opt_verbose
, opt_cleanup
)
104 if opt_help
.value
then opt_ctx
.usage_error
null
107 verbose
= opt_verbose
.value
110 if opt_cleanup
.value
then
111 assert tmp_dir
.file_exists
112 for file
in tmp_dir
.files
do if file
.has_prefix
("nit") then
113 var full_path
= tmp_dir
/ file
114 if full_path
== ccache_dir
then continue
116 assert full_path
.file_exists
118 var stat
= full_path
.file_lstat
122 full_path
.file_delete
131 var rest
= opt_ctx
.rest
132 if rest
.is_empty
then opt_ctx
.usage_error
"This tool needs at least one test_program.nit"
135 # gather and check engines
136 var engines_str
= opt_engines
.value
138 if engines_str
== null then
142 engines
= engines_str
.split
(',')
144 if engines
.has
("all") then
146 engines
= all_engines
150 # check validity of targeted engines
151 var unknown_engines
= new Array[String]
152 for engine
in engines
do if not all_engines
.has
(engine
) then unknown_engines
.add engine
154 if not unknown_engines
.is_empty
then
155 opt_ctx
.usage_error
"Unknown engines: {unknown_engines.join(", ")} (expected one or most of {all_engines.join(", ")})"
157 self.engines
= engines
160 # All tasks to be performed
161 var tasks
= new Array[Task]
163 # Gather and register all tasks
166 for prog
in test_programs
do for engine
in engines
do
167 tasks
.add
new Task(engine
, prog
)
172 # Single controller to dispatch tasks, gather results and produce stats
176 # Id as `Int` of the next task to distribute
179 redef fun receive_signal
(signal
)
196 # Cumulated results from workers
197 var results
= new ResultSet
199 # Maintain communication with workers to distribute tasks and receive results
202 var at_work
= new Array[Rank]
205 for r
in [1..comm_world
.size
[ do
206 var sent
= send_task_to
(r
.rank
)
210 mpi
.send_empty
(r
.rank
, quit_tag
, comm_world
)
214 var status
= new Status
215 # await results and send new tasks
216 while not at_work
.is_empty
do
219 # Double probe to avoid a bug with some implementations of MPI
220 mpi
.probe
(new Rank.any
, new Tag.any
, comm_world
, status
)
221 mpi
.probe
(new Rank.any
, new Tag.any
, comm_world
, status
)
223 if status
.tag
== result_tag
then
224 # Receive results from a worker
225 var count
= status
.count
(new DataType.int
)
226 mpi
.recv_into
(buffer
, 0, count
, status
.source
, status
.tag
, comm_world
)
228 # Parse results from C array to `Result` instances
230 # Each result is on 4 ints: task id, arg, alt and result.
232 # See the comments where the data is produced in `Worker::work_on_tasks` for more information.
233 assert count
% 4 == 0
234 for t
in (count
/4).times
do
237 var task_id
= buffer
[tt
]
238 var arg
= buffer
[tt
+1]
239 var alt
= buffer
[tt
+2]
240 var res
= buffer
[tt
+3]
242 var result
= new Result(tasks
[task_id
], arg
, alt
)
244 if res
== 1 then result
.ok
= true
245 if res
== 2 then result
.ok_empty
= true
246 if res
== 3 then result
.no_sav
= true
247 if res
== 4 then result
.fixme
= true
248 if res
== 5 then result
.fail
= true
249 if res
== 6 then result
.soso
= true
250 if res
== 7 then result
.skip
= true
251 if res
== 8 then result
.todo
= true
252 if res
== 9 then result
.skip_exec
= true
253 if res
== 0 then result
.unknown
= true
257 if verbose
> 0 and results
.length
% 25 == 0 then print_short_results
260 else if status
.tag
== need_work_tag
then
261 # A worker needs more work
262 mpi
.recv_empty
(status
.source
, status
.tag
, comm_world
)
263 var sent
= send_task_to
(status
.source
)
266 mpi
.send_empty
(status
.source
, quit_tag
, comm_world
)
268 else if status
.tag
== done_tag
then
269 # A worker is done and will quit
270 mpi
.recv_empty
(status
.source
, status
.tag
, comm_world
)
271 at_work
.remove
(status
.source
)
273 if verbose
> 1 then print
"worker {status.source} is done ({at_work.length} still at work)"
275 print
"Unexpected tag {status.tag}"
283 # Send a packet of tasks to worker at `rank`
284 fun send_task_to
(rank
: Rank): Bool
286 if next_task_id
>= tasks
.length
then return false
288 buffer
[0] = next_task_id
289 next_task_id
+= tasks_per_packet
291 mpi
.send_from
(buffer
, 0, 1, rank
, task_tag
, comm_world
)
293 if verbose
> 1 then print
"sent tasks [{buffer[0]}..{next_task_id}[ to worker {rank}"
297 # Display the accumulated results received from workers
301 print
"* {results.length} total"
302 print
"* {results.oks.length + results.ok_empties.length} oks & 0ks"
303 print
"* {results.fails.length} fails"
304 print
"* {results.no_savs.length} no savs"
305 print
"* {results.fixmes.length} fixmes"
306 print
"* {results.sosos.length} sosos"
307 print
"* {results.skips.length} skips"
308 print
"* {results.todos.length} todos"
309 print
"* {results.skip_execs.length} skip execs"
310 print
"* {results.unknowns.length} unknowns (bug in tests.sh or nitester)"
# Print a one-line summary of the results received so far:
# passing results (oks and 0ks), fails, and the total count.
fun print_short_results
do
	var summary = "oks & fails / total: {results.oks.length + results.ok_empties.length} " +
		"& {results.fails.length} / {results.length}"
	print summary
end
316 # Abnormally shut down the running tests
319 print
"Shutting down"
320 mpi
.send_empty
(new Rank.any
, quit_tag
, comm_world
)
324 # A worker node which actually executes the tests
328 # The `Rank` of `self`
331 # Compilation directory
332 var comp_dir
= "/dev/shm/nit_compile{rank}" is lazy
334 # Output file directory
335 var out_dir
= "/dev/shm/nit_out{rank}" is lazy
337 # Directory to store the xml files produced for Jenkins
338 var xml_dir
= "~/jenkins_xml/"
340 # Output file of the `tests.sh` script
341 var tests_sh_out
= "/dev/shm/nit_local_out{rank}" is lazy
343 # Source Nit repository, must be already updated and `make` before execution
344 var nit_source_dir
= "~/nit"
346 # Compiled `Regex` to detect the argument of an execution
347 var re_arg
: Regex = "arg [0-9]+".to_re
349 # Compiled `Regex` to detect the alternative of an execution
350 var re_alt
: Regex = "_alt[0-9]+".to_re
361 # Setup the testing environment
363 # Clone the nit repository.
366 if verbose
> 0 then sys
.system
"hostname"
369 # Clean up the testing environment
371 # Delete all temporary files, except `ccache_dir`.
374 if comp_dir
.file_exists
then comp_dir
.rmdir
375 if out_dir
.file_exists
then out_dir
.rmdir
376 if tests_sh_out
.file_exists
then tests_sh_out
.file_delete
379 # Single C `int` to hold the next task id received from the `Controller`
380 var task_buffer
= new CIntArray(1)
382 # Manage communication with the `Controller` and execute dispatched `Task`s
385 var status
= new Status
389 # We double probe to prevent a bug where a single probe does not receive the
391 mpi
.probe
(controller_rank
, new Tag.any
, comm_world
, status
)
392 mpi
.probe
(controller_rank
, new Tag.any
, comm_world
, status
)
394 if status
.tag
== task_tag
then
395 # Receive tasks to execute
396 mpi
.recv_into
(task_buffer
, 0, 1, status
.source
, status
.tag
, comm_world
)
397 var first_id
= task_buffer
[0]
# A packet covers the half-open range starting at `first_id`: the
# `Controller` advances its next task id by exactly `tasks_per_packet`
# for each packet sent, so an inclusive upper bound would also execute
# the first task of the following packet and duplicate its results.
for task_id in [first_id .. first_id + tasks_per_packet[ do
400 # If id is over all known tasks, stop right here
401 if task_id
>= tasks
.length
then break
402 var task
= tasks
[task_id
]
404 # Command line to execute test
405 var cmd
= "XMLDIR={xml_dir} ERRLIST={out_dir}/errlist TMPDIR={out_dir} " +
406 "CCACHE_DIR={ccache_dir} CCACHE_TEMPDIR={ccache_dir} CCACHE_BASEDIR={comp_dir} " +
407 "./tests.sh --compdir {comp_dir} --outdir {out_dir} " +
408 " --node --engine {task.engine} {task.test_program} > {tests_sh_out}"
413 # Test results were written to file, read them
414 var fstream
= new IFStream.open
(tests_sh_out
)
415 var content
= fstream
.read_all
418 # Parse result and prepare them for sending
420 # The structure is composed of 4 ints for each result.
424 # 4. test result as int
425 var c
= results_count
426 for line
in content
.split
('\n') do if not line
.is_empty
then
431 var arg_match
= line
.search
(re_arg
)
433 if arg_match
!= null then arg
= arg_match
.to_s
.substring_from
(4).to_i
436 var alt_match
= line
.search
(re_alt
)
438 if alt_match
!= null then alt
= alt_match
.to_s
.substring_from
(4).to_i
442 if line
.has
("[ok]") then res
= 1
443 if line
.has
("[0k]") then res
= 2
444 if line
.has
("[=== no sav ===]") then res
= 3
445 if line
.has
("[fixme]") then res
= 4
446 if line
.has
("[======= fail") then res
= 5
447 if line
.has
("[======= soso") then res
= 6
448 if line
.has
("[skip]") then res
= 7
449 if line
.has
("[todo]") then res
= 8
450 if line
.has
("[skip exec]") then res
= 9
454 if verbose
> 1 then print
"Unknown result: '{line}'"
460 if verbose
> 2 then print
"tests.sh output line: {line}"
462 # If result buffer is full, send to `Controller`
463 if c
*4 == buffer
.length
then
469 self.results_count
= c
472 mpi
.send_empty
(controller_rank
, need_work_tag
, comm_world
)
473 else if status
.tag
== quit_tag
then
474 # Notification from the `Controller` to quit
475 mpi
.recv_empty
(status
.source
, status
.tag
, comm_world
)
477 # Send remaining results
480 # Notify `Controller` that `self` is done and will quit
481 mpi
.send_empty
(controller_rank
, done_tag
, comm_world
)
484 print
"Unexpected tag {status.tag}"
491 # Total results listed in `buffer` and ready to send
492 var results_count
= 0
494 # Send all results in `buffer` to the `Controller`
497 if results_count
> 0 then
498 if verbose
> 1 then print
"sending {results_count} results"
499 mpi
.send_from
(buffer
, 0, results_count
*4, controller_rank
, result_tag
, comm_world
)
504 redef fun receive_signal
(signal
)
512 # A single test task, on a `test_program` with an `engine`
514 # Note that a task may involve more than one program to test considering the
515 # alts and args for the `test_program`.
517 # Engine to test executing `test_program`
520 # Program to execute with `engine`
521 var test_program
: String
# Textual form of `self`: the engine followed by the test program
redef fun to_s
do
	var description = "{engine} {test_program}"
	return description
end
528 # There may be more than one result per `Task`.
530 # `Task` associated to `self`
533 # Argument index of the execution resulting in `self`
536 # Alternative index of the execution resulting in `self`
539 # Is `self` result an _ok_?
542 # Is `self` result an _0k_?
545 # Is `self` result a _no sav_?
548 # Is `self` result a _fixme_?
551 # Is `self` result a _fail_?
554 # Is `self` result a _soso_?
557 # Has `self` been skipped?
563 # Has the execution of `self` been skipped?
564 var skip_exec
= false
566 # Is `self` an unknown result, probably an error
572 if no_sav
then err
= "no sav"
573 if ok
then err
= "ok"
574 if ok_empty
then err
= "0k"
575 if fixme
then err
= "fixme"
576 if fail
then err
= "fail"
577 if soso
then err
= "soso"
578 if skip
then err
= "skip"
579 if todo
then err
= "todo"
580 if skip_exec
then err
= "skip_exec"
582 return "{task} arg{arg} alt{alt} => {err}"
586 # A global and sorted collection of `Result`
588 super HashSet[Result]
# Per-category views of the results. `add` inserts each result into every
# set below whose matching flag is set on that result.

# Results flagged `no_sav`
590 var no_savs
= new HashSet[Result]
# Results flagged `ok`
591 var oks
= new HashSet[Result]
# Results flagged `ok_empty` (reported as "0k")
592 var ok_empties
= new HashSet[Result]
# Results flagged `fixme`
593 var fixmes
= new HashSet[Result]
# Results flagged `fail`
594 var fails
= new HashSet[Result]
# Results flagged `soso`
595 var sosos
= new HashSet[Result]
# Results flagged `skip`
596 var skips
= new HashSet[Result]
# Results flagged `todo`
597 var todos
= new HashSet[Result]
# Results flagged `skip_exec`
598 var skip_execs
= new HashSet[Result]
# Results flagged `unknown`
599 var unknowns
= new HashSet[Result]
# NOTE(review): presumably results indexed by engine name, but nothing in
# the visible code reads or fills this map -- confirm whether it is still needed.
602 var per_engines
= new HashMap[String, Result]
604 redef fun add
(result
)
606 if result
.no_sav
then no_savs
.add result
607 if result
.ok
then oks
.add result
608 if result
.ok_empty
then ok_empties
.add result
609 if result
.fixme
then fixmes
.add result
610 if result
.fail
then fails
.add result
611 if result
.soso
then sosos
.add result
612 if result
.skip
then skips
.add result
613 if result
.todo
then todos
.add result
614 if result
.skip_exec
then skip_execs
.add result
615 if result
.unknown
then unknowns
.add result
# Removing a result is not supported: any call aborts the program
redef fun remove(r)
do
	abort
end
# Clearing the collection is not supported: any call aborts the program
redef fun clear
do
	abort
end
625 redef class OptionContext
627 # Print usage with a possible error `message`
628 private fun usage_error
(message
: nullable String)
631 if message
!= null then
632 print
"Error: {message}"
636 if comm_world
.rank
== 0 then
637 print
"Usage: mpirun nitester [Options] test_program.nit [other_test.nit [...]]"
646 # On `Worker` nodes, prefix all prints with `rank/comm_world.size`
647 redef fun print
(msg
: Object)
649 if comm_world
.rank
!= 0.rank
then
650 super "{comm_world.rank}/{comm_world.size}: {msg}"
# Running MPI instance
#
# The `once` expression builds the instance on the first call only;
# later calls return that same instance.
fun mpi: MPI
do
	var instance = once new MPI
	return instance
end
661 var rank
= comm_world
.rank
663 var processor
: Processor
664 if rank
== 0.rank
then
665 # If rank == 0, this is the `Controller`
666 processor
= new Controller
669 processor
= new Worker(rank
)