ConcurrentCollections.Implementations.IndirectConcurrentRingQueueNode (Type)
IndirectConcurrentRingQueueNode{T}

Concurrent Ring Queue (CRQ) node that stores 64-bit slots in its .ring. Each slot contains a 32-bit value that locates the actual value in the thread-local .buffers; hence "indirect".

This approach wastes memory, but it is (more or less) implementable without support for atomic operations on boxed elements of an array.
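To make the "indirect" idea concrete, here is a minimal sketch of how a 64-bit slot could pack a 32-bit buffer index so that the whole slot is updated with a single integer compare-and-swap. The bit layout (a "safe" flag in bit 63, loosely following Morrison and Afek's CRQ slots, which hold a safe bit, an index, and a value; a 31-bit cycle index; and the 32-bit buffer index) and the names pack_slot/unpack_slot are hypothetical, not the layout used by IndirectConcurrentRingQueueNode.

```julia
# Hypothetical 64-bit slot layout (illustrative only):
#   bit 63      : "safe" flag
#   bits 32..62 : 31-bit cycle index
#   bits 0..31  : 32-bit index into a (thread-local) value buffer
const SAFE_BIT   = UInt64(1) << 63
const INDEX_MASK = (UInt64(1) << 31) - 1
const BUF_MASK   = UInt64(typemax(UInt32))

pack_slot(safe::Bool, index::Integer, bufidx::Integer) =
    (safe ? SAFE_BIT : UInt64(0)) |
    ((UInt64(index) & INDEX_MASK) << 32) |
    (UInt64(bufidx) & BUF_MASK)

unpack_slot(slot::UInt64) = (
    safe = (slot & SAFE_BIT) != 0,
    index = Int((slot >> 32) & INDEX_MASK),
    bufidx = UInt32(slot & BUF_MASK),
)

# The slot is a plain UInt64, so an ordinary integer CAS updates it atomically;
# no atomic operation on a boxed array element is required.
slot = Threads.Atomic{UInt64}(pack_slot(true, 0, 0))
old = slot[]
Threads.atomic_cas!(slot, old, pack_slot(true, 1, 42))  # returns the previous value
```

The price of this design is the extra lookup: the element itself lives in a separate buffer, and only its 32-bit position travels through the ring.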

TODO: Create a 128-bit slot "direct" version that directly points to a boxed Julia value.

ConcurrentCollections.Implementations.clusters (Method)
clusters(dict) -> ranges
clusters(slots) -> ranges

Compute the clusters (contiguous runs of non-empty slots) in the slots. Used for performance debugging.

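using ConcurrentCollections
using ConcurrentCollections.Implementations: clusters
d = ConcurrentDict(1:1000 .=> 0)
cs = clusters(d.slots)  # one index range per cluster

# Plot the distribution of cluster lengths
using UnicodePlots
histogram(map(length, cs))

# Summary statistics of cluster lengths
using StatsBase
describe(map(length, cs))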
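To show what a "cluster" is without the package internals, the following sketch computes the same kind of ranges for a plain Vector{Bool} occupancy mask. It is hypothetical (clusters_sketch is not part of ConcurrentCollections) and, for simplicity, treats the slots as a linear array, so a cluster that wraps around from the last slot to the first would be reported as two ranges.

```julia
# Hypothetical helper: maximal runs of consecutive occupied slots.
# The slots are treated as a linear array (no wraparound).
function clusters_sketch(occupied::Vector{Bool})
    ranges = UnitRange{Int}[]
    n = length(occupied)
    i = 1
    while i <= n
        if occupied[i]
            j = i
            while j < n && occupied[j + 1]
                j += 1  # extend the current cluster
            end
            push!(ranges, i:j)
            i = j + 1
        else
            i += 1      # skip an empty slot
        end
    end
    return ranges
end

mask = [true, true, false, true, false, false, true]
cs = clusters_sketch(mask)  # [1:2, 4:4, 7:7]
map(length, cs)             # [2, 1, 1]
```

Long clusters are what make linear probing slow, which is why the distribution of map(length, cs) is the quantity worth plotting.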
ConcurrentCollections.Implementations.denqueue! (Method)
denqueue!(crq::IMPCRQ{T}, x::T) -> MPCRQ_CLOSED or MPCRQ_ENQUEUED or y::Waiter
denqueue!(crq::IMPCRQ{T}, x::Waiter{T}) -> MPCRQ_CLOSED or MPCRQ_ENQUEUED or y::Some{T}
where MPCRQ = IndirectMultiPolarityConcurrentRingQueueNode
ConcurrentCollections.Implementations.denqueue! (Method)
denqueue!(lcrq::DualLCRQ{T}, x::T) -> MPCRQ_ENQUEUED or y::Waiter
denqueue!(lcrq::DualLCRQ{T}, x::Waiter{T}) -> MPCRQ_ENQUEUED or y::Some{T}
where DualLCRQ = DualLinkedConcurrentRingQueue
ConcurrentCollections.Implementations.expand_parallel_basecase! (Method)
expand_parallel_basecase!(...) -> (nadded, seen_empty)

Process all clusters started within start0:(start0 + basesize) (mod length(slots)).

That is to say, try to process start0:(start0 + basesize), but avoid stepping into other base cases by using an empty slot to delimit the base-case boundaries. This relies on a property of linear-probing dictionaries: non-overlapping clusters (i.e., contiguous regions of non-empty slots) are mapped to non-overlapping regions when the slots array is doubled in size.

More precisely:

  1. Process all clusters started within start0:(start0 + basesize - 1).
  2. If at least one cluster is processed, process the cluster that contains the start position of the next chunk start0 + basesize (mod length(slots)).
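The ownership rule can be sketched sequentially on a plain occupancy mask. This hypothetical owned_clusters ignores the wraparound (mod length(slots)), the promises, and concurrent modification; it only shows that assigning each cluster to the chunk containing its first slot, and following that cluster past the chunk end when necessary, splits the clusters among chunks without overlap.

```julia
# Hypothetical sketch: clusters whose first slot lies in start0:(start0 + basesize - 1).
# Assumes a linear (non-circular) array of slots.
function owned_clusters(occupied::Vector{Bool}, start0::Int, basesize::Int)
    n = length(occupied)
    owned = UnitRange{Int}[]
    for s in start0:min(start0 + basesize - 1, n)
        # A cluster starts at `s` if `s` is occupied and the previous slot is empty.
        if occupied[s] && (s == 1 || !occupied[s - 1])
            e = s
            # Follow the cluster to its end, even past the chunk boundary.
            while e < n && occupied[e + 1]
                e += 1
            end
            push!(owned, s:e)
        end
    end
    return owned
end

occupied = [false, true, true, true, false, true]
basesize = 2
per_chunk = [owned_clusters(occupied, start0, basesize) for start0 in 1:basesize:length(occupied)]
# per_chunk == [[2:4], UnitRange{Int}[], [6:6]]
```

The first chunk owns 2:4 even though it extends into the second chunk, and the second chunk owns nothing because no cluster starts inside it.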

Unlike the original trick described by Maier et al. (2019), there is a complication because DictSlot objects are reused in newslots. Since a slot is reused, it cannot serve as a reliable marker for delimiting the clusters: other tasks can empty a slot at any moment. Instead, each base case sets its own promise firstempties[ichunk]::Promise{Int} to the index of the first empty slot that it sees and successfully CAS'es to Moved{Empty}. If no empty slot is observed, the promise is set to -1. When a base case reaches the end of its chunk (start0 + basesize), it determines the end of the cluster it owns by finding the first valid firstempties[mod1((ichunk+i),end)] with the smallest i > 0. If consecutive base-case tasks are started at the same time, the promise is likely already fulfilled by the time a base case needs it, since setting it is the first thing the other task does.
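The boundary lookup at the end of a base case can be sketched as a scan over the following chunks' promises. In this hypothetical sketch, each entry of firstempties is anything fetch accepts: a plain Int (Base defines fetch(x) = x for non-task values) or a Task standing in for a promise, in which case fetch blocks until that base case has published its first empty slot. As above, -1 means "no empty slot observed in that chunk". The name next_boundary is illustrative only.

```julia
# Hypothetical sketch: find the first valid firstempties[mod1(ichunk + i, end)]
# with the smallest i > 0, skipping chunks that report -1.
function next_boundary(firstempties::AbstractVector, ichunk::Int)
    n = length(firstempties)
    for i in 1:n
        j = mod1(ichunk + i, n)          # wrap around: the slots array is circular
        e = fetch(firstempties[j])::Int  # blocks if the entry is an unfinished Task
        e != -1 && return e
    end
    return nothing  # no chunk observed an empty slot
end

# Usage with tasks standing in for promises:
promises = [Threads.@spawn(-1), Threads.@spawn(12), Threads.@spawn(-1)]
next_boundary(promises, 3)  # skips chunk 1 (-1) and returns 12 from chunk 2
```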


This trick is mentioned in:

Maier, Tobias, Peter Sanders, and Roman Dementiev. “Concurrent Hash Tables: Fast and General(?)!” ACM Transactions on Parallel Computing 5, no. 4 (February 22, 2019): 16:1–16:32. https://doi.org/10.1145/3309206.

ConcurrentCollections.ConcurrentCollections (Module)

ConcurrentCollections

ConcurrentCollections.jl provides the following concurrent collections for Julia ≥ 1.7. Most of their operations are (almost) lock-free whenever appropriate.

NOTE: If you are trying to find a way to improve performance (especially the throughput) of your program, it is highly recommended to look for ways to avoid using concurrent collections first. In particular, consider applying the data-parallel pattern to dodge the difficulties in concurrent programming. For example, it is often a better idea to use task-local non-thread-safe Dicts instead of a ConcurrentDict shared across tasks. One of the most important techniques in data-parallel programming is how to merge such task-local states. For more information, see, e.g., Efficient and safe approaches to mutation in data parallelism.
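A minimal sketch of that pattern using only Base Julia: each task counts words in its own non-thread-safe Dict, and the task-local Dicts are merged with mergewith! once all tasks have finished. The function name count_words is hypothetical.

```julia
using Base.Threads: @spawn, nthreads

# Count word occurrences without shared mutable state: each task fills its own
# Dict, and the task-local results are merged at the end.
function count_words(words::AbstractVector{<:AbstractString}; ntasks::Int = nthreads())
    chunksize = max(1, cld(length(words), ntasks))
    tasks = map(Iterators.partition(words, chunksize)) do chunk
        @spawn begin
            local_counts = Dict{String,Int}()  # owned by this task only
            for w in chunk
                local_counts[w] = get(local_counts, w, 0) + 1
            end
            local_counts
        end
    end
    # Merge step: combine the task-local Dicts, adding counts for equal keys.
    return mapreduce(fetch, (a, b) -> mergewith!(+, a, b), tasks; init = Dict{String,Int}())
end

count_words(["a", "b", "a", "c", "a"]; ntasks = 2)  # Dict("a" => 3, "b" => 1, "c" => 1)
```

No locks or atomics are needed here: tasks never touch each other's Dicts, and the only synchronization is fetch waiting for each task.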

ConcurrentCollections.ConcurrentDict (Type)
ConcurrentDict{K,V}()

Concurrent dictionary. All operations are lock-free except when the dictionary is resized.

Note

Although tasks wait on concurrent modifications (e.g., setindex!) during resize, the worker threads participate in the resize to avoid wasting CPU resources.

Examples

julia> using ConcurrentCollections

julia> dict = ConcurrentDict{String,Int}();

julia> dict["hello"] = 1;

julia> dict["hello"]
1
ConcurrentCollections.Delete (Type)
Delete(ans)

A special type used in modify! to indicate that a slot should be removed.

That is to say,

y = modify!(dict, key) do value
    Delete(f(something(value)))
end
y[]

is an optimization of

r = Ref{Any}()
modify!(dict, key) do value
    r[] = f(something(value))
    nothing
end
r[]
ConcurrentCollections.Implementations.ConcurrentQueue (Type)
ConcurrentQueue{T}()

Concurrent queue of objects of type T.

Use push! to insert an element at the tail and maybepopfirst! to retrieve and remove an element at the head.

Implementation detail: It implements the Michael and Scott queue.
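For readers unfamiliar with the algorithm, here is a minimal sketch of a Michael and Scott queue using Julia ≥ 1.7 @atomic fields. It is not ConcurrentQueue's implementation: MSQueue, ms_push!, and ms_maybepopfirst! are hypothetical names, and the sketch relies on the garbage collector to rule out the ABA problem that the original algorithm handles with counted pointers.

```julia
mutable struct MSNode{T}
    value::Union{Nothing,T}                 # `nothing` only in the initial dummy node
    @atomic next::Union{Nothing,MSNode{T}}
end

mutable struct MSQueue{T}
    @atomic head::MSNode{T}  # dummy node; the first element is head.next
    @atomic tail::MSNode{T}  # last node, or a node lagging behind by one step
    function MSQueue{T}() where {T}
        dummy = MSNode{T}(nothing, nothing)
        return new{T}(dummy, dummy)
    end
end

function ms_push!(q::MSQueue{T}, x) where {T}
    node = MSNode{T}(convert(T, x), nothing)
    while true
        tail = @atomic q.tail
        next = @atomic tail.next
        if next === nothing
            # Try to link the new node after the last node.
            if (@atomicreplace tail.next nothing => node).success
                # Swing the tail; failure is fine (another task already helped).
                @atomicreplace q.tail tail => node
                return q
            end
        else
            # The tail is lagging behind; help advance it and retry.
            @atomicreplace q.tail tail => next
        end
    end
end

function ms_maybepopfirst!(q::MSQueue)
    while true
        head = @atomic q.head
        tail = @atomic q.tail
        next = @atomic head.next
        if next === nothing
            return nothing                      # empty queue
        elseif head === tail
            @atomicreplace q.tail tail => next  # tail is lagging; help it
        elseif (@atomicreplace q.head head => next).success
            return Some(next.value)             # `next` becomes the new dummy node
        end
    end
end
```

The "helping" steps are what make the queue lock-free: a task that finds the tail lagging advances it itself instead of waiting for the task that linked the node.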

Examples

julia> using ConcurrentCollections

julia> queue = ConcurrentQueue{Int}();

julia> push!(queue, 1);

julia> push!(queue, 2);

julia> popfirst!(queue)
1

julia> maybepopfirst!(queue)
Some(2)

julia> maybepopfirst!(queue)  # returns nothing
ConcurrentCollections.Implementations.ConcurrentStack (Type)
ConcurrentStack{T}()

Concurrent stack of objects of type T.

Use push! to insert an element and maybepop! to retrieve and remove an element.

It implements the Treiber stack.
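The core of a Treiber stack is a single atomic pointer to the top node, updated with compare-and-swap in a retry loop. Below is a minimal sketch using Julia ≥ 1.7 @atomic fields; TreiberStack, treiber_push!, and treiber_maybepop! are hypothetical names, not ConcurrentStack's internals, and the garbage collector is assumed to prevent the ABA problem.

```julia
mutable struct TNode{T}
    value::T
    next::Union{Nothing,TNode{T}}  # written only before the node is published
end

mutable struct TreiberStack{T}
    @atomic top::Union{Nothing,TNode{T}}
    TreiberStack{T}() where {T} = new{T}(nothing)
end

function treiber_push!(s::TreiberStack{T}, x) where {T}
    node = TNode{T}(convert(T, x), nothing)
    while true
        top = @atomic s.top
        node.next = top  # link to the current top, then try to publish
        (@atomicreplace s.top top => node).success && return s
    end
end

function treiber_maybepop!(s::TreiberStack)
    while true
        top = @atomic s.top
        top === nothing && return nothing  # empty stack
        (@atomicreplace s.top top => top.next).success && return Some(top.value)
    end
end
```

A failed CAS means another task changed the top in the meantime, so the operation simply re-reads the top and tries again.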

Examples

julia> using ConcurrentCollections

julia> stack = ConcurrentStack{Int}();

julia> push!(stack, 1);

julia> push!(stack, 2);

julia> pop!(stack)
2

julia> maybepop!(stack)
Some(1)

julia> maybepop!(stack)  # returns nothing
ConcurrentCollections.Implementations.DualLinkedConcurrentRingQueue (Type)
DualLinkedConcurrentRingQueue{T}()

A concurrent queue with "almost" nonblocking push! and popfirst!. Calling popfirst! on an empty queue waits for a push! in another task.

See also: LinkedConcurrentRingQueue, DualLinkedQueue

Examples

julia> using ConcurrentCollections

julia> q = DualLinkedConcurrentRingQueue{Int}();

julia> push!(q, 111);

julia> push!(q, 222);

julia> popfirst!(q)  # first-in first-out
111

julia> popfirst!(q)
222

Extended help

Since popfirst! blocks when called on an empty queue, a DualLinkedConcurrentRingQueue acts almost like an unbounded Base.Channel. However, DualLinkedConcurrentRingQueue does not support close or blocking on push! when exceeding a bound.
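For comparison, the closest Base analogue is an unbounded Channel. The snippet below uses only Base and shows the behavior described above for popfirst!: the consumer task blocks in take! until put! supplies a value. It also exercises close, which DualLinkedConcurrentRingQueue lacks. The function name channel_demo is illustrative.

```julia
# An unbounded Base.Channel: `put!` never blocks, `take!` blocks while it is empty.
function channel_demo()
    ch = Channel{Int}(Inf)
    consumer = Threads.@spawn take!(ch)  # waits until another task puts a value
    put!(ch, 111)
    received = fetch(consumer)
    close(ch)                            # Channel supports `close`; the dual queue does not
    return received, isopen(ch)
end

channel_demo()  # (111, false)
```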

DualLinkedConcurrentRingQueue performs very well compared to other concurrent queue implementations. However, since it is based on linked fixed-size buffers, it has relatively large memory overhead.

DualLinkedConcurrentRingQueue is based on the linked multi-polarity dual ring queue by Izraelevitz and Scott (2017):

Izraelevitz, Joseph, and Michael L. Scott. “Generality and Speed in Nonblocking Dual Containers.” ACM Transactions on Parallel Computing 3, no. 4 (March 23, 2017): 22:1–22:37. https://doi.org/10.1145/3040220.

ConcurrentCollections.Implementations.DualLinkedQueue (Type)
DualLinkedQueue{T}()

A concurrent queue with nonblocking push! and popfirst!. Calling popfirst! on an empty queue waits for a push! in another task.

DualLinkedConcurrentRingQueue provides a faster dual queue with a larger memory footprint.

Examples

julia> using ConcurrentCollections

julia> q = DualLinkedQueue{Int}();

julia> push!(q, 111);

julia> push!(q, 222);

julia> popfirst!(q)  # first-in first-out
111

julia> popfirst!(q)
222

Extended help

Since popfirst! blocks when called on an empty queue, a DualLinkedQueue acts almost like an unbounded Base.Channel. However, DualLinkedQueue does not support close or blocking on push! when exceeding a bound.

DualLinkedQueue implements the dual queue by Scherer and Scott (2004):

Scherer, William N., and Michael L. Scott. “Nonblocking Concurrent Data Structures with Condition Synchronization.” In Distributed Computing, edited by Rachid Guerraoui, 174–87. Lecture Notes in Computer Science. Berlin, Heidelberg: Springer, 2004. https://doi.org/10.1007/978-3-540-30186-8_13.

ConcurrentCollections.Implementations.LinkedConcurrentRingQueue (Type)
LinkedConcurrentRingQueue{T}()

A concurrent queue with nonblocking push! and maybepopfirst!.

See also: DualLinkedConcurrentRingQueue

Examples

julia> using ConcurrentCollections

julia> q = LinkedConcurrentRingQueue{Int}();

julia> push!(q, 111);

julia> push!(q, 222);

julia> maybepopfirst!(q)  # first-in first-out
Some(111)

julia> maybepopfirst!(q)
Some(222)

julia> maybepopfirst!(q) === nothing  # queue is empty
true

Extended help

LinkedConcurrentRingQueue is based on Linked Concurrent Ring Queue (or List of Concurrent Ring Queues; LCRQ) by Morrison and Afek (2013):

Morrison, Adam, and Yehuda Afek. “Fast Concurrent Queues for X86 Processors.” In Proceedings of the 18th ACM SIGPLAN Symposium on Principles and Practice of Parallel Programming, 103–112. PPoPP ’13. New York, NY, USA: Association for Computing Machinery, 2013. https://doi.org/10.1145/2442516.2442527. (Revised version: https://www.cs.tau.ac.il/~mad/publications/ppopp2013-x86queues.pdf)

ConcurrentCollections.Implementations.WorkStealingDeque (Type)
WorkStealingDeque{T}()

Concurrent work-stealing "deque" of objects of type T.

This is not a full deque in the sense that:

  • push! and maybepop! operating at the tail of the collection can only be executed by a single task.
  • maybepopfirst! (aka steal) for retrieving and removing an element at the head can be invoked from any task. However, there is no pushfirst!.

Examples

julia> using ConcurrentCollections

julia> deque = WorkStealingDeque{Int}();

julia> push!(deque, 1);

julia> push!(deque, 2);

julia> push!(deque, 3);

julia> maybepop!(deque)
Some(3)

julia> fetch(Threads.@spawn maybepopfirst!(deque))
Some(1)

julia> fetch(Threads.@spawn popfirst!(deque))
2

julia> maybepopfirst!(deque)  # returns nothing

Extended help

The current implementation is known not to be fully compliant with the C/C++ memory model (on which Julia's memory model is based).

It implements the dynamic circular work-stealing deque by Chase and Lev (2005):

Chase, David, and Yossi Lev. “Dynamic Circular Work-Stealing Deque.” In Proceedings of the Seventeenth Annual ACM Symposium on Parallelism in Algorithms and Architectures, 21–28. SPAA ’05. New York, NY, USA: Association for Computing Machinery, 2005. https://doi.org/10.1145/1073970.1073974.

ConcurrentCollections.Keep (Type)
Keep(ans)

A special type used in modify! to indicate that a slot should remain unchanged while propagating the result ans of some computation to the caller.

That is to say,

y = modify!(dict, key) do value
    Keep(f(something(value)))
end
y[]

is an optimization of

r = Ref{Any}()
modify!(dict, key) do value
    r[] = f(something(value))
    Some(value)
end
r[]
ConcurrentCollections.maybepop! (Function)
maybepop!(collection) -> Some(value::T) or nothing

Try to pop a value from the tail of collection. Return Some(value) if it is non-empty. Return nothing if empty.

Examples

julia> using ConcurrentCollections

julia> stack = ConcurrentStack{Int}();

julia> push!(stack, 1);

julia> maybepop!(stack)
Some(1)

julia> maybepop!(stack)  # returns nothing
ConcurrentCollections.maybepopfirst! (Function)
maybepopfirst!(collection) -> Some(value::T) or nothing

Try to pop a value from the head of collection. Return Some(value) if it is non-empty. Return nothing if empty.

Examples

julia> using ConcurrentCollections

julia> queue = ConcurrentQueue{Int}();

julia> push!(queue, 1);

julia> maybepopfirst!(queue)
Some(1)

julia> maybepopfirst!(queue)  # returns nothing
ConcurrentCollections.modify! (Function)
modify!(f, dict::ConcurrentDict{K,V}, key::K) -> y

Atomically update the slot for key in dict using a function f.

If key does not exist, f is called with nothing. The call f(nothing) must return either (1) nothing to keep the slot unoccupied or (2) Some(value::V) to insert value.

If key exists, f is called with a ref such that ref[] retrieves the value corresponding to the key. The call f(ref) must return either (1) nothing to delete the slot, (2) Some(value′::V) to replace the value with value′, (3) Keep(ans) to leave the slot unchanged and return y = Keep(ans) from modify!, or (4) Delete(ans) to delete the slot and return y = Delete(ans) from modify!.

The function f may be called more than once if multiple tasks try to modify the dictionary.
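The reason f may run more than once is the usual compare-and-swap retry loop: f computes a new value from a snapshot, and if another task updated the slot in the meantime, the CAS fails and f is called again with the newer value. The sketch below shows the pattern on a single atomic field rather than a dictionary slot; AtomicBox and cas_modify! are hypothetical names and do not describe how ConcurrentDict implements modify!.

```julia
mutable struct AtomicBox{T}
    @atomic value::T
end

# Apply `f` to the current value and try to publish the result with a CAS.
# On failure, `old` is refreshed with the value another task wrote, and `f`
# is called again; this is why `f` may run more than once.
function cas_modify!(f, box::AtomicBox)
    old = @atomic box.value
    while true
        desired = f(old)
        old, success = @atomicreplace box.value old => desired
        success && return desired
    end
end

box = AtomicBox(0)
ncalls = Threads.Atomic{Int}(0)
@sync for _ in 1:100
    Threads.@spawn cas_modify!(box) do x
        Threads.atomic_add!(ncalls, 1)  # count how often `f` runs
        x + 1
    end
end
# (@atomic box.value) == 100, while ncalls[] >= 100 under contention
```

Because f can be re-executed, it should be free of side effects other than computing its return value.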

Examples

julia> using ConcurrentCollections

julia> dict = ConcurrentDict{String,Int}();

julia> modify!(dict, "hello") do _
           Some(1)
       end
Some(1)

julia> modify!(dict, "hello") do ref
           Some(something(ref[]) + 1)
       end
Some(2)