Data structures


Dinfo is the basic structure for working with loaded data distributed among workers. In short, it represents a dataset as follows:

  • val is the "value name" under which the data are saved in processes. E.g. val=:foo means that there is a variable foo on each process holding a part of the matrix.
  • workers is a list of workers (in correct order!) that hold the data (similar to DArray.pids)
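The two fields above imply a structure roughly like the following; this is only a sketch built from the field names in the list (the real definition in the package may differ, e.g. carry extra type information):

```julia
# Minimal sketch of the structure described above (field names from the text).
struct Dinfo
    val::Symbol          # name under which the data are saved on each worker
    workers::Vector{Int} # worker IDs holding the parts, in the correct order
end

di = Dinfo(:foo, [2, 3, 4])
di.val      # :foo
di.workers  # [2, 3, 4]
```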

Base functions

dexec(val, fn, workers)

Execute a function on workers, taking val as a parameter. Results are not collected. This is useful for various side-effect-causing computations that are not easily expressible with dtransform.

dmap(arr::Vector, fn, workers)

Call a function fn on workers, with a single parameter arriving from the corresponding position in arr.

dmapreduce(val, map, fold, workers; prefetch = :all)

A distributed work-alike of the standard mapreduce: Take a function map (a non-modifying transform on the data) and fold (2-to-1 reduction of the results of map), systematically run them on the data described by val distributed on workers, and return the final reduced result.

It is assumed that the fold operation is associative, but not commutative (as in semigroups). If there are no workers, operation returns nothing (we don't have a monoid to magically conjure zero elements :[ ).

In the current version, the reduce step is a sequential left fold, executed in the main process. Parameter prefetch says how many futures should be fetched in advance; increasing prefetch improves the throughput but increases memory usage in case the results of map are big.


# compute the mean of all distributed data
s, len = dmapreduce(:myData,
    d -> (sum(d), length(d)),
    ((s1, l1), (s2, l2)) -> (s1 + s2, l1 + l2),
    workers())
mean = s / len
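The sequential left fold that dmapreduce uses for its reduce step (as described above) can be sketched locally; leftfold here is a hypothetical stand-in that operates on already-fetched per-worker results, with the same nothing-on-empty behavior:

```julia
# Local sketch of the reduce step: a sequential left fold over per-worker
# results (plain values here, no futures or prefetching).
function leftfold(fold, results)
    isempty(results) && return nothing  # no workers: no monoid, return nothing
    acc = results[1]
    for r in results[2:end]
        acc = fold(acc, r)
    end
    acc
end

leftfold(+, [1, 2, 3])  # 6
leftfold(+, Int[])      # nothing
```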

Processing multiple arguments (a.k.a. "zipWith")

The val here does not necessarily need to refer to a symbol; you can easily pass in a quoted tuple, which will be unquoted into the function parameter. For example, distributed values :a and :b can be joined as such:

dmapreduce(:((a, b)),
    ((a, b)::Tuple) -> [a b],
    vcat,
    workers())

dmapreduce(dInfo::Dinfo, map, fold)

Distributed map/reduce (just as the other overload of dmapreduce) that works with Dinfo.

dmapreduce(dInfo1::Dinfo, dInfo2::Dinfo, map, fold)

Variant of dmapreduce that works with multiple Dinfos at once. The data must be distributed on the same set of workers, in the same order.

dmapreduce(vals::Vector, map, fold, workers)

Variant of dmapreduce that works with multiple distributed variables at once.

dpmap(fn, args...; mod = Main, kwargs...)

"Distributed pool map."

A wrapper for pmap from Distributed package that executes the code in the correct module, so that it can access the distributed variables at remote workers. All arguments other than the first function fn are passed to pmap.

The function fn should return an expression that is going to get evaluated.


using Distributed
dpmap(x -> :(computeSomething(someData, $x)), WorkerPool(workers()), Vector(1:10))
di = distributeSomeData()
dpmap(x -> :(computeSomething($(di.val), $x)), CachingPool(di.workers), Vector(1:10))

dtransform(dInfo::Dinfo, fn, tgt::Symbol=dInfo.val)::Dinfo

Same as dtransform, but specialized for Dinfo.

dtransform(val, fn, workers, tgt::Symbol=val)

Transform the worker-local distributed data available as val on workers in place, by a function fn. Store the result as tgt (default val).


# multiply all saved data by 2
dtransform(:myData, d -> 2 * d, workers())

gather_array(val::Symbol, workers, dim=1; free=false)

Collect the arrays distributed on workers under the value val into a single array. The individual arrays are pasted together along the dimension specified by dim, i.e. dim=1 is roughly equivalent to using vcat, and dim=2 to hcat.

val must be an Array-based type; the function will otherwise fail.

If free is true, the val is unscattered after being gathered.

This preallocates the array for results, and is thus more efficient than e.g. using dmapreduce with vcat for folding.
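The effect of dim can be illustrated locally with plain arrays (part1 and part2 are made-up stand-ins for the per-worker pieces):

```julia
# dim=1 pastes the parts like vcat, dim=2 like hcat.
part1 = [1 2; 3 4]
part2 = [5 6; 7 8]

cat(part1, part2; dims=1) == vcat(part1, part2)  # true: stacked rows, 4x2
cat(part1, part2; dims=2) == hcat(part1, part2)  # true: joined columns, 2x4
```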

gather_array(dInfo::Dinfo, dim=1; free=false)

Distributed gather_array (just as the other overload) that works with Dinfo.


get_val_from(worker, val)

Get a value val from a remote worker; quoting of val works just as with save_at. Returns a future with the requested value.

save_at(worker, sym, val)

Saves value val to symbol sym at worker. sym should be quoted (or contain a symbol). val gets unquoted during processing and evaluated at the worker; quote it if you want to pass an exact command to the worker.

This is loosely based on the package ParallelDataTransfer, but made slightly more flexible by omitting/delaying the explicit fetches etc. In particular, save_at is roughly the same as ParallelDataTransfer.sendto, and get_val_from works very much like ParallelDataTransfer.getfrom.

Return value

A future with Nothing that can be fetched to see that the operation has finished.


save_at(2,:x,123)       # saves 123
save_at(2,:x,myid())    # saves 1
save_at(2,:x,:(myid())) # saves 2
save_at(2,:x,:(:x))     # saves the symbol :x
                        # (just :x won't work because of unquoting)

Note: Symbol scope

The symbols are saved in Main module on the corresponding worker. For example, save_at(1, :x, nothing) will erase your local x variable. Beware of name collisions.

scatter_array(sym, x::Array, workers; dim=1)::Dinfo

Distribute roughly equal parts of the array x, split along dimension dim, among workers into a worker-local variable sym.

Returns the Dinfo structure for the distributed data.
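Assuming a simple contiguous-range strategy (an assumption; the package may split differently), the "roughly equal parts" computation can be sketched with a hypothetical split_ranges helper:

```julia
# Split indices 1:len into n contiguous, roughly equal ranges;
# the first rem(len, n) ranges get one extra element.
function split_ranges(len::Int, n::Int)
    counts = fill(div(len, n), n)
    for i in 1:rem(len, n)       # distribute the remainder
        counts[i] += 1
    end
    stops = cumsum(counts)
    starts = [1; stops[1:end-1] .+ 1]
    [a:b for (a, b) in zip(starts, stops)]
end

split_ranges(10, 3)  # [1:4, 5:7, 8:10]
```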

tmp_symbol(dInfo::Dinfo; prefix="", suffix="_tmp")

Decorate the symbol from dInfo with prefix and suffix.

tmp_symbol(s::Symbol; prefix="", suffix="_tmp")

Decorate a symbol s with prefix and suffix, to create a good name for a related temporary value.
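The decoration is presumably plain concatenation of prefix, symbol name, and suffix; a hypothetical decorate helper sketches it:

```julia
# Build a related temporary name by wrapping the symbol in prefix/suffix.
decorate(s::Symbol; prefix="", suffix="_tmp") = Symbol(prefix, s, suffix)

decorate(:myData)                  # :myData_tmp
decorate(:myData, prefix="old_")   # :old_myData_tmp
```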


unscatter(dInfo::Dinfo)

Remove the loaded data described by dInfo from the corresponding workers.

@remote expr

In a function that will get evaluated on a remote worker, this ensures the evaluation scope of the expression expr (usually a variable) is taken on the remote side, preventing namespace clash with the local session.

This is mainly useful for making the functions from Distributed package (such as pmap and remotecall) work with the data stored by DistributedData package.

Internally, this is handled by wrapping in eval.


julia> save_at(2, :x, 321)
Future(2, 1, 162, nothing)

julia> let x = 123
         remotecall_fetch(() -> x + (@remote x), 2)
       end
444

Higher-level array operations

catmapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1)

Same as mapbuckets, except it concatenates the bucketing results along the bucketing dimension, thus creating a slightly neater matrix. slicedims is therefore fixed to bucketdim.

combine_stats((s1, sqs1, n1), (s2, sqs2, n2))

Helper for dstat-style functions that just adds up elements in triplets of vectors.
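A local sketch of that element-wise triplet addition (combine here is a hypothetical stand-in for the helper):

```julia
# Add up (sums, sums-of-squares, counts) triplets element-wise.
combine((s1, sqs1, n1), (s2, sqs2, n2)) = (s1 .+ s2, sqs1 .+ sqs2, n1 .+ n2)

combine(([1.0], [1.0], [1]), ([2.0], [4.0], [1]))  # ([3.0], [5.0], [2])
```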

dapply_cols(dInfo::Dinfo, fn, columns::Vector{Int})

Apply a function fn over columns of a distributed dataset.

fn gets 2 parameters:

  • a data vector (the whole part of the column saved at one worker)
  • the index of the column in the columns array (i.e. a number from 1:length(columns))

dapply_rows(dInfo::Dinfo, fn)

Apply a function fn over rows of a distributed dataset.

fn gets a single vector parameter for each row to transform.

dcopy(dInfo::Dinfo, newName::Symbol)

Clone the dataset and store it under a new distributed name newName.

dcount(ncats::Int, dInfo::Dinfo)::Vector{Int}

Count the occurrences of integer values in the vector stored in dInfo, assuming the values are in the range 1:ncats.

dcount_buckets(ncats::Int, dInfo::Dinfo, nbuckets::Int, buckets::Dinfo)::Matrix{Int}

Same as dcount, but counts the items in dInfo bucketed by buckets to produce a matrix of counts, with ncats rows and nbuckets columns.

Useful with distributeFCSFileVector to determine cluster distribution within files.

dmedian(dInfo::Dinfo, columns::Vector{Int}; iters=20)

Compute a median in a distributed fashion, avoiding the data transfer and memory capacity required to compute the median in the classical way by sorting. All data must be finite and defined. If the median falls between 2 values, the lower one is chosen.

The algorithm is approximate, searching for a good median by interval halving and counting how many values fall below the threshold. iters can be increased to improve precision; each iteration adds roughly 1 bit to the precision. The default value is 20, which corresponds to a precision of roughly 1e-6 times the data range.
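The interval-halving search can be sketched for a single local vector (approx_median is a hypothetical stand-in, not the package function; the real implementation counts across workers):

```julia
# Bisect the data range; each step, count values below the midpoint and
# move the interval toward the half that contains the median.
function approx_median(v; iters=20)
    lo, hi = extrema(v)
    target = length(v) / 2
    for _ in 1:iters
        mid = (lo + hi) / 2
        if count(<(mid), v) < target
            lo = mid   # median is at or above mid
        else
            hi = mid   # median is below mid
        end
    end
    lo
end

approx_median([1, 2, 3, 4, 5])  # converges to 3
approx_median([1, 2, 3, 4])     # converges to 2 (the lower of the two middle values)
```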

dmedian_buckets(dInfo::Dinfo, nbuckets::Int, buckets::Dinfo, columns::Vector{Int}; iters=20)

A version of dmedian that works with the bucketing information (i.e. clusters) from nbuckets and buckets.

dscale(dInfo::Dinfo, columns::Vector{Int})

Scale the columns in the dataset to have mean 0 and sdev 1.

Prevents creation of NaNs by avoiding division by zero sdevs.
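The zero-sdev guard can be sketched per column (scale_col is a hypothetical helper; the real function operates on the distributed columns in place):

```julia
using Statistics

# Center and scale one column; if sdev is zero, only center (avoids NaNs).
function scale_col(col)
    m, sd = mean(col), std(col)
    sd == 0 && return col .- m
    (col .- m) ./ sd
end

scale_col([1.0, 2.0, 3.0])  # mean 0, sdev 1
scale_col([5.0, 5.0, 5.0])  # all zeros, no NaNs
```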

dselect(dInfo::Dinfo, columns::Vector{Int}; tgt=dInfo.val)

Reduce dataset to selected columns, optionally save it under a different name.

dselect(dInfo::Dinfo, currentColnames::Vector{String}, selectColnames::Vector{String}; tgt=dInfo.val)

Convenience overload of dselect that works with column names.

dstat(dInfo::Dinfo, columns::Vector{Int})::Tuple{Vector{Float64}, Vector{Float64}}

Compute mean and standard deviation of the columns in dataset. Returns a tuple with a vector of means in columns, and a vector of corresponding sdevs.

dstat_buckets(dInfo::Dinfo, nbuckets::Int, buckets::Dinfo, columns::Vector{Int})::Tuple{Matrix{Float64}, Matrix{Float64}}

A version of dstat that works with bucketing information (e.g. clusters); returns a tuple of matrices.

mapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1, slicedims=bucketdim)

Apply the function fn over array a so that it processes the data by buckets defined by buckets (which contains integers in the range 1:nbuckets).

The buckets are sliced out in dimension specified by bucketdim.
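A local, dim=1-only sketch of the bucketed application (mapbuckets_local is a hypothetical stand-in that slices out rows by bucket):

```julia
# Apply fn to the row-slice of each bucket; buckets[i] gives the bucket
# of row i, with values in 1:nbuckets.
function mapbuckets_local(fn, a::Matrix, nbuckets::Int, buckets::Vector{Int})
    [fn(a[buckets .== b, :]) for b in 1:nbuckets]
end

a = [1 10; 2 20; 3 30; 4 40]
mapbuckets_local(sum, a, 2, [1, 2, 1, 2])  # [44, 66]: rows 1&3, rows 2&4
```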

reduce_extrema(ex1, ex2)

Helper for gathering the minima and maxima of the data. ex1 and ex2 are arrays of (min,max) pairs; this function combines the arrays element-wise and finds the combined minima and maxima.
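A local sketch of the element-wise combination (combine_extrema is a hypothetical stand-in):

```julia
# Combine two arrays of (min, max) pairs element-wise.
combine_extrema(ex1, ex2) =
    [(min(a[1], b[1]), max(a[2], b[2])) for (a, b) in zip(ex1, ex2)]

combine_extrema([(0, 5), (2, 3)], [(1, 4), (-1, 9)])  # [(0, 5), (-1, 9)]
```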

update_extrema(counts, target, lim, mid)

Helper for the distributed median computation – returns the updated extrema in lim, depending on whether the count (in counts) of values less than mid is lower or higher than target.


dload(sym::Symbol, pids, files=defaultFiles(sym,pids))

Import the content of symbol sym by each worker specified by pids from the corresponding filename in files.

dload(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))

Overloaded functionality for Dinfo.

dstore(sym::Symbol, pids, files=defaultFiles(sym,pids))

Export the content of symbol sym by each worker specified by pids to a corresponding filename in files.

dstore(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))

Overloaded functionality for Dinfo.

dunlink(sym::Symbol, pids, files=defaultFiles(sym,pids))

Remove the files created by dstore with the same parameters.

dunlink(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))

Overloaded functionality for Dinfo.