
Parquet: derive boundary order when writing #5110

Merged
merged 5 commits on Nov 28, 2023

Conversation

@Jefffrey (Contributor)

Which issue does this PR close?

Closes #5074

Rationale for this change

What changes are included in this PR?

Change ColumnIndexBuilder to add a new append method which incrementally keeps track of the sort state of data page min/max values, in order to eventually derive the boundary_order thrift field. A minimal sketch of the idea follows.
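To make the approach concrete, here is a minimal sketch of the incremental tracking idea, using a hypothetical BoundaryOrderTracker with simplified types (the real ColumnIndexBuilder also handles null pages, byte-encoded min/max values, truncation, and null counts):

#[derive(Debug, Clone, Copy, PartialEq)]
enum BoundaryOrder {
    Unordered,
    Ascending,
    Descending,
}

// Hypothetical tracker, not the crate's API: records whether the per-page
// min/max values seen so far are still ascending and/or descending.
struct BoundaryOrderTracker<T: Ord> {
    prev: Option<(T, T)>, // (min, max) of the previous non-null page
    ascending: bool,
    descending: bool,
}

impl<T: Ord> BoundaryOrderTracker<T> {
    fn new() -> Self {
        Self { prev: None, ascending: true, descending: true }
    }

    // Called once per non-null data page with that page's min and max.
    fn append(&mut self, min: T, max: T) {
        if let Some((prev_min, prev_max)) = &self.prev {
            // A previous bound exceeding the new bound breaks ascending order.
            if prev_min > &min || prev_max > &max {
                self.ascending = false;
            }
            // A new bound exceeding the previous bound breaks descending order.
            if &min > prev_min || &max > prev_max {
                self.descending = false;
            }
        }
        self.prev = Some((min, max));
    }

    // Derive the thrift field once the column chunk is complete. Treating
    // all-equal page bounds as ascending is an assumption of this sketch.
    fn finish(&self) -> BoundaryOrder {
        if self.ascending {
            BoundaryOrder::Ascending
        } else if self.descending {
            BoundaryOrder::Descending
        } else {
            BoundaryOrder::Unordered
        }
    }
}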

Are there any user-facing changes?

@github-actions bot added the parquet label (Changes to the parquet crate) on Nov 22, 2023
@tustvold (Contributor) left a comment

I worry a bit about the performance impact of this, especially as I think for strings it will perform multiple allocations per-value.

Perhaps we might do something simpler, whereby the user can specify the boundary order via WriterProperties or something? This would also avoid the boundary order becoming data-dependent, which users might find surprising.

@Jefffrey (Contributor Author)

I worry a bit about the performance impact of this, especially as I think for strings it will perform multiple allocations per-value.

The casting back to ParquetValueType, which would require allocation, is quite ugly, I agree. ColumnIndexBuilder seemed the most natural and easiest place to put this boundary order check logic, as it's easier to keep track as new values are appended, but by that point the values are already pure bytes.

Could try investigating pushing this check logic further up the chain, such as here:

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) {
    // update the column index
    let null_page =
        (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
    // a page contains only null values,
    // and writers have to set the corresponding entries in min_values and max_values to byte[0]
    if null_page && self.column_index_builder.valid() {
        self.column_index_builder.append(
            null_page,
            vec![0; 1],
            vec![0; 1],
            self.page_metrics.num_page_nulls as i64,
        );
    } else if self.column_index_builder.valid() {
        // from page statistics
        // If can't get the page statistics, ignore this column/offset index for this column chunk
        match &page_statistics {
            None => {
                self.column_index_builder.to_invalid();
            }
            Some(stat) => {
                // We only truncate if the data is represented as binary
                match self.descr.physical_type() {
                    Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                    _ => {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes().to_vec(),
                            stat.max_bytes().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }
    }
    // update the offset index
    self.offset_index_builder
        .append_row_count(self.page_metrics.num_buffered_rows as i64);
}

Since I think Statistics would hold the original values as ParquetValueType.

Alternatively, could try to find a way to get a ParquetValueType from bytes without allocation (though I'd assume that kind of behaviour would be unsafe to some degree).

Could also try making ColumnIndexBuilder generic over ParquetValueType and only casting the values to bytes when building to thrift, after all checks are completed; a rough sketch of that shape is below.
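For illustration, a rough sketch of that generic-builder shape (hypothetical and heavily simplified; the real ColumnIndexBuilder tracks much more per-page state):

// Hypothetical generic builder: page bounds stay typed until the end.
struct GenericColumnIndexBuilder<T> {
    min_values: Vec<T>,
    max_values: Vec<T>,
}

impl<T: PartialOrd> GenericColumnIndexBuilder<T> {
    // Append typed page bounds; ordering checks could run here on &T
    // without any byte round-trip.
    fn append(&mut self, min: T, max: T) {
        self.min_values.push(min);
        self.max_values.push(max);
    }

    // Convert to bytes only when building the thrift structs, after all
    // checks are done. `to_bytes` stands in for the plain encoding of the
    // column's physical type.
    fn into_byte_bounds(self, to_bytes: impl Fn(&T) -> Vec<u8>) -> (Vec<Vec<u8>>, Vec<Vec<u8>>) {
        (
            self.min_values.iter().map(&to_bytes).collect(),
            self.max_values.iter().map(&to_bytes).collect(),
        )
    }
}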

Perhaps we might do something simpler, whereby the user can specify the boundary order via WriterProperties or something? This would also avoid the boundary order becoming data-dependent, which users might find surprising.

I'm not sure this should be writer-configurable, as if I'm understanding correctly, boundary_order has to be data-dependent unless the writer knows in advance the sort order of the min/max values across all pages in the column.

@tustvold (Contributor)

I'm not sure this should be writer-configurable, as if I'm understanding correctly, boundary_order has to be data-dependent unless the writer knows in advance the sort order of the min/max values across all pages in the column.

My understanding of the use-case for this feature is situations where you do know this, e.g. because you've configured the query producing the data to ensure it.

@Jefffrey (Contributor Author) commented Nov 22, 2023

I see your point, though the implementations in Arrow C++:

https://github.com/apache/arrow/blob/862792132297f0ca519c83e524e59c7d685298e8/cpp/src/parquet/page_index.cc#L551-L601

and Impala:

https://github.com/apache/impala/blob/ae848a6cefc59b027644d7ea54ab365593f4fc6e/be/src/exec/parquet/parquet-column-stats.inline.h#L49-L72

seem to point towards this property being dynamically derived, at least from the code snippets I've seen.

@tustvold (Contributor)

Aah, I see; this is being done per-page, not per-value, which is a lot less problematic 😄

Could try investigating pushing this check logic further up the chain

Might be worth a try; if it turns into a mess, let me know and we can proceed as written here.

@Jefffrey marked this pull request as draft November 26, 2023 01:21
@Jefffrey (Contributor Author) left a comment

Tried pushing the boundary check logic up out of ColumnIndexBuilder.

If this approach is preferable then I'll proceed with it and add the tests (or I can revert to the previous approach).

@tustvold

Comment on lines 731 to 755
let null_page = (self.page_metrics.num_buffered_rows as u64)
    == self.page_metrics.num_page_nulls;
if !null_page {
    if let Some((latest_min, latest_max)) = &self.latest_non_null_data_page_min_max {
        if self.data_page_boundary_ascending {
            // If latest min/max are greater than new min/max then not ascending anymore
            let not_ascending = compare_greater(&self.descr, latest_min, &min)
                || compare_greater(&self.descr, latest_max, &max);
            if not_ascending {
                self.data_page_boundary_ascending = false;
            }
        }

        if self.data_page_boundary_descending {
            // If new min/max are greater than latest min/max then not descending anymore
            let not_descending = compare_greater(&self.descr, &min, latest_min)
                || compare_greater(&self.descr, &max, latest_max);
            if not_descending {
                self.data_page_boundary_descending = false;
            }
        }
    }
    self.latest_non_null_data_page_min_max = Some((min.clone(), max.clone()));
}
@Jefffrey (Contributor Author)

Tried putting the incremental check here instead of inside update_column_offset_index(..), as I couldn't figure out an easy way to get the T: ParquetValueType out of a Statistics enum.

One caveat about putting this check here is that it compares the min/maxes before truncation occurs, though I think this should still be ok, since truncation preserves the (non-strict) relative ordering of the values.
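For example (a hypothetical illustration, not the crate's truncation code), taking a fixed-length prefix preserves non-strict lexicographic order, so an ordering that holds on the untruncated bounds still holds on the truncated ones:

fn main() {
    let (min_a, min_b): (&[u8], &[u8]) = (b"apple", b"banana");
    assert!(min_a < min_b); // page mins ascending before truncation
    let len = 3;
    // prefixes "app" and "ban" remain in (non-strict) ascending order
    assert!(&min_a[..len] <= &min_b[..len]);
}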

@tustvold (Contributor) commented Nov 26, 2023

Whilst I think this is correct, perhaps you could just change update_column_offset_index to instead take Option<&ValueStatistics<T>>? This would likely make the existing logic faster, and would make this logic slightly easier to follow; it is a little surprising that boundary_order is being updated outside of update_column_offset_index.
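A minimal illustration of why the typed signature helps (simplified stand-ins, not the actual parquet crate types): with ValueStatistics<T> the page bounds are available as &T and can be compared directly, with no conversion from bytes back to a ParquetValueType.

// Simplified stand-in for typed per-page statistics.
struct ValueStatistics<T> {
    min: T,
    max: T,
}

impl<T: PartialOrd> ValueStatistics<T> {
    fn min(&self) -> &T { &self.min }
    fn max(&self) -> &T { &self.max }
}

// Returns true if going from `prev` to `next` breaks ascending page order.
fn breaks_ascending<T: PartialOrd>(prev: &ValueStatistics<T>, next: &ValueStatistics<T>) -> bool {
    prev.min() > next.min() || prev.max() > next.max()
}

fn main() {
    let page1 = ValueStatistics { min: -10, max: 10 };
    let page2 = ValueStatistics { min: -5, max: 11 };
    assert!(!breaks_ascending(&page1, &page2));
}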

@tustvold (Contributor) left a comment

I like where this is headed; sorry for taking so long to review.

Three review comments on parquet/src/column/writer/mod.rs (outdated, resolved).

@Jefffrey marked this pull request as ready for review November 27, 2023 11:47
@Jefffrey (Contributor Author)
I've refactored as per the suggestion, and it worked out quite well.

@tustvold (Contributor) left a comment
This looks good; my only question concerns the correctness of the null handling. I couldn't find information on how nulls are supposed to be handled...

&[
    &[Some(-10), Some(10)],
    &[Some(-5), Some(11)],
    &[None],
@tustvold (Contributor)

Is there some documentation of this behaviour in the Parquet spec, or failing that, an example implementation doing something similar? Normally I would have expected a null to break the ordering, as it would break the ability to do a binary search.

@Jefffrey (Contributor Author)

There is this example test in Arrow C++ that shows null pages don't have an effect on ordering:

https://github.com/apache/arrow/blob/862792132297f0ca519c83e524e59c7d685298e8/cpp/src/parquet/arrow/arrow_reader_writer_test.cc#L5597-L5610

Same here:

https://github.com/apache/arrow/blob/84c15da1997559c37841dc16f9e2c70c643dd9d2/cpp/src/parquet/page_index_test.cc#L567-L581

Though I do agree that the Parquet spec is lacking explicit documentation on how null pages are handled.
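As a usage sketch, continuing the hypothetical BoundaryOrderTracker from the PR description above: an all-null page simply contributes nothing to the tracker, mirroring the test data in this thread, so it does not affect the derived order.

fn main() {
    let mut tracker = BoundaryOrderTracker::new();
    tracker.append(-10, 10); // page 1: min -10, max 10
    tracker.append(-5, 11); // page 2: min -5, max 11
    // page 3 contains only nulls: nothing is appended, ordering is unaffected
    assert_eq!(tracker.finish(), BoundaryOrder::Ascending);
}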

@tustvold merged commit 34a816d into apache:master Nov 28, 2023
16 checks passed
@Jefffrey deleted the parquet_boundary_order branch November 28, 2023 11:28