Loading data can be a pain. Let's say you have a 100GB folder of Gzipped CSVs sitting on Amazon S3 — what is the simplest way of converting this dataset into a Dask DataFrame that your data science team can work with? What about a Jupyter notebook that contains the five-ish lines of Python needed to handle imports, credentials, and loading? What if the source data changes, or the URL changes, or you want to include metadata or plots? Now those five-ish lines are broken and you need to somehow push the updates to all your users.
Enter Intake. With Intake you can encode all the details of your data collection into a single catalog. Remote files (including catalogs themselves) are a breeze thanks to fsspec. Details such as file types, transfer protocols, and chunk sizes are abstracted away from the user, who only needs to know enough to open a catalog and read its data sources. Intake makes it easy for data stewards to curate large, varied, and complex datasets into easily distributable and maintainable catalogs. Importantly, it places the onus of handling the data's particular eccentricities (access patterns, etc.) on the data steward rather than the end user, who just wants to be handed a DataFrame so they can move on with their life. Intake is used by data science teams and data engineers, and plays a key role in Anaconda Nucleus' new data catalogs service.
Intake aims to give you your data and then get out of the way. It is just a data loader, not a query engine. But what if you want to provide your users with a transformed, subsampled, or aggregated view of your dataset? Intake provides an interface for building custom plugins that can handle these sorts of things, but building plugins or extending existing ones takes some time and know-how. What if you could simply write some SQL and provide a transformed data source directly, without needing to modify or copy the original data?
Enter DuckDB. DuckDB wants to be your general-purpose query engine for tabular data. It is a single-file database, a library of analytical tools, and an overall data science workhorse. Below we'll go into some of what makes DuckDB special, the problems it tries to solve, and why it is a natural extension to Intake's existing functionality.
Intake background
Intake organizes data sources with DataSource objects; a DataSource is a wrapper around some container type, commonly a DataFrame, that carries metadata about the source data and a .read() method for loading the actual data into memory. Data sources can point to remote files, integrate with Dask, and even load pre-defined plots with hvPlot. An Intake Catalog is a collection of data sources, and catalogs can even nest inside other catalogs.
# nyc_taxi.yaml
sources:
  nyc_taxi:
    description: NYC Taxi dataset
    driver: parquet
    args:
      urlpath: s3://datashader-data/nyc_taxi_wide.parq
    metadata:
      plot:
        datashade: true

pip install intake intake-parquet s3fs

import intake

cat = intake.open_catalog("nyc_taxi.yaml")
cat.nyc_taxi.plot()
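If the dataset is larger than memory, the same source can also hand back a lazy Dask DataFrame; a minimal sketch, assuming the catalog above and that Dask is installed:

import intake

cat = intake.open_catalog("nyc_taxi.yaml")
ddf = cat.nyc_taxi.to_dask()  # lazy Dask DataFrame; nothing is loaded until computed
ddf.head()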
Intake allows you to extend the DataSource and Catalog classes and add them to the Intake registry, wrapping your custom data structure, whether it's a file or a database, in Intake's semantics. Now your users can call df = intake.open_catalog(...).source_name.read() on anything you'd like. Many such plugins already exist and can be found in the plugin directory.
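To give a sense of what a plugin involves, here is a minimal sketch of a custom driver for Intake 1.x; the RandomSource class, its registry name, and its parameters are all invented for illustration:

import numpy as np
import pandas as pd
from intake.source.base import DataSource, Schema

class RandomSource(DataSource):
    """Hypothetical driver that serves a small random DataFrame."""
    container = "dataframe"
    name = "random_table"      # would become `driver: random_table` once registered
    version = "0.0.1"
    partition_access = False

    def __init__(self, nrows=10, metadata=None):
        self.nrows = nrows
        super().__init__(metadata=metadata)

    def _get_schema(self):
        # Describe the data's shape without loading anything
        return Schema(dtype=None, shape=(self.nrows, 2),
                      npartitions=1, extra_metadata={})

    def read(self):
        return pd.DataFrame(np.random.rand(self.nrows, 2), columns=["x", "y"])

Registering a class like this (typically through an intake.drivers entry point in your package) is what makes the driver name usable from a catalog's YAML.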
Adding a query engine
The purpose of Intake is to make it easy to distribute existing data, but what if you want to share only part of a source, or an aggregation like a groupby? You could perform the derivation yourself and save the result as a new source, but that's not ideal. A better solution is to use Intake's dataset transforms, which operate on an existing DataSource and perform some sort of custom operation. The particular logic of the operation is up to the developer, who needs to subclass DerivedSource, write the code, and package it for distribution. As an example, Intake provides the Columns transformation, which returns just a chosen subset of the source DataFrame's columns.
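As a rough sketch of how that looks in a catalog, reusing the nyc_taxi source from earlier (the column names here are invented):

sources:
  nyc_taxi_coords:
    driver: intake.source.derived.Columns
    args:
      targets:
        - nyc_taxi
      columns: ["dropoff_x", "dropoff_y"]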
This sort of functionality is very useful, especially for teams and projects that know their data well and want to integrate Intake into a data pipeline, have sources that are complex derivations of other sources, or have non-DataFrame containers. See intake-xarray for an example of a custom Xarray Dataset transform.
For more general-purpose transformations, the intake-duckdb plugin leverages DuckDB's unique ability to query many types of tabular datasets as if they were database tables. Being an embeddable, single-file database, DuckDB resembles SQLite, but it is optimized for analytics and aggregations. It can also operate directly on data that exists only in memory, without copying anything. With a modest amount of work, Intake-DuckDB extends this capability to the humble DataSource and any child class, as long as the container type is a DataFrame.
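The in-memory part is worth a quick illustration: plain DuckDB can scan a pandas DataFrame sitting in your session with no import or copy step. A minimal sketch, with a made-up DataFrame:

import duckdb
import pandas as pd

df = pd.DataFrame({"zip": ["10001", "10002"], "crashes": [5, 3]})

# DuckDB finds the local `df` variable and scans it in place
duckdb.sql("SELECT zip FROM df WHERE crashes > 4").df()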
Now you can provide filtered or modified versions of your data without needing to write a custom Intake plugin or copy any data around. Just write a little SQL. Tools like Ibis can even help with converting complex DataFrame operations into valid SQL.
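If hand-writing SQL is the sticking point, a rough sketch of the Ibis route, with a made-up table and columns, might look like this:

import ibis

# Describe the table abstractly; nothing is read here
crashes = ibis.table({"zip": "string", "crash_date": "date"}, name="ny_crashes")

# Express the aggregation as a DataFrame-style expression
expr = crashes.group_by("zip").aggregate(crash_count=crashes.count())

# Emit DuckDB-flavored SQL that could be pasted into a duckdb_transform source
print(ibis.to_sql(expr, dialect="duckdb"))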
Example
The following catalog aggregates and joins some data about vehicle crashes in New York in 2023. Notice that ny_crashes_vs_registrations_2023 uses the duckdb_transform driver to perform a join on ny_vehicle_registrations and ny_crashes, which are both CSVSources.
# ny_crashes.yaml
sources:
  ny_vehicle_registrations:
    description: New York vehicle registrations
    driver: csv
    args:
      urlpath: https://data.ny.gov/api/views/w4pv-hbkt/rows.csv
      csv_kwargs:
        usecols:
          - Zip
          - State
          - Record Type
          - Reg Valid Date
          - Reg Expiration Date
        parse_dates:
          - Reg Valid Date
          - Reg Expiration Date
        blocksize:
  ny_crashes:
    description: New York traffic crashes
    driver: csv
    args:
      urlpath: https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv
      csv_kwargs:
        usecols:
          - ZIP CODE
          - CRASH DATE
        parse_dates:
          - CRASH DATE
        dtype: {ZIP CODE: object}
        blocksize:
  ny_crashes_vs_registrations_2023:
    description: Comparison of New York vehicle crashes vs. registrations by ZIP code in 2023
    driver: duckdb_transform
    args:
      targets:
        - ny_vehicle_registrations
        - ny_crashes
      sql_expr: |
        SELECT r.zip, c.crash_count, r.registration_count
        FROM (
          SELECT "ZIP CODE" as zip, COUNT(*) as crash_count
          FROM ny_crashes
          WHERE "CRASH DATE" BETWEEN '2023-01-01' AND '2023-12-31'
          GROUP BY "ZIP CODE"
        ) as c
        JOIN (
          SELECT Zip as zip, COUNT(*) as registration_count
          FROM ny_vehicle_registrations
          WHERE State = 'NY'
            AND "Record Type" = 'VEH'
            AND "Reg Valid Date" <= '2023-12-31'
            AND "Reg Expiration Date" >= '2023-01-01'
          GROUP BY Zip
        ) as r
        ON c.zip = r.zip
import intake
cat = intake.open_catalog("ny_crashes.yaml")
cat.ny_crashes_vs_registrations_2023.read()
Check out the README for some additional drivers that can build Intake catalogs from embedded DuckDB files and query them directly.
Limitations
Intake-DuckDB is in early-stage development, arguably still a prototype. There are few configuration options for duckdb_transform sources, and interoperability with other Intake sources is largely untested. DuckDB contains a rich set of features, including running queries directly on remote datasets without needing to suck the whole thing into memory, and generating plots directly with the engine; Intake uses none of this yet, but could in the future. Intake could very well push more processing down to the Duck layer, or use DuckDB as a general-purpose persistence store for any source type. Questions and PRs are more than welcome over on GitHub.