Dagger.AnyScope — Type
Widest scope that contains all processors.
Dagger.ArrayDomain — Type
ArrayDomain{N}
An N-dimensional domain over an array.
Dagger.AutoBlocks — Type
AutoBlocks
Automatically determines the size and number of blocks for a distributed array. This may construct any kind of Dagger.AbstractBlocks partitioning.
Dagger.Blocks — Method
Blocks(xs...)
Indicates the size of an array operation, specified as xs, whose length indicates the number of dimensions in the resulting array.
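For illustration, a Blocks partitioning can be used to split an in-memory array into a distributed one (a minimal sketch; Dagger.distribute is assumed to accept an array and a Blocks partitioning, as in recent Dagger releases):

```julia
using Dagger

A = rand(8, 8)
# Partition A into 4x4 blocks, yielding a 2x2 grid of chunks
DA = Dagger.distribute(A, Dagger.Blocks(4, 4))
# Gather the distributed array back into a regular Array
collect(DA) == A
```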
Dagger.Chunk — Type
Chunk
A reference to a piece of data located on a remote worker. Chunks are typically created with Dagger.tochunk(data), and the data can then be accessed from any worker with collect(::Chunk). Chunks are serialization-safe, and use distributed refcounting (provided by MemPool.DRef) to ensure that the data referenced by a Chunk won't be GC'd, as long as a reference exists on some worker.
Each Chunk is associated with a given Dagger.Processor, which is (in a sense) the processor that "owns" or contains the data. Calling collect(::Chunk) will perform data movement and conversions defined by that processor to safely serialize the data to the calling worker.
Constructors
See tochunk.
Dagger.Context — Type
Context(xs::Vector{OSProc}) -> Context
Context(xs::Vector{Int}) -> Context
Create a Context, by default adding each available worker.
It is also possible to create a Context from a vector of OSProcs, or equivalently the underlying process IDs can be passed directly as a Vector{Int}.
Special fields include:
- log_sink: A log sink object to use, if any.
- profile::Bool: Whether or not to perform profiling with the Profile stdlib.
Dagger.DArray — Type
DArray{T,N,F}(domain, subdomains, chunks, concat)
DArray(T, domain, subdomains, chunks, [concat=cat])
An N-dimensional distributed array of element type T, with a concatenation function of type F.
Arguments
- T: element type
- domain::ArrayDomain{N}: the whole ArrayDomain of the array
- subdomains::AbstractArray{ArrayDomain{N}, N}: a DomainBlocks of the same dimensions as the array
- chunks::AbstractArray{Union{Chunk,Thunk}, N}: an array of chunks of dimension N
- concat::F: a function of type F. concat(x, y; dims=d) takes two chunks x and y and concatenates them along dimension d. cat is used by default.
Dagger.DTask — Type
DTask
Returned from Dagger.@spawn/Dagger.spawn calls. Represents a task that is in the scheduler, potentially ready to execute, executing, or finished executing. May be fetch'd or wait'd on at any time. See Dagger.@spawn for more details.
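A minimal sketch of the DTask lifecycle:

```julia
using Dagger

t = Dagger.@spawn sum([1, 2, 3])  # returns a DTask immediately
wait(t)                           # block until the task finishes
fetch(t)                          # retrieve the result (6)
```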
Dagger.DTaskFinalizer — Type
When finalized, cleans up the associated DTask.
Dagger.Deps — Type
Specifies one or more dependencies.
Dagger.ExactScope — Type
Scoped to a specific processor.
Dagger.File — Method
File(path::AbstractString;
     serialize::Base.Callable, deserialize::Base.Callable,
     use_io::Bool, mmap::Bool) -> Dagger.File
References data in the file at path, using the deserialization function deserialize. use_io specifies whether deserialize takes an IO (the default) or takes a String path. mmap is experimental, and specifies whether deserialize can use MemPool's MMWrap memory mapping functionality (default is false).
The file at path is not yet loaded when the call to File returns; passing it to a task will cause reading to occur, and the result will be passed to the task.
Dagger.FileReader — Type
FileReader
Used as a Chunk handle for reading a file, starting at a given offset.
Dagger.In — Type
Specifies a read-only dependency.
Dagger.InOut — Type
Specifies a read-write dependency.
Dagger.InvalidScope — Type
Indicates that the applied scopes x and y are incompatible.
Dagger.NodeScope — Type
Scoped to the same physical node.
Dagger.OSProc — Type
OSProc <: Processor
Julia CPU (OS) process, identified by Distributed pid. The logical parent of all processors on a given node, but otherwise does not participate in computations.
Dagger.Options — Type
Options(::NamedTuple)
Options(; kwargs...)
Options for thunks and the scheduler. See Task Spawning for more information.
Dagger.Out — Type
Specifies a write-only dependency.
Dagger.ProcessScope — Type
Scoped to the same OS process.
Dagger.Processor — Type
Processor
An abstract type representing a processing device and associated memory, where data can be stored and operated on. Subtypes should be immutable, and instances should compare equal if they represent the same logical processing device/memory. Subtype instances should be serializable between different nodes. Subtype instances may contain a "parent" Processor to make it easy to transfer data to/from other types of Processor at runtime.
Dagger.PromotePartition — Type
This is a way of suggesting that stage should call stage_operand with the operation and other arguments.
Dagger.Shard — Type
Maps a value to one of multiple distributed "mirror" values automatically when used as a thunk argument. Construct using @shard or shard.
Dagger.TaintScope — Type
Taints a scope for later evaluation.
Dagger.ThreadProc — Type
ThreadProc <: Processor
Julia CPU (OS) thread, identified by Julia thread ID.
Dagger.Thunk — Type
Thunk
Wraps a callable object to be run with Dagger. A Thunk is typically created through a call to delayed or its macro equivalent @par.
Constructors
delayed(f; kwargs...)(args...)
@par [option=value]... f(args...)
Examples
julia> t = delayed(sin)(π)  # creates a Thunk to be computed later
Thunk(sin, (π,))
julia> collect(t)  # computes the result and returns it to the current process
1.2246467991473532e-16
Arguments
- f: The function to be called upon execution of the Thunk.
- args: The arguments to be passed to the Thunk.
- kwargs: The properties describing unique behavior of this Thunk. Details for each property are described in the next section.
- option=value: The same as passing kwargs to delayed.
Public Properties
- meta::Bool=false: If true, instead of fetching cached arguments from Chunks and passing the raw arguments to f, instead pass the Chunk. Useful for doing manual fetching or manipulation of Chunk references. Non-Chunk arguments are still passed as-is.
- processor::Processor=OSProc(): The processor associated with f. Useful if f is a callable struct that exists on a given processor and should be transferred appropriately.
- scope::Dagger.AbstractScope=DefaultScope(): The scope associated with f. Useful if f is a function or callable struct that may only be transferred to, and executed within, the specified scope.
Options
- options: A Sch.ThunkOptions struct providing the options for the Thunk. If omitted, options can also be specified by passing key-value pairs as kwargs.
Dagger.ThunkFuture — Type
A future holding the result of a Thunk.
Dagger.ThunkSummary — Type
A summary of the data contained in a Thunk, which can be safely serialized.
Dagger.UnionScope — Type
Union of two or more scopes.
Dagger.UnitDomain — Type
UnitDomain
Default domain; has no information about the value.
Dagger.WeakThunk — Type
A weak reference to a Thunk.
Base.fetch — Method
Base.fetch(c::DArray)
If a DArray tree has a Thunk in it, make the whole thing a big thunk.
Base.lock — Method
lock(f, ctx::Context)
Acquire ctx.proc_lock, execute f with the lock held, and release the lock when f returns.
Base.view — Method
view(c::DArray, d)
A view of a DArray chunk returns a DArray of Thunks.
Dagger.DefaultScope — Method
Default scope that contains the set of default_enabled processors.
Dagger.ProcessorTypeScope — Method
Scoped to any processor with a given supertype.
Dagger._delayed — Method
delayed(f, options=Options())(args...; kwargs...) -> Thunk
delayed(f; options...)(args...; kwargs...) -> Thunk
Creates a Thunk object which can be executed later, which will call f with args and kwargs. options controls various properties of the resulting Thunk.
Dagger.addprocs! — Method
addprocs!(ctx::Context, xs)
Add new workers xs to ctx.
Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.
Workers can be either Processors or the underlying process IDs as Integers.
Dagger.alignfirst — Method
alignfirst(a) -> ArrayDomain
Make a subdomain a standalone domain.
Example
julia> alignfirst(ArrayDomain(11:25, 21:100))
ArrayDomain((1:15), (1:80))
Dagger.all_processors — Function
all_processors(ctx::Context=Sch.eager_context()) -> Set{Processor}
Returns the set of all processors available to the scheduler, across all Distributed workers.
Dagger.allowscalar — Function
Allow/disallow scalar indexing for the duration of executing f.
Dagger.allowscalar! — Function
Allow/disallow scalar indexing for the current task.
Dagger.compatible_processors — Function
compatible_processors(scope::AbstractScope, ctx::Context=Sch.eager_context()) -> Set{Processor}
Returns the set of all processors (across all Distributed workers) that are compatible with the given scope.
Dagger.compute — Method
compute(ctx::Context, d::Thunk; options=nothing) -> Chunk
Compute a Thunk - creates the DAG, assigns ranks to nodes for tie-breaking, and runs the scheduler with the specified options. Returns a Chunk which references the result.
Dagger.constrain — Method
constrain(x::AbstractScope, y::AbstractScope) -> AbstractScope
Constructs a scope that is the intersection of scopes x and y.
Dagger.default_enabled — Method
default_enabled(proc::Processor) -> Bool
Returns whether processor proc is enabled by default. The default value is false, which opts the processor out of execution when not specifically requested by the user; true opts the processor in, causing it to always participate in execution when possible.
Dagger.default_option — Method
default_option(::Val{name}, Tf, Targs...) where name = value
Defines the default value for option name
to value
when Dagger is preparing to execute a function with type Tf
with the argument types Targs
. Users and libraries may override this to set default values for tasks.
An easier way to define these defaults is with @option
.
Note that the actual task's argument values are not passed, as it may not always be possible or efficient to gather all Dagger task arguments on one worker.
This function may be executed within the scheduler, so it should generally be made very cheap to execute. If the function throws an error, the scheduler will use whatever the global default value is for that option instead.
Dagger.dependents — Method
dependents(node::Thunk) -> Dict{Union{Thunk,Chunk}, Set{Thunk}}
Find the set of direct dependents for each task.
Dagger.disable_logging! — Method
disable_logging!()
Disables logging previously enabled with enable_logging!.
Dagger.domain — Function
domain(x::T)
Returns metadata about x. This metadata will be in the domain field of a Chunk object when an object of type T is created as the result of evaluating a Thunk.
Dagger.domain — Method
domain(x::AbstractArray) -> ArrayDomain
The domain of an array is an ArrayDomain.
Dagger.domain — Method
If no domain method is defined on an object, then we use the UnitDomain on it. A UnitDomain is indivisible.
Dagger.dsort_chunks — Function
dsort_chunks(cs, [nchunks, nsamples]; options...)
Sort the contents of chunks (cs) and return a new set of chunks, such that the chunks, when concatenated, return a sorted collection. Each chunk in turn is sorted.
Arguments
- nchunks: the number of chunks to produce, regardless of how many chunks were given as input
- nsamples: the number of elements to sample from each chunk to guess the splitters (nchunks-1 splitters); each chunk will be delimited by the splitters
- merge: a function to merge two sorted collections
- sub: a function to get a subset of the collection; takes (collection, range) (defaults to getindex)
- order: the Base.Sort.Ordering to be used for sorting
- batchsize: number of chunks to split and merge at a time (e.g. if there are 128 input chunks and 128 output chunks, and batchsize is 8, then we first sort among batches of 8 chunks, giving 16 batches; then we sort among the first chunks of the first 8 batches (all elements less than the first splitter), then go on to the first 8 chunks of the second 8 batches, and so on)
- chunks_presorted: is each chunk in the input already sorted?
- sortandsample: a function to sort a chunk, then sample N elements to infer the splitters. It takes 3 arguments: (collection, N, presorted), where presorted is a boolean which is true if the chunk is already sorted.
- affinities: a list of processes where the output chunks should go. If the length is not equal to nchunks, then the affinities array is cycled through.
Returns
A tuple of (chunk, samples), where chunk is the Dagger.Chunk object. chunk can be nothing if no change to the initial array was made (e.g. it was already sorted).
Dagger.enable_disk_caching! — Function
enable_disk_caching!(mem_limits::Dict{Int,Int}, disk_limit_mb::Int=16*2^10, processes::Vector{Int}=procs())
Sets up disk caching on the participating processes. This is a low-level method for applying the mem_limits directly onto the processes. It skips the process discovery stage and the limit calculation.
Dagger.enable_disk_caching! — Function
enable_disk_caching!(ram_percentage_limit::Int=30, disk_limit_mb::Int=16*2^10, processes::Vector{Int}=procs())
Sets up disk caching on all processes available in the environment according to the provided limits. The user should provide the percentage, which will determine the memory limit on each participating machine (differentiated by hostname). The disk limit is set strictly per process and doesn't include any hostname-related logic.
Dagger.enable_logging! — Method
enable_logging!(;kwargs...)
Enables logging globally for all workers. Certain core events are always enabled by this call, but additional ones may be specified via kwargs.
Extra events:
- metrics::Bool: Enables various utilization and allocation metrics
- timeline::Bool: Enables raw "timeline" values, which are event-specific; not recommended except for debugging
- tasknames::Bool: Enables generating unique task names for each task
- taskdeps::Bool: Enables reporting of upstream task dependencies (as task IDs) for each task argument
- taskargs::Bool: Enables reporting of upstream non-task dependencies (as objectid hashes) for each task argument
- taskargmoves::Bool: Enables reporting of copies of upstream dependencies (as original and copy objectid hashes) for each task argument
- profile::Bool: Enables profiling of task execution; not currently recommended, as it adds significant overhead
Dagger.execute! — Function
execute!(proc::Processor, f, args...; kwargs...) -> Any
Executes the function f with arguments args and keyword arguments kwargs on processor proc. This function can be overloaded by Processor subtypes to allow executing function calls differently than normal Julia.
Dagger.fetch_logs! — Method
fetch_logs!() -> Dict{Int, Dict{Symbol, Vector}}
Fetches and returns the currently-accumulated logs for each worker. Each entry of the outer Dict is keyed on worker ID, so logs[1] are the logs for worker 1.
Consider using show_logs or render_logs to generate a renderable display of these logs.
Dagger.gemm_dagger! — Method
Performs one of the matrix-matrix operations
C = alpha [op( A ) * op( B )] + beta C,
where op( X ) is one of
op( X ) = X or op( X ) = X' or op( X ) = g( X' ),
and alpha and beta are scalars, and A, B, and C are matrices, with op( A ) an m-by-k matrix, op( B ) a k-by-n matrix, and C an m-by-n matrix.
Dagger.get_options — Method
get_options(key::Symbol, default) -> Any
get_options(key::Symbol) -> Any
Returns the value of the option named key. If the option does not have a value set, then an error will be thrown, unless default is set, in which case it will be returned instead of erroring.
get_options() -> NamedTuple
Returns a NamedTuple of all option key-value pairs.
Dagger.get_parent — Function
get_parent(proc::Processor) -> Processor
Returns the parent processor for proc. The ultimate parent processor is an OSProc. Processor subtypes should overload this to return their most direct parent.
Dagger.get_processors — Method
get_processors(proc::Processor) -> Set{<:Processor}
Returns the set of processors contained in proc, if any. Processor subtypes should overload this function if they can contain sub-processors. The default method will return a Set containing proc itself.
Dagger.get_tls — Method
get_tls()
Gets all Dagger TLS variables as a NamedTuple.
Dagger.has_writedep — Method
Whether arg has any writedep at or before executing task in this datadeps region.
Dagger.has_writedep — Method
Whether arg has any writedep in this datadeps region.
Dagger.in_thunk — Method
in_thunk()
Returns true if currently in a Thunk process, else false.
Dagger.is_writedep — Method
Whether arg is written to by task.
Dagger.iscompatible — Method
iscompatible(proc::Processor, opts, f, Targs...) -> Bool
Indicates whether proc can execute f over Targs given opts. Processor subtypes should overload this function to return true if and only if it is essentially guaranteed that f(::Targs...) is supported. Additionally, iscompatible_func and iscompatible_arg can be overridden to determine compatibility of f and Targs individually. The default implementation returns false.
Dagger.load — Method
load(ctx::Context, file_path)
Load a Union{Chunk, Thunk} from a file.
Dagger.load — Method
load(ctx::Context, ::Type{Chunk}, fpath, io)
Load a Chunk object from a file; the file path is required for creating a FileReader object.
Dagger.logs_annotate! — Method
Associates an argument arg with name in the logs, which log renderers may utilize for display purposes.
Dagger.mem_limit — Method
mem_limit(total::UInt, percentage_limit::Int, nprocs::Int)
Returns the per-process memory limit in MiB based on the provided total memory, percentage_limit (1-100), and the number of processes in the current setup.
Dagger.move — Method
move(from_proc::Processor, to_proc::Processor, x)
Moves and/or converts x such that it's available and suitable for usage on the to_proc processor. This function can be overloaded by Processor subtypes to transport arguments and convert them to an appropriate form before being used for execution. Subtypes of Processor wishing to implement efficient data movement should provide implementations where x::Chunk.
Dagger.mutable — Method
mutable(f::Base.Callable; worker, processor, scope) -> Chunk
Calls f() on the specified worker or processor, returning a Chunk referencing the result with the specified scope scope.
Dagger.noffspring — Method
noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}}) -> Dict{Thunk, Int}
Recursively find the number of tasks dependent on each task in the DAG. Takes a Dict as returned by dependents.
Dagger.num_processors — Function
num_processors(scope::AbstractScope=DefaultScope(), all::Bool=false) -> Int
Returns the number of processors available to Dagger by default, or if specified, according to scope. If all=true, instead returns the number of processors known to Dagger, whether or not they've been disabled by the user. Most users will want to use num_processors().
Dagger.order — Method
order(node::Thunk, ndeps) -> Dict{Thunk,Int}
Given a root node of the DAG, calculates a total order for tie-breaking.
- The root node gets score 1,
- the rest of the nodes are explored in DFS fashion, but the chunks of each node are explored in order of noffspring, i.e. the total number of tasks depending on the result of the said node.
Args:
- node: the root node
- ndeps: the result of noffspring
Dagger.recursive_splitters — Method
recursive_splitters(ord, splitters, nchunks, batchsize) -> Tuple{Vector, Vector{Vector}}
Split the splitters themselves into batches.
Arguments
- ord: the Sorting.Ordering object
- splitters: the nchunks-1 splitters
- batchsize: the batch size
Returns
A Tuple{Vector, Vector{Vector}} - the coarse splitters which create batchsize splits, and the finer splitters within those batches which create a total of nchunks splits.
Example
julia> Dagger.recursive_splitters(Dagger.default_ord,
           [10,20,30,40,50,60], 5,3)
([30], Any[[10, 20], [40, 50, 60]])
The first value [30] represents a coarse split that cuts the dataset into the ranges -Inf to 30 and 30 to Inf. Each part is further recursively split using the next set of splitters.
Dagger.render_logs — Function
render_logs(logs, vizmode::Symbol; options...)
render_logs(t, logs, vizmode::Symbol; options...)
Returns a displayable representation of the logs of a task t and/or logs object logs using the visualization mode vizmode. options are specific to the visualization mode.
Dagger.rmprocs! — Method
rmprocs!(ctx::Context, xs)
Remove the specified workers xs from ctx.
Workers will typically finish all their assigned tasks if scheduling is ongoing, but will not be assigned new tasks after removal.
Workers can be either Processors or the underlying process IDs as Integers.
Dagger.save — Method
save(io::IO, val)
Save a value into the IO buffer. In the case of arrays and sparse matrices, this will save it in a memory-mappable way.
load(io::IO, t::Type, domain)
will load the object given its domain.
Dagger.save — Method
save(ctx, chunk::Union{Chunk, Thunk}, file_path::AbstractString)
Save a chunk to a file at file_path.
Dagger.save — Method
save(ctx, chunk, file_path)
Special case for distmem writing: writes to disk on the process that holds the chunk.
Dagger.scope — Method
scope(scs...) -> AbstractScope
scope(;scs...) -> AbstractScope
Constructs an AbstractScope from a set of scope specifiers. Each element in scs is a separate specifier; if scs is empty, an empty UnionScope() is produced; if scs has one element, then exactly one specifier is constructed; if scs has more than one element, a UnionScope of the scopes specified by scs is constructed. A variety of specifiers can be passed to construct a scope:
- :any - Constructs an AnyScope()
- :default - Constructs a DefaultScope()
- (scs...,) - Constructs a UnionScope of scopes, each specified by scs
- thread=tid or threads=[tids...] - Constructs an ExactScope or UnionScope containing all Dagger.ThreadProcs with thread ID tid/tids across all workers.
- worker=wid or workers=[wids...] - Constructs a ProcessScope or UnionScope containing all Dagger.ThreadProcs with worker ID wid/wids across all threads.
- thread=tid/threads=tids and worker=wid/workers=wids - Constructs an ExactScope, ProcessScope, or UnionScope containing all Dagger.ThreadProcs with worker ID wid/wids and threads tid/tids.
Aside from the worker and thread specifiers, it's possible to add custom specifiers for scoping to other kinds of processors (like GPUs) or providing different ways to specify a scope. Specifier selection is determined by a precedence ordering: by default, all specifiers have precedence 0, which can be changed by defining scope_key_precedence(::Val{spec}) = precedence (where spec is the specifier as a Symbol). The specifier with the highest precedence in a set of specifiers is used to determine the scope by calling to_scope(::Val{spec}, sc::NamedTuple) (where sc is the full set of specifiers), which should be overridden for each custom specifier, and which returns an AbstractScope. For example:
# Setup a GPU specifier
Dagger.scope_key_precedence(::Val{:gpu}) = 1
Dagger.to_scope(::Val{:gpu}, sc::NamedTuple) = ExactScope(MyGPUDevice(sc.worker, sc.gpu))
# Generate an `ExactScope` for `MyGPUDevice` on worker 2, device 3
Dagger.scope(gpu=3, worker=2)
Dagger.set_tls! — Method
set_tls!(tls)
Sets all Dagger TLS variables from the NamedTuple tls.
Dagger.shard — Method
shard(f; kwargs...) -> Chunk{Shard}
Executes f on all workers in workers, wrapping the result in a process-scoped Chunk, and constructs a Chunk{Shard} containing all of these Chunks on the current worker.
Keyword arguments:
- procs - The list of processors to create pieces on. May be any iterable container of Processors.
- workers - The list of workers to create pieces on. May be any iterable container of Integers.
- per_thread::Bool=false - If true, creates a piece per thread, rather than a piece per worker.
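As a sketch of how a shard might be used (a single-process setup is assumed here; with multiple workers, each worker receives its own copy):

```julia
using Dagger

# Create one zeroed buffer per worker
buffers = Dagger.@shard zeros(4)
# A task receives the buffer local to wherever it runs
t = Dagger.@spawn sum(buffers)
fetch(t)  # 0.0 for a freshly-zeroed local buffer
```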
Dagger.short_name — Method
Returns a very brief String representation of proc.
Dagger.show_logs — Function
show_logs(io::IO, logs, vizmode::Symbol; options...)
show_logs(io::IO, t, logs, vizmode::Symbol; options...)
Displays the logs of a task t and/or logs object logs using the visualization mode vizmode, which is written to the given IO stream io. options are specific to the visualization mode.
show_logs(logs, vizmode::Symbol; options...)
show_logs(t, logs, vizmode::Symbol; options...)
Returns a string representation of the logs of a task t and/or logs object logs using the visualization mode vizmode. options are specific to the visualization mode.
Dagger.spawn — Method
Dagger.spawn(f, args...; kwargs...) -> DTask
Spawns a DTask that will call f(args...; kwargs...). Also supports passing a Dagger.Options struct as the first argument to set task options. See Dagger.@spawn for more details on DTasks.
Dagger.spawn_datadeps — Method
spawn_datadeps(f::Base.Callable; traversal::Symbol=:inorder)
Constructs a "datadeps" (data dependencies) region and calls f within it. Dagger tasks launched within f may wrap their arguments with In, Out, or InOut to indicate whether the task will read, write, or read+write that argument, respectively. These argument dependencies will be used to specify which tasks depend on each other based on the following rules:
- Dependencies across different arguments are independent; only dependencies on the same argument synchronize with each other ("same-ness" is determined based on isequal)
- InOut is the same as In and Out applied simultaneously, and synchronizes with the union of the In and Out effects
- Any two or more In dependencies do not synchronize with each other, and may execute in parallel
- An Out dependency synchronizes with any previous In and Out dependencies
- An In dependency synchronizes with any previous Out dependencies
- If unspecified, an In dependency is assumed
In general, the result of executing tasks following the above rules will be equivalent to simply executing tasks sequentially and in order of submission. Of course, if dependencies are incorrectly specified, undefined behavior (and unexpected results) may occur.
Unlike other Dagger tasks, tasks executed within a datadeps region are allowed to write to their arguments when annotated with Out
or InOut
appropriately.
At the end of executing f
, spawn_datadeps
will wait for all launched tasks to complete, rethrowing the first error, if any. The result of f
will be returned from spawn_datadeps
.
The keyword argument traversal
controls the order that tasks are launched by the scheduler, and may be set to :bfs
or :dfs
for Breadth-First Scheduling or Depth-First Scheduling, respectively. All traversal orders respect the dependencies and ordering of the launched tasks, but may provide better or worse performance for a given set of datadeps tasks. This argument is experimental and subject to change.
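The rules above can be sketched as follows (the array arguments here are illustrative):

```julia
using Dagger

A = rand(4)
B = zeros(4)
Dagger.spawn_datadeps() do
    # Writes B, reads A
    Dagger.@spawn copyto!(Out(B), In(A))
    # Reads and writes B; synchronizes after the copyto! task
    Dagger.@spawn map!(abs, InOut(B), B)
end
B == abs.(A)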
Dagger.split_range — Method
Utility function to divide the range range into n chunks.
Dagger.split_range_interval — Method
split_range_interval(range, n)
Split a range into pieces, each of length n or less.
Dagger.stage_operands — Method
An operand which should be distributed as is convenient.
Dagger.syrk_dagger! — Method
Performs one of the symmetric/hermitian rank k operations
C = alpha [ op( A ) * g( op( A )' )] + beta C,
where op( X ) is one of
op( X ) = X or op( X ) = g( X' ),
and alpha and beta are real scalars, C is an n-by-n symmetric/hermitian matrix, and A is an n-by-k matrix in the first case and a k-by-n matrix in the second case.
Dagger.system_uuid — Method
Gets the unique UUID for the server hosting worker wid.
Dagger.thunk_processor — Method
thunk_processor()
Get the current processor executing the current thunk.
Dagger.tochunk — Method
tochunk(x, proc::Processor, scope::AbstractScope; device=nothing, kwargs...) -> Chunk
Create a chunk from data x which resides on proc and which has scope scope.
device specifies a MemPool.StorageDevice (which is itself wrapped in a Chunk) which will be used to manage the reference contained in the Chunk generated by this function. If device is nothing (the default), the data will be inspected to determine if it's safe to serialize; if so, the default MemPool storage device will be used; if not, then a MemPool.CPURAMDevice will be used.
All other kwargs are passed directly to MemPool.poolset.
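A minimal sketch of creating and collecting a chunk (per the Chunk docstring above, the one-argument form uses defaults for the processor and scope):

```julia
using Dagger

c = Dagger.tochunk([1, 2, 3])  # wrap data in a Chunk
collect(c)                     # fetch the data back: [1, 2, 3]
```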
Dagger.tofile — Method
tofile(data, path::AbstractString;
       serialize::Base.Callable, deserialize::Base.Callable,
       use_io::Bool, mmap::Bool) -> Dagger.File
Writes data to disk at path, using the serialization function serialize. use_io specifies whether serialize takes an IO (the default) or takes a String path. mmap is experimental, and specifies whether serialize can use MemPool's MMWrap memory mapping functionality (default is false).
The returned File object can be passed to tasks as an argument, or returned from tasks as a result.
Dagger.treereduce — Method
Tree reduce.
Dagger.with_options — Method
with_options(f, options::NamedTuple) -> Any
with_options(f; options...) -> Any
Sets one or more options to the given values, executes f(), resets the options to their previous values, and returns the result of f(). This is the recommended way to set options, as it only affects tasks spawned within its scope. Note that setting an option here will propagate its value across Julia or Dagger tasks spawned by f() or its callees (i.e. the options propagate).
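For example, options can be scoped to a block of task submissions (a sketch; the scope option here is assumed to behave as described under Dagger.@spawn):

```julia
using Dagger

result = Dagger.with_options(; scope=Dagger.scope(worker=1)) do
    t = Dagger.@spawn 1 + 2  # inherits the scope option from with_options
    fetch(t)
end
result  # 3
```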
Distributed.procs — Method
procs(ctx::Context)
Fetch the list of procs currently known to ctx.
Dagger.@mutable — Macro
@mutable [worker=1] [processor=OSProc()] [scope=ProcessorScope()] f()
Helper macro for mutable().
Dagger.@option — Macro
@option name myfunc(A, B, C) = value
A convenience macro for defining default_option. For example:
Dagger.@option single mylocalfunc(Int) = 1
The above call will set the single option to 1 for any Dagger task calling mylocalfunc(Int) with an Int argument.
Dagger.@par — Macro
@par [opts] f(args...; kwargs...) -> Thunk
Convenience macro to call Dagger.delayed on f with arguments args and keyword arguments kwargs. May also be called with a series of assignments like so:
x = @par begin
    a = f(1,2)
    b = g(a,3)
    h(a,b)
end
x will hold the Thunk representing h(a,b); additionally, a and b will be defined in the same local scope and will be equally accessible for later calls.
Options to the Thunk can be set as opts with namedtuple syntax, e.g. single=1. Multiple options may be provided, and will be applied to all generated thunks.
Dagger.@shard — Macro
Creates a Shard. See Dagger.shard for details.
Dagger.@spawn
— MacroDagger.@spawn [option=value]... f(args...; kwargs...) -> DTask
Spawns a Dagger DTask
that will call f(args...; kwargs...)
. This DTask
is like a Julia Task
, and has many similarities:
- The
DTask
can bewait
'd on andfetch
'd from to see its final result - By default, the
DTask
will be automatically run on the first available compute resource - If all dependencies are satisfied, the
DTask
will be run as soon as possible - The
DTask
may be run in parallel with otherDTask
s, and the scheduler will automatically manage dependencies - If a
DTask
throws an exception, it will be propagated to any calls tofetch
, but not to calls towait
However, the DTask
also has many key differences from a Task
:
- The
DTask
may run on any thread of any Julia process, and even on a remote machine, in your cluster (seeDistributed.addprocs
) - The
DTask
might automatically utilize GPUs or other accelerators, if available - If arguments to a
DTask
are alsoDTask
s, then the scheduler will execute those arguments'DTask
s first, before running the "downstream" task - If an argument to a
DTask
t2
is aDTask
t1
, then the result oft1
(gotten viafetch(t1)
) will be passed tot2
(no need fort2
to callfetch
!) DTask
s are generally expected to be defined "functionally", meaning that they should not mutate global state, mutate their arguments, or have side effectsDTask
s are function call-focused, meaning thatDagger.@spawn
expects a single function call, and not a block of code- All
DTask
arguments are expected to be safe to serialize and send to other Julia processes; if not, use thescope
option orDagger.@mutable
to control execution location
Options to the DTask
can be set before the call to f
with key-value syntax, e.g. Dagger.@spawn myopt=2 do_something(1, 3.0)
, which would set the option myopt
to 2
for this task. Multiple options may be provided, which are specified like Dagger.@spawn myopt=2 otheropt=4 do_something(1, 3.0)
.
These options control a variety of properties of the resulting DTask
:
scope
: The execution "scope" of the task, which determines where the task will run. By default, the task will run on the first available compute resource. If you have multiple compute resources, you can specify a scope to run the task on a specific resource. For example,Dagger.@spawn scope=Dagger.scope(worker=2) do_something(1, 3.0)
would rundo_something(1, 3.0)
on worker 2.meta
: Iftrue
, instead of the scheduler automatically fetching values from other tasks, the rawChunk
objects will be passed tof
. Useful for doing manual fetching or manipulation ofChunk
references. Non-Chunk
arguments are still passed as-is.
Other options exist; see Dagger.Sch.ThunkOptions
for the full list.
This macro is a semi-thin wrapper around Dagger.spawn
- it creates a call to Dagger.spawn
on f
with arguments args
and keyword arguments kwargs
, and also passes along any options in an Options
struct. For example, Dagger.@spawn myopt=2 do_something(1, 3.0)
would essentially become Dagger.spawn(do_something, Dagger.Options(;myopt=2), 1, 3.0)
.
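A brief sketch of the behaviors described above, showing automatic fetching of an upstream DTask's result and the use of wait/fetch:

```julia
using Dagger

add2(x) = x + 2

# b depends on a, so the scheduler runs a first and passes
# fetch(a) into add2 automatically
a = Dagger.@spawn add2(1)   # DTask that will compute 3
b = Dagger.@spawn add2(a)   # no manual fetch(a) needed
wait(b)                     # block until b completes
fetch(b)                    # == 5
```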
Dagger.@unimplemented
— Macro@unimplemented fname(<args...>)
While it is nice to define generic functions ad-hoc, it can sometimes get confusing to figure out which method is missing. @unimplemented
creates a function that errors out, pointing out which method is missing.
Dagger.Sch.PROCESSOR_TIME_UTILIZATION
— ConstantProcess-local dictionary tracking per-processor total time utilization.
Dagger.Sch.TASKS_RUNNING
— ConstantProcess-local set of running task IDs.
Dagger.Sch.TASK_SYNC
— ConstantProcess-local condition variable (and lock) indicating task completion.
Dagger.Sch.ComputeState
— TypeComputeState
The internal state-holding struct of the scheduler.
Fields:
uid::UInt64
- Unique identifier for this scheduler instancewaiting::OneToMany
- Map from downstreamThunk
to upstreamThunk
s that still need to executewaiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}}
- Map from inputChunk
/upstreamThunk
to all unfinished downstreamThunk
s, to retain cachesready::Vector{Thunk}
- The list ofThunk
s that are ready to executecache::WeakKeyDict{Thunk, Any}
- Maps from a finishedThunk
to its cached result, often a DRefvalid::WeakKeyDict{Thunk, Nothing}
- Tracks allThunk
s that are in a valid scheduling staterunning::Set{Thunk}
- The set of currently-runningThunk
srunning_on::Dict{Thunk,OSProc}
- Map fromThunk
to the OS process executing itthunk_dict::Dict{Int, WeakThunk}
- Maps from thunk IDs to aThunk
node_order::Any
- Function that returns the order of a thunkworker_time_pressure::Dict{Int,Dict{Processor,UInt64}}
- Maps from worker ID to processor pressureworker_storage_pressure::Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}
- Maps from worker ID to storage resource pressureworker_storage_capacity::Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}}
- Maps from worker ID to storage resource capacityworker_loadavg::Dict{Int,NTuple{3,Float64}}
- Worker load averageworker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}}
- Communication channels between the scheduler and each workerprocs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}}
- Cached linked list of processors ready to be usedsignature_time_cost::Dict{Signature,UInt64}
- Cache of estimated CPU time (in nanoseconds) required to compute calls with the given signaturesignature_alloc_cost::Dict{Signature,UInt64}
- Cache of estimated CPU RAM (in bytes) required to compute calls with the given signaturetransfer_rate::Ref{UInt64}
- Estimate of the network transfer rate in bytes per secondhalt::Base.Event
- Event indicating that the scheduler is haltinglock::ReentrantLock
- Lock around operations which modify the statefutures::Dict{Thunk, Vector{ThunkFuture}}
- Futures registered for waiting on the result of a thunk.errored::WeakKeyDict{Thunk,Bool}
- Indicates if a thunk's result is an error.thunks_to_delete::Set{Thunk}
- The list ofThunk
s ready to be deleted upon completion.chan::RemoteChannel{Channel{Any}}
- Channel for receiving completed thunks.
Dagger.Sch.DynamicThunkException
— TypeThrown when a dynamic thunk encounters an exception in Dagger's utilities.
Dagger.Sch.MaxUtilization
— TypeMaxUtilization
Indicates a thunk that uses all processors of a given type.
Dagger.Sch.SchedulerHaltedException
— TypeThrown when the scheduler halts before finishing processing the DAG.
Dagger.Sch.SchedulerHandle
— TypeA handle to the scheduler, used by dynamic thunks.
Dagger.Sch.SchedulerOptions
— TypeSchedulerOptions
Stores DAG-global options to be passed to the Dagger.Sch scheduler.
Arguments
single::Int=0
: (Deprecated) Force all work onto worker with specified id.0
disables this option.proclist=nothing
: (Deprecated) Force scheduler to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return aBool
result to indicate whether or not to use the given processor.nothing
enables all default processors.allow_errors::Bool=true
: Allow thunks to error without affecting non-dependent thunks.checkpoint=nothing
: If notnothing
, uses the provided function to save the final result of the current scheduler invocation to persistent storage, for later retrieval byrestore
.restore=nothing
: If notnothing
, uses the provided function to return the (cached) final result of the current scheduler invocation, were it to execute. If this returns aChunk
, all thunks will be skipped, and theChunk
will be returned. Ifnothing
is returned, restoring is skipped, and the scheduler will execute as usual. If this function throws an error, restoring will be skipped, and the error will be displayed.
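The checkpoint/restore pair above might be wired up with the Serialization stdlib roughly as in this sketch; the callback signatures and the file name result.jls are illustrative assumptions and should be checked against your Dagger version:

```julia
using Dagger, Serialization

# Hypothetical sketch: persist the scheduler's final result to disk,
# and reload it on later runs instead of recomputing the DAG.
opts = Dagger.Sch.SchedulerOptions(;
    checkpoint = result -> open(io -> serialize(io, collect(result)),
                                "result.jls", "w"),
    restore = () -> isfile("result.jls") ?
                        Dagger.tochunk(deserialize("result.jls")) :
                        nothing,  # nothing => execute as usual
)
```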
Dagger.Sch.ThunkID
— TypeIdentifies a thunk by its ID, and preserves the thunk in the scheduler.
Dagger.Sch.ThunkOptions
— TypeThunkOptions
Stores Thunk-local options to be passed to the Dagger.Sch scheduler.
Arguments
single::Int=0
: (Deprecated) Force thunk onto worker with specified id.0
disables this option.proclist=nothing
: (Deprecated) Force thunk to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return aBool
result to indicate whether or not to use the given processor.nothing
enables all default processors.time_util::Dict{Type,Any}
: Indicates the maximum expected time utilization for this thunk. Each keypair maps a processor type to the utilization, where the value can be a real (approximately the number of nanoseconds taken), orMaxUtilization()
(utilizes all processors of this type). By default, the scheduler assumes that this thunk only uses one processor.alloc_util::Dict{Type,UInt64}
: Indicates the maximum expected memory utilization for this thunk. Each keypair maps a processor type to the utilization, where the value is an integer representing approximately the maximum number of bytes allocated at any one time.occupancy::Dict{Type,Real}
: Indicates the maximum expected processor occupancy for this thunk. Each keypair maps a processor type to the utilization, where the value can be a real between 0 and 1 (the occupancy ratio, where 1 is full occupancy). By default, the scheduler assumes that this thunk has full occupancy.allow_errors::Bool=true
: Allow this thunk to error without affecting non-dependent thunks.checkpoint=nothing
: If notnothing
, uses the provided function to save the result of the thunk to persistent storage, for later retrieval byrestore
.restore=nothing
: If notnothing
, uses the provided function to return the (cached) result of this thunk, were it to execute. If this returns aChunk
, this thunk will be skipped, and its result will be set to theChunk
. Ifnothing
is returned, restoring is skipped, and the thunk will execute as usual. If this function throws an error, restoring will be skipped, and the error will be displayed.storage::Union{Chunk,Nothing}=nothing
: If notnothing
, references aMemPool.StorageDevice
which will be passed toMemPool.poolset
internally when constructingChunk
s (such as when constructing the return value). The device must supportMemPool.CPURAMResource
. Whennothing
, usesMemPool.GLOBAL_DEVICE[]
.storage_root_tag::Any=nothing
: If notnothing
, specifies the MemPool storage root tag to associate with the thunk's result. This tag can be used by MemPool's storage devices to manipulate their behavior, such as the file name used to store data on disk.storage_leaf_tag::Union{MemPool.Tag,Nothing}=nothing
: If notnothing
, specifies the MemPool storage leaf tag to associate with the thunk's result. This tag can be used by MemPool's storage devices to manipulate their behavior, such as the file name used to store data on disk.storage_retain::Bool=false
: The value ofretain
to pass toMemPool.poolset
when constructing the resultChunk
.
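Most of these options can also be set per-task at spawn time (see Dagger.@spawn above). For example, hinting a low occupancy for a task that mostly waits might look like this sketch (the 0.1 ratio is an illustrative choice):

```julia
using Dagger

# Sketch: declare that this task barely occupies its processor
# (it mostly sleeps), letting the scheduler co-schedule other
# tasks on the same processor in the meantime.
t = Dagger.@spawn occupancy=Dict(Dagger.ThreadProc=>0.1) sleep(1)
fetch(t)
```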
Base.fetch
— MethodWaits on a thunk to complete, and fetches its result.
Base.merge
— MethodBase.merge(sopts::SchedulerOptions, topts::ThunkOptions) -> ThunkOptions
Combine SchedulerOptions
and ThunkOptions
into a new ThunkOptions
.
Base.wait
— MethodWaits on a thunk to complete.
Dagger.Sch.add_thunk!
— MethodAdds a new Thunk to the DAG.
Dagger.Sch.cleanup_syncdeps!
— MethodCleans up any syncdeps that aren't needed any longer, and returns a Set{Chunk}
of all chunks that can now be evicted from workers.
Dagger.Sch.collect_task_inputs
— MethodCollects all arguments for task
, converting Thunk inputs to Chunks.
Dagger.Sch.do_task
— Methoddo_task(to_proc, task_desc) -> Any
Executes a single task specified by task_desc
on to_proc
.
Dagger.Sch.do_tasks
— Methoddo_tasks(to_proc, return_queue, tasks)
Executes a batch of tasks on to_proc
, returning their results through return_queue
.
Dagger.Sch.dynamic_listener!
— MethodProcesses dynamic messages from worker-executing thunks.
Dagger.Sch.errormonitor_tracked
— MethodLike errormonitor
, but tracks how many outstanding tasks are running.
Dagger.Sch.estimate_task_costs
— MethodEstimates the cost of scheduling task
on each processor in procs
. Considers current estimated per-processor compute pressure, and transfer costs for each Chunk
argument to task
. Returns (procs, costs)
, with procs
sorted in order of ascending cost.
Dagger.Sch.exec!
— MethodExecutes an arbitrary function within the scheduler, returning the result.
Dagger.Sch.fill_registered_futures!
— MethodFills the result for all registered futures of node
.
Dagger.Sch.get_dag_ids
— MethodReturns all Thunk IDs as a Dict, mapping each Thunk to its downstream dependents.
Dagger.Sch.get_propagated_options
— MethodGets a NamedTuple
of options propagated by thunk
.
Dagger.Sch.halt!
— MethodCommands the scheduler to halt execution immediately.
Dagger.Sch.handle_fault
— Methodhandle_fault(...)
An internal function to handle a worker dying or being killed by the OS. Attempts to determine which Thunk
s were running on (or had their results cached on) the dead worker, and stores them in a "deadlist". It uses this deadlist to correct the scheduler's internal ComputeState
struct to recover from the fault.
Note: The logic for this functionality is not currently perfectly robust to all failure modes, and is only really intended as a last-ditch attempt to repair and continue executing. While it should never cause incorrect execution of DAGs, it may cause a KeyError
or other failures in the scheduler due to the complexity of getting the internal state back to a consistent and proper state.
Dagger.Sch.impute_sum
— MethodLike sum
, but replaces nothing
entries with the average of non-nothing
entries.
Dagger.Sch.monitor_procs_changed!
— MethodMonitors for workers being added/removed to/from ctx
, sets up or tears down per-worker state, and notifies the scheduler so that work can be reassigned.
Dagger.Sch.populate_defaults
— Methodpopulate_defaults(opts::ThunkOptions, Tf, Targs) -> ThunkOptions
Returns a ThunkOptions
with default values filled in for a function of type Tf
with argument types Targs
, if the option was previously unspecified in opts
.
Dagger.Sch.print_sch_status
— MethodInternal utility, useful for debugging scheduler state.
Dagger.Sch.register_future!
— FunctionWaits on a thunk to complete, and fetches its result. If check
is set to true
(the default), then a domination check will occur to ensure that the future isn't being registered on a thunk dominated by the calling thunk.
Dagger.Sch.reschedule_syncdeps!
— FunctionPrepares the scheduler to schedule thunk
. Will mark thunk
as ready if its inputs are satisfied.
Dagger.Sch.sch_handle
— MethodGets the scheduler handle for the currently-executing thunk.
Dagger.Sch.schedule_dependents!
— MethodSchedules any dependents that may be ready to execute.
Dagger.Sch.set_failed!
— FunctionMarks thunk
and all dependent thunks as failed.
Dagger.Sch.thunk_yield
— MethodAllows a thunk to safely wait on another thunk by temporarily reducing its effective occupancy to 0, which allows a newly-spawned task to run.
Dagger.Sch.unwrap_nested_exception
— Methodunwrap_nested_exception(err::Exception) -> Exception
Extracts the "core" exception from a nested exception.
Dagger.Sch.walk_data
— Methodwalk_data(f, x)
Walks the data contained in x
in DFS fashion, and executes f
at each object that hasn't yet been seen.
Dagger.Sch.walk_storage_safe
— MethodWalks x
and returns a Bool
indicating whether x
is safe to serialize.
Dagger.Events.BytesAllocd
— TypeBytesAllocd
Tracks memory allocated for Chunk
s.
Dagger.Events.ProcessorSaturation
— TypeProcessorSaturation
Tracks the compute saturation (running tasks) per-processor.
Dagger.Events.TaskArgumentMoves
— TypeTaskArgumentMoves
Records any move
-derived copies of arguments of each task.
Dagger.Events.TaskArguments
— TypeTaskArguments
Records the raw (mutable) arguments of each submitted task.
Dagger.Events.TaskDependencies
— TypeTaskDependencies
Records the dependencies of each submitted task.
Dagger.Events.TaskNames
— TypeTaskNames
Creates a unique name for each task.
Dagger.Events.WorkerSaturation
— TypeWorkerSaturation
Tracks the compute saturation (running tasks).