DistributedJLFluxML.build_data_loader_from_RemoteChannel — Method

Leaving this here for future work. It looks like there is a problem with Distributed & Threads that might be breaking this.

See https://github.com/JuliaLang/julia/issues/32677 and https://github.com/JuliaLang/julia/pull/33555

When Julia 1.8 is released, readdress this.
DistributedJLFluxML.do_train_on_remote — Method

```julia
do_train_on_remote()
```

Not documented. Called through `@spawnat` in `train!()`. No one should be calling this directly.
DistributedJLFluxML.eval_model — Method

```julia
eval_model(model, data, evalWorkers;
           status_chan=nothing, get_step=nothing,
           device=gpu)
```

This function evaluates the model on a set of data partitioned across many workers. `eval_model` will deploy `model` and the appropriate `RemoteChannel` from `data` to `evalWorkers`. There, it will call `model(x)` on the data iterated by `data[myid()]`. Finally, the results will be fetched and aggregated into a single array.

Arguments

- `model::Chain`: The model that will be evaluated
- `data::Dict{Int,RemoteChannel}`: The dict key is the worker id that the remote channel will be sent to
- `evalWorkers::AbstractArray`: List of worker ids to perform evaluation on.
- `status_chan::RemoteChannel`: status messages and data will be placed on this channel to monitor progress
- `device::Function`: the device a model will be copied to on a remote worker; usually `gpu` or `cpu`

Returns

- `Array`: concatenated array of results from each of the workers

Throws

- nothing
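A minimal usage sketch (hedged: `datRemChansDict` follows the data-loading pattern shown under `train!` below, and `evalWorkers` is assumed to be the worker ids that appear as keys in that dict):

```julia
using Distributed, Flux

# Assumption: datRemChansDict::Dict{Int,RemoteChannel} was built as in the
# train! data-loading example below, keyed by evaluation worker id.
evalWorkers = collect(keys(datRemChansDict))
statusChan = RemoteChannel(() -> Channel{Any}(1000))

results = DistributedJLFluxML.eval_model(model, datRemChansDict, evalWorkers;
                                         status_chan=statusChan,
                                         device=cpu)
```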
DistributedJLFluxML.eval_model — Method

```julia
eval_model(saved_model_dir::String, model, _data, workers;
           status_chan=nothing, get_step=nothing,
           device=gpu)
```

Not tested yet. Model saving still needs to be built in `train!`.
DistributedJLFluxML.std_grad_calc — Method

```julia
std_grad_calc(xy, loss_f, model; device)
```

Standard combination of `loss_f`, `model`, input, and output, to be used in

```julia
l = Flux.gradient(θ) do
    ...
end
```
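For concreteness, a hedged sketch of how such a grad calc is typically invoked with Flux's implicit-parameter gradient API (names here are illustrative, not the package's internals):

```julia
using Flux

θ = Flux.params(model)       # implicit parameter set
grads = Flux.gradient(θ) do  # differentiate the returned loss w.r.t. θ
    # xy is assumed to come from the worker's RemoteChannel
    std_grad_calc(xy, loss_f, model; device=gpu)
end
```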
Arguments

- `xy::Tuple`: A tuple with the first item as the model input and the second item the expected model output
- `loss_f::Function`: evaluates `loss_f(y̅, y)`
- `model::Chain`: The model to be trained

Returns

- `Float`: the value of the loss calculated by `loss_f`
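Based on the signature above and the `node_norm_grad_calc` example under `train!`, the standard calculation plausibly reduces to the following (a sketch, not necessarily the exact implementation):

```julia
# Sketch (assumption): unpack the (input, output) tuple, run the forward
# pass, and return the scalar loss so Flux.gradient can differentiate it.
function std_grad_calc(xy, loss_f, model; device=gpu)
    x, y = xy
    loss_f(model(x), y)
end
```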
DistributedJLFluxML.train! — Method

```julia
train!(loss, model, data, opt, workers;
       cb, save_on_step_cb, status_chan, save_model_dir, device,
       no_block, grad_calc=std_grad_calc)
```

Uses a `loss` function and training `data` to improve the `model` parameters according to a particular optimisation rule `opt`. Runs the training loop in parallel on `workers`, aggregates the gradients through an allReduce, then updates the model parameters.
Example

See the test dir /juliateam/.julia/packages/DistributedJLFluxML/dkZuT/test, in particular /juliateam/.julia/packages/DistributedJLFluxML/dkZuT/test/trainModel.jl, for more details.

Partition and load data on workers. The following example loads an already partitioned version of the iris dataset:
```julia
using Distributed, Serialization, DataFrames
using Flux

batch_size = 8
epochs = 50

# Deserialize one data shard onto each data-hosting worker.
deser_fut = [@spawnat w global rawData = deserialize(f)
             for (w, f) in zip(workersToHostData, shard_file_list)]
for fut in deser_fut
    wait(fut)
end
epoch_length_worker = @fetchfrom p[1] nrow(rawData)

@everywhere p labels = ["Iris-versicolor", "Iris-virginica", "Iris-setosa"]

# Build feature and one-hot label arrays on each worker.
@everywhere p x_array =
    Array(rawData[:,
                  [:sepal_l, :sepal_w,
                   :petal_l, :petal_w
                   ]])
@everywhere p y_array =
    Flux.onehotbatch(rawData[:, "class"],
                     labels)

# Feed batches through a local Channel for `epochs` passes over the data.
@everywhere p dataChan = Channel(1) do ch
    n_chunk = ceil(Int, size(x_array)[1] / $batch_size)
    x_dat = Flux.chunk(transpose(x_array), n_chunk)
    y_dat = Flux.chunk(y_array, n_chunk)
    for epoch in 1:$epochs
        for d in zip(x_dat, y_dat)
            put!(ch, d)
        end
    end
end

# Wrap each worker's channel in a RemoteChannel, keyed by training worker id.
@everywhere p datRemChan = RemoteChannel(() -> dataChan, myid())

datRemChansDict = Dict(k => (@fetchfrom w datRemChan)
                       for (k, w) in zip(workersToRunTrain, workersToHostData))
```
Once the data is set up for your needs, define the model, loss, and optimizer, and pass them to `train!`:

```julia
loss_f = Flux.Losses.logitcrossentropy
opt = Flux.Optimise.ADAM(0.001)
model = Chain(Dense(4, 8), Dense(8, 16), Dense(16, 3))

DistributedJLFluxML.train!(loss_f, model, datRemChansDict, opt, p)
```
Example

The argument `grad_calc` is meant for novel normalization schemes. For example, if your `DataChannel` returns a 3-tuple, say `(x, s, y)`, a desired grad calc could be:

```julia
function node_norm_grad_calc(xsy, loss_f, model; device=gpu)
    x, s, y = xsy
    loss_f(model(x), y, agg=sum) / s
end
```
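The custom calculation is then passed through the `grad_calc` keyword, reusing the setup from the first example:

```julia
DistributedJLFluxML.train!(loss_f, model, datRemChansDict, opt, p;
                           grad_calc=node_norm_grad_calc)
```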
When `train!` returns, it will have updated the parameters of `model`.

Arguments

- `loss::Function`: evaluates `loss(y, y̅)`
- `model::Chain`: The model to be trained
- `data::Dict{Int,RemoteChannel}`: The dict key is the worker id that the remote channel will be sent to
- `opt::AbstractOptimizer`: The optimizer used during training
- `workers::AbstractArray`: List of worker ids to perform training on.
- `cb::Function`: a callback that is called after `Optimise.update!` on each training worker
- `save_on_step_cb::Function`: The training step is passed to this cb on each training iteration. If the cb returns `true`, a copy of the model will be saved to `saved_model_dir`
- `saved_model_dir::String`: path to the directory where the saved model will be placed
- `device::Function`: the device a model will be copied to on a remote worker; usually `gpu` or `cpu`
- `grad_calc::Function`: a function that will be called in `Flux.gradient` to combine a single data sample, the model, and the loss function. See `std_grad_calc` for an example.
Returns

- nothing

Throws

- nothing
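A hedged monitoring sketch, assuming the caller drains `status_chan` while training runs (the exact message format placed on the channel is not documented here):

```julia
using Distributed

# Assumption: messages on status_chan are opaque progress records.
status_chan = RemoteChannel(() -> Channel{Any}(1000))
monitor = @async while true
    msg = take!(status_chan)   # block until the next status message
    @info "train! status" msg
end

DistributedJLFluxML.train!(loss_f, model, datRemChansDict, opt, p;
                           status_chan=status_chan)
```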