Foreword
Benchmarking is hard and usually biased by the author's experience and viewpoint, which strongly affects what they choose to benchmark and how many "tricks" they know to optimise performance.
This article is an extension of a benchmark on the Coiled blog, which showed that using PyArrow strings rather than Python string objects is beneficial for the workflow presented. I don't disagree with that conclusion, but I would like to explore the performance details further.
As a side-note, the possible advantages of PyArrow string storage were part of the discussion when Pandas decided to switch to using PyArrow as the default Parquet loading engine (a decision Dask later followed). As the author of fastparquet, the previous default, I clearly have a vested interest in showing that the package is still a performance contender, so take my observations with some salt.
Setup
As in the original benchmark, we will be timing the following workflow:
df = dd.read_parquet("s3://coiled-datasets/uber-lyft-tlc/", ...)
df["tipped"] = df.tips != 0
df.groupby(df.hvfhs_license_num).tipped.mean().compute()
The timing will be done on the last line only, so the IO and the shuffle/compute both contribute, but IO dominates. The data are 25GiB of parquet in 720 files in a public-access bucket on AWS S3. Only the first line (the one with the ellipsis) will change between runs. Column "hvfhs_license_num" contains strings with a small number of unique values, and "tips" is float32. All columns are SNAPPY compressed.
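For reference, here is a quick way to confirm the schema and compression of one of the files. This is a sketch using s3fs and pyarrow for inspection only, not part of any timed run, and which file comes back first from the listing is arbitrary.

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem(anon=True)
path = fs.ls("coiled-datasets/uber-lyft-tlc/")[0]  # any one of the 720 files
with fs.open(path, "rb") as f:
    meta = pq.ParquetFile(f).metadata
    print(meta.schema)                              # column names and types
    print(meta.row_group(0).column(0).compression)  # codec, e.g. SNAPPY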
For the timings here, there were 10 dask workers with 1 thread and 4GB of memory each, in a Kubernetes cluster via dask-gateway in AWS us-east-1. This is not the same cluster setup as the original! The same cluster was used throughout, and there was no significant memory pressure at any point. All packages were at their latest released versions (dask 2023.4.0, pyarrow 11.0.0, pandas 2.0.1, fastparquet 2023.4.0). Each time is the best of three repeats.
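For completeness, the cluster was created with dask-gateway along these lines. The option names here (worker_cores, worker_memory) are typical for gateway deployments but are an assumption, not the exact configuration used:

from dask_gateway import Gateway

gateway = Gateway()
cluster = gateway.new_cluster(worker_cores=1, worker_memory=4)  # assumed option names
cluster.scale(10)  # 10 single-threaded workers
client = cluster.get_client()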
Runs
1. Baseline (pyarrow with python strings)
dask.config.set({"dataframe.convert-string": False})
df = dd.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
storage_options={"anon": True},
)
Time: 4m3s
Everything here is at its default setting.
2. Use fastparquet instead
df = dd.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
storage_options={"anon": True},
engine="fastparquet",
)
Time: 2m40s
Yes, here is my big bias: whenever someone says something can be improved in Arrow, I try it with fastparquet. What do you know? It's magically much better in this case. I can't say exactly why this would be. Note that fastparquet produces columns of Python string objects when the base parquet type is UTF8.
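As a quick sanity check (a sketch, not part of the timed run), one can materialise a single partition of the frame just loaded and confirm that the grouping column really does contain Python strings:

part = df.get_partition(0).compute()            # one file's worth of data
print(part["hvfhs_license_num"].dtype)          # object
print(type(part["hvfhs_license_num"].iloc[0]))  # <class 'str'>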
3. Fastparquet with categories
df = dd.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
storage_options={"anon": True},
engine="fastparquet",
categories=["hvfhs_license_num"],
)
Time: 2m48s
This was the original supposition I was interested in: the grouping column is actually stored as dict-encoded in the files, so loading as pandas categorical should be much faster, but it made... almost no difference at all. More on this below.
Pyarrow does not allow you to load this data as categorical.
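For anyone unfamiliar with pandas categoricals, here is a tiny illustration of what a dictionary-encoded column becomes in memory; the license-number values are invented for the example:

import pandas as pd

s = pd.Series(["HV0003", "HV0005", "HV0003", "HV0002"], dtype="category")
print(s.cat.categories)  # the small set of unique values, stored once
print(s.cat.codes)       # one small integer code per row, cheap to group on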
4. Pyarrow and pyarrow string
dask.config.set({"dataframe.convert-string": True})
df = dd.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
storage_options={"anon": True},
engine="pyarrow",
)
Time: 2m17s
This was the thesis of the original article: that the new string storage mechanism should be much faster. It provides a decent boost over PyArrow with Python strings, and is also better than fastparquet, above.
I do not have the knowledge to be able to tweak pyarrow further, but note that
this is still using s3fs as the library to fetch bytes (a discussion about using
pyarrow's own s3 implementation is another reason I wanted to chase this topic).
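To illustrate what that alternative code path looks like, here is a sketch of reading the two columns the workflow needs through pyarrow's own S3 filesystem and dataset API, outside of dask entirely (not one of the timed runs):

import pyarrow.dataset as ds
import pyarrow.fs as pafs

s3 = pafs.S3FileSystem(anonymous=True, region="us-east-2")
dataset = ds.dataset("coiled-datasets/uber-lyft-tlc/", filesystem=s3, format="parquet")
# peek at a few rows of just the two columns of interest
print(dataset.head(5, columns=["tips", "hvfhs_license_num"]))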
5. Rust implementation of s3fs and fastparquet
import fsspec
import rfsspec

fsspec.register_implementation("s3", rfsspec.RustyS3FileSystem, clobber=True)
df = dd.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
storage_options={"anon": True, "region": "us-east-2"},
engine="fastparquet",
)
Time: 2m5s
This is the same as 2., but the S3 transfer is being done by rfsspec, which is still very experimental. Note that rfsspec requires that the region is provided, so some of the speed boost will come from avoiding HTTP redirection in all S3 calls. Also, rfsspec has a larger default buffer size, so there might be fewer requests here.
All the rest of the runs below use rfsspec.
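A quick way to confirm that the registration took effect (a sketch):

import fsspec

print(fsspec.get_filesystem_class("s3"))  # should now be rfsspec.RustyS3FileSystem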
6. Now with parquet-specific file access
df = dd.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
storage_options={"anon": True, "region": "us-east-2"},
engine="fastparquet",
open_file_options={"precache_options": {"method": "parquet"}},
)
Time: 2m5s
Please see this article for a motivation and description of using the footer information of a parquet file to know exactly which byte ranges will be needed, and prospectively/concurrently fetching them. We see that it makes no difference whatsoever, which is extremely fishy!
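To make the idea concrete, here is a rough sketch (not the actual precaching implementation) of what the footer gives you: the byte range of every column chunk, which a reader can then fetch with concurrent ranged requests instead of streaming whole files. fs and path are as in the inspection sketch in the Setup section.

import pyarrow.parquet as pq

with fs.open(path, "rb") as f:
    meta = pq.ParquetFile(f).metadata
for i in range(meta.num_row_groups):
    rg = meta.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        start = col.dictionary_page_offset or col.data_page_offset
        # this column chunk occupies bytes [start, start + col.total_compressed_size)
        print(col.path_in_schema, start, col.total_compressed_size)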
7. Next, specify the columns manually
df = dd.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
storage_options={"anon": True, "region": "us-east-2"},
columns=["tips", "hvfhs_license_num"],
index=False,
engine="fastparquet",
open_file_options={"precache_options": {"method": "parquet"}},
)
Time: 49s
Bingo! This has made the biggest difference so far. So, it seems that dask did not figure out that we only needed those two columns from the source, and maybe all the bytes of every file were being loaded every time. That also explains why the "precache" option (6.) didn't make any difference - we were still loading the whole thing - and why categorising our groupby column (3.) didn't help - it was only affecting one column of many.
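An alternative worth trying (a sketch, not one of the timed runs) is to select the columns immediately after read_parquet and rely on dask's optimiser to push the projection down into the read. Whether that pushdown happens can depend on the dask version and the query; given the timings above it evidently did not happen from the groupby alone, so passing columns= explicitly is the safe option.

df = dd.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
    storage_options={"anon": True, "region": "us-east-2"},
    engine="fastparquet",
)[["tips", "hvfhs_license_num"]]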
8. Add categories back in
df = dd.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
storage_options={"anon": True, "region": "us-east-2"},
columns=["tips", "hvfhs_license_num"],
index=False,
engine="fastparquet",
open_file_options={"precache_options": {"method": "parquet"}},
categories=["hvfhs_license_num"],
)
Time: 39s
This combines 7. and 3. Now we see that categorisation makes a decent difference, since we don't need to make Python strings, and grouping on a code number is much faster too. This also has the best memory footprint (two bytes per value for the categorical column).
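To see the memory effect directly, one can again materialise a single partition (sketch):

part = df.get_partition(0).compute()
print(part.memory_usage(deep=True))               # per-column bytes
print(part["hvfhs_license_num"].cat.codes.dtype)  # small integer codes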
Remaining low-hanging fruit?
If 720 files are being processed by 10 workers in 40s, that means very roughly half a second each, assuming the final aggregations are a small fraction of the total. At this point, latency talking to the remote store starts to matter, as the number of bytes in the parquet headers and the actual data bytes in the two columns don't amount to much against AWS-to-AWS throughput.
Doing some profiling, it turns out that each task calls fs.info three times per input file: once to check whether it is a file, once to get the file size (so that the footer with the parquet metadata can be read), and once when the file is opened again to fetch data. s3fs wants to have file information available so that it can require an ETag match on the target and avoid corrupted data should the target get overwritten during IO. However, we should be able to cache these details, at least for a short time. Right now, it takes about 20% of the running time (of worker thread time, according to the Dask profile dashboard) just to run info, and we can cut that by a factor of 3. s3fs already caches directory listings, but rfsspec does not, and info() bypasses that anyway - so there is some work to do.
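A hypothetical sketch of the kind of short-lived caching that would help; this is not how s3fs or rfsspec currently behave, and CachedInfoFS is invented for illustration:

import time

import fsspec
import rfsspec


class CachedInfoFS(rfsspec.RustyS3FileSystem):
    # memoise info() results briefly, so repeated lookups on the same key
    # within one task don't each cost a round-trip to S3
    _info_cache = {}
    _ttl = 60  # seconds

    def info(self, path, **kwargs):
        now = time.time()
        hit = self._info_cache.get(path)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]
        out = super().info(path, **kwargs)
        self._info_cache[path] = (now, out)
        return out


fsspec.register_implementation("s3", CachedInfoFS, clobber=True)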
Conclusions
In brief, I have shown here a few ways in which you can really push performance for a relatively simple load-group-aggregate workflow on dask-dataframe. It turns out that the new pyarrow-strings flag available in dask is not the biggest lever you can pull; column selection in particular is critical.