Intake Gets Some Wings From DuckDB

Loading data can be a pain. Let's say you have a 100GB folder of Gzipped CSVs sitting on Amazon S3 — what is the simplest way of converting this dataset into a Dask DataFrame that your data science team can work with? What about a Jupyter notebook that contains the five-ish lines of Python needed to handle imports, credentials, and loading? What if the source data changes, or the URL changes, or you want to include metadata or plots? Now those five-ish lines are broken and you need to somehow push the updates to all your users.

Enter Intake. With Intake you can encode all the details of your data collection into a single catalog. Remote files (including catalogs themselves) are a breeze thanks to fsspec. Details such as file types, transfer protocols, and chunk sizes are all abstracted away from the user, who only needs to know enough to open a catalog and read its data sources. Intake makes it easy for data stewards to curate large, varied, and complex datasets into easily distributable and maintainable catalogs. Importantly, it places the onus of handling the data's particular eccentricities (access patterns, etc.) on the data steward rather than the end user, who just wants to be handed a DataFrame so they can move on with their life. Intake is used by data science teams and data engineers, and plays a key role in Anaconda Nucleus' new data catalogs service.

Intake aims to give you your data and then get out of the way. It is just a data loader, not a query engine. But what if you want to provide your users with a transformed, subsampled, or aggregated view of your dataset? Intake provides an interface for building custom plugins that can handle these sorts of things, but building or extending plugins takes some time and know-how. What if you could simply write some SQL and provide a transformed data source directly, without needing to modify or copy the original data?

Enter DuckDB. DuckDB wants to be your general purpose query engine for tabular data. It is a data format, a library of analytical tools, and an overall data science workhorse. Below we'll go into some of what makes DuckDB special, the problems it tries to solve, and why it is a natural extension to Intake's existing functionality.

Intake background

Intake organizes data sources with DataSource objects; a DataSource is a wrapper around some container type, commonly a DataFrame, that carries metadata about the source data and exposes a .read() method for loading the actual data into memory. Data sources can point to remote files, integrate with Dask, and even load pre-defined plots with hvPlot. An Intake Catalog is a collection of data sources. Catalogs can even nest inside other catalogs.

# nyc_taxi.yaml
sources:
  nyc_taxi:
    description: NYC Taxi dataset
    driver: parquet
    args:
      urlpath: s3://datashader-data/nyc_taxi_wide.parq
    metadata:
      plot:
        datashade: true

pip install intake intake-parquet s3fs

import intake

cat = intake.open_catalog("nyc_taxi.yaml")
cat.nyc_taxi.plot()
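
The catalog hides the storage details, but reading the data is just as direct. A quick sketch of the usual access pattern (these are the standard methods on any DataFrame-container source):

import intake

cat = intake.open_catalog("nyc_taxi.yaml")
source = cat.nyc_taxi

# Peek at the schema and metadata without loading anything
source.discover()

# Load eagerly into pandas, or lazily into Dask for out-of-core work
df = source.read()
ddf = source.to_dask()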

Intake allows you to extend the DataSource and Catalog classes and add them to the Intake registry, wrapping your custom data structure, whether it's a file or a database, in Intake's semantics. Now your user can call df = intake.open_catalog(...).source_name.read() on anything you'd like. Many such plugins already exist and can be found in the plugin directory.
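
To give a feel for what writing a driver involves, here is a toy sketch, assuming Intake's classic DataSource API; the RandomSource class and the "random" driver name are invented for illustration:

import intake
import numpy as np
import pandas as pd
from intake.source.base import DataSource, Schema


class RandomSource(DataSource):
    """Toy driver that serves a DataFrame of random numbers."""

    name = "random"          # driver name a catalog would refer to
    container = "dataframe"  # container type handed to the user
    version = "0.0.1"
    partition_access = False

    def __init__(self, nrows=10, metadata=None):
        self.nrows = nrows
        self._df = None
        super().__init__(metadata=metadata)

    def _get_schema(self):
        # Describe the data without loading it
        return Schema(dtype={"x": "float64"}, shape=(self.nrows, 1),
                      npartitions=1, extra_metadata={})

    def read(self):
        # Load the whole container into memory
        if self._df is None:
            self._df = pd.DataFrame({"x": np.random.random(self.nrows)})
        return self._df


# Make the driver usable as `driver: random` in catalogs
intake.register_driver("random", RandomSource)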

Adding a query engine

The purpose of Intake is to make it easy to distribute existing data, but what if you want to share only part of a source, or an aggregation like a groupby? Well, you could perform the derivation yourself and save the result as a new source, but that's not ideal. A better solution is to use Intake's Dataset Transforms, which operate on an existing DataSource and perform some sort of custom operation on it. The particular logic of the operation is up to the developer, who needs to subclass DerivedSource, write the code, and package it for distribution. As an example, Intake provides the Columns transformation, which returns just a subset of the source DataFrame's columns.
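
With the built-in Columns transformation, for instance, the derived entry sits in the catalog right next to the source it derives from. A minimal sketch (the file path and column names are invented):

# measurements.yaml
sources:
  raw_data:
    description: The full table
    driver: csv
    args:
      urlpath: ./measurements.csv

  just_two_columns:
    description: Only the columns most users need
    driver: intake.source.derived.Columns
    args:
      targets:
        - raw_data
      columns: ["station", "temperature"]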

This sort of functionality is very useful, especially for teams and projects that know their data well and want to integrate Intake into a data pipeline, have sources that are complex derivations of other sources, or have non-DataFrame containers. See intake-xarray for an example of a custom Xarray Dataset transform.

For more general-purpose transformations, the intake-duckdb plugin leverages DuckDB's unique ability to query many types of tabular datasets as if they were database tables. An embeddable database with a single-file storage format, DuckDB resembles SQLite but is optimized for analytics and aggregations. It can also operate directly on data that exists only in memory, without copying anything. With a modest amount of work, Intake-DuckDB extends this capability to the humble DataSource and any child class, as long as the container type is a DataFrame.
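
Outside of Intake, DuckDB's DataFrame querying looks roughly like this (the trips DataFrame is invented for illustration):

import duckdb
import pandas as pd

# An ordinary in-memory DataFrame
trips = pd.DataFrame({"zone": ["A", "A", "B"], "fare": [7.5, 12.0, 9.25]})

con = duckdb.connect()
# Expose the DataFrame to DuckDB as a view rather than copying it into a database
con.register("trips", trips)

# Run SQL against it and get a pandas DataFrame back
result = con.execute(
    "SELECT zone, AVG(fare) AS avg_fare FROM trips GROUP BY zone"
).df()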

Now you can provide filtered or modified versions of your data without needing to write a custom Intake plugin or copy any data around. Just write a little SQL. Tools like Ibis can even help with converting complex DataFrame operations into valid SQL.

Example

The following catalog aggregates and joins some data about vehicle crashes in New York in 2023. Notice that ny_crashes_vs_registrations_2023 uses the duckdb_transform driver to perform a join on ny_vehicle_registrations and ny_crashes, which are both CSVSources.

# ny_crashes.yaml
sources:
  ny_vehicle_registrations:
    description: New York vehicle registrations
    driver: csv
    args:
      urlpath: https://data.ny.gov/api/views/w4pv-hbkt/rows.csv
      csv_kwargs:
        usecols:
          - Zip
          - State
          - Record Type
          - Reg Valid Date
          - Reg Expiration Date
        parse_dates:
          - Reg Valid Date
          - Reg Expiration Date
        blocksize:

  ny_crashes:
    description: New York traffic crashes
    driver: csv
    args:
      urlpath: https://data.cityofnewyork.us/api/views/h9gi-nx95/rows.csv
      csv_kwargs:
        usecols:
          - ZIP CODE
          - CRASH DATE
        parse_dates:
          - CRASH DATE
        dtype: {ZIP CODE: object}
        blocksize:

  ny_crashes_vs_registrations_2023:
    description: Comparison of New York vehicle crashes vs. registrations by ZIP code
      in 2023
    driver: duckdb_transform
    args:
      targets:
        - ny_vehicle_registrations
        - ny_crashes
      sql_expr: |
        SELECT r.zip, c.crash_count, r.registration_count
        FROM (
            SELECT "ZIP CODE" as zip, COUNT(*) as crash_count
            FROM ny_crashes
            WHERE "CRASH DATE" BETWEEN '2023-01-01' AND '2023-12-31'
            GROUP BY "ZIP CODE"
        ) as c
        JOIN (
            SELECT Zip as zip, COUNT(*) as registration_count
            FROM ny_vehicle_registrations
            WHERE State = 'NY'
                AND "Record Type" = 'VEH'
                AND "Reg Valid Date" <= '2023-12-31'
                AND "Reg Expiration Date" >= '2023-01-01'
            GROUP BY Zip
        ) as r
        ON c.zip = r.zip

import intake

cat = intake.open_catalog("ny_crashes.yaml")
cat.ny_crashes_vs_registrations_2023.read()

Check out the README for some additional drivers that can build Intake catalogs from embedded DuckDB files and query them directly.

Limitations

Intake-DuckDB is in early-stage development, arguably still a prototype. There are few configuration options for duckdb_transform sources, and interoperability with other Intake sources is largely untested. DuckDB contains a rich set of features, including running queries directly on remote datasets without pulling the whole thing into memory (sketched below) and generating plots directly with the engine; Intake uses none of this yet, but could in the future. Intake could very well push more processing down to the Duck layer, or use DuckDB as a general-purpose persistence store for any source type. Questions and PRs are more than welcome over on GitHub.
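
As a taste of that remote-query capability, plain DuckDB with its httpfs extension can aggregate a Parquet dataset over S3 while fetching only what the query needs. A rough sketch (the bucket path is made up and credential setup is omitted):

import duckdb

con = duckdb.connect()
# httpfs adds support for reading over HTTP(S) and S3
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")

# Count rows in a remote Parquet dataset without downloading it wholesale
con.execute(
    "SELECT COUNT(*) FROM read_parquet('s3://some-bucket/trips/*.parquet')"
).fetchall()

None of this is wired into Intake-DuckDB today, but it hints at how much work could eventually be pushed down to the engine.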