Merge: Relocate examples
[nit.git] / lib / mpi / mpi.nit
diff --git a/lib/mpi/mpi.nit b/lib/mpi/mpi.nit
new file mode 100644 (file)
index 0000000..a9e22d8
--- /dev/null
@@ -0,0 +1,488 @@
+# This file is part of NIT (http://www.nitlanguage.org).
+#
+# Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Implementation of the Message Passing Interface protocol by wrapping OpenMPI
+#
+# OpenMPI is used only at linking and for it's `mpi.h`. Other implementations
+# could be used without much modification.
+#
+# Supports transfer of any valid `Serializable` instances as well as basic
+# C arrays defined in module `c`. Using C arrays is encouraged when performance
+# is critical.
+#
+# Since this module is a thin wrapper around OpenMPI, in case of missing
+# documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
+module mpi is
+       cflags exec("mpicc", "-showme:compile")
+       ldflags exec("mpicc", "-showme:link")
+end
+
+import c
+intrude import core::text::flat
+import serialization
+private import json::serialization
+
+in "C Header" `{
+       #include <mpi.h>
+`}
+
+# Handle to most MPI services
+class MPI
+       # Initialize the MPI execution environment
+       init do native_init
+
+       private fun native_init `{ MPI_Init(NULL, NULL); `}
+
+       # Terminates the MPI execution environment
+       fun finalize `{ MPI_Finalize(); `}
+
+       # Name of this processor, usually the hostname
+       fun processor_name: String import NativeString.to_s_with_length `{
+               char *name = malloc(MPI_MAX_PROCESSOR_NAME);
+               int size;
+               MPI_Get_processor_name(name, &size);
+               return NativeString_to_s_with_length(name, size);
+       `}
+
+       # Send the content of a buffer
+       fun send_from(buffer: Sendable, at, count: Int, dest: Rank, tag: Tag, comm: Comm)
+       do
+               buffer.send(self, at, count, dest, tag, comm)
+       end
+
+       # Send the full content of a buffer
+       fun send_all(buffer: Sendable, dest: Rank, tag: Tag, comm: Comm)
+       do
+               buffer.send_all(self, dest, tag, comm)
+       end
+
+       # Efficiently receive data in an existing buffer
+       fun recv_into(buffer: Receptacle, at, count: Int, source: Rank, tag: Tag, comm: Comm)
+       do
+               buffer.recv(self, at, count, source, tag, comm)
+       end
+
+       # Efficiently receive data and fill an existing buffer
+       fun recv_fill(buffer: Receptacle, source: Rank, tag: Tag, comm: Comm)
+       do
+               buffer.recv_fill(self, source, tag, comm)
+       end
+
+       # Send a complex `Serializable` object
+       fun send(data: nullable Serializable, dest: Rank, tag: Tag, comm: Comm)
+       do
+               # Serialize data
+               var stream = new StringWriter
+               var serializer = new JsonSerializer(stream)
+               serializer.serialize(data)
+
+               # Send message
+               var str = stream.to_s
+               send_from(str, 0, str.length, dest, tag, comm)
+       end
+
+       # Receive a complex object
+       fun recv(source: Rank, tag: Tag, comm: Comm): nullable Object
+       do
+               var status = new Status
+
+               # Block until a message in in queue
+               var err = probe(source, tag, comm, status)
+               assert err.is_success else print err
+
+               # Get message length
+               var count = status.count(new DataType.char)
+               assert not count.is_undefined
+
+               # Receive message into buffer
+               var buffer = new FlatBuffer.with_capacity(count)
+               recv_into(buffer, 0, count, status.source, status.tag, comm)
+
+               # Free our status
+               status.free
+
+               # Deserialize message
+               var deserializer = new JsonDeserializer(buffer)
+               var deserialized = deserializer.deserialize
+
+               if deserialized == null then print "|{buffer}|{buffer.chars.join("-")}| {buffer.length}"
+
+               return deserialized
+       end
+
+       # Send an empty buffer, only for the `tag`
+       fun send_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError
+       `{
+               return MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
+       `}
+
+       # Receive an empty buffer, only for the `tag`
+       fun recv_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError
+       `{
+               return MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
+       `}
+
+       # Send a `NativeCArray` `buffer` with a given `count` of `data_type`
+       fun native_send(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm): SuccessOrError
+       `{
+               return MPI_Send(buffer, count, data_type, dest, tag, comm);
+       `}
+
+       # Receive into a `NativeCArray` `buffer` with a given `count` of `data_type`
+       fun native_recv(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError
+       `{
+               return MPI_Recv(buffer, count, data_type, dest, tag, comm, status);
+       `}
+
+       # Probe for the next data to receive, store the result in `status`
+       #
+       # Note: If you encounter an error where the next receive does not correspond
+       # to the last `probe`, call this method twice to ensure a correct result.
+       fun probe(source: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError
+       `{
+               return MPI_Probe(source, tag, comm, status);
+       `}
+
+       # Synchronize all processors
+       fun barrier(comm: Comm) `{ MPI_Barrier(comm); `}
+
+       # Seconds since some time in the past which does not change
+       fun wtime: Float `{ return MPI_Wtime(); `}
+end
+
+# An MPI communicator
+extern class Comm `{ MPI_Comm `}
+       # The _null_ communicator, targeting no processors
+       new null_ `{ return MPI_COMM_NULL; `}
+
+       # The _world_ communicator, targeting all processors
+       new world `{ return MPI_COMM_WORLD; `}
+
+       # The _self_ communicator, targeting this processor only
+       new self_ `{ return MPI_COMM_SELF; `}
+
+       # Number of processors in this communicator
+       fun size: Int `{
+               int size;
+               MPI_Comm_size(self, &size);
+               return size;
+       `}
+
+       # Rank on this processor in this communicator
+       fun rank: Rank `{
+               int rank;
+               MPI_Comm_rank(self, &rank);
+               return rank;
+       `}
+end
+
+# An MPI data type
+extern class DataType `{ MPI_Datatype `}
+       # Get a MPI char.
+       new char `{ return MPI_CHAR; `}
+
+       # Get a MPI short.
+       new short `{ return MPI_SHORT; `}
+
+       # Get a MPI int.
+       new int `{ return MPI_INT; `}
+
+       # Get a MPI long.
+       new long `{ return MPI_LONG; `}
+
+       # Get a MPI long long.
+       new long_long `{ return MPI_LONG_LONG; `}
+
+       # Get a MPI unsigned char.
+       new unsigned_char `{ return MPI_UNSIGNED_CHAR; `}
+
+       # Get a MPI unsigned short.
+       new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}
+
+       # Get a MPI unsigned int.
+       new unsigned `{ return MPI_UNSIGNED; `}
+
+       # Get a MPI unsigned long.
+       new unsigned_long `{ return MPI_UNSIGNED_LONG; `}
+
+       # Get a MPI unsigned long long.
+       new unsigned_long_long `{ return MPI_UNSIGNED_LONG_LONG; `}
+
+       # Get a MPI float.
+       new float `{ return MPI_FLOAT; `}
+
+       # Get a MPI double.
+       new double `{ return MPI_DOUBLE; `}
+
+       # Get a MPI long double.
+       new long_double `{ return MPI_LONG_DOUBLE; `}
+
+       # Get a MPI byte.
+       new byte `{ return MPI_BYTE; `}
+end
+
+# Status of a communication used by `MPI::probe`
+extern class Status `{ MPI_Status* `}
+       # Ignore the resulting status
+       new ignore `{ return MPI_STATUS_IGNORE; `}
+
+       # Allocated a new `Status`, must be freed with `free`
+       new `{ return malloc(sizeof(MPI_Status)); `}
+
+       # Source of this communication
+       fun source: Rank `{ return self->MPI_SOURCE; `}
+
+       # Tag of this communication
+       fun tag: Tag `{ return self->MPI_TAG; `}
+
+       # Success or error on this communication
+       fun error: SuccessOrError `{ return self->MPI_ERROR; `}
+
+       # Count of the given `data_type` in this communication
+       fun count(data_type: DataType): Int
+       `{
+               int count;
+               MPI_Get_count(self, data_type, &count);
+               return count;
+       `}
+end
+
+# An MPI operation
+#
+# Used with the `reduce` method.
+#
+# See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node78.html>
+extern class Op `{ MPI_Op `}
+       # Get a MPI null operation.
+       new op_null `{ return MPI_OP_NULL; `}
+
+       # Get a MPI maximum operation.
+       new max `{ return MPI_MAX; `}
+
+       # Get a MPI minimum operation.
+       new min `{ return MPI_MIN; `}
+
+       # Get a MPI sum operation.
+       new sum `{ return MPI_SUM; `}
+
+       # Get a MPI product operation.
+       new prod `{ return MPI_PROD; `}
+
+       # Get a MPI logical and operation.
+       new land `{ return MPI_LAND; `}
+
+       # Get a MPI bit-wise and operation.
+       new band `{ return MPI_BAND; `}
+
+       # Get a MPI logical or operation.
+       new lor `{ return MPI_LOR; `}
+
+       # Get a MPI bit-wise or operation.
+       new bor `{ return MPI_BOR; `}
+
+       # Get a MPI logical xor operation.
+       new lxor `{ return MPI_LXOR; `}
+
+       # Get a MPI bit-wise xor operation.
+       new bxor `{ return MPI_BXOR; `}
+
+       # Get a MPI minloc operation.
+       #
+       # Used to compute a global minimum and also an index attached
+       # to the minimum value.
+       #
+       # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
+       new minloc `{ return MPI_MINLOC; `}
+
+       # Get a MPI maxloc operation.
+       #
+       # Used to compute a global maximum and also an index attached
+       # to the maximum value.
+       #
+       # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
+       new maxloc `{ return MPI_MAXLOC; `}
+
+       # Get a MPI replace operation.
+       new replace `{ return MPI_REPLACE; `}
+end
+
+# An MPI return code to report success or errors
+extern class SuccessOrError `{ int `}
+       # Is this a success?
+       fun is_success: Bool `{ return self == MPI_SUCCESS; `}
+
+       # Is this an error?
+       fun is_error: Bool do return not is_success
+
+       # TODO add is_... for each variant
+
+       # The class of this error
+       fun error_class: ErrorClass
+       `{
+               int class;
+               MPI_Error_class(self, &class);
+               return class;
+       `}
+
+       redef fun to_s do return native_to_s.to_s
+       private fun native_to_s: NativeString `{
+               char *err = malloc(MPI_MAX_ERROR_STRING);
+               MPI_Error_string(self, err, NULL);
+               return err;
+       `}
+end
+
+# An MPI error class
+extern class ErrorClass `{ int `}
+       redef fun to_s do return native_to_s.to_s
+       private fun native_to_s: NativeString `{
+               char *err = malloc(MPI_MAX_ERROR_STRING);
+               MPI_Error_string(self, err, NULL);
+               return err;
+       `}
+end
+
+# An MPI rank within a communcator
+extern class Rank `{ int `}
+       # Special rank accepting any processor
+       new any `{ return MPI_ANY_SOURCE; `}
+
+       # This Rank as an `Int`
+       fun to_i: Int `{ return self; `}
+       redef fun to_s do return to_i.to_s
+end
+
+# An MPI tag, can be defined using `Int::tag`
+extern class Tag `{ int `}
+       # Special tag accepting any tag
+       new any `{ return MPI_ANY_TAG; `}
+
+       # This tag as an `Int`
+       fun to_i: Int `{ return self; `}
+       redef fun to_s do return to_i.to_s
+end
+
+redef universal Int
+       # `self`th MPI rank
+       fun rank: Rank `{ return self; `}
+
+       # Tag identified by `self`
+       fun tag: Tag `{ return self; `}
+
+       # Is this value undefined according to MPI? (may be returned by `Status::count`)
+       fun is_undefined: Bool `{ return self == MPI_UNDEFINED; `}
+end
+
+# Something sendable directly and efficiently over MPI
+#
+# Subclasses of `Sendable` should use the native MPI send function, without
+# using Nit serialization.
+interface Sendable
+       # Type specific send over MPI
+       protected fun send(mpi: MPI, at, count: Int, dest: Rank, tag: Tag, comm: Comm) is abstract
+
+       # Type specific send full buffer over MPI
+       protected fun send_all(mpi: MPI, dest: Rank, tag: Tag, comm: Comm) is abstract
+end
+
+
+# Something which can receive data directly and efficiently from MPI
+#
+# Subclasses of `Receptacle` should use the native MPI recveive function,
+# without using Nit serialization.
+interface Receptacle
+       # Type specific receive from MPI
+       protected fun recv(mpi: MPI, at, count: Int, source: Rank, tag: Tag, comm: Comm) is abstract
+
+       # Type specific receive and fill buffer from MPI
+       protected fun recv_fill(mpi: MPI, source: Rank, tag: Tag, comm: Comm) is abstract
+end
+
+redef class CArray[E]
+       super Sendable
+       super Receptacle
+end
+
+redef class Text
+       super Sendable
+
+       redef fun send(mpi, at, count, dest, tag, comm)
+       do
+               var str
+               if at != 0 or count != length then
+                       str = substring(at, count)
+               else str = self
+
+               mpi.native_send(str.to_cstring, count, new DataType.char,
+                       dest, tag, new Comm.world)
+       end
+
+       redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
+end
+
+redef class FlatBuffer
+       super Receptacle
+
+       redef fun recv(mpi, at, count, source, tag, comm)
+       do
+               var min_capacity = at + count
+               if capacity < min_capacity then enlarge min_capacity
+
+               var array
+               if at != 0 then
+                       array = items + at
+               else array = items
+
+               mpi.native_recv(array, count, new DataType.char,
+                       source, tag, new Comm.world, new Status.ignore)
+
+               length = capacity
+               is_dirty = true
+       end
+
+       redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, capacity, dest, tag, comm)
+end
+
+redef class CIntArray
+       redef fun send(mpi, at, count, dest, tag, comm)
+       do
+               var array
+               if at != 0 then
+                       array = native_array + at
+               else array = native_array
+
+               mpi.native_send(array, count, new DataType.int,
+                       dest, tag, new Comm.world)
+       end
+
+       redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
+
+       redef fun recv(mpi, at, count, source, tag, comm)
+       do
+               var array
+               if at != 0 then
+                       array = native_array + at
+               else array = native_array
+
+               mpi.native_recv(array, count, new DataType.int,
+                       source, tag, new Comm.world, new Status.ignore)
+       end
+
+       redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, length, dest, tag, comm)
+end
+
+# Shortcut to the world communicator (same as `new Comm.world`)
+fun comm_world: Comm do return once new Comm.world