Dagger.AutoBlocksType
AutoBlocks

Automatically determines the size and number of blocks for a distributed array. This may construct any kind of Dagger.AbstractBlocks partitioning.

Dagger.BlocksMethod
Blocks(xs...)

Indicates the size of an array operation, specified as xs, whose length indicates the number of dimensions in the resulting array.
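
For example, a Blocks specification can be used to partition an existing array into a DArray (a minimal sketch, assuming the Dagger.distribute entry point described in the DArray documentation):

A = rand(100, 100)
DA = Dagger.distribute(A, Blocks(25, 25))  # a 4x4 grid of 25x25 chunks
collect(DA) == A                           # gather the result back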

Dagger.ChunkType
Chunk

A reference to a piece of data located on a remote worker. Chunks are typically created with Dagger.tochunk(data), and the data can then be accessed from any worker with collect(::Chunk). Chunks are serialization-safe, and use distributed refcounting (provided by MemPool.DRef) to ensure that the data referenced by a Chunk won't be GC'd, as long as a reference exists on some worker.

Each Chunk is associated with a given Dagger.Processor, which is (in a sense) the processor that "owns" or contains the data. Calling collect(::Chunk) will perform data movement and conversions defined by that processor to safely serialize the data to the calling worker.

Constructors

See tochunk.
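
For example (a brief sketch; on a multi-worker setup the Chunk may reference data on a remote worker):

c = Dagger.tochunk([1, 2, 3])  # wrap the data in a Chunk
collect(c)                     # fetch a copy of the data: [1, 2, 3]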

Dagger.ContextType
Context(xs::Vector{OSProc}) -> Context
Context(xs::Vector{Int}) -> Context

Create a Context, by default adding each available worker.

It is also possible to create a Context from a vector of OSProc, or equivalently the underlying process ids can also be passed directly as a Vector{Int}.

Special fields include:

  • log_sink: A log sink object to use, if any.
  • profile::Bool: Whether or not to perform profiling with the Profile stdlib.
Dagger.DArrayType
DArray{T,N,F}(domain, subdomains, chunks, concat)
DArray(T, domain, subdomains, chunks, [concat=cat])

An N-dimensional distributed array of element type T, with a concatenation function of type F.

Arguments

  • T: element type
  • domain::ArrayDomain{N}: the whole ArrayDomain of the array
  • subdomains::AbstractArray{ArrayDomain{N}, N}: a DomainBlocks of the same dimensions as the array
  • chunks::AbstractArray{Union{Chunk,Thunk}, N}: an array of chunks of dimension N
  • concat::F: a function of type F. concat(x, y; dims=d) takes two chunks x and y and concatenates them along dimension d. cat is used by default.
Dagger.DTaskType
DTask

Returned from Dagger.@spawn/Dagger.spawn calls. Represents a task that is in the scheduler, potentially ready to execute, executing, or finished executing. May be fetch'd or wait'd on at any time. See Dagger.@spawn for more details.
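
For example:

t = Dagger.@spawn 1 + 2
wait(t)    # block until the task has finished executing
fetch(t)   # retrieve the result: 3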

Dagger.DepsType

Specifies one or more dependencies.

Dagger.FileMethod
File(path::AbstractString;
     serialize::Base.Callable, deserialize::Base.Callable,
     use_io::Bool, mmap::Bool) -> Dagger.File

References data in the file at path, using the deserialization function deserialize. use_io specifies whether deserialize takes an IO (the default) or takes a String path. mmap is experimental, and specifies whether deserialize can use MemPool's MMWrap memory mapping functionality (default is false).

The file at path is not yet loaded when the call to File returns; passing it to a task will cause reading to occur, and the result will be passed to the task.
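
A minimal sketch, assuming the file was previously written with the Serialization stdlib (the path here is hypothetical):

using Serialization
f = Dagger.File("results.jls"; serialize=serialize, deserialize=deserialize)
t = Dagger.@spawn sum(f)  # the file is only read once the task executes
fetch(t)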

Dagger.FileReaderType
FileReader

Used as a Chunk handle for reading a file, starting at a given offset.

Dagger.InType

Specifies a read-only dependency.

Dagger.OSProcType
OSProc <: Processor

Julia CPU (OS) process, identified by Distributed pid. The logical parent of all processors on a given node, but otherwise does not participate in computations.

Dagger.OptionsType
Options(::NamedTuple)
Options(; kwargs...)

Options for thunks and the scheduler. See Task Spawning for more information.
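
For example, an Options struct can be passed as the first argument to Dagger.spawn to set task options (a sketch; the scope option is described under Dagger.scope):

opts = Dagger.Options(;scope=Dagger.scope(worker=1))
t = Dagger.spawn(+, opts, 1, 2)  # constrain execution to worker 1
fetch(t)                         # 3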

Dagger.OutType

Specifies a write-only dependency.

Dagger.ProcessorType
Processor

An abstract type representing a processing device and associated memory, where data can be stored and operated on. Subtypes should be immutable, and instances should compare equal if they represent the same logical processing device/memory. Subtype instances should be serializable between different nodes. Subtype instances may contain a "parent" Processor to make it easy to transfer data to/from other types of Processor at runtime.

Dagger.PromotePartitionType

This is a way of suggesting that stage should call stage_operand with the operation and other arguments.

Dagger.ShardType

Maps a value to one of multiple distributed "mirror" values automatically when used as a thunk argument. Construct using @shard or shard.

Dagger.ThreadProcType
ThreadProc <: Processor

Julia CPU (OS) thread, identified by Julia thread ID.

Dagger.ThunkType
Thunk

Wraps a callable object to be run with Dagger. A Thunk is typically created through a call to delayed or its macro equivalent @par.

Constructors

delayed(f; kwargs...)(args...)
@par [option=value]... f(args...)

Examples

julia> t = delayed(sin)(π)  # creates a Thunk to be computed later
Thunk(sin, (π,))

julia> collect(t)  # computes the result and returns it to the current process
1.2246467991473532e-16

Arguments

  • f: The function to be called upon execution of the Thunk.
  • args: The arguments to be passed to the Thunk.
  • kwargs: The properties describing unique behavior of this Thunk. Details for each property are described in the next section.
  • option=value: The same as passing kwargs to delayed.

Public Properties

  • meta::Bool=false: If true, instead of fetching cached arguments from Chunks and passing the raw arguments to f, the Chunk itself is passed. Useful for doing manual fetching or manipulation of Chunk references. Non-Chunk arguments are still passed as-is.
  • processor::Processor=OSProc(): The processor associated with f. Useful if f is a callable struct that exists on a given processor and should be transferred appropriately.
  • scope::Dagger.AbstractScope=DefaultScope(): The scope associated with f. Useful if f is a function or callable struct that may only be transferred to, and executed within, the specified scope.

Options

  • options: A Sch.ThunkOptions struct providing the options for the Thunk. If omitted, options can also be specified by passing key-value pairs as kwargs.

Dagger.ThunkSummaryType

A summary of the data contained in a Thunk, which can be safely serialized.

Dagger.UnitDomainType
UnitDomain

Default domain – has no information about the value

Base.fetchMethod
Base.fetch(c::DArray)

If a DArray tree has a Thunk in it, make the whole thing a big thunk.

Base.lockMethod
lock(f, ctx::Context)

Acquire ctx.proc_lock, execute f with the lock held, and release the lock when f returns.

Base.viewMethod
view(c::DArray, d)

A view of a DArray chunk returns a DArray of Thunks.

Dagger.DefaultScopeMethod

Default scope that contains the set of default_enabled processors.

Dagger._delayedMethod
delayed(f, options=Options())(args...; kwargs...) -> Thunk
delayed(f; options...)(args...; kwargs...) -> Thunk

Creates a Thunk object which can be executed later, which will call f with args and kwargs. options controls various properties of the resulting Thunk.

Dagger.addprocs!Method
addprocs!(ctx::Context, xs)

Add new workers xs to ctx.

Workers will typically be assigned new tasks in the next scheduling iteration if scheduling is ongoing.

Workers can be either Processors or the underlying process IDs as Integers.

Dagger.alignfirstMethod
alignfirst(a) -> ArrayDomain

Make a subdomain a standalone domain.

Example

julia> alignfirst(ArrayDomain(11:25, 21:100))
ArrayDomain((1:15), (1:80))
Dagger.all_processorsFunction
all_processors(ctx::Context=Sch.eager_context()) -> Set{Processor}

Returns the set of all processors available to the scheduler, across all Distributed workers.

Dagger.allowscalarFunction

Allow/disallow scalar indexing for the duration of executing f.

Dagger.compatible_processorsFunction
compatible_processors(scope::AbstractScope, ctx::Context=Sch.eager_context()) -> Set{Processor}

Returns the set of all processors (across all Distributed workers) that are compatible with the given scope.

Dagger.computeMethod
compute(ctx::Context, d::Thunk; options=nothing) -> Chunk

Compute a Thunk - creates the DAG, assigns ranks to nodes for tie breaking and runs the scheduler with the specified options. Returns a Chunk which references the result.
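
A minimal sketch, assuming the single-argument form of compute that uses the global context:

t = Dagger.delayed(+)(1, 2)
c = Dagger.compute(t)  # run the scheduler; returns a Chunk referencing the result
collect(c)             # 3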

Dagger.constrainMethod
constrain(x::AbstractScope, y::AbstractScope) -> ::AbstractScope

Constructs a scope that is the intersection of scopes x and y.

Dagger.default_enabledMethod
default_enabled(proc::Processor) -> Bool

Returns whether processor proc is enabled by default. The default value is false, which opts the processor out of execution unless specifically requested by the user; true opts it in, causing the processor to always participate in execution when possible.

Dagger.default_optionMethod
default_option(::Val{name}, Tf, Targs...) where name = value

Defines the default value for option name to value when Dagger is preparing to execute a function with type Tf with the argument types Targs. Users and libraries may override this to set default values for tasks.

An easier way to define these defaults is with @option.

Note that the actual task's argument values are not passed, as it may not always be possible or efficient to gather all Dagger task arguments on one worker.

This function may be executed within the scheduler, so it should generally be made very cheap to execute. If the function throws an error, the scheduler will use whatever the global default value is for that option instead.

Dagger.dependentsMethod
dependents(node::Thunk) -> Dict{Union{Thunk,Chunk}, Set{Thunk}}

Find the set of direct dependents for each task.

Dagger.domainFunction
domain(x::T)

Returns metadata about x. This metadata will be in the domain field of a Chunk object when an object of type T is created as the result of evaluating a Thunk.

Dagger.domainMethod
domain(x::AbstractArray) -> ArrayDomain

The domain of an array is an ArrayDomain.

Dagger.domainMethod

If no domain method is defined on an object, then we use the UnitDomain on it. A UnitDomain is indivisible.

Dagger.dsort_chunksFunction
dsort_chunks(cs, [nchunks, nsamples]; options...)

Sort the contents of the chunks cs and return a new set of chunks such that, when concatenated, they form a sorted collection. Each output chunk is itself sorted.

Arguments

  • nchunks: the number of chunks to produce, regardless of how many chunks were given as input
  • nsamples: the number of elements to sample from each chunk to estimate the splitters (nchunks-1 splitters); each output chunk will be delimited by these splitters.
  • merge: a function to merge two sorted collections.
  • sub: a function to get a subset of the collection takes (collection, range) (defaults to getindex)
  • order: Base.Sort.Ordering to be used for sorting
  • batchsize: the number of chunks to split and merge at a time. For example, if there are 128 input chunks and 128 output chunks, and batchsize is 8, then we first sort among batches of 8 chunks, giving 16 batches. Then we sort among the first chunk of the first 8 batches (all elements less than the first splitter), then go on to the first 8 chunks of the second 8 batches, and so on.
  • chunks_presorted: is each chunk in the input already sorted?
  • sortandsample: a function to sort a chunk, then sample N elements to infer the splitters. It takes 3 arguments: (collection, N, presorted). presorted is a boolean which is true if the chunk is already sorted.
  • affinities: a list of processes where the output chunks should go. If the length is not equal to nchunks then affinities array is cycled through.

Returns

A tuple of (chunk, samples), where chunk is the Dagger.Chunk object. chunk can be nothing if no change to the initial array was made (e.g. it was already sorted).

Dagger.enable_disk_caching!Function
enable_disk_caching!(mem_limits::Dict{Int,Int}, disk_limit_mb::Int=16*2^10, processes::Vector{Int}=procs())

Sets up disk caching on participating processes. This is a low level method for applying the mem_limits directly onto the processes. This skips the process discovery stage and the limit calculation.

Dagger.enable_disk_caching!Function
enable_disk_caching!(ram_percentage_limit::Int=30, disk_limit_mb::Int=16*2^10, processes::Vector{Int}=procs())

Sets up disk caching on all processes available in the environment according to the provided limits. The user should provide the percentage, which determines the memory limit on each participating machine (differentiated by hostname). The disk limit is set strictly per process and doesn't include any hostname related logic.

Dagger.enable_logging!Method
enable_logging!(;kwargs...)

Enables logging globally for all workers. Certain core events are always enabled by this call, but additional ones may be specified via kwargs.

Extra events:

  • metrics::Bool: Enables various utilization and allocation metrics
  • timeline::Bool: Enables raw "timeline" values, which are event-specific; not recommended except for debugging
  • tasknames::Bool: Enables generating unique task names for each task
  • taskdeps::Bool: Enables reporting of upstream task dependencies (as task IDs) for each task argument
  • taskargs::Bool: Enables reporting of upstream non-task dependencies (as objectid hash) for each task argument
  • taskargmoves::Bool: Enables reporting of copies of upstream dependencies (as original and copy objectid hashes) for each task argument
  • profile::Bool: Enables profiling of task execution; not currently recommended, as it adds significant overhead
Dagger.execute!Function
execute!(proc::Processor, f, args...; kwargs...) -> Any

Executes the function f with arguments args and keyword arguments kwargs on processor proc. This function can be overloaded by Processor subtypes to allow executing function calls differently than normal Julia.

Dagger.fetch_logs!Method
fetch_logs!() -> Dict{Int, Dict{Symbol, Vector}}

Fetches and returns the currently-accumulated logs for each worker. Each entry of the outer Dict is keyed on worker ID, so logs[1] are the logs for worker 1.

Consider using show_logs or render_logs to generate a renderable display of these logs.
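
For example (a sketch; the exact tables present depend on which events were enabled):

Dagger.enable_logging!(;tasknames=true)
fetch(Dagger.@spawn sum(rand(100)))
logs = Dagger.fetch_logs!()  # Dict keyed on worker ID
logs[1]                      # the log tables accumulated on worker 1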

Dagger.gemm_dagger!Method

Performs one of the matrix-matrix operations

C = alpha [op( A ) * op( B )] + beta C,

where op( X ) is one of

op( X ) = X or op( X ) = X' or op( X ) = g( X' )

alpha and beta are scalars, and A, B and C are matrices, with op( A ) an m by k matrix, op( B ) a k by n matrix and C an m by n matrix.

Dagger.get_optionsMethod
get_options(key::Symbol, default) -> Any
get_options(key::Symbol) -> Any

Returns the value of the option named key. If the option does not have a value set, then an error will be thrown, unless default is set, in which case it will be returned instead of erroring.

get_options() -> NamedTuple

Returns a NamedTuple of all option key-value pairs.

Dagger.get_parentFunction
get_parent(proc::Processor) -> Processor

Returns the parent processor for proc. The ultimate parent processor is an OSProc. Processor subtypes should overload this to return their most direct parent.

Dagger.get_processorsMethod
get_processors(proc::Processor) -> Set{<:Processor}

Returns the set of processors contained in proc, if any. Processor subtypes should overload this function if they can contain sub-processors. The default method will return a Set containing proc itself.

Dagger.get_tlsMethod
get_tls()

Gets all Dagger TLS variable as a NamedTuple.

Dagger.has_writedepMethod

Whether arg has any writedep at or before executing task in this datadeps region.

Dagger.iscompatibleMethod
iscompatible(proc::Processor, opts, f, Targs...) -> Bool

Indicates whether proc can execute f over Targs given opts. Processor subtypes should overload this function to return true if and only if it is essentially guaranteed that f(::Targs...) is supported. Additionally, iscompatible_func and iscompatible_arg can be overridden to determine compatibility of f and Targs individually. The default implementation returns false.

Dagger.loadMethod
load(ctx::Context, file_path)

Load a Union{Chunk, Thunk} from a file.

Dagger.loadMethod
load(ctx::Context, ::Type{Chunk}, fpath, io)

Load a Chunk object from a file; the file path is required for creating a FileReader object.

Dagger.logs_annotate!Method

Associates an argument arg with name in the logs, which log renderers may utilize for display purposes.

Dagger.mem_limitMethod
mem_limit(total::UInt, percentage_limit::Int, nprocs::Int)

Returns the per process mem limit in MiB based on the provided total memory, percentage_limit (1-100) and the number of processes in the current setup.

Dagger.moveMethod
move(from_proc::Processor, to_proc::Processor, x)

Moves and/or converts x such that it's available and suitable for usage on the to_proc processor. This function can be overloaded by Processor subtypes to transport arguments and convert them to an appropriate form before being used for execution. Subtypes of Processor wishing to implement efficient data movement should provide implementations where x::Chunk.

Dagger.mutableMethod
mutable(f::Base.Callable; worker, processor, scope) -> Chunk

Calls f() on the specified worker or processor, returning a Chunk referencing the result with the specified scope scope.
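
For example (a sketch; increment! is a hypothetical helper):

using Distributed
increment!(r) = (r[] += 1)
counter = Dagger.mutable(()->Ref(0); worker=myid())  # Chunk pinned to this worker
fetch(Dagger.@spawn increment!(counter))             # the task receives the underlying Ref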

Dagger.noffspringMethod
noffspring(dpents::Dict{Union{Thunk,Chunk}, Set{Thunk}}) -> Dict{Thunk, Int}

Recursively find the number of tasks dependent on each task in the DAG. Takes a Dict as returned by dependents.

Dagger.num_processorsFunction
num_processors(scope::AbstractScope=DefaultScope(), all::Bool=false) -> Int

Returns the number of processors available to Dagger by default, or if specified, according to scope. If all=true, instead returns the number of processors known to Dagger, whether or not they've been disabled by the user. Most users will want to use num_processors().

Dagger.orderMethod
order(node::Thunk, ndeps) -> Dict{Thunk,Int}

Given a root node of the DAG, calculates a total order for tie-breaking.

  • Root node gets score 1,
  • rest of the nodes are explored in DFS fashion but chunks of each node are explored in order of noffspring, i.e. total number of tasks depending on the result of the said node.

Dagger.recursive_splittersMethod
recursive_splitters(ord, splitters, nchunks, batchsize) -> Tuple{Vector, Vector{Vector}}

Split the splitters themselves into batches.

Arguments

  • ord: Sorting.Ordering object
  • splitters: the nchunks-1 splitters
  • batchsize: batch size

Returns

A Tuple{Vector, Vector{Vector}}: the coarse splitters, which create batchsize splits, and the finer splitters within those batches, which together create a total of nchunks splits.

Example

julia> Dagger.recursive_splitters(Dagger.default_ord,
    [10,20,30,40,50,60], 5,3)
([30], Any[[10, 20], [40, 50, 60]])

The first value, [30], represents a coarse split that cuts the dataset into the ranges -Inf to 30 and 30 to Inf. Each part is then recursively split using the next set of splitters.

Dagger.render_logsFunction
render_logs(logs, vizmode::Symbol; options...)
render_logs(t, logs, vizmode::Symbol; options...)

Returns a displayable representation of the logs of a task t and/or logs object logs using the visualization mode vizmode. options are specific to the visualization mode.

Dagger.rmprocs!Method
rmprocs!(ctx::Context, xs)

Remove the specified workers xs from ctx.

Workers will typically finish all their assigned tasks if scheduling is ongoing but will not be assigned new tasks after removal.

Workers can be either Processors or the underlying process IDs as Integers.

Dagger.saveMethod
save(io::IO, val)

Save a value into the IO buffer. In the case of arrays and sparse matrices, this will save it in a memory-mappable way.

load(io::IO, t::Type, domain) will load the object given its domain.

Dagger.saveMethod
save(ctx, chunk::Union{Chunk, Thunk}, file_path::AbstractString)

Save a chunk to a file at file_path.

Dagger.saveMethod
save(ctx, chunk, file_path)

Special case distmem writing - write to disk on the process with the chunk.

Dagger.scopeMethod
scope(scs...) -> AbstractScope
scope(;scs...) -> AbstractScope

Constructs an AbstractScope from a set of scope specifiers. Each element in scs is a separate specifier; if scs is empty, an empty UnionScope() is produced; if scs has one element, then exactly one specifier is constructed; if scs has more than one element, a UnionScope of the scopes specified by scs is constructed. A variety of specifiers can be passed to construct a scope:

  • :any - Constructs an AnyScope()
  • :default - Constructs a DefaultScope()
  • (scs...,) - Constructs a UnionScope of scopes, each specified by scs
  • thread=tid or threads=[tids...] - Constructs an ExactScope or UnionScope containing all Dagger.ThreadProcs with thread ID tid/tids across all workers.
  • worker=wid or workers=[wids...] - Constructs a ProcessScope or UnionScope containing all Dagger.ThreadProcs with worker ID wid/wids across all threads.
  • thread=tid/threads=tids and worker=wid/workers=wids - Constructs an ExactScope, ProcessScope, or UnionScope containing all Dagger.ThreadProcs with worker ID wid/wids and threads tid/tids.

Aside from the worker and thread specifiers, it's possible to add custom specifiers for scoping to other kinds of processors (like GPUs) or providing different ways to specify a scope. Specifier selection is determined by a precedence ordering: by default, all specifiers have precedence 0, which can be changed by defining scope_key_precedence(::Val{spec}) = precedence (where spec is the specifier as a Symbol). The specifier with the highest precedence in a set of specifiers is used to determine the scope by calling to_scope(::Val{spec}, sc::NamedTuple) (where sc is the full set of specifiers), which should be overriden for each custom specifier, and which returns an AbstractScope. For example:

# Setup a GPU specifier
Dagger.scope_key_precedence(::Val{:gpu}) = 1
Dagger.to_scope(::Val{:gpu}, sc::NamedTuple) = ExactScope(MyGPUDevice(sc.worker, sc.gpu))

# Generate an `ExactScope` for `MyGPUDevice` on worker 2, device 3
Dagger.scope(gpu=3, worker=2)
Dagger.set_tls!Method
set_tls!(tls)

Sets all Dagger TLS variables from the NamedTuple tls.

Dagger.shardMethod
shard(f; kwargs...) -> Chunk{Shard}

Executes f on all workers in workers, wrapping the result in a process-scoped Chunk, and constructs a Chunk{Shard} containing all of these Chunks on the current worker.

Keyword arguments:

  • procs – The list of processors to create pieces on. May be any iterable container of Processors.
  • workers – The list of workers to create pieces on. May be any iterable container of Integers.
  • per_thread::Bool=false – If true, creates a piece per each thread, rather than a piece per each worker.
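
For example, to give each worker its own random number generator (a sketch; assumes the default of one piece per worker):

using Random
rngs = Dagger.shard(()->MersenneTwister())  # one RNG piece per worker
fetch(Dagger.@spawn rand(rngs))             # each task sees its local worker's RNG
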
Dagger.show_logsFunction
show_logs(io::IO, logs, vizmode::Symbol; options...)
show_logs(io::IO, t, logs, vizmode::Symbol; options...)

Displays the logs of a task t and/or logs object logs using the visualization mode vizmode, which is written to the given IO stream io. options are specific to the visualization mode.


show_logs(logs, vizmode::Symbol; options...)
show_logs(t, logs, vizmode::Symbol; options...)

Returns a string representation of the logs of a task t and/or logs object logs using the visualization mode vizmode. options are specific to the visualization mode.

Dagger.spawnMethod
Dagger.spawn(f, args...; kwargs...) -> DTask

Spawns a DTask that will call f(args...; kwargs...). Also supports passing a Dagger.Options struct as the first argument to set task options. See Dagger.@spawn for more details on DTasks.

Dagger.spawn_datadepsMethod
spawn_datadeps(f::Base.Callable; traversal::Symbol=:inorder)

Constructs a "datadeps" (data dependencies) region and calls f within it. Dagger tasks launched within f may wrap their arguments with In, Out, or InOut to indicate whether the task will read, write, or read+write that argument, respectively. These argument dependencies will be used to specify which tasks depend on each other based on the following rules:

  • Dependencies across different arguments are independent; only dependencies on the same argument synchronize with each other ("same-ness" is determined based on isequal)
  • InOut is the same as In and Out applied simultaneously, and synchronizes with the union of the In and Out effects
  • Any two or more In dependencies do not synchronize with each other, and may execute in parallel
  • An Out dependency synchronizes with any previous In and Out dependencies
  • An In dependency synchronizes with any previous Out dependencies
  • If unspecified, an In dependency is assumed

In general, the result of executing tasks following the above rules will be equivalent to simply executing tasks sequentially and in order of submission. Of course, if dependencies are incorrectly specified, undefined behavior (and unexpected results) may occur.

Unlike other Dagger tasks, tasks executed within a datadeps region are allowed to write to their arguments when annotated with Out or InOut appropriately.

At the end of executing f, spawn_datadeps will wait for all launched tasks to complete, rethrowing the first error, if any. The result of f will be returned from spawn_datadeps.

The keyword argument traversal controls the order that tasks are launched by the scheduler, and may be set to :bfs or :dfs for Breadth-First Scheduling or Depth-First Scheduling, respectively. All traversal orders respect the dependencies and ordering of the launched tasks, but may provide better or worse performance for a given set of datadeps tasks. This argument is experimental and subject to change.
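
For example (a sketch; the argument wrappers are described under Dagger.In, Dagger.Out, and Dagger.InOut above):

A = rand(1000)
B = zeros(1000)
Dagger.spawn_datadeps() do
    Dagger.@spawn copyto!(Dagger.Out(B), Dagger.In(A))  # writes B, reads A
    Dagger.@spawn sort!(Dagger.InOut(B))                # waits for the copyto! above
end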

Dagger.syrk_dagger!Method

Performs one of the symmetric/hermitian rank k operations

C = alpha [ op( A ) * g( op( A )' )] + beta C,

where op( X ) is one of

op( X ) = X or op( X ) = g( X' )

where alpha and beta are real scalars, C is an n-by-n symmetric/hermitian matrix and A is an n-by-k matrix in the first case and a k-by-n matrix in the second case.

Dagger.tochunkMethod
tochunk(x, proc::Processor, scope::AbstractScope; device=nothing, kwargs...) -> Chunk

Create a chunk from data x which resides on proc and which has scope scope.

device specifies a MemPool.StorageDevice (which is itself wrapped in a Chunk) which will be used to manage the reference contained in the Chunk generated by this function. If device is nothing (the default), the data will be inspected to determine if it's safe to serialize; if so, the default MemPool storage device will be used; if not, then a MemPool.CPURAMDevice will be used.

All other kwargs are passed directly to MemPool.poolset.

Dagger.tofileMethod
tofile(data, path::AbstractString;
       serialize::Base.Callable, deserialize::Base.Callable,
       use_io::Bool, mmap::Bool) -> Dagger.File

Writes data to disk at path, using the serialization function serialize. use_io specifies whether serialize takes an IO (the default) or takes a String path. mmap is experimental, and specifies whether serialize can use MemPool's MMWrap memory mapping functionality (default is false).

The returned File object can be passed to tasks as an argument, or returned from tasks as a result.
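
A minimal sketch, assuming the Serialization stdlib for the on-disk format (the path here is hypothetical):

using Serialization
f = Dagger.tofile([1, 2, 3], "data.jls"; serialize=serialize, deserialize=deserialize)
fetch(Dagger.@spawn sum(f))  # the file is read when the task executes: 6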

Dagger.with_optionsMethod
with_options(f, options::NamedTuple) -> Any
with_options(f; options...) -> Any

Sets one or more options to the given values, executes f(), resets the options to their previous values, and returns the result of f(). This is the recommended way to set options, as it only affects tasks spawned within its scope. Note that setting an option here will propagate its value across Julia or Dagger tasks spawned by f() or its callees (i.e. the options propagate).
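
For example, combined with get_options (a sketch; myopt is a hypothetical option name):

Dagger.with_options(;myopt=10) do
    # Tasks spawned here (and their callees) see the propagated option
    fetch(Dagger.@spawn Dagger.get_options(:myopt, 0))  # returns 10
end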

Distributed.procsMethod
procs(ctx::Context)

Fetch the list of procs currently known to ctx.

Dagger.@mutableMacro
@mutable [worker=1] [processor=OSProc()] [scope=ProcessorScope()] f()

Helper macro for mutable().

Dagger.@optionMacro
@option name myfunc(A, B, C) = value

A convenience macro for defining default_option. For example:

Dagger.@option single mylocalfunc(Int) = 1

The above call will set the single option to 1 for any Dagger task calling mylocalfunc(Int) with an Int argument.

Dagger.@parMacro
@par [opts] f(args...; kwargs...) -> Thunk

Convenience macro to call Dagger.delayed on f with arguments args and keyword arguments kwargs. May also be called with a series of assignments like so:

x = @par begin
    a = f(1,2)
    b = g(a,3)
    h(a,b)
end

x will hold the Thunk representing h(a,b); additionally, a and b will be defined in the same local scope and will be equally accessible for later calls.

Options to the Thunk can be set as opts with namedtuple syntax, e.g. single=1. Multiple options may be provided, and will be applied to all generated thunks.

Dagger.@spawnMacro
Dagger.@spawn [option=value]... f(args...; kwargs...) -> DTask

Spawns a Dagger DTask that will call f(args...; kwargs...). This DTask is like a Julia Task, and has many similarities:

  • The DTask can be wait'd on and fetch'd from to see its final result
  • By default, the DTask will be automatically run on the first available compute resource
  • If all dependencies are satisfied, the DTask will be run as soon as possible
  • The DTask may be run in parallel with other DTasks, and the scheduler will automatically manage dependencies
  • If a DTask throws an exception, it will be propagated to any calls to fetch, but not to calls to wait

However, the DTask also has many key differences from a Task:

  • The DTask may run on any thread of any Julia process, and even on a remote machine in your cluster (see Distributed.addprocs)
  • The DTask might automatically utilize GPUs or other accelerators, if available
  • If arguments to a DTask are also DTasks, then the scheduler will execute those arguments' DTasks first, before running the "downstream" task
  • If an argument to a DTask t2 is a DTask t1, then the result of t1 (gotten via fetch(t1)) will be passed to t2 (no need for t2 to call fetch!)
  • DTasks are generally expected to be defined "functionally", meaning that they should not mutate global state, mutate their arguments, or have side effects
  • DTasks are function call-focused, meaning that Dagger.@spawn expects a single function call, and not a block of code
  • All DTask arguments are expected to be safe to serialize and send to other Julia processes; if not, use the scope option or Dagger.@mutable to control execution location

Options to the DTask can be set before the call to f with key-value syntax, e.g. Dagger.@spawn myopt=2 do_something(1, 3.0), which would set the option myopt to 2 for this task. Multiple options may be provided, which are specified like Dagger.@spawn myopt=2 otheropt=4 do_something(1, 3.0).

These options control a variety of properties of the resulting DTask:

  • scope: The execution "scope" of the task, which determines where the task will run. By default, the task will run on the first available compute resource. If you have multiple compute resources, you can specify a scope to run the task on a specific resource. For example, Dagger.@spawn scope=Dagger.scope(worker=2) do_something(1, 3.0) would run do_something(1, 3.0) on worker 2.
  • meta: If true, instead of the scheduler automatically fetching values from other tasks, the raw Chunk objects will be passed to f. Useful for doing manual fetching or manipulation of Chunk references. Non-Chunk arguments are still passed as-is.

Other options exist; see Dagger.Sch.ThunkOptions for the full list.

This macro is a semi-thin wrapper around Dagger.spawn - it creates a call to Dagger.spawn on f with arguments args and keyword arguments kwargs, and also passes along any options in an Options struct. For example, Dagger.@spawn myopt=2 do_something(1, 3.0) would essentially become Dagger.spawn(do_something, Dagger.Options(;myopt=2), 1, 3.0).

Dagger.@unimplementedMacro
@unimplemented fname(<args...>)

While it is nice to define generic functions ad-hoc, it can sometimes get confusing to figure out which method is missing. @unimplemented creates a function which errors out, pointing out which method is missing.

Dagger.Sch.TASK_SYNCConstant

Process-local condition variable (and lock) indicating task completion.

Dagger.Sch.ComputeStateType
ComputeState

The internal state-holding struct of the scheduler.

Fields:

  • uid::UInt64 - Unique identifier for this scheduler instance
  • waiting::OneToMany - Map from downstream Thunk to upstream Thunks that still need to execute
  • waiting_data::Dict{Union{Thunk,Chunk},Set{Thunk}} - Map from input Chunk/upstream Thunk to all unfinished downstream Thunks, to retain caches
  • ready::Vector{Thunk} - The list of Thunks that are ready to execute
  • cache::WeakKeyDict{Thunk, Any} - Maps from a finished Thunk to its cached result, often a DRef
  • valid::WeakKeyDict{Thunk, Nothing} - Tracks all Thunks that are in a valid scheduling state
  • running::Set{Thunk} - The set of currently-running Thunks
  • running_on::Dict{Thunk,OSProc} - Map from Thunk to the OS process executing it
  • thunk_dict::Dict{Int, WeakThunk} - Maps from thunk IDs to a Thunk
  • node_order::Any - Function that returns the order of a thunk
  • worker_time_pressure::Dict{Int,Dict{Processor,UInt64}} - Maps from worker ID to processor pressure
  • worker_storage_pressure::Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}} - Maps from worker ID to storage resource pressure
  • worker_storage_capacity::Dict{Int,Dict{Union{StorageResource,Nothing},UInt64}} - Maps from worker ID to storage resource capacity
  • worker_loadavg::Dict{Int,NTuple{3,Float64}} - Worker load average
  • worker_chans::Dict{Int, Tuple{RemoteChannel,RemoteChannel}} - Communication channels between the scheduler and each worker
  • procs_cache_list::Base.RefValue{Union{ProcessorCacheEntry,Nothing}} - Cached linked list of processors ready to be used
  • signature_time_cost::Dict{Signature,UInt64} - Cache of estimated CPU time (in nanoseconds) required to compute calls with the given signature
  • signature_alloc_cost::Dict{Signature,UInt64} - Cache of estimated CPU RAM (in bytes) required to compute calls with the given signature
  • transfer_rate::Ref{UInt64} - Estimate of the network transfer rate in bytes per second
  • halt::Base.Event - Event indicating that the scheduler is halting
  • lock::ReentrantLock - Lock around operations which modify the state
  • futures::Dict{Thunk, Vector{ThunkFuture}} - Futures registered for waiting on the result of a thunk.
  • errored::WeakKeyDict{Thunk,Bool} - Indicates if a thunk's result is an error.
  • thunks_to_delete::Set{Thunk} - The list of Thunks ready to be deleted upon completion.
  • chan::RemoteChannel{Channel{Any}} - Channel for receiving completed thunks.
Dagger.Sch.SchedulerOptionsType
SchedulerOptions

Stores DAG-global options to be passed to the Dagger.Sch scheduler.

Arguments

  • single::Int=0: (Deprecated) Force all work onto worker with specified id. 0 disables this option.
  • proclist=nothing: (Deprecated) Force scheduler to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return a Bool result to indicate whether or not to use the given processor. nothing enables all default processors.
  • allow_errors::Bool=true: Allow thunks to error without affecting non-dependent thunks.
  • checkpoint=nothing: If not nothing, uses the provided function to save the final result of the current scheduler invocation to persistent storage, for later retrieval by restore.
  • restore=nothing: If not nothing, uses the provided function to return the (cached) final result of the current scheduler invocation, were it to execute. If this returns a Chunk, all thunks will be skipped, and the Chunk will be returned. If nothing is returned, restoring is skipped, and the scheduler will execute as usual. If this function throws an error, restoring will be skipped, and the error will be displayed.
Dagger.Sch.ThunkIDType

Identifies a thunk by its ID, and preserves the thunk in the scheduler.

Dagger.Sch.ThunkOptionsType
ThunkOptions

Stores Thunk-local options to be passed to the Dagger.Sch scheduler.

Arguments

  • single::Int=0: (Deprecated) Force thunk onto worker with specified id. 0 disables this option.
  • proclist=nothing: (Deprecated) Force thunk to use one or more processors that are instances/subtypes of a contained type. Alternatively, a function can be supplied, and the function will be called with a processor as the sole argument and should return a Bool result to indicate whether or not to use the given processor. nothing enables all default processors.
  • time_util::Dict{Type,Any}: Indicates the maximum expected time utilization for this thunk. Each keypair maps a processor type to the utilization, where the value can be a real (approximately the number of nanoseconds taken), or MaxUtilization() (utilizes all processors of this type). By default, the scheduler assumes that this thunk only uses one processor.
  • alloc_util::Dict{Type,UInt64}: Indicates the maximum expected memory utilization for this thunk. Each keypair maps a processor type to the utilization, where the value is an integer representing approximately the maximum number of bytes allocated at any one time.
  • occupancy::Dict{Type,Real}: Indicates the maximum expected processor occupancy for this thunk. Each keypair maps a processor type to the utilization, where the value can be a real between 0 and 1 (the occupancy ratio, where 1 is full occupancy). By default, the scheduler assumes that this thunk has full occupancy.
  • allow_errors::Bool=true: Allow this thunk to error without affecting non-dependent thunks.
  • checkpoint=nothing: If not nothing, uses the provided function to save the result of the thunk to persistent storage, for later retrieval by restore.
  • restore=nothing: If not nothing, uses the provided function to return the (cached) result of this thunk, were it to execute. If this returns a Chunk, this thunk will be skipped, and its result will be set to the Chunk. If nothing is returned, restoring is skipped, and the thunk will execute as usual. If this function throws an error, restoring will be skipped, and the error will be displayed.
  • storage::Union{Chunk,Nothing}=nothing: If not nothing, references a MemPool.StorageDevice which will be passed to MemPool.poolset internally when constructing Chunks (such as when constructing the return value). The device must support MemPool.CPURAMResource. When nothing, uses MemPool.GLOBAL_DEVICE[].
  • storage_root_tag::Any=nothing: If not nothing, specifies the MemPool storage root tag to associate with the thunk's result. This tag can be used by MemPool's storage devices to manipulate their behavior, such as the file name used to store data on disk.
  • storage_leaf_tag::Union{MemPool.Tag,Nothing}=nothing: If not nothing, specifies the MemPool storage leaf tag to associate with the thunk's result. This tag can be used by MemPool's storage devices to manipulate their behavior, such as the file name used to store data on disk.
  • storage_retain::Bool=false: The value of retain to pass to MemPool.poolset when constructing the result Chunk.
Base.fetchMethod

Waits on a thunk to complete, and fetches its result.

Base.mergeMethod
Base.merge(sopts::SchedulerOptions, topts::ThunkOptions) -> ThunkOptions

Combine SchedulerOptions and ThunkOptions into a new ThunkOptions.

Base.waitMethod

Waits on a thunk to complete.

Dagger.Sch.cleanup_syncdeps!Method

Cleans up any syncdeps that aren't needed any longer, and returns a Set{Chunk} of all chunks that can now be evicted from workers.

Dagger.Sch.do_taskMethod
do_task(to_proc, task_desc) -> Any

Executes a single task specified by task_desc on to_proc.

Dagger.Sch.do_tasksMethod
do_tasks(to_proc, return_queue, tasks)

Executes a batch of tasks on to_proc, returning their results through return_queue.

Dagger.Sch.estimate_task_costsMethod

Estimates the cost of scheduling task on each processor in procs. Considers current estimated per-processor compute pressure, and transfer costs for each Chunk argument to task. Returns (procs, costs), with procs sorted in order of ascending cost.

Dagger.Sch.exec!Method

Executes an arbitrary function within the scheduler, returning the result.

Dagger.Sch.get_dag_idsMethod

Returns all Thunk IDs as a Dict, mapping a Thunk to its downstream dependents.

Dagger.Sch.halt!Method

Commands the scheduler to halt execution immediately.

Dagger.Sch.handle_faultMethod
handle_fault(...)

An internal function to handle a worker dying or being killed by the OS. Attempts to determine which Thunks were running on (or had their results cached on) the dead worker, and stores them in a "deadlist". It uses this deadlist to correct the scheduler's internal ComputeState struct to recover from the fault.

Note: The logic for this functionality is not currently perfectly robust to all failure modes, and is only really intended as a last-ditch attempt to repair and continue executing. While it should never cause incorrect execution of DAGs, it may cause a KeyError or other failures in the scheduler due to the complexity of getting the internal state back to a consistent and proper state.

Dagger.Sch.impute_sumMethod

Like sum, but replaces nothing entries with the average of non-nothing entries.

Dagger.Sch.monitor_procs_changed!Method

Monitors for workers being added/removed to/from ctx, sets up or tears down per-worker state, and notifies the scheduler so that work can be reassigned.

Dagger.Sch.populate_defaultsMethod
populate_defaults(opts::ThunkOptions, Tf, Targs) -> ThunkOptions

Returns a ThunkOptions with default values filled in for a function of type Tf with argument types Targs, if the option was previously unspecified in opts.

Dagger.Sch.register_future!Function

Waits on a thunk to complete, and fetches its result. If check is set to true (the default), then a domination check will occur to ensure that the future isn't being registered on a thunk dominated by the calling thunk.

Dagger.Sch.thunk_yieldMethod

Allows a thunk to safely wait on another thunk by temporarily reducing its effective occupancy to 0, which allows a newly-spawned task to run.

Dagger.Sch.walk_dataMethod
walk_data(f, x)

Walks the data contained in x in DFS fashion, and executes f at each object that hasn't yet been seen.