neo: Correct the documentation of neo.nit according to PR #734.
[nit.git] / lib / mpi.nit
1 # This file is part of NIT (http://www.nitlanguage.org).
2 #
3 # Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 # Implementation of the Message Passing Interface protocol by wrapping OpenMPI
18 #
19 # OpenMPI is used only at linking and for its `mpi.h`. Other implementations
20 # could be used without much modification.
21 #
22 # Supports transfer of any valid `Serializable` instances as well as basic
23 # C arrays defined in module `c`. Using C arrays is encouraged when performance
24 # is critical.
25 #
26 # Since this module is a thin wrapper around OpenMPI, in case of missing
27 # documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
28 module mpi is
29 c_compiler_option(exec("mpicc", "-showme:compile"))
30 c_linker_option(exec("mpicc", "-showme:link"))
31 end
32
33 import c
34 intrude import standard::string
35 import serialization
36 private import json_serialization
37
38 in "C Header" `{
39 #include <mpi.h>
40 `}
41
# Entry point to most MPI services
#
# Creating an instance initializes the MPI execution environment,
# `finalize` terminates it.
class MPI
	# Set up the MPI execution environment
	init do native_init

	private fun native_init `{ MPI_Init(NULL, NULL); `}

	# Shut down the MPI execution environment
	fun finalize `{ MPI_Finalize(); `}

	# Name of this processor, usually the hostname
	fun processor_name: String import NativeString.to_s_with_length `{
		int length;
		char *buffer = malloc(MPI_MAX_PROCESSOR_NAME);
		MPI_Get_processor_name(buffer, &length);
		return NativeString_to_s_with_length(buffer, length);
	`}

	# Send `count` elements of `buffer`, starting at index `at`
	fun send_from(buffer: Sendable, at, count: Int, dest: Rank, tag: Tag, comm: Comm) do buffer.send(self, at, count, dest, tag, comm)

	# Send the whole content of `buffer`
	fun send_all(buffer: Sendable, dest: Rank, tag: Tag, comm: Comm) do buffer.send_all(self, dest, tag, comm)

	# Efficiently receive `count` elements into the existing `buffer`, starting at index `at`
	fun recv_into(buffer: Receptacle, at, count: Int, source: Rank, tag: Tag, comm: Comm) do buffer.recv(self, at, count, source, tag, comm)

	# Efficiently receive data until the existing `buffer` is full
	fun recv_fill(buffer: Receptacle, source: Rank, tag: Tag, comm: Comm) do buffer.recv_fill(self, source, tag, comm)

	# Send a complex `Serializable` object
	#
	# The object is serialized with a `JsonSerializer` and sent as text.
	fun send(data: nullable Serializable, dest: Rank, tag: Tag, comm: Comm)
	do
		# Serialize `data` into an in-memory stream
		var out = new StringOStream
		var json = new JsonSerializer(out)
		json.serialize(data)

		# Send the serialized text as a single message
		var message = out.to_s
		send_from(message, 0, message.length, dest, tag, comm)
	end

	# Receive a complex object sent with `send`
	fun recv(source: Rank, tag: Tag, comm: Comm): nullable Object
	do
		var status = new Status

		# Block until a message is in queue
		var err = probe(source, tag, comm, status)
		assert err.is_success else print err

		# Length of the pending message, in chars
		var count = status.count(new DataType.char)
		assert not count.is_undefined

		# Receive the message from the source and tag reported by the probe
		var buffer = new FlatBuffer.with_capacity(count)
		recv_into(buffer, 0, count, status.source, status.tag, comm)

		# The status was allocated by `new Status`, it must be freed
		status.free

		# Rebuild the object from the received text
		var deserializer = new JsonDeserializer(buffer)
		var deserialized = deserializer.deserialize

		if deserialized == null then print "|{buffer}|{buffer.chars.join("-")}| {buffer.length}"

		return deserialized
	end

	# Send a message without content to `dest`
	fun send_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
	`}

	# Receive a message without content
	#
	# Despite its name, `dest` is passed to `MPI_Recv` as the source rank.
	fun recv_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
	`}

	# Send `count` elements of type `data_type` from `data`, wraps `MPI_Send`
	fun native_send(data: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Send(data, count, data_type, dest, tag, comm);
	`}

	# Receive `count` elements of type `data_type` into `data`, wraps `MPI_Recv`
	#
	# Despite its name, `dest` is passed to `MPI_Recv` as the source rank.
	fun native_recv(data: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError `{
		return MPI_Recv(data, count, data_type, dest, tag, comm, status);
	`}

	# Probe for an incoming message and fill `status`, wraps `MPI_Probe`
	fun probe(source: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError `{
		return MPI_Probe(source, tag, comm, status);
	`}

	# Synchronize all processors of `comm`
	fun barrier(comm: Comm) `{ MPI_Barrier(comm); `}

	# Seconds elapsed since some fixed time in the past
	fun wtime: Float `{ return MPI_Wtime(); `}
end
157
# An MPI communicator
extern class Comm `{ MPI_Comm `}
	# The null communicator (`MPI_COMM_NULL`)
	new null_ `{ return MPI_COMM_NULL; `}

	# The world communicator (`MPI_COMM_WORLD`)
	new world `{ return MPI_COMM_WORLD; `}

	# The self communicator (`MPI_COMM_SELF`)
	new self_ `{ return MPI_COMM_SELF; `}

	# Number of processors in this communicator
	fun size: Int `{
		int count;
		MPI_Comm_size(recv, &count);
		return count;
	`}

	# Rank of this processor in this communicator
	fun rank: Rank `{
		int id;
		MPI_Comm_rank(recv, &id);
		return id;
	`}
end
178
# An MPI data type
#
# Each constructor gives access to one of the predefined `MPI_Datatype` constants.
extern class DataType `{ MPI_Datatype `}
	# Signed integer types

	new char `{ return MPI_CHAR; `}
	new short `{ return MPI_SHORT; `}
	new int `{ return MPI_INT; `}
	new long `{ return MPI_LONG; `}
	new long_long `{ return MPI_LONG_LONG; `}

	# Unsigned integer types

	new unsigned_char `{ return MPI_UNSIGNED_CHAR; `}
	new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}
	new unsigned `{ return MPI_UNSIGNED; `}
	new unsigned_long `{ return MPI_UNSIGNED_LONG; `}
	new unsigned_long_long `{ return MPI_UNSIGNED_LONG_LONG; `}

	# Floating point types

	new float `{ return MPI_FLOAT; `}
	new double `{ return MPI_DOUBLE; `}
	new long_double `{ return MPI_LONG_DOUBLE; `}

	# Raw bytes (`MPI_BYTE`)
	new byte `{ return MPI_BYTE; `}
end
196
# Status of a communication used by `MPI::probe`
extern class Status `{ MPI_Status* `}
	# Allocate a new `Status`, it must be freed with `free`
	new `{ return malloc(sizeof(MPI_Status)); `}

	# Special value asking MPI to ignore the resulting status
	new ignore `{ return MPI_STATUS_IGNORE; `}

	# Rank of the source of this communication
	fun source: Rank `{ return recv->MPI_SOURCE; `}

	# Tag attached to this communication
	fun tag: Tag `{ return recv->MPI_TAG; `}

	# Success or error code of this communication
	fun error: SuccessOrError `{ return recv->MPI_ERROR; `}

	# Number of elements of type `data_type` in this communication
	#
	# The result may be undefined according to MPI, see `Int::is_undefined`.
	fun count(data_type: DataType): Int `{
		int n;
		MPI_Get_count(recv, data_type, &n);
		return n;
	`}
end
222
# An MPI operation
#
# Used with the `reduce` method. Each constructor gives access to one of the
# predefined `MPI_Op` constants.
extern class Op `{ MPI_Op `}
	# The null operation (`MPI_OP_NULL`)
	new op_null `{ return MPI_OP_NULL; `}

	# Extremums and their locations

	new max `{ return MPI_MAX; `}
	new min `{ return MPI_MIN; `}
	new maxloc `{ return MPI_MAXLOC; `}
	new minloc `{ return MPI_MINLOC; `}

	# Arithmetic

	new sum `{ return MPI_SUM; `}
	new prod `{ return MPI_PROD; `}

	# Logical operations

	new land `{ return MPI_LAND; `}
	new lor `{ return MPI_LOR; `}
	new lxor `{ return MPI_LXOR; `}

	# Bitwise operations

	new band `{ return MPI_BAND; `}
	new bor `{ return MPI_BOR; `}
	new bxor `{ return MPI_BXOR; `}

	# Replacement (`MPI_REPLACE`)
	new replace `{ return MPI_REPLACE; `}
end
242
# An MPI return code to report success or errors
extern class SuccessOrError `{ int `}
	# Is this a success?
	fun is_success: Bool `{ return recv == MPI_SUCCESS; `}

	# Is this an error?
	fun is_error: Bool do return not is_success

	# TODO add is_... for each variant

	# The class of this error
	fun error_class: ErrorClass
	`{
		int class;
		MPI_Error_class(recv, &class);
		return class;
	`}

	# Human readable description of this return code
	redef fun to_s do return native_to_s.to_s

	# Description of this return code as given by `MPI_Error_string`
	private fun native_to_s: NativeString `{
		char *err = malloc(MPI_MAX_ERROR_STRING);

		/* `resultlen` is an output parameter of `MPI_Error_string`,
		 * it must point to a valid `int` and cannot be `NULL`. */
		int len;
		MPI_Error_string(recv, err, &len);
		return err;
	`}
end
268
# An MPI error class
extern class ErrorClass `{ int `}
	# Human readable description of this error class
	redef fun to_s do return native_to_s.to_s

	# Description of this error class as given by `MPI_Error_string`
	private fun native_to_s: NativeString `{
		char *err = malloc(MPI_MAX_ERROR_STRING);

		/* `resultlen` is an output parameter of `MPI_Error_string`,
		 * it must point to a valid `int` and cannot be `NULL`. */
		int len;
		MPI_Error_string(recv, err, &len);
		return err;
	`}
end
278
# An MPI rank within a communicator
extern class Rank `{ int `}
	# Wildcard rank (`MPI_ANY_SOURCE`)
	new any `{ return MPI_ANY_SOURCE; `}

	# Value of this rank as an `Int`
	fun to_i: Int `{ return recv; `}

	# Textual form of `to_i`
	redef fun to_s
	do
		return to_i.to_s
	end
end
287
# An MPI tag, can be defined using `Int::tag`
extern class Tag `{ int `}
	# Wildcard tag (`MPI_ANY_TAG`)
	new any `{ return MPI_ANY_TAG; `}

	# Value of this tag as an `Int`
	fun to_i: Int `{ return recv; `}

	# Textual form of `to_i`
	redef fun to_s
	do
		return to_i.to_s
	end
end
296
redef universal Int
	# MPI rank whose number is `self`
	fun rank: Rank `{
		return recv;
	`}

	# MPI tag whose identifier is `self`
	fun tag: Tag `{
		return recv;
	`}

	# Is this value `MPI_UNDEFINED`? (may be returned by `Status::count`)
	fun is_undefined: Bool `{
		return recv == MPI_UNDEFINED;
	`}
end
307
# Data which can be sent directly and efficiently over MPI
#
# Subclasses of `Sendable` should rely on the native MPI send function,
# and not on Nit serialization.
interface Sendable
	# Type specific send of the full buffer over MPI
	protected fun send_all(mpi: MPI, dest: Rank, tag: Tag, comm: Comm) is abstract

	# Type specific send of `count` elements starting at index `at` over MPI
	protected fun send(mpi: MPI, at, count: Int, dest: Rank, tag: Tag, comm: Comm) is abstract
end
319
320
# Buffer which can receive data directly and efficiently from MPI
#
# Subclasses of `Receptacle` should rely on the native MPI receive function,
# and not on Nit serialization.
interface Receptacle
	# Type specific receive from MPI, filling the whole buffer
	protected fun recv_fill(mpi: MPI, source: Rank, tag: Tag, comm: Comm) is abstract

	# Type specific receive of `count` elements from MPI, stored from index `at`
	protected fun recv(mpi: MPI, at, count: Int, source: Rank, tag: Tag, comm: Comm) is abstract
end
332
# C arrays can be both sent and received over MPI
redef class CArray[E]
	super Receptacle
	super Sendable
end
337
redef class Text
	super Sendable

	# Send `count` chars of this text, starting at index `at`, to `dest` over `comm`
	redef fun send(mpi, at, count, dest, tag, comm)
	do
		# Extract the requested part only when it is not the whole text
		var str
		if at != 0 or count != length then
			str = substring(at, count)
		else str = self

		# Use the communicator given by the caller, not always the world one
		mpi.native_send(str.to_cstring, count, new DataType.char,
			dest, tag, comm)
	end

	# Send the whole text to `dest` over `comm`
	redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
end
354
redef class FlatBuffer
	super Receptacle

	# Receive `count` chars from `source` over `comm` and store them from index `at`
	#
	# The buffer is enlarged when its capacity is lower than `at + count`.
	redef fun recv(mpi, at, count, source, tag, comm)
	do
		var min_capacity = at + count
		if capacity < min_capacity then enlarge min_capacity

		# Native address where the received chars are written
		var array
		if at != 0 then
			array = items + at
		else array = items

		# Use the communicator given by the caller, not always the world one
		mpi.native_recv(array, count, new DataType.char,
			source, tag, comm, new Status.ignore)

		# Expose the received region only: `capacity` may be greater than
		# `at + count` and the chars past that point were never written.
		if length < min_capacity then length = min_capacity
		is_dirty = true
	end

	# Receive from `dest` (used as the source rank) until the current capacity is filled
	redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, capacity, dest, tag, comm)
end
377
redef class CIntArray
	# Send `count` ints, starting at index `at`, to `dest` over `comm`
	redef fun send(mpi, at, count, dest, tag, comm)
	do
		# Native address of the first int to send
		var array
		if at != 0 then
			array = native_array + at
		else array = native_array

		# Use the communicator given by the caller, not always the world one
		mpi.native_send(array, count, new DataType.int,
			dest, tag, comm)
	end

	# Send all the ints of this array to `dest` over `comm`
	redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)

	# Receive `count` ints from `source` over `comm` and store them from index `at`
	redef fun recv(mpi, at, count, source, tag, comm)
	do
		# Native address where the first received int is written
		var array
		if at != 0 then
			array = native_array + at
		else array = native_array

		# Use the communicator given by the caller, not always the world one
		mpi.native_recv(array, count, new DataType.int,
			source, tag, comm, new Status.ignore)
	end

	# Receive from `dest` (used as the source rank) enough ints to fill this array
	redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, length, dest, tag, comm)
end
405
# Shortcut to the world communicator (same as `new Comm.world`)
#
# The `once` expression evaluates `new Comm.world` a single time,
# following calls reuse the same value.
fun comm_world: Comm
do
	return once new Comm.world
end