ChunkedBase.ConsumeContexts.consume!Function
consume!(consume_ctx::AbstractConsumeContext, payload::ParsedPayload{<:AbstractResultBuffer, <:AbstractParsingContext})

Override with your AbstractConsumeContext to provide custom logic for processing the parsed results in AbstractResultBuffer. The method is called from multiple tasks in parallel, just after each corresponding task_buf has been populated. Each task_buf is filled only once per chunk and is accessed by only one task at a time.

See also consume!, setup_tasks!, cleanup, AbstractResultBuffer

ChunkedBase.ConsumeContexts.setup_tasks!Method
setup_tasks!(consume_ctx::AbstractConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)

Set the number of units of work that the parser/consume tasks need to finish (and report back as done via e.g. task_done!) before the current chunk of input data is considered to be entirely processed.

This function is called just after we're done detecting newline positions in the current chunk of data and are about to submit partitions of the detected newlines to the parse/consume tasks.

ntasks is between 1 and the nworkers argument to parse_file, depending on the size of the input. Most of the time, the value of nworkers is used, but for smaller buffer sizes, smaller files, or when handling the last bytes of the file, ntasks will be smaller, as we try to ensure that the minimal average task size in terms of bytes of input is at least 16 KiB. For :serial parsing mode, ntasks is always 1.
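As a rough illustration of the sizing rule above, the following sketch clamps the task count so that the average task covers at least 16 KiB of input. The names `sketch_ntasks` and `MIN_TASK_BYTES` are hypothetical, and the formula is a simplification for illustration only; it is not claimed to be the heuristic ChunkedBase actually uses.

```julia
# Assumed for illustration: the 16 KiB minimum average task size quoted above.
const MIN_TASK_BYTES = 16 * 1024

# Hypothetical helper: pick a task count between 1 and `nworkers` so that the
# average task covers at least MIN_TASK_BYTES bytes (when the chunk is that large).
function sketch_ntasks(nbytes::Integer, nworkers::Integer; serial::Bool=false)
    serial && return 1  # serial mode always uses a single task
    return clamp(div(nbytes, MIN_TASK_BYTES), 1, nworkers)
end

sketch_ntasks(1024 * 1024, 8)  # large chunk: all 8 workers are used
sketch_ntasks(100 * 1024, 8)   # small chunk: fewer tasks than workers
```

Under these assumptions, a chunk smaller than 16 KiB still gets one task, which matches the lower bound of 1 stated above.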

You should override this method when you further subdivide the concurrent work on the chunk, e.g. when you want to process each column separately in @spawn tasks. In that case you'd expect there to be ntasks * (1 + length(parsing_ctx.schema)) units of work per chunk, and you'd have to call task_done! manually in the column-processing tasks.

See also consume!, setup_tasks!, task_done!, cleanup

ChunkedBase.ConsumeContexts.sync_tasksMethod
sync_tasks(consume_ctx::AbstractConsumeContext, chunking_ctx::ChunkingContext)

Wait for all parse/consume tasks to report that all expected units of work are done. Called after all work for the current chunk has been submitted to the parser/consume tasks and we're about to refill the chunk.

See also consume!, setup_tasks!, task_done!, cleanup

ChunkedBase.ConsumeContexts.task_done!Method
task_done!(consume_ctx::AbstractConsumeContext, chunking_ctx::ChunkingContext)

Decrement the expected number of remaining work units by one. Called after each consume! call.

This function should be called ntasks times, where ntasks comes from the corresponding setup_tasks! call.
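The interplay of setup_tasks!, task_done! and sync_tasks can be pictured as a countdown on a shared counter. The sketch below is self-contained and does not use ChunkedBase: `WorkCounter`, `set_units!`, `unit_done!` and `wait_all` are hypothetical stand-ins for the three roles, assuming the column-wise subdivision mentioned for setup_tasks! (one unit per consume! call plus one per column task).

```julia
# Hypothetical stand-in for the per-chunk counter; not ChunkedBase's own type.
mutable struct WorkCounter
    remaining::Int
    cond::Threads.Condition
end
WorkCounter() = WorkCounter(0, Threads.Condition())

# Role of setup_tasks!: declare how many units of work the chunk requires.
function set_units!(c::WorkCounter, n::Int)
    Base.@lock c.cond begin
        c.remaining = n
    end
    return nothing
end

# Role of task_done!: report one finished unit and wake the waiter at zero.
function unit_done!(c::WorkCounter)
    Base.@lock c.cond begin
        c.remaining -= 1
        c.remaining == 0 && notify(c.cond)
    end
    return nothing
end

# Role of sync_tasks: block until every declared unit has been reported.
function wait_all(c::WorkCounter)
    Base.@lock c.cond begin
        while c.remaining > 0
            wait(c.cond)
        end
    end
    return nothing
end

# Column-wise subdivision: each of `ntasks` consume! calls reports one unit,
# plus one unit for every column-processing task.
ntasks, ncols = 4, 3
counter = WorkCounter()
set_units!(counter, ntasks * (1 + ncols))
for _ in 1:(ntasks * (1 + ncols))
    Threads.@spawn unit_done!(counter)
end
wait_all(counter)  # returns only once all 16 units have been reported
```

If fewer units are reported than were declared, the waiter in this sketch never returns, which is why the number of task_done! calls has to match the count set in setup_tasks!.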

See also consume!, setup_tasks!, cleanup

ChunkedBase.ChunkingContextType
ChunkingContext(
    buffersize::Integer,
    nworkers::Integer,
    limit::Integer,
    comment::Union{Nothing,UInt8,String,Char,Vector{UInt8}}
) -> ChunkingContext

A context object used to coordinate parallel parsing of a single file, chunk by chunk.

The user can use this object to specify the size of the byte buffer(s) to allocate, the number of worker tasks to spawn and the maximum number of rows to parse in parse_file_parallel.

Arguments:

  • buffersize: the size of the byte buffer to allocate. If the input is bigger than buffersize, a secondary ChunkingContext object will be used to double-buffer the input, which will allocate a new buffer of the same size as buffersize.
  • nworkers: the number of worker tasks that should be spawned in parse_file_parallel.
  • limit: the maximum number of rows to parse (see limit_eols!). By default, no limit is set.
  • comment: the comment prefix to skip, if any. By default no comment prefix is skipped.

Notes:

  • One can use the id and buffer_refills fields to uniquely identify a chunk of input. The id field is necessary because we internally create a secondary ChunkingContext object, with id equal to the id of the original ChunkingContext + 1.
  • The counter field is used to synchronize the parser/consumer tasks.
  • The newline_positions field is used to store the newline positions in the input.
  • The bytes field is used to store the raw bytes ingested from the input.
  • comment can be used to skip the initial comment lines in skip_rows_init!. This value is also passed to populate_result_buffer! so that the user can handle commented rows in the middle of the file during parsing (_startswith could be used to do the check).
  • The buffersize should be large enough to fit the longest row in the input, otherwise the lexer will fail.
  • The buffersize should be chosen such that each of the nworkers tasks has enough bytes to work on. Using 1 MiB per task seems to work reasonably well in practice.
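The two sizing notes above can be combined into a small helper. This is only a sketch: `suggested_buffersize` and its `longest_row` keyword are hypothetical names, and the rule simply takes the larger of "1 MiB per worker" and the longest row the caller expects.

```julia
const MiB = 1024 * 1024

# Hypothetical helper: about 1 MiB per worker task, but never smaller than the
# longest row the caller expects, since a whole row has to fit in the buffer.
function suggested_buffersize(nworkers::Integer; longest_row::Integer=0)
    return max(nworkers * MiB, longest_row)
end

buffersize = suggested_buffersize(8)
# The result could then be used as the first constructor argument, e.g.
# ChunkingContext(buffersize, 8, limit, comment), with `limit` and `comment`
# chosen by the caller.
```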

ChunkedBase.ParsedPayloadType
ParsedPayload{B, C<:AbstractParsingContext}

A payload of parsed results, which is passed to consume! after each populate_result_buffer! call.

Fields:

  • row_num::Int: row number of the first row in the payload
  • len::Int: number of rows in the payload
  • results::B: parsed result buffer which was populated by populate_result_buffer!
  • parsing_ctx::C: library-provided data (to distinguish JSONL and CSV processing)
  • chunking_ctx::ChunkingContext: contains the raw bytes, synchronization objects and newline positions
  • eols_buffer_index::Int32: The start index of the newline positions in chunking_ctx.newline_positions that this payload corresponds to.
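Because payloads carry both row_num and len, a consumer can tell which payload comes next even when payloads arrive out of order. The following self-contained sketch shows the idea with a hypothetical `Piece` struct that mirrors only those two fields. It assumes that rows are numbered from 1 and that payloads are contiguous, and it is not how PayloadOrderer is implemented.

```julia
# Hypothetical miniature of a payload: only the two fields needed for ordering.
struct Piece
    row_num::Int  # row number of the first row in the piece
    len::Int      # number of rows in the piece
end

# Return the pieces in row order, buffering those that arrive too early.
function reorder(arrivals::Vector{Piece}; first_row::Int=1)
    pending = Dict{Int,Piece}()  # early pieces, keyed by their first row
    ordered = Piece[]
    next_row = first_row         # row number we are waiting for
    for p in arrivals
        pending[p.row_num] = p
        # Release every piece that is now contiguous with what came before.
        while haskey(pending, next_row)
            q = pop!(pending, next_row)
            push!(ordered, q)
            next_row += q.len
        end
    end
    return ordered
end

ordered = reorder([Piece(6, 2), Piece(1, 5), Piece(8, 3)])
```

Here the piece starting at row 6 arrives first and is held back until the piece covering rows 1 to 5 has been released.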

ChunkedBase.PayloadOrdererType
PayloadOrderer{B, C<:AbstractParsingContext} <: AbstractChannel{ParsedPayload{B,C}}

A channel-like object that ensures that the payloads are consumed in order.

To use the PayloadOrderer, create your own AbstractConsumeContext that contains it, override consume! so that it only put!s payloads into the PayloadOrderer, and take! payloads from it in a separate task. For example:

struct MyConsumeContext <: AbstractConsumeContext
    orderer::PayloadOrderer{MyResultBuffer, MyParsingContext}
end

# Forward the payloads, which will arrive in random order, to the orderer
function ChunkedBase.consume!(consume_ctx::MyConsumeContext, payload::ParsedPayload)
    put!(consume_ctx.orderer, payload)
end

# Expect `ChunkedBase.task_done!` to be called twice per payload.
# By default, we'd do it once after every `consume!`,
# but we'll add another one in the task that takes from the orderer.
# This makes sure that the current chunk won't get recycled until our task is done with it.
function ChunkedBase.setup_tasks!(::MyConsumeContext, chunking_ctx::ChunkingContext, ntasks::Int)
    set!(chunking_ctx.counter, 2*ntasks)
end

consume_ctx = MyConsumeContext(PayloadOrderer{MyResultBuffer, MyParsingContext}())

# Our task that needs to process the payloads in order
@spawn begin
    while true
        payload = take!(consume_ctx.orderer)
        do_something_that_requires_ordered_results(payload)
        task_done!(consume_ctx, payload.chunking_ctx)
    end
end

parse_file_parallel(lexer, parsing_ctx, consume_ctx, chunking_ctx, result_buf)

NOTE: It is not safe to call take! from multiple tasks on a PayloadOrderer.

ChunkedBase.parse_file_parallelMethod
parse_file_parallel(
    lexer::Lexer,
    parsing_ctx::AbstractParsingContext,
    consume_ctx::AbstractConsumeContext,
    chunking_ctx::ChunkingContext,
    result_buffers::Vector{<:AbstractResultBuffer},
    ::Type{CT}=Tuple{}
) where {CT} -> Nothing

Parse the file in lexer.io in parallel using chunking_ctx.nworkers tasks. The user must provide a populate_result_buffer! method, which parses the ingested data in chunking_ctx.bytes into the result_buffers, using the newline positions in chunking_ctx.newline_positions as row boundaries. The consume! method is called after each populate_result_buffer! call, so the user can process the parsed results in parallel. No result_buffer is accessed by more than one task at a time.

Arguments:

  • lexer: a NewlineLexers.Lexer object which is used to find newline positions in the input. The type of the lexer affects whether the search is quote-aware or not.
  • parsing_ctx: a user-provided object which is passed to populate_result_buffer!
  • consume_ctx: a user-provided object which is passed to consume!
  • chunking_ctx: an internal object that is used to keep track of the current chunk of data being processed
  • result_buffers: a vector of user-provided objects which are used to store the parsed results
  • CT: an optional, compile-time known type which is passed to populate_result_buffer!. This is a bit of niche functionality required by ChunkedCSV, which needs to know about "custom types" at compile time in order to unroll the parsing loop on them.

Exceptions:

  • UnmatchedQuoteError: if the input ends with an unmatched quote
  • NoValidRowsInBufferError: if not a single newline was found in the input buffer
  • CapturedException: if an exception was thrown in one of the parser/consumer tasks

Notes:

  • You can initialize the chunking_ctx yourself using read_and_lex! or with initial_read! + initial_lex!, which gives you the opportunity to sniff the first chunk of data before you call parse_file_parallel.
  • If the input is bigger than chunking_ctx.bytes, a secondary chunking_ctx object will be used to double-buffer the input, which will allocate a new buffer of the same size as chunking_ctx.bytes.
  • This function spawns chunking_ctx.nworkers + 1 tasks.

See also populate_result_buffer!, consume!, parse_file_serial.

ChunkedBase.parse_file_serialMethod
parse_file_serial(
    lexer::Lexer,
    parsing_ctx::AbstractParsingContext,
    consume_ctx::AbstractConsumeContext,
    chunking_ctx::ChunkingContext,
    result_buf::AbstractResultBuffer,
    ::Type{CT}=Tuple{},
) where {CT}

The serial analog of parse_file_parallel, which doesn't spawn any tasks. It is useful for debugging and for processing very small files.

See also populate_result_buffer!, consume!, parse_file_parallel.

ChunkedBase.populate_result_buffer!Function
populate_result_buffer!(
    result_buf::AbstractResultBuffer,
    newline_segment::AbstractVector{Int32},
    parsing_ctx::AbstractParsingContext,
    bytes::Vector{UInt8},
    comment::Union{Nothing,Vector{UInt8}}=nothing,
    ::Type{CT}=Tuple{}
) where {CT}

Override with your AbstractParsingContext to provide custom logic for parsing the input bytes in bytes between the newline positions in newline_segment into result_buf. The method is called from multiple tasks in parallel, each having a different newline_segment, some sharing the same bytes. The result_buf is only accessed by one task at a time.

Arguments:

  • result_buf: a user-provided object which is meant to store the parsing results from this function
  • newline_segment: a vector of newline positions in bytes which delimit the rows of the input.
  • parsing_ctx: a user-provided object that is used to dispatch to this method and carry parsing specific config
  • bytes: the raw bytes ingested from the input
  • comment: the comment prefix to skip, if any
  • CT: an optional, compile-time known type which was passed to parse_file_parallel / parse_file_serial

Notes:

Each consecutive pair of newline_segment values delimits a single row: the row's bytes in bytes lie strictly between the two positions, so the range is exclusive on both ends.

The range needs to be treated as exclusive because we add a fictional newline at the beginning of the chunk at position 0 and past the end of the file if it doesn't end on a newline. A safe way of processing each row would be e.g.:

start_index = first(newline_segment)
for i in 2:length(newline_segment)
    end_index = newline_segment[i]
    row_bytes = view(bytes, start_index+1:end_index-1) # +/- 1 is needed!

    # ... actually populate the result_buf

    start_index = end_index
end
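To make the loop above concrete, here is a self-contained variant that runs without ChunkedBase. The input and its newline positions are written out by hand, including the fictional newlines at position 0 and one past the end of the data. `has_prefix` and `collect_rows` are hypothetical helpers; `has_prefix` merely stands in for the kind of check _startswith could perform on commented rows.

```julia
# Hypothetical prefix check on raw bytes (stand-in for a `_startswith`-like test).
function has_prefix(row::AbstractVector{UInt8}, prefix::Vector{UInt8})
    length(row) >= length(prefix) || return false
    for i in eachindex(prefix)
        row[i] == prefix[i] || return false
    end
    return true
end

# Collect each row as a String, skipping rows that start with `comment`.
function collect_rows(bytes::Vector{UInt8}, newline_segment::AbstractVector{Int32},
                      comment::Union{Nothing,Vector{UInt8}}=nothing)
    rows = String[]
    start_index = first(newline_segment)
    for i in 2:length(newline_segment)
        end_index = newline_segment[i]
        row = view(bytes, start_index+1:end_index-1)  # exclusive on both ends
        if comment === nothing || !has_prefix(row, comment)
            push!(rows, String(copy(row)))
        end
        start_index = end_index
    end
    return rows
end

bytes = Vector{UInt8}("a,b\n# note\nc,d")  # 14 bytes, no trailing newline
# Real newlines sit at 4 and 11; 0 and 15 are the fictional ones.
newline_segment = Int32[0, 4, 11, 15]

all_rows = collect_rows(bytes, newline_segment)
data_rows = collect_rows(bytes, newline_segment, Vector{UInt8}("#"))
```

With the comment prefix set to `#`, the middle row is skipped and only the two data rows remain.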