Benchmarking a dask-parquet-s3 workflow

Foreword

Benchmarking is hard and usually biased by the author's experience and viewpoint, which strongly affect what they choose to benchmark and how many "tricks" they know to optimise performance.

This article is an extension of a benchmark published on the Coiled blog, which showed that using PyArrow strings rather than Python string objects is beneficial for the workflow presented. I don't disagree with that conclusion, but would like to explore the performance details further.

As a side-note, the possible advantages of PyArrow string storage were part of the discussion when Pandas decided to switch to PyArrow as the default Parquet loading engine (a decision Dask later followed). As the author of fastparquet, the previous default, I clearly have a vested interest in showing that the package is still a performance contender, so take my observations with some salt.

Setup

As in the original benchmark, we will be timing the following workflow:

import dask.dataframe as dd

df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/", ...)
df["tipped"] = df.tips != 0
df.groupby(df.hvfhs_license_num).tipped.mean().compute()

The timing will be done on the last line only, so the IO and shuffle-compute both contribute, but IO dominates. The data are 25GiB of parquet in 720 files in a public-access bucket on AWS S3. Only the first line (the one with the ellipsis) will change between runs. Column "hvfhs_license_num" contains strings with a small number of unique values, and "tips" is float32. All columns are SNAPPY compressed.
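
As a quick sanity check on those dtypes, you can read the footer of a single data file. A minimal sketch, assuming the files carry a ".parquet" suffix and that s3fs and fastparquet are installed:

import fastparquet
import s3fs

# Anonymous access to the public bucket
fs = s3fs.S3FileSystem(anon=True)
# Grab one file and read only its footer metadata
first = fs.glob("coiled-datasets/uber-lyft-tlc/*.parquet")[0]
pf = fastparquet.ParquetFile(first, fs=fs)
print(pf.columns)                      # all column names in the files
print(pf.dtypes["tips"])               # expect float32
print(pf.dtypes["hvfhs_license_num"])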

For the timings here, there were 10 dask workers with 1 thread and 4GB of memory each, in a Kubernetes cluster via dask-gateway in AWS us-east-1. This is not the same cluster setup as the original! The same cluster was used throughout, and there was no significant memory pressure at any point. All packages were at their latest versions at the time (dask 2023.4.0, pyarrow 11.0.0, pandas 2.0.1, fastparquet 2023.4.0). Each time is the best of three repeats.

Runs

1. Baseline (pyarrow with python strings)

import dask

dask.config.set({"dataframe.convert-string": False})
df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
)

Time: 4m3s

Everything here uses the default settings.

2. Use fastparquet instead

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
    engine="fastparquet",
)

Time: 2m40s

Yes, here is my big bias. Whenever someone says something can be improved in Arrow, I try it with fastparquet. What do you know? It's magically much better in this case. I can't say exactly why that would be. Note that fastparquet produces columns of Python string objects when the underlying parquet type is UTF8.

3. Fastparquet with categories

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
    engine="fastparquet",
    categories=["hvfhs_license_num"],
)

Time: 2m48s

This was the original supposition I was interested in: the grouping column is actually stored dictionary-encoded in the files, so loading it as a pandas categorical should be much faster, but it made... almost no difference at all. More on this below.

PyArrow does not allow you to load this data as categorical.

4. Pyarrow and pyarrow string

dask.config.set({"dataframe.convert-string": True})
df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True},
    engine="pyarrow",
)

Time: 2m17s

This was the thesis of the original article: that the new string storage mechanism should be much faster. It provides a decent boost over PyArrow with Python strings, and is also better than fastparquet, above. I do not have the knowledge to tweak PyArrow further, but note that this is still using s3fs as the library to fetch bytes (a discussion about using PyArrow's own S3 implementation is another reason I wanted to chase this topic).
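
A quick way to confirm the flag took effect is to look at the dtypes of the resulting frame (the exact dtype repr may vary with your pandas/dask versions):

# With dataframe.convert-string enabled, string columns should be Arrow-backed
# rather than plain object dtype
print(df.dtypes["hvfhs_license_num"])   # e.g. string[pyarrow] instead of object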

5. Rust implementation of s3fs and fastparquet

import fsspec
import rfsspec

fsspec.register_implementation("s3", rfsspec.RustyS3FileSystem, clobber=True)
df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True, "region": "us-east-2"},
    engine="fastparquet",
)

Time: 2m5s

This is the same as 2., but the S3 transfer is being done by rfsspec, which is still very experimental. Note that rfsspec requires the region to be provided, so some of the speed boost will come from avoiding HTTP redirection on every S3 call. Also, rfsspec has a larger default buffer size, so there may be fewer requests here.
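
For reference, s3fs exposes its default read block size as a class attribute; a small illustrative check (I have not dug into rfsspec's equivalent setting here):

import s3fs

# s3fs reads files in blocks of this many bytes by default (typically 5 MiB);
# a larger buffer means fewer, bigger range requests per file
print(s3fs.S3FileSystem.default_block_size)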

All the rest of the runs below use rfsspec.

6. Now with parquet-specific file access

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True, "region": "us-east-2"},
    engine="fastparquet",
    open_file_options={"precache_options": {"method": "parquet"}},
)

Time: 2m5s

Please see this article for a motivation and description of using the footer information of a parquet file to know exactly which byte ranges will be needed, and fetching them prospectively and concurrently. We see that it makes no difference whatsoever, which is extremely fishy!
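
For the curious, I believe this precache option is a thin layer over fsspec.parquet.open_parquet_file, which reads the footer first and then issues concurrent range requests for just the needed byte ranges. A minimal standalone sketch, with a hypothetical single-file path:

import fsspec.parquet

with fsspec.parquet.open_parquet_file(
    "s3://coiled-datasets/uber-lyft-tlc/somefile.parquet",  # placeholder name
    columns=["tips", "hvfhs_license_num"],
    engine="fastparquet",
    storage_options={"anon": True},
) as f:
    # f behaves like an ordinary open file, but the footer and the byte ranges
    # for the two requested columns have already been fetched concurrently, so
    # a parquet engine reading from it makes no further small requests
    print(f.read(4))   # b"PAR1", the parquet magic bytes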

7. Next, specify the columns manually

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True, "region": "us-east-2"},
    columns=["tips", "hvfhs_license_num"],
    index=False,
    engine="fastparquet",
    open_file_options={"precache_options": {"method": "parquet"}},
)

Time: 49s

Bingo! This has made the biggest difference so far. So it seems that dask did not figure out that we only needed those two columns from the source, and perhaps all the bytes of every file were being loaded every time. That also explains why the precache option (6.) made no difference - we were still loading the whole thing - and why categorising our groupby column (3.) didn't help - it was only one column of many.

8. Add categories back in

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True, "region": "us-east-2"},
    columns=["tips", "hvfhs_license_num"],
    index=False,
    engine="fastparquet",
    open_file_options={"precache_options": {"method": "parquet"}},
    categories=["hvfhs_license_num"],
)

Time: 39s

This combines 7. and 3. Now we see that categorisation makes a decent difference, since we don't need to make Python strings, and grouping on integer codes is much faster too. This also has the best memory footprint (two bytes per value for the categorical column).
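
A rough way to see the footprint is to materialise one partition and inspect it; a small illustrative check (numbers will vary by partition):

# Pull the first partition into pandas and look at per-column memory
part = df.get_partition(0).compute()
print(part.dtypes)                                # hvfhs_license_num should be category
print(part.memory_usage(deep=True))               # bytes per column
print(part["hvfhs_license_num"].cat.categories)   # the handful of licence codes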

Remaining low-hanging fruit?

If 720 files are being processed by 10 workers in about 40s, that is 72 files per worker, or very roughly half a second each (39s / 72 ≈ 0.54s), assuming the final aggregations are a small fraction of the total. At this point, latency talking to the remote store starts to matter, as the bytes in the parquet footer metadata and the actual data bytes of the two columns don't amount to much against AWS-to-AWS throughput.

Profiling shows that each task calls fs.info three times per input file: once to check whether it is a file, once to get the file size (so that the footer with the parquet metadata can be read), and once more when the file is opened to fetch data. s3fs wants to have file information available so that it can require an ETag match on the target, avoiding corrupted data should the target be overwritten during IO. However, we should be able to cache these details, at least for a short time. Right now, about 20% of worker-thread time (according to the Dask profile dashboard) is spent just running info, and we could cut that by a factor of three. s3fs already caches directory listings, but rfsspec does not, and info() bypasses that cache anyway - so there is some work to do.
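
To illustrate the kind of fix I have in mind, here is a minimal, hypothetical sketch that memoises info() results for a short time by subclassing s3fs - not production code, and the class name and TTL are made up:

import time

import fsspec
import s3fs

class CachedInfoS3(s3fs.S3FileSystem):
    """Hypothetical: remember info() results briefly so the three per-file
    calls in a read_parquet task hit S3 only once."""

    _info_cache = {}   # path -> (timestamp, info dict), shared across instances
    _info_ttl = 10     # seconds; plenty for a single task

    async def _info(self, path, **kwargs):
        now = time.monotonic()
        hit = self._info_cache.get(path)
        if hit is not None and now - hit[0] < self._info_ttl:
            return hit[1]
        out = await super()._info(path, **kwargs)
        self._info_cache[path] = (now, out)
        return out

# Make dask/fsspec use it for s3:// URLs
fsspec.register_implementation("s3", CachedInfoS3, clobber=True)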

Conclusions

In brief, here I show a few ways in which you can really push performance for a relatively simple load-group-aggregate workflow on dask-dataframe. It turns out that the new pyarrow-strings flag available in dask is not the biggest lever that you can pull; in particular, column selection is critical.