jlDistributedQuery

DistributedQuery is meant to host large datasets, partitioned across multiple workers, with queryable access. Ideally, each worker is a remote worker with its own memory. The package has four major parts: deployDataStore(), sentinel(), make_query_channels(), and query_client(). The basic execution is the following:

using Test
using Distributed
using ClusterManagers
using DistributedQuery

if Base.current_project() !== nothing
    proj_path = dirname(Base.current_project())
    p = addprocs(SlurmManager(3),
                 time="00:30:00",
                 exeflags="--project=$(proj_path)", ntasks_per_node=1)
else
    p = addprocs(SlurmManager(3),
                 time="00:30:00",
                 ntasks_per_node=1)
end

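# Load the query machinery and DataFrame tooling on every worker.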
@everywhere using DistributedQuery
@everywhere using DataFrames
@everywhere using CSV

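# Relative paths to three serialized iris DataFrame shards bundled with the package's mock data.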
_shard_file_list = ["../mockData/iris_df_1.jlb",
                    "../mockData/iris_df_2.jlb",
                    "../mockData/iris_df_3.jlb"]

shard_file_list = [joinpath(dirname(pathof(DistributedQuery)), sf) for sf in _shard_file_list]
serialized_file_list = shard_file_list
data_worker_pool = p
proc_worker_pool = [myid()]
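# deployDataStore loads one serialized shard on each data worker; fut maps each worker id to a Future for the DataContainer deployed there.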
fut = DistributedQuery.deployDataStore(data_worker_pool, serialized_file_list)

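# Each Future should resolve to the DataContainer now resident on its worker.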
test_res = [fetch(fut[w]) == @fetchfrom w DistributedQuery.DataContainer for w in data_worker_pool]

@test all(test_res)

if all(test_res)
    println("DistributedQuery.deployDataStore passed")
else
    println("DistributedQuery.deployDataStore failed\ntest_res: $(test_res)")
end


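# make_query_channels wires up the RemoteChannels that sentinels and clients communicate over: one data channel per data worker and a proc channel per proc worker (channel depth defaults to chan_depth = 5).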
proc_chan, data_chan = DistributedQuery.make_query_channels(data_worker_pool, proc_worker_pool)

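# A deep channel on this process that the sentinels post status messages to.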
status_chan = RemoteChannel(()->Channel{Any}(10000), myid())

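# The query function runs against each shard; here it selects a single column.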
query_f = (data, column) -> data[:, column]

query_args = ["sepal_l"]

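# The aggregate function combines the per-shard results; here it concatenates them and sums.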
aggregate_f = (x...) -> sum(vcat(x...))

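# Start one sentinel per data worker. Inside @spawnat the call to myid() runs on the remote worker, so each sentinel picks up its own data channel; a sentinel serves queries until it is sent "Done".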
sentinel_fut =
    [@spawnat w DistributedQuery.sentinel(DistributedQuery.DataContainer,
                                          data_chan[myid()], proc_chan,
                                          status_chan)
     for w in data_worker_pool]


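# Submit a query asynchronously; fetching the task later yields the aggregated result.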
query_task = @async DistributedQuery.query_client(data_chan, proc_chan[1], aggregate_f, query_f, query_args...)

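# Drain any buffered status messages, send "Done" on each data channel to shut the sentinels down, and wait for them to exit.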
[take!(status_chan) for i in 1:1000 if isready(status_chan)]
[put!(v, "Done") for (k,v) in data_chan]
[wait(f) for f in sentinel_fut]
[take!(status_chan) for i in 1:1000 if isready(status_chan)]

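# Restart the sentinels so a fresh query can be served end to end.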
sentinel_fut =
    [@spawnat w DistributedQuery.sentinel(DistributedQuery.DataContainer,
                                          data_chan[myid()], proc_chan,
                                          status_chan)
     for w in data_worker_pool]

query_task = @async DistributedQuery.query_client(data_chan, proc_chan[1], aggregate_f, query_f, query_args...)
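# Give the query a grace period to complete before checking the result.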
query_timeout_in_s = 10
sleep(query_timeout_in_s)


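# Verify the distributed result against a sum computed locally from the deployed shards.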
if istaskdone(query_task)
    local_result = sum([sum(fetch(f)[:, query_args[1]]) for (w, f) in fut])
    query_result = fetch(query_task)
    @test local_result == query_result
else
    print("test query was not done in query_timeout_in_s: $query_timeout_in_s")
    @test false
end
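# The same channels and sentinels can serve further queries. As a sketch
# (assuming the restarted sentinels above are still running and each shard
# holds a numeric "sepal_l" column), a per-shard maximum reduced to a global
# maximum looks like this; max_query_f and max_aggregate_f are illustrative
# names, not part of the package:
max_query_f = (data, column) -> maximum(data[:, column])
max_aggregate_f = (x...) -> maximum(vcat(x...))
max_task = @async DistributedQuery.query_client(data_chan, proc_chan[1],
                                                max_aggregate_f, max_query_f,
                                                "sepal_l")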


# Tear down the Slurm workers when finished:
# rmprocs(p)