mpi4py.util.sync

Added in version 4.0.0.

The mpi4py.util.sync module provides parallel synchronization utilities.

Sequential execution

class mpi4py.util.sync.Sequential

Sequential execution.

Context manager for sequential execution within a group of MPI processes.

The implementation is based on MPI-1 point-to-point communication. A process with rank i waits in a blocking receive until the previous process, rank i-1, finishes executing and signals rank i with a send.
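The token-passing scheme described above can be illustrated without MPI. The following is a minimal sketch, not the module's implementation: it assumes threads stand in for MPI processes and one queue.Queue per simulated rank stands in for point-to-point messages. The names run_sequential and worker are hypothetical.

```python
import queue
import threading


def run_sequential(size):
    """Run `size` simulated ranks one after another using token passing."""
    # Assumption: put() plays the role of a send and a blocking get()
    # plays the role of a blocking receive.
    inbox = [queue.Queue() for _ in range(size)]
    order = []

    def worker(rank):
        if rank != 0:
            inbox[rank].get()          # wait for the token from rank - 1
        order.append(rank)             # the sequential section
        if rank != size - 1:
            inbox[rank + 1].put(None)  # pass the token on to rank + 1

    # Start the threads in reverse order: the start order does not matter.
    threads = [
        threading.Thread(target=worker, args=(rank,))
        for rank in reversed(range(size))
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return order


print(run_sequential(4))  # [0, 1, 2, 3]
```

Rank 0 never waits and the last rank never sends, so exactly size - 1 tokens are exchanged.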

__init__(comm, tag=0)

Initialize sequential execution.

Parameters:
  • comm (Intracomm) – Intracommunicator context.

  • tag (int) – Tag for point-to-point communication.

Return type:

None

__enter__()

Enter sequential execution.

Return type:

Self

__exit__(*exc)

Exit sequential execution.

Parameters:

exc (object)

Return type:

None

begin()

Begin sequential execution.

Return type:

None

end()

End sequential execution.

Return type:

None

Global counter

class mpi4py.util.sync.Counter

Global counter.

Produce consecutive values within a group of MPI processes. The counter interface is close to that of itertools.count.

The implementation is based on MPI-3 one-sided operations. A root process (typically rank 0) holds the counter, and its value is queried and incremented with an atomic RMA fetch-and-add operation.
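The fetch-and-add behavior and the similarity to itertools.count can be sketched with the standard library alone. This is an illustration under stated assumptions, not the module's code: threads stand in for MPI processes, a threading.Lock stands in for the atomicity that RMA provides, and the class name SharedCounter is hypothetical.

```python
import itertools
import threading


class SharedCounter:
    """Hypothetical thread-based stand-in for a fetch-and-add counter."""

    def __init__(self, start=0, step=1):
        self._value = start
        self._step = step
        self._lock = threading.Lock()  # assumption: models RMA atomicity

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self, incr=None):
        incr = self._step if incr is None else incr
        with self._lock:
            prev = self._value   # fetch
            self._value += incr  # add
        return prev              # the value before incrementing


counter = SharedCounter(start=10, step=2)
reference = itertools.count(10, 2)

first = [next(counter) for _ in range(3)]
expected = [next(reference) for _ in range(3)]
print(first == expected)         # True

skipped = counter.next(incr=5)   # one-off increment instead of step
after = next(counter)
print(skipped, after)            # 16 21
```

Because every caller receives the value held before its own increment, no two callers ever see the same value.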

__init__(start=0, step=1, *, typecode='i', comm=COMM_SELF, info=INFO_NULL, root=0)

Initialize global counter.

Parameters:
  • start (int) – Start value.

  • step (int) – Increment value.

  • typecode (str) – Type code as defined in the array module.

  • comm (Intracomm) – Intracommunicator context.

  • info (Info) – Info object for RMA context creation.

  • root (int) – Process rank holding the counter memory.

Return type:

None

__iter__()

Implement iter(self).

Return type:

Self

__next__()

Implement next(self).

Return type:

int

next(incr=None)

Return current value and increment.

Parameters:

incr (int | None) – Increment value.

Returns:

The counter value before incrementing.

Return type:

int

free()

Free counter resources.

Return type:

None

Mutual exclusion

class mpi4py.util.sync.Mutex

Mutual exclusion.

Establish a critical section or mutual exclusion among MPI processes.

The mutex interface is close to that of threading.Lock and threading.RLock, allowing the use of either recursive or non-recursive mutual exclusion. However, a mutex should be used within a group of MPI processes, not threads.

In non-recursive mode, the semantics of Mutex differ somewhat from those of threading.Lock:

  • Once acquired, a mutex is held and owned by a process until released.

  • Trying to acquire a mutex already held raises RuntimeError.

  • Trying to release a mutex not yet held raises RuntimeError.
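The three rules above can be modeled with threads. The following sketch is an assumption-laden illustration rather than the Mutex implementation: threads stand in for MPI processes, and the class name OwnedLock and its error messages are hypothetical.

```python
import threading


class OwnedLock:
    """Hypothetical thread-based model of the non-recursive rules."""

    def __init__(self):
        self._lock = threading.Lock()
        self._owner = None  # identifier of the thread holding the lock

    def acquire(self, blocking=True):
        me = threading.get_ident()
        if self._owner == me:
            # A plain threading.Lock would block forever here.
            raise RuntimeError("already held by the caller")
        if not self._lock.acquire(blocking):
            return False
        self._owner = me
        return True

    def release(self):
        if self._owner != threading.get_ident():
            raise RuntimeError("not held by the caller")
        self._owner = None
        self._lock.release()


lock = OwnedLock()
lock.acquire()
try:
    lock.acquire()               # second acquire by the same owner
except RuntimeError as exc:
    print("acquire failed:", exc)
lock.release()
try:
    lock.release()               # release without holding the lock
except RuntimeError as exc:
    print("release failed:", exc)
```

Raising on a repeated acquire turns what would be a silent deadlock with threading.Lock into an immediate error.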

This mutex implementation uses the scalable and fair spinlock algorithm from [mcs-paper] and took inspiration from the MPI-3 RMA implementation of [uam-book].

__init__(*, recursive=False, comm=COMM_SELF, info=INFO_NULL)

Initialize mutex object.

Parameters:
  • comm (Intracomm) – Intracommunicator context.

  • recursive (bool) – Whether to allow recursive acquisition.

  • info (Info) – Info object for RMA context creation.

Return type:

None

__enter__()

Acquire mutex.

Return type:

Self

__exit__(*exc)

Release mutex.

Parameters:

exc (object)

Return type:

None

acquire(blocking=True)

Acquire mutex, blocking or non-blocking.

Parameters:

blocking (bool) – If True, block until the mutex is held.

Returns:

True if the mutex is held, False otherwise.

Return type:

bool
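The non-blocking form follows the familiar try-lock pattern. The sketch below uses threading.Lock as a stand-in, on the assumption that Mutex.acquire(blocking=False) reports contention through its return value in the same way, as the description above states. The names holder, got_it, and got_it_later are hypothetical.

```python
import threading

lock = threading.Lock()
held = threading.Event()
done = threading.Event()


def holder():
    with lock:
        held.set()    # tell the main thread that the lock is taken
        done.wait()   # keep holding the lock until told to stop


thread = threading.Thread(target=holder)
thread.start()
held.wait()

# Returns immediately; False because another thread holds the lock.
got_it = lock.acquire(blocking=False)

done.set()
thread.join()

# The holder has released the lock, so this attempt succeeds.
got_it_later = lock.acquire(blocking=False)
if got_it_later:
    lock.release()

print(got_it, got_it_later)  # False True
```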

release()

Release mutex.

Return type:

None

locked()

Return whether the mutex is held.

Return type:

bool

count()

Return the recursion count.

Return type:

int

free()

Free mutex resources.

Return type:

None

[mcs-paper]

John M. Mellor-Crummey and Michael L. Scott. Algorithms for scalable synchronization on shared-memory multiprocessors. ACM Transactions on Computer Systems, 9(1):21-65, February 1991. https://doi.org/10.1145/103727.103729

[uam-book]

William Gropp, Torsten Hoefler, Rajeev Thakur, Ewing Lusk. Using Advanced MPI - Modern Features of the Message-Passing Interface. Chapter 4, Section 4.7, Pages 130-131. The MIT Press, November 2014. https://mitpress.mit.edu/9780262527637/using-advanced-mpi/

Condition variable

class mpi4py.util.sync.Condition

Condition variable.

A condition variable allows one or more MPI processes to wait until they are notified by another process.

The condition variable interface is close to that of threading.Condition, allowing the use of either recursive or non-recursive mutual exclusion. However, the condition variable should be used within a group of MPI processes, not threads.

This condition variable implementation uses an MPI-3 RMA-based scalable and fair circular queue algorithm to track the set of waiting processes.
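Since the interface is close to that of threading.Condition, the basic wait and notify protocol can be shown with the standard library class. This is a sketch that assumes threads in place of MPI processes. It does not model the RMA-based queue, and the names consumer, producer, items, and received are hypothetical.

```python
import threading

cond = threading.Condition()
items = []
received = []


def consumer():
    with cond:                 # acquire the underlying lock
        while not items:       # re-check the state after every wake-up
            cond.wait()        # releases the lock while waiting
        received.append(items.pop())


def producer():
    with cond:
        items.append("payload")
        cond.notify()          # wake up one waiter, if any


consumer_thread = threading.Thread(target=consumer)
producer_thread = threading.Thread(target=producer)
consumer_thread.start()
producer_thread.start()
consumer_thread.join()
producer_thread.join()

print(received)  # ['payload']
```

The state check inside the loop makes the exchange correct regardless of which thread runs first.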

__init__(mutex=None, *, recursive=True, comm=COMM_SELF, info=INFO_NULL)

Initialize condition variable.

Parameters:
  • mutex (Mutex | None) – Mutual exclusion object.

  • recursive (bool) – Whether to allow recursive acquisition.

  • comm (Intracomm) – Intracommunicator context.

  • info (Info) – Info object for RMA context creation.

Return type:

None

__enter__()

Acquire the underlying mutex.

Return type:

Self

__exit__(*exc)

Release the underlying mutex.

Parameters:

exc (object)

Return type:

None

acquire(blocking=True)

Acquire the underlying mutex.

Parameters:

blocking (bool) – If True, block until the underlying mutex is held.

Return type:

bool

release()

Release the underlying mutex.

Return type:

None

locked()

Return whether the underlying mutex is held.

Return type:

bool

wait()

Wait until notified by another process.

Returns:

Always True.

Return type:

Literal[True]

wait_for(predicate)

Wait until a predicate evaluates to True.

Parameters:

predicate (Callable[[], T]) – Callable whose result is interpreted as a boolean.

Returns:

The result of predicate once it evaluates to True.

Return type:

T
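One way to read wait_for is as a loop around wait that returns the predicate's final result, which is why the return type is T rather than bool. The helper below is a sketch under that assumption, exercised with threading.Condition in place of the MPI condition variable. The free function wait_for and the names state, results, and waiter are hypothetical.

```python
import threading


def wait_for(cond, predicate):
    """Loop around cond.wait(); the caller must hold the condition."""
    result = predicate()
    while not result:
        cond.wait()
        result = predicate()
    return result  # the truthy value produced by the predicate


cond = threading.Condition()
state = {}
results = []


def waiter():
    with cond:
        results.append(wait_for(cond, lambda: state.get("token")))


thread = threading.Thread(target=waiter)
thread.start()
with cond:
    state["token"] = "ready"
    cond.notify_all()
thread.join()

print(results)  # ['ready']
```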

notify(n=1)

Wake up one or more processes waiting on this condition.

Parameters:

n (int) – Maximum number of processes to wake up.

Returns:

The actual number of processes woken up.

Return type:

int

notify_all()

Wake up all processes waiting on this condition.

Returns:

The actual number of processes woken up.

Return type:

int

free()

Free condition resources.

Return type:

None

Semaphore object

class mpi4py.util.sync.Semaphore

Semaphore object.

A semaphore object manages an internal counter which is decremented by each acquire() call and incremented by each release() call. The internal counter never reaches a value below zero; when acquire() finds that it is zero, it blocks and waits until some other process calls release().

The semaphore interface is close to that of threading.Semaphore and threading.BoundedSemaphore, allowing the use of either bounded (default) or unbounded semaphores. With a bounded semaphore, the internal counter never exceeds its initial value; otherwise release() raises ValueError.

This semaphore implementation uses a global Counter and a Condition variable to handle waiting and notification.
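The combination of a counter and a condition variable can be sketched with threads. The following is an illustration under stated assumptions, not the module's code: a plain integer guarded by threading.Condition stands in for the global Counter and the MPI Condition, and the class name SketchSemaphore and its error message are hypothetical.

```python
import threading


class SketchSemaphore:
    """Hypothetical thread-based semaphore: a counter plus a condition."""

    def __init__(self, value=1, bounded=True):
        self._value = value
        self._bound = value if bounded else None
        self._cond = threading.Condition()

    def acquire(self, blocking=True):
        with self._cond:
            while self._value == 0:
                if not blocking:
                    return False
                self._cond.wait()   # sleep until a release() notifies
            self._value -= 1
            return True

    def release(self, n=1):
        with self._cond:
            if self._bound is not None and self._value + n > self._bound:
                raise ValueError("released too many times")
            self._value += n
            self._cond.notify(n)    # wake up at most n waiters


sem = SketchSemaphore(value=2)
sem.acquire()
sem.acquire()
print(sem.acquire(blocking=False))  # False: the counter is zero
sem.release(2)
try:
    sem.release()                   # would exceed the initial value
except ValueError as exc:
    print("release failed:", exc)
```

With bounded=False the check in release is skipped, which corresponds to the unbounded behavior of threading.Semaphore.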

__init__(value=1, *, bounded=True, comm=COMM_SELF, info=INFO_NULL)

Initialize semaphore object.

Parameters:
  • value (int) – Initial value for internal counter.

  • bounded (bool) – Bound internal counter to initial value.

  • comm (Intracomm) – Intracommunicator context.

  • info (Info) – Info object for RMA context creation.

Return type:

None

__enter__()

Acquire semaphore.

Return type:

Self

__exit__(*exc)

Release semaphore.

Parameters:

exc (object)

Return type:

None

acquire(blocking=True)

Acquire semaphore, decrementing the internal counter by one.

Parameters:

blocking (bool) – If True, block until the semaphore is acquired.

Returns:

True if the semaphore is acquired, False otherwise.

Return type:

bool

release(n=1)

Release semaphore, incrementing the internal counter by one or more.

Parameters:

n (int) – Increment for the internal counter.

Return type:

None

free()

Free semaphore resources.

Return type:

None

Examples

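test-sync-1.py

    from mpi4py import MPI
    from mpi4py.util.sync import Counter, Sequential

    comm = MPI.COMM_WORLD

    # comm is keyword-only in Counter; a positional argument would set start.
    counter = Counter(comm=comm)
    with Sequential(comm):
        # Ranks enter one at a time in rank order, so rank i draws value i.
        value = next(counter)
    counter.free()

    assert comm.rank == value
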
test-sync-2.py

    from mpi4py import MPI
    from mpi4py.util.sync import Counter, Mutex

    comm = MPI.COMM_WORLD

    # Both constructors take comm as a keyword-only argument.
    mutex = Mutex(comm=comm)
    counter = Counter(comm=comm)
    with mutex:
        # Ranks enter in arbitrary order, one at a time.
        value = next(counter)
    counter.free()
    mutex.free()

    # Each rank drew a distinct value from 0 to size - 1.
    assert (
        list(range(comm.size)) ==
        sorted(comm.allgather(value))
    )