Parsing from an IO

Some file types are gigabytes or tens of gigabytes in size. For such files, parsing from a buffer may be impractical, as it requires you to read the entire file into memory at once. Instead, Automa enables streaming parsing by hooking into TranscodingStreams.jl, a package that provides a wrapper IO type, TranscodingStream. Importantly, these streams buffer their input data, so Automa is able to operate directly on the input buffers of TranscodingStream objects.

Unfortunately, this significantly complicates things compared to parsing from a simple buffer. The main problem is that, when reading from a buffered stream, the byte array visible to Automa is only a small slice of the total input data. Worse, when the buffer is refilled, already-used data is flushed, i.e. removed from the buffer, shifting the position of the remaining bytes. To handle this, Automa must reach deep into the implementation details of TranscodingStreams, and also break some of its own abstractions. It's not pretty, but it's what we have.

Practically speaking, parsing from IO is done with the function Automa.generate_reader. Despite its name, this function is NOT directly used to generate objects like FASTA.Reader. Instead, this function produces Julia code (an Expr object) that, when evaluated, defines a function that can execute an Automa machine on an IO. Let me first show the code generated by generate_reader in pseudocode format:

function { function name }(stream::TranscodingStream, { args... })
    { init code }

    @label __exec__

    p = current buffer position
    p_end = final buffer position

    # the eof call below will first flush any used data from the buffer,
    # then load in new data, before checking if it's really eof.
    is_eof = eof(stream)
    execute normal automa parsing of the buffer
    update buffer position to match p

    { loop code }

    if cs < 0 # meaning: erroneous input, or erroneous EOF
        { error code }
    end

    if machine errored, or reached EOF
        @label __return__
        { return code }
    end

    @goto __exec__
end

The content marked { function name }, { args... }, { init code }, { loop code }, { error code } and { return code } is provided by the user as arguments to Automa.generate_reader. By providing these, the user can customize the generated function further.

The main difference from the code generated to parse a buffer is the label/GOTO pair __exec__, which causes the function to repeatedly load data into the buffer, execute the machine, flush used data from the buffer, and so on, until it is interrupted.

Importantly, inside the generated function, p and p_end refer to positions in the current buffer, which are generally not positions in the stream as a whole. Moreover, when data in the buffer is flushed, the remaining data may shift, so a previously stored value of p becomes invalid. This means you can't simply store a variable marked_pos that points to the current value of p and expect the same data to be at that position later. Furthermore, is_eof is set to whether the stream has reached EOF.

Example use

Let's show the simplest possible example of such a function. We have a Machine (which, recall, is a compiled regex) called machine, and we want to make a function that returns true if a given IO contains data that conforms to the regex format specified by the Machine.

We will still use the machine from before, just without any actions:

machine = let
    header = re"[a-z]+"
    seqline = re"[ACGT]+"
    record = re">" * header * '\n' * rep1(seqline * '\n')
    compile(rep(record))
end
@assert machine isa Automa.Machine

To create our simple IO reader, we call generate_reader, where the { return code } checks iszero(cs), i.e. whether the machine exited at a proper exit state. We also need to set the error code so that invalid input does not throw an error. Instead, we want execution to go immediately to the return section; that section is labeled __return__, so the error code is simply @goto __return__. Finally, we need to evaluate the code created by generate_reader in order to define the function validate_fasta:

julia> return_code = :(iszero(cs));

julia> error_code = :(@goto __return__);

julia> eval(generate_reader(:validate_fasta, machine; returncode=return_code, errorcode=error_code));

The generated function validate_fasta has the function signature: validate_fasta(stream::TranscodingStream). If our input IO is not a TranscodingStream, we can wrap it in the relatively lightweight NoopStream, which, as the name suggests, does nothing to the data:

julia> io = NoopStream(IOBuffer(">a\nTAG\nTA\n>bac\nG\n"));

julia> validate_fasta(io)
true

julia> validate_fasta(NoopStream(IOBuffer("random data")))
false
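
Because validate_fasta streams its input, it can validate a file on disk without ever holding the whole file in memory. Here is a minimal sketch; the helper name validate_fasta_file is my own invention, and it assumes the validate_fasta function defined above:

```julia
using TranscodingStreams: NoopStream

# Hypothetical helper: stream a file from disk through the
# `validate_fasta` function generated above. Memory usage stays
# bounded by the buffer size, not the file size.
function validate_fasta_file(path::AbstractString)
    open(path) do file
        validate_fasta(NoopStream(file))
    end
end
```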

Reading a single record


The following code is only for demonstration purposes. It has one important flaw, which will be addressed in a later section, so do not copy-paste it for serious work.

There are a few more subtleties related to the generate_reader function. Suppose we instead want to create a function that reads a single FASTA record from an IO. In this case, it's no good that the function created from generate_reader will loop until the IO reaches EOF - we need to find a way to stop it after reading a single record. We can do this with the pseudomacro @escape, as shown below.

We will reuse our Seq struct and our Machine from the "parsing from a buffer" section of this tutorial:

struct Seq
    name::String
    seq::String
end

machine = let
    header = onexit!(onenter!(re"[a-z]+", :mark_pos), :header)
    seqline = onexit!(onenter!(re"[ACGT]+", :mark_pos), :seqline)
    record = onexit!(re">" * header * '\n' * rep1(seqline * '\n'), :record)
    compile(rep(record))
end
@assert machine isa Automa.Machine

The code below contains @escape in the :record action, meaning: break out of machine execution.

actions = Dict{Symbol, Expr}(
    :mark_pos => :(pos = p),
    :header => :(header = String(data[pos:p-1])),
    :seqline => :(append!(seqbuffer, data[pos:p-1])),

    # Only this action is different from before!
    :record => quote
        seq = Seq(header, String(seqbuffer))
        found_sequence = true
        # Reset p one byte if we're not at the end
        p -= !(is_eof && p > p_end)
        @escape
    end
)
@assert actions isa Dict

@escape is not actually a real macro, but what Automa calls a "pseudomacro". It is expanded during Automa's own compiler pass, before Julia's lowering. The @escape pseudomacro is replaced with code that breaks out of the executing machine, without having reached EOF or an invalid byte.

Let's see how I use generate_reader, then I will explain each part:

generate_reader(
    :read_record,
    machine;
    actions=actions,
    initcode=quote
        seqbuffer = UInt8[]
        pos = 0
        found_sequence = false
        header = ""
    end,
    loopcode=quote
        if (is_eof && p > p_end) || found_sequence
            @goto __return__
        end
    end,
    returncode=:(found_sequence ? seq : nothing)
) |> eval

In the :record action, a few new things happen.

  • First, I set the flag found_sequence = true. In the loop code, I look for this flag to signal that the function should return. Remember, the loop code runs after machine execution, which can mean either that execution was broken out of by @escape, or that the buffer ran out and needs to be refilled. I could just return the sequence directly in the action, but then I would skip the code generated by generate_reader that leaves the buffer in a correct state, so this is never advised. Instead, in the loop code, I check for this flag and go to __return__ if necessary. I could also return directly in the loop code, but I prefer having only one place to return from the function.
  • I use @escape to break out of the machine, i.e. stop machine execution.
  • Finally, I decrement p, if and only if the machine has not reached the end of the stream (which is the case when is_eof is true, meaning the last chunk of the IO has been buffered, and p > p_end, meaning the end of the buffer has been reached). This is because the first record only ends when the machine sees the > symbol of the second record. If I then were to read another record from the same IO, the > would already have been consumed. So I reset p by 1, so that the > is also read on the next call to read_record.

I can use the function like this:

julia> io = NoopStream(IOBuffer(">a\nT\n>tag\nGAGA\nTATA\n"));

julia> read_record(io)
Seq("a", "T")

julia> read_record(io)
Seq("tag", "GAGATATA")

julia> read_record(io)
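
Since read_record returns nothing once the IO is exhausted, a caller can simply loop until that happens. Here is a sketch of a small convenience function; the name read_all_records is my own, built on the read_record and Seq definitions above:

```julia
# Hypothetical helper: collect every record from the IO by calling
# `read_record` (generated above) until it returns `nothing` at EOF.
function read_all_records(io)
    records = Seq[]
    while true
        record = read_record(io)
        record === nothing && break
        push!(records, record)
    end
    return records
end
```

For the example IO above, this returns both Seq records in order.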

Preserving data by marking the buffer

There is a problem with the implementation above. The following code in my actions dict:

header = String(data[pos:p-1])

creates header by accessing the data buffer. However, when reading from an IO, how can I know that the data hasn't shifted around in the buffer between the moment I set pos and the moment I use it? For example, suppose we have a small buffer of only 8 bytes, and the following FASTA file: >abcdefghijkl\nA. The buffer is first filled with >abcdefg. When entering the header, the :mark_pos action executes at p = 2, so pos = 2. But by the time I reach the end of the header, the used data in the buffer has been flushed, and the buffer now contains hijkl\nA. The stored pos = 2 no longer points to the start of the header, so data[pos:p-1] refers to the wrong bytes, or even reaches out of bounds!
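
To make the problem concrete, here is a toy simulation in plain Julia, with no Automa involved; the variable names are made up for illustration:

```julia
# Toy simulation: a saved integer index goes stale when the buffer
# is flushed and refilled with later data from the "stream".
buffer = collect(codeunits(">abcdefg"))  # first 8 bytes of the input
pos = 2                                  # saved index of the header start, 'a'

# The buffer is exhausted: used bytes are flushed, new bytes loaded in.
buffer = collect(codeunits("hijkl\nA"))

# `pos` still holds 2, but the byte at index 2 is now 'i', not 'a':
Char(buffer[pos])  # 'i'
```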

Luckily, the buffers of TranscodingStreams allow us to "mark" a position to save it. The buffer will not flush the marked position, or any position after the marked position. If necessary, it will resize the buffer to be able to load more data while keeping the marked position.

Inside the function generated by generate_reader, we can use the zero-argument pseudomacro @mark(), which marks the current position p. The pseudomacro @markpos() can then be used to get the marked position, which will keep pointing to the same data in the buffer, even after the buffer content has been shifted by a flush. This works because the mark is stored inside the TranscodingStream buffer, and the buffer makes sure to update the mark when its content moves. Hence, we can rewrite the actions:

actions = Dict{Symbol, Expr}(
    :mark_pos => :(@mark),
    :header => :(header = String(data[@markpos():p-1])),
    :seqline => :(append!(seqbuffer, data[@markpos():p-1])),

    # [ :record action omitted... ]
)

In our example above with the small 8-byte buffer, this is what would happen: First, the buffer contains the first 8 bytes. When p = 2, the mark is set, and the second byte is marked:

content: >abcdefg
mark:     ^
p = 2     ^

Then, when p = 9, the buffer is exhausted and the used data is removed. BUT, the mark stays, so byte 2 is preserved, and only the first byte is removed. The code in generate_reader loops around to @label __exec__, which sets p to the current buffer position. The buffer now looks like this:

content: abcdefgh
mark:    ^
p = 8           ^

Only 1 byte was cleared, so when p = 9, the buffer will be exhausted again. This time, no data can be cleared, so instead, the buffer is resized to fit more data:

content: abcdefghijkl\nA
mark:    ^
p = 9            ^

Finally, when we reach the newline at p = 13, the whole header is in the buffer, and so data[@markpos():p-1] will correctly refer to the header (now at 1:12):

content: abcdefghijkl\nA
mark:    ^
p = 13               ^

Remember to update the mark, or to clear it with @unmark() in order to be able to flush data from the buffer afterwards.
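
As the note above says, the mark must eventually be cleared so the buffer can flush again. Here is a hedged sketch of what the omitted :record action could look like; this completion is my own guess, not shown in the original:

```julia
# Hedged sketch of the omitted :record action in the mark-based version.
# After `header` and `seqbuffer` have been copied out of the buffer, the
# mark is no longer needed, so clear it to let the buffer flush again.
record_action = :record => quote
    seq = Seq(header, String(seqbuffer))
    found_sequence = true
    @unmark()
    # As before: reset p one byte if we're not at the end
    p -= !(is_eof && p > p_end)
    @escape
end
```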


generate_reader(funcname::Symbol, machine::Automa.Machine; kwargs...)

Generate a streaming reader function of the name funcname from machine.

The generated function consumes data from a stream passed as the first argument, and executes the machine while filling the data buffer.

This function returns an expression object of the generated function. The user needs to evaluate it in a module in which the generated function is needed.

Keyword Arguments

  • arguments: Additional arguments funcname will take (default: ()). The default signature of the generated function is (stream::TranscodingStream,), but it is possible to supply more arguments to the signature with this keyword argument.
  • context: Automa's code generator context (default: Automa.CodeGenContext()).
  • actions: A dictionary of action code (default: Dict{Symbol,Expr}()).
  • initcode: Initialization code (default: :()).
  • loopcode: Loop code (default: :()).
  • returncode: Return code (default: :(return cs)).
  • errorcode: Executed if cs < 0 after loopcode (default: an informative error is thrown).

See the source code of this function to see what the generated code looks like.
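
As an illustration of the arguments keyword, here is a hedged sketch that adds an extra strict::Bool parameter to a validator like the one built earlier in this document; the names check_fasta and strict are hypothetical:

```julia
using Automa, TranscodingStreams

# The simple validation machine from earlier in this document.
machine = let
    header = re"[a-z]+"
    seqline = re"[ACGT]+"
    record = re">" * header * '\n' * rep1(seqline * '\n')
    compile(rep(record))
end

# `arguments` appends parameters to the generated signature, which
# becomes: check_fasta(stream::TranscodingStream, strict::Bool)
eval(generate_reader(
    :check_fasta,
    machine;
    arguments=(:(strict::Bool),),
    returncode=:(iszero(cs) ? true : (strict ? error("invalid FASTA") : false)),
    errorcode=:(@goto __return__),
))
```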


@escape()

Pseudomacro. When encountered during Machine execution, the machine will stop executing. This is useful to interrupt the parsing process, for example to emit a record during parsing of a larger file. p will be advanced as normal, so if @escape is hit on B during parsing of "ABC", the next byte will be C.


@mark()

Pseudomacro, to be used with IO-parsing Automa functions. This macro will "mark" the position of p in the current buffer. The marked position will not be flushed from the buffer after being consumed. For example, Automa code can call @mark() at the beginning of a large string, then when the string is exited at position p, it is guaranteed that the whole string resides in the buffer at positions @markpos():p-1.


@unmark()

Pseudomacro. Removes the mark from the buffer. This allows all previous data to be cleared from the buffer.

See also: @mark, @markpos


@bufferpos()

Pseudomacro. Returns the integer position of the current TranscodingStreams buffer (only used with the generate_reader function).


# Inside some Automa action code
description = sub_parser(stream)
p = @bufferpos()

See also: @setbuffer


@relpos(p)

Automa pseudomacro. Returns the position of p relative to @markpos(). Equivalent to p - @markpos() + 1. This can be used to mark additional points in the stream while the mark is set, after which their absolute position can be retrieved using @abspos(x).

Behaviour is undefined if mark has not yet been set.

Example usage:

# In one action
identifier_pos = @relpos(p)

# Later, in a different action
identifier = data[@abspos(identifier_pos):p]

See also: @abspos


@abspos(x)

Automa pseudomacro. Used to obtain the absolute buffer position of a relative position obtained from @relpos. See @relpos for more details.

Behaviour is undefined if mark has not yet been set.


@setbuffer()

Updates the buffer position to match p. The buffer position is synchronized with p before and after calls to functions generated by generate_reader. @setbuffer() can be used to update the buffer position before calling another parser.


# Inside some Automa action code
@setbuffer()
description = sub_parser(stream)
p = @bufferpos()

See also: @bufferpos