ConcurrentUtilities.Pools.Pool — Type
Pool{T}(limit::Int=4096)
Pool{K, T}(limit::Int=4096)
A threadsafe object for managing a pool of objects of type T, optionally keyed by objects of type K.
Objects can be requested by calling acquire(f, pool, [key]), where f is a function that returns a new object of type T. The key argument is optional and can be used to look up objects that match certain criteria (a Dict is used internally, so keys are matched with isequal).
The limit argument caps the number of objects that can be in use at any given time. If the limit has been reached, acquire will block until an object is released via release.
release(pool, obj) will return the object to the pool for reuse. release(pool) will decrement the number in use but not return any object for reuse. drain! can be used to remove objects that have been returned to the pool for reuse; it does not release any objects that are in use.
See also acquire, release, Pools.limit, Pools.in_use, Pools.in_pool, drain!. The key and object types can be inspected with keytype and valtype respectively.
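The life cycle described above can be sketched as follows. This is a minimal illustration, assuming the package is installed and the Pools submodule is reachable as ConcurrentUtilities.Pools; make_buffer is a hypothetical factory introduced only for this example.

```julia
using ConcurrentUtilities: Pool, acquire, release, drain!, Pools

# Hypothetical factory: called only when no pooled object can be reused.
make_buffer() = Vector{UInt8}(undef, 1024)

pool = Pool{Vector{UInt8}}(2)          # unkeyed pool, at most 2 buffers in use

buf = acquire(make_buffer, pool)       # nothing pooled yet, so make_buffer() runs
in_use_while_held = Pools.in_use(pool) # number of objects currently checked out

release(pool, buf)                     # hand the buffer back for reuse
buf2 = acquire(make_buffer, pool)      # picks up the buffer released above
reused = buf2 === buf

release(pool, buf2)
drain!(pool)                           # discard idle objects held for reuse
```

Pairing each acquire with a release in a try/finally block is a reasonable way to keep the in-use count accurate when the work in between can throw.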
Base.acquire — Method
acquire(f, pool::Pool{K, T}, [key::K]; forcenew::Bool=false, isvalid::Function) -> T
Get an object from a pool, optionally keyed by the provided key. The provided function f must create a new object instance of type T. Each acquire call MUST be matched by exactly one release call. The forcenew keyword argument can be used to force the creation of a new object, ignoring any existing objects in the pool. The isvalid keyword argument can be used to specify a function that will be called to determine if an object is still valid for reuse. By default, all objects are considered valid. If there are no objects available for reuse, f will be called to create a new object. If the pool is already at its usage limit, acquire will block until an object is returned to the pool via release.
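The keyword arguments can be illustrated with a keyed pool. This is a sketch only: FakeConn and open_conn are hypothetical stand-ins for a real connection type and its constructor, and the host name is made up.

```julia
using ConcurrentUtilities: Pool, acquire, release

# Hypothetical connection type used only for illustration.
mutable struct FakeConn
    host::String
    open::Bool
end

open_conn(host) = FakeConn(host, true) # hypothetical factory

pool = Pool{String, FakeConn}(4)       # keyed by host name

conn = acquire(() -> open_conn("a.example"), pool, "a.example")
release(pool, "a.example", conn)       # pooled under the key "a.example"

# Pooled objects that fail the validity check are skipped and f makes a replacement.
conn.open = false
fresh = acquire(() -> open_conn("a.example"), pool, "a.example"; isvalid = c -> c.open)
release(pool, "a.example", fresh)

# forcenew ignores whatever is pooled and always calls f.
forced = acquire(() -> open_conn("a.example"), pool, "a.example"; forcenew = true)
release(pool, "a.example", forced)
```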
Base.keytype — Method
keytype(::Pool)
Return the type of the keys for the pool. If the pool is not keyed, this will return Nothing.
Base.release — Method
release(pool::Pool{K, T}, key::K, obj::Union{T, Nothing}=nothing)
release(pool::Pool{K, T}, obj::T)
release(pool::Pool{K, T})
Release an object from usage by a pool, optionally keyed by the provided key. If obj is provided, it will be returned to the pool for reuse. Otherwise, if obj is nothing or release(pool) is called, the usage count will be decremented without an object being returned to the pool for reuse.
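The difference between the release forms matters when an object should not be reused. The sketch below frees the usage slot of an object that has become unusable without putting it back in the pool; it assumes the Pools submodule is reachable as ConcurrentUtilities.Pools.

```julia
using ConcurrentUtilities: Pool, acquire, release, Pools

pool = Pool{IOBuffer}(1)               # only one object may be in use at a time

io = acquire(() -> IOBuffer(), pool)
close(io)                              # the object is no longer usable
release(pool)                          # free the slot, but do not pool the object

# The slot is free again and nothing was pooled, so f is called for the next request.
io2 = acquire(() -> IOBuffer(), pool)
release(pool, io2)                     # this one is healthy, so return it for reuse
```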
Base.valtype — Method
valtype(::Pool)
Return the type of the objects that can be stored in the pool.
ConcurrentUtilities.Pools.drain! — Method
drain!(pool)
Remove all objects held in the pool for reuse, but do not release any active acquires.
ConcurrentUtilities.Pools.in_pool — Method
Pools.in_pool(pool::Pool) -> Int
Return the number of objects in the pool available for reuse.
ConcurrentUtilities.Pools.in_use — Method
Pools.in_use(pool::Pool) -> Int
Return the number of objects currently in use. This is always less than or equal to Pools.limit(pool).
ConcurrentUtilities.Pools.limit — Method
Pools.limit(pool::Pool) -> Int
Return the maximum number of objects permitted to be in use at the same time. See Pools.in_use(pool) for the number of objects currently in use.
ConcurrentUtilities.Lockable — Type
Lockable(value, lock = ReentrantLock())
Creates a Lockable object that wraps value and associates it with the provided lock.
ConcurrentUtilities.OrderedSynchronizer — Type
OrderedSynchronizer(i=1)
A threadsafe synchronizer that ensures concurrent work is done in a specific order. The OrderedSynchronizer is initialized with an integer i that represents the current "order" of the synchronizer.
Work is "scheduled" by calling put!(f, x, i), where f is a function that will be called like f() when the synchronizer is at order i. Until then, the call waits for other calls to put! to finish and bring the synchronizer's state to i. Once f() has been called, the synchronizer's state is incremented by 1 and any waiting put! calls check whether it is their turn to execute.
A synchronizer's state can be reset to a specific value (1 by default) by calling reset!(x, i).
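The ordering guarantee can be illustrated with tasks that finish their work in arbitrary order but report results in sequence. This is a minimal sketch; the sleep call merely simulates work of varying duration, and results is a name chosen for the example.

```julia
using ConcurrentUtilities: OrderedSynchronizer, reset!

x = OrderedSynchronizer()              # starts at order 1
results = Int[]

@sync for i in 1:5
    Threads.@spawn begin
        sleep(rand() / 100)            # simulated work that finishes in arbitrary order
        put!(x, i) do
            push!(results, i)          # runs only once the synchronizer reaches order i
        end
    end
end

reset!(x)                              # state back to 1, so the synchronizer can be reused
put!(x, 1) do
    push!(results, 0)                  # runs immediately because the state is 1 again
end
```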
ConcurrentUtilities.TimedOut — Type
TimedOut
Helper object passed to the user-provided f in try_with_timeout that allows checking whether the calling context has timed out. Call x[], which returns a Bool, to check if the timeout was reached.
ConcurrentUtilities.TimeoutException — Type
TimeoutException
Thrown from try_with_timeout when the timeout is reached.
Base.lock — Method
lock(f::Function, l::Lockable)
Acquire the lock associated with l, execute f with the lock held, and release the lock when f returns. f will receive one positional argument: the value wrapped by l. If the lock is already locked by a different task/thread, wait for it to become available. When this function returns, the lock has been released, so the caller should not attempt to unlock it.
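A short sketch of guarding shared state with a Lockable follows. The Dict and the "total" key are made up for the example; the final read assumes that lock returns the value of f, as Base's lock(f, lock) does.

```julia
using ConcurrentUtilities: Lockable

hits = Lockable(Dict{String, Int}())   # the Dict is only touched while the lock is held

@sync for _ in 1:100
    Threads.@spawn begin
        lock(hits) do d                # d is the wrapped Dict
            d["total"] = get(d, "total", 0) + 1
        end
    end
end

# Read the final count, again under the lock.
total = lock(d -> d["total"], hits)
```

Wrapping the lock call in begin ... end keeps the do block attached to lock rather than to the Threads.@spawn macro.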
Base.put! — Function
put!(f::Function, x::OrderedSynchronizer, i::Int, incr::Int=1)
Schedule f to be called when x is at order i. Note that put! will block until f is executed. The typical usage involves something like:
    x = OrderedSynchronizer()
    N = 10  # number of concurrent tasks (illustrative value)
    @sync for i = 1:N
        Threads.@spawn begin
            # do some concurrent work
            # once work is done, schedule synchronization
            put!(x, $i) do
                # report back result of concurrent work
                # won't be executed until all `i-1` calls to `put!` have already finished
            end
        end
    end
The incr argument controls how much the synchronizer's state is incremented after f is called. By default, incr is 1.
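As an illustration of incr, the sketch below schedules work at orders 1, 3 and 5 and advances the state by 2 after each call. The variable names are chosen for the example only.

```julia
using ConcurrentUtilities: OrderedSynchronizer

x = OrderedSynchronizer()              # starts at order 1
order = Int[]

@sync for i in (5, 1, 3)               # deliberately spawned out of order
    Threads.@spawn begin
        put!(x, i, 2) do               # after f runs, advance the state by 2
            push!(order, i)
        end
    end
end
```

With incr = 2 the state moves 1, 3, 5, so a call scheduled at an even order would never run in this setup.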
ConcurrentUtilities.init — Function
ConcurrentUtilities.init(nworkers=Threads.nthreads() - 1)
Initialize background workers that will execute tasks spawned via ConcurrentUtilities.@spawn. If nworkers == 1, a single worker will be started on thread 1 where tasks will be executed in contention with other thread 1 work. Background worker tasks can be inspected by looking at ConcurrentUtilities.WORKER_TASKS.
ConcurrentUtilities.reset! — Function
reset!(x::OrderedSynchronizer, i=1)
Reset the state of x to i.
ConcurrentUtilities.try_with_timeout — Method
try_with_timeout(f, timeout, T=Any) -> T
Run f in a new task and return its result. If f does not complete within timeout seconds, throw a TimeoutException. If f throws an exception, rethrow it. f should be of the form f(x::TimedOut), where x is a TimedOut object. f can check x[] to learn whether the timeout has been reached; if it returns true, f can cancel or abort gracefully. The third argument T is optional (default Any) and specifies the type that f is expected to return; passing it avoids a dynamic dispatch caused by the otherwise non-inferable return type of try_with_timeout.
Examples
julia> try_with_timeout(_ -> 1, 1)
1
julia> try_with_timeout(_ -> sleep(3), 1)
ERROR: TimeoutException: try_with_timeout timed out after 1.0 seconds
Stacktrace:
 [1] try_with_timeout(::var"#1#2", ::Int64) at ./REPL[1]:1
 [2] top-level scope at REPL[2]:1
julia> try_with_timeout(_ -> error("hey"), 1)
ERROR: hey
Stacktrace:
 [1] error(::String) at ./error.jl:33
 [2] (::var"#1#2")(::TimedOut{Any}) at ./REPL[1]:1
 [3] try_with_timeout(::var"#1#2", ::Int64) at ./REPL[1]:1
 [4] top-level scope at REPL[3]:1
julia> try_with_timeout(_ -> 1, 1, Int)
1
# usage with `TimedOut`
julia> try_with_timeout(1) do timedout
           while !timedout[]
               # do iterative computation that may take too long
           end
       end
julia> try_with_timeout(1) do timedout
           sleep(3)
           # abort_gracefully is a placeholder for user-defined cleanup
           timedout[] && abort_gracefully()
       end
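In application code the timeout is usually handled rather than left to propagate. The following is a sketch of one way to do that, assuming TimeoutException is thrown to the caller unwrapped; the symbols :finished and :timed_out are arbitrary markers chosen for the example.

```julia
using ConcurrentUtilities: try_with_timeout, TimeoutException

outcome = try
    try_with_timeout(0.2) do timedout
        sleep(1)                       # simulated slow call that exceeds the timeout
        :finished
    end
catch err
    err isa TimeoutException || rethrow()
    :timed_out                         # fall back to a marker value on timeout
end

# Passing the expected return type as the third argument.
typed = try_with_timeout(_ -> 42, 1, Int)
```

Note that the task running f is not interrupted when the timeout fires; it keeps running until f returns, which is why long-running functions should poll the TimedOut argument.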
ConcurrentUtilities.@spawn — Macro
ConcurrentUtilities.@spawn expr
ConcurrentUtilities.@spawn passthroughstorage::Bool expr
Similar to Threads.@spawn, schedule and execute a task (given by expr) that will be run on a "background worker" (see ConcurrentUtilities.init).
In the 2-argument invocation, passthroughstorage controls whether the task-local storage of the current_task() should be "passed through" to the spawned task.
ConcurrentUtilities.@wkspawn — Macro
@wkspawn [:default|:interactive] expr
Create a Task and schedule it to run on any available thread in the specified threadpool (:default if unspecified). The task is allocated to a thread once one becomes available. To wait for the task to finish, call wait on the result of this macro, or call fetch to wait and then obtain its return value.
Values can be interpolated into @wkspawn via $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable's value in the current task.
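The effect of interpolation can be seen in a small sketch. The variable names are chosen for the example, and the behaviour shown assumes @wkspawn handles $ the same way Threads.@spawn does, as the description above indicates.

```julia
using ConcurrentUtilities: @wkspawn

n = 1
task = @wkspawn $n + 1                 # the current value of n is copied into the task
n = 100                                # later changes are not seen by the task

result = fetch(task)                   # wait for the task and obtain its return value
```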