1 # This file is part of NIT (http://www.nitlanguage.org).
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # Implementation of the Message Passing Interface protocol by wrapping OpenMPI
19 # OpenMPI is used only at linking and for it's `mpi.h`. Other implementations
20 # could be used without much modification.
22 # Supports transfer of any valid `Serializable` instances as well as basic
23 # C arrays defined in module `c`. Using C arrays is encouraged when performance
26 # Since this module is a thin wrapper around OpenMPI, in case of missing
27 # documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
29 c_compiler_option
(exec
("mpicc", "-showme:compile"))
30 c_linker_option
(exec
("mpicc", "-showme:link"))
34 intrude import standard
::string
36 private import json_serialization
42 # Handle to most MPI services
44 # Initialize the MPI execution environment
47 private fun native_init
`{ MPI_Init(NULL, NULL); `}
49 # Terminates the MPI execution environment
50 fun finalize `{ MPI_Finalize(); `}
52 # Name of this processor, usually the hostname
53 fun processor_name
: String import NativeString.to_s_with_length
`{
54 char *name = malloc(MPI_MAX_PROCESSOR_NAME);
56 MPI_Get_processor_name(name, &size);
57 return NativeString_to_s_with_length(name, size);
60 # Send the content of a buffer
61 fun send_from
(buffer
: Sendable, at
, count
: Int, dest
: Rank, tag
: Tag, comm
: Comm)
63 buffer
.send
(self, at
, count
, dest
, tag
, comm
)
66 # Send the full content of a buffer
67 fun send_all
(buffer
: Sendable, dest
: Rank, tag
: Tag, comm
: Comm)
69 buffer
.send_all
(self, dest
, tag
, comm
)
72 # Efficiently receive data in an existing buffer
73 fun recv_into
(buffer
: Receptacle, at
, count
: Int, source
: Rank, tag
: Tag, comm
: Comm)
75 buffer
.recv
(self, at
, count
, source
, tag
, comm
)
78 # Efficiently receive data and fill an existing buffer
79 fun recv_fill
(buffer
: Receptacle, source
: Rank, tag
: Tag, comm
: Comm)
81 buffer
.recv_fill
(self, source
, tag
, comm
)
84 # Send a complex `Serializable` object
85 fun send
(data
: nullable Serializable, dest
: Rank, tag
: Tag, comm
: Comm)
88 var stream
= new StringOStream
89 var serializer
= new JsonSerializer(stream
)
90 serializer
.serialize
(data
)
94 send_from
(str
, 0, str
.length
, dest
, tag
, comm
)
97 # Receive a complex object
98 fun recv
(source
: Rank, tag
: Tag, comm
: Comm): nullable Object
100 var status
= new Status
102 # Block until a message in in queue
103 var err
= probe
(source
, tag
, comm
, status
)
104 assert err
.is_success
else print err
107 var count
= status
.count
(new DataType.char
)
108 assert not count
.is_undefined
110 # Receive message into buffer
111 var buffer
= new FlatBuffer.with_capacity
(count
)
112 recv_into
(buffer
, 0, count
, status
.source
, status
.tag
, comm
)
117 # Deserialize message
118 var deserializer
= new JsonDeserializer(buffer
)
119 var deserialized
= deserializer
.deserialize
121 if deserialized
== null then print
"|{buffer}|{buffer.chars.join("-")}| {buffer.length}"
126 fun send_empty
(dest
: Rank, tag
: Tag, comm
: Comm): SuccessOrError
128 return MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
131 fun recv_empty
(dest
: Rank, tag
: Tag, comm
: Comm): SuccessOrError
133 return MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
136 fun native_send
(data
: NativeCArray, count
: Int, data_type
: DataType, dest
: Rank, tag
: Tag, comm
: Comm): SuccessOrError
138 return MPI_Send(data, count, data_type, dest, tag, comm);
141 fun native_recv
(data
: NativeCArray, count
: Int, data_type
: DataType, dest
: Rank, tag
: Tag, comm
: Comm, status
: Status): SuccessOrError
143 return MPI_Recv(data, count, data_type, dest, tag, comm, status);
146 fun probe
(source
: Rank, tag
: Tag, comm
: Comm, status
: Status): SuccessOrError
148 return MPI_Probe(source, tag, comm, status);
151 # Synchronize all processors
152 fun barrier
(comm
: Comm) `{ MPI_Barrier(comm); `}
154 # Seconds since some time in the past which does not change
155 fun wtime: Float `{ return MPI_Wtime(); `}
158 # An MPI communicator
159 extern class Comm `{ MPI_Comm `}
160 new null_ `{ return MPI_COMM_NULL; `}
161 new world
`{ return MPI_COMM_WORLD; `}
162 new self_ `{ return MPI_COMM_SELF; `}
164 # Number of processors in this communicator
167 MPI_Comm_size(recv, &size);
171 # Rank on this processor in this communicator
174 MPI_Comm_rank(recv, &rank);
180 extern class DataType `{ MPI_Datatype `}
181 new char `{ return MPI_CHAR; `}
182 new short
`{ return MPI_SHORT; `}
183 new int `{ return MPI_INT; `}
184 new long
`{ return MPI_LONG; `}
185 new long_long `{ return MPI_LONG_LONG; `}
186 new unsigned_char
`{ return MPI_UNSIGNED_CHAR; `}
187 new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}
188 new unsigned
`{ return MPI_UNSIGNED; `}
189 new unsigned_long `{ return MPI_UNSIGNED_LONG; `}
190 new unsigned_long_long
`{ return MPI_UNSIGNED_LONG_LONG; `}
191 new float `{ return MPI_FLOAT; `}
192 new double
`{ return MPI_DOUBLE; `}
193 new long_double `{ return MPI_LONG_DOUBLE; `}
194 new byte
`{ return MPI_BYTE; `}
197 # Status of a communication used by `MPI::probe
`
198 extern class Status `{ MPI_Status* `}
199 # Ignore the resulting status
200 new ignore
`{ return MPI_STATUS_IGNORE; `}
202 # Allocated a new `Status`, must be freed with `free
`
203 new `{ return malloc(sizeof(MPI_Status)); `}
205 # Source of this communication
206 fun source
: Rank `{ return recv->MPI_SOURCE; `}
208 # Tag of this communication
209 fun tag: Tag `{ return recv->MPI_TAG; `}
211 # Success or error on this communication
212 fun error
: SuccessOrError `{ return recv->MPI_ERROR; `}
214 # Count of the given `data_type
` in this communication
215 fun count(data_type: DataType): Int
218 MPI_Get_count(recv
, data_type
, &count
);
225 # Used with the `reduce
` method
226 extern class Op `{ MPI_Op `}
227 new op_null
`{ return MPI_OP_NULL; `}
228 new max `{ return MPI_MAX; `}
229 new min
`{ return MPI_MIN; `}
230 new sum `{ return MPI_SUM; `}
231 new prod
`{ return MPI_PROD; `}
232 new land `{ return MPI_LAND; `}
233 new band
`{ return MPI_BAND; `}
234 new lor `{ return MPI_LOR; `}
235 new bor
`{ return MPI_BOR; `}
236 new lxor `{ return MPI_LXOR; `}
237 new bxor
`{ return MPI_BXOR; `}
238 new minloc `{ return MPI_MINLOC; `}
239 new maxloc
`{ return MPI_MAXLOC; `}
240 new replace `{ return MPI_REPLACE; `}
243 # An MPI return code to report success or errors
244 extern class SuccessOrError `{ int `}
246 fun is_success: Bool `{ return recv == MPI_SUCCESS; `}
249 fun is_error
: Bool do return not is_success
251 # TODO add is_... for each variant
253 # The class of this error
254 fun error_class
: ErrorClass
257 MPI_Error_class(recv, &class);
261 redef fun to_s
do return native_to_s
.to_s
262 private fun native_to_s
: NativeString `{
263 char *err = malloc(MPI_MAX_ERROR_STRING);
264 MPI_Error_string(recv, err, NULL);
270 extern class ErrorClass `{ int `}
271 redef fun to_s do return native_to_s.to_s
272 private fun native_to_s: NativeString `{
273 char
*err
= malloc
(MPI_MAX_ERROR_STRING);
274 MPI_Error_string(recv
, err
, NULL);
279 # An MPI rank within a communcator
280 extern class Rank `{ int `}
281 new any
`{ return MPI_ANY_SOURCE; `}
283 # This Rank as an `Int`
284 fun to_i: Int `{ return recv; `}
285 redef fun to_s
do return to_i
.to_s
288 # An MPI tag, can be defined using `Int::tag`
289 extern class Tag `{ int `}
290 new any `{ return MPI_ANY_TAG; `}
292 # This tag as an `Int`
293 fun to_i
: Int `{ return recv; `}
294 redef fun to_s do return to_i.to_s
299 fun rank: Rank `{ return recv; `}
301 # Tag identified by `self`
302 fun tag
: Tag `{ return recv; `}
304 # Is this value undefined according to MPI? (may be returned by `Status::count
`)
305 fun is_undefined: Bool `{ return recv == MPI_UNDEFINED; `}
308 # Something sendable directly and efficiently over MPI
310 # Subclasses of `Sendable` should use the native MPI send function, without
311 # using Nit serialization.
313 # Type specific send over MPI
314 protected fun send
(mpi
: MPI, at
, count
: Int, dest
: Rank, tag
: Tag, comm
: Comm) is abstract
316 # Type specific send full buffer over MPI
317 protected fun send_all
(mpi
: MPI, dest
: Rank, tag
: Tag, comm
: Comm) is abstract
321 # Something which can receive data directly and efficiently from MPI
323 # Subclasses of `Receptacle` should use the native MPI recveive function,
324 # without using Nit serialization.
326 # Type specific receive from MPI
327 protected fun recv
(mpi
: MPI, at
, count
: Int, source
: Rank, tag
: Tag, comm
: Comm) is abstract
329 # Type specific receive and fill buffer from MPI
330 protected fun recv_fill
(mpi
: MPI, source
: Rank, tag
: Tag, comm
: Comm) is abstract
333 redef class CArray[E
]
341 redef fun send
(mpi
, at
, count
, dest
, tag
, comm
)
344 if at
!= 0 or count
!= length
then
345 str
= substring
(at
, count
)
348 mpi
.native_send
(str
.to_cstring
, count
, new DataType.char
,
349 dest
, tag
, new Comm.world
)
352 redef fun send_all
(mpi
, dest
, tag
, comm
) do send
(mpi
, 0, length
, dest
, tag
, comm
)
355 redef class FlatBuffer
358 redef fun recv
(mpi
, at
, count
, source
, tag
, comm
)
360 var min_capacity
= at
+ count
361 if capacity
< min_capacity
then enlarge min_capacity
368 mpi
.native_recv
(array
, count
, new DataType.char
,
369 source
, tag
, new Comm.world
, new Status.ignore
)
375 redef fun recv_fill
(mpi
, dest
, tag
, comm
) do recv
(mpi
, 0, capacity
, dest
, tag
, comm
)
378 redef class CIntArray
379 redef fun send
(mpi
, at
, count
, dest
, tag
, comm
)
383 array
= native_array
+ at
384 else array
= native_array
386 mpi
.native_send
(array
, count
, new DataType.int
,
387 dest
, tag
, new Comm.world
)
390 redef fun send_all
(mpi
, dest
, tag
, comm
) do send
(mpi
, 0, length
, dest
, tag
, comm
)
392 redef fun recv
(mpi
, at
, count
, source
, tag
, comm
)
396 array
= native_array
+ at
397 else array
= native_array
399 mpi
.native_recv
(array
, count
, new DataType.int
,
400 source
, tag
, new Comm.world
, new Status.ignore
)
403 redef fun recv_fill
(mpi
, dest
, tag
, comm
) do recv
(mpi
, 0, length
, dest
, tag
, comm
)
406 # Shortcut to the world communicator (same as `new Comm.world`)
407 fun comm_world
: Comm do return once
new Comm.world