Links

Links are built on top of Channels of Julia. They are used as communication primitives for Tasks of Julia. A Link basically includes a Channel and a Buffer. The mode of the buffer is Cyclic.(see Buffer Modes for information on buffer modes). Every item sent through a Link is sent through the channel of the Link and written to the Buffer so that all the data flowing through a Link is recorded.

The construction of a Link is very simple: just specify its buffer length and element type.

julia> using Causal # hide

julia> Link{Bool}(5)
Link(state:open, eltype:Bool, isreadable:false, iswritable:false)

julia> Link{Int}(10)
Link(state:open, eltype:Int64, isreadable:false, iswritable:false)

julia> Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

julia> Link()
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

The data can be read from and written into Links if active tasks are bound to them. Links can be thought of like a pipe. In order to write data to a Link from one of its ends, a task that reads written data from the other end must be bounded to the Link. Similarly, in order to read data from one of the Link from one of its end, a task that writes the read data must be bound to the Link. Reading from and writing to Link is carried out with take! and put! functions. For more clarity, let us see some examples.

Let us first construct a Link,


julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

l is a Link with a buffer length of 5 and element type of Float64. Not that the l is open, but it is not ready for data reading or writing. To write data, we must bound a task that reads the written data.

julia> function reader(link::Link)  # Define job.
           while true
               val = take!(link)
               val === NaN && break  # Poison-pill the tasks to terminate safely.
           end
       end
reader (generic function with 1 method)

julia> t = @async reader(l)
Task (runnable) @0x00007fee1b3fca90

The reader is defined such that the data written from one end of l is read until the data is NaN. Now, we have runnable a task t. This means the l is ready for data writing.

julia> put!(l, 1.)
1.0

julia> put!(l, 2.)
2.0

Note that the data flown through the l is written to its buffer.

julia> l.buffer
5-element Buffer{Cyclic, Float64, 1}:
 2.0
 1.0
 0.0
 0.0
 0.0

To terminate the task, we must write NaN to l.

julia> put!(l, NaN)  # Terminate the task
NaN

julia> t   # Show that the `t` is terminated.
Task (done) @0x00007fee1b3fca90

Whenever the bound task to the l is runnable, the data can be written to l. That is, the data length that can be written to l is not limited by the buffer length of l. But, beware that the buffer of Links is Cyclic. That means, when the buffer is full, its data is overwritten.

julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

julia> t = @async reader(l)
Task (runnable) @0x00007fee1be9e4d0

julia> for item in 1. : 10.
           put!(l, item)
           @show outbuf(l.buffer)
       end
outbuf(l.buffer) = [1.0, 0.0, 0.0, 0.0, 0.0]
outbuf(l.buffer) = [2.0, 1.0, 0.0, 0.0, 0.0]
outbuf(l.buffer) = [3.0, 2.0, 1.0, 0.0, 0.0]
outbuf(l.buffer) = [4.0, 3.0, 2.0, 1.0, 0.0]
outbuf(l.buffer) = [5.0, 4.0, 3.0, 2.0, 1.0]
outbuf(l.buffer) = [6.0, 5.0, 4.0, 3.0, 2.0]
outbuf(l.buffer) = [7.0, 6.0, 5.0, 4.0, 3.0]
outbuf(l.buffer) = [8.0, 7.0, 6.0, 5.0, 4.0]
outbuf(l.buffer) = [9.0, 8.0, 7.0, 6.0, 5.0]
outbuf(l.buffer) = [10.0, 9.0, 8.0, 7.0, 6.0]

The case is very similar to read data from l. Again, a runnable task is bound the l


julia> l = Link(5)
Link(state:open, eltype:Float64, isreadable:false, iswritable:false)

julia> function writer(link::Link, vals)
           for val in vals
               put!(link, val)
           end
       end
writer (generic function with 1 method)

julia> t = @async writer(l, 1.:5.)
Task (runnable) @0x00007fee3ded2380

julia> bind(l, t)
Channel{Float64}(0) (1 item available)

julia> take!(l)
1.0

julia> take!(l)
2.0

It is possible to read data from l until t is active. To read all the data at once, collect can be used.

julia> t
Task (runnable) @0x00007fee3ded2380

julia> collect(l)
3-element Vector{Float64}:
 3.0
 4.0
 5.0

julia> t  # Show that `t` is terminated.
Task (done) @0x00007fee3ded2380

Full API

Causal.LinkType
mutable struct Link{T}

A Link connects an Outpin and Inpin for data flow. See Outpin, Inpin, connect!

Fields

  • buffer::Buffer{Cyclic, T, 1} where T

    Internal buffer to record data flowing throgh link

  • channel::Channel

    Internal channel for data flow

  • masterid::Base.UUID

    Unique identifier of the outpin of the source of the link

  • slaveid::Base.UUID

    Unique identifier fo the inpin of the destination of the link

  • id::Base.UUID

    Unique identifier

Example

julia> l = Link{Int}(5)
Link(state:open, eltype:Int64, isreadable:false, iswritable:false)

julia> l = Link{Bool}()
Link(state:open, eltype:Bool, isreadable:false, iswritable:false)
Base.bindMethod
bind(link, task)

Binds task to link. When task is done link is closed.

Base.collectMethod
collect(link)

Collects all the available data on the link.

Warning

To collect all available data from link, a task must be bounded to it.

Example

julia> l = Link();  # Construct a link.

julia> t = @async for item in 1 : 5  # Construct a task
       put!(l, item)
       end;

julia> bind(l, t);  # Bind it to the link.

julia> take!(l)  # Take element from link.
1.0

julia> take!(l)  # Take again ...
2.0

julia> collect(l)  # Collect remaining data.
3-element Array{Float64,1}:
 3.0
 4.0
 5.0
Base.isopenMethod
isopen(link)

Returns true if link is open. A link is open if its channel is open.

Base.isreadableMethod
isreadable(link)

Returns true if link is readable. When link is readable, data can be read from link with take function.

Base.iswritableMethod
iswritable(link)

Returns true if link is writable. When link is writable, data can be written into link with put function.

Base.put!Method
put!(link, val)

Puts val to link. val is handed over to the channel of link. val is also written in to the buffer of link.

Warning

link must be writable to put val. That is, a runnable task that takes items from the link must be bounded to link.

Example

julia> l = Link();

julia> t  = @async while true 
       item = take!(l)
       item === NaN && break 
       println("Took " * string(item))
       end;

julia> bind(l, t);

julia> put!(l, 1.)
Took 1.0
1.0

julia> put!(l, 2.)
Took 2.0
2.0

julia> put!(l, NaN)
NaN
Base.take!Method
take!(link)

Take an element from link.

Warning

link must be readable to take value. That is, a runnable task that puts items from the link must be bounded to link.

Example

julia> l = Link(5);

julia> t = @async for item in 1. : 5.
       put!(l, item)
       end;

julia> bind(l, t);

julia> take!(l)
1.0

julia> take!(l)
2.0
Causal.isfullMethod
isfull(link)

Returns true if the buffer of link is full.

Causal.launchMethod
launch(link, valrange)

Constructs a putter task and binds it to link. putter tasks puts the data in valrange.

Causal.launchMethod
launch(link)

Constructs a taker task and binds it to link. The taker task reads the data and prints an info message until missing is read from the link.

Causal.refresh!Method

Reconstructst the channel of link is its channel is closed.

Causal.snapshotMethod
snapshot(link)

Returns all the data of the buffer of link.