ChunkedBase.ConsumeContexts.AbstractConsumeContext — Type
AbstractConsumeContext
End users should subtype this to create custom consume contexts which are then used in parse_file_parallel and parse_file_serial, to dispatch on their consume! method.
See also:
ChunkedBase.ConsumeContexts.cleanup — Method
cleanup(consume_ctx::AbstractConsumeContext, e::Exception)
You can override this method to do custom exception handling with your consume context. This method is called immediately before a rethrow() which forwards e further.
See also consume!, setup_tasks!, task_done!, sync_tasks
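For example, a hedged sketch (the MyConsumeContext type and its output_io field are hypothetical, not part of ChunkedBase) that releases resources and logs the failure before the exception is rethrown:

# A minimal sketch; assumes `using ChunkedBase` and a hypothetical
# `MyConsumeContext <: AbstractConsumeContext` with an `output_io` field.
function ChunkedBase.cleanup(consume_ctx::MyConsumeContext, e::Exception)
    close(consume_ctx.output_io)                       # release resources we hold
    @error "Parsing failed, shutting down" exception=e # log before the rethrow()
    return nothing
end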
ChunkedBase.ConsumeContexts.consume! — Function
consume!(consume_ctx::AbstractConsumeContext, payload::ParsedPayload{<:AbstractResultBuffer, <:AbstractParsingContext})
Override with your AbstractConsumeContext to provide custom logic for processing the parsed results in AbstractResultBuffer. The method is called from multiple tasks in parallel, just after each corresponding task_buf has been populated. task_buf is only filled once per chunk and is only accessed by one task at a time.
See also consume!, setup_tasks!, cleanup, AbstractResultBuffer
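For example, a hedged sketch (assuming the hypothetical MyConsumeContext from above and a MyResultBuffer with a rows::Vector{String} field, as sketched under AbstractResultBuffer below) that writes each parsed row to an output stream:

# A minimal sketch; assumes `using ChunkedBase` and the hypothetical
# `MyConsumeContext`/`MyResultBuffer` types.
function ChunkedBase.consume!(consume_ctx::MyConsumeContext, payload::ParsedPayload)
    result_buf = payload.results       # buffer populated by populate_result_buffer!
    for row in 1:payload.len           # payload.len rows were parsed into the buffer
        println(consume_ctx.output_io, result_buf.rows[row])
    end
    return nothing
end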
ChunkedBase.ConsumeContexts.setup_tasks! — Method
setup_tasks!(consume_ctx::AbstractConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
Set the number of units of work that the parser/consume tasks need to finish (and report back as done via e.g. task_done!) before the current chunk of input data is considered to be entirely processed.
This function is called just after we're done detecting newline positions in the current chunk of data and we are about to submit partitions of the detected newlines to the parse/consume tasks.
ntasks is between 1 and the nworkers argument to parse_file, depending on the size of the input. Most of the time, the value of nworkers is used, but for smaller buffer sizes, smaller files or when handling the last bytes of the file, ntasks will be smaller, as we try to ensure that the minimal average task size in terms of bytes of input is at least 16.000 KiB. For :serial parsing mode, ntasks is always 1.
You should override this method when you further subdivide the amount of concurrent work on the chunk, e.g. when you want to process each column separately in @spawn tasks, in which case you'd expect there to be ntasks * (1 + length(parsing_ctx.schema)) units of work per chunk, and you'd have to manually call task_done! in the column-processing tasks.
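For example, a hedged sketch (the MyConsumeContext type and its ncols field are hypothetical) for a setup where consume! spawns one extra task per column and each of those tasks calls task_done! when it finishes:

# A minimal sketch; each column-processing task spawned from `consume!` must
# call `task_done!(consume_ctx, chunking_ctx)` once for the counts to match.
function ChunkedBase.setup_tasks!(consume_ctx::MyConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
    # One unit of work per parser/consume task plus one per column-processing task.
    set!(chunking_ctx.counter, ntasks * (1 + consume_ctx.ncols))
end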
See also consume!, setup_tasks!, task_done!, cleanup
ChunkedBase.ConsumeContexts.sync_tasks — Method
sync_tasks(consume_ctx::AbstractConsumeContext, chunking_ctx::ChunkingContext)
Wait for all parse/consume tasks to report all expected units of work to be done. Called after all work for the current chunk has been submitted to the parser/consume tasks and we're about to refill it.
See also consume!, setup_tasks!, task_done!, cleanup
ChunkedBase.ConsumeContexts.task_done! — Method
task_done!(consume_ctx::AbstractConsumeContext, chunking_ctx::ChunkingContext)
Decrement the expected number of remaining work units by one. Called after each consume! call.
This function should be called ntasks times, where ntasks comes from the corresponding setup_tasks! call.
See also consume!, setup_tasks!, cleanup
ChunkedBase.AbstractParsingContext — Type
AbstractParsingContext
Users should subtype this to create custom parsing context variables which are then used in parse_file_parallel / parse_file_serial, to dispatch on their populate_result_buffer! method.
See also:
ChunkedBase.AbstractResultBuffer — Type
AbstractResultBuffer
Users should subtype this to create custom result buffer objects to store the parsed results in populate_result_buffer!.
See also:
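As an illustration, a hedged sketch of minimal subtypes used by the examples in this reference (the names and fields are hypothetical, not provided by ChunkedBase):

# Minimal hypothetical subtypes used throughout the examples in this reference.
struct MyParsingContext <: ChunkedBase.AbstractParsingContext
    # parsing configuration (delimiters, schema, ...) would live here
end

struct MyResultBuffer <: ChunkedBase.AbstractResultBuffer
    rows::Vector{String}  # populate_result_buffer! appends one String per parsed row
end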
ChunkedBase.ChunkingContext — Type
ChunkingContext(
buffersize::Integer,
nworkers::Integer,
limit::Integer,
comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}}
) -> ChunkingContext
A context object used to coordinate parallel parsing of a single file, chunk by chunk.
The user can use this object to specify the size of the byte buffer(s) to allocate, the number of worker tasks to spawn and the maximum number of rows to parse in parse_file_parallel.
Arguments:
- buffersize: the size of the byte buffer to allocate. If the input is bigger than buffersize, a secondary ChunkingContext object will be used to double-buffer the input, which will allocate a new buffer of the same size as buffersize.
- nworkers: the number of worker tasks that should be spawned in parse_file_parallel.
- limit: the maximum number of rows to parse, see limit_eols!; by default no limit is set.
- comment: the comment prefix to skip, if any. By default no comment prefix is skipped.
Notes:
- One can use the id and buffer_refills fields to uniquely identify a chunk of input. The id field is necessary because we internally create a secondary ChunkingContext object, with id equal to the id of the original ChunkingContext + 1.
- The counter field is used to synchronize the parser/consumer tasks.
- The newline_positions field is used to store the newline positions in the input.
- The bytes field is used to store the raw bytes ingested from the input.
- comment can be used to skip the initial comment lines in skip_rows_init!. This value is also passed to populate_result_buffer! so the user can handle commented rows in the middle of the file during parsing (_startswith could be used to do the check).
- The buffersize should be large enough to fit the longest row in the input, otherwise the lexer will fail.
- The buffersize should be chosen such that each of the nworkers tasks has enough bytes to work on. Using 1MiB per task seems to work reasonably well in practice.
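For instance, a hedged sketch of constructing a context (the specific values are illustrative, and 0 for limit is assumed to mean "no limit"):

# A minimal sketch with illustrative values.
chunking_ctx = ChunkingContext(
    4 * 1024 * 1024,  # buffersize: 4 MiB, roughly 1 MiB per worker task
    4,                # nworkers
    0,                # limit (assumed to mean no row limit)
    nothing,          # comment: no comment prefix to skip
)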
See also:
ChunkedBase.ParsedPayload — Type
ParsedPayload{B, C<:AbstractParsingContext}
A payload of parsed results, which is passed to consume! after each populate_result_buffer! call.
Fields:
- row_num::Int: row number of the first row in the payload
- len::Int: number of rows in the payload
- results::B: parsed result buffer which was populated by populate_result_buffer!
- parsing_ctx::C: library-provided data (to distinguish JSONL and CSV processing)
- chunking_ctx::ChunkingContext: contains the raw bytes, synchronization objects and newline positions
- eols_buffer_index::Int32: the start index of the newline positions in chunking_ctx.newline_positions that this payload corresponds to.
See also:
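As a hedged illustration (reusing the hypothetical MyConsumeContext from the earlier sketches), these are the fields a consume! method would typically read:

# A minimal sketch of typical field access on the payload.
function ChunkedBase.consume!(consume_ctx::MyConsumeContext, payload::ParsedPayload)
    first_row = payload.row_num                   # global row number of the first row
    last_row = payload.row_num + payload.len - 1  # payload.len rows are in this payload
    buf = payload.results                         # buffer filled by populate_result_buffer!
    chunk_id = payload.chunking_ctx.id            # identifies the chunk this payload came from
    @info "consuming" first_row last_row chunk_id
    return nothing
end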
ChunkedBase.PayloadOrderer — Type
PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}
A channel-like object that ensures that the payloads are consumed in order.
To use the PayloadOrderer you should create your own AbstractConsumeContext that contains it and override consume! to only put! payloads in the PayloadOrderer, then take! payloads from it using a separate task. For example:
struct MyConsumeContext <: AbstractConsumeContext
orderer::PayloadOrderer{MyResultBuffer, MyParsingContext}
end
# Forward the payloads, which will arrive in random order, to the orderer
function ChunkedBase.consume!(consume_ctx::MyConsumeContext, payload::ParsedPayload)
put!(consume_ctx.orderer, payload)
end
# Expect `ChunkedBase.task_done!` to be called twice per payload.
# By default, we'd do it once after every `consume!`,
# but we'll add another one in the task that takes from the orderer.
# This will make sure that the current chunk won't get recycled until our task is done with it.
function ChunkedBase.setup_tasks!(::MyConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
set!(chunking_ctx.counter, 2*ntasks)
end
consume_ctx = MyConsumeContext(PayloadOrderer{MyResultBuffer, MyParsingContext}())
# Our task that needs to process the payloads in order
@spawn begin
while true
payload = take!(consume_ctx.orderer)
do_something_that_requires_ordered_results(payload)
task_done!(consume_ctx, payload.chunking_ctx)
end
end
parse_file_parallel(lexer, parsing_ctx, consume_ctx, chunking_ctx, result_buf)
NOTE: It is not safe to call take! from multiple tasks on a PayloadOrderer.
See also:
ChunkedBase.parse_file_parallel — Method
parse_file_parallel(
lexer::Lexer,
parsing_ctx::AbstractParsingContext,
consume_ctx::AbstractConsumeContext,
chunking_ctx::ChunkingContext,
result_buffers::Vector{<:AbstractResultBuffer},
::Type{CT}=Tuple{}
) where {CT} -> Nothing
Parse the file in lexer.io in parallel using chunking_ctx.nworkers tasks. The user must provide a populate_result_buffer! method which is used to parse the ingested data in chunking_ctx.bytes, using the newline positions in chunking_ctx.newline_positions as row boundaries, into the result_buffers. The consume! method is called after each populate_result_buffer! call, so the user can process the parsed results in parallel. No result_buffer is accessed by more than one task at a time.
Arguments:
- lexer: a NewlineLexers.Lexer object which is used to find newline positions in the input. The type of the lexer affects whether the search is quote-aware or not.
- parsing_ctx: a user-provided object which is passed to populate_result_buffer!
- consume_ctx: a user-provided object which is passed to consume!
- chunking_ctx: an internal object that is used to keep track of the current chunk of data being processed
- result_buffers: a vector of user-provided objects which are used to store the parsed results
- CT: an optional, compile-time known type which is passed to populate_result_buffer!. This is a bit of niche functionality required by ChunkedCSV, which needs to know about "custom types" at compile time in order to unroll the parsing loop on them.
Exceptions:
- UnmatchedQuoteError: if the input ends with an unmatched quote
- NoValidRowsInBufferError: if not a single newline was found in the input buffer
- CapturedException: if an exception was thrown in one of the parser/consumer tasks
Notes:
- You can initialize the chunking_ctx yourself using read_and_lex! or with initial_read! + initial_lex!, which gives you the opportunity to sniff the first chunk of data before you call parse_file_parallel.
- If the input is bigger than chunking_ctx.bytes, a secondary chunking_ctx object will be used to double-buffer the input, which will allocate a new buffer of the same size as chunking_ctx.bytes.
- This function spawns chunking_ctx.nworkers + 1 tasks.
See also populate_result_buffer!, consume!, parse_file_serial.
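As an illustration, a hedged end-to-end sketch reusing the hypothetical MyParsingContext, MyConsumeContext and MyResultBuffer types from the sketches above; the Lexer constructor shown is assumed to produce a plain, non-quote-aware newline search, and the file path and buffer counts are illustrative:

using ChunkedBase, NewlineLexers

io = open("data.txt")                                   # illustrative input
lexer = NewlineLexers.Lexer(io, nothing, UInt8('\n'))   # assumed non-quote-aware constructor
chunking_ctx = ChunkingContext(4 * 1024 * 1024, 4, 0, nothing)  # 4 MiB buffer, 4 workers
parsing_ctx = MyParsingContext()
consume_ctx = MyConsumeContext(stdout, 2)               # hypothetical fields: output stream, column count
# Illustrative: one result buffer per worker task so no buffer is shared;
# allocate more if your consume! holds on to buffers for longer.
result_buffers = [MyResultBuffer(String[]) for _ in 1:4]

parse_file_parallel(lexer, parsing_ctx, consume_ctx, chunking_ctx, result_buffers)
close(io)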
ChunkedBase.parse_file_serial — Method
parse_file_serial(
lexer::Lexer,
parsing_ctx::AbstractParsingContext,
consume_ctx::AbstractConsumeContext,
chunking_ctx::ChunkingContext,
result_buf::AbstractResultBuffer,
::Type{CT}=Tuple{},
) where {CT}
The serial analog of parse_file_parallel which doesn't spawn any tasks, useful for debugging and processing very small files.
See also populate_result_buffer!, consume!, parse_file_parallel.
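A hedged sketch of a serial invocation, reusing the same hypothetical types; note that it takes a single result buffer rather than a vector:

# A minimal sketch; nothing runs concurrently, so a single result buffer suffices.
io = open("data.txt")  # illustrative input
lexer = NewlineLexers.Lexer(io, nothing, UInt8('\n'))
chunking_ctx = ChunkingContext(1024 * 1024, 1, 0, nothing)
parse_file_serial(lexer, MyParsingContext(), MyConsumeContext(stdout, 2), chunking_ctx, MyResultBuffer(String[]))
close(io)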
ChunkedBase.populate_result_buffer! — Function
populate_result_buffer!(
result_buf::AbstractResultBuffer,
newline_segment::AbstractVector{Int32},
parsing_ctx::AbstractParsingContext,
bytes::Vector{UInt8},
comment::Union{Nothing,Vector{UInt8}}=nothing,
::Type{CT}=Tuple{}
) where {CT}
Override with your AbstractParsingContext to provide custom logic for parsing the input bytes in parsing_ctx.bytes between the newline positions in newline_segment into result_buf. The method is called from multiple tasks in parallel, each having a different newline_segment, some sharing the same parsing_ctx.bytes. The result_buf is only accessed by one task at a time.
Arguments:
- result_buf: a user-provided object which is meant to store the parsing results from this function
- newline_segment: a vector of newline positions in bytes which delimit the rows of the input.
- parsing_ctx: a user-provided object that is used to dispatch to this method and carry parsing specific config
- bytes: the raw bytes ingested from the input
- comment: the comment prefix to skip, if any
- CT: an optional, compile-time known object which was passed to parse_file_parallel / parse_file_serial
Notes:
Each consecutive pair of newline_segment values defines an exclusive range of bytes in bytes which constitutes a single row. The range needs to be treated as exclusive because we add a fictional newline at the beginning of the chunk at position 0, and another one past the end of the file if it doesn't end on a newline. A safe way of processing each row would be e.g.:
start_index = first(newline_segment)
for i in 2:length(newline_segment)
end_index = newline_segment[i]
row_bytes = view(bytes, start_index+1:end_index-1) # +/- 1 is needed!
# ... actually populate the result_buf
start_index = end_index
end
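For instance, a hedged sketch of a complete method built around that loop, reusing the hypothetical MyResultBuffer/MyParsingContext types from the earlier sketches and storing each row as a String:

# A minimal sketch; assumes `using ChunkedBase` and the hypothetical types above.
function ChunkedBase.populate_result_buffer!(
    result_buf::MyResultBuffer,
    newline_segment::AbstractVector{Int32},
    parsing_ctx::MyParsingContext,
    bytes::Vector{UInt8},
    comment::Union{Nothing,Vector{UInt8}}=nothing,
    ::Type{CT}=Tuple{},
) where {CT}
    empty!(result_buf.rows)  # the buffer is reused between chunks
    start_index = first(newline_segment)
    for i in 2:length(newline_segment)
        end_index = newline_segment[i]
        row_bytes = view(bytes, start_index+1:end_index-1)  # exclusive range, see above
        push!(result_buf.rows, String(row_bytes))           # store each row as a String
        start_index = end_index
    end
    return nothing
end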