# Functions

## Data structures

`DistributedData.Dinfo` — Type

`Dinfo`

The basic structure for working with loaded data, distributed among workers. It represents a dataset as follows:

- `val` is the "value name" under which the data are saved in processes. E.g. `val=:foo` means that there is a variable `foo` on each process, holding a part of the matrix.
- `workers` is a list of workers (in the correct order!) that hold the data (similar to `DArray.pids`).
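For illustration, a `Dinfo` that says "the variable `foo` on workers 2 and 3 together holds the dataset" could look as follows. This assumes the obvious two-field constructor; in practice you usually obtain a `Dinfo` from functions such as `scatter_array` below.

```
using Distributed, DistributedData

# describe data stored as `foo` on workers 2 and 3
di = Dinfo(:foo, [2, 3])

di.val      # :foo
di.workers  # [2, 3]
```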

## Base functions

`DistributedData.dexec` — Method

`dexec(val, fn, workers)`

Execute a function on workers, taking `val` as a parameter. Results are not collected. This is optimal for various side-effect-causing computations that are not easily expressible with `dtransform`.
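As a sketch (assuming data were previously stored under `:x` on the workers, e.g. by `scatter_array`), a side effect such as printing each worker's part can be run as:

```
using Distributed, DistributedData

# print the size of each worker's local piece of :x;
# the return values are deliberately not collected
dexec(:x, d -> println(size(d)), workers())
```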

`DistributedData.dexec` — Method

`dexec(dInfo::Dinfo, fn)`

Variant of `dexec` that works with `Dinfo`.

`DistributedData.dmap` — Method

`dmap(arr::Vector, fn, workers)`

Call a function `fn` on `workers`, with a single parameter arriving from the corresponding position in `arr`.
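A minimal sketch: each of the first two workers receives the element of the vector at its position and runs `fn` on it.

```
using Distributed, DistributedData

# one worker gets 10, the other gets 20; each adds its own process id
dmap([10, 20], x -> x + myid(), workers()[1:2])
```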

`DistributedData.dmapreduce` — Method

`dmapreduce(val, map, fold, workers; prefetch = :all)`

A distributed work-alike of the standard `mapreduce`: take a function `map` (a non-modifying transform on the data) and `fold` (a 2-to-1 reduction of the results of `map`), systematically run them on the data described by `val` distributed on `workers`, and return the final reduced result.

The fold operation is assumed to be associative, but not necessarily commutative (as in semigroups). If there are no workers, the operation returns `nothing` (we don't have a monoid to magically conjure zero elements :[ ).

In the current version, the reduce step is a sequential left fold, executed in the main process. The parameter `prefetch` says how many futures should be `fetch`ed in advance; increasing prefetch improves throughput but increases memory usage in case the results of `map` are big.

**Example**

```
# compute the mean of all distributed data
s, n = dmapreduce(:myData,
    d -> (sum(d), length(d)),
    ((s1, n1), (s2, n2)) -> (s1 + s2, n1 + n2),
    workers())
println(s / n)
```

**Processing multiple arguments (a.k.a. "zipWith")**

The `val` here does not necessarily need to refer to a symbol; you can easily pass in a quoted tuple, which will be unquoted in the function parameter. For example, distributed values `:a` and `:b` can be joined as such:

```
dmapreduce(:((a, b)),
    ((a, b)::Tuple) -> [a b],
    vcat,
    workers())
```

`DistributedData.dmapreduce` — Method

`dmapreduce(dInfo::Dinfo, map, fold)`

Distributed map/reduce (just as the other overload of `dmapreduce`) that works with `Dinfo`.

`DistributedData.dmapreduce` — Method

`dmapreduce(dInfo1::Dinfo, dInfo2::Dinfo, map, fold)`

Variant of `dmapreduce` that works with multiple `Dinfo`s at once. The data must be distributed on the same set of workers, in the same order.
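For instance, assuming `da` and `db` are `Dinfo`s describing equally-sized numeric vectors distributed on the same workers, a dot-product-style reduction might be sketched as follows (the `map` function is assumed to receive the two worker-local values as separate arguments):

```
# multiply the local pieces elementwise and sum them, then add up
# the per-worker partial sums
dot_product = dmapreduce(da, db, (a, b) -> sum(a .* b), +)
```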

`DistributedData.dmapreduce` — Method

`dmapreduce(vals::Vector, map, fold, workers)`

Variant of `dmapreduce` that works with multiple distributed variables at once.

`DistributedData.dpmap` — Method

`dpmap(fn, args...; mod = Main, kwargs...)`

"Distributed pool map."

A wrapper for `pmap` from the `Distributed` package that executes the code in the correct module, so that it can access the distributed variables at the remote workers. All arguments other than the first function `fn` are passed to `pmap`.

The function `fn` should return an expression that is going to get evaluated.

**Example**

```
using Distributed
dpmap(x -> :(computeSomething(someData, $x)), WorkerPool(workers), Vector(1:10))
```

```
di = distributeSomeData()
dpmap(x -> :(computeSomething($(di.val), $x)), CachingPool(di.workers), Vector(1:10))
```

`DistributedData.dtransform` — Function

`dtransform(dInfo::Dinfo, fn, tgt::Symbol=dInfo.val)::Dinfo`

Same as `dtransform`, but specialized for `Dinfo`.

`DistributedData.dtransform` — Function

`dtransform(val, fn, workers, tgt::Symbol=val)`

Transform the worker-local distributed data available as `val` on `workers` in place, by a function `fn`. Store the result as `tgt` (default `val`).

**Example**

```
# multiply all saved data by 2
dtransform(:myData, (d)->(2*d), workers())
```

`DistributedData.gather_array` — Function

`gather_array(val::Symbol, workers, dim=1; free=false)`

Collect the arrays distributed on `workers` under value `val` into a single array. The individual arrays are pasted together in the dimension specified by `dim`, i.e. `dim=1` is roughly equivalent to using `vcat`, and `dim=2` to `hcat`.

`val` must be an Array-based type; the function will otherwise fail.

If `free` is true, the `val` is `unscatter`ed after being gathered.

This preallocates the array for the results, and is thus more efficient than e.g. using `dmapreduce` with `vcat` for folding.
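A small round trip, scattering a matrix and gathering it back (a sketch using `scatter_array`, documented below):

```
using Distributed, DistributedData
addprocs(2)
@everywhere using DistributedData

di = scatter_array(:mtx, randn(100, 4), workers())
local_copy = gather_array(di.val, di.workers)  # rows pasted back with dim=1
size(local_copy)                               # (100, 4)
```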

`DistributedData.gather_array` — Function

`gather_array(dInfo::Dinfo, dim=1; free=false)`

Distributed `gather_array` (just as the other overload) that works with `Dinfo`.

`DistributedData.get_from` — Method

`get_from(worker, val)`

Get a value `val` from a remote `worker`; quoting of `val` works just as with `save_at`. Returns a future with the requested value.

`DistributedData.get_val_from` — Method

`get_val_from(worker, val)`

Shortcut for instantly fetching the future from `get_from`.
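A round trip with `save_at` (documented below) may be sketched as:

```
using Distributed, DistributedData
addprocs(1)
@everywhere using DistributedData

w = workers()[1]
fetch(save_at(w, :x, 123))   # wait until the value is stored
f = get_from(w, :x)          # a Future holding the value
fetch(f)                     # 123
get_val_from(w, :x)          # 123, in a single call
```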

`DistributedData.remove_from` — Method

`remove_from(worker, sym)`

Sets symbol `sym` on `worker` to `nothing`, effectively freeing the data.

`DistributedData.save_at` — Method

`save_at(worker, sym, val)`

Saves value `val` to symbol `sym` at `worker`. `sym` should be quoted (or contain a symbol). `val` gets unquoted in the processing and evaluated at the worker; quote it if you want to pass an exact command to the worker.

This is loosely based on the ParallelDataTransfer package, but made slightly more flexible by omitting/delaying the explicit fetches etc. In particular, `save_at` is roughly the same as `ParallelDataTransfer.sendto`, and `get_val_from` works very much like `ParallelDataTransfer.getfrom`.

**Return value**

A future with `Nothing` that can be fetched to check that the operation has finished.

**Examples**

```
addprocs(1)
save_at(2,:x,123) # saves 123
save_at(2,:x,myid()) # saves 1
save_at(2,:x,:(myid())) # saves 2
save_at(2,:x,:(:x)) # saves the symbol :x
# (just :x won't work because of unquoting)
```

**Note: Symbol scope**

The symbols are saved in the `Main` module on the corresponding worker. For example, `save_at(1, :x, nothing)` *will* erase your local `x` variable. Beware of name collisions.

`DistributedData.scatter_array` — Method

`scatter_array(sym, x::Array, workers; dim=1)::Dinfo`

Distribute roughly equal parts of array `x`, sliced along dimension `dim`, among `workers` into a worker-local variable `sym`.

Returns the `Dinfo` structure for the distributed data.
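For example, splitting a matrix by rows (the default `dim=1`) between two workers (a sketch):

```
using Distributed, DistributedData
addprocs(2)
@everywhere using DistributedData

# each worker receives roughly 50 of the 100 rows in a local
# variable called `chunk`
di = scatter_array(:chunk, randn(100, 5), workers())
```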

`DistributedData.tmp_symbol` — Method

`tmp_symbol(dInfo::Dinfo; prefix="", suffix="_tmp")`

Decorate the symbol from `dInfo` with a prefix and suffix.

`DistributedData.tmp_symbol` — Method

`tmp_symbol(s::Symbol; prefix="", suffix="_tmp")`

Decorate a symbol `s` with a prefix and suffix, to create a good name for a related temporary value.
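For example (assuming the decoration is a plain concatenation of `prefix`, the symbol, and `suffix`):

```
using DistributedData

tmp_symbol(:data)                            # :data_tmp
tmp_symbol(:data, prefix="i_", suffix="")    # :i_data
```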

`DistributedData.unscatter` — Method

`unscatter(dInfo::Dinfo)`

Remove the loaded data described by `dInfo` from the corresponding workers.

`DistributedData.unscatter` — Method

`unscatter(sym, workers)`

Remove the loaded data from the workers.

`DistributedData.@remote` — Macro

`@remote module expr`

A version of `@remote` that additionally allows choosing the module used for scoping.

`DistributedData.@remote` — Macro

`@remote expr`

In a function that will get evaluated on a remote worker, this ensures that the evaluation scope of the expression `expr` (usually a variable) is taken on the remote side, preventing namespace clashes with the local session.

This is mainly useful for making the functions from the `Distributed` package (such as `pmap` and `remotecall`) work with the data stored by the `DistributedData` package.

Internally, this is handled by wrapping in `eval`.

**Example**

```
julia> save_at(2, :x, 321)
Future(2, 1, 162, nothing)

julia> let x = 123
           remotecall_fetch(() -> x + (@remote x), 2)
       end
444
```

## Higher-level array operations

`DistributedData.catmapbuckets` — Method

`catmapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1)`

Same as `mapbuckets`, except this concatenates the bucketing results in the bucketing dimension, thus creating a slightly neater matrix. `slicedims` is therefore fixed to `bucketdim`.

`DistributedData.combine_stats` — Method

`combine_stats((s1, sqs1, n1), (s2, sqs2, n2))`

Helper for `dstat`-style functions that just adds up the elements in triplets of vectors.

`DistributedData.dapply_cols` — Method

`dapply_cols(dInfo::Dinfo, fn, columns::Vector{Int})`

Apply a function `fn` over the columns of a distributed dataset.

`fn` gets *2* parameters:

- a data vector (the whole part of the column saved at one worker)
- the index of the column in the `columns` array (i.e. a number from `1:length(columns)`)
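As a sketch, doubling the first and third columns of a dataset described by a `Dinfo` named `di` (note that `fn` sees only the worker-local part of each column):

```
# col is the worker-local piece of the column; i is 1 when col is
# from column 1, and 2 when col is from column 3 (index into `columns`)
dapply_cols(di, (col, i) -> 2 .* col, [1, 3])
```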

`DistributedData.dapply_rows` — Method

`dapply_rows(dInfo::Dinfo, fn)`

Apply a function `fn` over the rows of a distributed dataset.

`fn` gets a single vector parameter for each row to transform.

`DistributedData.dcopy` — Method

`dcopy(dInfo::Dinfo, newName::Symbol)`

Clone the dataset and store it under a new distributed name `newName`.

`DistributedData.dcount` — Method

`dcount(ncats::Int, dInfo::Dinfo)::Vector{Int}`

Count the occurrences of the integer values in the vectors stored in `dInfo`, assuming the values are in the range 1–`ncats`.
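For example, counting randomly assigned labels (a sketch):

```
using Distributed, DistributedData
addprocs(2)
@everywhere using DistributedData

labels = scatter_array(:lab, rand(1:3, 100), workers())
counts = dcount(3, labels)   # a 3-element Vector{Int} summing to 100
```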

`DistributedData.dcount_buckets` — Method

`dcount_buckets(ncats::Int, dInfo::Dinfo, nbuckets::Int, buckets::Dinfo)::Matrix{Int}`

Same as `dcount`, but counts the items in `dInfo` bucketed by `buckets`, producing a matrix of counts with `ncats` rows and `nbuckets` columns.

Useful with `distributeFCSFileVector` to determine cluster distribution within files.

`DistributedData.dmedian` — Method

`dmedian(dInfo::Dinfo, columns::Vector{Int})`

Compute a median in a distributed fashion, avoiding the data transfer and memory capacity that would be required to compute the median in the classical way, by sorting. All data must be finite and defined. If the median lies exactly between 2 values, the lower one is chosen.

The algorithm is approximate: it searches for a good median by halving the interval and counting how many values fall below the threshold. `iters` can be increased to improve the precision; each iteration adds roughly 1 bit of precision. The default value is 20, which corresponds to a precision of roughly 1e-6 times the data range.

`DistributedData.dmedian_buckets` — Method

`dmedian_buckets(dInfo::Dinfo, nbuckets::Int, buckets::Dinfo, columns::Vector{Int}; iters=20)`

A version of `dmedian` that works with the bucketing information (i.e. clusters) from `nbuckets` and `buckets`.

`DistributedData.dscale` — Method

`dscale(dInfo::Dinfo, columns::Vector{Int})`

Scale the columns in the dataset to have mean 0 and sdev 1.

Prevents the creation of NaNs by avoiding division by zero sdevs.

`DistributedData.dselect` — Function

`dselect(dInfo::Dinfo, columns::Vector{Int}; tgt=dInfo.val)`

Reduce the dataset to the selected columns, optionally saving it under a different name.

`DistributedData.dselect` — Function

```
function dselect(dInfo::Dinfo,
    currentColnames::Vector{String}, selectColnames::Vector{String};
    tgt=dInfo.val)::Dinfo
```

Convenience overload of `dselect` that works with column names.

`DistributedData.dstat` — Method

`dstat(dInfo::Dinfo, columns::Vector{Int})::Tuple{Vector{Float64}, Vector{Float64}}`

Compute the mean and standard deviation of the columns in the dataset. Returns a tuple with a vector of means of `columns`, and a vector of the corresponding sdevs.

`DistributedData.dstat_buckets` — Method

`dstat_buckets(dInfo::Dinfo, nbuckets::Int, buckets::Dinfo, columns::Vector{Int})::Tuple{Matrix{Float64}, Matrix{Float64}}`

A version of `dstat` that works with bucketing information (e.g. clusters); returns a tuple of matrices.

`DistributedData.mapbuckets` — Method

`mapbuckets(fn, a::Array, nbuckets::Int, buckets::Vector{Int}; bucketdim::Int=1, slicedims=bucketdim)`

Apply the function `fn` over the array `a` so that it processes the data by buckets defined by `buckets` (which contains integers in the range `1:nbuckets`).

The buckets are sliced out in the dimension specified by `bucketdim`.
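A small local example (no workers involved; the exact return shape is an assumption — one result per bucket):

```
using DistributedData

a = [1 2; 3 4; 5 6; 7 8]
buckets = [1, 2, 1, 2]   # rows 1 and 3 form bucket 1; rows 2 and 4 form bucket 2

# sum the rows within each bucket; slicing happens along dimension 1
mapbuckets(b -> sum(b, dims=1), a, 2, buckets)
```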

`DistributedData.reduce_extrema` — Method

`reduce_extrema(ex1, ex2)`

Helper for gathering the minima and maxima of the data. `ex1` and `ex2` are arrays of (min, max) pairs; this function combines the arrays element-wise and finds the combined minima and maxima.

`DistributedData.update_extrema` — Method

`update_extrema(counts, target, lim, mid)`

Helper for the distributed median computation – returns the extrema in `lim`, updated depending on whether the item count in `counts` of values less than `mid` is lower or higher than `target`.

## Input/Output

`DistributedData.defaultFiles` — Method

`defaultFiles(s, pids)`

Make a good set of filenames for saving a dataset.

`DistributedData.dload` — Function

`dload(sym::Symbol, pids, files=defaultFiles(sym,pids))`

Import the content of symbol `sym` by each worker specified by `pids` from the corresponding filename in `files`.

`DistributedData.dload` — Function

`dload(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))`

Overloaded functionality for `Dinfo`.

`DistributedData.dstore` — Function

`dstore(sym::Symbol, pids, files=defaultFiles(sym,pids))`

Export the content of symbol `sym` by each worker specified by `pids` to the corresponding filename in `files`.

`DistributedData.dstore` — Function

`dstore(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))`

Overloaded functionality for `Dinfo`.

`DistributedData.dunlink` — Function

`dunlink(sym::Symbol, pids, files=defaultFiles(sym,pids))`

Remove the files created by `dstore` with the same parameters.

`DistributedData.dunlink` — Function

`dunlink(dInfo::Dinfo, files=defaultFiles(dInfo.val, dInfo.workers))`

Overloaded functionality for `Dinfo`.
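A full store/free/reload cycle using the `Dinfo` overloads above may be sketched as follows (`defaultFiles` chooses the filenames):

```
using Distributed, DistributedData
addprocs(2)
@everywhere using DistributedData

di = scatter_array(:mtx, randn(100, 4), workers())
dstore(di)      # each worker writes its part to a default filename
unscatter(di)   # free the in-memory copies
dload(di)       # read the parts back into :mtx on the workers
dunlink(di)     # remove the files again
```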