Using Threads in a parallel mapreduce
One might want to carry out a computation across several nodes of a cluster, where each node uses multithreading to evaluate a result that is subsequently reduced across all nodes. We walk through one such example where we concatenate arrays that are locally initialized on each node using threads.
We load the necessary packages, in this case ParallelUtilities and Distributed.
using ParallelUtilities
using Distributed
We create a function to initialize the local part on each worker. In this case we simulate a heavy workload by adding a sleep period. In other words, we assume that the individual elements of the array are expensive to evaluate. We use Threads.@threads to split up the loop into sections that are processed on individual threads.
function initializenode_threads(sleeptime)
    s = zeros(Int, 2_000)
    Threads.@threads for ind in eachindex(s)
        sleep(sleeptime)
        s[ind] = ind
    end
    return s
end
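To see the effect of the threaded loop in isolation, a minimal sketch along the following lines can be run in a single Julia session. The function name `initialize_threads_small` and the array length are hypothetical, chosen only to keep the run short. Each iteration writes to a distinct index of `s`, so the threads do not race on the same element.

```julia
# Hypothetical small-scale variant of the initializer: the array length
# is passed in so that the check finishes quickly.
function initialize_threads_small(n, sleeptime)
    s = zeros(Int, n)
    Threads.@threads for ind in eachindex(s)
        sleep(sleeptime)   # stand-in for an expensive computation
        s[ind] = ind       # each iteration writes to its own index
    end
    return s
end

# The number of threads is fixed when Julia starts (e.g. `julia --threads=4`).
println("Threads available: ", Threads.nthreads())

s = initialize_threads_small(64, 1e-2)
t = @elapsed initialize_threads_small(64, 1e-2)
println("Elapsed: ", t, " s")
```

With a single thread the loop takes at least 64 sleep periods; with more threads the elapsed time should drop roughly in proportion to the thread count.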
We create a main function that runs on the calling process and launches the array initialization task on each node. The task runs on a WorkerPool consisting of one worker per node, each acting as the root process on its node. We may obtain such a pool through the function ParallelUtilities.workerpool_nodes(). The array creation step on each node is followed by an eventual concatenation.
function main_threads(sleeptime)
    # obtain the workerpool with one process on each node
    pool = ParallelUtilities.workerpool_nodes()
    # obtain the number of workers in the pool
    nw_nodes = nworkers(pool)
    # evaluate the parallel mapreduce
    pmapreduce(x -> initializenode_threads(sleeptime), hcat, pool, 1:nw_nodes)
end
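The launch-then-reduce pattern can also be tried on a single machine using only the Distributed standard library. The sketch below is a stand-in for illustration and is not how pmapreduce is implemented: it assumes two local worker processes started with two threads each (in place of one process per node), defines the initializer on every process with `@everywhere`, and reduces the fetched results with hcat.

```julia
using Distributed

# ASSUMPTION: two local workers with 2 threads each stand in for
# "one root process per node" on a real cluster.
addprocs(2; exeflags = `--threads=2`)

# The function must exist on every process that will call it.
@everywhere function initializenode_threads(sleeptime)
    s = zeros(Int, 2_000)
    Threads.@threads for ind in eachindex(s)
        sleep(sleeptime)
        s[ind] = ind
    end
    return s
end

# Launch the initialization on each worker without blocking,
# then fetch the results and concatenate them column-wise.
futures = [remotecall(initializenode_threads, w, 0) for w in workers()]
result = mapreduce(fetch, hcat, futures)
ncols = nworkers()

println("Result size: ", size(result))

# Clean up the local workers.
rmprocs(workers())
```

Each worker contributes one column, so the result has as many columns as there are workers.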
We compare the results with a serial execution that uses a similar workflow, except we use mapreduce instead of pmapreduce and do not use threads.
function initialize_serial(sleeptime)
    s = zeros(Int, 2_000)
    for ind in eachindex(s)
        sleep(sleeptime)
        s[ind] = ind
    end
    return s
end
function main_serial(sleeptime)
    pool = ParallelUtilities.workerpool_nodes()
    nw_nodes = nworkers(pool)
    mapreduce(x -> initialize_serial(sleeptime), hcat, 1:nw_nodes)
end
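To make the shape of the reduced result concrete, here is a small sketch of what reducing with hcat produces. The sizes (vectors of length 4, three mapped elements) are hypothetical and chosen only for readability.

```julia
# Each mapped element returns a vector of length 4 (hypothetical size);
# hcat places these vectors side by side as the columns of a matrix.
columns = mapreduce(x -> collect(1:4), hcat, 1:3)

println(size(columns))
```

In the example above the same holds at full scale: each node contributes one column of length 2000.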
We create a function to compare the performance of the two. We start with a precompilation run with no sleep time, followed by recording the actual timings with a sleep time of 5 milliseconds for each index of the array.
function compare_with_serial()
    # precompile
    main_serial(0)
    main_threads(0)
    # time
    println("Testing serial")
    A = @time main_serial(5e-3);
    println("Testing threads")
    B = @time main_threads(5e-3);
    println("Results match : ", A == B)
end
We run this script on a Slurm cluster across 2 nodes with 28 cores on each node. The results are:
julia> compare_with_serial()
Testing serial
24.601593 seconds (22.49 k allocations: 808.266 KiB)
Testing threads
0.666256 seconds (3.71 k allocations: 201.703 KiB)
Results match : true
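As a rough sanity check on these timings, the sketch below estimates the ideal run times from the sleep period alone. It assumes (this is not stated above) that Julia was started with 28 threads on each node, one per core, and it ignores scheduling and communication overhead. The variable names are purely illustrative.

```julia
# Back-of-the-envelope estimate; all names are illustrative.
nnodes    = 2       # nodes used in the run above
len       = 2_000   # array length initialized on each node
sleeptime = 5e-3    # seconds slept per element
nthreads_assumed = 28   # ASSUMPTION: one Julia thread per core

# Serial: every element of every array is processed one after another.
serial_estimate = nnodes * len * sleeptime

# Threaded: the nodes run concurrently, and each thread handles
# at most cld(len, nthreads_assumed) elements.
threaded_estimate = cld(len, nthreads_assumed) * sleeptime

println("serial   ≈ ", serial_estimate, " s")
println("threaded ≈ ", threaded_estimate, " s")
```

Under these assumptions the estimates come to about 20 s and 0.36 s, the same order as the measured 24.6 s and 0.67 s. The remainder is plausibly overhead from task scheduling, timer granularity, and inter-process communication.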
The full script may be found in the examples directory.