contrib/nitester: each Processor know its rank
[nit.git] / contrib / nitester / src / nitester.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # Tester of Nit engines on an MPI cluster
18 module nitester
19
20 import mpi
21 import signals
22 import opts
23
# Any processor, worker or controller
#
# All data and methods within this class are shared by the controller and the
# workers.
abstract class Processor
	super SignalHandler

	# Controller rank is always 0
	var controller_rank: Rank = 0.rank

	# Rank of this processor
	fun rank: Rank is abstract

	# Where to store data for transfer between nodes
	#
	# Require: `buffer.length % 4 == 0`
	var buffer = new CIntArray(1024)

	# Verbosity level, set from the number of `--verbose` options (0 is quiet)
	var verbose = 0

	init
	do
		# OpenMPI sends a SIGTERM to all nodes upon receiving a SIGTERM or SIGINT
		# on the first process.
		handle_signal(sigterm, true)
	end

	# Tag of a new task packet of size `tasks_per_packet`
	var task_tag: Tag = 0.tag

	# Tag to return a set of `Result` through `buffer`
	var result_tag: Tag = 1.tag

	# Tag to notify `Worker` when to quit
	var quit_tag: Tag = 2.tag

	# Tag to request more work from the `Controller` by a `Worker`
	var need_work_tag: Tag = 4.tag

	# Tag to notify `Controller` that the sender `Worker` is done
	var done_tag: Tag = 5.tag

	# Number of tasks within each task assignment sent with `task_tag`
	var tasks_per_packet = 1

	# Run the main logic of this node
	fun run is abstract

	# Engines targeted by this execution
	var engines: Array[String] is noinit

	# All known engines, used to detect errors in `engines`
	var all_engines: Array[String] = ["nitg-s", "nitg-sg", "nitg-g", "nitg-e", "niti", "emscripten"]

	# Programs to test in this execution
	var test_programs: Array[String] is noinit

	# Root of the temporary directory
	var tmp_dir = "/dev/shm/"

	# `ccache` directory
	var ccache_dir = "/dev/shm/nit_ccache"

	# Read command line options
	#
	# Sets `verbose`, `test_programs` and `engines`.
	# With `--help`, `--cleanup` or on a usage error, this method finalizes
	# MPI and exits the process instead of returning.
	fun read_cli_options
	do
		var opt_ctx = new OptionContext
		var opt_engines = new OptionString(
			"Engines to test, separated with commas ({all_engines.join(", ")} or all)",
			"--engine", "-e")
		var opt_help = new OptionBool("Print this help message", "--help", "-h")
		var opt_verbose = new OptionCount(
			"Be verbose, repeat to increase verbose level (max with -vvv)",
			"--verbose", "-v")
		var opt_cleanup = new OptionBool(
			"Clean up all nitester files (and do not run tests)",
			"--cleanup", "-C")

		opt_ctx.add_option(opt_engines, opt_help, opt_verbose, opt_cleanup)
		opt_ctx.parse args

		# --help?
		if opt_help.value then opt_ctx.usage_error null

		# --verbose?
		verbose = opt_verbose.value

		# --cleanup?
		if opt_cleanup.value then
			assert tmp_dir.file_exists
			for file in tmp_dir.files do if file.has_prefix("nit") then
				var full_path = tmp_dir / file
				if full_path == ccache_dir then continue

				# Both the controller and the workers run this cleanup, so
				# another process using the same `tmp_dir` may already have
				# removed this entry. Skip it instead of failing.
				if not full_path.file_exists then continue

				var stat = full_path.file_lstat
				if stat.is_dir then
					full_path.rmdir
				else
					full_path.file_delete
				end
				stat.free
			end
			mpi.finalize
			exit 0
		end

		# any files?
		var rest = opt_ctx.rest
		if rest.is_empty then opt_ctx.usage_error "This tool needs at least one test_program.nit"
		test_programs = rest

		# gather and check engines
		var engines_str = opt_engines.value
		var engines
		if engines_str == null then
			# default
			engines = ["nitg-s"]
		else
			engines = engines_str.split(',')

			if engines.has("all") then
				# all engines
				engines = all_engines
			end
		end

		# check validity of targeted engines
		var unknown_engines = new Array[String]
		for engine in engines do if not all_engines.has(engine) then unknown_engines.add engine

		if not unknown_engines.is_empty then
			opt_ctx.usage_error "Unknown engines: {unknown_engines.join(", ")} (expected one or most of {all_engines.join(", ")})"
		end
		self.engines = engines
	end

	# All tasks to be performed
	var tasks = new Array[Task]

	# Gather and register all tasks
	#
	# One `Task` is created for each pair of test program and engine, programs
	# first, so every node calling this method builds the same list in the
	# same order and task ids (indexes in `tasks`) agree between nodes.
	fun create_tasks
	do
		for prog in test_programs do for engine in engines do
			tasks.add new Task(engine, prog)
		end
	end
end
174
# Single controller to dispatch tasks, gather results and produce stats
class Controller
	super Processor

	redef fun rank do return controller_rank

	# Id as `Int` of the next task to distribute
	var next_task_id = 0

	# Workers which received tasks and have not yet notified they are done
	#
	# Maintained by `distribute_tasks` and used by `shutdown` to know who
	# must be told to quit.
	var at_work = new Array[Rank]

	# On a signal, stop the workers, report what was gathered so far and exit
	redef fun receive_signal(signal)
	do
		shutdown
		print_results

		mpi.finalize
		exit 0
	end

	redef fun run
	do
		read_cli_options
		create_tasks
		distribute_tasks
		print_results
	end

	# Cumulated results from workers
	var results = new ResultSet

	# Maintain communication with workers to distribute tasks and receive results
	#
	# Returns once every worker in `at_work` has sent `done_tag`, or after an
	# unexpected tag triggered a `shutdown`.
	fun distribute_tasks
	do
		at_work.clear

		# send initial tasks
		for r in [1..comm_world.size[ do
			var sent = send_task_to(r.rank)
			if sent then
				at_work.add r.rank
			else
				# More workers than tasks, this one can quit right away
				mpi.send_empty(r.rank, quit_tag, comm_world)
			end
		end

		var status = new Status
		# await results and send new tasks
		while not at_work.is_empty do
			check_signals

			# Double probe to avoid bug with some implementation of MPI
			mpi.probe(new Rank.any, new Tag.any, comm_world, status)
			mpi.probe(new Rank.any, new Tag.any, comm_world, status)

			if status.tag == result_tag then
				# Receive results from a worker
				var count = status.count(new DataType.int)
				mpi.recv_into(buffer, 0, count, status.source, status.tag, comm_world)

				# Parse results from C array to `Result` instances
				#
				# Each result is on 4 ints: task id, arg, alt and result.
				#
				# See the comments where the data is produced in `Worker::work_on_tasks` for more information
				assert count % 4 == 0
				for t in (count/4).times do
					var tt = t*4

					var task_id = buffer[tt]
					var arg = buffer[tt+1]
					var alt = buffer[tt+2]
					var res = buffer[tt+3]

					var result = new Result(tasks[task_id], arg, alt)

					# Same result codes as written by `Worker::work_on_tasks`
					if res == 1 then result.ok = true
					if res == 2 then result.ok_empty = true
					if res == 3 then result.no_sav = true
					if res == 4 then result.fixme = true
					if res == 5 then result.fail = true
					if res == 6 then result.soso = true
					if res == 7 then result.skip = true
					if res == 8 then result.todo = true
					if res == 9 then result.skip_exec = true
					if res == 0 then result.unknown = true

					results.add result

					if verbose > 0 and results.length % 25 == 0 then print_short_results
				end

			else if status.tag == need_work_tag then
				# A worker needs more work
				mpi.recv_empty(status.source, status.tag, comm_world)
				var sent = send_task_to(status.source)
				if not sent then
					# no more work, quit
					mpi.send_empty(status.source, quit_tag, comm_world)
				end
			else if status.tag == done_tag then
				# A worker is done and will quit
				mpi.recv_empty(status.source, status.tag, comm_world)
				at_work.remove(status.source)

				if verbose > 1 then print "worker {status.source} is done ({at_work.length} still at work)"
			else
				print "Unexpected tag {status.tag}"
				shutdown
				break
			end
		end
		status.free
	end

	# Send a packet of tasks to worker at `rank`
	#
	# Only the id of the first task is sent, the worker deduces the others
	# from `tasks_per_packet`.
	#
	# Return `false`, and send nothing, if all tasks were already distributed.
	fun send_task_to(rank: Rank): Bool
	do
		if next_task_id >= tasks.length then return false

		buffer[0] = next_task_id
		next_task_id += tasks_per_packet

		mpi.send_from(buffer, 0, 1, rank, task_tag, comm_world)

		if verbose > 1 then print "sent tasks [{buffer[0]}..{next_task_id}[ to worker {rank}"
		return true
	end

	# Display the accumulated results received from workers
	fun print_results
	do
		print "# results #"
		print "* {results.length} total"
		print "* {results.oks.length + results.ok_empties.length} oks & 0ks"
		print "* {results.fails.length} fails"
		print "* {results.no_savs.length} no savs"
		print "* {results.fixmes.length} fixmes"
		print "* {results.sosos.length} sosos"
		print "* {results.skips.length} skips"
		print "* {results.todos.length} todos"
		print "* {results.skip_execs.length} skip execs"
		print "* {results.unknowns.length} unknowns (bug in tests.sh or nitester)"
	end

	# Display a one line summary of the results received so far
	fun print_short_results do print "oks & fails / total: {results.oks.length + results.ok_empties.length} " +
		"& {results.fails.length} / {results.length}"

	# Abnormally shut down the running tests
	#
	# Notify each worker still in `at_work` to quit. The wildcard rank
	# `new Rank.any` is only meaningful as the source of a probe or receive,
	# it is not a valid destination for a send, so each worker is addressed
	# explicitly.
	fun shutdown
	do
		print "Shutting down"
		for worker in at_work do mpi.send_empty(worker, quit_tag, comm_world)
	end
end
328
# A worker node which actually executes the tests
class Worker
	super Processor

	# The `Rank` of `self`
	redef var rank: Rank

	# Compilation directory
	var comp_dir = "/dev/shm/nit_compile{rank}" is lazy

	# Output file directory
	var out_dir = "/dev/shm/nit_out{rank}" is lazy

	# Directory to store the xml files produced for Jenkins
	var xml_dir = "~/jenkins_xml/"

	# Output file of the `tests.sh` script
	var tests_sh_out = "/dev/shm/nit_local_out{rank}" is lazy

	# Source Nit repository, must be already updated and `make` before execution
	var nit_source_dir = "~/nit"

	# Compiled `Regex` to detect the argument of an execution
	var re_arg: Regex = "arg [0-9]+".to_re

	# Compiled `Regex` to detect the alternative of an execution
	var re_alt: Regex = "_alt[0-9]+".to_re

	redef fun run
	do
		read_cli_options
		setup
		create_tasks
		work_on_tasks
		cleanup
	end

	# Setup the testing environment
	#
	# Currently, this only displays the hostname when in verbose mode.
	fun setup
	do
		if verbose > 0 then sys.system "hostname"
	end

	# Clean up the testing environment
	#
	# Delete `comp_dir`, `out_dir` and `tests_sh_out` if they exist.
	# `ccache_dir` is left untouched.
	fun cleanup
	do
		if comp_dir.file_exists then comp_dir.rmdir
		if out_dir.file_exists then out_dir.rmdir
		if tests_sh_out.file_exists then tests_sh_out.file_delete
	end

	# Single C `int` to hold the next task id received from the `Controller`
	var task_buffer = new CIntArray(1)

	# Manage communication with the `Controller` and execute dispatched `Task`s
	#
	# Loop until the `Controller` sends `quit_tag` or an unexpected tag is
	# received.
	fun work_on_tasks
	do
		var status = new Status
		loop
			check_signals

			# We double probe to prevent a bug where a single probe does not
			# receive the real next read.
			mpi.probe(controller_rank, new Tag.any, comm_world, status)
			mpi.probe(controller_rank, new Tag.any, comm_world, status)

			if status.tag == task_tag then
				# Receive tasks to execute
				mpi.recv_into(task_buffer, 0, 1, status.source, status.tag, comm_world)
				var first_id = task_buffer[0]
				for task_id in [first_id .. first_id + tasks_per_packet[ do

					# If id is over all known tasks, stop right here
					if task_id >= tasks.length then break
					var task = tasks[task_id]

					# Command line to execute test
					var cmd = "XMLDIR={xml_dir} ERRLIST={out_dir}/errlist TMPDIR={out_dir} " +
						"CCACHE_DIR={ccache_dir} CCACHE_TEMPDIR={ccache_dir} CCACHE_BASEDIR={comp_dir} " +
						"./tests.sh --compdir {comp_dir} --outdir {out_dir} " +
						" --node --engine {task.engine} {task.test_program} > {tests_sh_out}"

					# Execute test
					sys.system cmd

					# Test results were written to file, read them
					var fstream = new IFStream.open(tests_sh_out)
					var content = fstream.read_all
					fstream.close

					# Parse result and prepare them for sending
					#
					# The structure is composed of 4 ints for each result.
					# 1. task id
					# 2. arg number
					# 3. alt number
					# 4. test result as int
					var c = results_count
					for line in content.split('\n') do if not line.is_empty then
						var cc = c*4

						buffer[cc] = task_id

						# Both "arg " and "_alt" are 4 chars long, the number follows
						var arg_match = line.search(re_arg)
						var arg = 0
						if arg_match != null then arg = arg_match.to_s.substring_from(4).to_i
						buffer[cc+1] = arg

						var alt_match = line.search(re_alt)
						var alt = 0
						if alt_match != null then alt = alt_match.to_s.substring_from(4).to_i
						buffer[cc+2] = alt

						# If many markers are on the same line, the last match wins
						var res = null
						if line.has("[ok]") then res = 1
						if line.has("[0k]") then res = 2
						if line.has("[=== no sav ===]") then res = 3
						if line.has("[fixme]") then res = 4
						if line.has("[======= fail") then res = 5
						if line.has("[======= soso") then res = 6
						if line.has("[skip]") then res = 7
						if line.has("[todo]") then res = 8
						if line.has("[skip exec]") then res = 9

						if res == null then
							res = 0
							if verbose > 1 then print "Unknown result: '{line}'"
						end
						buffer[cc+3] = res

						c += 1

						if verbose > 2 then print "tests.sh output line: {line}"

						# If result buffer is full, send to `Controller`
						if c*4 == buffer.length then
							# `send_results` relies on `results_count`, which is
							# stale while this loop counts in `c`. Sync it first
							# so the whole buffer is actually sent.
							results_count = c
							send_results
							c = 0
						end
					end

					self.results_count = c
				end

				mpi.send_empty(controller_rank, need_work_tag, comm_world)
			else if status.tag == quit_tag then
				# Notification from the `Controller` to quit
				mpi.recv_empty(status.source, status.tag, comm_world)

				# Send remaining results
				send_results

				# Notify `Controller` that `self` is done and will quit
				mpi.send_empty(controller_rank, done_tag, comm_world)
				break
			else
				print "Unexpected tag {status.tag}"
				break
			end
		end
		status.free
	end

	# Total results listed in `buffer` and ready to send
	var results_count = 0

	# Send all results in `buffer` to the `Controller`
	#
	# Do nothing if `results_count == 0`, otherwise send `results_count*4`
	# ints and reset `results_count`.
	fun send_results
	do
		if results_count > 0 then
			if verbose > 1 then print "sending {results_count} results"
			mpi.send_from(buffer, 0, results_count*4, controller_rank, result_tag, comm_world)
			results_count = 0
		end
	end

	# On a signal, delete the temporary files and exit
	redef fun receive_signal(signal)
	do
		cleanup
		mpi.finalize
		exit 0
	end
end
516
# One unit of work: a `test_program` to run with a given `engine`
#
# A single task may still cover many executions, as a `test_program` can have
# alternatives (alts) and arguments (args).
class Task
	# Engine used to execute `test_program`
	var engine: String

	# Program to test with `engine`
	var test_program: String

	# The engine followed by the program, separated by a space
	redef fun to_s
	do
		return [engine, test_program].join(" ")
	end
end
530
# Outcome of one execution of a `Task`
#
# A single `Task` may produce many results.
class Result
	# `Task` which produced `self`
	var task: Task

	# Index of the argument set used by the execution
	var arg: Int

	# Index of the alternative used by the execution
	var alt: Int

	# Was the outcome an _ok_?
	var ok = false

	# Was the outcome an _0k_?
	var ok_empty = false

	# Was the outcome a _no sav_?
	var no_sav = false

	# Was the outcome a _fixme_?
	var fixme = false

	# Was the outcome a _fail_?
	var fail = false

	# Was the outcome a _soso_?
	var soso = false

	# Was the test skipped?
	var skip = false

	# Was the test marked as TODO?
	var todo = false

	# Was the execution of the test skipped?
	var skip_exec = false

	# Is the outcome unknown? This is probably an error
	var unknown = false

	# The task, the arg and alt indexes, and the name of the outcome
	#
	# If many flags are set, the name is chosen with this priority:
	# skip_exec, todo, skip, soso, fail, fixme, 0k, ok then no sav.
	redef fun to_s
	do
		var label
		if skip_exec then
			label = "skip_exec"
		else if todo then
			label = "todo"
		else if skip then
			label = "skip"
		else if soso then
			label = "soso"
		else if fail then
			label = "fail"
		else if fixme then
			label = "fixme"
		else if ok_empty then
			label = "0k"
		else if ok then
			label = "ok"
		else if no_sav then
			label = "no sav"
		else
			label = "Unknown"
		end

		return "{task} arg{arg} alt{alt} => {label}"
	end
end
590
# A global collection of `Result`, also sorted by category
#
# Results can only be added, `remove` and `clear` abort.
class ResultSet
	super HashSet[Result]

	# Results flagged `ok`
	var oks = new HashSet[Result]

	# Results flagged `ok_empty`
	var ok_empties = new HashSet[Result]

	# Results flagged `no_sav`
	var no_savs = new HashSet[Result]

	# Results flagged `fixme`
	var fixmes = new HashSet[Result]

	# Results flagged `fail`
	var fails = new HashSet[Result]

	# Results flagged `soso`
	var sosos = new HashSet[Result]

	# Results flagged `skip`
	var skips = new HashSet[Result]

	# Results flagged `todo`
	var todos = new HashSet[Result]

	# Results flagged `skip_exec`
	var skip_execs = new HashSet[Result]

	# Results flagged `unknown`
	var unknowns = new HashSet[Result]

	# TODO remove
	var per_engines = new HashMap[String, Result]

	# Register `result` in the set of each of its flags, then in `self`
	redef fun add(result)
	do
		if result.ok then
			oks.add result
		end
		if result.ok_empty then
			ok_empties.add result
		end
		if result.no_sav then
			no_savs.add result
		end
		if result.fixme then
			fixmes.add result
		end
		if result.fail then
			fails.add result
		end
		if result.soso then
			sosos.add result
		end
		if result.skip then
			skips.add result
		end
		if result.todo then
			todos.add result
		end
		if result.skip_exec then
			skip_execs.add result
		end
		if result.unknown then
			unknowns.add result
		end

		super
	end

	# Unsupported, the per-category sets would not be updated
	redef fun remove(r) do abort

	# Unsupported, the per-category sets would not be updated
	redef fun clear do abort
end
629
redef class OptionContext

	# Print usage with a possible error `message`, then exit
	#
	# If `message` is not null, it is printed as an error and the exit code
	# is 1, otherwise the exit code is 0.
	# The usage text itself is only printed by the processor of rank 0.
	#
	# This method finalizes MPI and never returns.
	private fun usage_error(message: nullable String)
	do
		var ret = 0
		if message != null then
			print "Error: {message}"
			ret = 1
		end

		# Compare against `0.rank`, as everywhere else in this module,
		# rather than against a bare `Int`.
		if comm_world.rank == 0.rank then
			print "Usage: mpirun nitester [Options] test_program.nit [other_test.nit [...]]"
			usage
		end

		mpi.finalize
		exit ret
	end
end
650
# Print `msg`, prefixed with `rank/comm_world.size` on `Worker` nodes
#
# The processor of rank 0 prints `msg` as is.
redef fun print(msg: Object)
do
	var local_rank = comm_world.rank
	if local_rank == 0.rank then
		super msg
	else
		super "{local_rank}/{comm_world.size}: {msg}"
	end
end
658
# The running MPI instance
#
# It is created by the first call, following calls return the same instance.
fun mpi: MPI
do
	return once new MPI
end
661
# Launch MPI, the first call to `mpi` creates the instance
mpi

# Rank of the local process
var local_rank = comm_world.rank

# The process of rank 0 is the `Controller`, all the others are `Worker`s
var processor: Processor
if local_rank != 0.rank then
	processor = new Worker(local_rank)
else
	processor = new Controller
end

processor.run

mpi.finalize