Examples
Establishing a database connection
We use FunSQL to assemble SQL queries. To actually run these queries, we need a regular database library such as SQLite.jl, LibPQ.jl, MySQL.jl, or ODBC.jl.
In the following examples, we use a SQLite database containing a tiny sample of the CMS DE-SynPUF dataset. See the Usage Guide for a description of the database schema.
Download the database file.
const URL = "https://github.com/MechanicalRabbit/ohdsi-synpuf-demo/releases/download/20210412/synpuf-10p.sqlite"
const DB = download(URL)
Download the database file as an artifact.
using Pkg.Artifacts, LazyArtifacts
const DB = joinpath(artifact"synpuf-10p", "synpuf-10p.sqlite")
#-> ⋮
Create a SQLite connection object.
using SQLite
const conn = SQLite.DB(DB)
Importing FunSQL
FunSQL does not export any symbols by default. The following statement imports all available query constructors, the SQLTable constructor, and the render function.
using FunSQL:
Agg, Append, As, Asc, Bind, Define, Desc, Fun, From, Get, Group,
Highlight, Join, LeftJoin, Limit, Lit, Order, Partition, SQLTable,
Select, Sort, Var, Where, render
Database introspection (SQLite)
For each database table referenced in a query, we need to create a SQLTable object encapsulating the name of the table and the list of its columns.
SQLTable(:person, columns = [:person_id, :year_of_birth, :location_id])
Instead of creating SQLTable objects manually, we could create them automatically by extracting information about the available tables from the database itself. For SQLite, this could be done as follows.
using Tables
const introspect_sqlite_sql = """
SELECT NULL AS schema, sm.name, pti.name AS column
FROM sqlite_master sm, pragma_table_info(sm.name) pti
WHERE sm.type IN ('table', 'view') AND sm.name NOT LIKE 'sqlite_%'
ORDER BY sm.name
"""
introspect_sqlite(conn) =
DBInterface.execute(conn, introspect_sqlite_sql) |>
make_tables
function make_tables(res)
tables = SQLTable[]
schema = name = nothing
columns = Symbol[]
for (s, n, c) in Tables.rows(res)
s = s !== missing ? Symbol(s) : nothing
n = Symbol(n)
c = Symbol(c)
if s === schema && n === name
push!(columns, c)
else
if !isempty(columns)
t = SQLTable(schema = schema, name = name, columns = columns)
push!(tables, t)
end
schema = s
name = n
columns = [c]
end
end
if !isempty(columns)
t = SQLTable(schema = schema, name = name, columns = columns)
push!(tables, t)
end
return tables
end
const tables = introspect_sqlite(conn)
The vector tables contains all the tables available in the database.
display(tables)
#=>
44-element Vector{SQLTable}:
SQLTable(:attribute_definition, …)
SQLTable(:care_site, …)
SQLTable(:cdm_source, …)
SQLTable(:cohort, …)
SQLTable(:cohort_ace, …)
SQLTable(:cohort_all, …)
SQLTable(:cohort_ami, …)
SQLTable(:cohort_ang, …)
SQLTable(:cohort_attribute, …)
SQLTable(:cohort_definition, …)
⋮
SQLTable(:procedure_cost, …)
SQLTable(:procedure_occurrence, …)
SQLTable(:provider, …)
SQLTable(:relationship, …)
SQLTable(:source_to_concept_map, …)
SQLTable(:specimen, …)
SQLTable(:visit_cost, …)
SQLTable(:visit_occurrence, …)
SQLTable(:vocabulary, …)
=#
It is convenient to add the SQLTable objects to the global scope.
for t in tables
@eval const $(t.name) = $t
end
display(person)
#=>
SQLTable(:person,
columns = [:person_id,
:gender_concept_id,
:year_of_birth,
:month_of_birth,
:day_of_birth,
:time_of_birth,
:race_concept_id,
:ethnicity_concept_id,
:location_id,
:provider_id,
:care_site_id,
:person_source_value,
:gender_source_value,
:gender_source_concept_id,
:race_source_value,
:race_source_concept_id,
:ethnicity_source_value,
:ethnicity_source_concept_id])
=#
Alternatively, we could encapsulate all SQLTable objects in a NamedTuple.
const db = NamedTuple([t.name => t for t in tables])
display(db.person)
#=>
SQLTable(:person,
columns = [:person_id,
:gender_concept_id,
:year_of_birth,
:month_of_birth,
:day_of_birth,
:time_of_birth,
:race_concept_id,
:ethnicity_concept_id,
:location_id,
:provider_id,
:care_site_id,
:person_source_value,
:gender_source_value,
:gender_source_concept_id,
:race_source_value,
:race_source_concept_id,
:ethnicity_source_value,
:ethnicity_source_concept_id])
=#
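The fields of db can then be used directly when constructing queries. For instance, a minimal query against db.person:
q = From(db.person) |>
    Select(Get.person_id, Get.year_of_birth)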
Database introspection (PostgreSQL)
The following code generates SQLTable objects for a PostgreSQL database. See the section Database introspection (SQLite) for the definition of the make_tables() function and instructions on how to bring the generated SQLTable objects into the global scope.
const introspect_postgresql_sql = """
SELECT n.nspname AS schema, c.relname AS name, a.attname AS column
FROM pg_catalog.pg_namespace AS n
JOIN pg_catalog.pg_class AS c ON (n.oid = c.relnamespace)
JOIN pg_catalog.pg_attribute AS a ON (c.oid = a.attrelid)
WHERE n.nspname = \$1 AND
c.relkind IN ('r', 'v') AND
HAS_TABLE_PRIVILEGE(c.oid, 'SELECT') AND
a.attnum > 0 AND
NOT a.attisdropped
ORDER BY n.nspname, c.relname, a.attnum
"""
introspect_postgresql(conn, schema = :public) =
execute(conn, introspect_postgresql_sql, (String(schema),)) |>
make_tables
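Here, execute is assumed to come from LibPQ.jl. A hypothetical invocation, with illustrative connection parameters, might look as follows.
using LibPQ
# Connection parameters are placeholders; substitute your own server settings.
const pg_conn = LibPQ.Connection("host=localhost dbname=synpuf")
const pg_tables = introspect_postgresql(pg_conn)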
Alternatively, we could generate the introspection query using FunSQL.
const pg_namespace =
SQLTable(schema = :pg_catalog,
name = :pg_namespace,
columns = [:oid, :nspname])
const pg_class =
SQLTable(schema = :pg_catalog,
name = :pg_class,
columns = [:oid, :relname, :relnamespace, :relkind])
const pg_attribute =
SQLTable(schema = :pg_catalog,
name = :pg_attribute,
columns = [:attrelid, :attname, :attnum, :attisdropped])
const IntrospectPostgreSQL =
From(pg_class) |>
Where(Fun.in(Get.relkind, "r", "v")) |>
Where(Fun.has_table_privilege(Get.oid, "SELECT")) |>
Join(From(pg_namespace) |>
Where(Get.nspname .== Var.schema) |>
As(:nsp),
on = Get.relnamespace .== Get.nsp.oid) |>
Join(From(pg_attribute) |>
Where(Fun.and(Get.attnum .> 0, Fun.not(Get.attisdropped))) |>
As(:att),
on = Get.oid .== Get.att.attrelid) |>
Order(Get.nsp.nspname, Get.relname, Get.att.attnum) |>
Select(Get.nsp.nspname, Get.relname, Get.att.attname)
const introspect_postgresql_sql =
render(IntrospectPostgreSQL, dialect = :postgresql)
Database introspection (MySQL)
The following code generates SQLTable objects for a MySQL database. See the section Database introspection (SQLite) for the definition of the make_tables() function and instructions on how to bring the generated SQLTable objects into the global scope.
const introspect_mysql_sql = """
SELECT table_schema AS `schema`, table_name AS `name`, column_name AS `column`
FROM information_schema.columns
WHERE table_schema = COALESCE(?, DATABASE())
ORDER BY table_schema, table_name, ordinal_position
"""
introspect_mysql(conn, schema = nothing) =
DBInterface.execute(
DBInterface.prepare(conn, introspect_mysql_sql),
(schema !== nothing ? String(schema) : missing,)) |>
make_tables
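A hypothetical invocation with MySQL.jl, where the host, credentials, and database name are placeholders:
using MySQL
# Adjust the connection parameters for your own server.
const mysql_conn = DBInterface.connect(MySQL.Connection, "localhost", "user", "password", db = "synpuf")
const mysql_tables = introspect_mysql(mysql_conn)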
Alternatively, we could generate the introspection query using FunSQL.
const information_schema_columns =
SQLTable(schema = :information_schema,
name = :columns,
columns = [:table_schema, :table_name, :column_name, :ordinal_position])
const IntrospectMySQL =
From(information_schema_columns) |>
Where(Get.table_schema .== Fun.coalesce(Var.schema, Fun.database())) |>
Order(Get.table_schema, Get.table_name, Get.ordinal_position) |>
Select(Get.table_schema, Get.table_name, Get.column_name)
const introspect_mysql_sql =
render(IntrospectMySQL, dialect = :mysql) |> String
Database introspection (Microsoft SQL Server)
The following code generates SQLTable objects for a Microsoft SQL Server database. See the section Database introspection (SQLite) for the definition of the make_tables() function and instructions on how to bring the generated SQLTable objects into the global scope.
const introspect_sqlserver_sql = """
SELECT s.name AS [schema], o.name AS [name], c.name AS [column]
FROM sys.schemas AS s
JOIN sys.objects AS o ON (s.schema_id = o.schema_id)
JOIN sys.columns AS c ON (o.object_id = c.object_id)
WHERE s.name = ? AND o.type IN ('U', 'V')
ORDER BY s.name, o.name, c.column_id
"""
introspect_sqlserver(conn, schema = :dbo) =
DBInterface.execute(conn, introspect_sqlserver_sql, (String(schema),)) |>
make_tables
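A hypothetical invocation with ODBC.jl, where the data source name is illustrative:
using ODBC
# The DSN is a placeholder for an ODBC data source configured for the server.
const mssql_conn = DBInterface.connect(ODBC.Connection, "synpuf-dsn")
const mssql_tables = introspect_sqlserver(mssql_conn)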
Alternatively, we could generate the introspection query using FunSQL.
const sys_schemas =
SQLTable(schema = :sys, name = :schemas, columns = [:schema_id, :name])
const sys_tables =
SQLTable(schema = :sys, name = :tables, columns = [:schema_id, :object_id, :name, :type])
const sys_columns =
SQLTable(schema = :sys, name = :columns, columns = [:object_id, :column_id, :name])
const IntrospectSQLServer =
From(sys_tables) |>
Where(Fun.in(Get.type, "U", "V")) |>
Join(From(sys_schemas) |>
Where(Get.name .== Var.schema) |>
As(:schema),
on = Get.schema_id .== Get.schema.schema_id) |>
Join(From(sys_columns) |>
As(:column),
on = Get.object_id .== Get.column.object_id) |>
Order(Get.schema.name, Get.name, Get.column.column_id) |>
Select(:schema => Get.schema.name, Get.name, :column => Get.column.name)
const introspect_sqlserver_sql =
render(IntrospectSQLServer, dialect = :sqlserver)
Database introspection (Amazon Redshift)
See Database introspection (PostgreSQL).
SELECT * FROM table
FunSQL does not require that a query object contain a Select node, so a minimal FunSQL query consists of a single From node.
Show all patient records.
q = From(person)
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT
"person_1"."person_id",
⋮
"person_1"."ethnicity_source_concept_id"
FROM "person" AS "person_1"
=#
res = DBInterface.execute(conn, sql)
To display the output of a query, it is convenient to use the DataFrame interface.
using DataFrames
DataFrame(res)
#=>
10×18 DataFrame
Row │ person_id gender_concept_id year_of_birth month_of_birth day_of_bir ⋯
│ Int64 Int64 Int64 Int64 Int64 ⋯
─────┼──────────────────────────────────────────────────────────────────────────
1 │ 1780 8532 1940 2 ⋯
2 │ 30091 8532 1932 8
3 │ 37455 8532 1913 7
4 │ 42383 8507 1922 2
5 │ 69985 8532 1956 7 ⋯
6 │ 72120 8507 1937 10
7 │ 82328 8532 1957 9
8 │ 95538 8507 1923 11
9 │ 107680 8532 1963 12 ⋯
10 │ 110862 8507 1911 4
14 columns omitted
=#
WHERE, ORDER, LIMIT
Tabular operations such as Where, Order, and Limit are available in FunSQL. Unlike SQL, FunSQL lets you apply them in any order.
Show the top 3 oldest male patients.
q = From(person) |>
Where(Get.gender_concept_id .== 8507) |>
Order(Get.year_of_birth) |>
Limit(3)
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT
"person_1"."person_id",
⋮
"person_1"."ethnicity_source_concept_id"
FROM "person" AS "person_1"
WHERE ("person_1"."gender_concept_id" = 8507)
ORDER BY "person_1"."year_of_birth"
LIMIT 3
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
3×18 DataFrame
Row │ person_id gender_concept_id year_of_birth month_of_birth day_of_bir ⋯
│ Int64 Int64 Int64 Int64 Int64 ⋯
─────┼──────────────────────────────────────────────────────────────────────────
1 │ 110862 8507 1911 4 ⋯
2 │ 42383 8507 1922 2
3 │ 95538 8507 1923 11
14 columns omitted
=#
Show all males among the top 3 oldest patients.
q = From(person) |>
Order(Get.year_of_birth) |>
Limit(3) |>
Where(Get.gender_concept_id .== 8507)
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT
"person_2"."person_id",
⋮
"person_2"."ethnicity_source_concept_id"
FROM (
SELECT
"person_1"."person_id",
⋮
"person_1"."ethnicity_source_concept_id"
FROM "person" AS "person_1"
ORDER BY "person_1"."year_of_birth"
LIMIT 3
) AS "person_2"
WHERE ("person_2"."gender_concept_id" = 8507)
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
2×18 DataFrame
Row │ person_id gender_concept_id year_of_birth month_of_birth day_of_bir ⋯
│ Int64 Int64 Int64 Int64 Int64 ⋯
─────┼──────────────────────────────────────────────────────────────────────────
1 │ 110862 8507 1911 4 ⋯
2 │ 42383 8507 1922 2
14 columns omitted
=#
SELECT COUNT(*) FROM table
To apply an aggregate function to the dataset as a whole, we use a Group node without arguments.
Show the number of patient records.
q = From(person) |>
Group() |>
Select(Agg.count())
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT COUNT(*) AS "count"
FROM "person" AS "person_1"
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
1×1 DataFrame
Row │ count
│ Int64
─────┼───────
1 │ 10
=#
SELECT DISTINCT
If we use a Group node but do not apply any aggregate functions, FunSQL will render it as a SELECT DISTINCT clause.
Show all US states present in the location records.
q = From(location) |>
Group(Get.state)
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT DISTINCT "location_1"."state"
FROM "location" AS "location_1"
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
10×1 DataFrame
Row │ state
│ String
─────┼────────
1 │ MI
2 │ WA
3 │ FL
4 │ MD
5 │ NY
6 │ MS
7 │ CO
8 │ GA
9 │ MA
10 │ IL
=#
Filtering output columns
Either broadcasting or a vector comprehension could be used to filter the list of output columns.
Filter out all "source" columns from patient records.
is_not_source_column(c::Symbol) =
!contains(String(c), "source")
q = From(person) |>
Select(Get.(filter(is_not_source_column, person.columns))...)
# q = From(person) |>
# Select(args = [Get(c) for c in person.columns if is_not_source_column(c)])
display(q)
#=>
let person = SQLTable(:person, …),
q1 = From(person),
q2 = q1 |>
Select(Get.person_id,
Get.gender_concept_id,
Get.year_of_birth,
Get.month_of_birth,
Get.day_of_birth,
Get.time_of_birth,
Get.race_concept_id,
Get.ethnicity_concept_id,
Get.location_id,
Get.provider_id,
Get.care_site_id)
q2
end
=#
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT
"person_1"."person_id",
"person_1"."gender_concept_id",
"person_1"."year_of_birth",
"person_1"."month_of_birth",
"person_1"."day_of_birth",
"person_1"."time_of_birth",
"person_1"."race_concept_id",
"person_1"."ethnicity_concept_id",
"person_1"."location_id",
"person_1"."provider_id",
"person_1"."care_site_id"
FROM "person" AS "person_1"
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
10×11 DataFrame
Row │ person_id gender_concept_id year_of_birth month_of_birth day_of_bir ⋯
│ Int64 Int64 Int64 Int64 Int64 ⋯
─────┼──────────────────────────────────────────────────────────────────────────
1 │ 1780 8532 1940 2 ⋯
2 │ 30091 8532 1932 8
3 │ 37455 8532 1913 7
4 │ 42383 8507 1922 2
5 │ 69985 8532 1956 7 ⋯
6 │ 72120 8507 1937 10
7 │ 82328 8532 1957 9
8 │ 95538 8507 1923 11
9 │ 107680 8532 1963 12 ⋯
10 │ 110862 8507 1911 4
7 columns omitted
=#
Output columns of a Join
As is often used to disambiguate the columns of the two input branches of a Join node. By default, columns fenced by As are not present in the output.
q = From(person) |>
Join(From(visit_occurrence) |> As(:visit),
on = Get.person_id .== Get.visit.person_id)
print(render(q, dialect = :sqlite))
#=>
SELECT
"person_1"."person_id",
⋮
"person_1"."ethnicity_source_concept_id"
FROM "person" AS "person_1"
JOIN "visit_occurrence" AS "visit_occurrence_1" ON ("person_1"."person_id" = "visit_occurrence_1"."person_id")
=#
q′ = From(person) |> As(:person) |>
Join(From(visit_occurrence),
on = Get.person.person_id .== Get.person_id)
print(render(q′, dialect = :sqlite))
#=>
SELECT
"visit_occurrence_1"."visit_occurrence_id",
⋮
"visit_occurrence_1"."visit_source_concept_id"
FROM "person" AS "person_1"
JOIN "visit_occurrence" AS "visit_occurrence_1" ON ("person_1"."person_id" = "visit_occurrence_1"."person_id")
=#
We could use a Select node to output the columns of both branches; however, we must ensure that all column names are unique.
q = q |>
Select(Get.(person.columns)...,
Get.(visit_occurrence.columns, over = Get.visit)...)
#=>
ERROR: FunSQL.DuplicateLabelError: person_id is used more than once in:
⋮
=#
q = q |>
Select(Get.(person.columns)...,
Get.(filter(!in(person.columns), visit_occurrence.columns),
over = Get.visit)...)
print(render(q, dialect = :sqlite))
#=>
SELECT
"person_1"."person_id",
⋮
"person_1"."ethnicity_source_concept_id",
"visit_occurrence_1"."visit_occurrence_id",
⋮
"visit_occurrence_1"."visit_source_concept_id"
FROM "person" AS "person_1"
JOIN "visit_occurrence" AS "visit_occurrence_1" ON ("person_1"."person_id" = "visit_occurrence_1"."person_id")
=#
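Alternatively, instead of dropping the duplicate columns, we could rename them in the Select node. A short sketch (the selected columns and the new name visit_person_id are chosen for illustration):
q = From(person) |>
    Join(From(visit_occurrence) |> As(:visit),
         on = Get.person_id .== Get.visit.person_id) |>
    Select(Get.person_id,
           Get.year_of_birth,
           :visit_person_id => Get.visit.person_id,
           Get.visit.visit_start_date)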
Encapsulating complex SQL expressions
Show the number of patients diagnosed with myocardial infarction stratified by the age group at the time of diagnosis.
PersonAgeAt(date) =
Fun.strftime("%Y", date) .- Get.year_of_birth
AgeGroup(age) =
Fun.case(Iterators.flatten([(age .< y, "$(y-5) - $(y-1)")
for y = 5:5:100])...,
"100 +")
ConceptByName(name) =
From(concept) |>
Where(Fun.like(Get.concept_name, "%$(name)%"))
MyocardialInfarctionConcept() =
ConceptByName("myocardial infarction")
MyocardialInfarctionOccurrence() =
From(condition_occurrence) |>
Join(:concept => MyocardialInfarctionConcept(),
on = Get.condition_concept_id .== Get.concept.concept_id)
q = From(person) |>
Join(:condition => MyocardialInfarctionOccurrence(),
on = Get.person_id .== Get.condition.person_id) |>
Group(:age_group => AgeGroup(PersonAgeAt(Get.condition.condition_start_date))) |>
Select(Get.age_group, Agg.count())
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT
(CASE WHEN ((STRFTIME('%Y', "condition_1"."condition_start_date") - "person_1"."year_of_birth") < 5) THEN '0 - 4' … ELSE '100 +' END) AS "age_group",
COUNT(*) AS "count"
FROM "person" AS "person_1"
JOIN (
SELECT
"condition_occurrence_1"."person_id",
"condition_occurrence_1"."condition_start_date"
FROM "condition_occurrence" AS "condition_occurrence_1"
JOIN (
SELECT "concept_1"."concept_id"
FROM "concept" AS "concept_1"
WHERE ("concept_1"."concept_name" LIKE '%myocardial infarction%')
) AS "concept_2" ON ("condition_occurrence_1"."condition_concept_id" = "concept_2"."concept_id")
) AS "condition_1" ON ("person_1"."person_id" = "condition_1"."person_id")
GROUP BY (CASE WHEN ((STRFTIME('%Y', "condition_1"."condition_start_date") - "person_1"."year_of_birth") < 5) THEN '0 - 4' … ELSE '100 +' END)
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
3×2 DataFrame
Row │ age_group count
│ String Int64
─────┼──────────────────
1 │ 50 - 54 1
2 │ 65 - 69 1
3 │ 95 - 99 4
=#
Assembling queries incrementally
It is often convenient to build a query incrementally, one component at a time. This allows us to validate individual components, inspect their output, and possibly reuse them in other queries. Note that FunSQL allows us to encapsulate not just intermediate datasets, but also dataset operations such as FilterByGap().
Find all occurrences of myocardial infarction that were diagnosed during an inpatient visit. Filter out repeating occurrences by requiring a 180-day gap between consecutive events.
using Dates
ConceptByName(name) =
From(concept) |>
Where(Fun.like(Get.concept_name, "%$(name)%"))
MyocardialInfarctionConcept() =
ConceptByName("myocardial infarction")
MyocardialInfarctionOccurrence() =
From(condition_occurrence) |>
Join(:concept => MyocardialInfarctionConcept(),
on = Get.condition_concept_id .== Get.concept.concept_id)
InpatientVisitConcept() =
ConceptByName("inpatient")
InpatientVisitOccurrence() =
From(visit_occurrence) |>
Join(:concept => InpatientVisitConcept(),
on = Get.visit_concept_id .== Get.concept.concept_id)
CorrelatedInpatientVisit(person_id, date) =
InpatientVisitOccurrence() |>
Where(Fun.and(Get.person_id .== Var.person_id,
Fun.between(Var.date, Get.visit_start_date, Get.visit_end_date))) |>
Bind(:person_id => person_id,
:date => date)
MyocardialInfarctionDuringInpatientVisit() =
MyocardialInfarctionOccurrence() |>
Where(Fun.exists(CorrelatedInpatientVisit(Get.person_id, Get.condition_start_date)))
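# An intermediate component can be validated on its own.  For instance, a sketch
# that renders and runs MyocardialInfarctionOccurrence() by itself (inspect the
# result with DataFrame(res) if desired):
sql = render(MyocardialInfarctionOccurrence() |>
                 Select(Get.person_id, Get.condition_start_date),
             dialect = :sqlite)
res = DBInterface.execute(conn, sql)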
FilterByGap(date, gap) =
Partition(Get.person_id, order_by = [date]) |>
Define(:boundary => Agg.lag(Fun.date(date, gap))) |>
Where(Fun.or(Fun."is null"(Get.boundary),
Get.boundary .< date))
FilteredMyocardialInfarctionDuringInpatientVisit() =
MyocardialInfarctionDuringInpatientVisit() |>
FilterByGap(Get.condition_start_date, Day(180))
q = FilteredMyocardialInfarctionDuringInpatientVisit() |>
Select(Get.person_id, Get.condition_start_date)
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT
"condition_occurrence_2"."person_id",
"condition_occurrence_2"."condition_start_date"
FROM (
SELECT
"condition_occurrence_1"."person_id",
"condition_occurrence_1"."condition_start_date",
(LAG(DATE("condition_occurrence_1"."condition_start_date", '180 days')) OVER (PARTITION BY "condition_occurrence_1"."person_id" ORDER BY "condition_occurrence_1"."condition_start_date")) AS "boundary"
FROM "condition_occurrence" AS "condition_occurrence_1"
JOIN (
SELECT "concept_1"."concept_id"
FROM "concept" AS "concept_1"
WHERE ("concept_1"."concept_name" LIKE '%myocardial infarction%')
) AS "concept_2" ON ("condition_occurrence_1"."condition_concept_id" = "concept_2"."concept_id")
WHERE (EXISTS (
SELECT NULL
FROM "visit_occurrence" AS "visit_occurrence_1"
JOIN (
SELECT "concept_3"."concept_id"
FROM "concept" AS "concept_3"
WHERE ("concept_3"."concept_name" LIKE '%inpatient%')
) AS "concept_4" ON ("visit_occurrence_1"."visit_concept_id" = "concept_4"."concept_id")
WHERE
("visit_occurrence_1"."person_id" = "condition_occurrence_1"."person_id") AND
("condition_occurrence_1"."condition_start_date" BETWEEN "visit_occurrence_1"."visit_start_date" AND "visit_occurrence_1"."visit_end_date")
))
) AS "condition_occurrence_2"
WHERE (("condition_occurrence_2"."boundary" IS NULL) OR ("condition_occurrence_2"."boundary" < "condition_occurrence_2"."condition_start_date"))
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
1×2 DataFrame
Row │ person_id condition_start_date
│ Int64 String
─────┼─────────────────────────────────
1 │ 1780 2008-04-10
=#
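The FilterByGap() combinator defined above is not tied to condition records. As a sketch of reuse (the 90-day gap is arbitrary), it could equally be applied to inpatient visits:
q = InpatientVisitOccurrence() |>
    FilterByGap(Get.visit_start_date, Day(90)) |>
    Select(Get.person_id, Get.visit_start_date)
sql = render(q, dialect = :sqlite)
res = DBInterface.execute(conn, sql)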
Merging overlapping intervals
Merging overlapping intervals into a single encompassing period could be done in three steps:
- Tag the intervals that start a new period.
- Enumerate the periods.
- Group the intervals by the period number.
FunSQL lets us encapsulate and reuse this rather complex sequence of transformations.
Merge overlapping visits.
MergeOverlappingIntervals(start_date, end_date) =
Partition(Get.person_id,
order_by = [start_date],
frame = (mode = :rows, start = -Inf, finish = -1)) |>
Define(:new => Fun.case(start_date .<= Agg.max(end_date), 0, 1)) |>
Partition(Get.person_id,
order_by = [start_date, .- Get.new],
frame = :rows) |>
Define(:period => Agg.sum(Get.new)) |>
Group(Get.person_id, Get.period) |>
Define(:start_date => Agg.min(start_date),
:end_date => Agg.max(end_date))
q = From(visit_occurrence) |>
MergeOverlappingIntervals(Get.visit_start_date, Get.visit_end_date) |>
Select(Get.person_id, Get.start_date, Get.end_date)
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT
"visit_occurrence_3"."person_id",
MIN("visit_occurrence_3"."visit_start_date") AS "start_date",
MAX("visit_occurrence_3"."visit_end_date") AS "end_date"
FROM (
SELECT
"visit_occurrence_2"."person_id",
(SUM("visit_occurrence_2"."new") OVER (PARTITION BY "visit_occurrence_2"."person_id" ORDER BY "visit_occurrence_2"."visit_start_date", (- "visit_occurrence_2"."new") ROWS UNBOUNDED PRECEDING)) AS "period",
"visit_occurrence_2"."visit_start_date",
"visit_occurrence_2"."visit_end_date"
FROM (
SELECT
"visit_occurrence_1"."person_id",
(CASE WHEN ("visit_occurrence_1"."visit_start_date" <= (MAX("visit_occurrence_1"."visit_end_date") OVER (PARTITION BY "visit_occurrence_1"."person_id" ORDER BY "visit_occurrence_1"."visit_start_date" ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING))) THEN 0 ELSE 1 END) AS "new",
"visit_occurrence_1"."visit_start_date",
"visit_occurrence_1"."visit_end_date"
FROM "visit_occurrence" AS "visit_occurrence_1"
) AS "visit_occurrence_2"
) AS "visit_occurrence_3"
GROUP BY
"visit_occurrence_3"."person_id",
"visit_occurrence_3"."period"
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
25×3 DataFrame
Row │ person_id start_date end_date
│ Int64 String String
─────┼───────────────────────────────────
1 │ 1780 2008-04-09 2008-04-13
2 │ 1780 2008-11-22 2008-11-22
3 │ 1780 2009-05-22 2009-05-22
4 │ 30091 2008-11-12 2008-11-12
5 │ 30091 2009-07-30 2009-08-07
6 │ 37455 2008-03-18 2008-03-18
7 │ 37455 2008-10-30 2008-10-30
8 │ 37455 2010-08-12 2010-08-12
⋮ │ ⋮ ⋮ ⋮
19 │ 95538 2009-09-02 2009-09-02
20 │ 107680 2009-06-07 2009-06-07
21 │ 107680 2009-07-20 2009-07-30
22 │ 110862 2008-09-07 2008-09-16
23 │ 110862 2009-06-30 2009-06-30
24 │ 110862 2009-09-30 2009-10-01
25 │ 110862 2010-06-07 2010-06-07
10 rows omitted
=#
Derive a patient's observation periods by merging visits separated by a gap of less than one year.
MergeIntervalsByGap(start_date, end_date, gap) =
MergeOverlappingIntervals(start_date, Fun.date(end_date, gap)) |>
Define(:end_date => Fun.date(Get.end_date, -gap))
q = From(visit_occurrence) |>
MergeIntervalsByGap(Get.visit_start_date, Get.visit_end_date, Day(365)) |>
Select(Get.person_id, Get.start_date, Get.end_date)
sql = render(q, dialect = :sqlite)
print(sql)
#=>
SELECT
"visit_occurrence_3"."person_id",
MIN("visit_occurrence_3"."visit_start_date") AS "start_date",
DATE(MAX(DATE("visit_occurrence_3"."visit_end_date", '365 days')), '-365 days') AS "end_date"
FROM (
SELECT
"visit_occurrence_2"."person_id",
(SUM("visit_occurrence_2"."new") OVER (PARTITION BY "visit_occurrence_2"."person_id" ORDER BY "visit_occurrence_2"."visit_start_date", (- "visit_occurrence_2"."new") ROWS UNBOUNDED PRECEDING)) AS "period",
"visit_occurrence_2"."visit_start_date",
"visit_occurrence_2"."visit_end_date"
FROM (
SELECT
"visit_occurrence_1"."person_id",
(CASE WHEN ("visit_occurrence_1"."visit_start_date" <= (MAX(DATE("visit_occurrence_1"."visit_end_date", '365 days')) OVER (PARTITION BY "visit_occurrence_1"."person_id" ORDER BY "visit_occurrence_1"."visit_start_date" ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING))) THEN 0 ELSE 1 END) AS "new",
"visit_occurrence_1"."visit_start_date",
"visit_occurrence_1"."visit_end_date"
FROM "visit_occurrence" AS "visit_occurrence_1"
) AS "visit_occurrence_2"
) AS "visit_occurrence_3"
GROUP BY
"visit_occurrence_3"."person_id",
"visit_occurrence_3"."period"
=#
res = DBInterface.execute(conn, sql)
DataFrame(res)
#=>
12×3 DataFrame
Row │ person_id start_date end_date
│ Int64 String String
─────┼───────────────────────────────────
1 │ 1780 2008-04-09 2009-05-22
2 │ 30091 2008-11-12 2009-08-07
3 │ 37455 2008-03-18 2008-10-30
4 │ 37455 2010-08-12 2010-08-12
5 │ 42383 2009-06-29 2010-04-15
6 │ 69985 2009-01-09 2009-01-09
7 │ 69985 2010-04-17 2010-07-30
8 │ 72120 2008-12-15 2008-12-15
9 │ 82328 2008-10-20 2009-01-25
10 │ 95538 2009-03-30 2009-09-02
11 │ 107680 2009-06-07 2009-07-30
12 │ 110862 2008-09-07 2010-06-07
=#