Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an issue with multiple short list rowgroups using the Parquet chunked reader. #15342

Merged
merged 7 commits into from
Mar 20, 2024
68 changes: 36 additions & 32 deletions cpp/src/io/parquet/reader_impl_chunking.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ struct split_info {
};

struct cumulative_page_info {
size_t row_index; // row index
size_t size_bytes; // cumulative size in bytes
int key; // schema index
size_t end_row_index; // end row index (exclusive)
vuule marked this conversation as resolved.
Show resolved Hide resolved
size_t size_bytes; // cumulative size in bytes
int key; // schema index
};

// the minimum amount of memory we can safely expect to be enough to
Expand Down Expand Up @@ -260,7 +260,7 @@ struct set_row_index {
auto const& chunk = chunks[page.chunk_idx];
size_t const page_end_row = chunk.start_row + page.chunk_row + page.num_rows;
// if we have been passed in a cap, apply it
c_info[i].row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
c_info[i].end_row_index = max_row > 0 ? min(max_row, page_end_row) : page_end_row;
}
};

Expand Down Expand Up @@ -293,13 +293,13 @@ struct page_total_size {
auto const end = key_offsets[idx + 1];
auto iter = cudf::detail::make_counting_transform_iterator(
0, cuda::proclaim_return_type<size_t>([&] __device__(size_type i) {
return c_info[i].row_index;
return c_info[i].end_row_index;
}));
auto const page_index =
thrust::lower_bound(thrust::seq, iter + start, iter + end, i.row_index) - iter;
thrust::lower_bound(thrust::seq, iter + start, iter + end, i.end_row_index) - iter;
sum += c_info[page_index].size_bytes;
}
return {i.row_index, sum, i.key};
return {i.end_row_index, sum, i.key};
}
};

Expand All @@ -318,18 +318,9 @@ size_t find_start_index(cudf::host_span<cumulative_page_info const> aggregated_i
size_t start_row)
{
auto start = thrust::make_transform_iterator(
aggregated_info.begin(), [&](cumulative_page_info const& i) { return i.row_index; });
auto start_index =
thrust::lower_bound(thrust::host, start, start + aggregated_info.size(), start_row) - start;

// cumulative_page_info.row_index is the -end- of the rows of a given page. so move forward until
// we find the next group of pages
while (start_index < (static_cast<int64_t>(aggregated_info.size()) - 1) &&
(start_index < 0 || aggregated_info[start_index].row_index == start_row)) {
start_index++;
}

return start_index;
aggregated_info.begin(), [&](cumulative_page_info const& i) { return i.end_row_index; });
return thrust::lower_bound(thrust::host, start, start + aggregated_info.size(), start_row) -
start;
}

/**
Expand All @@ -353,16 +344,18 @@ int64_t find_next_split(int64_t cur_pos,
int64_t split_pos = thrust::lower_bound(thrust::seq, start + cur_pos, end, size_limit) - start;

// if we're past the end, or if the returned bucket is > than the chunk_read_limit, move back
// one.
// one. note that in the case where we can't even fit the current set of rows within the size
// limit, this will cause split_pos to go below cur_pos. but that is handled in the loop below.
if (static_cast<size_t>(split_pos) >= sizes.size() ||
(sizes[split_pos].size_bytes - cur_cumulative_size > size_limit)) {
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
split_pos--;
}

// cumulative_page_info.row_index is the -end- of the rows of a given page. so move forward until
// we find the next group of pages
// move forward until we find the next group of pages that will actually advance our row count.
// this guarantees that even if we cannot fit the set of rows represented by our where our cur_pos
// is, we will still move forward instead of failing.
while (split_pos < (static_cast<int64_t>(sizes.size()) - 1) &&
(split_pos < 0 || sizes[split_pos].row_index == cur_row_index)) {
(split_pos < cur_pos || sizes[split_pos].end_row_index == cur_row_index)) {
nvdbaranec marked this conversation as resolved.
Show resolved Hide resolved
split_pos++;
}

Expand Down Expand Up @@ -413,7 +406,7 @@ template <typename T = uint8_t>
struct row_count_less {
__device__ bool operator()(cumulative_page_info const& a, cumulative_page_info const& b) const
{
return a.row_index < b.row_index;
return a.end_row_index < b.end_row_index;
}
};

Expand Down Expand Up @@ -501,10 +494,10 @@ struct page_span {
size_t start, end;
};

struct get_page_row_index {
struct get_page_end_row_index {
device_span<cumulative_page_info const> c_info;

__device__ size_t operator()(size_t i) const { return c_info[i].row_index; }
__device__ size_t operator()(size_t i) const { return c_info[i].end_row_index; }
};

/**
Expand All @@ -514,15 +507,18 @@ struct get_page_row_index {
template <typename RowIndexIter>
struct get_page_span {
device_span<size_type const> page_offsets;
device_span<ColumnChunkDesc const> chunks;
RowIndexIter page_row_index;
size_t const start_row;
size_t const end_row;

get_page_span(device_span<size_type const> _page_offsets,
device_span<ColumnChunkDesc const> _chunks,
RowIndexIter _page_row_index,
size_t _start_row,
size_t _end_row)
: page_offsets(_page_offsets),
chunks(_chunks),
page_row_index(_page_row_index),
start_row(_start_row),
end_row(_end_row)
Expand All @@ -535,12 +531,17 @@ struct get_page_span {
auto const column_page_start = page_row_index + first_page_index;
auto const column_page_end = page_row_index + page_offsets[column_index + 1];
auto const num_pages = column_page_end - column_page_start;
bool const is_list = chunks[column_index].max_level[level_type::REPETITION] > 0;

auto start_page =
(thrust::lower_bound(thrust::seq, column_page_start, column_page_end, start_row) -
column_page_start) +
first_page_index;
if (page_row_index[start_page] == start_row) { start_page++; }
// list rows can span page boundaries, so it is not always safe to assume that the row
// represented by end_row_index starts on the subsequent page. It is possible that
// the values for row end_row_index start within the page itself. so we must
// include the page in that case.
if (page_row_index[start_page] == start_row && !is_list) { start_page++; }

auto end_page = (thrust::lower_bound(thrust::seq, column_page_start, column_page_end, end_row) -
column_page_start) +
Expand Down Expand Up @@ -623,6 +624,7 @@ struct copy_subpass_page {
*
* @param c_info The cumulative page size information (row count and byte size) per column
* @param pages All of the pages in the pass
* @param chunks All of the chunks in the pass
* @param page_offsets Offsets into the pages array representing the first page for each column
* @param start_row The row to start the subpass at
* @param size_limit The size limit in bytes of the subpass
Expand All @@ -636,6 +638,7 @@ struct copy_subpass_page {
std::tuple<rmm::device_uvector<page_span>, size_t, size_t> compute_next_subpass(
device_span<cumulative_page_info const> c_info,
device_span<PageInfo const> pages,
device_span<ColumnChunkDesc const> chunks,
device_span<size_type const> page_offsets,
size_t start_row,
size_t size_limit,
Expand All @@ -658,18 +661,18 @@ std::tuple<rmm::device_uvector<page_span>, size_t, size_t> compute_next_subpass(
start_row == 0 || start_index == 0 ? 0 : h_aggregated_info[start_index - 1].size_bytes;
auto const end_index =
find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit);
auto const end_row = h_aggregated_info[end_index].row_index;
auto const end_row = h_aggregated_info[end_index].end_row_index;

// for each column, collect the set of pages that spans start_row / end_row
rmm::device_uvector<page_span> page_bounds(num_columns, stream);
auto iter = thrust::make_counting_iterator(size_t{0});
auto page_row_index =
cudf::detail::make_counting_transform_iterator(0, get_page_row_index{c_info});
cudf::detail::make_counting_transform_iterator(0, get_page_end_row_index{c_info});
thrust::transform(rmm::exec_policy_nosync(stream),
iter,
iter + num_columns,
page_bounds.begin(),
get_page_span{page_offsets, page_row_index, start_row, end_row});
get_page_span{page_offsets, chunks, page_row_index, start_row, end_row});

// total page count over all columns
auto page_count_iter = thrust::make_transform_iterator(page_bounds.begin(), get_span_size{});
Expand Down Expand Up @@ -700,13 +703,13 @@ std::vector<row_range> compute_page_splits_by_row(device_span<cumulative_page_in
size_t cur_pos = find_start_index(h_aggregated_info, skip_rows);
size_t cur_row_index = skip_rows;
size_t cur_cumulative_size = 0;
auto const max_row = min(skip_rows + num_rows, h_aggregated_info.back().row_index);
auto const max_row = min(skip_rows + num_rows, h_aggregated_info.back().end_row_index);
while (cur_row_index < max_row) {
auto const split_pos =
find_next_split(cur_pos, cur_row_index, cur_cumulative_size, h_aggregated_info, size_limit);

auto const start_row = cur_row_index;
cur_row_index = min(max_row, h_aggregated_info[split_pos].row_index);
cur_row_index = min(max_row, h_aggregated_info[split_pos].end_row_index);
splits.push_back({start_row, cur_row_index - start_row});
cur_pos = split_pos;
cur_cumulative_size = h_aggregated_info[split_pos].size_bytes;
Expand Down Expand Up @@ -1375,6 +1378,7 @@ void reader::impl::setup_next_subpass(bool uses_custom_row_bounds)
// get the next batch of pages
return compute_next_subpass(c_info,
pass.pages,
pass.chunks,
pass.page_offsets,
pass.processed_rows + pass.skip_rows,
remaining_read_limit,
Expand Down
30 changes: 14 additions & 16 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -700,16 +700,16 @@ struct set_list_row_count_estimate {
struct set_final_row_count {
device_span<PageInfo> pages;
device_span<const ColumnChunkDesc> chunks;
device_span<const size_type> page_offsets;
size_t const max_row;

__device__ void operator()(size_t i)
{
auto const last_page_index = page_offsets[i + 1] - 1;
auto const& page = pages[last_page_index];
auto const& chunk = chunks[page.chunk_idx];
size_t const page_start_row = chunk.start_row + page.chunk_row;
pages[last_page_index].num_rows = max_row - page_start_row;
auto& page = pages[i];
auto const& chunk = chunks[page.chunk_idx];
// only do this for the last page in each chunk
if (i < pages.size() - 1 && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; }
size_t const page_start_row = chunk.start_row + page.chunk_row;
size_t const chunk_last_row = chunk.start_row + chunk.num_rows;
page.num_rows = chunk_last_row - page_start_row;
}
};

Expand Down Expand Up @@ -1300,17 +1300,15 @@ void reader::impl::generate_list_column_row_count_estimates()
chunk_row_output_iter{pass.pages.device_ptr()});
}

// finally, fudge the last page for each column such that it ends on the real known row count
// for the pass. this is so that as we march through the subpasses, we will find that every column
// cleanly ends up the expected row count at the row group boundary.
auto const& last_chunk = pass.chunks[pass.chunks.size() - 1];
auto const num_columns = _input_columns.size();
size_t const max_row = last_chunk.start_row + last_chunk.num_rows;
auto iter = thrust::make_counting_iterator(0);
// to compensate for the list row size estimates, force the row count on the last page for each
// column chunk (each rowgroup) such that it ends on the real known row count. this is so that as
// we march through the subpasses, we will find that every column cleanly ends up the expected row
// count at the row group boundary and our split computations work correctly.
auto iter = thrust::make_counting_iterator(0);
thrust::for_each(rmm::exec_policy_nosync(_stream),
iter,
iter + num_columns,
set_final_row_count{pass.pages, pass.chunks, pass.page_offsets, max_row});
iter + pass.pages.size(),
set_final_row_count{pass.pages, pass.chunks});

pass.chunks.device_to_host_async(_stream);
pass.pages.device_to_host_async(_stream);
Expand Down
86 changes: 82 additions & 4 deletions cpp/tests/io/parquet_chunked_reader_test.cu
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ auto write_file(std::vector<std::unique_ptr<cudf::column>>& input_columns,
return std::pair{std::move(input_table), std::move(filepath)};
}

auto chunked_read(std::string const& filepath,
auto chunked_read(std::vector<std::string> const& filepaths,
std::size_t output_limit,
std::size_t input_limit = 0)
{
auto const read_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}).build();
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepaths}).build();
auto reader = cudf::io::chunked_parquet_reader(output_limit, input_limit, read_opts);

auto num_chunks = 0;
Expand All @@ -141,6 +141,14 @@ auto chunked_read(std::string const& filepath,
return std::pair(cudf::concatenate(out_tviews), num_chunks);
}

auto chunked_read(std::string const& filepath,
std::size_t output_limit,
std::size_t input_limit = 0)
{
std::vector<std::string> vpath{filepath};
return chunked_read(vpath, output_limit, input_limit);
}

} // namespace

struct ParquetChunkedReaderTest : public cudf::test::BaseFixture {};
Expand Down Expand Up @@ -1113,7 +1121,7 @@ TEST_F(ParquetChunkedReaderInputLimitConstrainedTest, SingleFixedWidthColumn)
input_limit_test_write(test_filenames, tbl);

// semi-reasonable limit
constexpr int expected_a[] = {1, 17, 4, 1};
constexpr int expected_a[] = {1, 25, 5, 1};
input_limit_test_read(test_filenames, tbl, 0, 2 * 1024 * 1024, expected_a);
// an unreasonable limit
constexpr int expected_b[] = {1, 50, 50, 1};
Expand Down Expand Up @@ -1145,7 +1153,7 @@ TEST_F(ParquetChunkedReaderInputLimitConstrainedTest, MixedColumns)

input_limit_test_write(test_filenames, tbl);

constexpr int expected_a[] = {1, 50, 10, 7};
constexpr int expected_a[] = {1, 50, 13, 7};
input_limit_test_read(test_filenames, tbl, 0, 2 * 1024 * 1024, expected_a);
constexpr int expected_b[] = {1, 50, 50, 50};
input_limit_test_read(test_filenames, tbl, 0, 1, expected_b);
Expand Down Expand Up @@ -1227,6 +1235,76 @@ TEST_F(ParquetChunkedReaderInputLimitTest, List)
input_limit_test_read(test_filenames, tbl, 128 * 1024 * 1024, 512 * 1024 * 1024, expected_c);
}

void tiny_list_rowgroup_test(bool just_list_col)
{
auto iter = thrust::make_counting_iterator(0);

// test a specific edge case: a list column composed of multiple row groups, where each row
// group contains a single, relatively small row.
std::vector<int> row_sizes{12, 7, 16, 20, 10, 3, 15};
std::vector<std::unique_ptr<cudf::table>> row_groups;
for (size_t idx = 0; idx < row_sizes.size(); idx++) {
std::vector<std::unique_ptr<cudf::column>> cols;

// add a column before the list
if (!just_list_col) {
cudf::test::fixed_width_column_wrapper<int> int_col({idx});
cols.push_back(int_col.release());
}

// write out the single-row list column as it's own file
cudf::test::fixed_width_column_wrapper<int> values(iter, iter + row_sizes[idx]);
cudf::test::fixed_width_column_wrapper<int> offsets({0, row_sizes[idx]});
cols.push_back(cudf::make_lists_column(1, offsets.release(), values.release(), 0, {}));

// add a column after the list
if (!just_list_col) {
cudf::test::fixed_width_column_wrapper<float> float_col({idx});
cols.push_back(float_col.release());
}

auto tbl = std::make_unique<cudf::table>(std::move(cols));

auto filepath = temp_env->get_temp_filepath("Tlrg" + std::to_string(idx));
auto const write_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *tbl).build();
cudf::io::write_parquet(write_opts);

// store off the table
row_groups.push_back(std::move(tbl));
}

// build expected
std::vector<cudf::table_view> views;
std::transform(row_groups.begin(),
row_groups.end(),
std::back_inserter(views),
[](std::unique_ptr<cudf::table> const& tbl) { return tbl->view(); });
auto expected = cudf::concatenate(views);

// load the individual files all at once
std::vector<std::string> source_files;
std::transform(iter, iter + row_groups.size(), std::back_inserter(source_files), [](int i) {
return temp_env->get_temp_filepath("Tlrg" + std::to_string(i));
});
auto result =
chunked_read(source_files, size_t{2} * 1024 * 1024 * 1024, size_t{2} * 1024 * 1024 * 1024);

CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, *(result.first));
}

TEST_F(ParquetChunkedReaderInputLimitTest, TinyListRowGroupsSingle)
{
// test with just a single list column
tiny_list_rowgroup_test(true);
}

TEST_F(ParquetChunkedReaderInputLimitTest, TinyListRowGroupsMixed)
{
// test with other columns mixed in
tiny_list_rowgroup_test(false);
}

struct char_values {
__device__ int8_t operator()(int i)
{
Expand Down