ConcurrentCollections.Implementations.IndirectConcurrentRingQueueNode
— TypeIndirectConcurrentRingQueueNode{T}
Concurrent Ring Queue (CRQ) node which stores 64-bit slots in the .ring. Each slot contains a 32-bit value that locates the actual value in the thread-local .buffers; hence indirect.
This approach wastes memory, but it is (more or less) implementable without atomics support for boxed elements in arrays.
TODO: Create a 128-bit slot "direct" version that directly points to a boxed Julia value.
ConcurrentCollections.Implementations.Moved
— TypeMoved{Value}
A "tag" used for marking that this slot is being moved to a new slots vector.
ConcurrentCollections.Implementations.clusters
— Methodclusters(dict) -> ranges
clusters(slots) -> ranges
Compute clusters in the slots. Used for performance debugging.
using ConcurrentCollections
using ConcurrentCollections.Implementations: clusters
d = ConcurrentDict(1:1000 .=> 0)
cs = clusters(d.slots)
using UnicodePlots
histogram(map(length, cs))
using StatsBase
describe(map(length, cs))
ConcurrentCollections.Implementations.denqueue!
— Methoddenqueue!(crq::IMPCRQ{T}, x::T) -> MPCRQ_CLOSED or MPCRQ_ENQUEUED or y::Waiter
denqueue!(crq::IMPCRQ{T}, x::Waiter{T}) -> MPCRQ_CLOSED or MPCRQ_ENQUEUED or y::Some{T}
where IMPCRQ = IndirectMultiPolarityConcurrentRingQueueNode
ConcurrentCollections.Implementations.denqueue!
— Methoddenqueue!(lcrq::DualLCRQ{T}, x::T) -> MPCRQ_ENQUEUED or y::Waiter
denqueue!(lcrq::DualLCRQ{T}, x::Waiter{T}) -> MPCRQ_ENQUEUED or y::Some{T}
where DualLCRQ = DualLinkedConcurrentRingQueue
ConcurrentCollections.Implementations.expand_parallel_basecase!
— Methodexpand_parallel_basecase!(...) -> (nadded, seen_empty)
Process all clusters started within start0:(start0 + basesize) (mod length(slots)).
That is to say, try to process start0:(start0 + basesize) but make sure to avoid stepping into other base cases by using an empty slot to delimit the base case boundaries. This uses the property of linear probing dictionaries that non-overlapping clusters (i.e., contiguous regions of non-empty slots) are mapped to non-overlapping regions when the slots array is doubled in size.
More precisely:
- Process all clusters started within start0:(start0 + basesize - 1).
- If at least one cluster is processed, process the cluster that contains the start position of the next chunk start0 + basesize (mod length(slots)).
Unlike the original trick mentioned in Maier et al. (2019), there is a difference because DictSlot is reused in newslots. Since the slot is reused, we can't use it as a reliable marker for delimiting the clusters: other tasks can empty a slot at any moment. So, instead, each basecase sets its own promise firstempties[ichunk]::Promise{Int} to the index of the first empty slot that it sees and successfully CAS'ed to Moved{Empty}. If no empty slot is observed, the promise is set to -1. When the basecase reaches the end of its chunk (start0 + basesize), it confirms the end of the cluster it owns by finding the first valid firstempties[mod1((ichunk+i),end)] with the smallest i > 0. If consecutive basecase tasks are started at the same time, then by the time a basecase task needs the promise, it is likely already ready, since setting it is the first thing that the other task does.
This trick is mentioned in:
Maier, Tobias, Peter Sanders, and Roman Dementiev. “Concurrent Hash Tables: Fast and General(?)!” ACM Transactions on Parallel Computing 5, no. 4 (February 22, 2019): 16:1–16:32. https://doi.org/10.1145/3309206.
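The final lookup step described above (finding the first valid firstempties entry after ichunk) can be sketched with plain integers standing in for the promises. This is only an illustration under simplifying assumptions, not the package's implementation: next_boundary is a hypothetical helper, and firstempties is assumed to be an already-resolved Vector{Int} rather than a vector of Promise{Int}.

```julia
# Hypothetical helper (not part of ConcurrentCollections): given the resolved
# promise values, return the boundary recorded by the nearest following chunk.
# An entry of -1 means "this chunk observed no empty slot".
function next_boundary(firstempties::Vector{Int}, ichunk::Int)
    n = length(firstempties)
    for i in 1:n
        # mod1 wraps around so that the chunk after the last one is the first.
        idx = firstempties[mod1(ichunk + i, n)]
        idx != -1 && return idx
    end
    return nothing  # no chunk observed an empty slot
end

firstempties = [5, -1, -1, 20]
after_first = next_boundary(firstempties, 1)  # skips chunks 2 and 3
after_last = next_boundary(firstempties, 4)   # wraps around to chunk 1
```

In the real code each entry is a promise, so a task may have to wait for a neighbouring task to publish its value; the sketch leaves that synchronization out.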
ConcurrentCollections.Implementations.is_pointerfree_type
— Methodis_pointerfree_type(T::Type) :: Bool
Return true if instances of T never contain boxed Julia objects.
ConcurrentCollections.Implementations.migrate_serial_nofill!
— MethodMigrate DictSlot entries from slots to newslots.
ConcurrentCollections.ConcurrentCollections
— ModuleConcurrentCollections
ConcurrentCollections.jl provides the following concurrent collections for Julia ≥ 1.7. Most of their operations are (almost) lock-free whenever appropriate.
DualLinkedConcurrentRingQueue
DualLinkedQueue
LinkedConcurrentRingQueue
ConcurrentQueue
ConcurrentStack
WorkStealingDeque
ConcurrentDict
NOTE: If you are trying to find a way to improve performance (especially the throughput) of your program, it is highly recommended to look for ways to avoid using concurrent collections first. In particular, consider applying the data-parallel pattern to dodge the difficulties in concurrent programming. For example, it is often a better idea to use task-local non-thread-safe Dicts instead of a ConcurrentDict shared across tasks. One of the most important techniques in data-parallel programming is how to merge such task-local states. For more information, see, e.g., Efficient and safe approaches to mutation in data parallelism.
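As a minimal sketch of the task-local pattern recommended in the note, the following uses only Base Julia: each task fills its own plain Dict, and the results are merged afterwards. The function name count_words and the chunking scheme are illustrative choices, not part of ConcurrentCollections.

```julia
# Count word occurrences with one plain Dict per task, then merge the results.
function count_words(words::Vector{String}; ntasks::Int = Threads.nthreads())
    chunksize = max(1, cld(length(words), ntasks))
    tasks = map(Iterators.partition(words, chunksize)) do chunk
        Threads.@spawn begin
            local_counts = Dict{String,Int}()  # task-local, never shared
            for w in chunk
                local_counts[w] = get(local_counts, w, 0) + 1
            end
            local_counts
        end
    end
    # Merge the task-local dictionaries one by one; no locking is needed
    # because only the current task touches `total`.
    total = Dict{String,Int}()
    for t in tasks
        mergewith!(+, total, fetch(t))
    end
    return total
end

counts = count_words(["a", "b", "a", "c", "a", "b"])
```

The merge step is sequential here for simplicity; for many tasks it could itself be done as a parallel reduction.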
ConcurrentCollections.ConcurrentDict
— TypeConcurrentDict{K,V}()
Concurrent dictionary. All operations are lock-free except when the dictionary is resized.
Although tasks wait on concurrent modifications (e.g., setindex!) during resize, the worker threads participate in the resize to avoid wasting CPU resources.
Examples
julia> using ConcurrentCollections
julia> dict = ConcurrentDict{String,Int}();
julia> dict["hello"] = 1;
julia> dict["hello"]
1
ConcurrentCollections.Delete
— TypeDelete(ans)
A special type used in modify! to indicate that a slot should be removed.
That is to say,
y = modify!(dict, key) do value
Delete(f(something(value)))
end
y[]
is an optimization of
r = Ref{Any}()
modify!(dict, key) do value
r[] = f(something(value))
nothing
end
r[]
ConcurrentCollections.Implementations.ConcurrentQueue
— TypeConcurrentQueue{T}()
Concurrent queue of objects of type T.
Use push! to insert an element at the tail and maybepopfirst! to retrieve and remove an element at the head.
Implementation detail: It implements the Michael and Scott queue.
Examples
julia> using ConcurrentCollections
julia> queue = ConcurrentQueue{Int}();
julia> push!(queue, 1);
julia> push!(queue, 2);
julia> popfirst!(queue)
1
julia> maybepopfirst!(queue)
Some(2)
julia> maybepopfirst!(queue) # returns nothing
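A slightly larger sketch, using only the operations documented above, shows several tasks pushing concurrently followed by a drain loop built on maybepopfirst!. The helper name drain! is hypothetical and not part of the package.

```julia
using ConcurrentCollections

queue = ConcurrentQueue{Int}()

# Four producer tasks push disjoint ranges of integers concurrently.
@sync for p in 1:4
    Threads.@spawn for i in 1:100
        push!(queue, 100 * (p - 1) + i)
    end
end

# Hypothetical helper: pop until maybepopfirst! reports an empty queue.
function drain!(queue)
    items = Int[]
    while true
        y = maybepopfirst!(queue)
        y === nothing && break      # empty queue
        push!(items, something(y))  # unwrap Some(value)
    end
    return items
end

items = drain!(queue)
```

Elements pushed by a single task come out in that task's push order, but the interleaving between different producers is not deterministic.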
ConcurrentCollections.Implementations.ConcurrentStack
— TypeConcurrentStack{T}()
Concurrent stack of objects of type T.
Use push! to insert an element and maybepop! to retrieve and remove an element.
It implements the Treiber stack.
Examples
julia> using ConcurrentCollections
julia> stack = ConcurrentStack{Int}();
julia> push!(stack, 1);
julia> push!(stack, 2);
julia> pop!(stack)
2
julia> maybepop!(stack)
Some(1)
julia> maybepop!(stack) # returns nothing
ConcurrentCollections.Implementations.DualLinkedConcurrentRingQueue
— TypeDualLinkedConcurrentRingQueue{T}()
A concurrent queue with "almost" nonblocking push! and popfirst!. Calling popfirst! on an empty queue waits for a push! in another task.
See also: LinkedConcurrentRingQueue, DualLinkedQueue
Examples
julia> using ConcurrentCollections
julia> q = DualLinkedConcurrentRingQueue{Int}();
julia> push!(q, 111);
julia> push!(q, 222);
julia> popfirst!(q) # first-in first-out
111
julia> popfirst!(q)
222
Extended help
Since popfirst! blocks when called on an empty queue, a DualLinkedConcurrentRingQueue acts almost like an unbounded Base.Channel. However, DualLinkedConcurrentRingQueue does not support close or blocking on push! when exceeding a bound.
DualLinkedConcurrentRingQueue performs very well compared to other concurrent queue implementations. However, since it is based on linked fixed-size buffers, it has relatively large memory overhead.
DualLinkedConcurrentRingQueue is based on the linked multi-polarity dual ring queue by Izraelevitz and Scott (2017):
Izraelevitz, Joseph, and Michael L. Scott. “Generality and Speed in Nonblocking Dual Containers.” ACM Transactions on Parallel Computing 3, no. 4 (March 23, 2017): 22:1–22:37. https://doi.org/10.1145/3040220.
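The waiting behavior of popfirst! can be illustrated with a consumer task that is spawned before anything is pushed. This is a small sketch that uses only the constructor, push!, and popfirst! documented above.

```julia
using ConcurrentCollections

q = DualLinkedConcurrentRingQueue{Int}()

# If the queue is still empty when this task runs, popfirst! waits for a push!.
consumer = Threads.@spawn popfirst!(q)

# The pushed value is handed to the consumer, whether or not it is already waiting.
push!(q, 42)

result = fetch(consumer)
```

The outcome does not depend on which task runs first: the consumer either finds the element already enqueued or waits until it arrives.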
ConcurrentCollections.Implementations.DualLinkedQueue
— TypeDualLinkedQueue{T}()
A concurrent queue with nonblocking push! and popfirst!. Calling popfirst! on an empty queue waits for a push! in another task.
DualLinkedConcurrentRingQueue provides a faster dual queue with a larger memory footprint.
Examples
julia> using ConcurrentCollections
julia> q = DualLinkedQueue{Int}();
julia> push!(q, 111);
julia> push!(q, 222);
julia> popfirst!(q) # first-in first-out
111
julia> popfirst!(q)
222
Extended help
Since popfirst! blocks when called on an empty queue, a DualLinkedQueue acts almost like an unbounded Base.Channel. However, DualLinkedQueue does not support close or blocking on push! when exceeding a bound.
DualLinkedQueue implements the dual queue by Scherer and Scott (2004):
Scherer, William N., and Michael L. Scott. “Nonblocking Concurrent Data Structures with Condition Synchronization.” In Distributed Computing, edited by Rachid Guerraoui, 174–87. Lecture Notes in Computer Science. Berlin, Heidelberg: Springer, 2004. https://doi.org/10.1007/978-3-540-30186-8_13.
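Because close is not supported, a consumer loop needs some other way to learn that no more elements will arrive. One possible approach, sketched below, is a sentinel value; STOP and sum_until_stop are hypothetical names chosen for this example and are not provided by the package.

```julia
using ConcurrentCollections

const STOP = -1  # hypothetical sentinel meaning "no more elements"

# Hypothetical consumer: add up elements until the sentinel is seen.
function sum_until_stop(q)
    total = 0
    while true
        x = popfirst!(q)   # waits while the queue is empty
        x == STOP && break
        total += x
    end
    return total
end

q = DualLinkedQueue{Int}()
consumer = Threads.@spawn sum_until_stop(q)

for i in 1:10
    push!(q, i)
end
push!(q, STOP)  # tell the consumer to finish

total = fetch(consumer)
```

With several consumers, one sentinel per consumer would be needed, since each popfirst! removes the element it returns.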
ConcurrentCollections.Implementations.LinkedConcurrentRingQueue
— TypeLinkedConcurrentRingQueue{T}()
A concurrent queue with nonblocking push! and maybepopfirst!.
See also: DualLinkedConcurrentRingQueue
Examples
julia> using ConcurrentCollections
julia> q = LinkedConcurrentRingQueue{Int}();
julia> push!(q, 111);
julia> push!(q, 222);
julia> maybepopfirst!(q) # first-in first-out
Some(111)
julia> maybepopfirst!(q)
Some(222)
julia> maybepopfirst!(q) === nothing # queue is empty
true
Extended help
LinkedConcurrentRingQueue is based on the Linked Concurrent Ring Queue (or List of Concurrent Ring Queues; LCRQ) by Morrison and Afek (2013):
Morrison, Adam, and Yehuda Afek. “Fast Concurrent Queues for X86 Processors.” In Proceedings of the 18th ACM SIGPLAN Symposium on Principles and Practice of Parallel Programming, 103–112. PPoPP ’13. New York, NY, USA: Association for Computing Machinery, 2013. https://doi.org/10.1145/2442516.2442527. (Revised version: https://www.cs.tau.ac.il/~mad/publications/ppopp2013-x86queues.pdf)
ConcurrentCollections.Implementations.WorkStealingDeque
— TypeWorkStealingDeque{T}()
Concurrent work-stealing "deque" of objects of type T.
This is not a full deque in the sense that:
- push! and maybepop! operating at the tail of the collection can only be executed by a single task.
- maybepopfirst! (aka steal) for retrieving and removing an element at the head can be invoked from any tasks. However, there is no pushfirst!.
Examples
julia> using ConcurrentCollections
julia> deque = WorkStealingDeque{Int}();
julia> push!(deque, 1);
julia> push!(deque, 2);
julia> push!(deque, 3);
julia> maybepop!(deque)
Some(3)
julia> fetch(Threads.@spawn maybepopfirst!(deque))
Some(1)
julia> fetch(Threads.@spawn popfirst!(deque))
2
julia> maybepopfirst!(deque) # returns nothing
Extended help
The current implementation is known not to be fully compliant with the C/C++ memory model (on which Julia's memory model is based).
It implements the dynamic circular work-stealing deque by Chase and Lev (2005):
Chase, David, and Yossi Lev. “Dynamic Circular Work-Stealing Deque.” In Proceedings of the Seventeenth Annual ACM Symposium on Parallelism in Algorithms and Architectures, 21–28. SPAA ’05. New York, NY, USA: Association for Computing Machinery, 2005. https://doi.org/10.1145/1073970.1073974.
ConcurrentCollections.Keep
— TypeKeep(ans)
A special type used in modify! to indicate that a slot should remain unchanged while propagating the result ans of some computation to the caller.
That is to say,
y = modify!(dict, key) do value
Keep(f(something(value)))
end
y[]
is an optimization of
r = Ref{Any}()
modify!(dict, key) do value
r[] = f(something(value))
Some(value)
end
r[]
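A concrete instance of this pattern, assuming the key is already present so that f receives a ref, might look as follows. The multiplication by 10 is an arbitrary stand-in for f.

```julia
using ConcurrentCollections

dict = ConcurrentDict{String,Int}()
dict["hello"] = 1

# Compute something from the current value without writing anything back.
y = modify!(dict, "hello") do ref
    Keep(ref[] * 10)
end

answer = y[]               # the result carried out of modify! by Keep
unchanged = dict["hello"]  # the stored value is left as it was
```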
ConcurrentCollections.maybeget
— Functionmaybeget(dict::ConcurrentDict{K,V}, key) -> Some(value::V) or nothing
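Going by the signature alone, maybeget looks up key and returns the value wrapped in Some, or nothing when the key is absent. The sketch below relies on that reading; the call is written with the module prefix so that it does not depend on whether the name is exported.

```julia
using ConcurrentCollections

dict = ConcurrentDict{String,Int}()
dict["hello"] = 1

found = ConcurrentCollections.maybeget(dict, "hello")  # a Some wrapping the value
absent = ConcurrentCollections.maybeget(dict, "bye")   # nothing: key not present

# `something` unwraps Some and falls back to a default when given nothing.
value_or_default = something(ConcurrentCollections.maybeget(dict, "bye"), 0)
```

Wrapping the result in Some makes it possible to tell a missing key apart from a stored value, which matters when the value type itself allows nothing.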
ConcurrentCollections.maybepop!
— Functionmaybepop!(collection) -> Some(value::T) or nothing
Try to pop a value from the tail of collection. Return Some(value) if it is non-empty. Return nothing if empty.
Examples
julia> using ConcurrentCollections
julia> stack = ConcurrentStack{Int}();
julia> push!(stack, 1);
julia> maybepop!(stack)
Some(1)
julia> maybepop!(stack) # returns nothing
ConcurrentCollections.maybepopfirst!
— Functionmaybepopfirst!(collection) -> Some(value::T) or nothing
Try to pop a value from the head of collection. Return Some(value) if it is non-empty. Return nothing if empty.
Examples
julia> using ConcurrentCollections
julia> queue = ConcurrentQueue{Int}();
julia> push!(queue, 1);
julia> maybepopfirst!(queue)
Some(1)
julia> maybepopfirst!(queue) # returns nothing
ConcurrentCollections.modify!
— Functionmodify!(f, dict::ConcurrentDict{K,V}, key::K) -> y
Atomically update the key slot of dict using a function f.
If key does not exist, f is called with nothing. The call f(nothing) must return either (1) nothing to keep the slot unoccupied or (2) Some(value::V) to insert value.
If key exists, f is called with a ref such that ref[] retrieves the value corresponding to the key. The call f(ref) must return either (1) nothing to delete the slot, (2) Some(value′::V) to insert value′, (3) Keep(ans) to return y = Keep(ans) from modify!, or (4) Delete(ans) to delete the slot and return a value y = Delete(ans) from modify!.
The function f may be called more than once if multiple tasks try to modify the dictionary.
Examples
julia> using ConcurrentCollections
julia> dict = ConcurrentDict{String,Int}();
julia> modify!(dict, "hello") do _
Some(1)
end
Some(1)
julia> modify!(dict, "hello") do ref
Some(something(ref[]) + 1)
end
Some(2)
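The two cases (absent key and existing key) can be combined in a single closure to build a counter that is safe to update from several tasks. The sketch below is one way to write it; concurrent_histogram and its chunking are illustrative and not part of the package. Since f may be called more than once, the closure has no side effects.

```julia
using ConcurrentCollections

# Illustrative helper: count occurrences of each element of `data` using a
# ConcurrentDict that is shared by all tasks.
function concurrent_histogram(data::Vector{String}; ntasks::Int = Threads.nthreads())
    dict = ConcurrentDict{String,Int}()
    chunksize = max(1, cld(length(data), ntasks))
    @sync for chunk in Iterators.partition(data, chunksize)
        Threads.@spawn for x in chunk
            modify!(dict, x) do ref
                # ref === nothing: key is absent, so insert 1.
                # Otherwise ref[] is the current count, so store count + 1.
                Some(ref === nothing ? 1 : ref[] + 1)
            end
        end
    end
    return dict
end

hist = concurrent_histogram(["a", "b", "a", "c", "a", "b"])
count_a = hist["a"]
```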