Skip to content

Commit

Permalink
feat(core): Implement Exact Buf Writer (#2917)
Browse files Browse the repository at this point in the history
* Add split support

Signed-off-by: Xuanwo <github@xuanwo.io>

* Update comments

Signed-off-by: Xuanwo <github@xuanwo.io>

* Implement exact buf write

Signed-off-by: Xuanwo <github@xuanwo.io>

* Add fuzzing test for cursor

Signed-off-by: Xuanwo <github@xuanwo.io>

* Implement

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix typo

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix clippy

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Aug 24, 2023
1 parent 5c8d9bb commit cbac9b8
Show file tree
Hide file tree
Showing 8 changed files with 567 additions and 14 deletions.
1 change: 1 addition & 0 deletions .typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@
# Random strings.
"Dum" = "Dum"
"ba" = "ba"
"Hel" = "Hel"
# Showed up in examples.
"thw" = "thw"
270 changes: 269 additions & 1 deletion core/src/raw/oio/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::cmp::Ordering;
use std::collections::VecDeque;
use std::io::Read;
use std::io::SeekFrom;
Expand Down Expand Up @@ -220,6 +221,103 @@ impl ChunkedCursor {
pub fn push(&mut self, bs: Bytes) {
self.inner.push_back(bs);
}

/// Splits the cursor into two cursors at byte offset `at`.
///
/// After the split, `self` contains the `0..at` part and the returned cursor contains
/// the `at..` part. A chunk that straddles the split point is divided between the two.
///
/// When `at == len` there is nothing to move: `self` is left untouched and the returned
/// cursor holds no chunks at all (instead of a single zero-length chunk), which matches
/// what a split on an interior chunk edge produces.
///
/// # Panics
///
/// - Panics if `at > len`
/// - Panics if `idx != 0`, the cursor must be reset before split.
pub fn split_off(&mut self, at: usize) -> Self {
    let total = self.len();
    assert!(
        at <= total,
        "split_off at must smaller than current size"
    );
    assert_eq!(self.idx, 0, "split_off must reset cursor first");

    let mut chunks = VecDeque::new();
    // Number of trailing bytes that still have to move into the returned cursor.
    let mut size = total - at;

    // Degenerate split: without this guard the loop below would cut a zero-length
    // `Bytes` off the last chunk and store it in the returned cursor.
    if size == 0 {
        return Self {
            inner: chunks,
            idx: 0,
        };
    }

    // Walk the chunks from the back, moving them over until `size` bytes are taken.
    while let Some(mut bs) = self.inner.pop_back() {
        match size.cmp(&bs.len()) {
            Ordering::Less => {
                // The split point is inside this chunk: keep its head in `self`
                // and hand the last `size` bytes to the returned cursor.
                let remaining = bs.split_off(bs.len() - size);
                chunks.push_front(remaining);
                self.inner.push_back(bs);
                break;
            }
            Ordering::Equal => {
                // The split point is exactly on this chunk's leading edge.
                chunks.push_front(bs);
                break;
            }
            Ordering::Greater => {
                // The whole chunk belongs to the returned cursor; keep going.
                size -= bs.len();
                chunks.push_front(bs);
            }
        }
    }

    Self {
        inner: chunks,
        idx: 0,
    }
}

/// Splits the cursor into two cursors at byte offset `at`.
///
/// After the split, `self` contains the `at..` part and the returned cursor contains
/// the `0..at` part. A chunk that straddles the split point is divided between the two.
///
/// When `at == 0` there is nothing to move: `self` is left untouched and the returned
/// cursor holds no chunks at all (instead of a single zero-length chunk), which matches
/// what a split on an interior chunk edge produces.
///
/// # Panics
///
/// - Panics if `at > len`
/// - Panics if `idx != 0`, the cursor must be reset before split.
pub fn split_to(&mut self, at: usize) -> Self {
    assert!(
        at <= self.len(),
        "split_to at must smaller than current size"
    );
    assert_eq!(self.idx, 0, "split_to must reset cursor first");

    let mut chunks = VecDeque::new();
    // Number of leading bytes that still have to move into the returned cursor.
    let mut size = at;

    // Degenerate split: without this guard the loop below would leave a zero-length
    // `Bytes` (the front half of `split_off(0)`) in the returned cursor.
    if size == 0 {
        return Self {
            inner: chunks,
            idx: 0,
        };
    }

    // Walk the chunks from the front, moving them over until `size` bytes are taken.
    while let Some(mut bs) = self.inner.pop_front() {
        match size.cmp(&bs.len()) {
            Ordering::Less => {
                // The split point is inside this chunk: the first `size` bytes go to
                // the returned cursor and the rest is put back at the front of `self`.
                let remaining = bs.split_off(size);
                chunks.push_back(bs);
                self.inner.push_front(remaining);
                break;
            }
            Ordering::Equal => {
                // The split point is exactly on this chunk's trailing edge.
                chunks.push_back(bs);
                break;
            }
            Ordering::Greater => {
                // The whole chunk belongs to the returned cursor; keep going.
                size -= bs.len();
                chunks.push_back(bs);
            }
        }
    }

    Self {
        inner: chunks,
        idx: 0,
    }
}

/// Test-only helper: copies every chunk held in `inner`, in order, into one
/// contiguous buffer. The read position `idx` is not consulted.
#[cfg(test)]
fn concat(&self) -> Bytes {
    self.inner
        .iter()
        .fold(BytesMut::new(), |mut joined, chunk| {
            joined.extend_from_slice(chunk);
            joined
        })
        .freeze()
}
}

impl oio::Stream for ChunkedCursor {
Expand Down Expand Up @@ -401,9 +499,15 @@ impl VectorCursor {

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;
use rand::thread_rng;
use rand::Rng;
use rand::RngCore;
use sha2::Digest;
use sha2::Sha256;

use super::*;
use crate::raw::oio::StreamExt;
use pretty_assertions::assert_eq;

#[test]
fn test_vector_cursor() {
Expand Down Expand Up @@ -457,4 +561,168 @@ mod tests {

Ok(())
}

#[test]
fn test_chunked_cursor_split_to() {
    // Fixture: three chunks, ten bytes in total -> "Hello" | "Wor" | "ld".
    let mut source = ChunkedCursor::new();
    source.push(Bytes::from("Hello"));
    source.push(Bytes::from("Wor"));
    source.push(Bytes::from("ld"));

    // Case 1: split less than first chunk
    {
        let mut tail = source.clone();
        let head = tail.split_to(3);

        assert_eq!(tail.len(), 7);
        assert_eq!(
            &tail.inner,
            &[Bytes::from("lo"), Bytes::from("Wor"), Bytes::from("ld")]
        );

        assert_eq!(head.len(), 3);
        assert_eq!(&head.inner, &[Bytes::from("Hel")]);
    }

    // Case 2: split larger than first chunk
    {
        let mut tail = source.clone();
        let head = tail.split_to(6);

        assert_eq!(tail.len(), 4);
        assert_eq!(&tail.inner, &[Bytes::from("or"), Bytes::from("ld")]);

        assert_eq!(head.len(), 6);
        assert_eq!(&head.inner, &[Bytes::from("Hello"), Bytes::from("W")]);
    }

    // Case 3: split at chunk edge
    {
        let mut tail = source.clone();
        let head = tail.split_to(8);

        assert_eq!(tail.len(), 2);
        assert_eq!(&tail.inner, &[Bytes::from("ld")]);

        assert_eq!(head.len(), 8);
        assert_eq!(&head.inner, &[Bytes::from("Hello"), Bytes::from("Wor")]);
    }
}

#[test]
fn test_chunked_cursor_split_off() {
    // Fixture: three chunks, ten bytes in total -> "Hello" | "Wor" | "ld".
    let mut source = ChunkedCursor::new();
    source.push(Bytes::from("Hello"));
    source.push(Bytes::from("Wor"));
    source.push(Bytes::from("ld"));

    // Case 1: split less than first chunk
    {
        let mut head = source.clone();
        let tail = head.split_off(3);

        assert_eq!(head.len(), 3);
        assert_eq!(&head.inner, &[Bytes::from("Hel")]);

        assert_eq!(tail.len(), 7);
        assert_eq!(
            &tail.inner,
            &[Bytes::from("lo"), Bytes::from("Wor"), Bytes::from("ld")]
        );
    }

    // Case 2: split larger than first chunk
    {
        let mut head = source.clone();
        let tail = head.split_off(6);

        assert_eq!(head.len(), 6);
        assert_eq!(&head.inner, &[Bytes::from("Hello"), Bytes::from("W")]);

        assert_eq!(tail.len(), 4);
        assert_eq!(&tail.inner, &[Bytes::from("or"), Bytes::from("ld")]);
    }

    // Case 3: split at chunk edge
    {
        let mut head = source.clone();
        let tail = head.split_off(8);

        assert_eq!(head.len(), 8);
        assert_eq!(&head.inner, &[Bytes::from("Hello"), Bytes::from("Wor")]);

        assert_eq!(tail.len(), 2);
        assert_eq!(&tail.inner, &[Bytes::from("ld")]);
    }
}

#[test]
fn test_fuzz_chunked_cursor_split_to() {
    // Hex-encoded SHA-256 of `data`, used to compare large buffers compactly.
    fn hex_digest(data: &[u8]) -> String {
        format!("{:x}", Sha256::digest(data))
    }

    let mut rng = thread_rng();

    // Build a cursor out of a random number of random-sized chunks, mirroring
    // every byte into `flat` so that slices of it serve as the expected content.
    let rounds = rng.gen_range(1..1000);
    let mut flat = vec![];
    let mut origin = ChunkedCursor::new();
    for _ in 0..rounds {
        let chunk_len = rng.gen_range(1..100);
        let mut chunk = vec![0; chunk_len];
        rng.fill_bytes(&mut chunk);

        flat.extend_from_slice(&chunk);
        origin.push(Bytes::from(chunk));
    }
    let total = flat.len();

    // Split a fresh copy at a random offset and check both halves.
    for _ in 0..rounds {
        let mut tail = origin.clone();

        let at = rng.gen_range(0..total);
        let head = tail.split_to(at);

        // `self` keeps the `at..` part.
        assert_eq!(tail.len(), total - at);
        assert_eq!(hex_digest(&tail.concat()), hex_digest(&flat[at..]));

        // The returned cursor holds the `0..at` part.
        assert_eq!(head.len(), at);
        assert_eq!(hex_digest(&head.concat()), hex_digest(&flat[0..at]));
    }
}

#[test]
fn test_fuzz_chunked_cursor_split_off() {
    // Hex-encoded SHA-256 of `data`, used to compare large buffers compactly.
    fn hex_digest(data: &[u8]) -> String {
        format!("{:x}", Sha256::digest(data))
    }

    let mut rng = thread_rng();

    // Build a cursor out of a random number of random-sized chunks, mirroring
    // every byte into `flat` so that slices of it serve as the expected content.
    let rounds = rng.gen_range(1..1000);
    let mut flat = vec![];
    let mut origin = ChunkedCursor::new();
    for _ in 0..rounds {
        let chunk_len = rng.gen_range(1..100);
        let mut chunk = vec![0; chunk_len];
        rng.fill_bytes(&mut chunk);

        flat.extend_from_slice(&chunk);
        origin.push(Bytes::from(chunk));
    }
    let total = flat.len();

    // Split a fresh copy at a random offset and check both halves.
    for _ in 0..rounds {
        let mut head = origin.clone();

        let at = rng.gen_range(0..total);
        let tail = head.split_off(at);

        // `self` keeps the `0..at` part.
        assert_eq!(head.len(), at);
        assert_eq!(hex_digest(&head.concat()), hex_digest(&flat[..at]));

        // The returned cursor holds the `at..` part.
        assert_eq!(tail.len(), total - at);
        assert_eq!(hex_digest(&tail.concat()), hex_digest(&flat[at..]));
    }
}
}
6 changes: 4 additions & 2 deletions core/src/raw/oio/stream/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
use std::task::{ready, Context};

use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use bytes::BytesMut;
use pin_project::pin_project;

use crate::*;
Expand Down
16 changes: 11 additions & 5 deletions core/src/raw/oio/write/at_least_buf_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use crate::raw::oio::{StreamExt, Streamer};
use crate::raw::*;
use crate::*;
use async_trait::async_trait;
use bytes::Bytes;

/// AtLeastBufWrite is used to implement [`Write`] based on at least buffer.
use crate::raw::oio::StreamExt;
use crate::raw::oio::Streamer;
use crate::raw::*;
use crate::*;

/// AtLeastBufWriter is used to implement [`oio::Write`] based on an at-least buffer strategy:
/// flush to the underlying storage when the buffered size is larger than `buffer_size`.
///
/// AtLeastBufWriter makes sure that the size of the data written to the underlying storage is at
/// least `buffer_size` bytes. It's useful when the underlying storage has a minimum size limit.
///
/// Users can wrap a writer and a buffer together.
/// For example, S3 requires at least 5MiB for multipart uploads.
pub struct AtLeastBufWriter<W: oio::Write> {
inner: W,

Expand Down

0 comments on commit cbac9b8

Please sign in to comment.