# Manual

We saw how to run an asynchronous version of the SGD algorithm on an LRMSE problem in the quick start. Here we'll use the same example to look at the following:

- Working with a distributed problem
- Synchronous run
- Active processes
- Recording iterates
- Customization of `start`'s execution
- Handling worker failures
- Algorithm wrappers

## Working with a distributed problem

Suppose you have a `make_problem` function:

```
# Note: In this example, we sample `A` and `b`.
# In practice, we could read them from a file or any other source.
@everywhere function make_problem(pid)
    pid == 1 && return nothing # for now, let's assign process 1 an empty problem
    LRMSE(rand(pid, 10), rand(pid)) # the sample size `m` is set to `pid` for demonstration purposes only
end
```
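As a reminder, `LRMSE` was defined in the quick start. For reference, here is a minimal sketch consistent with how it is used in this manual (fields `A`, `b`, `n`, `m`, `L` and `∇f`, with both a full and a sampled gradient); the actual quick-start definition may differ:

```
@everywhere begin
    using LinearAlgebra

    struct LRMSE
        A::Union{Matrix{Float64},Nothing}
        b::Union{Vector{Float64},Nothing}
        n::Int64     # number of features
        m::Int64     # number of samples
        L::Float64   # Lipschitz constant of the gradient
        ∇f::Function
    end
    function LRMSE(A::Matrix{Float64}, b::Vector{Float64})
        m, n = size(A)
        L = opnorm(A)^2 / m
        ∇f(x) = A' * (A * x - b) / m                   # full gradient
        ∇f(x, i) = A[i, :] * (dot(A[i, :], x) - b[i])  # stochastic gradient on sample i
        LRMSE(A, b, n, m, L, ∇f)
    end
end
```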

When instantiating your problems, you might have three requirements:

- **Limiting communication costs** and **avoiding duplicated memory**: loading problems directly on their assigned processes is preferable to loading them on the central node before sending them to their respective processes
- **Persistent data**: necessary if you want to reuse problems for multiple experiments (you don't want your problems to be stuck on remote processes in `start`'s local scope)

Depending on your needs, you have three options to construct your problems:

```
# Option 1: Instantiate the problems remotely
problem_constructor = make_problem
# Option 2: Instantiate the problems on the central node and send them to their respective processes
problems = Dict(procs() .=> make_problem.(procs()));
problem_constructor = (pid) -> problems[pid]
# Option 3: Create a `DistributedObject` that references a problem on each process.
@everywhere using DistributedObjects
distributed_problem = DistributedObject((pid) -> make_problem(pid), pids=procs())
```

Option 3 uses `DistributedObjects`. In a nutshell, a `DistributedObject` instance references at most one object per process, and you can access the object stored on the current process with `[]`.
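As a quick illustration of this access pattern, here is a small sketch (assuming worker processes have already been added with `addprocs`):

```
using Distributed
@everywhere using DistributedObjects

# reference one small vector on each worker process
d = DistributedObject((pid) -> fill(pid, 3), pids=workers())

# `d[]` returns the object stored on the *current* process,
# so we fetch it from within a remote process:
fetch(@spawnat workers()[1] d[]) # == fill(workers()[1], 3)
```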

| | avoids communication costs & duplicated memory | problems reusable (not single-use) |
|---|---|---|
| Option 1 | ✅ | ❌ |
| Option 2 | ❌ | ✅ |
| Option 3 | ✅ | ✅ |

As previously noted, Option 2 should be avoided when working with large data. However, it does offer the advantage of preserving access to problems, which is not possible with Option 1. This opens up the possibility of reconstructing the global problem.

```
# reconstructing the global problem from problems stored locally
function LRMSE(problems::Dict)
    pids = [pid for pid in keys(problems) if pid ≠ 1]
    n = problems[pids[1]].n
    m = sum([problems[pid].m for pid in pids])
    L = sum([problems[pid].L for pid in pids])
    ∇f(x) = sum([problems[pid].∇f(x) * problems[pid].m for pid in pids]) / m
    return LRMSE(nothing, nothing, n, m, L, ∇f)
end

problems[1] = LRMSE(problems);
# We now have access to the global Lipschitz constant!
sgd = SGD(1/problems[1].L)
```

*Option 3* is the best of both worlds:

```
# reconstructing the global problem from problems stored remotely
function LRMSE(d::DistributedObject)
    pids = [pid for pid in where(d) if pid ≠ 1]
    n = fetch(@spawnat pids[1] d[].n)
    m = sum(fetch.([@spawnat pid d[].m for pid in pids]))
    L = sum(fetch.([@spawnat pid d[].L for pid in pids]))
    ∇f(x) = sum(fetch.([@spawnat pid d[].∇f(x) * d[].m for pid in pids])) / m
    return LRMSE(nothing, nothing, n, m, L, ∇f)
end

distributed_problem[] = LRMSE(distributed_problem);
# We also have access to the global Lipschitz constant!
sgd = SGD(1/distributed_problem[].L)
```

It's worth mentioning that instead of a `problem_constructor::Function`, a `distributed_problem::DistributedObject` can be passed to `start`. Both of the following are equivalent:

```
history = start(sgd, (pid)-> distributed_problem[], stopat)
history = start(sgd, distributed_problem, stopat);
```

## Synchronous run

If you want to run your algorithm synchronously, you just have to define the **synchronous central step** performed by the central node when receiving the answers `as::Vector{A}` from all the `workers`...

```
@everywhere begin
    # synchronous central step
    (sgd::SGD)(as::Vector{Vector{Float64}}, workers::Vector{Int64}, problem::Any) = sum(as)
end
```

...and to add the `synchronous=true` keyword to `start`:

`history = start(sgd, distributed_problem, stopat; synchronous=true);`

## Active processes

You can choose which processes are active with the `pids` keyword:

`history = start(sgd, problem_constructor, stopat; pids=[2,3,6]);`

If `pids=[1]`, a non-distributed (and necessarily synchronous) version of your algorithm will be `start`ed:

`history = start(sgd, (pid)->LRMSE(rand(42,10),rand(42)), stopat; pids=[1], synchronous=true);`

## Recording iterates

The queries `::Q` sent by the central node, along with the iterations, epochs and times at which they were recorded, are saved at intervals specified by the `saveat` keyword: every `iteration` and every `epoch` (see `savenow` for custom saving criteria). You can set any or all criteria: `saveat=(iteration=100, epoch=10)` or `saveat=(epoch=100,)` for example.

To also save the workers' answers `::A`, simply add the `save_answers=true` keyword (see `savevalues` and `report` to save additional variables during execution).

`history = start(sgd, distributed_problem, stopat; saveat=(iteration=100, epoch=10), save_answers=true);`

## Customization of `start`'s execution

Let's look at a slightly modified version of `SGD` where we track the *"precision"* of our iterative algorithm, measured as the distance between the last two iterates.

```
@everywhere begin
    using LinearAlgebra

    mutable struct SGDbis <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
        stepsize::Float64
        previous_q::Vector{Float64}
        precision::Float64 # will hold the distance between the last two iterates
        precisions::Vector{Float64} # record of all the precisions
        SGDbis(stepsize::Float64) = new(stepsize, Float64[], Inf, Float64[])
    end
    # initialization step: no changes
    function (sgd::SGDbis)(problem::Any)
        sgd.previous_q = rand(problem.n)
    end
    # worker step: no changes
    function (sgd::SGDbis)(q::Vector{Float64}, problem::Any)
        sgd.stepsize * problem.∇f(q, rand(1:problem.m))
    end
    # asynchronous central step: now also records the precision
    function (sgd::SGDbis)(a::Vector{Float64}, worker::Int64, problem::Any)
        q = sgd.previous_q - a
        sgd.precision = norm(q - sgd.previous_q)
        sgd.previous_q = q
    end
end
```

Recall that we defined `const AIA = AsynchronousIterativeAlgorithms`.

`stopnow`

By default, you can specify any of the following stopping criteria through the `stopat` argument: a maximum `iteration`, `epoch` and `time`. If any of them is met, the execution is stopped.
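For example, the following usage sketch (reusing `sgd` and `distributed_problem` from above, and assuming `time` is given in seconds) stops as soon as any of the three criteria is reached:

`history = start(sgd, distributed_problem, (iteration=10_000, epoch=100, time=60.));`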

If you require additional stopping conditions, for instance *"stop at the current iteration if the precision is below a threshold"*, you can define `stopnow` on your algorithm:

```
function AIA.stopnow(sgd::SGDbis, stopat::NamedTuple)
    haskey(stopat, :precision) ? sgd.precision ≤ stopat.precision : false
end
```

You can now set `stopat` to `(iteration=1000, precision=1e-5)` or `(precision=1e-5,)` for example.
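A usage sketch (the stepsize is arbitrary, chosen purely for illustration):

`history = start(SGDbis(0.01), distributed_problem, (iteration=1000, precision=1e-5));`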

`savenow`

By default, you can specify the intervals at which some parameters are recorded through the `saveat` keyword: every `iteration` and every `epoch`.

If you require additional saving checkpoints, for instance *"save the current iteration if the precision is below a threshold"*, you can define `savenow` on your algorithm:

```
function AIA.savenow(sgd::SGDbis, saveat::NamedTuple)
    haskey(saveat, :precision) ? sgd.precision ≤ saveat.precision : false
end
```

You can now set `saveat` to `(precision=1e-4, time=42)` or just `(precision=1e-4,)` for example.
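A corresponding usage sketch (again with an arbitrary stepsize):

`history = start(SGDbis(0.01), distributed_problem, (epoch=100,); saveat=(precision=1e-4,));`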

`savevalues`

By default, at each `saveat` checkpoint, only the queries, iterations, epochs, times, the answer count per worker, and optionally the answers and their provenance, are saved.

If you want to record other values, for instance the precisions computed at the `saveat` checkpoints, you can define `savevalues` on your algorithm:

```
function AIA.savevalues(sgd::SGDbis)
    push!(sgd.precisions, sgd.precision)
end
```

`report`

To retrieve any values held by your algorithm, for example the precisions, return them as a `NamedTuple` in `report`:

```
function AIA.report(sgd::SGDbis)
    (precisions = sgd.precisions,)
end
```

They will now be included in the output of `start`.

`progress`

When `verbose>1`, a progress bar is displayed. To reflect any progress other than the number of iterations, epochs, and the time, return a value between `0` and `1` (`1` meaning completion) in `progress`:

```
function AIA.progress(sgd::SGDbis, stopat::NamedTuple)
    if haskey(stopat, :precision)
        sgd.precision == 0 && return 1.
        return stopat.precision / sgd.precision
    else
        return 0.
    end
end
```

`showvalues`

Below the progress bar, by default, the number of iterations, epochs and the answer count per worker are displayed. If you want to keep track of other values, return them as a `Vector` of `Tuple{Symbol,Any}` in `showvalues`:

```
function AIA.showvalues(sgd::SGDbis)
    [(:precision, round(sgd.precision; sigdigits=4))]
end
```

## Handling worker failures

If you expect some workers to fail but still want the algorithm to continue running, you can set the `resilience` parameter to the maximum number of worker failures you can tolerate before the execution is terminated.
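For example, assuming `resilience` is passed to `start` as a keyword like the other options above, the following sketch tolerates up to two worker failures:

`history = start(sgd, distributed_problem, stopat; resilience=2);`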

## Algorithm wrappers

You are free to create your own algorithms, but if you're interested in *aggregation algorithms*, you can use an implementation provided in this library. The iteration of such an algorithm performs the following computation:

\[q_j \longleftarrow \textrm{query}(\underset{i \in \textrm{connected}}{\textrm{aggregate}}(a_i))\ \ \textrm{where }\ \ a_i = \textrm{answer}(q_i)\]

where $q_j$ is computed by the central node upon reception of $a_j = \textrm{answer}(q_j)$ from worker $j$, and where $\textrm{connected}$ is the list of workers that have already answered.

The `AggregationAlgorithm` in this library requires you to define four methods: `initialize`, `query`, `answer`, and `aggregate`. Here's an example showing the required signatures of these four methods:

```
@everywhere begin
    using Statistics

    struct ToBeAggregatedGD <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
        q1::Vector{Float64}
        stepsize::Float64
    end
    AIA.initialize(tba::ToBeAggregatedGD, problem::Any) = tba.q1
    AIA.aggregate(tba::ToBeAggregatedGD, a::Vector{Vector{Float64}}, connected::Vector{Int64}) = mean(a)
    AIA.query(tba::ToBeAggregatedGD, a::Vector{Float64}, problem::Any) = a
    AIA.answer(tba::ToBeAggregatedGD, q::Vector{Float64}, problem::Any) = q - tba.stepsize * problem.∇f(q)
end

algorithm = AggregationAlgorithm(ToBeAggregatedGD(rand(10), 0.01); pids=workers())
history = start(algorithm, distributed_problem, (epoch=100,));
```

**Memory limitation:** At any point in time, the central node must have access to the latest answers $a_i$ from *all* the connected workers. This means storing a lot of $a_i$ when many workers are used. There is a workaround when the aggregation operation is an *average*: in that case, only the equivalent of one answer needs to be saved on the central node, regardless of the number of workers.

`AveragingAlgorithm` implements this memory optimization. Here you only need to define `initialize`, `query` and `answer`:
```
@everywhere begin
    struct ToBeAveragedGD <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
        q1::Vector{Float64}
        stepsize::Float64
    end
    AIA.initialize(tba::ToBeAveragedGD, problem::Any) = tba.q1
    AIA.query(tba::ToBeAveragedGD, a::Vector{Float64}, problem::Any) = a
    AIA.answer(tba::ToBeAveragedGD, q::Vector{Float64}, problem::Any) = q - tba.stepsize * problem.∇f(q)
end

algorithm = AveragingAlgorithm(ToBeAveragedGD(rand(10), 0.01); pids=workers(), weights=ones(nworkers()))
history = start(algorithm, distributed_problem, (epoch=100,));
```

Note that you can implement the custom callbacks introduced above on both of these wrappers by defining them on your algorithm:

`AIA.report(::ToBeAggregatedGD) = # do something`
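For instance, a minimal sketch reusing `ToBeAggregatedGD` from above (the reported and displayed values are chosen purely for illustration):

```
AIA.report(tba::ToBeAggregatedGD) = (stepsize=tba.stepsize,)
AIA.showvalues(tba::ToBeAggregatedGD) = [(:stepsize, tba.stepsize)]
```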

Wow, you read all this! I hope you find this library helpful and look forward to seeing how you put it to use!