DTables.DTableType
DTable

Structure representing the distributed table based on Dagger.

The table is stored as a vector of Chunk structures, each holding one partition of the table. The vector can also hold Dagger.EagerThunk structures after an operation that modifies the underlying partitions has been applied to it (currently only filter).

DTables.DTableMethod
DTable(table, chunksize; tabletype=nothing, interpartition_merges=true) -> DTable

Constructs a DTable from a Tables.jl-compatible input table. It assumes no initial partitioning of the table and uses the chunksize argument to partition the table by row count.

Providing tabletype kwarg overrides the internal table partition type.

The interpartition_merges kwarg controls whether rows may be merged across the partitions of the input. It is enabled by default, which means the constructor prioritizes creating chunks of the specified size even if that requires taking rows from two or more partitions. When it is disabled, no rows are merged between partitions, so some chunks can be smaller than chunksize when a partition runs short of rows. See the package tests for examples of this behaviour.
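
The effect of interpartition_merges on chunk sizes can be illustrated without DTables. The following is a minimal sketch in plain Julia, assuming the input arrives as partitions with known row counts; chunk_lengths is a hypothetical helper written for this illustration, not a DTables function, and it only models the row counts described above, not the actual implementation.

```julia
# Hypothetical helper (not part of DTables): given the row counts of the input
# partitions, return the row counts of the chunks that would be created.
function chunk_lengths(partition_lengths::Vector{Int}, chunksize::Int;
                       interpartition_merges::Bool=true)
    chunksize > 0 || throw(ArgumentError("chunksize must be positive"))
    # Split n rows into pieces of at most `chunksize` rows.
    split_rows(n) = [min(chunksize, n - i) for i in 0:chunksize:(n - 1)]
    if interpartition_merges
        # Rows flow across partition boundaries, so only the last chunk can be short.
        return split_rows(sum(partition_lengths))
    else
        # Every partition is split on its own, so each one may leave a short chunk.
        return reduce(vcat, map(split_rows, partition_lengths); init=Int[])
    end
end

merged = chunk_lengths([5, 3], 4)                                 # [4, 4]
unmerged = chunk_lengths([5, 3], 4; interpartition_merges=false)  # [4, 1, 3]
```

With merging enabled, the 8 rows form two full chunks; with merging disabled, the first partition leaves a 1-row chunk and the second a 3-row chunk.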

DTables.DTableMethod
DTable(loader_function, files::Vector{String}; tabletype=nothing)

Constructs a DTable using a list of filenames and a loader_function. Partitioning is based on the contents of the files provided, which means that one file is used to create one partition.

Providing tabletype kwarg overrides the internal table partition type.
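
To make the loader contract concrete, here is a small sketch in plain Julia, assuming each file holds one integer per line. The loader name load_column, the column name x and the file names are all made up for this illustration. The sketch only shows that applying the loader to each file yields one table per file, which is the partitioning described above.

```julia
# Hypothetical loader: reads one integer per line into a single column `x`
# and returns a NamedTuple of columns, which is a Tables.jl-compatible table.
load_column(path::String) = (x = parse.(Int, readlines(path)),)

# Write two small input files into a temporary directory.
dir = mktempdir()
files = [joinpath(dir, "part1.txt"), joinpath(dir, "part2.txt")]
write(files[1], "1\n2\n3\n")
write(files[2], "4\n5\n")

# One file gives one table, which is what becomes one partition.
partitions = map(load_column, files)
partition_lengths = [length(p.x) for p in partitions]   # [3, 2]
```

Following the signature above, the same loader and file list would be passed as DTable(load_column, files).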

DTables.DTableMethod
DTable(table; tabletype=nothing) -> DTable

Constructs a DTable using a Tables.jl-compatible input table. Calls Tables.partitions on table and keeps the partitioning it provides.

DTables.GDTableType
GDTable

Structure representing a grouped DTable. It wraps a DTable object and provides additional information on how the table is grouped. The grouping is represented by a cols field, which contains the column symbols used for grouping, and an index, which allows efficient lookup of the partitions grouped under a single key.

Base.fetchMethod
fetch(d::DTable, sink)

Collects all the chunks in the DTable into a single, non-distributed instance of table type created using the provided sink function.

Base.fetchMethod
fetch(d::DTable)

Collects all the chunks in the DTable into a single, non-distributed instance of the underlying table type.

Fetching an empty DTable returns an empty NamedTuple regardless of the underlying tabletype.

Base.filterMethod
filter(f, gd::GDTable) -> GDTable

Filters gd using f and returns a filtered GDTable. Calling trim! on a filtered GDTable will clean up the empty keys and partitions.

Examples

julia> g = DTables.groupby(DTable((a=repeat('a':'d', inner=2),b=1:8), 2), :a)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> f = filter(x -> x.a ∈ ['a', 'b'], g)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> fetch(f)
(a = ['a', 'a', 'b', 'b'], b = [1, 2, 3, 4])

julia> trim!(f)
GDTable with 2 partitions and 2 keys
Tabletype: NamedTuple
Grouped by: [:a]

Base.filterMethod
filter(f, d::DTable) -> DTable

Filter d using f. Returns a filtered DTable that can be processed further.

Examples

julia> d = DTable((a = [1, 2, 3], b = [1, 1, 1]), 2);

julia> f = filter(x -> x.a < 3, d)
DTable with 2 partitions
Tabletype: NamedTuple

julia> fetch(f)
(a = [1, 2], b = [1, 1])

julia> f = filter(x -> x.a < 3 && x.b > 0, d)
DTable with 2 partitions
Tabletype: NamedTuple

julia> fetch(f)
(a = [1, 2], b = [1, 1])

Base.getindexMethod
getindex(gdt::GDTable, key) -> DTable

Retrieves a DTable from gdt with rows belonging to the provided grouping key.

Base.keysMethod
keys(gd::GDTable) -> KeySet

Returns the keys that gd is grouped by.
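
A short usage sketch for keys and getindex, reusing the grouped table from the filter example above. It assumes the DTables package is installed and that grouping by a single column uses the raw column value (here a Char) as the key, which is an inference from the reduce output shown in this reference rather than something stated explicitly.

```julia
using DTables

# Same table as in the filter example: eight rows, chunks of two rows.
d = DTable((a = repeat('a':'d', inner=2), b = 1:8), 2)
g = DTables.groupby(d, :a)

# Collect the grouping keys; their order is not guaranteed.
group_keys = collect(keys(g))

# Retrieve the rows grouped under the key 'a' as a DTable, then materialize them.
group_a = fetch(g['a'])
```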

Base.mapMethod
map(f, gd::GDTable) -> GDTable

Applies f to each row of gd. The applied function needs to return a Tables.Row compatible object (e.g. NamedTuple).

Examples

julia> g = DTables.groupby(DTable((a=repeat('a':'c', inner=2),b=1:6), 2), :a)
GDTable with 3 partitions and 3 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> m = map(r -> (a = r.a, b = r.b, c = r.a + r.b), g)
GDTable with 3 partitions and 3 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> fetch(m)
(a = ['a', 'a', 'c', 'c', 'b', 'b'], b = [1, 2, 5, 6, 3, 4], c = ['b', 'c', 'h', 'i', 'e', 'f'])

Base.mapMethod
map(f, d::DTable) -> DTable

Applies f to each row of d. The applied function needs to return a Tables.Row compatible object (e.g. NamedTuple).

Examples

julia> d = DTable((a = [1, 2, 3], b = [1, 1, 1]), 2);

julia> m = map(x -> (r = x.a + x.b,), d)
DTable with 2 partitions
Tabletype: NamedTuple

julia> fetch(m)
(r = [2, 3, 4],)

julia> m = map(x -> (r1 = x.a + x.b, r2 = x.a - x.b), d)
DTable with 2 partitions
Tabletype: NamedTuple

julia> fetch(m)
(r1 = [2, 3, 4], r2 = [0, 1, 2])

Base.reduceMethod
reduce(f, gd::GDTable; cols=nothing, prefix="result_", [init]) -> Dagger.EagerThunk -> NamedTuple

Reduces gd using function f applied on all columns of the DTable. Returns the results per group in columns whose names are prefixed with the prefix kwarg. For more information on kwargs see reduce(f, d::DTable).

Examples

julia> g = DTables.groupby(DTable((a=repeat('a':'d', inner=2),b=1:8), 2), :a)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> fetch(reduce(*, g))
(a = ['a', 'c', 'd', 'b'], result_a = ["aa", "cc", "dd", "bb"], result_b = [2, 30, 56, 12])

Base.reduceMethod
reduce(f, d::DTable; cols=nothing, [init]) -> Dagger.EagerThunk -> NamedTuple

Reduces d using function f applied on all columns of the DTable.

By providing the kwarg cols as a Vector{Symbol} object it's possible to restrict the reduction to the specified columns. The reduced values are provided in a NamedTuple under names of reduced columns.

For the init kwarg please refer to Base.reduce documentation, as it follows the same principles.

Examples

julia> d = DTable((a = [1, 2, 3], b = [1, 1, 1]), 2);

julia> r1 = reduce(+, d)
Dagger.EagerThunk (running)

julia> fetch(r1)
(a = 6, b = 3)

julia> r2 = reduce(*, d, cols=[:a])
Dagger.EagerThunk (running)

julia> fetch(r2)
(a = 6,)
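
For comparison, the same reductions can be written for a plain, non-distributed column table using only Base. This sketch mirrors the shape of the results above (one reduced value per column, under the column's name) and shows the role of init on an empty column. It does not use DTables, and it says nothing about how the distributed reduction is scheduled.

```julia
# A column table as a NamedTuple of vectors, matching the example data above.
columns = (a = [1, 2, 3], b = [1, 1, 1])

# Reduce every column; `map` over a NamedTuple keeps the column names.
sums = map(col -> reduce(+, col), columns)            # (a = 6, b = 3)

# Restrict the reduction to selected columns, analogous to cols=[:a].
selected = NamedTuple{(:a,)}(columns)
products = map(col -> reduce(*, col), selected)       # (a = 6,)

# `init` supplies the value returned for an empty collection; per the
# Base.reduce documentation it should be a neutral element of the operator.
empty_sum = reduce(+, Int[]; init=0)                  # 0
```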

DTables._groupbyMethod
_groupby(d::DTable, row_function::Function, cols::Union{Nothing, Vector{Symbol}}, merge::Bool, chunksize::Int)

Internal function for performing the groupby steps based on common arguments.

DTables._joinMethod
_join(type::Symbol, l_chunk, r; kwargs...)

Low-level join method for DTable joins using the generic implementation. It joins an l_chunk with r, assuming r is a continuous (non-partitioned) table.

DTables._joinMethod
_join(type::Symbol, l_chunk, r::DTable; kwargs...)

Low level join method for DTable joins using the generic implementation. It joins an l_chunk with r assuming r is a DTable. In this case the join is split into multiple joins of l_chunk with each chunk of r and a final merge operation.

DTables.build_groupby_indexMethod
build_groupby_index(merge::Bool, chunksize::Int, tabletype, vs...)

Takes the intermediate result of distinct_partitions and builds an index. Merges partitions if possible according to the chunksize provided. It will only merge chunks if their length after merging is <= chunksize. It doesn't split chunks larger than chunksize and small chunks may be leftover after merging if no appropriate pair was found.
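
The merging rules above can be sketched on chunk lengths alone. The following plain-Julia example uses one possible greedy strategy that respects those rules; merge_chunk_lengths is a hypothetical helper, and the strategy is an assumption made for illustration, so the actual DTables algorithm may pair chunks differently.

```julia
# Hypothetical helper (not part of DTables): merge chunk lengths greedily so that
# no merged chunk exceeds `chunksize`. Chunks already larger than `chunksize`
# are kept as they are and never split.
function merge_chunk_lengths(lengths::Vector{Int}, chunksize::Int)
    merged = Int[]
    for len in sort(lengths; rev=true)
        # Find the first merged chunk that still has room for this one.
        idx = findfirst(m -> m + len <= chunksize, merged)
        if idx === nothing
            push!(merged, len)      # nothing fits, so start a new chunk
        else
            merged[idx] += len      # merge into the existing chunk
        end
    end
    return merged
end

result = merge_chunk_lengths([1, 2, 2, 5, 1], 3)   # [5, 3, 3]
leftover = merge_chunk_lengths([2, 2], 3)          # [2, 2]
```

In the first call the 5-row chunk is left unsplit, and in the second call both small chunks remain because merging them would exceed the chunksize.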

DTables.build_joined_tableMethod
build_joined_table(jointype, names, l, r, inner_l, inner_r, outer_l, other_r)

Takes the indices of matching rows (inner_l and inner_r) and the indices of the rows from the l table that weren't matched (outer_l), and builds the result based on them.

Uses all the columns from the left table and the other_r columns from the right table.
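
As a rough illustration of how index vectors turn into a joined table, here is a sketch in plain Julia for column tables stored as NamedTuples of vectors. The function build_joined, the jointype symbols :inner and :left, and the choice to fill unmatched right-hand values with missing are all assumptions made for this example; they are not taken from the DTables source.

```julia
# Hypothetical sketch (not the DTables implementation): build a joined column
# table from index vectors. `other_r` names the right-table columns to keep.
function build_joined(jointype::Symbol, l::NamedTuple, r::NamedTuple,
                      inner_l, inner_r, outer_l, other_r::Tuple{Vararg{Symbol}})
    keep_unmatched = jointype === :left
    # Left rows: matched rows first, then unmatched rows for a left join.
    left_rows = keep_unmatched ? vcat(inner_l, outer_l) : collect(inner_l)
    left_cols = map(col -> col[left_rows], l)
    # Right columns: matched rows, padded with `missing` for unmatched left rows.
    right_cols = map(other_r) do name
        matched = getproperty(r, name)[inner_r]
        keep_unmatched ? vcat(matched, fill(missing, length(outer_l))) : matched
    end
    return merge(left_cols, NamedTuple{other_r}(right_cols))
end

l = (a = [1, 2, 3], b = ["x", "y", "z"])
r = (a = [2, 3, 4], c = [20, 30, 40])

# Rows 2 and 3 of l match rows 1 and 2 of r; row 1 of l has no match.
inner = build_joined(:inner, l, r, [2, 3], [1, 2], [1], (:c,))
left = build_joined(:left, l, r, [2, 3], [1, 2], [1], (:c,))
```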

DTables.distinct_partitionsMethod
distinct_partitions(chunk, f::Function)

Takes a partition and groups its rows based on the key value returned by f.

DTables.grouped_colsMethod
grouped_cols(gd::GDTable) -> Vector{Symbol}

Returns the symbols of the columns used in the grouping. If the grouping was performed with a function, the :KEYS symbol is returned instead.

DTables.match_inner_indicesMethod
match_inner_indices(l, r, cmp_l, cmp_r, lookup, r_sorted, l_sorted, r_unique)

Function responsible for picking the optimal method of joining inner indices depending on the additional information about the tables provided by the user.

DTables.match_inner_indicesMethod
match_inner_indices(l, r, l_ind::NTuple{N,Int}, r_ind::NTuple{N,Int})

Returns two vectors containing indices of matched rows. Standard non-optimized use case.

DTables.match_inner_indices_lookupMethod
match_inner_indices_lookup(l, lookup, l_ind::NTuple{N,Int})

Returns two vectors containing indices of matched rows. Uses lookup to find the matching indices.

lookup needs to be a dict-like structure that contains keys in form of a Tuple of all matching columns and values in form of type Vector{UInt} containing the related row indices.
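
A lookup of the described shape can be built with a Dict. The sketch below, in plain Julia, assumes rows are stored as tuples, that join columns are given by position, and that the stored indices refer to rows of the right table. build_lookup and match_with_lookup are hypothetical names used only here, not DTables functions.

```julia
# Hypothetical helper: map each join key (a Tuple of the join-column values)
# to the indices of the right-table rows that carry it.
function build_lookup(r_rows, r_ind::NTuple{N,Int}) where {N}
    lookup = Dict{Any,Vector{UInt}}()
    for (j, row) in enumerate(r_rows)
        key = map(c -> row[c], r_ind)
        push!(get!(() -> UInt[], lookup, key), j)
    end
    return lookup
end

# Hypothetical helper: pair every left row with the right rows found in `lookup`.
function match_with_lookup(l_rows, lookup, l_ind::NTuple{N,Int}) where {N}
    inner_l, inner_r = UInt[], UInt[]
    for (i, row) in enumerate(l_rows)
        key = map(c -> row[c], l_ind)
        for j in get(lookup, key, UInt[])
            push!(inner_l, i)
            push!(inner_r, j)
        end
    end
    return inner_l, inner_r
end

l_rows = [(1, "x"), (2, "y"), (3, "z")]
r_rows = [(2, 20), (3, 30), (3, 31)]

lookup = build_lookup(r_rows, (1,))
inner_l, inner_r = match_with_lookup(l_rows, lookup, (1,))
# inner_l holds 2, 3, 3 and inner_r holds 1, 2, 3
```

Each left row costs one dictionary access, which is why supplying a prebuilt lookup can avoid scanning the right table repeatedly.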

DTables.match_inner_indices_lsorted_rsortedMethod
match_inner_indices_lsorted_rsorted(l, r, cmp_l::NTuple{N,Int}, cmp_r::NTuple{N,Int}, runique::Bool)

Returns two vectors containing indices of matched rows. Optimized pass for the case where both the left and the right table are sorted and, optionally, the right table contains only unique keys.
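
When both sides are sorted, matching can be done in one forward pass over the keys. The following plain-Julia sketch shows the classic merge-join idea on two sorted key vectors. match_sorted is a hypothetical function, and it is not claimed to be the algorithm DTables uses; it only illustrates why sortedness and right-side uniqueness reduce the work.

```julia
# Hypothetical sketch of a merge join over two sorted key vectors.
# Returns the indices of matching rows in the left and right inputs.
function match_sorted(lkeys::Vector, rkeys::Vector; r_unique::Bool=false)
    inner_l, inner_r = Int[], Int[]
    i, j = 1, 1
    while i <= length(lkeys) && j <= length(rkeys)
        if isless(lkeys[i], rkeys[j])
            i += 1                      # left key too small, advance left
        elseif isless(rkeys[j], lkeys[i])
            j += 1                      # right key too small, advance right
        else
            # Keys are equal: pair left row i with the run of equal right keys.
            k = j
            while k <= length(rkeys) && isequal(rkeys[k], lkeys[i])
                push!(inner_l, i)
                push!(inner_r, k)
                k += 1
                r_unique && break       # at most one right match per key
            end
            i += 1                      # j stays, so repeated left keys match too
        end
    end
    return inner_l, inner_r
end

ml, mr = match_sorted([1, 2, 2, 4], [2, 2, 3, 4])
# ml == [2, 2, 3, 3, 4], mr == [1, 2, 1, 2, 4]
ul, ur = match_sorted([1, 2, 2, 4], [2, 3, 4]; r_unique=true)
# ul == [2, 3, 4], ur == [1, 1, 3]
```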

DTables.match_inner_indices_rsortedMethod
match_inner_indices_rsorted(l, r, cmp_l::NTuple{N,Int}, cmp_r::NTuple{N,Int})

Returns two vectors containing indices of matched rows. Optimized pass for joins with a sorted right table.

DTables.match_inner_indices_runiqueMethod
match_inner_indices_runique(l, r, cmp_l::NTuple{N,Int}, cmp_r::NTuple{N,Int})

Returns two vectors containing indices of matched rows. Optimized pass for joins with the right table containing unique keys only.

DTables.tabletype!Method
tabletype!(gd::GDTable)

Provides the type of the underlying table partition and caches it in gd.

In case the tabletype cannot be obtained the default return value is NamedTuple.

DTables.tabletype!Method
tabletype!(d::DTable)

Provides the type of the underlying table partition and caches it in d.

In case the tabletype cannot be obtained the default return value is NamedTuple.

DTables.tabletypeMethod
tabletype(gd::GDTable)

Provides the type of the underlying table partition. Uses the cached tabletype if available.

In case the tabletype cannot be obtained the default return value is NamedTuple.

DTables.tabletypeMethod
tabletype(d::DTable)

Provides the type of the underlying table partition. Uses the cached tabletype if available.

In case the tabletype cannot be obtained the default return value is NamedTuple.

DTables.trim!Method
trim!(gd::GDTable) -> GDTable

Removes empty chunks from gd and unused keys from its index.

DTables.trim!Method
trim!(d::DTable) -> DTable

Removes empty chunks from d.

DTables.trimMethod
trim(gd::GDTable) -> GDTable

Returns gd with empty chunks and keys removed.

DTables.trimMethod
trim(d::DTable) -> DTable

Returns d with empty chunks removed.

DTables.use_dataframe_joinMethod
use_dataframe_join(d1type, d2type)

Determines whether to use the DataAPI join function, which leads to usage of the DataFrames join function if both types are DataFrame. Remove this function and its usage once a generic Tables.jl-compatible join function becomes available. Porting the Dagger join functions to TableOperations is an option to achieve that.

DataAPI.innerjoinMethod
innerjoin(d1::DTable, d2; on=nothing, l_sorted=false, r_sorted=false, r_unique=false, lookup=nothing)

Perform an inner join of d1 with any Tables.jl compatible table type. Returns a DTable with the result.

If the underlying table type happens to have an innerjoin implementation and none of the DTable-related kwargs below are provided, the specialized function is used. A good example of that is calling innerjoin on a DTable with a DataFrame underlying type and a d2 of DataFrame type.

Keyword arguments

  • on: Column symbols to join on. Can be provided as a symbol or a pair of symbols in case the column names differ. For joins on multiple columns a vector of the previously mentioned can be provided.
  • l_sorted: To indicate the left table is sorted. Only useful if r_sorted is set to true as well.
  • r_sorted: To indicate the right table is sorted.
  • r_unique: To indicate the right table only contains unique keys.
  • lookup: To provide a dict-like structure that will allow for direct matching of inner rows. The structure needs to contain keys in form of a Tuple and values in form of type Vector{UInt} containing the related row indices.

DataAPI.leftjoinMethod
leftjoin(d1::DTable, d2; on=nothing, l_sorted=false, r_sorted=false, r_unique=false, lookup=nothing)

Perform a left join of d1 with any Tables.jl compatible table type. Returns a DTable with the result.

If the underlying table type happens to have a leftjoin implementation and none of the DTable-related kwargs below are provided, the specialized function is used. A good example of that is calling leftjoin on a DTable with a DataFrame underlying type and a d2 of DataFrame type.

Keyword arguments

  • on: Column symbols to join on. Can be provided as a symbol or a pair of symbols in case the column names differ. For joins on multiple columns a vector of the previously mentioned can be provided.
  • l_sorted: To indicate the left table is sorted. Only useful if r_sorted is set to true as well.
  • r_sorted: To indicate the right table is sorted.
  • r_unique: To indicate the right table only contains unique keys.
  • lookup: To provide a dict-like structure that will allow for direct matching of inner rows. The structure needs to contain keys in form of a Tuple and values in form of type Vector{UInt} containing the related row indices.

DataFrames.groupbyMethod
groupby(d::DTable, f::Function; merge=true, chunksize=0)

Groups d by the distinct set of keys created by applying f to each row in d.

For kwargs usage details see groupby(d::DTable, col::Symbol).

Examples

julia> d = DTable((a=shuffle(repeat('a':'d', inner=4, outer=4)),b=repeat(1:4, 16)), 4)
DTable with 16 partitions
Tabletype: NamedTuple

julia> function group_fun(row)
           row.a + row.b
       end
group_fun (generic function with 1 method)

julia> DTables.groupby(d, group_fun)
GDTable with 7 partitions and 7 keys
Tabletype: NamedTuple
Grouped by: group_fun

julia> DTables.groupby(d, group_fun, chunksize=3)
GDTable with 25 partitions and 7 keys
Tabletype: NamedTuple
Grouped by: group_fun

julia> DTables.groupby(d, group_fun, merge=false)
GDTable with 52 partitions and 7 keys
Tabletype: NamedTuple
Grouped by: group_fun

DataFrames.groupbyMethod
groupby(d::DTable, col::Symbol; merge=true, chunksize=0) -> GDTable

Groups d by distinct values of column col.

The process of grouping can be affected by providing kwargs merge and chunksize. By default all the chunks belonging to a single key will be merged into a single partition. Providing a positive value in chunksize will attempt to merge the smaller partitions into partitions not bigger than chunksize. Please note that partitions bigger than chunksize will not be split into partitions of chunksize. Merging can be disabled completely by providing merge=false.

Examples

julia> d = DTable((a=shuffle(repeat('a':'d', inner=4, outer=4)),), 4)
DTable with 16 partitions
Tabletype: NamedTuple

julia> DTables.groupby(d, :a)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> DTables.groupby(d, :a, chunksize=3)
GDTable with 24 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]

julia> DTables.groupby(d, :a, merge=false)
GDTable with 42 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]

DataFrames.groupbyMethod
groupby(d::DTable, cols::Vector{Symbol}; merge=true, chunksize=0)

Groups d by distinct values of the columns cols. The key is constructed by creating a NamedTuple from each row based on the cols provided.

For kwargs usage details see groupby(d::DTable, col::Symbol).

Examples

julia> d = DTable((a=shuffle(repeat('a':'d', inner=4, outer=4)),b=repeat(1:4, 16)), 4)
DTable with 16 partitions
Tabletype: NamedTuple

julia> DTables.groupby(d, [:a,:b])
GDTable with 16 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]

julia> DTables.groupby(d, [:a,:b], chunksize=3)
GDTable with 27 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]

julia> DTables.groupby(d, [:a,:b], merge=false)
GDTable with 64 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]
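
The three groupby methods differ only in how the key is derived from each row. The sketch below shows this in plain Julia on a small vector of NamedTuple rows; group_rows is a hypothetical helper that maps each key to the indices of its rows, and it is not how DTables stores its index.

```julia
rows = [(a = 'a', b = 1, c = 10),
        (a = 'b', b = 1, c = 20),
        (a = 'a', b = 1, c = 30),
        (a = 'a', b = 2, c = 40)]

# Hypothetical helper: collect the indices of rows sharing the same key.
function group_rows(rows, keyfun)
    groups = Dict{Any,Vector{Int}}()
    for (i, row) in enumerate(rows)
        push!(get!(() -> Int[], groups, keyfun(row)), i)
    end
    return groups
end

# Single column: the key is the column value itself.
by_col = group_rows(rows, row -> row.a)

# Several columns: the key is a NamedTuple built from the selected columns.
by_cols = group_rows(rows, row -> NamedTuple{(:a, :b)}(row))

# Function: the key is whatever the function returns (here Char + Int gives a Char).
by_fun = group_rows(rows, row -> row.a + row.b)
```

Note that grouping by a function can place rows with different column values under the same key: rows 2 and 4 both produce the key 'c'.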

DataFrames.selectMethod
select(df::DTable, args...; copycols::Bool=true, renamecols::Bool=true)

Create a new DTable that contains columns from df specified by args and return it. The result is guaranteed to have the same number of rows as df, except when no columns are selected (in which case the result has zero rows).

This operation is supposed to provide the same functionality and syntax as DataFrames.select, but for DTable input. Most cases should be covered and the output should be exactly the same as one obtained using DataFrames. In case of output differences or args causing errors please file an issue with reproduction steps and data.

Please refer to DataFrames documentation for more details on usage.