Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Vec in ColumnReader (#5177) #5193

Merged
merged 2 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 17 additions & 17 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ use crate::schema::types::ColumnDescPtr;
use arrow_array::{
Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, OffsetSizeTrait,
};
use arrow_buffer::{i256, Buffer};
use arrow_buffer::i256;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

/// Returns an [`ArrayReader`] that decodes the provided byte array column
Expand Down Expand Up @@ -79,8 +78,8 @@ pub fn make_byte_array_reader(
struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
rep_levels_buffer: Option<Buffer>,
def_levels_buffer: Option<Vec<i16>>,
rep_levels_buffer: Option<Vec<i16>>,
record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
}

Expand Down Expand Up @@ -154,11 +153,11 @@ impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
self.def_levels_buffer.as_deref()
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
self.rep_levels_buffer.as_deref()
}
}

Expand All @@ -170,7 +169,7 @@ struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
}

impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
type Slice = OffsetBuffer<I>;
type Buffer = OffsetBuffer<I>;

fn new(desc: &ColumnDescPtr) -> Self {
let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
Expand Down Expand Up @@ -227,13 +226,13 @@ impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
Ok(())
}

fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
Copy link
Member

@viirya viirya Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So previously seems you can pick up where (i.e., range.start) to start the read, now you must use skip_values to skip values?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, actually I also don't see how range.start is used to skip value. Maybe it is just no skip?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its a weird quirk of how this API allowed using slices that didn't track their position, this is no longer necessary

let decoder = self
.decoder
.as_mut()
.ok_or_else(|| general_err!("no decoder set"))?;

decoder.read(out, range.end - range.start, self.dict.as_ref())
decoder.read(out, num_values, self.dict.as_ref())
}

fn skip_values(&mut self, num_values: usize) -> Result<usize> {
Expand Down Expand Up @@ -590,6 +589,7 @@ mod tests {
use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use arrow_array::{Array, StringArray};
use arrow_buffer::Buffer;

#[test]
fn test_byte_array_decoder() {
Expand All @@ -607,20 +607,20 @@ mod tests {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);

assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(output.values.as_slice(), "helloworldab".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]);

assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
Expand Down Expand Up @@ -662,19 +662,19 @@ mod tests {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "hellob".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);

assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);
assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
Expand Down Expand Up @@ -705,7 +705,7 @@ mod tests {
for (encoding, page) in pages.clone() {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
}

// test nulls skip
Expand Down
45 changes: 24 additions & 21 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,16 @@

use std::any::Any;
use std::marker::PhantomData;
use std::ops::Range;
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;

use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer};
use crate::arrow::record_reader::buffer::BufferQueue;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
Expand Down Expand Up @@ -124,8 +122,8 @@ pub fn make_byte_array_dictionary_reader(
struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
def_levels_buffer: Option<Buffer>,
rep_levels_buffer: Option<Buffer>,
def_levels_buffer: Option<Vec<i16>>,
rep_levels_buffer: Option<Vec<i16>>,
record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
}

Expand Down Expand Up @@ -183,11 +181,11 @@ where
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
self.def_levels_buffer.as_deref()
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
self.rep_levels_buffer.as_deref()
}
}

Expand Down Expand Up @@ -224,7 +222,7 @@ where
K: FromBytes + Ord + ArrowNativeType,
V: OffsetSizeTrait,
{
type Slice = DictionaryBuffer<K, V>;
type Buffer = DictionaryBuffer<K, V>;

fn new(col: &ColumnDescPtr) -> Self {
let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
Expand Down Expand Up @@ -306,16 +304,16 @@ where
Ok(())
}

fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize> {
fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
match self.decoder.as_mut().expect("decoder set") {
MaybeDictionaryDecoder::Fallback(decoder) => {
decoder.read(out.spill_values()?, range.end - range.start, None)
decoder.read(out.spill_values()?, num_values, None)
}
MaybeDictionaryDecoder::Dict {
decoder,
max_remaining_values,
} => {
let len = (range.end - range.start).min(*max_remaining_values);
let len = num_values.min(*max_remaining_values);

let dict = self
.dict
Expand All @@ -332,8 +330,12 @@ where
Some(keys) => {
// Happy path - can just copy keys
// Keys will be validated on conversion to arrow
let keys_slice = keys.get_output_slice(len);
let len = decoder.get_batch(keys_slice)?;

// TODO: Push vec into decoder (#5177)
let start = keys.len();
keys.resize(start + len, K::default());
let len = decoder.get_batch(&mut keys[start..])?;
keys.truncate(start + len);
*max_remaining_values -= len;
Ok(len)
}
Expand Down Expand Up @@ -381,6 +383,7 @@ where
mod tests {
use arrow::compute::cast;
use arrow_array::{Array, StringArray};
use arrow_buffer::Buffer;

use crate::arrow::array_reader::test_util::{
byte_array_all_encodings, encode_dictionary, utf8_column,
Expand Down Expand Up @@ -416,15 +419,15 @@ mod tests {
.unwrap();

let mut output = DictionaryBuffer::<i32, i32>::default();
assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3);
assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);

let mut valid = vec![false, false, true, true, false, true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());

assert!(matches!(output, DictionaryBuffer::Dict { .. }));

assert_eq!(decoder.read(&mut output, 0..4).unwrap(), 4);
assert_eq!(decoder.read(&mut output, 4).unwrap(), 4);

valid.extend_from_slice(&[false, false, true, true, false, true, true, false]);
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
Expand Down Expand Up @@ -484,17 +487,17 @@ mod tests {
let mut output = DictionaryBuffer::<i32, i32>::default();

// read two skip one
assert_eq!(decoder.read(&mut output, 0..2).unwrap(), 2);
assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

assert!(matches!(output, DictionaryBuffer::Dict { .. }));

// read two skip one
assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2);
assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

// read one and test on skip at the end
assert_eq!(decoder.read(&mut output, 4..5).unwrap(), 1);
assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(decoder.skip_values(4).unwrap(), 0);

let valid = [true, true, true, true, true];
Expand Down Expand Up @@ -536,7 +539,7 @@ mod tests {

for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4);
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 4);
}
let array = output.into_array(None, &data_type).unwrap();
assert_eq!(array.data_type(), &data_type);
Expand Down Expand Up @@ -580,7 +583,7 @@ mod tests {
for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, Some(4)).unwrap();
decoder.skip_values(2).expect("skipping two values");
assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 2);
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 2);
}
let array = output.into_array(None, &data_type).unwrap();
assert_eq!(array.data_type(), &data_type);
Expand Down Expand Up @@ -641,7 +644,7 @@ mod tests {
for (encoding, page) in pages.clone() {
let mut output = DictionaryBuffer::<i32, i32>::default();
decoder.set_data(encoding, page, 8, None).unwrap();
assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);

output.pad_nulls(0, 0, 8, &[0]);
let array = output
Expand Down