Abaco
Abaco computes formula outputs from a stream of metric values as soon as all the required input values have been collected.
In a real-world scenario values arrive asynchronously, delayed and out of order. Abaco can manage values that refer to different times.
Getting Started
Installation:
julia> using Pkg; Pkg.add("Abaco")
Usage:
using Abaco
# Initialize abaco context with a time_window of 60 seconds and handle
# input values with timestamp ts up to 4 contiguous spans.
abaco = abaco_init(interval=60, ages=4) do ts, node, formula_name, value, inputs
    @info "[$ts][$node] function $formula_name=$value"
end
# Add the desired outputs in terms of the input variables x, y, z, v, w
outputs = ["xysum = x + y", "rsigma = x * exp(y-1)", "wsum = (x*w + z*v)"]
for formula_def in outputs
    formula(abaco, formula_def)
end
# Start receiving some input values.
# Normally ts is the UTC timestamp from epoch in seconds,
# but for simplicity assume that time starts from zero.
# The node AG101 sends the x value at timestamp 0.
ts = 0
node = "AG101"
ingest(abaco, ts, node, "x", 10)
# Time flows and about 1 minute later ...
# the node CE987 sends the y value at timestamp 65.
ts = 65
node = "CE987"
ingest(abaco, ts, node, "y", 10)
# Time flows and more than 1 minute later ...
# Finally the node AG101 sends the y value calculated at timestamp 0.
# At this instant the formulas that depend on x and y are computable
# for the node AG101 at timestamp 0.
ts = 0
node = "AG101"
ingest(abaco, ts, node, "y", 20)
[ Info: [0][AG101] function xysum=30
[ Info: [0][AG101] function rsigma=1.7848230096318724e9
# Now the variable x arrives from CE987, which makes some formulas computable.
# Note that the y timestamp is 65, the x timestamp is 101
# and the formulas timestamp is 60: the START_TIME of the span.
ts = 101
node = "CE987"
ingest(abaco, ts, node, "x", 10)
[ Info: [60][CE987] function xysum=20
[ Info: [60][CE987] function rsigma=81030.83927575384
In the formula callback the first argument may be a user-defined object, such as a Socket, a Channel or any other communication endpoint.
For example:
using Sockets, JSON3
sock = connect(3001)
abaco = abaco_init(handle=sock, interval=900) do sock, ts, node, formula_name, value, inputs
    @info "ts [$ts]: [$node] $formula_name = $value"
    # serialize the result as one JSON document per line
    msg = JSON3.write(Dict(
        "node" => node,
        "age" => ts,
        "formula" => formula_name,
        "value" => value))
    write(sock, msg*"\n")
end
Basic concepts walkthrough
A formula is a named math expression defined by a string.
A node is a domain of values. Each node has a unique name identifier and it may be tagged.
A formula may also be tagged: the tag binds a formula to the subset of nodes identified by the same tag.
# arguments: name, expression
julia> formula(abaco, "my_formula", "x + y")
# arguments: name, expression, tag
julia> formula(abaco, "my_formula", "x + y", "my_tag")
Ingested data has a name, a value, a timestamp and is associated with a node.
julia> ingest(abaco, # abaco instance
ts, # timestamp
"node_unique_name", # node name
"x", # metric name
100) # value
Values may also be ingested using a Dict{String,Any} object with the following rule: ne and ts are reserved dictionary keys for the node name and the timestamp, whereas the other entries are metric values.
# A record with a single metric
julia> x_value = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"x" => 1.5 # metric name => metric value
)
julia> y_value = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"y" => 8.5
)
# A record with a batch of metrics:
julia> xyz_values = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"x" => 1.5,
"y" => 25,
"z" => 999
)
julia> ingest(abaco, x_value)
julia> ingest(abaco, xyz_values)
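The reserved-key rule can be illustrated without Abaco at all. The following is a minimal sketch, assuming a hypothetical helper named split_record that is not part of the package: it separates ne and ts from the metric entries and leaves the input record untouched.

```julia
# Hypothetical helper, not part of Abaco: separates the reserved keys
# "ne" and "ts" from the metric entries without modifying the record.
function split_record(record::Dict{String,Any})
    ne = record["ne"]          # node name (reserved key)
    ts = record["ts"]          # timestamp (reserved key)
    metrics = Dict(k => v for (k, v) in record if k != "ne" && k != "ts")
    return ne, ts, metrics
end

record = Dict{String,Any}(
    "ne" => "my_network_element",
    "ts" => 1642605647,
    "x" => 1.5,
    "y" => 25,
)

ne, ts, metrics = split_record(record)
```

Here metrics holds only the x and y entries, which are the values a formula would see as input variables.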
Metric names are the names of the variables used by Abaco formulas.
A node may be seen as a data source or a data fusion namespace: all variables that belong to a node are the inputs considered by the formulas tagged with the same value as the node tag.
As soon as all input variables are collected and belong to the same time window, the formula result is calculated.
Using the above example, as soon as both x and y are collected the formula my_formula is evaluated and the default onresult callback is triggered.
The default callback prints the result summary to the console.
julia> ingest(abaco, x_value)
julia> ingest(abaco, y_value)
my_formula(ts:1642605647, ne:my_network_element) = 10.0
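The "evaluate as soon as all inputs are collected" behaviour can be pictured with a toy model. This is only a sketch under stated assumptions, not Abaco's implementation: toy_ingest!, span_start and the store layout are hypothetical names, and the formula is hard-coded as x + y.

```julia
# Toy model, not Abaco's implementation: values are grouped by
# (node, span start) and the formula fires once all inputs are present.
span_start(ts, interval) = ts - mod(ts, interval)

function toy_ingest!(store, ne, ts, name, value; inputs=("x", "y"), interval=900)
    bucket = get!(store, (ne, span_start(ts, interval)), Dict{String,Float64}())
    bucket[name] = value
    # Evaluate only when every input of the formula has been collected.
    all(haskey(bucket, v) for v in inputs) || return nothing
    return bucket["x"] + bucket["y"]
end

store = Dict{Tuple{String,Int},Dict{String,Float64}}()
first_result = toy_ingest!(store, "my_network_element", 1642605647, "x", 1.5)
second_result = toy_ingest!(store, "my_network_element", 1642605647, "y", 8.5)
```

The first call returns nothing because y is still missing; the second call completes the inputs for that node and time window and returns the sum.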
A node may have a parent node: parent-child relationships and tags are powerful tools that enable aggregation and statistical functions:
# arguments: name, tag
julia> aggregator = node(abaco, "my_region", "region")
# arguments: parent, name, tag
julia> node(abaco, aggregator, "city_1", "city")
julia> node(abaco, aggregator, "city_2", "city")
# arguments: tag, name, aggregator expression
julia> formula("region", "my_formula", "mean(city.x)")
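To make the aggregation concrete, here is a small sketch of what an expression like mean(city.x) amounts to for a parent node. The dictionaries children_of and x_values and their contents are made-up toy data, not Abaco's internal structures.

```julia
# Toy data, not Abaco's internals: the children of each parent node
# and the x value collected for each child in one time span.
children_of = Dict("my_region" => ["city_1", "city_2"])
x_values = Dict("city_1" => 10.0, "city_2" => 30.0)

# Arithmetic mean of x over the children tagged "city" of "my_region".
xs = [x_values[city] for city in children_of["my_region"]]
region_mean = sum(xs) / length(xs)
```

With these values region_mean is the average of the two city readings.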
time span
A time span includes all the timestamps in a time interval:
span = { t ∈ N | START_TIME <= t < END_TIME }
Timestamps t are integer values with second granularity.
For example, suppose that a data collection system uses a 15-minute span interval: in this case an hour is divided into 4 intervals, and from ten to eleven of some (omitted) day you have:
- span1 = { t ∈ [10:00:00, 10:15:00) }
- span2 = { t ∈ [10:15:00, 10:30:00) }
- span3 = { t ∈ [10:30:00, 10:45:00) }
- span4 = { t ∈ [10:45:00, 11:00:00) }
By convention the span interval is identified by its START_TIME.
The formula value computed from inputs with timestamps within the span [START_TIME, END_TIME) has timestamp equal to START_TIME.
The width of the span interval is an abaco setting, user-defined at startup.
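The START_TIME convention reduces to simple integer arithmetic. The sketch below assumes that spans are aligned to multiples of the width counted from zero; span_start is a hypothetical name used only for illustration (Abaco exposes its own span function, listed in the API section).

```julia
# Hypothetical helper: the START_TIME of the span containing ts is the
# largest multiple of the width that is less than or equal to ts.
span_start(ts::Integer, width::Integer) = ts - mod(ts, width)

a = span_start(0, 60)      # timestamp 0 with 60-second spans
b = span_start(65, 60)     # timestamp 65 with 60-second spans
c = span_start(101, 60)    # timestamp 101 with 60-second spans

# 10:20:00, expressed in seconds from midnight, with 15-minute spans
d = span_start(10*3600 + 20*60, 900)
```

Timestamps 65 and 101 both map to 60, and 10:20:00 maps to 10:15:00, the start of span2 in the list above.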
ages
The number of ages defines how many consecutive time spans are managed.
For example, for a time span of 15 minutes, set ages to 4 if your network devices may send data with a maximum delay of an hour. A received value whose timestamp is 4 or more spans away from the latest span is discarded.
The number of ages is an abaco setting, user-defined at startup.
This is the minimal background theory; the example below should help to clarify the Abaco mechanics.
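The discard rule for late values can be sketched as follows. This is an illustration under stated assumptions, not Abaco's code: span_start, span_distance and is_accepted are hypothetical names, spans are assumed to be aligned to multiples of the width, and timestamps are given in seconds from midnight for readability.

```julia
# Hypothetical helpers illustrating the ages rule.
span_start(ts, width) = ts - mod(ts, width)

# Number of spans between the span containing ts and the latest span.
span_distance(ts, latest_start, width) = div(latest_start - span_start(ts, width), width)

# A value is kept only if its span is fewer than `ages` spans behind.
is_accepted(ts, latest_start; width=900, ages=4) =
    span_distance(ts, latest_start, width) < ages

latest = 11 * 3600                             # latest span starts at 11:00:00
ok_1020 = is_accepted(10*3600 + 20*60, latest) # 10:20:00, 3 spans behind
ok_1000 = is_accepted(10*3600, latest)         # 10:00:00, 4 spans behind
ok_0959 = is_accepted(10*3600 - 1, latest)     # 09:59:59, 5 spans behind
```

With ages set to 4 and a 15-minute width, only the 10:20:00 value is kept; the other two are 4 or more spans away from the latest span.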
API
Abaco.Context
Abaco.EvalError
Abaco.Snap
Abaco.SnapsSetting
Abaco.ValueNotFound
Abaco.WrongFormula
Abaco.abaco_init
Abaco.add_formulas
Abaco.dependents
Abaco.formula
Abaco.formula
Abaco.get_values
Abaco.getsnap
Abaco.ingest
Abaco.ingest
Abaco.ingest!
Abaco.last_point
Abaco.last_value
Abaco.lastspan
Abaco.nowts
Abaco.snap_add
Abaco.span
Abaco.Context — Type
The abaco registry.
Abaco.EvalError — Type
Formula evaluation failure.
ingest throws EvalError when a runtime formula evaluation fails, for example because of a wrong number of method arguments:
formula(abaco, "div(x,y,z)")
ingest(abaco, ts, ne, Dict("x"=>10, "y"=>1, "z"=>1))
Abaco.Snap — Type
Maintains the state of the abaco.
Before adding formulas and values an abaco MonoContext must be initialized by abaco_init.
Abaco.SnapsSetting — Type
The settings of snapshots.
Before adding formulas and values the SnapsSetting must be initialized by abaco_init.
Abaco.ValueNotFound — Type
Attempt to get a value with an invalid index.
Abaco.WrongFormula — Type
Wrong formula definition.
formula throws WrongFormula when a formula is malformed, for example:
formula(abaco, "myformula = x + ")
Abaco.abaco_init — Method
abaco_init(onresult; handle=nothing, interval::Int=900, ages::Int=4, emitone=true)::Context
Initialize the abaco context:
- onresult: function callback that gets called each time a formula value is computed.
- handle: user-defined object. If handle is defined it is the first argument of onresult. Defaults to nothing.
- interval: the span interval in seconds. Defaults to 900 seconds (15 minutes). If interval is equal to -1 there is just one infinite time span.
- ages: the number of active time spans managed by the abaco. Defaults to 4. If interval is equal to -1, ages is not applicable because it loses meaning.
- emitone: if true, emits at most 1 formula value for each time span; otherwise emits a new result at every new input. Defaults to true.
Example 1: defining an onresult callback that uses a handle object.
using Sockets
# the handle object is a socket
sock = connect(3001)
function onresult(handle, ts, ne, formula_name, value, inputs)
    # build a pkt message from ts, ne, ...
    pkt = ...
    write(handle, pkt)
end
abaco = abaco_init(onresult, handle=sock)
Example 2: defining an onresult callback that doesn't use a handle object.
function onresult(ts, ne, formula_name, value, inputs)
    @info "[$ts][$ne] function $formula_name=$value"
end
abaco = abaco_init(onresult)
Abaco.add_formulas — Method
Abaco.dependents — Method
dependents(abaco::Context, tag::String, var::String)
Returns the list of expressions that depend on var.
Abaco.formula — Method
formula(setting::SnapsSetting, name, expression)
Add the formula name defined by expression: a mathematical expression like x + y*w.
Abaco.formula — Method
formula(setting::SnapsSetting, formula_def::String)
Add a formula, with formula_def formatted as "formula_name = expression", where expression is a mathematical expression, like x + y*w.
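As an illustration of the "formula_name = expression" format, the sketch below splits a definition string into its two parts. split_formula_def is a hypothetical helper and makes no claim about how Abaco parses definitions internally.

```julia
# Hypothetical parser, not Abaco's own: splits on the first "=" only,
# so the expression part may itself contain "=" characters.
function split_formula_def(formula_def::AbstractString)
    parts = split(formula_def, "="; limit=2)
    length(parts) == 2 || throw(ArgumentError("expected \"name = expression\""))
    return strip(parts[1]), strip(parts[2])
end

name, expression = split_formula_def("wsum = (x*w + z*v)")
```

The returned name and expression correspond to the two arguments of the three-argument formula method documented above.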
Abaco.get_values — Method
get_values(abaco, ne::String, var::String)::Dict{Int,Float64}
Returns the time-ordered sequence of var values for the ne node.
The returned values dictionary is ordered by descending time, most recent value first. The number of entries is at most equal to the value of ages.
Abaco.getsnap — Method
getsnap(abaco::Context, ts, ne)
Returns the ne node snapshot relative to timestamp ts.
Abaco.ingest! — Method
ingest!(abaco, payload)
Adds the input variables included in the payload dictionary.
The payload Dict must contain the keys ts and ne plus any number of other keys, which are managed as input variables.
This function modifies the payload dictionary: the ne and ts keys are popped out.
payload = Dict(
    "ts" => nowts(),
    "ne" => "trento.castello",
    "x" => 23.2,
    "y" => 100
)
ingest!(abaco, payload)
Abaco.ingest — Method
ingest(abaco, ts, ne, values)
Adds the input variables included in the dictionary values.
# now timestamp
ts = nowts()
# short name of network node
ne = "trento.castello"
values = Dict(
"x" => 23.2,
"y" => 100
)
ingest(abaco, ts, ne, values)
Abaco.ingest — Method
ingest(abaco, ts::Int, ne::String, var::String, val::Real)
Adds the input variable var with value val.
- ts: timestamp with seconds resolution
- ne: scope name
- var: variable name
- val: variable value
Abaco.last_point — Method
last_point(abaco, ne::String, var::String)::Union{Nothing, Missing, Tuple{Int,Float64}}
Returns the most recent value of the metric var for the ne node, as a tuple (time, value).
If the node ne is unknown it returns nothing, and if there are no values for the metric var it returns missing.
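Callers need to distinguish the three documented outcomes. The following sketch works on any value shaped like the documented return type; describe_point is a hypothetical helper and the example inputs are made up rather than produced by a real abaco instance.

```julia
# Hypothetical helper: handles nothing (unknown node), missing (no values
# for the metric) and a (time, value) tuple.
function describe_point(point)
    if isnothing(point)
        return "unknown node"
    elseif ismissing(point)
        return "no values for this metric"
    else
        ts, value = point
        return "value $value at time $ts"
    end
end

unknown = describe_point(nothing)
empty = describe_point(missing)
found = describe_point((1642605647, 1.5))
```

Checking for nothing before missing keeps the two "no result" cases separate, which matters because they indicate different problems: a misspelled node name versus a metric that has not arrived yet.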
Abaco.last_value — Method
last_value(abaco, ne::String, var::String)::Union{Nothing, Missing, Float64}
Returns the most recent value of the metric var for the ne node.
If the node ne is unknown it returns nothing, and if there are no values for the metric var it returns missing.
Abaco.lastspan — Function
lastspan(interval::Int64=900)::Int64
Returns the epoch start time of the last span. The last span is the span nearest to the present time that satisfies the condition: span.endtime < now.
Abaco.nowts — Function
nowts()
Returns the current timestamp in seconds from epoch.
Abaco.snap_add — Method
snap_add(snap::Snap, ne, var::String, val::Real)
Adds the variable value of the ne node to the snap snapshot.
Abaco.span — Function
span(ts::Int64, interval::Int64=900)
Returns the start_time of the time interval that contains ts.