collect(streaming=True).write_parquet() is slow when there are many small chunks #14484

Closed
2 tasks done
itamarst opened this issue Feb 14, 2024 · 1 comment · Fixed by #14487
Labels
bug (Something isn't working), rust (Related to Rust Polars)

Comments


itamarst commented Feb 14, 2024

This is a followup to #11699, covering the case that was originally in #14346 but was removed from the PR before it was merged.

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

The actual issue is in the Rust code.

from datetime import datetime

import polars as pl

start_datetime = datetime(2020, 1, 1)
end_datetime = datetime(2024, 1, 1)
N_ids = 10
interval = "1m"
df = (
    pl.LazyFrame(
        {
            "time": pl.datetime_range(
                start_datetime, end_datetime, interval=interval, eager=True
            )
        }
    )
    .join(
        pl.LazyFrame(
            {"id": [f"id{i:05d}" for i in range(N_ids)], "value": list(range(N_ids))}
        ),
        how="cross",
    )
    .select("time", "id", "value")
)

# print(df.profile(streaming=True))
from time import time

start = time()
df.sink_parquet("/tmp/out.parquet")
print("SINK", time() - start)

start = time()
df.collect().write_parquet("/tmp/out2.parquet")
print("COLLECT+WRITE", time() - start)

start = time()
df.collect(streaming=True).write_parquet("/tmp/out3.parquet")
print("STREAMING COLLECT+WRITE", time() - start)

Log output

SINK 1.0721302032470703
COLLECT+WRITE 0.9235994815826416
STREAMING COLLECT+WRITE 4.254129648208618

Issue description

Certain streaming operations result in small chunks; for example, the script in #11699 (comment) produces 2.5KB chunks. The per-chunk overhead of writing Parquet with such small chunks makes writes very slow, roughly 4× slower than with larger chunks of e.g. 4MB.

Expected behavior

collect().write_parquet() and collect(streaming=True).write_parquet() should take a similar amount of time to write the Parquet file.

Installed versions

Git commit 126ccc1b65e60f10663c490da83e60f78eec5541.

@itamarst itamarst added bug Something isn't working needs triage Awaiting prioritization by a maintainer rust Related to Rust Polars labels Feb 14, 2024

itamarst commented Feb 14, 2024

I am going to try to fix this. The original proposed fix merged chunks into larger, ~4MB sizes inside OrderedSink, which @ritchie46 didn't like. So instead the merging will be done in write_parquet().

@deanm0000 deanm0000 removed the needs triage Awaiting prioritization by a maintainer label Feb 14, 2024
ritchie46 pushed a commit that referenced this issue Feb 22, 2024
… single chunk (#14484) (#14487)

Co-authored-by: Itamar Turner-Trauring <itamar@pythonspeed.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment