ConcurrentUtilities.Pools.PoolType
Pool{T}(limit::Int=4096)
Pool{K, T}(limit::Int=4096)

A threadsafe object for managing a pool of objects of type T, optionally keyed by objects of type K.

Objects can be requested by calling acquire(f, pool, [key]), where f is a function that returns a new object of type T. The key argument is optional and can be used to lookup objects that match a certain criteria (a Dict is used internally, so matching is isequal).

The limit argument will limit the number of objects that can be in use at any given time. If the limit has been reached, acquire will block until an object is released via release.

  • release(pool, obj) will return the object to the pool for reuse.
  • release(pool) will decrement the number in use but not return any object for reuse.
  • drain! can be used to remove objects that have been returned to the pool for reuse; it does not release any objects that are in use.

See also acquire, release, Pools.limit, Pools.in_use, Pools.in_pool, drain!. The key and object types can be inspected with keytype and valtype respectively.

Base.acquireMethod
acquire(f, pool::Pool{K, T}, [key::K]; forcenew::Bool=false, isvalid::Function) -> T

Get an object from a pool, optionally keyed by the provided key. The provided function f must create a new object instance of type T. Each acquire call MUST be matched by exactly one release call. The forcenew keyword argument can be used to force the creation of a new object, ignoring any existing objects in the pool. The isvalid keyword argument can be used to specify a function that will be called to determine if an object is still valid for reuse. By default, all objects are considered valid. If there are no objects available for reuse, f will be called to create a new object. If the pool is already at its usage limit, acquire will block until an object is returned to the pool via release.

Base.keytypeMethod
keytype(::Pool)

Return the type of the keys for the pool. If the pool is not keyed, this will return Nothing.

Base.releaseMethod
release(pool::Pool{K, T}, key::K, obj::Union{T, Nothing}=nothing)
release(pool::Pool{K, T}, obj::T)
release(pool::Pool{K, T})

Release an object from usage by a pool, optionally keyed by the provided key. If obj is provided, it will be returned to the pool for reuse. Otherwise, if nothing is returned, or release(pool) is called, the usage count will be decremented without an object being returned to the pool for reuse.

Base.valtypeMethod
valtype(::Pool)

Return the type of the objects that can be stored in the pool.

ConcurrentUtilities.Pools.in_useMethod
Pools.in_use(pool::Pool) -> Int

Return the number of objects currently in use. Less than or equal to Pools.limit(pool).

ConcurrentUtilities.Pools.limitMethod
Pools.limit(pool::Pool) -> Int

Return the maximum number of objects permitted to be in use at the same time. See Pools.in_use(pool) for the number of objects currently in use.

ConcurrentUtilities.LockableType

Lockable(value, lock = ReentrantLock())

Creates a Lockable object that wraps value and associates it with the provided lock.

ConcurrentUtilities.OrderedSynchronizerType
OrderedSynchronizer(i=1)

A threadsafe synchronizer that allows ensuring concurrent work is done in a specific order. The OrderedSynchronizer is initialized with an integer i that represents the current "order" of the synchronizer.

Work is "scheduled" by calling put!(f, x, i), where f is a function that will be called like f() when the synchronizer is at order i, and will otherwise wait until other calls to put! have finished to bring the synchronizer's state to i. Once f() is called, the synchronizer's state is incremented by 1 and any waiting put! calls check to see if it's their turn to execute.

A synchronizer's state can be reset to a specific value (1 by default) by calling reset!(x, i).

ConcurrentUtilities.TimedOutType
TimedOut

Helper object passed to user-provided f in try_with_timeout that allows checking if the calling context reached a time out. Call x[], which returns a Bool, to check if the timeout was reached.

Base.lockMethod

lock(f::Function, l::Lockable)

Acquire the lock associated with l, execute f with the lock held, and release the lock when f returns. f will receive one positional argument: the value wrapped by l. If the lock is already locked by a different task/thread, wait for it to become available. When this function returns, the lock has been released, so the caller should not attempt to unlock it.

Base.put!Function
put!(f::Function, x::OrderedSynchronizer, i::Int, incr::Int=1)

Schedule f to be called when x is at order i. Note that put! will block until f is executed. The typical usage involves something like:

x = OrderedSynchronizer()
@sync for i = 1:N
    Threads.@spawn begin
        # do some concurrent work
        # once work is done, schedule synchronization
        put!(x, $i) do
            # report back result of concurrent work
            # won't be executed until all `i-1` calls to `put!` have already finished
        end
    end
end

The incr argument controls how much the synchronizer's state is incremented after f is called. By default, incr is 1.

ConcurrentUtilities.initFunction

ConcurrentUtilities.init(nworkers=Threads.nthreads() - 1)

Initialize background workers that will execute tasks spawned via ConcurrentUtilities.@spawn. If nworkers == 1, a single worker will be started on thread 1 where tasks will be executed in contention with other thread 1 work. Background worker tasks can be inspected by looking at ConcurrentUtilities.WORKER_TASKS.

ConcurrentUtilities.try_with_timeoutMethod
try_with_timeout(f, timeout, T=Any) -> T

Run f in a new task, and return its result. If f does not complete within timeout seconds, throw a TimeoutException. If f throws an exception, rethrow it. If f completes successfully, return its result. f should be of the form f(x::TimedOut), where x is a TimedOut object. This allows the calling function to check whether the timeout has been reached by checking x[] and if true, the timeout was reached and the function can cancel/abort gracefully. The 3rd argument T is optional (default Any) and allows passing an expected return type that f should return; this allows avoiding a dynamic dispatch from non-inferability of using try_with_timeout with f.

Examples

julia> try_with_timeout(_ -> 1, 1)
1

julia> try_with_timeout(_ -> sleep(3), 1)
ERROR: TimeoutException: try_with_timeout timed out after 1.0 seconds
Stacktrace:
 [1] try_with_timeout(::var"#1#2", ::Int64) at ./REPL[1]:1
 [2] top-level scope at REPL[2]:1

julia> try_with_timeout(_ -> error("hey"), 1)
ERROR: hey
Stacktrace:
 [1] error(::String) at ./error.jl:33
 [2] (::var"#1#2")(::TimedOut{Any}) at ./REPL[1]:1
 [3] try_with_timeout(::var"#1#2", ::Int64) at ./REPL[1]:1
 [4] top-level scope at REPL[3]:1

julia> try_with_timeout(_ -> 1, 1, Int)
1

# usage with `TimedOut`
julia> try_with_timeout(1) do timedout
    while !timedout[]
        # do iterative computation that may take too long
    end
end

julia> try_with_timeout(1) do timedout
    sleep(3)
    timedout[] && abort_gracefully()
end
ConcurrentUtilities.@spawnMacro

ConcurrentUtilities.@spawn expr ConcurrentUtilities.@spawn passthroughstorage::Bool expr

Similar to Threads.@spawn, schedule and execute a task (given by expr) that will be run on a "background worker" (see ConcurrentUtilities.init).

In the 2-argument invocation, passthroughstorage controls whether the task-local storage of the current_task() should be "passed through" to the spawned task.

ConcurrentUtilities.@spawnMacro

ConcurrentUtilities.@spawn expr ConcurrentUtilities.@spawn passthroughstorage::Bool expr

Similar to Threads.@spawn, schedule and execute a task (given by expr) that will be run on a "background worker" (see ConcurrentUtilities.init).

In the 2-argument invocation, passthroughstorage controls whether the task-local storage of the current_task() should be "passed through" to the spawned task.

ConcurrentUtilities.@wkspawnMacro
@wkspawn [:default|:interactive] expr

Create a Task and schedule it to run on any available thread in the specified threadpool (:default if unspecified). The task is allocated to a thread once one becomes available. To wait for the task to finish, call wait on the result of this macro, or call fetch to wait and then obtain its return value.

Values can be interpolated into @wkspawn via $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable's value in the current task.