ClusterManagers

Support for different job queue systems commonly used on compute clusters.

Currently supported job queue systems

  • Sun Grid Engine: addprocs_sge(np::Integer, queue="") or addprocs(SGEManager(np, queue))
  • PBS: addprocs_pbs(np::Integer, queue="") or addprocs(PBSManager(np, queue))
  • Scyld: addprocs_scyld(np::Integer) or addprocs(ScyldManager(np))
  • HTCondor: addprocs_htc(np::Integer) or addprocs(HTCManager(np))
  • Slurm: addprocs_slurm(np::Integer; kwargs...) or addprocs(SlurmManager(np); kwargs...)
  • Local manager with CPU affinity setting: addprocs(LocalAffinityManager(;np=CPU_CORES, mode::AffinityMode=BALANCED, affinities=[]); kwargs...)
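
For example, a minimal sketch of adding PBS workers (the queue name "batch" is an illustrative placeholder for whatever queue your site provides):

using ClusterManagers

# Request 4 workers from an illustrative PBS queue named "batch";
# addprocs(PBSManager(4, "batch")) is equivalent.
addprocs_pbs(4, "batch")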

You can also write your own custom cluster manager; see the instructions in the Julia manual.

Slurm: a simple example

using ClusterManagers

# Arguments to the Slurm srun(1) command can be given as keyword
# arguments to addprocs.  The argument name and value are translated to
# srun(1) command line arguments as follows:
# 1) If the argument name is one character long => "-arg value",
#    e.g. t="0:1:0" => "-t 0:1:0"
# 2) If the argument name is longer than one character => "--arg=value",
#    e.g. time="0:1:0" => "--time=0:1:0"
# 3) If the value is the empty string, the argument becomes a flag,
#    e.g. exclusive="" => "--exclusive"
# 4) Underscores in the argument name are replaced with dashes,
#    e.g. mem_per_cpu=100 => "--mem-per-cpu=100"
addprocs(SlurmManager(2), partition="debug", t="00:5:00")
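
# Combining rules 2)-4) with illustrative values, a call such as
#   addprocs(SlurmManager(2), time="00:10:00", exclusive="", mem_per_cpu=100)
# would pass "--time=00:10:00 --exclusive --mem-per-cpu=100" to srun(1).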

hosts = []
pids = []
for i in workers()
	host, pid = fetch(@spawnat i (gethostname(), getpid()))
	push!(hosts, host)
	push!(pids, pid)
end

# The Slurm resource allocation is released when all the workers have
# exited
for i in workers()
	rmprocs(i)
end

SGE - a simple interactive example

julia> using ClusterManagers

julia> ClusterManagers.addprocs_sge(5)
job id is 961, waiting for job to start .
5-element Array{Any,1}:
2
3
4
5
6

julia> @parallel for i=1:5
       run(`hostname`)
       end

julia>  From worker 2:  compute-6
        From worker 4:  compute-6
        From worker 5:  compute-6
        From worker 6:  compute-6
        From worker 3:  compute-6

SGE - an example with resource list

Some clusters require the user to specify a list of required resources. For example, it may be necessary to specify how much memory the job will need - see this issue.

julia> using ClusterManagers

julia> addprocs_sge(5,res_list="h_vmem=4G,tmem=4G")
job id is 9827051, waiting for job to start ........
5-element Array{Int64,1}:
 22
 23
 24
 25
 26

julia> pmap(x->run(`hostname`),workers());

julia>  From worker 26: lum-7-2.local
        From worker 23: pace-6-10.local
        From worker 22: chong-207-10.local
        From worker 24: pace-6-11.local
        From worker 25: cheech-207-16.local

Using LocalAffinityManager (for pinning local workers to specific cores)

  • This is a Linux-only feature.
  • Requires the Linux taskset command to be installed.
  • Usage: addprocs(LocalAffinityManager(;np=CPU_CORES, mode::AffinityMode=BALANCED, affinities=[]); kwargs...).

where

  • np is the number of workers to be started.
  • affinities, if specified, is a list of CPU IDs. One worker is launched per entry in affinities, and each worker is pinned to the corresponding CPU ID.
  • mode (used only when affinities is not specified) can be either COMPACT or BALANCED. COMPACT pins the requested number of workers to cores in increasing order, for example worker 1 => CPU0, worker 2 => CPU1, and so on. BALANCED tries to spread the workers out; on machines with multiple CPU sockets, each with several cores, this spreads the workers across the sockets. The default is BALANCED. A short usage sketch follows this list.
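
As a sketch of both modes (the worker count and CPU IDs below are illustrative; this assumes the constructor signature shown above, with CPU_CORES being the Julia constant for the number of logical cores):

using ClusterManagers

# Start 4 workers and let the BALANCED policy spread them across sockets.
addprocs(LocalAffinityManager(np=4, mode=BALANCED))

# Or launch one worker per listed CPU ID, pinning each worker to its core.
addprocs(LocalAffinityManager(affinities=[0, 2, 4, 6]))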

Using ElasticManager (dynamically adding workers to a cluster)

The ElasticManager is useful in scenarios where we want to dynamically add workers to a cluster. It achieves this by listening on a known port on the master. The launched workers connect to this port and publish their own host/port information for other workers to connect to.

Usage

On the master, you need to create an instance of ElasticManager. The constructors defined are:

ElasticManager(;addr=IPv4("127.0.0.1"), port=9009, cookie=nothing, topology=:all_to_all)
ElasticManager(port) = ElasticManager(;port=port)
ElasticManager(addr, port) = ElasticManager(;addr=addr, port=port)
ElasticManager(addr, port, cookie) = ElasticManager(;addr=addr, port=port, cookie=cookie)

On the worker, you need to call ClusterManagers.elastic_worker with the addr/port that the master is listening on and the same cookie. elastic_worker is defined as:

ClusterManagers.elastic_worker(cookie, addr="127.0.0.1", port=9009; stdout_to_master=true)

For example, on the master:

using ClusterManagers
em = ElasticManager(cookie="foobar")

and launch each worker locally with

echo "using ClusterManagers; ClusterManagers.elastic_worker(\"foobar\")" | julia &

or if you want a REPL on the worker, you can start a julia process normally and manually enter

using ClusterManagers
@schedule ClusterManagers.elastic_worker("foobar", "addr_of_master", port_of_master; stdout_to_master=false)

The above will yield back the REPL prompt and also display any printed output locally.
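
Once connected, elastic workers behave like ordinary Julia workers. A minimal sketch of using them from the master (the wait loop and the squaring workload are illustrative, assuming the manager was created with cookie "foobar" as above):

using ClusterManagers
em = ElasticManager(cookie="foobar")

# Block until at least one elastic worker has connected; nprocs() counts
# the master process, so 2 means master plus one worker.
while nprocs() < 2
    sleep(1)
end

# Use the dynamically added workers with the usual primitives.
pmap(x -> x^2, 1:10)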