DistributedJLFluxML.build_data_loader_from_RemoteChannel (Method)

Leaving this here for future work. It looks like there's a problem with Distributed & Threads that might be breaking this.

See https://github.com/JuliaLang/julia/issues/32677 and https://github.com/JuliaLang/julia/pull/33555

When Julia 1.8 is released, readdress this.

DistributedJLFluxML.eval_model (Method)
eval_model(model, data, evalWorkers;
           status_chan=nothing, get_step=nothing,
           device=gpu)

This function evaluates the model on a set of data partitioned across many workers. eval_model will deploy model and the appropriate RemoteChannel from data to each of the evalWorkers. There, it will call model(x) on the data iterated by data[myid()]. Finally, the results will be fetched and aggregated into a single array.

Arguments

  • model::Chain: The model that will be evaluated
  • data::Dict{Int,RemoteChannel}: The dict key is the worker id that the remote channel will be sent to
  • evalWorkers::AbstractArray: List of workers ids to perform evaluation on.
  • status_chan::RemoteChannel: status messages and data will be placed on this channel to monitor progress
  • device::Function: the device the model will be copied to on each remote worker; usually gpu or cpu

Returns

  • Array: concatenated array of results from each of the workers

Throws

  • nothing
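
A hypothetical usage sketch (assuming Flux is loaded, and that the data dict datRemChansDict and worker list p are set up the same way as in the train! example further down):

    model = Chain(Dense(4, 8), Dense(8, 16), Dense(16, 3))
    # Evaluate on the workers in p and fetch the concatenated predictions back
    # to the calling process.
    results = DistributedJLFluxML.eval_model(model, datRemChansDict, p;
                                             device=cpu)
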
DistributedJLFluxML.eval_model (Method)
eval_model(saved_model_dir::String, model, _data, workers;
           status_chan=nothing, get_step=nothing,
           device=gpu)

Not tested yet. Still need to build model saving in train!

DistributedJLFluxML.std_grad_calc (Method)
std_grad_calc(xy, loss_f, model; device)

Standard combination of loss_f, model, input, and output to be used in

  l = Flux.gradient(θ) do
    ...
  end
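
As a minimal sketch (illustrative only; it assumes the (input, output) tuple described in the arguments below and is not necessarily the package's exact implementation), std_grad_calc unpacks the sample and evaluates the loss inside the gradient closure:

    # Illustrative sketch: unpack (input, output) and evaluate the loss on the
    # model's prediction.
    function std_grad_calc(xy, loss_f, model; device=gpu)
        x, y = xy
        loss_f(model(x), y)
    end

    # Used inside the gradient call; x_batch and y_batch stand for one item
    # taken from a data channel.
    θ = Flux.params(model)
    grads = Flux.gradient(θ) do
        std_grad_calc((x_batch, y_batch), loss_f, model)
    end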

Arguments

  • xy::Tuple: A tuple with the first item as the model input and second item the expected model output
  • loss_f::Function: evaluates loss_f(y̅, y)
  • model::Chain: The model to be trained

Returns

  • Float: the value of the loss calculated by loss_f
DistributedJLFluxML.train! (Method)
train!(loss, model, data, opt, workers;
       cb, save_on_step_cb, status_chan, save_model_dir, device,
       no_block, grad_calc=std_grad_calc)

Uses a loss function and training data to improve the model parameters according to a particular optimisation rule opt. Runs the training loop in parallel on workers, aggregates the gradients through an allReduce, then updates the model parameters.

Example

See the package's test directory, in particular test/trainModel.jl, for more details.

Partition and load data on the workers. The following example loads an already partitioned version of the Iris dataset:

    batch_size=8
    epochs = 50

    # Deserialize one shard file onto each data-hosting worker
    deser_fut = [@spawnat w global rawData = deserialize(f)
                 for (w, f) in zip(workersToHostData, shard_file_list)]
    for fut in deser_fut
        wait(fut)
    end
    epoch_length_worker = @fetchfrom p[1] nrow(rawData)
    @everywhere p labels = ["Iris-versicolor", "Iris-virginica", "Iris-setosa"]

    @everywhere p x_array =
        Array(rawData[:,
                      [:sepal_l, :sepal_w,
                       :petal_l, :petal_w
                       ]])

    @everywhere p y_array =
        Flux.onehotbatch(rawData[:,"class"],
                         labels)

    # Each data-hosting worker feeds batches into a local Channel
    @everywhere p dataChan = Channel(1) do ch
        n_chunk = ceil(Int,size(x_array)[1]/$batch_size)
        x_dat = Flux.chunk(transpose(x_array), n_chunk)
        y_dat = Flux.chunk(y_array, n_chunk)
        for epoch in 1:$epochs
            for d in zip(x_dat, y_dat)
                put!(ch, d)
            end
        end
    end

    # Wrap each worker's local channel in a RemoteChannel so other processes can take from it
    @everywhere p datRemChan = RemoteChannel(() -> dataChan, myid())

    # Map each training worker's id to the RemoteChannel it will read from
    datRemChansDict = Dict(k => @fetchfrom w datRemChan for (k, w) in zip(workersToRunTrain,workersToHostData))

Once the data is set up for your needs, define the model, loss, and optimizer, and pass them to train!:

    loss_f = Flux.Losses.logitcrossentropy
    opt = Flux.Optimise.ADAM(0.001)

    model = Chain(Dense(4,8),Dense(8,16), Dense(16,3))

    DistributedJLFluxML.train!(loss_f, model, datRemChansDict, opt, p)

Example

The grad_calc argument is meant for novel normalization schemes. For example, if your data channel returns a 3-tuple, say (x, s, y), a suitable grad_calc could be

    function node_norm_grad_calc(xsy, loss_f, model; device=gpu)
        x, s, y = xsy
        loss_f(model(x), y, agg=sum)/s
    end
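
This custom grad_calc can then be passed to train! through the grad_calc keyword (a hypothetical call, reusing loss_f, model, datRemChansDict, opt, and p from the earlier example):

    DistributedJLFluxML.train!(loss_f, model, datRemChansDict, opt, p;
                               grad_calc=node_norm_grad_calc)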

When train! returns, it will have updated the parameters of model.

Arguments

  • loss::Function: evaluates loss(y, y̅)
  • model::Chain: The model to be trained
  • data::Dict{Int,RemoteChannel}: The dict key is the worker id that the remote channel will be sent to
  • opt::AbstractOptimizer: The optimizer used during training
  • workers::AbstractArray: List of workers ids to perform training on.
  • cb::Function: a callback that is called after Optimise.update! on each training worker
  • save_on_step_cb::Function: The training step is passed to this cb on each training iteration. If the cb returns true, a copy of the model will be saved to saved_model_dir (see the sketch after this list)
  • saved_model_dir::String: path to directory where saved model will be placed
  • device::Function: the device a model will be copied to on remote worker. usually gpu or cpu
  • grad_calc::Function: a function that will be called in Flux.gradient to combine a single data sample, the model, and the loss function. See std_grad_calc as an example.
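
For example, a save_on_step_cb that requests a checkpoint every 100 training steps could be as simple as (hypothetical name):

    # Hypothetical save_on_step_cb: ask train! to save the model whenever the
    # step count is a multiple of 100.
    save_every_100(step) = step % 100 == 0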

Returns

  • nothing

Throws

  • nothing