Skip to content

Commit

Permalink
Unchecked pad_nulls
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Dec 13, 2023
1 parent 8f23a32 commit 7399917
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 9 deletions.
5 changes: 4 additions & 1 deletion parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Expand Up @@ -254,7 +254,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
.resize((read_offset + levels_read) * byte_length, 0);

let values_range = read_offset..read_offset + values_read;
for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
for (value_pos, level_pos) in values_range
.rev()
.zip(iter_set_bits_rev(valid_mask, read_offset + levels_read))
{
debug_assert!(level_pos >= value_pos);
if level_pos <= value_pos {
break;
Expand Down
7 changes: 3 additions & 4 deletions parquet/src/arrow/buffer/bit_util.rs
Expand Up @@ -25,8 +25,7 @@ pub fn count_set_bits(bytes: &[u8], range: Range<usize>) -> usize {
}

/// Iterates through the set bit positions in `bytes` in reverse order
pub fn iter_set_bits_rev(bytes: &[u8]) -> impl Iterator<Item = usize> + '_ {
let bit_length = bytes.len() * 8;
pub fn iter_set_bits_rev(bytes: &[u8], bit_length: usize) -> impl Iterator<Item = usize> + '_ {
let unaligned = UnalignedBitChunk::new(bytes, 0, bit_length);
let mut chunk_end_idx = bit_length + unaligned.lead_padding() + unaligned.trailing_padding();

Expand Down Expand Up @@ -78,7 +77,7 @@ mod tests {
let mut nulls = BooleanBufferBuilder::new(mask_length);
bools.iter().for_each(|b| nulls.append(*b));

let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice()).collect();
let actual: Vec<_> = iter_set_bits_rev(nulls.as_slice(), mask_length).collect();
let expected: Vec<_> = bools
.iter()
.enumerate()
Expand All @@ -87,7 +86,7 @@ mod tests {
.collect();
assert_eq!(actual, expected);

assert_eq!(iter_set_bits_rev(&[]).count(), 0);
assert_eq!(iter_set_bits_rev(&[], 0).count(), 0);
assert_eq!(count_set_bits(&[], 0..0), 0);
assert_eq!(count_set_bits(&[0xFF], 1..1), 0);

Expand Down
1 change: 0 additions & 1 deletion parquet/src/arrow/buffer/dictionary_buffer.rs
Expand Up @@ -194,7 +194,6 @@ impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K
) {
match self {
Self::Dict { keys, .. } => {
keys.resize(read_offset + levels_read, K::default());
keys.pad_nulls(read_offset, values_read, levels_read, valid_mask)
}
Self::Values { values, .. } => {
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/buffer/offset_buffer.rs
Expand Up @@ -161,7 +161,7 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
for (value_pos, level_pos) in values_range
.clone()
.rev()
.zip(iter_set_bits_rev(valid_mask))
.zip(iter_set_bits_rev(valid_mask, read_offset + levels_read))
{
assert!(level_pos >= value_pos);
assert!(level_pos < last_pos);
Expand Down
9 changes: 7 additions & 2 deletions parquet/src/arrow/record_reader/buffer.rs
Expand Up @@ -50,15 +50,20 @@ impl<T: Copy + Default> ValuesBuffer for Vec<T> {
levels_read: usize,
valid_mask: &[u8],
) {
assert!(values_read <= levels_read);
self.resize(read_offset + levels_read, T::default());

let values_range = read_offset..read_offset + values_read;
for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
for (value_pos, level_pos) in values_range
.rev()
.zip(iter_set_bits_rev(valid_mask, read_offset + levels_read))
{
debug_assert!(level_pos >= value_pos);
if level_pos <= value_pos {
break;
}
self[level_pos] = self[value_pos];
// Safety: indices must be in bounds by construction
unsafe { *self.get_unchecked_mut(level_pos) = *self.get_unchecked(value_pos) }
}
}
}

0 comments on commit 7399917

Please sign in to comment.