feat(parquet): add byte buffer when disable buffered stream #302

Merged
merged 5 commits into from
Mar 7, 2025

@joechenrh joechenrh commented Mar 4, 2025

Rationale for this change

When a reader is created with BufferedStreamEnabled = false, GetStream first allocates a slice large enough to hold the whole requested range and reads the data into it.

func (r *ReaderProperties) GetStream(source io.ReaderAt, start, nbytes int64) (BufferedReader, error) {
	if r.BufferedStreamEnabled {
		return utils.NewBufferedReader(io.NewSectionReader(source, start, nbytes), int(r.BufferSize)), nil
	}
	data := make([]byte, nbytes)
	n, err := source.ReadAt(data, start)
	if err != nil {
		return nil, fmt.Errorf("parquet: tried reading from file, but got error: %w", err)
	}
	if n != int(nbytes) {
		return nil, fmt.Errorf("parquet: tried reading %d bytes starting at position %d from file but only got %d", nbytes, start, n)
	}
	return utils.NewBufferedReader(bytes.NewReader(data), int(nbytes)), nil
}

NewBufferedReader then allocates a second slice of the same size, which is redundant.

func NewBufferedReader(rd Reader, sz int) *bufferedReader {
	r := &bufferedReader{
		rd: rd,
	}
	r.resizeBuffer(sz)
	return r
}

What changes are included in this PR?

  • Add a new struct, byteReader, which wraps bytes.NewReader and implements the BufferedReader interface.
  • Remove the BufferSize() method from the BufferedReader interface.

Are these changes tested?

In the tests, every BufferedReader that was created from bytes.NewReader is now created with the new NewByteReader function.

Are there any user-facing changes?

No.

@joechenrh joechenrh requested a review from zeroshade as a code owner March 4, 2025 13:33
Comment on lines 534 to 536
  rd := utils.NewBufferedReader(
  	io.NewSectionReader(p.r.Outer(), p.dictOffset-p.baseOffset, p.dataOffset-p.baseOffset),
- 	p.r.BufferSize())
+ 	int(parquet.DefaultBufSize))
Member

This would stop us from respecting the BufferSize property in the reader properties. We shouldn't remove the BufferSize() method.

Contributor Author

So how about changing it to:

readBufSize := min(p.dataOffset-p.baseOffset, p.r.BufferSize())
rd := utils.NewBufferedReader(
	io.NewSectionReader(p.r.Outer(), p.dictOffset-p.baseOffset, p.dataOffset-p.baseOffset),
	readBufSize)
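
The idea behind this suggestion, capping the buffer at the size of the section being read while still honoring the configured size, can be sketched in isolation. `cappedBufSize` is a hypothetical helper, and the sketch assumes the section length is an int64 while the configured size is an int, so the comparison is done in int64 before converting.

```go
package main

import "fmt"

// cappedBufSize returns the smaller of the configured buffer size and the
// number of bytes in the section to be read. Hypothetical helper, not
// part of the library.
func cappedBufSize(sectionLen int64, configured int) int {
	if sectionLen < 0 {
		return 0 // defensive: a negative length means nothing to read
	}
	// Compare as int64 so a large section length is never truncated.
	if sectionLen < int64(configured) {
		return int(sectionLen)
	}
	return configured
}

func main() {
	fmt.Println(cappedBufSize(100, 4096))   // prints 100: small section
	fmt.Println(cappedBufSize(1<<20, 4096)) // prints 4096: configured size wins
}
```

With this shape, a small section never triggers an allocation larger than the data it contains, and a large section still respects the reader's configured buffer size.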

Member

that works

@joechenrh
Contributor Author

Can I manually rerun the CI?

@kou
Member

kou commented Mar 7, 2025

Done!

Member

@zeroshade zeroshade left a comment

Thanks!

@zeroshade zeroshade merged commit 0f0d667 into apache:main Mar 7, 2025
23 checks passed