+++ /dev/null
-# This file is part of NIT (http://www.nitlanguage.org).
-#
-# Copyright 2014 Alexis Laferrière <alexis.laf@xymus.net>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Implementation of the Message Passing Interface protocol by wrapping OpenMPI
-#
-# OpenMPI is used only at linking and for it's `mpi.h`. Other implementations
-# could be used without much modification.
-#
-# Supports transfer of any valid `Serializable` instances as well as basic
-# C arrays defined in module `c`. Using C arrays is encouraged when performance
-# is critical.
-#
-# Since this module is a thin wrapper around OpenMPI, in case of missing
-# documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
-module mpi is
- cflags exec("mpicc", "-showme:compile")
- ldflags exec("mpicc", "-showme:link")
-end
-
-import c
-intrude import core::text::flat
-import serialization
-private import json::serialization
-
-in "C Header" `{
- #include <mpi.h>
-`}
-
-# Handle to most MPI services
-class MPI
- # Initialize the MPI execution environment
- init do native_init
-
- private fun native_init `{ MPI_Init(NULL, NULL); `}
-
- # Terminates the MPI execution environment
- fun finalize `{ MPI_Finalize(); `}
-
- # Name of this processor, usually the hostname
- fun processor_name: String import NativeString.to_s_with_length `{
- char *name = malloc(MPI_MAX_PROCESSOR_NAME);
- int size;
- MPI_Get_processor_name(name, &size);
- return NativeString_to_s_with_length(name, size);
- `}
-
- # Send the content of a buffer
- fun send_from(buffer: Sendable, at, count: Int, dest: Rank, tag: Tag, comm: Comm)
- do
- buffer.send(self, at, count, dest, tag, comm)
- end
-
- # Send the full content of a buffer
- fun send_all(buffer: Sendable, dest: Rank, tag: Tag, comm: Comm)
- do
- buffer.send_all(self, dest, tag, comm)
- end
-
- # Efficiently receive data in an existing buffer
- fun recv_into(buffer: Receptacle, at, count: Int, source: Rank, tag: Tag, comm: Comm)
- do
- buffer.recv(self, at, count, source, tag, comm)
- end
-
- # Efficiently receive data and fill an existing buffer
- fun recv_fill(buffer: Receptacle, source: Rank, tag: Tag, comm: Comm)
- do
- buffer.recv_fill(self, source, tag, comm)
- end
-
- # Send a complex `Serializable` object
- fun send(data: nullable Serializable, dest: Rank, tag: Tag, comm: Comm)
- do
- # Serialize data
- var stream = new StringWriter
- var serializer = new JsonSerializer(stream)
- serializer.serialize(data)
-
- # Send message
- var str = stream.to_s
- send_from(str, 0, str.length, dest, tag, comm)
- end
-
- # Receive a complex object
- fun recv(source: Rank, tag: Tag, comm: Comm): nullable Object
- do
- var status = new Status
-
- # Block until a message in in queue
- var err = probe(source, tag, comm, status)
- assert err.is_success else print err
-
- # Get message length
- var count = status.count(new DataType.char)
- assert not count.is_undefined
-
- # Receive message into buffer
- var buffer = new FlatBuffer.with_capacity(count)
- recv_into(buffer, 0, count, status.source, status.tag, comm)
-
- # Free our status
- status.free
-
- # Deserialize message
- var deserializer = new JsonDeserializer(buffer)
- var deserialized = deserializer.deserialize
-
- if deserialized == null then print "|{buffer}|{buffer.chars.join("-")}| {buffer.length}"
-
- return deserialized
- end
-
- # Send an empty buffer, only for the `tag`
- fun send_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError
- `{
- return MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
- `}
-
- # Receive an empty buffer, only for the `tag`
- fun recv_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError
- `{
- return MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
- `}
-
- # Send a `NativeCArray` `buffer` with a given `count` of `data_type`
- fun native_send(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm): SuccessOrError
- `{
- return MPI_Send(buffer, count, data_type, dest, tag, comm);
- `}
-
- # Receive into a `NativeCArray` `buffer` with a given `count` of `data_type`
- fun native_recv(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError
- `{
- return MPI_Recv(buffer, count, data_type, dest, tag, comm, status);
- `}
-
- # Probe for the next data to receive, store the result in `status`
- #
- # Note: If you encounter an error where the next receive does not correspond
- # to the last `probe`, call this method twice to ensure a correct result.
- fun probe(source: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError
- `{
- return MPI_Probe(source, tag, comm, status);
- `}
-
- # Synchronize all processors
- fun barrier(comm: Comm) `{ MPI_Barrier(comm); `}
-
- # Seconds since some time in the past which does not change
- fun wtime: Float `{ return MPI_Wtime(); `}
-end
-
-# An MPI communicator
-extern class Comm `{ MPI_Comm `}
- # The _null_ communicator, targeting no processors
- new null_ `{ return MPI_COMM_NULL; `}
-
- # The _world_ communicator, targeting all processors
- new world `{ return MPI_COMM_WORLD; `}
-
- # The _self_ communicator, targeting this processor only
- new self_ `{ return MPI_COMM_SELF; `}
-
- # Number of processors in this communicator
- fun size: Int `{
- int size;
- MPI_Comm_size(self, &size);
- return size;
- `}
-
- # Rank on this processor in this communicator
- fun rank: Rank `{
- int rank;
- MPI_Comm_rank(self, &rank);
- return rank;
- `}
-end
-
-# An MPI data type
-extern class DataType `{ MPI_Datatype `}
- # Get a MPI char.
- new char `{ return MPI_CHAR; `}
-
- # Get a MPI short.
- new short `{ return MPI_SHORT; `}
-
- # Get a MPI int.
- new int `{ return MPI_INT; `}
-
- # Get a MPI long.
- new long `{ return MPI_LONG; `}
-
- # Get a MPI long long.
- new long_long `{ return MPI_LONG_LONG; `}
-
- # Get a MPI unsigned char.
- new unsigned_char `{ return MPI_UNSIGNED_CHAR; `}
-
- # Get a MPI unsigned short.
- new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}
-
- # Get a MPI unsigned int.
- new unsigned `{ return MPI_UNSIGNED; `}
-
- # Get a MPI unsigned long.
- new unsigned_long `{ return MPI_UNSIGNED_LONG; `}
-
- # Get a MPI unsigned long long.
- new unsigned_long_long `{ return MPI_UNSIGNED_LONG_LONG; `}
-
- # Get a MPI float.
- new float `{ return MPI_FLOAT; `}
-
- # Get a MPI double.
- new double `{ return MPI_DOUBLE; `}
-
- # Get a MPI long double.
- new long_double `{ return MPI_LONG_DOUBLE; `}
-
- # Get a MPI byte.
- new byte `{ return MPI_BYTE; `}
-end
-
-# Status of a communication used by `MPI::probe`
-extern class Status `{ MPI_Status* `}
- # Ignore the resulting status
- new ignore `{ return MPI_STATUS_IGNORE; `}
-
- # Allocated a new `Status`, must be freed with `free`
- new `{ return malloc(sizeof(MPI_Status)); `}
-
- # Source of this communication
- fun source: Rank `{ return self->MPI_SOURCE; `}
-
- # Tag of this communication
- fun tag: Tag `{ return self->MPI_TAG; `}
-
- # Success or error on this communication
- fun error: SuccessOrError `{ return self->MPI_ERROR; `}
-
- # Count of the given `data_type` in this communication
- fun count(data_type: DataType): Int
- `{
- int count;
- MPI_Get_count(self, data_type, &count);
- return count;
- `}
-end
-
-# An MPI operation
-#
-# Used with the `reduce` method.
-#
-# See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node78.html>
-extern class Op `{ MPI_Op `}
- # Get a MPI null operation.
- new op_null `{ return MPI_OP_NULL; `}
-
- # Get a MPI maximum operation.
- new max `{ return MPI_MAX; `}
-
- # Get a MPI minimum operation.
- new min `{ return MPI_MIN; `}
-
- # Get a MPI sum operation.
- new sum `{ return MPI_SUM; `}
-
- # Get a MPI product operation.
- new prod `{ return MPI_PROD; `}
-
- # Get a MPI logical and operation.
- new land `{ return MPI_LAND; `}
-
- # Get a MPI bit-wise and operation.
- new band `{ return MPI_BAND; `}
-
- # Get a MPI logical or operation.
- new lor `{ return MPI_LOR; `}
-
- # Get a MPI bit-wise or operation.
- new bor `{ return MPI_BOR; `}
-
- # Get a MPI logical xor operation.
- new lxor `{ return MPI_LXOR; `}
-
- # Get a MPI bit-wise xor operation.
- new bxor `{ return MPI_BXOR; `}
-
- # Get a MPI minloc operation.
- #
- # Used to compute a global minimum and also an index attached
- # to the minimum value.
- #
- # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
- new minloc `{ return MPI_MINLOC; `}
-
- # Get a MPI maxloc operation.
- #
- # Used to compute a global maximum and also an index attached
- # to the maximum value.
- #
- # See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
- new maxloc `{ return MPI_MAXLOC; `}
-
- # Get a MPI replace operation.
- new replace `{ return MPI_REPLACE; `}
-end
-
-# An MPI return code to report success or errors
-extern class SuccessOrError `{ int `}
- # Is this a success?
- fun is_success: Bool `{ return self == MPI_SUCCESS; `}
-
- # Is this an error?
- fun is_error: Bool do return not is_success
-
- # TODO add is_... for each variant
-
- # The class of this error
- fun error_class: ErrorClass
- `{
- int class;
- MPI_Error_class(self, &class);
- return class;
- `}
-
- redef fun to_s do return native_to_s.to_s
- private fun native_to_s: NativeString `{
- char *err = malloc(MPI_MAX_ERROR_STRING);
- MPI_Error_string(self, err, NULL);
- return err;
- `}
-end
-
-# An MPI error class
-extern class ErrorClass `{ int `}
- redef fun to_s do return native_to_s.to_s
- private fun native_to_s: NativeString `{
- char *err = malloc(MPI_MAX_ERROR_STRING);
- MPI_Error_string(self, err, NULL);
- return err;
- `}
-end
-
-# An MPI rank within a communcator
-extern class Rank `{ int `}
- # Special rank accepting any processor
- new any `{ return MPI_ANY_SOURCE; `}
-
- # This Rank as an `Int`
- fun to_i: Int `{ return self; `}
- redef fun to_s do return to_i.to_s
-end
-
-# An MPI tag, can be defined using `Int::tag`
-extern class Tag `{ int `}
- # Special tag accepting any tag
- new any `{ return MPI_ANY_TAG; `}
-
- # This tag as an `Int`
- fun to_i: Int `{ return self; `}
- redef fun to_s do return to_i.to_s
-end
-
-redef universal Int
- # `self`th MPI rank
- fun rank: Rank `{ return self; `}
-
- # Tag identified by `self`
- fun tag: Tag `{ return self; `}
-
- # Is this value undefined according to MPI? (may be returned by `Status::count`)
- fun is_undefined: Bool `{ return self == MPI_UNDEFINED; `}
-end
-
-# Something sendable directly and efficiently over MPI
-#
-# Subclasses of `Sendable` should use the native MPI send function, without
-# using Nit serialization.
-interface Sendable
- # Type specific send over MPI
- protected fun send(mpi: MPI, at, count: Int, dest: Rank, tag: Tag, comm: Comm) is abstract
-
- # Type specific send full buffer over MPI
- protected fun send_all(mpi: MPI, dest: Rank, tag: Tag, comm: Comm) is abstract
-end
-
-
-# Something which can receive data directly and efficiently from MPI
-#
-# Subclasses of `Receptacle` should use the native MPI recveive function,
-# without using Nit serialization.
-interface Receptacle
- # Type specific receive from MPI
- protected fun recv(mpi: MPI, at, count: Int, source: Rank, tag: Tag, comm: Comm) is abstract
-
- # Type specific receive and fill buffer from MPI
- protected fun recv_fill(mpi: MPI, source: Rank, tag: Tag, comm: Comm) is abstract
-end
-
-redef class CArray[E]
- super Sendable
- super Receptacle
-end
-
-redef class Text
- super Sendable
-
- redef fun send(mpi, at, count, dest, tag, comm)
- do
- var str
- if at != 0 or count != length then
- str = substring(at, count)
- else str = self
-
- mpi.native_send(str.to_cstring, count, new DataType.char,
- dest, tag, new Comm.world)
- end
-
- redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
-end
-
-redef class FlatBuffer
- super Receptacle
-
- redef fun recv(mpi, at, count, source, tag, comm)
- do
- var min_capacity = at + count
- if capacity < min_capacity then enlarge min_capacity
-
- var array
- if at != 0 then
- array = items + at
- else array = items
-
- mpi.native_recv(array, count, new DataType.char,
- source, tag, new Comm.world, new Status.ignore)
-
- length = capacity
- is_dirty = true
- end
-
- redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, capacity, dest, tag, comm)
-end
-
-redef class CIntArray
- redef fun send(mpi, at, count, dest, tag, comm)
- do
- var array
- if at != 0 then
- array = native_array + at
- else array = native_array
-
- mpi.native_send(array, count, new DataType.int,
- dest, tag, new Comm.world)
- end
-
- redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
-
- redef fun recv(mpi, at, count, source, tag, comm)
- do
- var array
- if at != 0 then
- array = native_array + at
- else array = native_array
-
- mpi.native_recv(array, count, new DataType.int,
- source, tag, new Comm.world, new Status.ignore)
- end
-
- redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, length, dest, tag, comm)
-end
-
-# Shortcut to the world communicator (same as `new Comm.world`)
-fun comm_world: Comm do return once new Comm.world