Use Vec in ColumnReader (apache#5177)
tustvold committed Dec 8, 2023
1 parent 2a213bc commit 51bbdbf
Showing 17 changed files with 256 additions and 573 deletions.
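For orientation, the core signature change that drives most of the hunks below, summarized from the removed and added lines of this commit (comment-only illustration, not code from the repository):

// Before: the decoder wrote into Self::Slice over an explicit range, and level
// buffers were stored as Option<Buffer>.
//   type Slice = ...;
//   fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;
// After: the decoder appends `num_values` values into Self::Buffer, and the
// definition/repetition level buffers become Option<Vec<i16>>.
//   type Buffer = ...;
//   fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize>;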
34 changes: 17 additions & 17 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -31,11 +31,10 @@ use crate::schema::types::ColumnDescPtr;
use arrow_array::{
Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, OffsetSizeTrait,
};
-use arrow_buffer::{i256, Buffer};
+use arrow_buffer::i256;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
-use std::ops::Range;
use std::sync::Arc;

/// Returns an [`ArrayReader`] that decodes the provided byte array column
@@ -79,8 +78,8 @@ pub fn make_byte_array_reader(
struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
-def_levels_buffer: Option<Buffer>,
-rep_levels_buffer: Option<Buffer>,
+def_levels_buffer: Option<Vec<i16>>,
+rep_levels_buffer: Option<Vec<i16>>,
record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
}

@@ -154,11 +153,11 @@ impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
}

fn get_def_levels(&self) -> Option<&[i16]> {
-self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+self.def_levels_buffer.as_deref()
}

fn get_rep_levels(&self) -> Option<&[i16]> {
-self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+self.rep_levels_buffer.as_deref()
}
}

@@ -170,7 +169,7 @@ struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
}

impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
-type Slice = OffsetBuffer<I>;
+type Buffer = OffsetBuffer<I>;

fn new(desc: &ColumnDescPtr) -> Self {
let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
@@ -227,13 +226,13 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
Ok(())
}

-fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
let decoder = self
.decoder
.as_mut()
.ok_or_else(|| general_err!("no decoder set"))?;

-decoder.read(out, range.end - range.start, self.dict.as_ref())
+decoder.read(out, num_values, self.dict.as_ref())
}

fn skip_values(&mut self, num_values: usize) -> Result<usize> {
@@ -590,6 +589,7 @@ mod tests {
use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use arrow_array::{Array, StringArray};
+use arrow_buffer::Buffer;

#[test]
fn test_byte_array_decoder() {
@@ -607,20 +607,20 @@
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

-assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

-assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);

-assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
+assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(output.values.as_slice(), "helloworldab".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]);

-assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -662,19 +662,19 @@ mod tests {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

-assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

-assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
+assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "hellob".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);

-assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
+assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -705,7 +705,7 @@ mod tests {
for (encoding, page) in pages.clone() {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, None).unwrap();
-assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
}

// test nulls skip
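As a hedged illustration of the count-based API introduced in the file above: the helper below is hypothetical, not part of this commit, and assumes the post-change ColumnValueDecoder trait (with type Buffer and read(out, num_values)) plus the crate's Result type are in scope.

// Minimal sketch: drain a decoder by repeatedly asking for the remaining count.
fn drain_decoder<D: ColumnValueDecoder>(
    decoder: &mut D,
    out: &mut D::Buffer,
    mut remaining: usize,
) -> Result<usize> {
    let mut total = 0;
    while remaining > 0 {
        // `read` takes how many values to decode and returns how many it
        // actually appended to `out`; 0 signals the page is exhausted.
        let read = decoder.read(out, remaining)?;
        if read == 0 {
            break;
        }
        total += read;
        remaining -= read;
    }
    Ok(total)
}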
45 changes: 24 additions & 21 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -17,18 +17,16 @@

use std::any::Any;
use std::marker::PhantomData;
-use std::ops::Range;
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
-use arrow_buffer::{ArrowNativeType, Buffer};
+use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;

use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer};
-use crate::arrow::record_reader::buffer::BufferQueue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
@@ -124,8 +122,8 @@ pub fn make_byte_array_dictionary_reader(
struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
-def_levels_buffer: Option<Buffer>,
-rep_levels_buffer: Option<Buffer>,
+def_levels_buffer: Option<Vec<i16>>,
+rep_levels_buffer: Option<Vec<i16>>,
record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
}

@@ -183,11 +181,11 @@ where
}

fn get_def_levels(&self) -> Option<&[i16]> {
-self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+self.def_levels_buffer.as_deref()
}

fn get_rep_levels(&self) -> Option<&[i16]> {
-self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+self.rep_levels_buffer.as_deref()
}
}

@@ -224,7 +222,7 @@ where
K: FromBytes + Ord + ArrowNativeType,
V: OffsetSizeTrait,
{
-type Slice = DictionaryBuffer<K, V>;
+type Buffer = DictionaryBuffer<K, V>;

fn new(col: &ColumnDescPtr) -> Self {
let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
@@ -306,16 +304,16 @@ where
Ok(())
}

-fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
+fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
match self.decoder.as_mut().expect("decoder set") {
MaybeDictionaryDecoder::Fallback(decoder) => {
-decoder.read(out.spill_values()?, range.end - range.start, None)
+decoder.read(out.spill_values()?, num_values, None)
}
MaybeDictionaryDecoder::Dict {
decoder,
max_remaining_values,
} => {
-let len = (range.end - range.start).min(*max_remaining_values);
+let len = num_values.min(*max_remaining_values);

let dict = self
.dict
@@ -332,8 +330,12 @@
Some(keys) => {
// Happy path - can just copy keys
// Keys will be validated on conversion to arrow
-let keys_slice = keys.get_output_slice(len);
-let len = decoder.get_batch(keys_slice)?;
+
+// TODO: Push vec into decoder (#5177)
+let start = keys.len();
+keys.resize(start + len, K::default());
+let len = decoder.get_batch(&mut keys[start..])?;
+keys.truncate(start + len);
*max_remaining_values -= len;
Ok(len)
}
@@ -381,6 +383,7 @@ where
mod tests {
use arrow::compute::cast;
use arrow_array::{Array, StringArray};
+use arrow_buffer::Buffer;

use crate::arrow::array_reader::test_util::{
byte_array_all_encodings, encode_dictionary, utf8_column,
@@ -416,15 +419,15 @@ mod tests {
.unwrap();

let mut output = DictionaryBuffer::<i32, i32>::default();
-assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3);
+assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);

let mut valid = vec![false, false, true, true, false, true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());

assert!(matches!(output, DictionaryBuffer::Dict { .. }));

-assert_eq!(decoder.read(&mut output, 0..4).unwrap(), 4);
+assert_eq!(decoder.read(&mut output, 4).unwrap(), 4);

valid.extend_from_slice(&[false, false, true, true, false, true, true, false]);
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
@@ -484,17 +487,17 @@
let mut output = DictionaryBuffer::<i32, i32>::default();

// read two skip one
-assert_eq!(decoder.read(&mut output, 0..2).unwrap(), 2);
+assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

assert!(matches!(output, DictionaryBuffer::Dict { .. }));

// read two skip one
-assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
+assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

// read one and test on skip at the end
-assert_eq!(decoder.read(&mut output, 4..5).unwrap(), 1);
+assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(decoder.skip_values(4).unwrap(), 0);

let valid = [true, true, true, true, true];
@@ -536,7 +539,7 @@

for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
-assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4);
+assert_eq!(decoder.read(&mut output, 1024).unwrap(), 4);
}
let array = output.into_array(None, &data_type).unwrap();
assert_eq!(array.data_type(), &data_type);
@@ -580,7 +583,7 @@
for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
decoder.skip_values(2).expect("skipping two values");
-assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 2);
+assert_eq!(decoder.read(&mut output, 1024).unwrap(), 2);
}
let array = output.into_array(None, &data_type).unwrap();
assert_eq!(array.data_type(), &data_type);
@@ -641,7 +644,7 @@
for (encoding, page) in pages.clone() {
let mut output = DictionaryBuffer::<i32, i32>::default();
decoder.set_data(encoding, page, 8, None).unwrap();
-assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
+assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);

output.pad_nulls(0, 0, 8, &[0]);
let array = output
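The dictionary happy path above now grows the key Vec, decodes into the new tail, and truncates to the count actually read. A self-contained sketch of that resize/decode/truncate pattern (generic, with illustrative names, not code from this commit):

// Append up to `max_len` decoded values onto `keys`, returning how many were read.
fn append_batch<T, F>(keys: &mut Vec<T>, max_len: usize, mut get_batch: F) -> usize
where
    T: Default + Clone,
    F: FnMut(&mut [T]) -> usize,
{
    let start = keys.len();
    // Reserve slots for the values we are about to decode.
    keys.resize(start + max_len, T::default());
    // Decode into the freshly added tail of the Vec.
    let read = get_batch(&mut keys[start..]);
    // Drop any slots the decoder did not fill.
    keys.truncate(start + read);
    read
}

// Usage sketch: let n = append_batch(&mut keys, 4, |slice| decoder.get_batch(slice).unwrap());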
