Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GenericColumnReader::read_records Yields Truncated Records #5150

Closed
ogrman opened this issue Nov 30, 2023 · 10 comments · Fixed by #5193
Closed

GenericColumnReader::read_records Yields Truncated Records #5150

ogrman opened this issue Nov 30, 2023 · 10 comments · Fixed by #5193
Assignees
Labels
bug enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate parquet-derive

Comments

@ogrman
Copy link

ogrman commented Nov 30, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Prior to version 41 of the parquet crate we had access to the read_batch function, which was deprecated (and changed) in favor of read_records. What we are trying to does not seem to be possible with the new API. We have a program that reads parquet files and concatenates them vertically, so that a number of parquet files with identical schemas become one file.

We did this by, for each input file and column:

loop {
    let (values_read, levels_read) = column_reader.read_batch(
        BATCH_SIZE,
        Some(&mut def_levels[..]),
        Some(&mut rep_levels[..]),
        &mut value_buffer[..],
    )?;
    
    if values_read == 0 && levels_read == 0 {
      break;
    }
    
    let values_written = column_writer.write_batch(
        &value_buffer[0..values_read],
        Some(&def_levels[0..levels_read]),
        Some(&rep_levels[0..levels_read]),
    )?;
    
    assert_eq!(values_written, values_read);
}

This simple loop turned many "small" files into one large file, with the same schema. After this change when we replace the call to read_batch with a call to read_records we will no longer get a complete batch which means that sometimes we will start writing a new batch while rep_levels is still 1.

Describe the solution you'd like

A way to simply read a complete batch without fiddling around with realigning our buffers between writes. I am also open to suggestions for why what we are doing is better solved in a different way, but the code we have works great with previous versions of parquet and we are currently blocked from upgrading.

Describe alternatives you've considered

Manually finding the last element with rep_levels = 0 and stopping our reads there, doing some math, writing a batch excluding the end of the buffers, copying the end of the buffers to the start of our buffers, and reading fewer records according to how much space is already used in the buffers.

@ogrman ogrman added the enhancement Any new improvement worthy of a entry in the changelog label Nov 30, 2023
@tustvold
Copy link
Contributor

tustvold commented Nov 30, 2023

we will no longer get a complete batch which means that sometimes we will start writing a new batch while rep_levels is still 1.

I'm confused by this, read_records was added and read_batch deprecated precisely because read_batch didn't guarantee this, whereas read_records does

@ogrman
Copy link
Author

ogrman commented Nov 30, 2023

we will no longer get a complete batch which means that sometimes we will start writing a new batch while rep_levels is still 1.

I'm confused by this, read_records was added and read_batch deprecated precisely because read_batch didn't guarantee this, whereas read_records does

You shouldn't be, I'm aware that I'm most likely the confused one here. We've been using the above code and it's been working fine for years. When we upgraded the parquet dep this code started crashing due with the naive implementation and I'm aware that the answer to this question might be to rewrite it.

In this case we know that the input is well-formed, and we expected there to be something akin to write_records for the write case. If we had a function that just let us write more records that would also be fine.

@tustvold
Copy link
Contributor

tustvold commented Nov 30, 2023

Perhaps you could share a reproducer of the crash, aa making the "naive" version correct was the motivation for the change, and so it should work. Previously it would have only been working because the writer wasn't strict about torn records - fixed in #4327

@VladimirBramstedt
Copy link

To clarify, i think #4327 (comment) your comment there is relevant, as that is the error we are running into. and the parquet files that we wrote previously are used at scale globally from AWS athena and presto readers, so they are very much not invalid.

@tustvold
Copy link
Contributor

tustvold commented Nov 30, 2023

so they are very much not invalid

There is context on the linked ticket on how they are invalid, they will cause subtle bugs for systems performing predicate pushdown using the page index - something at least historically Athena/Presto/Trino has not supported.

However, taking a step back here the whole purpose of read_records is to avoid users needing to concern themselves with this, and make the naive loop "just work". I am therefore interested in what has gone wrong here?

@VladimirBramstedt
Copy link

There is context on the linked ticket on how they are invalid, they will cause subtle bugs for systems performing predicate pushdown using the page index - something at least historically Athena/Presto/Trino has not supported.

Ok, when you say "invalid" my assumption was that you meant "broken in an unsable state", not that it may cause some issues on certain use cases. my mistake.
That is good to know, and i guess thats why we never ran into any issues (using athena and presto).

However, taking a step back here the whole purpose of read_records is to avoid users needing to concern themselves with this, and make the naive loop "just work". I am therefore interested in what has gone wrong here?

As are we. we'll try and get a minimal/sanitized reprod.

@ogrman
Copy link
Author

ogrman commented Dec 6, 2023

Here is a link to a reproduction:

https://github.com/ogrman/test-parquet-read-records

The output from this program is:

% cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.40s
     Running `target/debug/parquet-bug`
parquet file created: 405 bytes
reader: reading row group 0
reader: reading column 0
reader: 1 records read
reader: 5 values read
reader: 5 levels read
writer: 5 values written
reader: 1 records read
reader: 3 values read
reader: 3 levels read
Error: Parquet error: Write must start at a record boundary, got non-zero repetition level of 1

read_records claims that the next repetition level, if any, will be 0, but that does not seem to be the case here. If course this would work if we were to increase the buffer size, so that all values for the column would fit in there but for our actual case that would not be feasible due to the amount of data.

My expectation for this test program would be that read_records would read 1 record, 4 values and 4 levels in both of the calls.

@tustvold
Copy link
Contributor

tustvold commented Dec 6, 2023

Aah I see what the issue here is, thank you for the reproducer.

The problem is read_records will currently return incomplete reads if there isn't sufficient buffer space to accommodate the requested number of records. This is fine for the arrow APIs as RecordReader ensures that it then grows the buffers and reads out the remaining data. Unfortunately RecordReader is currently crate private, extremely specific to how the arrow decoding process works, and not really something I would want to expose.

On the flip-side ColumnWriter needs to ensure it has complete records, as otherwise write_mini_batch might flush a page with a partial record, which as discussed above is in contravention of both the standard and the expectations of many readers.

I will see if I can't make read_records behave the way it is documented to behave, and never return truncated records 😅

@tustvold tustvold self-assigned this Dec 6, 2023
@tustvold tustvold added the bug label Dec 6, 2023
@tustvold tustvold changed the title Reintroduce read_batch in GenericColumnReader GenericColumnReader::read_records Yields Truncated Records Dec 6, 2023
@tustvold tustvold added the parquet Changes to the parquet crate label Jan 5, 2024
@tustvold
Copy link
Contributor

tustvold commented Jan 5, 2024

label_issue.py automatically added labels {'parquet'} from #5193

@tustvold
Copy link
Contributor

tustvold commented Jan 5, 2024

label_issue.py automatically added labels {'parquet-derive'} from #5193

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug enhancement Any new improvement worthy of a entry in the changelog parquet Changes to the parquet crate parquet-derive
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants