Data Helpers

FluxMPI.DistributedDataContainer - Type
DistributedDataContainer(data)

data must be compatible with the MLUtils.jl interface. The returned container is also compatible with that interface and partitions the dataset across the available processes.
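
A minimal sketch of how the container might be used, assuming FluxMPI.Init() has been called on every process and MLUtils.jl is available; the data array and DataLoader settings here are purely illustrative.

using FluxMPI, MLUtils

FluxMPI.Init()

data = randn(Float32, 10, 128)               # 128 observations with 10 features each
dcontainer = DistributedDataContainer(data)  # each rank only iterates over its own shard

# Since the container follows the MLUtils interface, it can be wrapped in a DataLoader.
loader = DataLoader(dcontainer; batchsize=16, shuffle=true)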

General Functions

FluxMPI.Init - Function
Init(; gpu_devices::Union{Nothing,Vector{Int}}=nothing, verbose::Bool=false)

Set up FluxMPI. If GPUs are available and CUDA is functional, each rank is allocated a GPU in a round-robin fashion.

There is no need to call MPI.Init before calling this function.
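
A minimal sketch of a typical startup, assuming the script is launched through an MPI launcher (for example, mpiexec -n 4 julia --project script.jl):

using FluxMPI

FluxMPI.Init(; verbose=true)   # also initializes MPI and assigns GPUs round-robin if available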

FluxMPI.fluxmpi_print - Function
fluxmpi_print(args...; kwargs...)

Add rank and size information to the printed statement.

FluxMPI.fluxmpi_println - Function
fluxmpi_println(args...; kwargs...)

Add rank and size information to the printed statement.
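
A minimal sketch, assuming FluxMPI.Init() has been called; the exact prefix format shown in the comments is illustrative.

using FluxMPI

FluxMPI.Init()

fluxmpi_println("epoch finished")       # prints something like "[0/4] epoch finished"
fluxmpi_print("loss = ", 0.42, "\n")    # print variant: no newline is appended automatically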

MPIExtensions: Blocking Communication Wrappers

FluxMPI.allreduce! - Function
allreduce!(v, op, comm)

Perform MPI.Allreduce!, ensuring that CuArrays are safely transferred to the CPU if CUDA-aware MPI is unavailable or disabled.

FluxMPI.bcast! - Function
bcast!(v, op, comm)

Perform MPI.Bcast!, ensuring that CuArrays are safely transferred to the CPU if CUDA-aware MPI is unavailable or disabled.

FluxMPI.reduce! - Function
reduce!(v, op, comm)

Perform MPI.Reduce!, ensuring that CuArrays are safely transferred to the CPU if CUDA-aware MPI is unavailable or disabled.
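
A minimal sketch of the blocking allreduce wrapper, assuming FluxMPI.Init() has been called; local_rank() is assumed to be the FluxMPI helper returning the process rank, and + is used as the reduction operator. bcast! and reduce! follow the call patterns shown in their signatures above.

using FluxMPI, MPI

FluxMPI.Init()

v = fill(Float32(local_rank()), 4)
FluxMPI.allreduce!(v, +, MPI.COMM_WORLD)   # in place: every rank now holds the elementwise sum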

MPIExtensions: Non-Blocking Communication

FluxMPI.Iallreduce! - Function
Iallreduce!(sendbuf, recvbuf, op, comm)
Iallreduce!(sendrecvbuf, op, comm)

Performs a non-blocking elementwise reduction using the operator op on the buffer sendbuf. sendbuf can also be a scalar, in which case recvbuf will be a value of the same type.

recvbuf and an MPI_Request object are returned. The value in recvbuf is only valid after the request has been completed (see MPI.Wait!).

Warning

OpenMPI doesn't support Iallreduce! with CUDA. See this issue.
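
A minimal sketch, assuming FluxMPI.Init() has been called and the buffers live on the CPU (see the warning above):

using FluxMPI, MPI

FluxMPI.Init()

sendbuf = ones(Float32, 8)
recvbuf = similar(sendbuf)
recvbuf, req = FluxMPI.Iallreduce!(sendbuf, recvbuf, +, MPI.COMM_WORLD)
MPI.Wait!(req)          # recvbuf is only safe to read after the request completes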

FluxMPI.Ibcast! - Function
Ibcast!(buf, root, comm)

Non-blocking broadcast of the buffer buf to all processes in comm.

buf and an MPI_Request object are returned. The value in buf is only valid after the request has been completed (see MPI.Wait!).
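
A minimal sketch, assuming FluxMPI.Init() has been called; local_rank() is assumed to be the FluxMPI helper returning the process rank.

using FluxMPI, MPI

FluxMPI.Init()

buf = local_rank() == 0 ? collect(Float32, 1:4) : zeros(Float32, 4)
buf, req = FluxMPI.Ibcast!(buf, 0, MPI.COMM_WORLD)   # broadcast from root rank 0
MPI.Wait!(req)          # buf on every rank matches rank 0's data only after this completes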

Optimization

FluxMPI.DistributedOptimizer - Type
DistributedOptimizer(optimizer)

Wrap the optimizer in a DistributedOptimizer. Before updating the parameters, this sums the gradients across the processes using a non-blocking Allreduce.

Arguments

  • optimizer: An optimizer compatible with the Optimisers.jl package

Note

Remember to scale the loss function by 1 / total_workers() to ensure that the gradients are correctly averaged.
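
A minimal sketch of a training step with Optimisers.jl and Zygote, assuming FluxMPI.Init() has been called; model, x, y, and loss are hypothetical placeholders defined elsewhere.

using FluxMPI, Optimisers, Zygote

opt    = DistributedOptimizer(Optimisers.Adam(1.0f-3))
st_opt = Optimisers.setup(opt, model)

# Scale the loss by 1 / total_workers() so that summing gradients yields an average.
gs = Zygote.gradient(m -> loss(m(x), y) / total_workers(), model)[1]
st_opt, model = Optimisers.update(st_opt, model, gs)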

FluxMPI.allreduce_gradients - Function
allreduce_gradients(gs::NamedTuple; on_gpu::Bool=CUDA.functional())

Allreduce the gradients. This uses a non-blocking API, which is efficient for large containers with many parameter arrays.

Arguments

  • gs: A NamedTuple of gradients

Keyword Arguments

  • on_gpu: Specify whether the gradients are on the GPU. Defaults to CUDA.functional()

Returns

  • Allreduced NamedTuple of gradients
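
A minimal sketch, assuming FluxMPI.Init() has been called; the NamedTuple here is a toy stand-in for gradients that would normally come from an AD call such as Zygote.gradient.

using FluxMPI

FluxMPI.Init()

gs = (weight=randn(Float32, 4, 4), bias=zeros(Float32, 4))
gs = allreduce_gradients(gs; on_gpu=false)   # gradients are now reduced across all processes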

Synchronization

FluxMPI.synchronize! - Function
synchronize!(x; root_rank::Integer=0)

Synchronize x across all processes.

Note

This function is not in-place for CuArrays when MPI is not CUDA-aware.
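
A minimal sketch, assuming FluxMPI.Init() has been called; ps is a hypothetical NamedTuple of parameter arrays. Reassigning the result matters because of the note above.

using FluxMPI

FluxMPI.Init()

ps = (weight=randn(Float32, 4, 4), bias=zeros(Float32, 4))
ps = FluxMPI.synchronize!(ps; root_rank=0)   # every rank now holds rank 0's values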

Configuration

FluxMPI.disable_cudampi_support - Function
disable_cudampi_support(; disable=true)

Disable CUDA MPI support. The Julia session needs to be restarted for this to take effect.
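
A minimal sketch; judging from the keyword default, passing disable=false should re-enable the support, but that is an inference from the signature above.

using FluxMPI

FluxMPI.disable_cudampi_support()                   # disable, then restart Julia
# FluxMPI.disable_cudampi_support(; disable=false)  # presumably re-enables it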