# Abaco

Abaco computes formulas output from a stream of metric values as soon as all needed input values are collected.

In a real world scenario values are coming asynchronously, delayed and out of orders. Abaco may manage values referring to different times.

## Getting Started

Installation:

`julia> using Pkg; Pkg.add("Abaco") `

Usage:

```
using Abaco
# Initialize abaco context with a time_window of 60 seconds and handle
# input values with timestamp ts up to 4 contiguous spans.
abaco = abaco_init(interval=60, ages=4) do ts, node, formula_name, value, inputs
@info "[$ts][$node] function $formula_name=$value"
end
# Add desired outputs in terms of inputs variables x, y, z, v, w
outputs = ["xysum = x + y", "rsigma = x * exp(y-1)", "wsum = (x*w + z*v)"]
for formula_def in outputs
formula(abaco, formula_def)
end
# Start receiving some inputs values
# normally ts is the UTC timestamp from epoch in seconds.
# but for semplicity assume time start from zero.
# the node AG101 sends the x value at timestamp 0.
ts = 0
node = "AG101"
ingest(abaco, ts, node, "x", 10)
# Time flows and about 1 minute later ...
# the node CE987 sends the y value at timestamp 65.
ts = 65
node = "CE987"
ingest(abaco, ts, node, "y", 10)
# Time flows and more than 1 minute later ...
# Finally the node AG101 sends the y value calculated at timestamp 0.
# At this instant the formulas that depends on x and y are computable
# for the node AG101 at timestamp 0.
ts = 0
node = "AG101"
ingest(abaco, ts, node, "y", 20)
[ Info: [0][AG101] function xysum=30
[ Info: [0][AG101] function rsigma=1.7848230096318724e9
# Now arrives the variable x from CE987 that make some formulas computables.
# Note that x timestamp is 65, y timestamp is 101
# and the formulas timestamp is 60: the START_TIME of the span.
ts = 101
node = "CE987"
ingest(abaco, ts, node, "x", 10)
[ Info: [60][CE987] function xysum=20
[ Info: [60][CE987] function rsigma=81030.83927575384
```

In the formula callback the first argument may be a user defined object, like a Socket a Channel or whatsoever communication endpoint.

For example:

```
sock = connect(3001)
abaco = abaco_init(handle=sock, interval=900) do sock, ts, node, formula_name, value, inputs
@info "ts [$ts]: [$node] $formula_name = $value"
msg = JSON3.write(Dict(
"node" => node,
"age" => ts,
"formula" => formula_name,
"value" => value))
write(sock, msg*"\n")
end
```

## Basic concepts walkthrough

A formula is named math expression defined by a string.

A node is a domain of values. Each node has a unique name identifier and it may be tagged.

Also a formula may be tagged: the tag binds a formula with a subset on nodes identified by the same tag.

```
# name expr
julia> formula(abaco, "my_formula", "x + y")
# tag
julia> formula(abaco, "my_formula", "x + y", "my_tag")
```

Ingested data has a name, a value, a timestamp and is associated with a node.

```
julia> ingest(abaco, # abaco instance
ts, # timestamp
"node_unique_name", # node name
"x", # metric name
100) # value
```

Values may also be ingested using a `Dict{String,Any}`

object with the following rule:

`ne`

and `ts`

are reserved dictionary keyword for node name and timestamp whereas the others entries are metrics values.

```
# A record with a single metric
julia> x_value = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"x" => 1.5 # metric name => metric value
)
julia> y_value = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"y" => 8.5
)
# A record with a batch of metrics:
julia> xyz_values = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"x" => 1.5,
"y" => 25,
"z" => 999
)
julia> ingest(abaco, x_value)
julia> ingest(abaco, xyz_values)
```

Metrics names are the names of the variables used by Abaco formulas.

A node may be seen as a data source or a data fusion namespace: all variables that belong to a node are the inputs considered by the formulas tagged with the same value of the node tag.

As soon as all inputs variables are collected and belong to the same time window the formula result is calculated.

Using the above example as soon as both `x`

and `y`

are collected formula `my_formula`

is evaluated and onresult default callback is triggered.

The default callback print the result summary to the console.

```
julia> ingest(abaco, x_metric)
julia> ingest(abaco, y_metric)
my_formula(ts:1642605647, ne:my_network_element) = 10.0
```

A node may have a parent node: parent-child relationships and tags are powerful tools that enables aggregation and statistical functions:

```
# name tag
julia> aggregator = node(abaco, "my_region", "region")
# parent name tag
julia> node(abaco, aggregator, "city_1", "city")
julia> node(abaco, aggregator, "city_2", "city")
# tag name aggregator expr
julia> formula("region", "my_formula", "mean(city.x)")
```

### time span

A time span includes all the timestamps in a time interval:

`span = { t ∈ N | START_INTERVAL <= t < END_TIME }`

Timestamps `t`

are integer values with second granularity.

For example suppose that a data collection system uses a 15 minutes span interval: in this case an hour is divided into 4 intervals and from ten to eleven of some (omitted) day you have:

- span1 = { t ∈ [10:00:00, 10:15:00) }
- span2 = { t ∈ [10:15:00, 10:30:00) }
- span3 = { t ∈ [10:30:00, 10:45:00) }
- span4 = { t ∈ [10:45:00, 11:00:00) }

By convention the span interval is identified by its START_TIME.

The formula value computed from inputs with timestamps included into the span `[START_TIME, END_TIME)`

has timestamp equal to `START_TIME`

.

The `width`

of the span interval is an abaco setting, user-defined at startup.

### ages

The number of `ages`

defines how many consecutive time spans are managed.

For example, for a time span of 15 minutes, set `ages`

to 4 if your network devices may send data with a maximum delay of an hour. A received value marked with a timestamp distant 4 or more spans from the latest span is discarded.

The number of `ages`

is an abaco setting, user-defined at startup.

This is the minimal background theory, the below example should help to clarify the Abaco mechanics.

## API

`Abaco.Context`

`Abaco.EvalError`

`Abaco.Snap`

`Abaco.SnapsSetting`

`Abaco.ValueNotFound`

`Abaco.WrongFormula`

`Abaco.abaco_init`

`Abaco.add_formulas`

`Abaco.dependents`

`Abaco.formula`

`Abaco.formula`

`Abaco.get_values`

`Abaco.getsnap`

`Abaco.ingest`

`Abaco.ingest`

`Abaco.ingest!`

`Abaco.last_point`

`Abaco.last_value`

`Abaco.lastspan`

`Abaco.nowts`

`Abaco.snap_add`

`Abaco.span`

`Abaco.Context`

— TypeThe abaco registry.

`Abaco.EvalError`

— TypeFormula evaluation failure.

[`ingest`

] throws EvalError when a runtime formula evaluation fails, for example for a wrong numbers of method args:

```
formula(abaco, "div(x,y,z")
ingest(abaco, ts, ne, Dict("x"=>10, "y"=>1, "z"=1))
```

`Abaco.Snap`

— TypeMaintains the state of the abaco.

Before adding formulas and values an abaco `MonoContext`

must be initialized by `abaco_init`

.

`Abaco.SnapsSetting`

— TypeThe settings of snapshots.

Before adding formulas and values the `SnapsSetting`

must be initialized by `abaco_init`

.

`Abaco.ValueNotFound`

— TypeAttempt to get a value with an invalid index.

`Abaco.WrongFormula`

— TypeWrong formula definition.

`formula`

throws `WrongFormula`

when a formula is malformed, for example:

`formula(abaco, "myformula = x + ")`

`Abaco.abaco_init`

— Method`abaco_init(onresult; handle=nothing, interval::Int=900, ages::Int=4, emitone=true)::Context`

Initialize the abaco context:

`onresult`

: function callback that gets called each time a formula value is computed.`handle`

: user defined object. If handle is defined it is the first argument of`onresult`

, default to`nothing`

.`interval`

: the span interval in seconds, default to 900 seconds (15 minutes). If interval is equal to -1 there is just one infinite time span.`ages`

: the number of active rops managed by the abaco. Default to 4. If`interval`

is equal to -1`ages`

is not applicable because it loses meaning.`emitone`

: if`true`

emits for each time span at most 1 formula value, otherwise emits a new result at every new inputs. Defaut to`true`

Example 1: defining `onresult`

callback that uses of an handle object.

```
# the handle object is a socket
sock = connect(3001)
function onresult(handle, ts, ne, formula_name, value, inputs)
# build a pkt message from ts, ne, ...
pkt = ...
write(sock, pkt)
end
```

Example 2: defining `onresult`

callback that doesn't use an handle object.

```
abaco = abaco_init(onresult, handle=sock)
function onresult(ts, ne, formula_name, value, inputs)
@info "[$ts][$ne] function $fname=$value"
end
abaco = abaco_init(onresult)
```

`Abaco.add_formulas`

— Method`Abaco.dependents`

— Method`dependents(abaco::Context, tag::String, var::String)`

Returns the list of expressions that depends on `var`

.

`Abaco.formula`

— Method`formula(setting::SnapsSetting, name, expression)`

Add the formula `name`

defined by `expression`

: a mathematical expression like `x + y*w`

.

`Abaco.formula`

— Method`formula(setting::SnapsSetting, formula_def::String)`

Add a formula, with `formula_def`

formatted as `"formula_name = expression"`

, where expression is a mathematical expression, like `x + y*w`

.

`Abaco.get_values`

— Method`get_values(abaco, ne::String, var::String)::Dict{Int,Float64}`

Returns the ordered by time sequence of `var`

values for `ne`

node.

The returned values dictionary is ordered by descending time, most recent value first. The number of entries are at most equal to the value of ages.

`Abaco.getsnap`

— Method`getsnap(abaco::Context, ts, ne)`

Returns the `ne`

node snapshot relative to timestamp `ts`

.

`Abaco.ingest!`

— Method`ingest!(abaco, payload)`

Adds the input variables included in the `payload`

dictionary.

The Dict `msg`

must contains the keys `ts`

and `ne`

and a numbers of other keys managed as input variables.

This function modifies the `payload`

dictionary: `ne`

and `ts`

keys are popped out.

```
payload = Dict(
"ts" => nowts(),
"ne" => "trento.castello",
"x" => 23.2,
"y" => 100
)
ingest!(abaco, ts, ne, payload)
```

`Abaco.ingest`

— Method`ingest(abaco, ts, ne, values)`

Adds the input variables include in the dictionary `values`

.

```
# now timestamp
ts = nowts()
# short name of network node
ne = "trento.castello"
values = Dict(
"x" => 23.2,
"y" => 100
)
ingest(abaco, ts, ne, values)
```

`Abaco.ingest`

— Method`ingest(abaco, ts::Int, ne::String, var::String, val::Real)`

Adds the input variable `var`

with value `val`

.

`ts`

: timestamp with seconds resolution`ne`

: scope name`var`

: variable name`val`

: variable value

`Abaco.last_point`

— Method`last_point(abaco, ne::String, var::String)::Union{Nothing, Missing, Tuple{Int,Float64}}`

Returns the most recent in time value for the `ne`

node metric `var`

as a tuple (time, value).

If the node `ne`

is unknown returns `nothing`

and if there are not values for metric `var`

returns `missing`

.

`Abaco.last_value`

— Method`last_value(abaco, ne::String, var::String)::Union{Nothing, Missing, Float64}`

Returns the most recent in time value for the `ne`

node metric `var`

If the node `ne`

is unknown returns `nothing`

and if there are not values for metric `var`

returns `missing`

.

`Abaco.lastspan`

— Function`lastspan(interval::Int64=900)::Int64`

Returns the epoch start time of the last span. The last span is the nearest in the present time interval that satisfies the condition: `span.endtime < now`

.

`Abaco.nowts`

— Function`nowts()`

the current timestamp in seconds from epoch

`Abaco.snap_add`

— Method`snap_add(snap::Snap, ne, var::String, val::Real)`

Adds the variable value of `ne`

node to the `snap`

snapshot.

`Abaco.span`

— Function`span(ts::Int64, interval::Int64=900)`

Returns the start_time of the time interval that contains `ts`

.