DTables.DTable
— TypeDTable
Structure representing the distributed table based on Dagger.
The table is stored as a vector of Chunk
structures which hold partitions of the table. That vector can also store Dagger.EagerThunk
structures when an operation that modifies the underlying partitions was applied to it (currently only filter
).
DTables.DTable
— MethodDTable(table, chunksize; tabletype=nothing, interpartition_merges=true) -> DTable
Constructs a DTable
using a Tables.jl
compatible table
input. It assumes no initial partitioning of the table and uses the chunksize
argument to partition the table (based on row count).
Providing tabletype
kwarg overrides the internal table partition type.
The interpartition_merges
kwarg controls whether rows may be merged across partitions. It is enabled by default, which means the constructor prioritizes creating chunks of the specified size even if that requires taking rows from two or more partitions. When it is disabled, no rows are merged between partitions, so some chunks can be smaller than chunksize when a partition runs short of rows. See the tests for examples of this behaviour.
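The effect of chunksize and interpartition_merges on chunk sizes can be modelled with a small sketch. It only computes row counts and does not touch DTables; chunk_lengths is a hypothetical helper, and the real constructor may distribute rows differently in edge cases.

```julia
# Hypothetical helper, not part of DTables: models chunk row counts only.
# `partition_lengths` holds the row count of each input partition.
function chunk_lengths(partition_lengths::Vector{Int}, chunksize::Int;
                       interpartition_merges::Bool=true)
    chunksize > 0 || throw(ArgumentError("chunksize must be positive"))
    # Split `n` rows into pieces of `chunksize`; the last piece may be smaller.
    split_rows(n) = [min(chunksize, n - i) for i in 0:chunksize:(n - 1)]
    if interpartition_merges
        # Rows may cross partition borders, so only the total row count matters.
        return split_rows(sum(partition_lengths))
    else
        # Each partition is split on its own, which can leave short chunks.
        return reduce(vcat, (split_rows(n) for n in partition_lengths); init=Int[])
    end
end

merged = chunk_lengths([3, 3], 2)
unmerged = chunk_lengths([3, 3], 2; interpartition_merges=false)
```

With two partitions of three rows and chunksize 2, the merged variant yields three full chunks, while the unmerged variant leaves a one-row chunk at the end of each partition.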
DTables.DTable
— MethodDTable(loader_function, files::Vector{String}; tabletype=nothing)
Constructs a DTable
using a list of filenames and a loader_function
. Partitioning is based on the contents of the files provided, which means that one file is used to create one partition.
Providing tabletype
kwarg overrides the internal table partition type.
DTables.DTable
— MethodDTable(table; tabletype=nothing) -> DTable
Constructs a DTable
using a Tables.jl
-compatible input table
. Calls partitions
on table
and assumes the provided partitioning.
DTables.GDTable
— TypeGDTable
Structure representing a grouped DTable
. It wraps over a DTable
object and provides additional information on how the table is grouped. The grouping is represented by a cols
field, which contains the column symbols used for grouping, and an index
, which allows efficient lookup of the partitions grouped under a single key.
Base.fetch
— Methodfetch(d::DTable, sink)
Collects all the chunks in the DTable
into a single, non-distributed instance of table type created using the provided sink
function.
Base.fetch
— Methodfetch(d::DTable)
Collects all the chunks in the DTable
into a single, non-distributed instance of the underlying table type.
Fetching an empty DTable returns an empty NamedTuple
regardless of the underlying tabletype
.
Base.filter
— Methodfilter(f, gd::GDTable) -> GDTable
Filter 'gd' using 'f', returning a filtered GDTable
. Calling trim!
on a filtered GDTable
will clean up the empty keys and partitions.
Examples
julia> g = DTables.groupby(DTable((a=repeat('a':'d', inner=2),b=1:8), 2), :a)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]
julia> f = filter(x -> x.a ∈ ['a', 'b'], g)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]
julia> fetch(f)
(a = ['a', 'a', 'b', 'b'], b = [1, 2, 3, 4])
julia> trim!(f)
GDTable with 2 partitions and 2 keys
Tabletype: NamedTuple
Grouped by: [:a]
Base.filter
— Methodfilter(f, d::DTable) -> DTable
Filter d
using f
. Returns a filtered DTable
that can be processed further.
Examples
julia> d = DTable((a = [1, 2, 3], b = [1, 1, 1]), 2);
julia> f = filter(x -> x.a < 3, d)
DTable with 2 partitions
Tabletype: NamedTuple
julia> fetch(f)
(a = [1, 2], b = [1, 1])
julia> f = filter(x -> (x.a < 3) .& (x.b > 0), d)
DTable with 2 partitions
Tabletype: NamedTuple
julia> fetch(f)
(a = [1, 2], b = [1, 1])
Base.getindex
— Methodgetindex(gdt::GDTable, key) -> DTable
Retrieves a DTable
from gdt
with rows belonging to the provided grouping key.
Base.keys
— Methodkeys(gd::GDTable) -> KeySet
Returns the keys that gd
is grouped by.
Base.map
— Methodmap(f, gd::GDTable) -> GDTable
Applies f
to each row of gd
. The applied function needs to return a Tables.Row
compatible object (e.g. NamedTuple
).
Examples
julia> g = DTables.groupby(DTable((a=repeat('a':'c', inner=2),b=1:6), 2), :a)
GDTable with 3 partitions and 3 keys
Tabletype: NamedTuple
Grouped by: [:a]
julia> m = map(r -> (a = r.a, b = r.b, c = r.a + r.b), g)
GDTable with 3 partitions and 3 keys
Tabletype: NamedTuple
Grouped by: [:a]
julia> fetch(m)
(a = ['a', 'a', 'c', 'c', 'b', 'b'], b = [1, 2, 5, 6, 3, 4], c = ['b', 'c', 'h', 'i', 'e', 'f'])
Base.map
— Methodmap(f, d::DTable) -> DTable
Applies f
to each row of d
. The applied function needs to return a Tables.Row
compatible object (e.g. NamedTuple
).
Examples
julia> d = DTable((a = [1, 2, 3], b = [1, 1, 1]), 2);
julia> m = map(x -> (r = x.a + x.b,), d)
DTable with 2 partitions
Tabletype: NamedTuple
julia> fetch(m)
(r = [2, 3, 4],)
julia> m = map(x -> (r1 = x.a + x.b, r2 = x.a - x.b), d)
DTable with 2 partitions
Tabletype: NamedTuple
julia> fetch(m)
(r1 = [2, 3, 4], r2 = [0, 1, 2])
Base.reduce
— Methodreduce(f, gd::GDTable; cols=nothing, prefix="result_", [init]) -> Dagger.EagerThunk -> NamedTuple
Reduces gd
using function f
applied on all columns of the DTable. Returns results per group in columns with names prefixed with the prefix
kwarg. For more information on kwargs see reduce(f, d::DTable)
Examples
julia> g = DTables.groupby(DTable((a=repeat('a':'d', inner=2),b=1:8), 2), :a)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]
julia> fetch(reduce(*, g))
(a = ['a', 'c', 'd', 'b'], result_a = ["aa", "cc", "dd", "bb"], result_b = [2, 30, 56, 12])
Base.reduce
— Methodreduce(f, d::DTable; cols=nothing, [init]) -> NamedTuple
Reduces d
using function f
applied on all columns of the DTable.
By providing the kwarg cols
as a Vector{Symbol}
object it's possible to restrict the reduction to the specified columns. The reduced values are provided in a NamedTuple under names of reduced columns.
For the init
kwarg please refer to Base.reduce
documentation, as it follows the same principles.
Examples
julia> d = DTable((a = [1, 2, 3], b = [1, 1, 1]), 2);
julia> r1 = reduce(+, d)
Dagger.EagerThunk (running)
julia> fetch(r1)
(a = 6, b = 3)
julia> r2 = reduce(*, d, cols=[:a])
Dagger.EagerThunk (running)
julia> fetch(r2)
(a = 6,)
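The column-wise semantics described above can be reproduced on a plain, non-distributed column table. This is a sketch only: reduce_columns is a hypothetical helper built on Base.reduce, it returns the result directly rather than a Dagger.EagerThunk, and it is not how DTables schedules the work.

```julia
# Hypothetical helper: reduces each column of a NamedTuple of vectors.
# Extra keyword arguments (e.g. `init`) are forwarded to Base.reduce.
function reduce_columns(f, table::NamedTuple; cols=nothing, kwargs...)
    names = cols === nothing ? keys(table) : Tuple(cols)
    values = map(name -> reduce(f, table[name]; kwargs...), names)
    return NamedTuple{names}(values)
end

table = (a = [1, 2, 3], b = [1, 1, 1])
all_columns = reduce_columns(+, table)
only_a = reduce_columns(*, table; cols=[:a])
with_init = reduce_columns(+, table; init=10)
```

The first two calls mirror the reduce examples above on local data; the third shows that init is added once per column, as in Base.reduce.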
DTables._groupby
— Method_groupby(d::DTable, row_function::Function, cols::Union{Nothing, Vector{Symbol}}, merge::Bool, chunksize::Int)
Internal function for performing the groupby steps based on common arguments.
DTables._join
— Method_join(type::Symbol, l_chunk, r; kwargs...)
Low level join method for DTable
joins using the generic implementation. It joins an l_chunk
with r
assuming r
is a continuous table.
DTables._join
— Method_join(type::Symbol, l_chunk, r::DTable; kwargs...)
Low level join method for DTable
joins using the generic implementation. It joins an l_chunk
with r
assuming r
is a DTable
. In this case the join is split into multiple joins of l_chunk
with each chunk of r
and a final merge operation.
DTables.build_groupby_index
— Methodbuild_groupby_index(merge::Bool, chunksize::Int, tabletype, vs...)
Takes the intermediate result of distinct_partitions
and builds an index. Merges partitions if possible according to the chunksize
provided. It will only merge chunks if their length after merging is <= chunksize
. It doesn't split chunks larger than chunksize
and small chunks may be left over after merging if no suitable pair is found.
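The merging rule can be illustrated on chunk lengths alone. The sketch below uses a first-fit strategy, which is an assumption; the source only states that merges never exceed chunksize and that oversized chunks are not split. merge_lengths is a hypothetical name.

```julia
# Hypothetical first-fit sketch of the merging rule, working on row counts.
function merge_lengths(lengths::Vector{Int}, chunksize::Int)
    merged = Int[]
    for len in lengths
        # Look for an already-built partition that still has room for this chunk.
        slot = findfirst(m -> m + len <= chunksize, merged)
        if slot === nothing
            push!(merged, len)      # nothing fits: keep the chunk as it is
        else
            merged[slot] += len     # merge, staying within chunksize
        end
    end
    return merged
end

result = merge_lengths([2, 2, 5, 1, 3], 4)
leftovers = merge_lengths([3, 3, 3], 4)
```

In the first call the chunk of 5 rows stays intact although it exceeds chunksize; in the second call no pair fits within 4 rows, so all three small chunks are left unmerged.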
DTables.build_joined_table
— Methodbuild_joined_table(jointype, names, l, r, inner_l, inner_r, outer_l, other_r)
Takes the indices of matching rows (inner*
) and the ones that weren't matched (outer_l
) from the l
table and builds the result based on that.
Uses all the columns from the left table and the other_r
columns from the right table.
DTables.distinct_partitions
— Methoddistinct_partitions(chunk, f::Function)
Takes a partition and groups its rows based on the key value returned by f
.
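A minimal sketch of this grouping step on a single in-memory partition follows. It assumes the partition is a column table (a NamedTuple of equal-length vectors) and returns row indices per key; group_rows_by_key is a hypothetical name, and the actual function's return format may differ.

```julia
# Hypothetical sketch: group the row indices of one partition by f(row).
function group_rows_by_key(chunk::NamedTuple, f::Function)
    nrows = length(first(chunk))
    groups = Dict{Any,Vector{Int}}()
    for i in 1:nrows
        row = map(col -> col[i], chunk)        # NamedTuple holding row i
        push!(get!(groups, f(row), Int[]), i)  # append i to the key's group
    end
    return groups
end

chunk = (a = ['a', 'b', 'a', 'c'], b = [1, 2, 3, 4])
by_column = group_rows_by_key(chunk, row -> row.a)
by_function = group_rows_by_key(chunk, row -> isodd(row.b))
```

Grouping by a column and grouping by an arbitrary function use the same path; only the key function changes.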
DTables.find_outer_indices
— Methodfind_outer_indices(d, inner_indices)
Finds the unmatched indices from the table.
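Conceptually, the unmatched rows are the row indices that never appear among the matched ones. A one-line sketch, assuming the table has nrows rows and inner_indices may contain duplicates (unmatched_indices is a hypothetical name):

```julia
# Hypothetical sketch: indices in 1:nrows that were not matched by the join.
unmatched_indices(nrows::Int, inner_indices) = setdiff(1:nrows, inner_indices)

outer = unmatched_indices(5, [2, 4, 4])
```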
DTables.grouped_cols
— Methodgrouped_cols(gd::GDTable) -> Vector{Symbol}
Returns the symbols of columns used in the grouping. If the table was grouped by a function, a :KEYS
symbol is returned.
DTables.match_inner_indices
— Methodmatch_inner_indices(l, r, cmp_l, cmp_r, lookup, r_sorted, l_sorted, r_unique)
Function responsible for picking the optimal method of joining inner indices depending on the additional information about the tables provided by the user.
DTables.match_inner_indices
— Methodmatch_inner_indices(l, r, l_ind::NTuple{N,Int}, r_ind::NTuple{N,Int})
Returns two vectors containing indices of matched rows. Standard non-optimized use case.
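The non-optimized case amounts to a nested loop over both tables. The sketch below assumes l and r are column tables (NamedTuples of vectors) and that l_ind and r_ind hold the positions of the join columns; both are assumptions about the argument format, and match_nested_loop is a hypothetical name.

```julia
# Hypothetical nested-loop sketch: compare every left row with every right row.
function match_nested_loop(l::NamedTuple, r::NamedTuple,
                           l_ind::NTuple{N,Int}, r_ind::NTuple{N,Int}) where {N}
    l_matches, r_matches = Int[], Int[]
    for i in 1:length(first(l)), j in 1:length(first(r))
        # Rows match when all join columns are equal.
        if all(isequal(l[l_ind[k]][i], r[r_ind[k]][j]) for k in 1:N)
            push!(l_matches, i)
            push!(r_matches, j)
        end
    end
    return l_matches, r_matches
end

l = (id = [1, 2, 3], x = ["p", "q", "r"])
r = (key = [3, 1, 1], y = [10, 20, 30])
l_matches, r_matches = match_nested_loop(l, r, (1,), (1,))
```

The cost grows with the product of the two row counts, which is why the sorted, unique, and lookup variants exist.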
DTables.match_inner_indices_lookup
— Methodmatch_inner_indices_lookup(l, lookup, l_ind::NTuple{N,Int})
Returns two vectors containing indices of matched rows. Uses lookup
to find the matching indices.
lookup
needs to be a dict-like structure that contains keys in form of a Tuple
of all matching columns and values in form of type Vector{UInt}
containing the related row indices.
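A lookup of the described shape can be built from the right table once and then reused. The sketch assumes column tables and column positions as inputs; build_lookup and match_with_lookup are hypothetical names, not DTables functions.

```julia
# Hypothetical sketch: map each key tuple to the right-table rows holding it.
function build_lookup(r::NamedTuple, r_ind::NTuple{N,Int}) where {N}
    lookup = Dict{Tuple,Vector{UInt}}()
    for j in 1:length(first(r))
        key = ntuple(k -> r[r_ind[k]][j], N)
        push!(get!(lookup, key, UInt[]), j)
    end
    return lookup
end

# Hypothetical sketch: one dictionary probe per left row instead of a full scan.
function match_with_lookup(l::NamedTuple, lookup, l_ind::NTuple{N,Int}) where {N}
    l_matches, r_matches = UInt[], UInt[]
    for i in 1:length(first(l))
        key = ntuple(k -> l[l_ind[k]][i], N)
        for j in get(lookup, key, UInt[])
            push!(l_matches, i)
            push!(r_matches, j)
        end
    end
    return l_matches, r_matches
end

l = (id = [1, 2, 3],)
r = (key = [3, 1, 1],)
lookup = build_lookup(r, (1,))
l_matches, r_matches = match_with_lookup(l, lookup, (1,))
```

The keys are tuples even for a single join column, matching the requirement that keys cover all matching columns.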
DTables.match_inner_indices_lsorted_rsorted
— Methodmatch_inner_indices_lsorted_rsorted(l, r, cmp_l::NTuple{N,Int}, cmp_r::NTuple{N,Int}, runique::Bool)
Returns two vectors containing indices of matched rows. Optimized pass for the left table sorted, right table sorted and optionally right table only containing unique keys.
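When both sides are sorted, matches can be found in one forward pass, as in a classic merge join. The sketch is simplified to a single key column passed as plain vectors and compares keys with isless and isequal; these simplifications are assumptions, and match_sorted is a hypothetical name.

```julia
# Hypothetical merge-join sketch over two sorted key vectors.
function match_sorted(lkeys::AbstractVector, rkeys::AbstractVector)
    l_matches, r_matches = Int[], Int[]
    i, j = 1, 1
    while i <= length(lkeys) && j <= length(rkeys)
        if isless(lkeys[i], rkeys[j])
            i += 1
        elseif isless(rkeys[j], lkeys[i])
            j += 1
        else
            # Equal keys: pair left row i with the whole run of equal right keys.
            jj = j
            while jj <= length(rkeys) && isequal(rkeys[jj], lkeys[i])
                push!(l_matches, i)
                push!(r_matches, jj)
                jj += 1
            end
            i += 1  # j stays at the start of the run for a repeated left key
        end
    end
    return l_matches, r_matches
end

l_matches, r_matches = match_sorted([1, 1, 2, 4], [1, 2, 2, 3, 4])
```

If the right keys are known to be unique, the inner run loop emits at most one pair per left row, which is presumably what the runique flag exploits.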
DTables.match_inner_indices_rsorted
— Methodmatch_inner_indices_rsorted(l, r, cmp_l::NTuple{N,Int}, cmp_r::NTuple{N,Int})
Returns two vectors containing indices of matched rows. Optimized pass for joins with a sorted right table.
DTables.match_inner_indices_runique
— Methodmatch_inner_indices_runique(l, r, cmp_l::NTuple{N,Int}, cmp_r::NTuple{N,Int})
Returns two vectors containing indices of matched rows. Optimized pass for joins with the right table containing unique keys only.
DTables.tabletype!
— Methodtabletype!(gd::GDTable)
Provides the type of the underlying table partition and caches it in gd
.
In case the tabletype cannot be obtained the default return value is NamedTuple
.
DTables.tabletype!
— Methodtabletype!(d::DTable)
Provides the type of the underlying table partition and caches it in d
.
In case the tabletype cannot be obtained the default return value is NamedTuple
.
DTables.tabletype
— Methodtabletype(gd::GDTable)
Provides the type of the underlying table partition. Uses the cached tabletype if available.
In case the tabletype cannot be obtained the default return value is NamedTuple
.
DTables.tabletype
— Methodtabletype(d::DTable)
Provides the type of the underlying table partition. Uses the cached tabletype if available.
In case the tabletype cannot be obtained the default return value is NamedTuple
.
DTables.trim!
— Methodtrim!(gd::GDTable) -> GDTable
Removes empty chunks from gd
and unused keys from its index.
DTables.trim!
— Methodtrim!(d::DTable) -> DTable
Removes empty chunks from d
.
DTables.trim
— Methodtrim(gd::GDTable) -> GDTable
Returns gd
with empty chunks and keys removed.
DTables.trim
— Methodtrim(d::DTable) -> DTable
Returns d
with empty chunks removed.
DTables.use_dataframe_join
— Methoduse_dataframe_join(d1type, d2type)
Determines whether to use the DataAPI join function, which leads to usage of DataFrames join function if both types are DataFrame
. Remove this function and its usage once a generic Tables.jl compatible join function becomes available. Porting the Dagger join functions to TableOperations is an option to achieve that.
DataAPI.innerjoin
— Methodinnerjoin(d1::DTable, d2; on=nothing, l_sorted=false, r_sorted=false, r_unique=false, lookup=nothing)
Perform an inner join of d1
with any Tables.jl
compatible table type. Returns a DTable
with the result.
If the underlying table type happens to have an innerjoin
implementation and none of the DTable
related kwargs below are provided, the specialized function will be used. A good example of that is calling innerjoin
on a DTable
with a DataFrame
underlying type and a d2
of DataFrame
type.
Keyword arguments
on
: Column symbols to join on. Can be provided as a symbol or a pair of symbols in case the column names differ. For joins on multiple columns a vector of the previously mentioned can be provided.
l_sorted
: To indicate the left table is sorted. Only useful if r_sorted is set to true as well.
r_sorted
: To indicate the right table is sorted.
r_unique
: To indicate the right table only contains unique keys.
lookup
: To provide a dict-like structure that will allow for direct matching of inner rows. The structure needs to contain keys in form of a Tuple and values in form of type Vector{UInt} containing the related row indices.
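The three accepted forms of on (a symbol, a pair of symbols, or a vector mixing both) can be reduced to two parallel lists of column names. The sketch assumes a pair is read as left name => right name, which the text above does not state explicitly; normalize_on is a hypothetical helper, not a DTables function.

```julia
# Hypothetical helper: turn `on` into (left_names, right_names).
normalize_on(on::Symbol) = ([on], [on])
# Assumption: a pair is interpreted as left_column => right_column.
normalize_on(on::Pair{Symbol,Symbol}) = ([first(on)], [last(on)])
function normalize_on(on::AbstractVector)
    parts = map(normalize_on, on)
    left = reduce(vcat, first.(parts); init=Symbol[])
    right = reduce(vcat, last.(parts); init=Symbol[])
    return (left, right)
end

same_name = normalize_on(:id)
different_names = normalize_on(:id => :key)
multiple = normalize_on([:a, :b => :c])
```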
DataAPI.leftjoin
— Methodleftjoin(d1::DTable, d2; on=nothing, l_sorted=false, r_sorted=false, r_unique=false, lookup=nothing)
Perform a left join of d1
with any Tables.jl
compatible table type. Returns a DTable
with the result.
If the underlying table type happens to have a leftjoin
implementation and none of the DTable
related kwargs below are provided, the specialized function will be used. A good example of that is calling leftjoin
on a DTable
with a DataFrame
underlying type and a d2
of DataFrame
type.
Keyword arguments
on
: Column symbols to join on. Can be provided as a symbol or a pair of symbols in case the column names differ. For joins on multiple columns a vector of the previously mentioned can be provided.
l_sorted
: To indicate the left table is sorted. Only useful if r_sorted is set to true as well.
r_sorted
: To indicate the right table is sorted.
r_unique
: To indicate the right table only contains unique keys.
lookup
: To provide a dict-like structure that will allow for direct matching of inner rows. The structure needs to contain keys in form of a Tuple and values in form of type Vector{UInt} containing the related row indices.
DataFrames.groupby
— Methodgroupby(d::DTable, f::Function; merge=true, chunksize=0)
Groups d
by the distinct set of keys created by applying f
to each row in d
.
For kwargs usage details see groupby(d::DTable, col::Symbol)
Examples
julia> d = DTable((a=shuffle(repeat('a':'d', inner=4, outer=4)),b=repeat(1:4, 16)), 4)
DTable with 16 partitions
Tabletype: NamedTuple
julia> function group_fun(row)
           row.a + row.b
       end
group_fun (generic function with 1 method)
julia> DTables.groupby(d, group_fun)
GDTable with 7 partitions and 7 keys
Tabletype: NamedTuple
Grouped by: group_fun
julia> DTables.groupby(d, row -> row.a + row.b, chunksize=3)
GDTable with 25 partitions and 7 keys
Tabletype: NamedTuple
Grouped by: group_fun
julia> DTables.groupby(d, row -> row.a + row.b, merge=false)
GDTable with 52 partitions and 7 keys
Tabletype: NamedTuple
Grouped by: group_fun
DataFrames.groupby
— Methodgroupby(d::DTable, col::Symbol; merge=true, chunksize=0) -> GDTable
Groups d
by distinct values of column col
.
The process of grouping can be affected by providing kwargs merge
and chunksize
. By default all the chunks belonging to a single key will be merged into a single partition. Providing a positive value in chunksize
will attempt to merge the smaller partitions into partitions not bigger than chunksize
. Please note that partitions bigger than chunksize
will not be split into partitions of chunksize
. Merging can be disabled completely by providing merge=false
.
Examples
julia> d = DTable((a=shuffle(repeat('a':'d', inner=4, outer=4)),), 4)
DTable with 16 partitions
Tabletype: NamedTuple
julia> DTables.groupby(d, :a)
GDTable with 4 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]
julia> DTables.groupby(d, :a, chunksize=3)
GDTable with 24 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]
julia> DTables.groupby(d, :a, merge=false)
GDTable with 42 partitions and 4 keys
Tabletype: NamedTuple
Grouped by: [:a]
DataFrames.groupby
— Methodgroupby(d::DTable, cols::Vector{Symbol}; merge=true, chunksize=0)
Groups d
by distinct values of columns cols
. The key is constructed by creating a NamedTuple from each row based on cols
provided.
For kwargs usage details see groupby(d::DTable, col::Symbol)
Examples
julia> d = DTable((a=shuffle(repeat('a':'d', inner=4, outer=4)),b=repeat(1:4, 16)), 4)
DTable with 16 partitions
Tabletype: NamedTuple
julia> DTables.groupby(d, [:a,:b])
GDTable with 16 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]
julia> DTables.groupby(d, [:a,:b], chunksize=3)
GDTable with 27 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]
julia> DTables.groupby(d, [:a,:b], merge=false)
GDTable with 64 partitions and 16 keys
Tabletype: NamedTuple
Grouped by: [:a, :b]
DataFrames.select
— Methodselect(df::DTable, args...; copycols::Bool=true, renamecols::Bool=true)
Create a new DTable that contains columns from df
specified by args
and return it. The result is guaranteed to have the same number of rows as df, except when no columns are selected (in which case the result has zero rows).
This operation is supposed to provide the same functionality and syntax as DataFrames.select
, but for DTable input. Most cases should be covered and the output should be exactly the same as one obtained using DataFrames. In case of output differences or args
causing errors please file an issue with reproduction steps and data.
Please refer to DataFrames documentation for more details on usage.