perf: Combine small chunks in sinks for streaming pipelines #14346
Conversation
Nice improvement. I have left some comments.
@ritchie46 OK hopefully I've finally understood you, sorry it took so long. If I didn't, please feel free to finish this PR. Final numbers:
```
@@ -57,32 +55,7 @@ impl Sink for OrderedSink {
        self.sort();

        let chunks = std::mem::take(&mut self.chunks);
        let mut combiner = StreamingVstacker::default();
```
If we wanted to rechunk, we could simply do it here. But I don't want to do that, as rechunking should be left to the consumer of the streaming engine.
In that case it should probably be done in `write_parquet()`, otherwise the `collect(streaming=True).write_parquet()` case will continue to be slow.
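For context, a minimal sketch of the two user-facing paths under discussion (the input file and output paths below are illustrative, not from the PR):

```python
import polars as pl

lf = pl.scan_parquet("input.parquet")  # illustrative input; any LazyFrame works

# Path 1: stream directly into a Parquet sink; chunk handling happens inside the sink.
lf.sink_parquet("out_sink.parquet")

# Path 2: run the streaming engine, materialize a DataFrame, then write it.
# This is the case that stays slow if the many small chunks produced by the
# streaming engine are only combined in the sink and not before write_parquet().
df = lf.collect(streaming=True)
df.write_parquet("out_collect.parquet")
```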
Which would require the new struct to be moved into, e.g., the `polars-core` crate and made public.
Here's the runtime with the latest commit on my computer; the last case is slow again:
SINK 1.0721302032470703
COLLECT+WRITE 0.9235994815826416
STREAMING COLLECT+WRITE 4.254129648208618
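For reference, the three labels above plausibly correspond to timings like the following sketch; this is a reconstruction under assumptions, not the author's actual benchmark script, and the input file name is made up:

```python
import time
import polars as pl

SOURCE = "big_input.parquet"  # hypothetical input file


def timed(label: str, fn) -> None:
    start = time.time()
    fn()
    print(label, time.time() - start)


lf = pl.scan_parquet(SOURCE)

# SINK: stream straight into a Parquet file.
timed("SINK", lambda: lf.sink_parquet("out_sink.parquet"))

# COLLECT+WRITE: in-memory engine, then write.
timed("COLLECT+WRITE", lambda: lf.collect().write_parquet("out_collect.parquet"))

# STREAMING COLLECT+WRITE: streaming engine, materialize, then write.
timed(
    "STREAMING COLLECT+WRITE",
    lambda: lf.collect(streaming=True).write_parquet("out_streaming.parquet"),
)
```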
But maybe also `write_feather()`, or the cloud Parquet writer, etc. (Having it in `OrderedSink` seemed like a low-cost smoothing of performance bumps, limited to a single place.)
Then that logic should indeed be in `write_parquet`. That writer should check the chunk sizes.

I will first merge this one, and then we can follow up with the `write_parquet` optimization.
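As a rough illustration of "the writer should check the chunk sizes", expressed against the Python API rather than the Rust writer internals (the helper name and threshold are hypothetical):

```python
import polars as pl


def write_parquet_with_rechunk(df: pl.DataFrame, path: str, max_chunks: int = 16) -> None:
    # If the frame is split into many small chunks (as a streaming collect can
    # produce), combine them into contiguous memory before handing the data to
    # the Parquet writer.
    if df.n_chunks() > max_chunks:
        df = df.rechunk()
    df.write_parquet(path)
```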
> But maybe also `write_feather()`, or the cloud Parquet writer, etc. (Having it in `OrderedSink` seemed like a low-cost smoothing of performance bumps, limited to a single place.)
Yes, but it is more expensive for other operations. Operations themselves should know their best chunking strategies.
Thanks for this @itamarst. I think this can be used by more writers. Are you interested in the follow-up PR?
Thanks for all the feedback and help, and yes, I'm happy to do the follow-up PR.
Fixes #11699
Small chunks add significant overhead. This PR merges them in the sink. If there are lots of small chunks, this should make things faster. If there are lots of large chunks, it adds a little overhead, but the cost is small and fixed per chunk, so that's fine. There may, however, be a slowdown in some in-between edge cases where merging chunks involves copying extra data.
This doesn't fix the fact that small chunks are being generated, though... I will file a separate PR for the specific edge case in the reproducer script, and file an issue for another likely case if I can reproduce it.
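The merging idea itself is just buffering: accumulate incoming chunks until the buffered rows reach a target size, then vertically stack them into one larger chunk. A Python sketch of that idea (the class name, threshold, and API are illustrative, not the Rust struct added by this PR):

```python
import polars as pl


class SmallChunkCombiner:
    """Buffers small DataFrame chunks and returns larger, combined ones."""

    def __init__(self, min_rows: int = 50_000):  # threshold is illustrative
        self.min_rows = min_rows
        self.buffer: list[pl.DataFrame] = []
        self.buffered_rows = 0

    def add(self, chunk: pl.DataFrame) -> list[pl.DataFrame]:
        """Add a chunk; returns zero or one combined chunks ready to pass on."""
        self.buffer.append(chunk)
        self.buffered_rows += chunk.height
        if self.buffered_rows < self.min_rows:
            return []
        # Vertically stack the buffered chunks into a single contiguous chunk.
        out = pl.concat(self.buffer, rechunk=True)
        self.buffer = []
        self.buffered_rows = 0
        return [out]

    def finish(self) -> pl.DataFrame | None:
        """Flush whatever is left at the end of the stream."""
        if self.buffer:
            return pl.concat(self.buffer, rechunk=True)
        return None
```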
Benchmark
Before:
After:
Using this script: