misc/vim: inform the user when no results are found
[nit.git] / lib / mpi.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # Implementation of the Message Passing Interface protocol by wrapping OpenMPI
18 #
19 # OpenMPI is used only at linking and for it's `mpi.h`. Other implementations
20 # could be used without much modification.
21 #
22 # Supports transfer of any valid `Serializable` instances as well as basic
23 # C arrays defined in module `c`. Using C arrays is encouraged when performance
24 # is critical.
25 #
26 # Since this module is a thin wrapper around OpenMPI, in case of missing
27 # documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
28 module mpi is
29 cflags exec("mpicc", "-showme:compile")
30 ldflags exec("mpicc", "-showme:link")
31 end
32
33 import c
34 intrude import standard::string
35 import serialization
36 private import json_serialization
37
38 in "C Header" `{
39 #include <mpi.h>
40 `}
41
42 # Handle to most MPI services
43 class MPI
44 # Initialize the MPI execution environment
45 init do native_init
46
47 private fun native_init `{ MPI_Init(NULL, NULL); `}
48
49 # Terminates the MPI execution environment
50 fun finalize `{ MPI_Finalize(); `}
51
52 # Name of this processor, usually the hostname
53 fun processor_name: String import NativeString.to_s_with_length `{
54 char *name = malloc(MPI_MAX_PROCESSOR_NAME);
55 int size;
56 MPI_Get_processor_name(name, &size);
57 return NativeString_to_s_with_length(name, size);
58 `}
59
60 # Send the content of a buffer
61 fun send_from(buffer: Sendable, at, count: Int, dest: Rank, tag: Tag, comm: Comm)
62 do
63 buffer.send(self, at, count, dest, tag, comm)
64 end
65
66 # Send the full content of a buffer
67 fun send_all(buffer: Sendable, dest: Rank, tag: Tag, comm: Comm)
68 do
69 buffer.send_all(self, dest, tag, comm)
70 end
71
72 # Efficiently receive data in an existing buffer
73 fun recv_into(buffer: Receptacle, at, count: Int, source: Rank, tag: Tag, comm: Comm)
74 do
75 buffer.recv(self, at, count, source, tag, comm)
76 end
77
78 # Efficiently receive data and fill an existing buffer
79 fun recv_fill(buffer: Receptacle, source: Rank, tag: Tag, comm: Comm)
80 do
81 buffer.recv_fill(self, source, tag, comm)
82 end
83
84 # Send a complex `Serializable` object
85 fun send(data: nullable Serializable, dest: Rank, tag: Tag, comm: Comm)
86 do
87 # Serialize data
88 var stream = new StringWriter
89 var serializer = new JsonSerializer(stream)
90 serializer.serialize(data)
91
92 # Send message
93 var str = stream.to_s
94 send_from(str, 0, str.length, dest, tag, comm)
95 end
96
97 # Receive a complex object
98 fun recv(source: Rank, tag: Tag, comm: Comm): nullable Object
99 do
100 var status = new Status
101
102 # Block until a message in in queue
103 var err = probe(source, tag, comm, status)
104 assert err.is_success else print err
105
106 # Get message length
107 var count = status.count(new DataType.char)
108 assert not count.is_undefined
109
110 # Receive message into buffer
111 var buffer = new FlatBuffer.with_capacity(count)
112 recv_into(buffer, 0, count, status.source, status.tag, comm)
113
114 # Free our status
115 status.free
116
117 # Deserialize message
118 var deserializer = new JsonDeserializer(buffer)
119 var deserialized = deserializer.deserialize
120
121 if deserialized == null then print "|{buffer}|{buffer.chars.join("-")}| {buffer.length}"
122
123 return deserialized
124 end
125
126 # Send an empty buffer, only for the `tag`
127 fun send_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError
128 `{
129 return MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
130 `}
131
132 # Receive an empty buffer, only for the `tag`
133 fun recv_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError
134 `{
135 return MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
136 `}
137
138 # Send a `NativeCArray` `buffer` with a given `count` of `data_type`
139 fun native_send(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm): SuccessOrError
140 `{
141 return MPI_Send(buffer, count, data_type, dest, tag, comm);
142 `}
143
144 # Receive into a `NativeCArray` `buffer` with a given `count` of `data_type`
145 fun native_recv(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError
146 `{
147 return MPI_Recv(buffer, count, data_type, dest, tag, comm, status);
148 `}
149
150 # Probe for the next data to receive, store the result in `status`
151 #
152 # Note: If you encounter an error where the next receive does not correspond
153 # to the last `probe`, call this method twice to ensure a correct result.
154 fun probe(source: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError
155 `{
156 return MPI_Probe(source, tag, comm, status);
157 `}
158
159 # Synchronize all processors
160 fun barrier(comm: Comm) `{ MPI_Barrier(comm); `}
161
162 # Seconds since some time in the past which does not change
163 fun wtime: Float `{ return MPI_Wtime(); `}
164 end
165
166 # An MPI communicator
167 extern class Comm `{ MPI_Comm `}
168 # The _null_ communicator, targeting no processors
169 new null_ `{ return MPI_COMM_NULL; `}
170
171 # The _world_ communicator, targeting all processors
172 new world `{ return MPI_COMM_WORLD; `}
173
174 # The _self_ communicator, targeting this processor only
175 new self_ `{ return MPI_COMM_SELF; `}
176
177 # Number of processors in this communicator
178 fun size: Int `{
179 int size;
180 MPI_Comm_size(recv, &size);
181 return size;
182 `}
183
184 # Rank on this processor in this communicator
185 fun rank: Rank `{
186 int rank;
187 MPI_Comm_rank(recv, &rank);
188 return rank;
189 `}
190 end
191
192 # An MPI data type
193 extern class DataType `{ MPI_Datatype `}
194 # Get a MPI char.
195 new char `{ return MPI_CHAR; `}
196
197 # Get a MPI short.
198 new short `{ return MPI_SHORT; `}
199
200 # Get a MPI int.
201 new int `{ return MPI_INT; `}
202
203 # Get a MPI long.
204 new long `{ return MPI_LONG; `}
205
206 # Get a MPI long long.
207 new long_long `{ return MPI_LONG_LONG; `}
208
209 # Get a MPI unsigned char.
210 new unsigned_char `{ return MPI_UNSIGNED_CHAR; `}
211
212 # Get a MPI unsigned short.
213 new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}
214
215 # Get a MPI unsigned int.
216 new unsigned `{ return MPI_UNSIGNED; `}
217
218 # Get a MPI unsigned long.
219 new unsigned_long `{ return MPI_UNSIGNED_LONG; `}
220
221 # Get a MPI unsigned long long.
222 new unsigned_long_long `{ return MPI_UNSIGNED_LONG_LONG; `}
223
224 # Get a MPI float.
225 new float `{ return MPI_FLOAT; `}
226
227 # Get a MPI double.
228 new double `{ return MPI_DOUBLE; `}
229
230 # Get a MPI long double.
231 new long_double `{ return MPI_LONG_DOUBLE; `}
232
233 # Get a MPI byte.
234 new byte `{ return MPI_BYTE; `}
235 end
236
237 # Status of a communication used by `MPI::probe`
238 extern class Status `{ MPI_Status* `}
239 # Ignore the resulting status
240 new ignore `{ return MPI_STATUS_IGNORE; `}
241
242 # Allocated a new `Status`, must be freed with `free`
243 new `{ return malloc(sizeof(MPI_Status)); `}
244
245 # Source of this communication
246 fun source: Rank `{ return recv->MPI_SOURCE; `}
247
248 # Tag of this communication
249 fun tag: Tag `{ return recv->MPI_TAG; `}
250
251 # Success or error on this communication
252 fun error: SuccessOrError `{ return recv->MPI_ERROR; `}
253
254 # Count of the given `data_type` in this communication
255 fun count(data_type: DataType): Int
256 `{
257 int count;
258 MPI_Get_count(recv, data_type, &count);
259 return count;
260 `}
261 end
262
263 # An MPI operation
264 #
265 # Used with the `reduce` method.
266 #
267 # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node78.html>
268 extern class Op `{ MPI_Op `}
269 # Get a MPI null operation.
270 new op_null `{ return MPI_OP_NULL; `}
271
272 # Get a MPI maximum operation.
273 new max `{ return MPI_MAX; `}
274
275 # Get a MPI minimum operation.
276 new min `{ return MPI_MIN; `}
277
278 # Get a MPI sum operation.
279 new sum `{ return MPI_SUM; `}
280
281 # Get a MPI product operation.
282 new prod `{ return MPI_PROD; `}
283
284 # Get a MPI logical and operation.
285 new land `{ return MPI_LAND; `}
286
287 # Get a MPI bit-wise and operation.
288 new band `{ return MPI_BAND; `}
289
290 # Get a MPI logical or operation.
291 new lor `{ return MPI_LOR; `}
292
293 # Get a MPI bit-wise or operation.
294 new bor `{ return MPI_BOR; `}
295
296 # Get a MPI logical xor operation.
297 new lxor `{ return MPI_LXOR; `}
298
299 # Get a MPI bit-wise xor operation.
300 new bxor `{ return MPI_BXOR; `}
301
302 # Get a MPI minloc operation.
303 #
304 # Used to compute a global minimum and also an index attached
305 # to the minimum value.
306 #
307 # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
308 new minloc `{ return MPI_MINLOC; `}
309
310 # Get a MPI maxloc operation.
311 #
312 # Used to compute a global maximum and also an index attached
313 # to the maximum value.
314 #
315 # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
316 new maxloc `{ return MPI_MAXLOC; `}
317
318 # Get a MPI replace operation.
319 new replace `{ return MPI_REPLACE; `}
320 end
321
322 # An MPI return code to report success or errors
323 extern class SuccessOrError `{ int `}
324 # Is this a success?
325 fun is_success: Bool `{ return recv == MPI_SUCCESS; `}
326
327 # Is this an error?
328 fun is_error: Bool do return not is_success
329
330 # TODO add is_... for each variant
331
332 # The class of this error
333 fun error_class: ErrorClass
334 `{
335 int class;
336 MPI_Error_class(recv, &class);
337 return class;
338 `}
339
340 redef fun to_s do return native_to_s.to_s
341 private fun native_to_s: NativeString `{
342 char *err = malloc(MPI_MAX_ERROR_STRING);
343 MPI_Error_string(recv, err, NULL);
344 return err;
345 `}
346 end
347
348 # An MPI error class
349 extern class ErrorClass `{ int `}
350 redef fun to_s do return native_to_s.to_s
351 private fun native_to_s: NativeString `{
352 char *err = malloc(MPI_MAX_ERROR_STRING);
353 MPI_Error_string(recv, err, NULL);
354 return err;
355 `}
356 end
357
358 # An MPI rank within a communcator
359 extern class Rank `{ int `}
360 # Special rank accepting any processor
361 new any `{ return MPI_ANY_SOURCE; `}
362
363 # This Rank as an `Int`
364 fun to_i: Int `{ return recv; `}
365 redef fun to_s do return to_i.to_s
366 end
367
368 # An MPI tag, can be defined using `Int::tag`
369 extern class Tag `{ int `}
370 # Special tag accepting any tag
371 new any `{ return MPI_ANY_TAG; `}
372
373 # This tag as an `Int`
374 fun to_i: Int `{ return recv; `}
375 redef fun to_s do return to_i.to_s
376 end
377
378 redef universal Int
379 # `self`th MPI rank
380 fun rank: Rank `{ return recv; `}
381
382 # Tag identified by `self`
383 fun tag: Tag `{ return recv; `}
384
385 # Is this value undefined according to MPI? (may be returned by `Status::count`)
386 fun is_undefined: Bool `{ return recv == MPI_UNDEFINED; `}
387 end
388
389 # Something sendable directly and efficiently over MPI
390 #
391 # Subclasses of `Sendable` should use the native MPI send function, without
392 # using Nit serialization.
393 interface Sendable
394 # Type specific send over MPI
395 protected fun send(mpi: MPI, at, count: Int, dest: Rank, tag: Tag, comm: Comm) is abstract
396
397 # Type specific send full buffer over MPI
398 protected fun send_all(mpi: MPI, dest: Rank, tag: Tag, comm: Comm) is abstract
399 end
400
401
402 # Something which can receive data directly and efficiently from MPI
403 #
404 # Subclasses of `Receptacle` should use the native MPI recveive function,
405 # without using Nit serialization.
406 interface Receptacle
407 # Type specific receive from MPI
408 protected fun recv(mpi: MPI, at, count: Int, source: Rank, tag: Tag, comm: Comm) is abstract
409
410 # Type specific receive and fill buffer from MPI
411 protected fun recv_fill(mpi: MPI, source: Rank, tag: Tag, comm: Comm) is abstract
412 end
413
414 redef class CArray[E]
415 super Sendable
416 super Receptacle
417 end
418
419 redef class Text
420 super Sendable
421
422 redef fun send(mpi, at, count, dest, tag, comm)
423 do
424 var str
425 if at != 0 or count != length then
426 str = substring(at, count)
427 else str = self
428
429 mpi.native_send(str.to_cstring, count, new DataType.char,
430 dest, tag, new Comm.world)
431 end
432
433 redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
434 end
435
436 redef class FlatBuffer
437 super Receptacle
438
439 redef fun recv(mpi, at, count, source, tag, comm)
440 do
441 var min_capacity = at + count
442 if capacity < min_capacity then enlarge min_capacity
443
444 var array
445 if at != 0 then
446 array = items + at
447 else array = items
448
449 mpi.native_recv(array, count, new DataType.char,
450 source, tag, new Comm.world, new Status.ignore)
451
452 length = capacity
453 is_dirty = true
454 end
455
456 redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, capacity, dest, tag, comm)
457 end
458
459 redef class CIntArray
460 redef fun send(mpi, at, count, dest, tag, comm)
461 do
462 var array
463 if at != 0 then
464 array = native_array + at
465 else array = native_array
466
467 mpi.native_send(array, count, new DataType.int,
468 dest, tag, new Comm.world)
469 end
470
471 redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
472
473 redef fun recv(mpi, at, count, source, tag, comm)
474 do
475 var array
476 if at != 0 then
477 array = native_array + at
478 else array = native_array
479
480 mpi.native_recv(array, count, new DataType.int,
481 source, tag, new Comm.world, new Status.ignore)
482 end
483
484 redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, length, dest, tag, comm)
485 end
486
487 # Shortcut to the world communicator (same as `new Comm.world`)
488 fun comm_world: Comm do return once new Comm.world