Documentation

start

AsynchronousIterativeAlgorithms.start (Function)
start(algorithm, problem_constructor, stopat; kwargs...)
start(algorithm, distributed_problem, stopat; kwargs...)

Solve the distributed problem returned by problem_constructor (or referenced by distributed_problem) using the algorithm until the stopat conditions are reached.

Arguments

  • algorithm::AbstractAlgorithm{Q,A}: must subtype AbstractAlgorithm{Q,A} and implement its functor calls
  • problem_constructor::Function: for each pid in {pids ⋃ current pid}, calling problem_constructor(pid) on process pid should return that process's assigned problem
  • distributed_problem::DistributedObject: for each pid in {pids ⋃ current pid}, distributed_problem should reference process pid's assigned problem on pid
  • stopat::NamedTuple: you can specify any of the following
    • iteration::Int64: maximum number of iterations
    • epoch::Int64: maximum number of epochs (an epoch passes when all workers have answered at least once)
    • time::Float64: maximum elapsed time (in seconds)
    • other custom stopping conditions that you have specified by implementing stopnow

Keywords

  • saveat=NamedTuple(): when to record query iterates (::Q), iterations, epochs, timestamps (and other custom values specified by implementing progress). Specified with any of the following
    • iteration::Int64: save once every iteration iterations, with iteration > 0
    • epoch::Int64: save once every epoch epochs, with epoch > 0
    • other custom saving conditions that you have specified by implementing savenow
  • save_answers=false: if true, answer iterates (::A) are recorded along with the pids of the workers that sent them
  • pids=workers(): pids of the active workers. You can start a non-distributed (and necessarily synchronous) version of your algorithm with pids=[1]
  • synchronous=false: if synchronous=true, the central node waits for all workers to answer before making a step
  • resilience=0: number of workers allowed to fail before the execution is stopped
  • verbose=1: if > 0, a progress bar is displayed (implement progress and/or showvalues to customize the display)

Returns

  • NamedTuple: a record of the queries and the iterations, epochs, timestamps at which they were recorded, as well as the answer_count of each worker. Additionally:
    • if save_answers=true, a record of the answers and the answers_origin
    • other custom values you have specified by implementing report

Throws

  • ArgumentError: if the arguments don't match the specifications.
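
As a sketch of how the arguments above fit together, the snippet below only builds the stopat tuple and the keyword arguments. The concrete values are arbitrary, and algorithm and problem_constructor are placeholders for objects you would define yourself, so the call itself is left commented out.

```julia
# Stopping conditions: each entry is a maximum, as listed under Arguments.
stopat = (iteration = 1_000, epoch = 100, time = 30.0)

# Recording frequency: here, once every 10 iterations.
saveat = (iteration = 10,)

# Keyword arguments documented above, gathered in one NamedTuple.
kwargs = (saveat = saveat, save_answers = true, synchronous = false,
          resilience = 1, verbose = 1)

# With an algorithm and a problem constructor in scope, the call would read:
# history = start(algorithm, problem_constructor, stopat; kwargs...)
```

Note the trailing comma in (iteration = 10,): without it, Julia parses the parentheses as a plain assignment rather than a one-field NamedTuple.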

AbstractAlgorithm

The algorithm you pass to start should subtype AbstractAlgorithm{Q,A}.

AsynchronousIterativeAlgorithms.AbstractAlgorithm (Type)
AbstractAlgorithm{Q,A}

To be compatible with start, types subtyping AbstractAlgorithm should be callable with the following signatures:

  • (algorithm::AbstractAlgorithm{Q,A})(problem::Any)::Q where {Q,A}: the initialization step that creates the first query iterate
  • (algorithm::AbstractAlgorithm{Q,A})(q::Q, problem::Any)::A where {Q,A}: the answer step performed by the workers when they receive a query q::Q from the central node
  • (algorithm::AbstractAlgorithm{Q,A})(a::A, worker::Int64, problem::Any)::Q where {Q,A}: the query step performed by the central node when receiving an answer a::A from a worker
  • when start takes the keyword synchronous=true, (algorithm::AbstractAlgorithm{Q,A})(as::Vector{A}, workers::Vector{Int64}, problem::Any)::Q where {Q,A}: the query step performed by the central node when receiving the answers as::Vector{A} from the corresponding workers
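
A minimal sketch of a type implementing the four signatures above, assuming a toy quadratic problem. Quadratic, GradientDescent and problem_constructor are illustrative names, not part of the package, and the final call relies on pids=[1] running everything on the current process, as described under start.

```julia
using AsynchronousIterativeAlgorithms

# Hypothetical problem: minimize 0.5 * ||x - target||^2.
struct Quadratic
    target::Vector{Float64}
end

# Hypothetical algorithm: queries (Q) and answers (A) are both Vector{Float64}.
mutable struct GradientDescent <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    stepsize::Float64
    x::Vector{Float64}   # last query produced by the central node
end

# Initialization step: create the first query iterate.
(gd::GradientDescent)(problem::Any) = gd.x

# Answer step (workers): a scaled gradient evaluated at the query.
(gd::GradientDescent)(q::Vector{Float64}, problem::Any) =
    gd.stepsize * (q - problem.target)

# Asynchronous query step (central node): apply one worker's answer.
function (gd::GradientDescent)(a::Vector{Float64}, worker::Int64, problem::Any)
    gd.x = gd.x - a
end

# Synchronous query step (central node): apply the mean of all answers.
function (gd::GradientDescent)(as::Vector{Vector{Float64}},
                               workers::Vector{Int64}, problem::Any)
    gd.x = gd.x - sum(as) / length(as)
end

gd = GradientDescent(0.1, zeros(2))
problem_constructor(pid) = Quadratic([1.0, -2.0])

# Non-distributed run on the current process only.
history = start(gd, problem_constructor, (iteration = 50,); pids = [1], verbose = 0)
```

In a distributed run, the type and functor definitions would have to exist on every process, for example by wrapping them in @everywhere from the Distributed standard library.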

Customization of start's execution

AsynchronousIterativeAlgorithms.stopnow (Function)
stopnow(::AbstractAlgorithm, stopat::NamedTuple) = false

Define this method on your algorithm<:AbstractAlgorithm to add a stopping criterion: return true if your stopping condition has been reached.

AsynchronousIterativeAlgorithms.savenow (Function)
savenow(::AbstractAlgorithm, saveat::NamedTuple) = false

Define this method on your algorithm<:AbstractAlgorithm to add a saving condition: return true if your saving condition has been reached.

AsynchronousIterativeAlgorithms.savevalues (Function)
savevalues(::AbstractAlgorithm) = nothing

Define this method on your algorithm<:AbstractAlgorithm. It will be called at each iteration where savenow returns true: store some values on your algorithm object (don't forget to define report to retrieve what you stored).

AsynchronousIterativeAlgorithms.report (Function)
report(::AbstractAlgorithm) = NamedTuple()

Define this method on your algorithm<:AbstractAlgorithm to add custom values to the results returned by start: return a NamedTuple with those values, making sure not to reuse the field names queries, answers, iterations, epochs, timestamps, answers_origin, answer_count.
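
The three saving hooks (savenow, savevalues, report) are meant to work together. The sketch below assumes a hypothetical algorithm type TrackedGD that keeps a gap field, and a hypothetical custom key gap in saveat; neither is provided by the package, and the functor calls required by start are omitted for brevity.

```julia
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

# Hypothetical algorithm that tracks the size of its last update.
mutable struct TrackedGD <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    stepsize::Float64
    x::Vector{Float64}
    gap::Float64            # size of the last update
    gaps::Vector{Float64}   # values stored by savevalues
end
TrackedGD(stepsize, x0) = TrackedGD(stepsize, x0, Inf, Float64[])

# Custom saving condition: save once the gap falls below saveat.gap.
AIA.savenow(alg::TrackedGD, saveat::NamedTuple) =
    haskey(saveat, :gap) && alg.gap <= saveat.gap

# Store the current gap on the algorithm object.
AIA.savevalues(alg::TrackedGD) = (push!(alg.gaps, alg.gap); nothing)

# Expose the stored values; the field name avoids the reserved ones.
AIA.report(alg::TrackedGD) = (gaps = alg.gaps,)
```

Checking haskey first keeps the hook inert when saveat does not contain the custom key, so the same algorithm can still be run with only the built-in iteration or epoch conditions.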

AsynchronousIterativeAlgorithms.progress (Function)
progress(::AbstractAlgorithm, stopat::NamedTuple) = 0.

Define this method on your algorithm<:AbstractAlgorithm to change the display of the progress bar: return how close the current step is to reaching your stopping requirement on a scale of 0 to 1.

AsynchronousIterativeAlgorithms.showvalues (Function)
showvalues(::AbstractAlgorithm) = Tuple{Symbol, Any}[]

Define this method on your algorithm<:AbstractAlgorithm to add values to be displayed below the progress bar when verbose>1: return a vector of Tuple{Symbol, Any} with those values.
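
A custom stopping criterion usually comes with a matching progress measure and display. The sketch below assumes a hypothetical algorithm type GapGD with a gap field and a hypothetical custom key gap in stopat; the progress formula is one arbitrary choice among many, and the functor calls required by start are omitted.

```julia
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms

# Hypothetical algorithm that tracks the size of its last update.
mutable struct GapGD <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    stepsize::Float64
    x::Vector{Float64}
    gap::Float64
end

# Stop once the gap is below stopat.gap.
AIA.stopnow(alg::GapGD, stopat::NamedTuple) =
    haskey(stopat, :gap) && alg.gap <= stopat.gap

# Progress on a scale of 0 to 1: ratio of target gap to current gap, clamped.
function AIA.progress(alg::GapGD, stopat::NamedTuple)
    haskey(stopat, :gap) || return 0.0
    alg.gap > 0 || return 1.0
    return clamp(stopat.gap / alg.gap, 0.0, 1.0)
end

# Values to display below the progress bar.
AIA.showvalues(alg::GapGD) = Tuple{Symbol,Any}[(:gap, alg.gap)]
```

With these methods defined, a call such as start(alg, problem_constructor, (epoch=100, gap=1e-6)) would pass the custom gap entry through stopat alongside the built-in epoch limit.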

Algorithm wrappers

The two following algorithms already subtype AbstractAlgorithm{Q,A} and are ready to use in start.

AsynchronousIterativeAlgorithms.AggregationAlgorithm (Type)
AggregationAlgorithm(arg; kwarg)::AbstractAlgorithm

Distributed algorithm that writes: q_j <- query(aggregate([answer(q_i) for i in connected])), where a "connected" worker is a worker that has answered at least once. (Not memory optimized: length(pids) answers are stored on the central worker at all times.)

Argument

  • algorithm<:AbstractAlgorithm{Q,A} which should define the following (where const AIA = AsynchronousIterativeAlgorithms)
    • AIA.initialize(algorithm, problem::Any)::Q: step that creates the first query iterate
    • AIA.aggregate(algorithm, as::Vector{A}, workers::Vector{Int64})::AggregatedA where A: step performed by the central node when receiving the answers as::Vector{A} from the workers
    • AIA.query(algorithm, agg::AggregatedA, problem::Any)::Q: step producing a query from the aggregated answer agg::AggregatedA, performed by the central node
    • AIA.answer(algorithm, q::Q, problem::Any)::A: step performed by the workers when they receive a query q::Q from the central node

Keyword

  • pids=workers(): pids of the active workers
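
A sketch of an algorithm prepared for AggregationAlgorithm, assuming the aggregation is a plain mean of the stored answers. Quadratic and ToAggregateGD are illustrative names, not part of the package, and the update rule is only a toy gradient step.

```julia
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms
using Distributed   # for workers()
using Statistics    # for mean

# Hypothetical problem: minimize 0.5 * ||x - target||^2.
struct Quadratic
    target::Vector{Float64}
end

# Hypothetical algorithm with Q = A = Vector{Float64}.
struct ToAggregateGD <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    q1::Vector{Float64}   # first query iterate
    stepsize::Float64
end

# First query iterate.
AIA.initialize(alg::ToAggregateGD, problem::Any) = alg.q1

# Central node: combine the answers of the connected workers.
AIA.aggregate(alg::ToAggregateGD, as::Vector{Vector{Float64}},
              connected::Vector{Int64}) = mean(as)

# Central node: turn the aggregated answer into the next query.
AIA.query(alg::ToAggregateGD, agg::Vector{Float64}, problem::Any) = agg

# Workers: one local gradient step from the received query.
AIA.answer(alg::ToAggregateGD, q::Vector{Float64}, problem::Any) =
    q - alg.stepsize * (q - problem.target)

algorithm = AggregationAlgorithm(ToAggregateGD(zeros(2), 0.1); pids = workers())
```

The wrapped object can then be passed to start in place of a hand-written functor type.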

AsynchronousIterativeAlgorithms.AveragingAlgorithm (Type)
AveragingAlgorithm(arg; kwarg)::AbstractAlgorithm

Distributed algorithm that writes: q_j <- query(weighted_average([answer(q_i) for i in connected])), where a "connected" worker is a worker that has answered at least once. (Memory optimized: only the equivalent of one answer is stored on the central worker at all times.)

Argument

  • algorithm<:AbstractAlgorithm{Q,A} which should define the following (where const AIA = AsynchronousIterativeAlgorithms)
    • AIA.initialize(algorithm, problem::Any)::Q: step that creates the first query iterate
    • AIA.query(algorithm, a::A, problem::Any)::Q: step producing a query from the averaged answer, performed by the central node
    • AIA.answer(algorithm, q::Q, problem::Any)::A: step performed by the workers when they receive a query q::Q from the central node

Keyword

  • pids=workers(): pids of the active workers
  • weights=ones(length(pids)): weights of each pid in the weighted average
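
A sketch of an algorithm prepared for AveragingAlgorithm. Since the wrapper performs the weighted average itself, only initialize, query and answer are defined here. Quadratic and ToAverageGD are illustrative names, not part of the package, and the weights simply restate the documented default.

```julia
using AsynchronousIterativeAlgorithms
const AIA = AsynchronousIterativeAlgorithms
using Distributed   # for workers()

# Hypothetical problem: minimize 0.5 * ||x - target||^2.
struct Quadratic
    target::Vector{Float64}
end

# Hypothetical algorithm with Q = A = Vector{Float64}.
struct ToAverageGD <: AbstractAlgorithm{Vector{Float64},Vector{Float64}}
    q1::Vector{Float64}   # first query iterate
    stepsize::Float64
end

# First query iterate.
AIA.initialize(alg::ToAverageGD, problem::Any) = alg.q1

# Central node: the averaged answer is used directly as the next query.
AIA.query(alg::ToAverageGD, a::Vector{Float64}, problem::Any) = a

# Workers: one local gradient step from the received query.
AIA.answer(alg::ToAverageGD, q::Vector{Float64}, problem::Any) =
    q - alg.stepsize * (q - problem.target)

pids = workers()
weights = ones(length(pids))   # one weight per pid, same as the default
algorithm = AveragingAlgorithm(ToAverageGD(zeros(2), 0.1); pids = pids, weights = weights)
```

Passing weights explicitly makes it clear that its length must match pids; other values could be substituted, for instance to favor workers holding more data.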