Event Latency

A built-in DelayQueue is available if you need a Store with latency between put! and take! events. Here, however, we show how you could build one yourself. If you adapt this example into a particularly useful kind of latency store, please contribute it to the library through a pull request.
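For comparison, here is a minimal sketch of the built-in route. It assumes that the queue is constructed as DelayQueue{String}(env, delay) and that put! and take! are used on it the same way as on a Store; please check the library's API reference before relying on either assumption. The names producer, consumer and received are illustrative only.

```julia
using ConcurrentSim
using ResumableFunctions

# Illustrative producer: puts one item every 5 time units.
@resumable function producer(env::Simulation, queue)
    for item in ["a", "b", "c"]
        put!(queue, item)            # assumed: the item becomes available `delay` later
        @yield timeout(env, 5.0)
    end
end

# Illustrative consumer: records when each item actually becomes available.
@resumable function consumer(env::Simulation, queue, received)
    while true
        item = @yield take!(queue)
        push!(received, (now(env), item))
    end
end

env = Simulation()
queue = DelayQueue{String}(env, 10.0)    # assumed constructor form: (environment, delay)
received = Tuple{Float64,String}[]
@process producer(env, queue)
@process consumer(env, queue, received)
run(env, 30.0)
```

If those assumptions hold, each entry in received should be timestamped 10 time units after the corresponding put!.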

Description

In this example we show how to separate the time delay between processes from the processes themselves. We model a communications channel, called a Cable, in which a sender sends a message every SEND_PERIOD time units and a receiver listens every RECEIVE_PERIOD time units. Each message spends a fixed delay in the cable (the delay field of the Cable, set to 10 time units when the cable is constructed below) before it reaches the receiver.

Load Packages

using ConcurrentSim
using ResumableFunctions
import Base: put!, take!


Define Constants

const SIM_DURATION = 100.
const SEND_PERIOD = 5.0
const RECEIVE_PERIOD = 3.0;


Define Cable model

The Cable contains a reference to the simulation it is part of, the delay that messages experience, and a store that holds the sent messages.

mutable struct Cable
    env::Simulation
    delay::Float64
    store::Store{String}
    
    function Cable(env::Simulation, delay::Float64)
        return new(env, delay, Store{String}(env))
    end
end;


The latency function is a generator which yields two events: first a timeout that represents the transmission delay, then a put event when the message gets stored in the store.

@resumable function latency(env::Simulation, cable::Cable, value::String)
    @yield timeout(cable.env, cable.delay)
    @yield put!(cable.store, value)
end;


The put! and take! methods are how processes interact with the cable. They are ordinary functions rather than @resumable ones because each must return the event it creates (the latency process for put!, the store's take! event for take!) so that the caller can @yield it.

function put!(cable::Cable, value::String)
    @process latency(cable.env, cable, value) # results in the scheduling of all events generated by latency
end

function take!(cable::Cable)
    take!(cable.store) # returns an element stored in the cable store
end;
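To see the delay in isolation, the following self-contained sketch sends a single message through a stripped-down cable and records when it is taken. MiniCable, deliver and one_shot_receiver are hypothetical stand-ins introduced only for this check; they mirror the Cable and latency definitions above but are not part of the example or the library.

```julia
using ConcurrentSim
using ResumableFunctions

# Hypothetical, minimal stand-in for Cable so that this snippet runs on its own.
struct MiniCable
    delay::Float64
    store::Store{String}
end

# Same two-step pattern as latency: wait out the delay, then store the message.
@resumable function deliver(env::Simulation, cable::MiniCable, value::String)
    @yield timeout(env, cable.delay)
    @yield put!(cable.store, value)
end

# Takes one message and records the simulation time at which it arrived.
@resumable function one_shot_receiver(env::Simulation, cable::MiniCable, arrivals::Vector{Float64})
    msg = @yield take!(cable.store)
    push!(arrivals, now(env))
end

demo_env = Simulation()
demo_cable = MiniCable(4.0, Store{String}(demo_env))
arrivals = Float64[]
@process deliver(demo_env, demo_cable, "hello")
@process one_shot_receiver(demo_env, demo_cable, arrivals)
run(demo_env, 10.0)
```

The receiver asks for a message at time 0 but is only resumed once the delayed put! has happened, so arrivals should hold a single timestamp equal to the cable delay.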


The sender and receiver generators yield events to the simulator.

@resumable function sender(env::Simulation, cable::Cable)
    while true
        @yield timeout(env, SEND_PERIOD)
        value = "sender sent this at $(now(env))"
        put!(cable, value)
    end
end

@resumable function receiver(env::Simulation, cable::Cable)
    while true
        @yield timeout(env, RECEIVE_PERIOD)
        msg = @yield take!(cable)
        println("Received this at $(now(env)) while $msg")
    end
end;


Create simulation, register events, and run!

env = Simulation()
cable = Cable(env, 10.)
@process sender(env, cable)
@process receiver(env, cable)

run(env, SIM_DURATION)

# output

Received this at 15.0 while sender sent this at 5.0
Received this at 20.0 while sender sent this at 10.0
Received this at 25.0 while sender sent this at 15.0
Received this at 30.0 while sender sent this at 20.0
Received this at 35.0 while sender sent this at 25.0
Received this at 40.0 while sender sent this at 30.0
Received this at 45.0 while sender sent this at 35.0
Received this at 50.0 while sender sent this at 40.0
Received this at 55.0 while sender sent this at 45.0
Received this at 60.0 while sender sent this at 50.0
Received this at 65.0 while sender sent this at 55.0
Received this at 70.0 while sender sent this at 60.0
Received this at 75.0 while sender sent this at 65.0
Received this at 80.0 while sender sent this at 70.0
Received this at 85.0 while sender sent this at 75.0
Received this at 90.0 while sender sent this at 80.0
Received this at 95.0 while sender sent this at 85.0
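
The timestamps above can be cross-checked by hand without running the simulation. Message k is sent at k * SEND_PERIOD and enters the store one cable delay later; the receiver issues its next take! RECEIVE_PERIOD after its previous receipt, and the take! completes at whichever of those two times is later. The sketch below encodes that recurrence in plain Julia. The helper expected_receipts is hypothetical, and it assumes first-in-first-out delivery and that events falling exactly on the stop time are not processed, which is consistent with the output shown.

```julia
# Pure-Julia cross-check; no simulation packages required.
# expected_receipts is a hypothetical helper, not part of ConcurrentSim.
function expected_receipts(; send_period=5.0, receive_period=3.0,
                             delay=10.0, sim_duration=100.0)
    receipts = Tuple{Float64,Float64}[]    # (time received, time sent)
    ready = receive_period                 # time at which the first take! is issued
    k = 1
    while true
        sent = k * send_period
        arrival = sent + delay             # message enters the store after the delay
        received = max(ready, arrival)     # take! completes once both sides are ready
        received < sim_duration || break   # assumption: events at the stop time are skipped
        push!(receipts, (received, sent))
        ready = received + receive_period  # receiver waits again before the next take!
        k += 1
    end
    return receipts
end

receipts = expected_receipts()
for (received, sent) in receipts
    println("Received this at $received while sender sent this at $sent")
end
```

With the default values, which match the constants in this example, the first receipt is at 15.0 for the message sent at 5.0 and the last is at 95.0 for the message sent at 85.0. Because the receiver period is shorter than the send period, every receipt is limited by message arrival rather than by the receiver.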