neo_doxygen: Fix a forgotten warning.
[nit.git] / lib / mpi.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # Implementation of the Message Passing Interface protocol by wrapping OpenMPI
18 #
19 # OpenMPI is used only at linking and for its `mpi.h`. Other implementations
20 # could be used without much modification.
21 #
22 # Supports transfer of any valid `Serializable` instances as well as basic
23 # C arrays defined in module `c`. Using C arrays is encouraged when performance
24 # is critical.
25 #
26 # Since this module is a thin wrapper around OpenMPI, in case of missing
27 # documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
28 module mpi is
29 c_compiler_option(exec("mpicc", "-showme:compile"))
30 c_linker_option(exec("mpicc", "-showme:link"))
31 end
32
33 import c
34 intrude import standard::string
35 import serialization
36 private import json_serialization
37
38 in "C Header" `{
39 #include <mpi.h>
40 `}
41
# Handle to most MPI services
#
# Creating an instance initializes the MPI execution environment,
# `finalize` terminates it.
class MPI
	# Initialize the MPI execution environment
	init do native_init

	# Call `MPI_Init` without forwarding any command line argument
	private fun native_init `{
		MPI_Init(NULL, NULL);
	`}

	# Terminate the MPI execution environment
	fun finalize `{
		MPI_Finalize();
	`}

	# Name of this processor, usually the hostname
	fun processor_name: String import NativeString.to_s_with_length `{
		int length;
		char *buffer = malloc(MPI_MAX_PROCESSOR_NAME);
		MPI_Get_processor_name(buffer, &length);
		return NativeString_to_s_with_length(buffer, length);
	`}

	# Send `count` elements of `buffer`, starting at index `at`
	fun send_from(buffer: Sendable, at, count: Int, dest: Rank, tag: Tag, comm: Comm) do buffer.send(self, at, count, dest, tag, comm)

	# Send the whole content of `buffer`
	fun send_all(buffer: Sendable, dest: Rank, tag: Tag, comm: Comm) do buffer.send_all(self, dest, tag, comm)

	# Efficiently receive `count` elements in the existing `buffer`, starting at index `at`
	fun recv_into(buffer: Receptacle, at, count: Int, source: Rank, tag: Tag, comm: Comm) do buffer.recv(self, at, count, source, tag, comm)

	# Efficiently receive data to fill the existing `buffer`
	fun recv_fill(buffer: Receptacle, source: Rank, tag: Tag, comm: Comm) do buffer.recv_fill(self, source, tag, comm)

	# Send a complex `Serializable` object
	#
	# `data` is serialized to JSON and sent as text.
	fun send(data: nullable Serializable, dest: Rank, tag: Tag, comm: Comm)
	do
		# Serialize `data` in memory
		var out = new StringOStream
		var engine = new JsonSerializer(out)
		engine.serialize(data)

		# Send the serialized form as a single message
		var message = out.to_s
		send_from(message, 0, message.length, dest, tag, comm)
	end

	# Receive a complex object
	#
	# Blocks until a message matching `source` and `tag` is available,
	# then deserializes its JSON content.
	fun recv(source: Rank, tag: Tag, comm: Comm): nullable Object
	do
		var status = new Status

		# Block until a message is in the queue
		var err = probe(source, tag, comm, status)
		assert err.is_success else print err

		# Length of the pending message, in chars
		var size = status.count(new DataType.char)
		assert not size.is_undefined

		# Keep the actual sender and tag found by the probe,
		# then release the status as it is no longer needed
		var sender = status.source
		var message_tag = status.tag
		status.free

		# Receive the message into a buffer of the exact size
		var buffer = new FlatBuffer.with_capacity(size)
		recv_into(buffer, 0, size, sender, message_tag, comm)

		# Deserialize the message
		var engine = new JsonDeserializer(buffer)
		var result = engine.deserialize

		if result == null then
			print "|{buffer}|{buffer.chars.join("-")}| {buffer.length}"
		end

		return result
	end

	# Send an empty buffer, only for the `tag`
	fun send_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
	`}

	# Receive an empty buffer, only for the `tag`
	#
	# Despite its name, `dest` is passed as the source rank of `MPI_Recv`.
	fun recv_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
	`}

	# Send a `NativeCArray` `buffer` with a given `count` of `data_type`
	fun native_send(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Send(buffer, count, data_type, dest, tag, comm);
	`}

	# Receive into a `NativeCArray` `buffer` with a given `count` of `data_type`
	#
	# Despite its name, `dest` is passed as the source rank of `MPI_Recv`.
	fun native_recv(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError `{
		return MPI_Recv(buffer, count, data_type, dest, tag, comm, status);
	`}

	# Probe for the next data to receive, store the result in `status`
	#
	# Note: If you encounter an error where the next receive does not correspond
	# to the last `probe`, call this method twice to ensure a correct result.
	fun probe(source: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError `{
		return MPI_Probe(source, tag, comm, status);
	`}

	# Synchronize all processors of `comm`
	fun barrier(comm: Comm) `{
		MPI_Barrier(comm);
	`}

	# Seconds since some time in the past which does not change
	fun wtime: Float `{
		return MPI_Wtime();
	`}
end
165
# An MPI communicator
extern class Comm `{ MPI_Comm `}
	# The _null_ communicator, targeting no processors
	new null_ `{
		return MPI_COMM_NULL;
	`}

	# The _world_ communicator, targeting all processors
	new world `{
		return MPI_COMM_WORLD;
	`}

	# The _self_ communicator, targeting this processor only
	new self_ `{
		return MPI_COMM_SELF;
	`}

	# Number of processors in this communicator
	fun size: Int `{
		int count;
		MPI_Comm_size(recv, &count);
		return count;
	`}

	# Rank of this processor in this communicator
	fun rank: Rank `{
		int id;
		MPI_Comm_rank(recv, &id);
		return id;
	`}
end
191
# An MPI data type
#
# Each constructor wraps the MPI constant of the corresponding C type.
extern class DataType `{ MPI_Datatype `}
	# `MPI_CHAR`
	new char `{ return MPI_CHAR; `}

	# `MPI_SHORT`
	new short `{ return MPI_SHORT; `}

	# `MPI_INT`
	new int `{ return MPI_INT; `}

	# `MPI_LONG`
	new long `{ return MPI_LONG; `}

	# `MPI_LONG_LONG`
	new long_long `{ return MPI_LONG_LONG; `}

	# `MPI_UNSIGNED_CHAR`
	new unsigned_char `{ return MPI_UNSIGNED_CHAR; `}

	# `MPI_UNSIGNED_SHORT`
	new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}

	# `MPI_UNSIGNED`
	new unsigned `{ return MPI_UNSIGNED; `}

	# `MPI_UNSIGNED_LONG`
	new unsigned_long `{ return MPI_UNSIGNED_LONG; `}

	# `MPI_UNSIGNED_LONG_LONG`
	new unsigned_long_long `{ return MPI_UNSIGNED_LONG_LONG; `}

	# `MPI_FLOAT`
	new float `{ return MPI_FLOAT; `}

	# `MPI_DOUBLE`
	new double `{ return MPI_DOUBLE; `}

	# `MPI_LONG_DOUBLE`
	new long_double `{ return MPI_LONG_DOUBLE; `}

	# `MPI_BYTE`
	new byte `{ return MPI_BYTE; `}
end
209
# Status of a communication used by `MPI::probe`
extern class Status `{ MPI_Status* `}
	# Ignore the resulting status
	new ignore `{
		return MPI_STATUS_IGNORE;
	`}

	# Allocate a new `Status`, must be freed with `free`
	new `{
		MPI_Status *status = malloc(sizeof(MPI_Status));
		return status;
	`}

	# Source of this communication
	fun source: Rank `{
		return recv->MPI_SOURCE;
	`}

	# Tag of this communication
	fun tag: Tag `{
		return recv->MPI_TAG;
	`}

	# Success or error on this communication
	fun error: SuccessOrError `{
		return recv->MPI_ERROR;
	`}

	# Count of the given `data_type` in this communication
	fun count(data_type: DataType): Int `{
		int n;
		MPI_Get_count(recv, data_type, &n);
		return n;
	`}
end
235
# An MPI operation
#
# Used with the `reduce` method
extern class Op `{ MPI_Op `}
	# `MPI_OP_NULL`
	new op_null `{ return MPI_OP_NULL; `}

	# `MPI_MAX`
	new max `{ return MPI_MAX; `}

	# `MPI_MIN`
	new min `{ return MPI_MIN; `}

	# `MPI_SUM`
	new sum `{ return MPI_SUM; `}

	# `MPI_PROD`
	new prod `{ return MPI_PROD; `}

	# `MPI_LAND`
	new land `{ return MPI_LAND; `}

	# `MPI_BAND`
	new band `{ return MPI_BAND; `}

	# `MPI_LOR`
	new lor `{ return MPI_LOR; `}

	# `MPI_BOR`
	new bor `{ return MPI_BOR; `}

	# `MPI_LXOR`
	new lxor `{ return MPI_LXOR; `}

	# `MPI_BXOR`
	new bxor `{ return MPI_BXOR; `}

	# `MPI_MINLOC`
	new minloc `{ return MPI_MINLOC; `}

	# `MPI_MAXLOC`
	new maxloc `{ return MPI_MAXLOC; `}

	# `MPI_REPLACE`
	new replace `{ return MPI_REPLACE; `}
end
255
# An MPI return code to report success or errors
extern class SuccessOrError `{ int `}
	# Is this a success?
	fun is_success: Bool `{ return recv == MPI_SUCCESS; `}

	# Is this an error?
	fun is_error: Bool do return not is_success

	# TODO add is_... for each variant

	# The class of this error
	fun error_class: ErrorClass
	`{
		/* Not named `class` to stay valid if compiled as C++ */
		int err_class;
		MPI_Error_class(recv, &err_class);
		return err_class;
	`}

	# Human readable description of this return code
	redef fun to_s do return native_to_s.to_s

	# Description of this return code in a newly allocated C string
	private fun native_to_s: NativeString `{
		/* `MPI_Error_string` writes the message length to its last argument,
		 * it must be a valid pointer and not `NULL`. */
		int len = 0;
		char *err = malloc(MPI_MAX_ERROR_STRING);

		/* Keep the result a valid empty C string if MPI writes nothing */
		err[0] = '\0';

		MPI_Error_string(recv, err, &len);
		return err;
	`}
end
281
# An MPI error class
extern class ErrorClass `{ int `}
	# Human readable description of this error class
	redef fun to_s do return native_to_s.to_s

	# Description of this error class in a newly allocated C string
	private fun native_to_s: NativeString `{
		/* `MPI_Error_string` writes the message length to its last argument,
		 * it must be a valid pointer and not `NULL`. */
		int len = 0;
		char *err = malloc(MPI_MAX_ERROR_STRING);

		/* Keep the result a valid empty C string if MPI writes nothing */
		err[0] = '\0';

		MPI_Error_string(recv, err, &len);
		return err;
	`}
end
291
# An MPI rank within a communicator
extern class Rank `{ int `}
	# Special rank accepting any processor
	new any `{
		return MPI_ANY_SOURCE;
	`}

	# This rank as an `Int`
	fun to_i: Int `{
		return recv;
	`}

	# Decimal representation of `to_i`
	redef fun to_s
	do
		return to_i.to_s
	end
end
301
# An MPI tag, can be defined using `Int::tag`
extern class Tag `{ int `}
	# Special tag accepting any tag
	new any `{
		return MPI_ANY_TAG;
	`}

	# This tag as an `Int`
	fun to_i: Int `{
		return recv;
	`}

	# Decimal representation of `to_i`
	redef fun to_s
	do
		return to_i.to_s
	end
end
311
redef universal Int
	# `self`th MPI rank
	fun rank: Rank `{
		return recv;
	`}

	# Tag identified by `self`
	fun tag: Tag `{
		return recv;
	`}

	# Is this value undefined according to MPI? (may be returned by `Status::count`)
	fun is_undefined: Bool `{
		return recv == MPI_UNDEFINED;
	`}
end
322
# Something sendable directly and efficiently over MPI
#
# Subclasses of `Sendable` should use the native MPI send function, without
# using Nit serialization.
interface Sendable
	# Type specific send of `count` elements, starting at index `at`, over MPI
	protected fun send(mpi: MPI, at, count: Int, dest: Rank, tag: Tag, comm: Comm) is abstract

	# Type specific send of the full buffer over MPI
	protected fun send_all(mpi: MPI, dest: Rank, tag: Tag, comm: Comm) is abstract
end
334
335
# Something which can receive data directly and efficiently from MPI
#
# Subclasses of `Receptacle` should use the native MPI receive function,
# without using Nit serialization.
interface Receptacle
	# Type specific receive of `count` elements, stored from index `at`, from MPI
	protected fun recv(mpi: MPI, at, count: Int, source: Rank, tag: Tag, comm: Comm) is abstract

	# Type specific receive filling the whole buffer from MPI
	protected fun recv_fill(mpi: MPI, source: Rank, tag: Tag, comm: Comm) is abstract
end
347
# C arrays can be both sent and received over MPI
#
# No implementation is provided here, see the redefinition in `CIntArray`.
redef class CArray[E]
	super Sendable
	super Receptacle
end
352
# Any text can be sent over MPI as a sequence of `DataType.char`
redef class Text
	super Sendable

	# Send `count` chars of `self`, starting at index `at`, to `dest` over `comm`
	redef fun send(mpi, at, count, dest, tag, comm)
	do
		# Extract the requested range only when it is not the whole text
		var str
		if at != 0 or count != length then
			str = substring(at, count)
		else str = self

		# Use the communicator requested by the caller,
		# not unconditionally the world communicator
		mpi.native_send(str.to_cstring, count, new DataType.char,
			dest, tag, comm)
	end

	# Send the whole text to `dest` over `comm`
	redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
end
369
# A `FlatBuffer` can receive `DataType.char` data directly from MPI
redef class FlatBuffer
	super Receptacle

	# Receive `count` chars from `source` over `comm`, stored from index `at`
	#
	# The buffer is enlarged as needed to hold `at + count` chars.
	redef fun recv(mpi, at, count, source, tag, comm)
	do
		var min_capacity = at + count
		if capacity < min_capacity then enlarge min_capacity

		# Native destination, offset by `at` when needed
		var array
		if at != 0 then
			array = items + at
		else array = items

		# Use the communicator requested by the caller,
		# not unconditionally the world communicator
		mpi.native_recv(array, count, new DataType.char,
			source, tag, comm, new Status.ignore)

		# NOTE(review): the length is set to the full capacity, which may exceed
		# `at + count` if `enlarge` allocated more than requested -- TODO confirm
		length = capacity
		is_dirty = true
	end

	# Receive enough chars from `dest` over `comm` to fill the current capacity
	redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, capacity, dest, tag, comm)
end
392
# A `CIntArray` is sent and received over MPI as a sequence of `DataType.int`
redef class CIntArray
	# Send `count` ints, starting at index `at`, to `dest` over `comm`
	redef fun send(mpi, at, count, dest, tag, comm)
	do
		# Native source, offset by `at` when needed
		var array
		if at != 0 then
			array = native_array + at
		else array = native_array

		# Use the communicator requested by the caller,
		# not unconditionally the world communicator
		mpi.native_send(array, count, new DataType.int,
			dest, tag, comm)
	end

	# Send the whole array to `dest` over `comm`
	redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)

	# Receive `count` ints from `source` over `comm`, stored from index `at`
	redef fun recv(mpi, at, count, source, tag, comm)
	do
		# Native destination, offset by `at` when needed
		var array
		if at != 0 then
			array = native_array + at
		else array = native_array

		# Use the communicator requested by the caller,
		# not unconditionally the world communicator
		mpi.native_recv(array, count, new DataType.int,
			source, tag, comm, new Status.ignore)
	end

	# Receive `length` ints from `dest` over `comm` to fill the whole array
	redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, length, dest, tag, comm)
end
420
# Shortcut to the world communicator (same as `new Comm.world`)
#
# The communicator is created once and reused by later calls.
fun comm_world: Comm
do
	return once new Comm.world
end