Implementation of the Message Passing Interface protocol by wrapping OpenMPI

OpenMPI is used only at linking and for its mpi.h. Other implementations could be used without much modification.

Supports transfer of any valid Serializable instances as well as basic C arrays defined in module c. Using C arrays is encouraged when performance is critical.

Since this module is a thin wrapper around OpenMPI, in case of missing documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.

Introduced classes

extern class Comm

mpi :: Comm

An MPI communicator
extern class DataType

mpi :: DataType

An MPI data type
extern class ErrorClass

mpi :: ErrorClass

An MPI error class
class MPI

mpi :: MPI

Handle to most MPI services
extern class Op

mpi :: Op

An MPI operation
extern class Rank

mpi :: Rank

An MPI rank within a communicator
interface Receptacle

mpi :: Receptacle

Something which can receive data directly and efficiently from MPI
interface Sendable

mpi :: Sendable

Something sendable directly and efficiently over MPI
extern class Status

mpi :: Status

Status of a communication used by MPI::probe
extern class SuccessOrError

mpi :: SuccessOrError

An MPI return code to report success or errors
extern class Tag

mpi :: Tag

An MPI tag, can be defined using Int::tag

Redefined classes

redef abstract class CArray[E: nullable Object]

mpi :: mpi $ CArray

A thin wrapper around a NativeCArray adding length information
redef class CIntArray

mpi :: mpi $ CIntArray

Wrapper around an array of int in C (int*) with length and destroy state
redef class FlatBuffer

mpi :: mpi $ FlatBuffer

Mutable strings of characters.
redef enum Int

mpi :: mpi $ Int

Native integer numbers.
redef class Sys

mpi :: mpi $ Sys

The main class of the program.
redef abstract class Text

mpi :: mpi $ Text

High-level abstraction for all text representations

All class definitions

redef abstract class CArray[E: nullable Object]

mpi :: mpi $ CArray

A thin wrapper around a NativeCArray adding length information
redef class CIntArray

mpi :: mpi $ CIntArray

Wrapper around an array of int in C (int*) with length and destroy state
extern class Comm

mpi $ Comm

An MPI communicator
extern class DataType

mpi $ DataType

An MPI data type
extern class ErrorClass

mpi $ ErrorClass

An MPI error class
redef class FlatBuffer

mpi :: mpi $ FlatBuffer

Mutable strings of characters.
redef enum Int

mpi :: mpi $ Int

Native integer numbers.
class MPI

mpi $ MPI

Handle to most MPI services
extern class Op

mpi $ Op

An MPI operation
extern class Rank

mpi $ Rank

An MPI rank within a communicator
interface Receptacle

mpi $ Receptacle

Something which can receive data directly and efficiently from MPI
interface Sendable

mpi $ Sendable

Something sendable directly and efficiently over MPI
extern class Status

mpi $ Status

Status of a communication used by MPI::probe
extern class SuccessOrError

mpi $ SuccessOrError

An MPI return code to report success or errors
redef class Sys

mpi :: mpi $ Sys

The main class of the program.
extern class Tag

mpi $ Tag

An MPI tag, can be defined using Int::tag
redef abstract class Text

mpi :: mpi $ Text

High-level abstraction for all text representations
package_diagram mpi::mpi mpi c c mpi::mpi->c json json mpi::mpi->json core core c->core parser_base parser_base json->parser_base serialization serialization json->serialization ...core ... ...core->core ...parser_base ... ...parser_base->parser_base ...serialization ... ...serialization->serialization mpi::mpi_simple mpi_simple mpi::mpi_simple->mpi::mpi a_star-m a_star-m a_star-m->mpi::mpi_simple a_star-m... ... a_star-m...->a_star-m

Ancestors

module abstract_collection

core :: abstract_collection

Abstract collection classes and services.
module abstract_text

core :: abstract_text

Abstract class for manipulation of sequences of characters
module array

core :: array

This module introduces the standard array structure.
module bitset

core :: bitset

Services to handle BitSet
module bytes

core :: bytes

Services for byte streams and arrays
module caching

serialization :: caching

Services for caching serialization engines
module circular_array

core :: circular_array

Efficient data structure to access both ends of the sequence.
module codec_base

core :: codec_base

Base for codecs to use with streams
module codecs

core :: codecs

Group module for all codec-related manipulations
module collection

core :: collection

This module defines several collection classes.
module core

core :: core

Standard classes and methods used by default by Nit programs and libraries.
module engine_tools

serialization :: engine_tools

Advanced services for serialization engines
module environ

core :: environ

Access to the environment variables of the process
module error

core :: error

Standard error-management infrastructure.
module error

json :: error

Intro JsonParseError which is exposed by all JSON reading APIs
module exec

core :: exec

Invocation and management of operating system sub-processes.
module file

core :: file

File manipulations (create, read, write, etc.)
module fixed_ints

core :: fixed_ints

Basic integers of fixed-precision
module fixed_ints_text

core :: fixed_ints_text

Text services to complement fixed_ints
module flat

core :: flat

All the array-based text representations
module gc

core :: gc

Access to the Nit internal garbage collection mechanism
module hash_collection

core :: hash_collection

Introduce HashMap and HashSet.
module inspect

serialization :: inspect

Refine Serializable::inspect to show more useful information
module iso8859_1

core :: iso8859_1

Codec for ISO8859-1 I/O
module kernel

core :: kernel

Most basic classes and methods.
module list

core :: list

This module handles doubly linked lists
module math

core :: math

Mathematical operations
module meta

meta :: meta

Simple user-defined meta-level to manipulate types of instances as object.
module native

core :: native

Native structures for text and bytes
module numeric

core :: numeric

Advanced services for Numeric types
module parser_base

parser_base :: parser_base

Simple base for hand-made parsers of all kinds
module poset

poset :: poset

Pre order sets and partial order set (ie hierarchies)
module protocol

core :: protocol

module queue

core :: queue

Queuing data structures and wrappers
module range

core :: range

Module for range of discrete objects.
module re

core :: re

Regular expression support for all services based on Pattern
module ropes

core :: ropes

Tree-based representation of a String.
module safe

serialization :: safe

Services for safer deserialization engines
module serialization

serialization :: serialization

General serialization services
module serialization_core

serialization :: serialization_core

Abstract services to serialize Nit objects to different formats
module serialization_read

json :: serialization_read

Services to read JSON: deserialize_json and JsonDeserializer
module serialization_write

json :: serialization_write

Services to write Nit objects to JSON strings: serialize_to_json and JsonSerializer
module sorter

core :: sorter

This module contains classes used to compare things and sorts arrays.
module static

json :: static

Static interface to read Nit objects from JSON strings
module stream

core :: stream

Input and output streams of characters
module text

core :: text

All the classes and methods related to the manipulation of text entities
module time

core :: time

Management of time and dates
module union_find

core :: union_find

union–find algorithm using an efficient disjoint-set data structure
module utf8

core :: utf8

Codec for UTF-8 I/O

Parents

module c

c :: c

Structures and services for compatibility with the C language
module json

json :: json

Read and write JSON formatted text using the standard serialization services

Children

Descendants

module a_star-m

a_star-m

# Implementation of the Message Passing Interface protocol by wrapping OpenMPI
#
# OpenMPI is used only at linking and for its `mpi.h`. Other implementations
# could be used without much modification.
#
# Supports transfer of any valid `Serializable` instances as well as basic
# C arrays defined in module `c`. Using C arrays is encouraged when performance
# is critical.
#
# Since this module is a thin wrapper around OpenMPI, in case of missing
# documentation, you can refer to https://www.open-mpi.org/doc/v1.8/.
#
# The C compilation and link flags are obtained by executing
# `mpicc -showme:compile` and `mpicc -showme:link`.
module mpi is
	cflags exec("mpicc", "-showme:compile")
	ldflags exec("mpicc", "-showme:link")
end

import c
intrude import core::text::flat
import serialization
private import json

in "C Header" `{
	#include <mpi.h>
`}

# Handle to most MPI services
#
# Creating an instance calls `MPI_Init`; call `finalize` once all
# communications are done.
class MPI
	# Initialize the MPI execution environment
	init
	do
		native_init
	end

	# Call `MPI_Init` without forwarding any command line argument
	private fun native_init `{
		MPI_Init(NULL, NULL);
	`}

	# Terminates the MPI execution environment
	fun finalize `{
		MPI_Finalize();
	`}

	# Name of this processor, usually the hostname
	#
	# NOTE(review): the C buffer allocated here is never freed by this method,
	# whether it must be depends on `CString::to_s_with_length` copying
	# the data or not. To confirm.
	fun processor_name: String import CString.to_s_with_length `{
		int len;
		char *buf = malloc(MPI_MAX_PROCESSOR_NAME);
		MPI_Get_processor_name(buf, &len);
		return CString_to_s_with_length(buf, len);
	`}

	# Send `count` elements of `buffer`, starting at index `at`
	#
	# Delegates to the type specific `Sendable::send`.
	fun send_from(buffer: Sendable, at, count: Int, dest: Rank, tag: Tag, comm: Comm) do buffer.send(self, at, count, dest, tag, comm)

	# Send the full content of `buffer`
	#
	# Delegates to the type specific `Sendable::send_all`.
	fun send_all(buffer: Sendable, dest: Rank, tag: Tag, comm: Comm) do buffer.send_all(self, dest, tag, comm)

	# Efficiently receive `count` elements in the existing `buffer`, at index `at`
	#
	# Delegates to the type specific `Receptacle::recv`.
	fun recv_into(buffer: Receptacle, at, count: Int, source: Rank, tag: Tag, comm: Comm) do buffer.recv(self, at, count, source, tag, comm)

	# Efficiently receive data and fill the existing `buffer`
	#
	# Delegates to the type specific `Receptacle::recv_fill`.
	fun recv_fill(buffer: Receptacle, source: Rank, tag: Tag, comm: Comm) do buffer.recv_fill(self, source, tag, comm)

	# Send a complex `Serializable` object
	#
	# `data` is serialized to JSON and the resulting text is sent
	# with `send_from`.
	fun send(data: nullable Serializable, dest: Rank, tag: Tag, comm: Comm)
	do
		# Serialize `data` to an in-memory JSON text
		var writer = new StringWriter
		var json_serializer = new JsonSerializer(writer)
		json_serializer.serialize(data)

		# Send the whole JSON text as the message
		var json = writer.to_s
		send_from(json, 0, json.length, dest, tag, comm)
	end

	# Receive a complex object
	#
	# Counterpart of `send`: probes for the next matching message, receives
	# it as text and deserializes it from JSON.
	fun recv(source: Rank, tag: Tag, comm: Comm): nullable Object
	do
		var probe_status = new Status

		# Block until a message is in queue
		var probe_result = probe(source, tag, comm, probe_status)
		assert probe_result.is_success else print probe_result

		# Get the message length, as a count of `MPI_CHAR`
		var char_count = probe_status.count(new DataType.char)
		assert not char_count.is_undefined

		# Receive precisely the probed message, using the source and tag
		# reported by the probe
		var buffer = new FlatBuffer.with_capacity(char_count)
		recv_into(buffer, 0, char_count, probe_status.source, probe_status.tag, comm)

		# The status was allocated by `new Status`, release it
		probe_status.free

		# Deserialize the message
		var json_deserializer = new JsonDeserializer(buffer)
		var object = json_deserializer.deserialize

		# Dump the raw message when nothing could be deserialized
		if object == null then
			print "|{buffer}|{buffer.chars.join("-")}| {buffer.length}"
		end

		return object
	end

	# Send an empty buffer, only for the `tag`
	fun send_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Send(NULL, 0, MPI_CHAR, dest, tag, comm);
	`}

	# Receive an empty buffer, only for the `tag`
	#
	# Despite its name, `dest` is passed to `MPI_Recv` as the source rank.
	fun recv_empty(dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Recv(NULL, 0, MPI_CHAR, dest, tag, comm, MPI_STATUS_IGNORE);
	`}

	# Send a `NativeCArray` `buffer` with a given `count` of `data_type`
	fun native_send(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm): SuccessOrError `{
		return MPI_Send(buffer, count, data_type, dest, tag, comm);
	`}

	# Receive into a `NativeCArray` `buffer` with a given `count` of `data_type`
	#
	# Despite its name, `dest` is passed to `MPI_Recv` as the source rank.
	fun native_recv(buffer: NativeCArray, count: Int, data_type: DataType, dest: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError `{
		return MPI_Recv(buffer, count, data_type, dest, tag, comm, status);
	`}

	# Probe for the next data to receive, store the result in `status`
	#
	# Note: If you encounter an error where the next receive does not correspond
	# to the last `probe`, call this method twice to ensure a correct result.
	fun probe(source: Rank, tag: Tag, comm: Comm, status: Status): SuccessOrError `{
		return MPI_Probe(source, tag, comm, status);
	`}

	# Synchronize all processors of `comm`
	fun barrier(comm: Comm) `{
		MPI_Barrier(comm);
	`}

	# Seconds since some time in the past which does not change
	fun wtime: Float `{
		return MPI_Wtime();
	`}
end

# An MPI communicator
#
# Instances wrap a native `MPI_Comm`.
extern class Comm `{ MPI_Comm `}
	# The _null_ communicator, targeting no processors
	new null_ `{
		return MPI_COMM_NULL;
	`}

	# The _world_ communicator, targeting all processors
	new world `{
		return MPI_COMM_WORLD;
	`}

	# The _self_ communicator, targeting this processor only
	new self_ `{
		return MPI_COMM_SELF;
	`}

	# Number of processors in this communicator
	fun size: Int `{
		int nprocs;
		MPI_Comm_size(self, &nprocs);
		return nprocs;
	`}

	# Rank of this processor in this communicator
	fun rank: Rank `{
		int own_rank;
		MPI_Comm_rank(self, &own_rank);
		return own_rank;
	`}
end

# An MPI data type
#
# Instances wrap a native `MPI_Datatype`, each constructor returns one of
# the predefined MPI constants.
extern class DataType `{ MPI_Datatype `}
	# Get the MPI char data type (`MPI_CHAR`).
	new char `{ return MPI_CHAR; `}

	# Get the MPI short data type (`MPI_SHORT`).
	new short `{ return MPI_SHORT; `}

	# Get the MPI int data type (`MPI_INT`).
	new int `{ return MPI_INT; `}

	# Get the MPI long data type (`MPI_LONG`).
	new long `{ return MPI_LONG; `}

	# Get the MPI long long data type (`MPI_LONG_LONG`).
	new long_long `{ return MPI_LONG_LONG; `}

	# Get the MPI unsigned char data type (`MPI_UNSIGNED_CHAR`).
	new unsigned_char `{ return MPI_UNSIGNED_CHAR; `}

	# Get the MPI unsigned short data type (`MPI_UNSIGNED_SHORT`).
	new unsigned_short `{ return MPI_UNSIGNED_SHORT; `}

	# Get the MPI unsigned int data type (`MPI_UNSIGNED`).
	new unsigned `{ return MPI_UNSIGNED; `}

	# Get the MPI unsigned long data type (`MPI_UNSIGNED_LONG`).
	new unsigned_long `{ return MPI_UNSIGNED_LONG; `}

	# Get the MPI unsigned long long data type (`MPI_UNSIGNED_LONG_LONG`).
	new unsigned_long_long `{ return MPI_UNSIGNED_LONG_LONG; `}

	# Get the MPI float data type (`MPI_FLOAT`).
	new float `{ return MPI_FLOAT; `}

	# Get the MPI double data type (`MPI_DOUBLE`).
	new double `{ return MPI_DOUBLE; `}

	# Get the MPI long double data type (`MPI_LONG_DOUBLE`).
	new long_double `{ return MPI_LONG_DOUBLE; `}

	# Get the MPI byte data type (`MPI_BYTE`).
	new byte `{ return MPI_BYTE; `}
end

# Status of a communication used by `MPI::probe`
#
# Instances wrap a native pointer to an `MPI_Status`.
extern class Status `{ MPI_Status* `}
	# Ignore the resulting status
	#
	# The returned value is the `MPI_STATUS_IGNORE` constant.
	new ignore `{
		return MPI_STATUS_IGNORE;
	`}

	# Allocate a new `Status`, must be freed with `free`
	new `{
		return malloc(sizeof(MPI_Status));
	`}

	# Source of this communication
	fun source: Rank `{
		return self->MPI_SOURCE;
	`}

	# Tag of this communication
	fun tag: Tag `{
		return self->MPI_TAG;
	`}

	# Success or error on this communication
	fun error: SuccessOrError `{
		return self->MPI_ERROR;
	`}

	# Count of the given `data_type` in this communication
	#
	# The result can be tested with `Int::is_undefined`.
	fun count(data_type: DataType): Int `{
		int nb_elements;
		MPI_Get_count(self, data_type, &nb_elements);
		return nb_elements;
	`}
end

# An MPI operation
#
# Used with the `reduce` method.
# NOTE(review): no `reduce` method is visible in this module, confirm where
# it is defined.
#
# See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node78.html>
extern class Op `{ MPI_Op `}
	# Get the MPI null operation (`MPI_OP_NULL`).
	new op_null `{ return MPI_OP_NULL; `}

	# Get the MPI maximum operation (`MPI_MAX`).
	new max `{ return MPI_MAX; `}

	# Get the MPI minimum operation (`MPI_MIN`).
	new min `{ return MPI_MIN; `}

	# Get the MPI sum operation (`MPI_SUM`).
	new sum `{ return MPI_SUM; `}

	# Get the MPI product operation (`MPI_PROD`).
	new prod `{ return MPI_PROD; `}

	# Get the MPI logical and operation (`MPI_LAND`).
	new land `{ return MPI_LAND; `}

	# Get the MPI bit-wise and operation (`MPI_BAND`).
	new band `{ return MPI_BAND; `}

	# Get the MPI logical or operation (`MPI_LOR`).
	new lor `{ return MPI_LOR; `}

	# Get the MPI bit-wise or operation (`MPI_BOR`).
	new bor `{ return MPI_BOR; `}

	# Get the MPI logical xor operation (`MPI_LXOR`).
	new lxor `{ return MPI_LXOR; `}

	# Get the MPI bit-wise xor operation (`MPI_BXOR`).
	new bxor `{ return MPI_BXOR; `}

	# Get the MPI minloc operation (`MPI_MINLOC`).
	#
	# Used to compute a global minimum and also an index attached
	# to the minimum value.
	#
	# See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
	new minloc `{ return MPI_MINLOC; `}

	# Get the MPI maxloc operation (`MPI_MAXLOC`).
	#
	# Used to compute a global maximum and also an index attached
	# to the maximum value.
	#
	# See <http://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node79.html#Node79>
	new maxloc `{ return MPI_MAXLOC; `}

	# Get the MPI replace operation (`MPI_REPLACE`).
	new replace `{ return MPI_REPLACE; `}
end

# An MPI return code to report success or errors
#
# Instances wrap the native `int` returned by MPI functions.
extern class SuccessOrError `{ int `}
	# Is this a success?
	fun is_success: Bool `{ return self == MPI_SUCCESS; `}

	# Is this an error?
	fun is_error: Bool do return not is_success

	# TODO add is_... for each variant

	# The class of this error
	fun error_class: ErrorClass
	`{
		int class;
		MPI_Error_class(self, &class);
		return class;
	`}

	# Human readable message associated to this return code
	redef fun to_s do return native_to_s.to_s

	# Message from `MPI_Error_string` in a newly allocated C string
	#
	# NOTE(review): the allocated buffer is never freed by this class,
	# whether it must be depends on `CString::to_s` copying the data or not.
	private fun native_to_s: CString `{
		char *err = malloc(MPI_MAX_ERROR_STRING);

		/* `resultlen` is an output parameter of `MPI_Error_string`,
		 * it must point to a valid `int` even if the length is not used. */
		int len;
		MPI_Error_string(self, err, &len);
		return err;
	`}
end

# An MPI error class
#
# Instances wrap the native `int` identifying the class.
extern class ErrorClass `{ int `}
	# Human readable message associated to this error class
	redef fun to_s do return native_to_s.to_s

	# Message from `MPI_Error_string` in a newly allocated C string
	#
	# NOTE(review): the allocated buffer is never freed by this class,
	# whether it must be depends on `CString::to_s` copying the data or not.
	private fun native_to_s: CString `{
		char *err = malloc(MPI_MAX_ERROR_STRING);

		/* `resultlen` is an output parameter of `MPI_Error_string`,
		 * it must point to a valid `int` even if the length is not used. */
		int len;
		MPI_Error_string(self, err, &len);
		return err;
	`}
end

# An MPI rank within a communicator
#
# Instances wrap a native `int`, see also `Int::rank`.
extern class Rank `{ int `}
	# Special rank accepting any processor
	new any `{
		return MPI_ANY_SOURCE;
	`}

	# This Rank as an `Int`
	fun to_i: Int `{
		return self;
	`}

	# Decimal representation of `to_i`
	redef fun to_s
	do
		return to_i.to_s
	end
end

# An MPI tag, can be defined using `Int::tag`
#
# Instances wrap a native `int`.
extern class Tag `{ int `}
	# Special tag accepting any tag
	new any `{
		return MPI_ANY_TAG;
	`}

	# This tag as an `Int`
	fun to_i: Int `{
		return self;
	`}

	# Decimal representation of `to_i`
	redef fun to_s
	do
		return to_i.to_s
	end
end

# Conversions from `Int` to the MPI types `Rank` and `Tag`
redef universal Int
	# `self`th MPI rank
	#
	# The value of `self` is returned as is to the native `int` of `Rank`.
	fun rank: Rank `{ return self; `}

	# Tag identified by `self`
	#
	# The value of `self` is returned as is to the native `int` of `Tag`.
	fun tag: Tag `{ return self; `}

	# Is this value undefined according to MPI? (may be returned by `Status::count`)
	#
	# Compares `self` to the `MPI_UNDEFINED` constant.
	fun is_undefined: Bool `{ return self == MPI_UNDEFINED; `}
end

# Something sendable directly and efficiently over MPI
#
# Subclasses of `Sendable` should use the native MPI send function, without
# using Nit serialization.
#
# These services are invoked by `MPI::send_from` and `MPI::send_all`.
interface Sendable
	# Type specific send over MPI
	#
	# Send `count` elements of `self`, starting at index `at`, to the rank
	# `dest` with the given `tag` through `comm`.
	protected fun send(mpi: MPI, at, count: Int, dest: Rank, tag: Tag, comm: Comm) is abstract

	# Type specific send full buffer over MPI
	#
	# Send the whole content of `self` to the rank `dest` with the given
	# `tag` through `comm`.
	protected fun send_all(mpi: MPI, dest: Rank, tag: Tag, comm: Comm) is abstract
end


# Something which can receive data directly and efficiently from MPI
#
# Subclasses of `Receptacle` should use the native MPI receive function,
# without using Nit serialization.
#
# These services are invoked by `MPI::recv_into` and `MPI::recv_fill`.
interface Receptacle
	# Type specific receive from MPI
	#
	# Receive `count` elements into `self`, starting at index `at`, from
	# the rank `source` with the given `tag` through `comm`.
	protected fun recv(mpi: MPI, at, count: Int, source: Rank, tag: Tag, comm: Comm) is abstract

	# Type specific receive and fill buffer from MPI
	#
	# Receive from the rank `source` with the given `tag` through `comm`
	# until `self` is filled.
	protected fun recv_fill(mpi: MPI, source: Rank, tag: Tag, comm: Comm) is abstract
end

# Declare all `CArray` as both `Sendable` and `Receptacle`
#
# No service is implemented at this level, the type specific
# implementations are left to the subclasses (see `CIntArray`).
redef class CArray[E]
	super Sendable
	super Receptacle
end

# Any `Text` can be sent directly over MPI as a sequence of `MPI_CHAR`
redef class Text
	super Sendable

	# Send `count` characters of `self`, starting at index `at`
	#
	# The message is sent to `dest` with `tag` through the communicator `comm`.
	#
	# NOTE(review): `count` is used both as a number of characters (for
	# `substring`) and as the number of `MPI_CHAR` read from `to_cstring`.
	# These may differ on text with multi-byte characters. To confirm.
	redef fun send(mpi, at, count, dest, tag, comm)
	do
		# Extract the requested slice only when it is not the full text
		var str
		if at != 0 or count != length then
			str = substring(at, count)
		else str = self

		# Use the communicator requested by the caller
		mpi.native_send(str.to_cstring, count, new DataType.char,
			dest, tag, comm)
	end

	# Send all the characters of `self`
	redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)
end

# A `FlatBuffer` can receive `MPI_CHAR` data directly in its internal storage
redef class FlatBuffer
	super Receptacle

	# Receive `count` `MPI_CHAR` into `self`, starting at index `at`
	#
	# The message is received from `source` with `tag` through the
	# communicator `comm`. The capacity is enlarged as needed, and `length`
	# is extended to cover the end of the received region.
	redef fun recv(mpi, at, count, source, tag, comm)
	do
		# Ensure there is room for the data
		var min_capacity = at + count
		if capacity < min_capacity then enlarge min_capacity

		# Native pointer where to write the received data
		var array
		if at != 0 then
			array = items + at
		else array = items

		# Use the communicator requested by the caller
		mpi.native_recv(array, count, new DataType.char,
			source, tag, comm, new Status.ignore)

		# Expose the received region only, the capacity may be larger
		# than `at + count` (notably after a call to `enlarge`)
		if length < min_capacity then length = min_capacity
	end

	# Receive as many `MPI_CHAR` as the current `capacity`
	redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, capacity, dest, tag, comm)
end

# Send and receive services for arrays of C `int`, using `MPI_INT`
redef class CIntArray
	# Send `count` `int` of `self`, starting at index `at`
	#
	# The message is sent to `dest` with `tag` through the communicator `comm`.
	redef fun send(mpi, at, count, dest, tag, comm)
	do
		# Native pointer to the first element to send
		var array
		if at != 0 then
			array = native_array + at
		else array = native_array

		# Use the communicator requested by the caller
		mpi.native_send(array, count, new DataType.int,
			dest, tag, comm)
	end

	# Send the `length` elements of `self`
	redef fun send_all(mpi, dest, tag, comm) do send(mpi, 0, length, dest, tag, comm)

	# Receive `count` `int` into `self`, starting at index `at`
	#
	# The message is received from `source` with `tag` through the
	# communicator `comm`. The resulting status is ignored.
	redef fun recv(mpi, at, count, source, tag, comm)
	do
		# Native pointer where to write the first received element
		var array
		if at != 0 then
			array = native_array + at
		else array = native_array

		# Use the communicator requested by the caller
		mpi.native_recv(array, count, new DataType.int,
			source, tag, comm, new Status.ignore)
	end

	# Receive as many `int` as the `length` of `self`
	redef fun recv_fill(mpi, dest, tag, comm) do recv(mpi, 0, length, dest, tag, comm)
end

# Shortcut to the world communicator (same as `new Comm.world`)
#
# The communicator is created on the first call only.
fun comm_world: Comm
do
	return once new Comm.world
end
lib/mpi/mpi.nit:17,1--487,50