1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # Implementation of the Message Passing Interface protocol by wrapping OpenMPI
19 # OpenMPI is used only at linking and for it's `mpi.h`. Other implementations
20 # could be used without much modification.
22 # Supports transfer of any valid `Serializable` instances as well as basic
23 # C arrays defined in module `c`. Using C arrays is encouraged when performance
26 # Since this module is a thin wrapper around OpenMPI, in case of missing
27 # documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
29 cflags exec
("mpicc", "-showme:compile")
30 ldflags exec
("mpicc", "-showme:link")
34 intrude import core
::text
::flat
42 # Handle to most MPI services
44 # Initialize the MPI execution environment
47 private fun native_init
`{ MPI_Init(NULL, NULL); `}
49 # Terminates the MPI execution environment
50 fun finalize `{ MPI_Finalize(); `}
52 # Name of this processor, usually the hostname
53 fun processor_name
: String import CString.to_s_with_length
`{
54 char *name = malloc(MPI_MAX_PROCESSOR_NAME);
56 MPI_Get_processor_name(name, &size);
57 return CString_to_s_with_length(name, size);
60 # Send the content of a buffer
61 fun send_from
(buffer
: Sendable, at
, count
: Int, dest
: Rank, tag
: Tag, comm
: Comm)
63 buffer
.send
(self, at
, count
, dest
, tag
, comm
)
66 # Send the full content of a buffer
67 fun send_all
(buffer
: Sendable, dest
: Rank, tag
: Tag, comm
: Comm)
69 buffer
.send_all
(self, dest
, tag
, comm
)
72 # Efficiently receive data in an existing buffer
73 fun recv_into
(buffer
: Receptacle, at
, count
: Int, source
: Rank, tag
: Tag, comm
: Comm)
75 buffer
.recv
(self, at
, count
, source
, tag
, comm
)
78 # Efficiently receive data and fill an existing buffer
79 fun recv_fill
(buffer
: Receptacle, source
: Rank, tag
: Tag, comm
: Comm)
81 buffer
.recv_fill
(self, source
, tag
, comm
)
84 # Send a complex `Serializable` object
85 fun send
(data
: nullable Serializable, dest
: Rank, tag
: Tag, comm
: Comm)
88 var stream
= new StringWriter
89 var serializer
= new JsonSerializer(stream
)
90 serializer
.serialize
(data
)
94 send_from
(str
, 0, str
.length
, dest
, tag
, comm
)
97 # Receive a complex object
98 fun recv
(source
: Rank, tag
: Tag, comm
: Comm): nullable Object
100 var status
= new Status
102 # Block until a message in in queue
103 var err
= probe
(source
, tag
, comm
, status
)
104 assert err
.is_success
else print err
107 var count
= status
.count
(new DataType.char
)
108 assert not count
.is_undefined
110 # Receive message into buffer
111 var buffer
= new FlatBuffer.with_capacity
(count
)
112 recv_into
(buffer
, 0, count
, status
.source
, status
.tag
, comm
)
117 # Deserialize message
118 var deserializer
= new JsonDeserializer(buffer
)
119 var deserialized
= deserializer
.deserialize
121 if deserialized
== null then print
"|{buffer}|{buffer.chars.join("-")}| {buffer.length}"
126 # Send an empty buffer, only for the `tag`
127 fun send_empty
(dest
: Rank, tag
: Tag, comm
: Comm): SuccessOrError
129 return MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
132 # Receive an empty buffer, only for the `tag`
133 fun recv_empty
(dest
: Rank, tag
: Tag, comm
: Comm): SuccessOrError
135 return MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
138 # Send a `NativeCArray` `buffer` with a given `count` of `data_type`
139 fun native_send
(buffer
: NativeCArray, count
: Int, data_type
: DataType, dest
: Rank, tag
: Tag, comm
: Comm): SuccessOrError
141 return MPI_Send(buffer, count, data_type, dest, tag, comm);
144 # Receive into a `NativeCArray` `buffer` with a given `count` of `data_type`
145 fun native_recv
(buffer
: NativeCArray, count
: Int, data_type
: DataType, dest
: Rank, tag
: Tag, comm
: Comm, status
: Status): SuccessOrError
147 return MPI_Recv(buffer, count, data_type, dest, tag, comm, status);
150 # Probe for the next data to receive, store the result in `status`
152 # Note: If you encounter an error where the next receive does not correspond
153 # to the last `probe`, call this method twice to ensure a correct result.
154 fun probe
(source
: Rank, tag
: Tag, comm
: Comm, status
: Status): SuccessOrError
156 return MPI_Probe(source, tag, comm, status);
159 # Synchronize all processors
160 fun barrier
(comm
: Comm) `{ MPI_Barrier(comm); `}
162 # Seconds since some time in the past which does not change
163 fun wtime: Float `{ return MPI_Wtime(); `}
166 # An MPI communicator
167 extern class Comm `{ MPI_Comm `}
168 # The _null_ communicator, targeting no processors
169 new null_ `{ return MPI_COMM_NULL; `}
171 # The _world_ communicator, targeting all processors
172 new world
`{ return MPI_COMM_WORLD; `}
174 # The _self_ communicator, targeting this processor only
175 new self_ `{ return MPI_COMM_SELF; `}
177 # Number of processors in this communicator
180 MPI_Comm_size(self, &size);
184 # Rank on this processor in this communicator
187 MPI_Comm_rank(self, &rank);
193 extern class DataType `{ MPI_Datatype `}
195 new char `{ return MPI_CHAR; `}
198 new short
`{ return MPI_SHORT; `}
201 new int `{ return MPI_INT; `}
204 new long
`{ return MPI_LONG; `}
206 # Get a MPI long long.
207 new long_long `{ return MPI_LONG_LONG; `}
209 # Get a MPI unsigned char.
210 new unsigned_char
`{ return MPI_UNSIGNED_CHAR; `}
212 # Get a MPI unsigned short.
213 new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}
215 # Get a MPI unsigned int.
216 new unsigned
`{ return MPI_UNSIGNED; `}
218 # Get a MPI unsigned long.
219 new unsigned_long `{ return MPI_UNSIGNED_LONG; `}
221 # Get a MPI unsigned long long.
222 new unsigned_long_long
`{ return MPI_UNSIGNED_LONG_LONG; `}
225 new float `{ return MPI_FLOAT; `}
228 new double
`{ return MPI_DOUBLE; `}
230 # Get a MPI long double.
231 new long_double `{ return MPI_LONG_DOUBLE; `}
234 new byte
`{ return MPI_BYTE; `}
237 # Status of a communication used by `MPI::probe
`
238 extern class Status `{ MPI_Status* `}
239 # Ignore the resulting status
240 new ignore
`{ return MPI_STATUS_IGNORE; `}
242 # Allocated a new `Status`, must be freed with `free
`
243 new `{ return malloc(sizeof(MPI_Status)); `}
245 # Source of this communication
246 fun source
: Rank `{ return self->MPI_SOURCE; `}
248 # Tag of this communication
249 fun tag: Tag `{ return self->MPI_TAG; `}
251 # Success or error on this communication
252 fun error
: SuccessOrError `{ return self->MPI_ERROR; `}
254 # Count of the given `data_type
` in this communication
255 fun count(data_type: DataType): Int
258 MPI_Get_count(self, data_type
, &count
);
265 # Used with the `reduce
` method.
267 # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node78.html>
268 extern class Op `{ MPI_Op `}
269 # Get a MPI null operation.
270 new op_null
`{ return MPI_OP_NULL; `}
272 # Get a MPI maximum operation.
273 new max `{ return MPI_MAX; `}
275 # Get a MPI minimum operation.
276 new min
`{ return MPI_MIN; `}
278 # Get a MPI sum operation.
279 new sum `{ return MPI_SUM; `}
281 # Get a MPI product operation.
282 new prod
`{ return MPI_PROD; `}
284 # Get a MPI logical and operation.
285 new land `{ return MPI_LAND; `}
287 # Get a MPI bit-wise and operation.
288 new band
`{ return MPI_BAND; `}
290 # Get a MPI logical or operation.
291 new lor `{ return MPI_LOR; `}
293 # Get a MPI bit-wise or operation.
294 new bor
`{ return MPI_BOR; `}
296 # Get a MPI logical xor operation.
297 new lxor `{ return MPI_LXOR; `}
299 # Get a MPI bit-wise xor operation.
300 new bxor
`{ return MPI_BXOR; `}
302 # Get a MPI minloc operation.
304 # Used to compute a global minimum and also an index attached
305 # to the minimum value.
307 # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
308 new minloc `{ return MPI_MINLOC; `}
310 # Get a MPI maxloc operation.
312 # Used to compute a global maximum and also an index attached
313 # to the maximum value.
315 # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
316 new maxloc
`{ return MPI_MAXLOC; `}
318 # Get a MPI replace operation.
319 new replace `{ return MPI_REPLACE; `}
322 # An MPI return code to report success or errors
323 extern class SuccessOrError `{ int `}
325 fun is_success: Bool `{ return self == MPI_SUCCESS; `}
328 fun is_error
: Bool do return not is_success
330 # TODO add is_... for each variant
332 # The class of this error
333 fun error_class
: ErrorClass
336 MPI_Error_class(self, &class);
340 redef fun to_s
do return native_to_s
.to_s
341 private fun native_to_s
: CString `{
342 char *err = malloc(MPI_MAX_ERROR_STRING);
343 MPI_Error_string(self, err, NULL);
349 extern class ErrorClass `{ int `}
350 redef fun to_s do return native_to_s.to_s
351 private fun native_to_s: CString `{
352 char
*err
= malloc
(MPI_MAX_ERROR_STRING);
353 MPI_Error_string(self, err
, NULL);
358 # An MPI rank within a communcator
359 extern class Rank `{ int `}
360 # Special rank accepting any processor
361 new any
`{ return MPI_ANY_SOURCE; `}
363 # This Rank as an `Int`
364 fun to_i: Int `{ return self; `}
365 redef fun to_s
do return to_i
.to_s
368 # An MPI tag, can be defined using `Int::tag`
369 extern class Tag `{ int `}
370 # Special tag accepting any tag
371 new any `{ return MPI_ANY_TAG; `}
373 # This tag as an `Int`
374 fun to_i
: Int `{ return self; `}
375 redef fun to_s do return to_i.to_s
380 fun rank: Rank `{ return self; `}
382 # Tag identified by `self`
383 fun tag
: Tag `{ return self; `}
385 # Is this value undefined according to MPI? (may be returned by `Status::count
`)
386 fun is_undefined: Bool `{ return self == MPI_UNDEFINED; `}
389 # Something sendable directly and efficiently over MPI
391 # Subclasses of `Sendable` should use the native MPI send function, without
392 # using Nit serialization.
394 # Type specific send over MPI
395 protected fun send
(mpi
: MPI, at
, count
: Int, dest
: Rank, tag
: Tag, comm
: Comm) is abstract
397 # Type specific send full buffer over MPI
398 protected fun send_all
(mpi
: MPI, dest
: Rank, tag
: Tag, comm
: Comm) is abstract
402 # Something which can receive data directly and efficiently from MPI
404 # Subclasses of `Receptacle` should use the native MPI recveive function,
405 # without using Nit serialization.
407 # Type specific receive from MPI
408 protected fun recv
(mpi
: MPI, at
, count
: Int, source
: Rank, tag
: Tag, comm
: Comm) is abstract
410 # Type specific receive and fill buffer from MPI
411 protected fun recv_fill
(mpi
: MPI, source
: Rank, tag
: Tag, comm
: Comm) is abstract
414 redef class CArray[E
]
422 redef fun send
(mpi
, at
, count
, dest
, tag
, comm
)
425 if at
!= 0 or count
!= length
then
426 str
= substring
(at
, count
)
429 mpi
.native_send
(str
.to_cstring
, count
, new DataType.char
,
430 dest
, tag
, new Comm.world
)
433 redef fun send_all
(mpi
, dest
, tag
, comm
) do send
(mpi
, 0, length
, dest
, tag
, comm
)
436 redef class FlatBuffer
439 redef fun recv
(mpi
, at
, count
, source
, tag
, comm
)
441 var min_capacity
= at
+ count
442 if capacity
< min_capacity
then enlarge min_capacity
449 mpi
.native_recv
(array
, count
, new DataType.char
,
450 source
, tag
, new Comm.world
, new Status.ignore
)
455 redef fun recv_fill
(mpi
, dest
, tag
, comm
) do recv
(mpi
, 0, capacity
, dest
, tag
, comm
)
458 redef class CIntArray
459 redef fun send
(mpi
, at
, count
, dest
, tag
, comm
)
463 array
= native_array
+ at
464 else array
= native_array
466 mpi
.native_send
(array
, count
, new DataType.int
,
467 dest
, tag
, new Comm.world
)
470 redef fun send_all
(mpi
, dest
, tag
, comm
) do send
(mpi
, 0, length
, dest
, tag
, comm
)
472 redef fun recv
(mpi
, at
, count
, source
, tag
, comm
)
476 array
= native_array
+ at
477 else array
= native_array
479 mpi
.native_recv
(array
, count
, new DataType.int
,
480 source
, tag
, new Comm.world
, new Status.ignore
)
483 redef fun recv_fill
(mpi
, dest
, tag
, comm
) do recv
(mpi
, 0, length
, dest
, tag
, comm
)
486 # Shortcut to the world communicator (same as `new Comm.world`)
487 fun comm_world
: Comm do return once
new Comm.world