DataFlowTasks.jl

Stable Dev Build Status codecov Lifecycle Aqua QA

DataFlowTasks.jl is a Julia package dedicated to parallel programming on multi-core shared memory CPUs. From user annotations (READ, WRITE, READWRITE) on program data, DataFlowTasks.jl automatically infers dependencies between parallel tasks.

This README is also available in notebook form: ipynb nbviewer

Installation

using Pkg
Pkd.add("https://github.com/maltezfaria/DataFlowTasks.jl.git")

Basic Usage

This package defines a @dspawn macro which behaves very much like Threads.@spawn, except that it allows the user to specify explicit data dependencies for the spawned Task. This information is then used to automatically infer task dependencies by constructing and analyzing a directed acyclic graph based on how tasks access the underlying data. The premise is that it is sometimes simpler to specify how tasks depend on data than to specify how tasks depend on each other.

When creating a Task using @dspawn, the following annotations can be used to declare how the Task accesses the data:

  • read-only: @R or @READ
  • write-only: @W or @WRITE
  • read-write: @RW or @READWRITE

An @R(A) annotation for example implies that A will be accessed in read-only mode by the task.

Let's look at a simple example:

using DataFlowTasks
A = Vector{Float64}(undef, 4)
result = let
    @dspawn fill!(@W(A), 0)           # task 1: accesses everything
    @dspawn @RW(view(A, 1:2)) .+= 2   # task 2: modifies the first half
    @dspawn @RW(view(A, 3:4)) .+= 3   # task 3: modifies the second half
    @dspawn @R(A)                     # task 4: get the result
end
fetch(result)

From annotations describing task-data dependencies, DataFlowTasks.jl infers dependencies between tasks. Internally, this set of dependencies is represented as a Directed Acyclic Graph. All the data needed to reconstruct the DAG (as well as the parallalel traces) can be collected using the @log macro:

log_info = DataFlowTasks.@log let
    @dspawn fill!(@W(A), 0)             label="write whole"
    @dspawn @RW(view(A, 1:2)) .+= 2     label="write 1:2"
    @dspawn @RW(view(A, 3:4)) .+= 3     label="write 3:4"
    res = @dspawn @R(A)                 label="read whole"
    fetch(res)
end

And the DAG can be visualized using GraphViz:

DataFlowTasks.stack_weakdeps_env!() # Set up a stacked environment so that weak dependencies such as GraphViz can be loaded. More about that hereafter.
using GraphViz # triggers additional code loading, powered by weak dependencies (julia >= 1.9)
dag = GraphViz.Graph(log_info)

In the example above, the tasks write 1:2 and write 3:4 access different parts of the array A and are therefore independent, as shown in the DAG.

Example : Parallel Cholesky Factorization

As a less contrived example, we illustrate below the use of DataFlowTasks to parallelize a tiled Cholesky factorization. The implementation shown here is delibarately made as simple as possible.

The Cholesky factorization algorithm takes a symmetric positive definite matrix A and finds a lower triangular matrix L such that A = LLᵀ. The tiled version of this algorithm decomposes the matrix A into tiles (of even sizes, in this simplified version). At each step of the algorithm, we do a Cholesky factorization on the diagonal tile, use a triangular solve to update all of the tiles at the right of the diagonal tile, and finally update all the tiles of the submatrix with a Schur complement.

If we have a matrix A decomposed in n x n tiles, then the algorithm will have n steps. The i-th step (with i ∈ [1:n]) will perform

  • 1 Cholesky factorization of the (i,i) block,
  • (i-1) triangular solves (one for each block in the i-th row),
  • i*(i-1)/2 matrix multiplications to update the submatrix.

The following image illustrates the 2nd step of the algorithm:

A sequential tiled factorization algorithm can be implemented as:

using LinearAlgebra

tilerange(ti, ts) = (ti-1)*ts+1:ti*ts

function cholesky_tiled!(A, ts)
    m = size(A, 1); @assert m==size(A, 2)
    m%ts != 0 && error("Tilesize doesn't fit the matrix")
    n = m÷ts  # number of tiles in each dimension

    T = [view(A, tilerange(i, ts), tilerange(j, ts)) for i in 1:n, j in 1:n]

    for i in 1:n
        # Diagonal Cholesky serial factorization
        cholesky!(T[i,i])

        # Left blocks update
        U = UpperTriangular(T[i,i])
        for j in i+1:n
            ldiv!(U', T[i,j])
        end

        # Submatrix update
        for j in i+1:n
            for k in j:n
                mul!(T[j,k], T[i,j]', T[i,k], -1, 1)
            end
        end
    end

    # Construct the factorized object
    return Cholesky(A, 'U', zero(LinearAlgebra.BlasInt))
end

Parallelizing the code with DataFlowTasks.jl is as easy as wrapping function calls within @dspawn, and adding annotations describing data access modes:

using DataFlowTasks

function cholesky_dft!(A, ts)
    m = size(A, 1); @assert m==size(A, 2)
    m%ts != 0 && error("Tilesize doesn't fit the matrix")
    n = m÷ts  # number of tiles in each dimension

    T = [view(A, tilerange(i, ts), tilerange(j, ts)) for i in 1:n, j in 1:n]

    for i in 1:n
        # Diagonal Cholesky serial factorization
        @dspawn cholesky!(@RW(T[i,i])) label="chol ($i,$i)"

        # Left blocks update
        U = UpperTriangular(T[i,i])
        for j in i+1:n
            @dspawn ldiv!(@R(U)', @RW(T[i,j])) label="ldiv ($i,$j)"
        end

        # Submatrix update
        for j in i+1:n
            for k in j:n
                @dspawn mul!(@RW(T[j,k]), @R(T[i,j])', @R(T[i,k]), -1, 1) label="schur ($j,$k)"
            end
        end
    end

    # Construct the factorized object
    r = @dspawn Cholesky(@R(A), 'U', zero(LinearAlgebra.BlasInt)) label="result"
    return fetch(r)
end

(Also note how extra annotations were added in the code, in order to attach meaningful labels to the tasks. These will later be useful to interpret the output of debugging & profiling tools.)

The code below shows how to use this cholesky_tiled! function, as well as how to profile the program and get information about how tasks were scheduled:

# DataFlowTasks environment setup

# Context
n  = 2048
ts = 512
A = rand(n, n)
A = (A + adjoint(A))/2
A = A + n*I;
# First run to trigger compilation
F = cholesky_dft!(copy(A), ts)

# Check results
err = norm(F.L*F.U-A,Inf)/max(norm(A),norm(F.L*F.U))

Debugging and Profiling

DataFlowTasks comes with debugging and profiling tools that help understanding how task dependencies were inferred, and how tasks were scheduled during execution.

As usual when profiling code, it is recommended to start from a state where all code has already been compiled, and all previous profiling information has been discarded:

# Manually call GC to avoid noise from previous runs
GC.gc()

# Profile the code and return a `LogInfo` object:
log_info = DataFlowTasks.@log cholesky_dft!(A ,ts);

Visualizing the DAG can be helpful. When debugging, this representation of dependencies between tasks as inferred by DataFlowTasks can help identify missing or erroneous data dependency annotations. When profiling, identifying the critical path (plotted in red in the DAG) can help understand the performances of the implementation.

In this more complex example, we can see how quickly the DAG complexity increases (even though the test case only has 4x4 blocks here):

dag = GraphViz.Graph(log_info)

The LogInfo object also contains all the data needed to profile the parallel application. A summary of the profiling information can be displayed in the REPL using the DataFlowTasks.describe function:

DataFlowTasks.describe(log_info; categories=["chol", "ldiv", "schur"])

but it is often more convenient to see this information in a graphical way. The parallel trace plot shows a timeline of the tasks execution on available threads. It helps in understanding how tasks were scheduled. The same window also carries other general information allowing to better understand the performance limiting factors:

using CairoMakie # or GLMakie in order to have more interactivity
trace = plot(log_info; categories=["chol", "ldiv", "schur"])

We see here that the execution time is bounded by the length of the critical path: with this block size and matrix size, the algorithm does not expose enough parallelism to occupy all threads without waiting periods.

We'll cover in detail the usage and possibilities of the visualization in the documentation.

Note that the debugging & profiling tools need additional dependencies such as Makie and GraphViz, which are only meant to be used interactively during the development process. These packages are therefore only considered as optional dependencies; assuming they are available in your work environment, calling e.g. using GraphViz will load some additional code from DataFlowTasks. If these dependencies are not directly available in the current environment stack, DataFlowTasks.stack_weakdeps_env!() can be called to push to the loading stack a new environment in which these optional dependencies are available.

Going further: examples and performance

The online documentation contains a variety of examples and benchmarks of applications where DataFlowTasks can be used to parallelize code. These include:

Each example comes with a notebook version, which can be downloaded and run locally: give it a try, and if DataFlowTasks is useful to you, please consider submitting your own example application!


This page was generated using Literate.jl.