Skip to content

Commit

Permalink
Reserve capacity for null padding
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Dec 11, 2023
1 parent 8f23a32 commit 4af59a5
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 14 deletions.
34 changes: 24 additions & 10 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,14 @@ impl FixedLenByteArrayReader {
data_type: ArrowType,
byte_length: usize,
) -> Self {
let values = FixedLenByteArrayBuffer::new(byte_length);
Self {
data_type,
byte_length,
pages,
def_levels_buffer: None,
rep_levels_buffer: None,
record_reader: GenericRecordReader::new(column_desc),
record_reader: GenericRecordReader::new_with_values(column_desc, values),
}
}
}
Expand Down Expand Up @@ -232,23 +233,34 @@ impl ArrayReader for FixedLenByteArrayReader {
}
}

/// Values buffer for fixed-length elements, stored back to back in a single byte vector
#[derive(Default)]
struct FixedLenByteArrayBuffer {
/// Raw bytes of all buffered elements
buffer: Vec<u8>,
/// The length of each element in bytes
byte_length: Option<usize>,
// NOTE(review): `byte_length` is declared twice here (`Option<usize>` and `usize`);
// this looks like old and new diff lines shown together — confirm which one is current
byte_length: usize,
}

impl FixedLenByteArrayBuffer {
    /// Creates a buffer with no stored bytes whose elements are each `byte_length` bytes long
    fn new(byte_length: usize) -> Self {
        let buffer = Vec::new();
        Self { buffer, byte_length }
    }
}

impl ValuesBuffer for FixedLenByteArrayBuffer {
/// Reserves room for `additional` more elements, i.e. `additional * byte_length` bytes
fn reserve(&mut self, additional: usize) {
    let additional_bytes = additional * self.byte_length;
    self.buffer.reserve(additional_bytes);
}

fn pad_nulls(
&mut self,
read_offset: usize,
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
) {
let byte_length = self.byte_length.unwrap_or_default();

let byte_length = self.byte_length;
assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length);
self.buffer
.resize((read_offset + levels_read) * byte_length, 0);
Expand All @@ -268,6 +280,13 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
}
}
}

/// Moves the buffered bytes out into a new buffer with the same `byte_length`,
/// leaving this buffer's byte vector at its default value
fn take(&mut self) -> Self {
    let buffer = std::mem::take(&mut self.buffer);
    Self {
        buffer,
        byte_length: self.byte_length,
    }
}
}

struct ValueDecoder {
Expand Down Expand Up @@ -345,11 +364,6 @@ impl ColumnValueDecoder for ValueDecoder {
}

fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
match out.byte_length {
Some(x) => assert_eq!(x, self.byte_length),
None => out.byte_length = Some(self.byte_length),
}

match self.decoder.as_mut().unwrap() {
Decoder::Plain { offset, buf } => {
let to_read =
Expand Down
11 changes: 11 additions & 0 deletions parquet/src/arrow/buffer/dictionary_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ impl<K: ArrowNativeType + Ord, V: OffsetSizeTrait> DictionaryBuffer<K, V> {
}

impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K, V> {
/// Reserves room for `additional` more entries in whichever buffer is active:
/// `keys` for the `Dict` variant, `values` for the `Values` variant
fn reserve(&mut self, additional: usize) {
match self {
Self::Dict { keys, .. } => keys.reserve(additional),
Self::Values { values, .. } => values.reserve(additional),
}
}

fn pad_nulls(
&mut self,
read_offset: usize,
Expand All @@ -202,6 +209,10 @@ impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K
}
}
}

/// Returns the current contents, leaving `Self::default()` in their place
fn take(&mut self) -> Self {
std::mem::take(self)
}
}

#[cfg(test)]
Expand Down
8 changes: 8 additions & 0 deletions parquet/src/arrow/buffer/offset_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ impl<I: OffsetSizeTrait> OffsetBuffer<I> {
}

impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
/// Reserves room for `additional` more entries in `offsets`
fn reserve(&mut self, additional: usize) {
// Only `offsets` is grown here; no other field is reserved by this call
self.offsets.reserve(additional)
}

fn pad_nulls(
&mut self,
read_offset: usize,
Expand Down Expand Up @@ -188,6 +192,10 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
*x = last_start_offset
}
}

/// Returns the current contents, leaving `Self::default()` in their place
fn take(&mut self) -> Self {
std::mem::take(self)
}
}

#[cfg(test)]
Expand Down
16 changes: 15 additions & 1 deletion parquet/src/arrow/record_reader/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
use crate::arrow::buffer::bit_util::iter_set_bits_rev;

/// A buffer that supports padding with nulls
pub trait ValuesBuffer: Default {
pub trait ValuesBuffer {
/// Reserve space for `additional` values
fn reserve(&mut self, additional: usize);

/// If a column contains nulls, more level data may be read than value data, as null
/// values are not encoded. Therefore, first the levels data is read, the null count
/// determined, and then the corresponding number of values read to a [`ValuesBuffer`].
Expand All @@ -40,9 +43,16 @@ pub trait ValuesBuffer: Default {
levels_read: usize,
valid_mask: &[u8],
);

/// Take the contents of this buffer
fn take(&mut self) -> Self;
}

impl<T: Copy + Default> ValuesBuffer for Vec<T> {
/// Reserves capacity for at least `additional` more elements
fn reserve(&mut self, additional: usize) {
    // Call the inherent `Vec` method by path so this is not mistaken for a recursive call
    Vec::reserve(self, additional)
}

fn pad_nulls(
&mut self,
read_offset: usize,
Expand All @@ -61,4 +71,8 @@ impl<T: Copy + Default> ValuesBuffer for Vec<T> {
self[level_pos] = self[value_pos];
}
}

/// Returns the current contents, leaving `Self::default()` (for `Vec`, an empty vector) in their place
fn take(&mut self) -> Self {
std::mem::take(self)
}
}
19 changes: 16 additions & 3 deletions parquet/src/arrow/record_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,28 @@ pub struct GenericRecordReader<V, CV> {

// Constructor that is only available when the values buffer type implements `Default`
impl<V, CV> GenericRecordReader<V, CV>
where
V: ValuesBuffer,
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Buffer = V>,
{
/// Create a new [`GenericRecordReader`]
///
/// Delegates to `new_with_values`, passing `V::default()` as the values buffer
pub fn new(desc: ColumnDescPtr) -> Self {
Self::new_with_values(desc, V::default())
}
}

impl<V, CV> GenericRecordReader<V, CV>
where
V: ValuesBuffer,
CV: ColumnValueDecoder<Buffer = V>,
{
pub fn new_with_values(desc: ColumnDescPtr, values: V) -> Self {
let def_levels = (desc.max_def_level() > 0)
.then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc)));

let rep_levels = (desc.max_rep_level() > 0).then(Vec::new);

Self {
values: V::default(),
values,
def_levels,
rep_levels,
column_reader: None,
Expand Down Expand Up @@ -169,7 +179,7 @@ where
/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
///
/// The returned data is taken out of `self.values`.
pub fn consume_record_data(&mut self) -> V {
std::mem::take(&mut self.values)
self.values.take()
}

/// Returns currently stored null bitmap data.
Expand All @@ -195,6 +205,9 @@ where

/// Try to read one batch of data returning the number of records read
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
// Reserve additional space in values for null padding
self.values.reserve(batch_size);

let (records_read, values_read, levels_read) =
self.column_reader.as_mut().unwrap().read_records(
batch_size,
Expand Down

0 comments on commit 4af59a5

Please sign in to comment.