1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # Implementation of the Message Passing Interface protocol by wrapping OpenMPI
19 # OpenMPI is used only at linking and for its `mpi.h`. Other implementations
20 # could be used without much modification.
22 # Supports transfer of any valid `Serializable` instances as well as basic
23 # C arrays defined in module `c`. Using C arrays is encouraged when performance
# is critical.
26 # Since this module is a thin wrapper around OpenMPI, in case of missing
27 # documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
# C compiler options are taken from `mpicc -showme:compile`
29 c_compiler_option
(exec
("mpicc", "-showme:compile"))
# C linker options are taken from `mpicc -showme:link`
30 c_linker_option
(exec
("mpicc", "-showme:link"))
34 intrude import standard
::string
36 private import json_serialization
42 # Handle to most MPI services
44 # Initialize the MPI execution environment
# Native call to `MPI_Init`, made without forwarding any command-line argument
private fun native_init `{
	MPI_Init(NULL, NULL);
`}
# Terminate the MPI execution environment (wraps `MPI_Finalize`)
fun finalize `{
	MPI_Finalize();
`}
# Name of this processor as reported by `MPI_Get_processor_name`
#
# This is usually the hostname.
fun processor_name: String import NativeString.to_s_with_length `{
	int length;
	char *hostname = malloc(MPI_MAX_PROCESSOR_NAME);
	MPI_Get_processor_name(hostname, &length);
	return NativeString_to_s_with_length(hostname, length);
`}
# Send `count` elements of `buffer`, starting at `at`, to `dest`
#
# The transfer itself is delegated to `buffer.send`.
fun send_from(buffer: Sendable, at, count: Int, dest: Rank, tag: Tag, comm: Comm) do buffer.send(self, at, count, dest, tag, comm)
# Send the full content of `buffer` to `dest`
#
# The transfer itself is delegated to `buffer.send_all`.
fun send_all(buffer: Sendable, dest: Rank, tag: Tag, comm: Comm) do buffer.send_all(self, dest, tag, comm)
# Efficiently receive `count` elements from `source` into the existing `buffer`, starting at `at`
#
# The transfer itself is delegated to `buffer.recv`.
fun recv_into(buffer: Receptacle, at, count: Int, source: Rank, tag: Tag, comm: Comm) do buffer.recv(self, at, count, source, tag, comm)
# Efficiently receive data from `source` and fill the existing `buffer`
#
# The transfer itself is delegated to `buffer.recv_fill`.
fun recv_fill(buffer: Receptacle, source: Rank, tag: Tag, comm: Comm) do buffer.recv_fill(self, source, tag, comm)
84 # Send a complex `Serializable` object
# `data` is serialized by a `JsonSerializer` writing into a `StringOStream`,
# then the resulting `str` is sent in full with `send_from`.
# NOTE(review): the definition of `str` is not visible in these lines;
# presumably it holds the content of `stream` -- confirm.
85 fun send
(data
: nullable Serializable, dest
: Rank, tag
: Tag, comm
: Comm)
# Serialize `data` as JSON into an in-memory stream
88 var stream
= new StringOStream
89 var serializer
= new JsonSerializer(stream
)
90 serializer
.serialize
(data
)
# Send `str` from its first element and for its whole `length`
94 send_from
(str
, 0, str
.length
, dest
, tag
, comm
)
97 # Receive a complex object
# The message is first probed to learn its size, then received into a
# `FlatBuffer` and passed to a `JsonDeserializer`.
# NOTE(review): neither a `return` nor a release of `status` is visible in
# these lines -- confirm against the complete method.
98 fun recv
(source
: Rank, tag
: Tag, comm
: Comm): nullable Object
100 var status
= new Status
102 # Block until a message is in queue
103 var err
= probe
(source
, tag
, comm
, status
)
104 assert err
.is_success
else print err
# Length of the message, counted in `DataType.char` elements
107 var count
= status
.count
(new DataType.char
)
108 assert not count
.is_undefined
110 # Receive message into buffer
# The source and tag found by `probe` are reused for the receive
111 var buffer
= new FlatBuffer.with_capacity
(count
)
112 recv_into
(buffer
, 0, count
, status
.source
, status
.tag
, comm
)
117 # Deserialize message
118 var deserializer
= new JsonDeserializer(buffer
)
119 var deserialized
= deserializer
.deserialize
# Print the raw content of the buffer when nothing was deserialized
121 if deserialized
== null then print
"|{buffer}|{buffer.chars.join("-")}| {buffer.length}"
# Send a zero-length message to `dest`, so that only `tag` carries information
fun send_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
	int err = MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
	return err;
`}
# Receive a zero-length message, so that only `tag` carries information
#
# Despite its name, `dest` is passed to `MPI_Recv` as the rank to receive from.
# The resulting status is ignored.
fun recv_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
	int err = MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
	return err;
`}
# Send `count` elements of `data_type` from the `NativeCArray` `buffer` to `dest`
#
# Thin wrapper around `MPI_Send`, its return code is given back to the caller.
fun native_send(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
	int err = MPI_Send(buffer, count, data_type, dest, tag, comm);
	return err;
`}
# Receive `count` elements of `data_type` into the `NativeCArray` `buffer`
#
# Thin wrapper around `MPI_Recv`. Despite its name, `dest` is passed as the
# rank to receive from. Details on the communication are stored in `status`.
fun native_recv(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError `{
	int err = MPI_Recv(buffer, count, data_type, dest, tag, comm, status);
	return err;
`}
# Probe for the next data to receive and store the result in `status`
#
# Note: If you encounter an error where the next receive does not correspond
# to the last `probe`, call this method twice to ensure a correct result.
fun probe(source: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError `{
	int err = MPI_Probe(source, tag, comm, status);
	return err;
`}
# Synchronize all processors of `comm` (wraps `MPI_Barrier`)
fun barrier(comm: Comm) `{
	MPI_Barrier(comm);
`}
# Seconds elapsed since some fixed time in the past (wraps `MPI_Wtime`)
fun wtime: Float `{
	double now = MPI_Wtime();
	return now;
`}
166 # An MPI communicator
# Instances wrap a native `MPI_Comm` value.
167 extern class Comm `{ MPI_Comm `}
168 # The _null_ communicator, targeting no processors
169 new null_ `{ return MPI_COMM_NULL; `}
171 # The _world_ communicator, targeting all processors
172 new world
`{ return MPI_COMM_WORLD; `}
174 # The _self_ communicator, targeting this processor only
175 new self_ `{ return MPI_COMM_SELF; `}
177 # Number of processors in this communicator
# NOTE(review): the declaration enclosing this `MPI_Comm_size` call is not
# visible in these lines.
180 MPI_Comm_size(recv, &size);
184 # Rank of this processor in this communicator
# NOTE(review): the declaration enclosing this `MPI_Comm_rank` call is not
# visible in these lines.
187 MPI_Comm_rank(recv, &rank);
# An MPI data type, wrapping a native `MPI_Datatype`
#
# Used to describe the elements of the buffers given to `MPI::native_send`
# and `MPI::native_recv`.
extern class DataType `{ MPI_Datatype `}
	# `MPI_CHAR`
	new char `{ return MPI_CHAR; `}

	# `MPI_SHORT`
	new short `{ return MPI_SHORT; `}

	# `MPI_INT`
	new int `{ return MPI_INT; `}

	# `MPI_LONG`
	new long `{ return MPI_LONG; `}

	# `MPI_LONG_LONG`
	new long_long `{ return MPI_LONG_LONG; `}

	# `MPI_UNSIGNED_CHAR`
	new unsigned_char `{ return MPI_UNSIGNED_CHAR; `}

	# `MPI_UNSIGNED_SHORT`
	new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}

	# `MPI_UNSIGNED`
	new unsigned `{ return MPI_UNSIGNED; `}

	# `MPI_UNSIGNED_LONG`
	new unsigned_long `{ return MPI_UNSIGNED_LONG; `}

	# `MPI_UNSIGNED_LONG_LONG`
	new unsigned_long_long `{ return MPI_UNSIGNED_LONG_LONG; `}

	# `MPI_FLOAT`
	new float `{ return MPI_FLOAT; `}

	# `MPI_DOUBLE`
	new double `{ return MPI_DOUBLE; `}

	# `MPI_LONG_DOUBLE`
	new long_double `{ return MPI_LONG_DOUBLE; `}

	# `MPI_BYTE`
	new byte `{ return MPI_BYTE; `}
end
# Status of a communication used by `MPI::probe`
#
# Instances wrap a pointer to a native `MPI_Status`.
extern class Status `{ MPI_Status* `}
	# Ignore the resulting status (wraps `MPI_STATUS_IGNORE`)
	new ignore `{
		return MPI_STATUS_IGNORE;
	`}

	# Allocate a new `Status`, it must be freed with `free`
	new `{
		return malloc(sizeof(MPI_Status));
	`}

	# Source of this communication
	fun source: Rank `{
		return recv->MPI_SOURCE;
	`}

	# Tag of this communication
	fun tag: Tag `{
		return recv->MPI_TAG;
	`}

	# Success or error on this communication
	fun error: SuccessOrError `{
		return recv->MPI_ERROR;
	`}

	# Count of the given `data_type` in this communication
	fun count(data_type: DataType): Int `{
		int n;
		MPI_Get_count(recv, data_type, &n);
		return n;
	`}
end
# An MPI operation, used with the `reduce` method
#
# Instances wrap a native `MPI_Op`.
extern class Op `{ MPI_Op `}
	# `MPI_OP_NULL`
	new op_null `{ return MPI_OP_NULL; `}

	# `MPI_MAX`
	new max `{ return MPI_MAX; `}

	# `MPI_MIN`
	new min `{ return MPI_MIN; `}

	# `MPI_SUM`
	new sum `{ return MPI_SUM; `}

	# `MPI_PROD`
	new prod `{ return MPI_PROD; `}

	# `MPI_LAND`
	new land `{ return MPI_LAND; `}

	# `MPI_BAND`
	new band `{ return MPI_BAND; `}

	# `MPI_LOR`
	new lor `{ return MPI_LOR; `}

	# `MPI_BOR`
	new bor `{ return MPI_BOR; `}

	# `MPI_LXOR`
	new lxor `{ return MPI_LXOR; `}

	# `MPI_BXOR`
	new bxor `{ return MPI_BXOR; `}

	# `MPI_MINLOC`
	new minloc `{ return MPI_MINLOC; `}

	# `MPI_MAXLOC`
	new maxloc `{ return MPI_MAXLOC; `}

	# `MPI_REPLACE`
	new replace `{ return MPI_REPLACE; `}
end
# An MPI return code to report success or errors
extern class SuccessOrError `{ int `}
	# Is this code `MPI_SUCCESS`?
	fun is_success: Bool `{ return recv == MPI_SUCCESS; `}

	# Is this code anything but a success?
	fun is_error: Bool do return not is_success

	# TODO add is_... for each variant

	# The class of this error, as given by `MPI_Error_class`
	fun error_class: ErrorClass `{
		int class;
		MPI_Error_class(recv, &class);
		return class;
	`}

	# Message describing this code, as given by `MPI_Error_string`
	redef fun to_s do return native_to_s.to_s

	# Native text of the message, in a buffer of `MPI_MAX_ERROR_STRING` bytes
	private fun native_to_s: NativeString `{
		char *err = malloc(MPI_MAX_ERROR_STRING);
		/* `resultlen` is a mandatory output of `MPI_Error_string`,
		 * passing NULL makes the call write through a null pointer. */
		int len;
		MPI_Error_string(recv, err, &len);
		return err;
	`}
end
# An MPI error class, as returned by `SuccessOrError::error_class`
extern class ErrorClass `{ int `}
	# Message describing this error class, as given by `MPI_Error_string`
	redef fun to_s do return native_to_s.to_s

	# Native text of the message, in a buffer of `MPI_MAX_ERROR_STRING` bytes
	private fun native_to_s: NativeString `{
		char *err = malloc(MPI_MAX_ERROR_STRING);
		/* `resultlen` is a mandatory output of `MPI_Error_string`,
		 * passing NULL makes the call write through a null pointer. */
		int len;
		MPI_Error_string(recv, err, &len);
		return err;
	`}
end
# An MPI rank within a communicator
extern class Rank `{ int `}
	# Special rank accepting any processor (wraps `MPI_ANY_SOURCE`)
	new any `{
		return MPI_ANY_SOURCE;
	`}

	# This Rank as an `Int`
	fun to_i: Int `{
		return recv;
	`}

	# Decimal representation of `to_i`
	redef fun to_s
	do
		return to_i.to_s
	end
end
# An MPI tag, can be defined using `Int::tag`
extern class Tag `{ int `}
	# Special tag accepting any tag (wraps `MPI_ANY_TAG`)
	new any `{
		return MPI_ANY_TAG;
	`}

	# This tag as an `Int`
	fun to_i: Int `{
		return recv;
	`}

	# Decimal representation of `to_i`
	redef fun to_s
	do
		return to_i.to_s
	end
end
# Rank identified by `self`
fun rank: Rank `{
	return recv;
`}
# Tag identified by `self`
fun tag: Tag `{
	return recv;
`}
# Is this value undefined according to MPI? (may be returned by `Status::count`)
#
# The value is compared to `MPI_UNDEFINED`.
fun is_undefined: Bool `{
	return recv == MPI_UNDEFINED;
`}
323 # Something sendable directly and efficiently over MPI
325 # Subclasses of `Sendable` should use the native MPI send function, without
326 # using Nit serialization.
# Type specific send over MPI of `count` elements starting at `at`
#
# Called by `MPI::send_from`.
protected fun send(mpi: MPI, at, count: Int, dest: Rank, tag: Tag, comm: Comm) is abstract
# Type specific send of the full buffer over MPI
#
# Called by `MPI::send_all`.
protected fun send_all(mpi: MPI, dest: Rank, tag: Tag, comm: Comm) is abstract
336 # Something which can receive data directly and efficiently from MPI
338 # Subclasses of `Receptacle` should use the native MPI receive function,
339 # without using Nit serialization.
# Type specific receive from MPI of `count` elements, stored starting at `at`
#
# Called by `MPI::recv_into`.
protected fun recv(mpi: MPI, at, count: Int, source: Rank, tag: Tag, comm: Comm) is abstract
# Type specific receive from MPI, filling the buffer
#
# Called by `MPI::recv_fill`.
protected fun recv_fill(mpi: MPI, source: Rank, tag: Tag, comm: Comm) is abstract
# Refinement of the generic C array class
# NOTE(review): the body of this refinement is not visible in these lines.
# The `send` and `send_all` redefinitions that follow rely on `substring` and
# `to_cstring`, so they presumably belong to a text class instead -- confirm.
348 redef class CArray[E
]
# Send `count` characters of this text, starting at `at`, to `dest`
#
# The characters are sent as a C string of `DataType.char` elements through
# `MPI::native_send`, using the communicator `comm`.
redef fun send(mpi, at, count, dest, tag, comm)
do
	# Extract a substring only when a strict part of the text is requested
	var str
	if at != 0 or count != length then
		str = substring(at, count)
	else str = self

	# Use the communicator given by the caller, it was previously ignored
	# in favor of `new Comm.world`
	mpi.native_send(str.to_cstring, count, new DataType.char,
		dest, tag, comm)
end
# Send the whole text, from index 0 and for `length` characters
redef fun send_all(mpi, dest, tag, comm)
do
	send(mpi, 0, length, dest, tag, comm)
end
# Refinement letting a `FlatBuffer` receive characters directly from MPI
370 redef class FlatBuffer
# Receive `count` elements of `DataType.char` from `source`
# The capacity is first enlarged to hold at least `at + count` elements.
# NOTE(review): the definition of `array` and any statement following the
# native call are not visible in these lines.
# NOTE(review): `comm` is not used, the native call always receives on
# `new Comm.world` -- confirm whether `comm` should be passed instead.
373 redef fun recv
(mpi
, at
, count
, source
, tag
, comm
)
375 var min_capacity
= at
+ count
376 if capacity
< min_capacity
then enlarge min_capacity
# The resulting status is ignored
383 mpi
.native_recv
(array
, count
, new DataType.char
,
384 source
, tag
, new Comm.world
, new Status.ignore
)
# Receive from index 0 and for the whole `capacity`
# Despite its name, `dest` is the rank to receive from.
390 redef fun recv_fill
(mpi
, dest
, tag
, comm
) do recv
(mpi
, 0, capacity
, dest
, tag
, comm
)
# Refinement letting a `CIntArray` be sent and received directly over MPI
#
# Elements are transferred as `DataType.int` through `MPI::native_send` and
# `MPI::native_recv`, using the communicator given by the caller.
redef class CIntArray
	# Send `count` integers, starting at index `at`, to `dest`
	redef fun send(mpi, at, count, dest, tag, comm)
	do
		# Offset the native array only when not starting from its beginning
		var array
		if at != 0 then
			array = native_array + at
		else array = native_array

		# Use the communicator given by the caller, it was previously
		# ignored in favor of `new Comm.world`
		mpi.native_send(array, count, new DataType.int,
			dest, tag, comm)
	end

	# Send all the `length` integers of this array to `dest`
	redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)

	# Receive `count` integers from `source`, stored starting at index `at`
	#
	# The resulting status is ignored.
	redef fun recv(mpi, at, count, source, tag, comm)
	do
		# Offset the native array only when not starting from its beginning
		var array
		if at != 0 then
			array = native_array + at
		else array = native_array

		# Use the communicator given by the caller, it was previously
		# ignored in favor of `new Comm.world`
		mpi.native_recv(array, count, new DataType.int,
			source, tag, comm, new Status.ignore)
	end

	# Receive `length` integers to fill this array
	#
	# Despite its name, `dest` is the rank to receive from.
	redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, length, dest, tag, comm)
end
# Shortcut to the world communicator (same as `new Comm.world`)
#
# The instance is created with `once`, so it is built a single time.
fun comm_world: Comm
do
	return once new Comm.world
end