Skip to content

Commit

Permalink
feat(types): remove blocking operation range_read and range_reader API (
Browse files Browse the repository at this point in the history
#2912)

* feat(types): add read_with and reader_with API to blocking operator

* feat(types): remove blocking operator range_read and range_reader API

* feat(types): fix code
  • Loading branch information
oowl committed Aug 23, 2023
1 parent ab94da0 commit 581cd9f
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 60 deletions.
116 changes: 64 additions & 52 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use std::io::Read;
use std::ops::RangeBounds;

use bytes::Bytes;

Expand Down Expand Up @@ -230,54 +229,61 @@ impl BlockingOperator {
/// # }
/// ```
pub fn read(&self, path: &str) -> Result<Vec<u8>> {
self.range_read(path, ..)
self.read_with(path).call()
}

/// Read the specified range of path into a bytes.
/// Read the whole path into a bytes with extra options.
///
/// This function will allocate a new bytes internally. For more precise memory control or
/// reading data lazily, please use [`BlockingOperator::range_reader`]
/// reading data lazily, please use [`BlockingOperator::reader`]
///
/// # Examples
///
/// ```no_run
/// # use std::io::Result;
/// # use opendal::BlockingOperator;
/// # use futures::TryStreamExt;
/// # use opendal::Scheme;
/// # use anyhow::Result;
/// use opendal::BlockingOperator;
/// use opendal::EntryMode;
/// use opendal::Metakey;
/// # fn test(op: BlockingOperator) -> Result<()> {
/// let bs = op.range_read("path/to/file", 1024..2048)?;
/// let bs = op.read_with("path/to/file").range(0..10).call()?;
/// # Ok(())
/// # }
/// ```
pub fn range_read(&self, path: &str, range: impl RangeBounds<u64>) -> Result<Vec<u8>> {
pub fn read_with(&self, path: &str) -> FunctionRead {
let path = normalize_path(path);

if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "read path is a directory")
.with_operation("BlockingOperator::range_read")
.with_context("service", self.info().scheme().into_static())
.with_context("path", &path),
);
}
FunctionRead(OperatorFunction::new(
self.inner().clone(),
path,
OpRead::default(),
|inner, path, args| {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "read path is a directory")
.with_operation("BlockingOperator::read_with")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path),
);
}

let br = BytesRange::from(range);
let (rp, mut s) = self
.inner()
.blocking_read(&path, OpRead::new().with_range(br))?;

let mut buffer = Vec::with_capacity(rp.into_metadata().content_length() as usize);
s.read_to_end(&mut buffer).map_err(|err| {
Error::new(ErrorKind::Unexpected, "blocking range read failed")
.with_operation("BlockingOperator::range_read")
.with_context("service", self.info().scheme().into_static())
.with_context("path", path)
.with_context("range", br.to_string())
.set_source(err)
})?;

Ok(buffer)
let (rp, mut s) = inner.blocking_read(&path, args)?;
let mut buffer = Vec::with_capacity(rp.into_metadata().content_length() as usize);

match s.read_to_end(&mut buffer) {
Ok(n) => {
buffer.truncate(n);
Ok(buffer)
}
Err(err) => Err(
Error::new(ErrorKind::Unexpected, "blocking read_with failed")
.with_operation("BlockingOperator::read_with")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path)
.set_source(err),
),
}
},
))
}

/// Create a new reader which can read the whole path.
Expand All @@ -294,37 +300,43 @@ impl BlockingOperator {
/// # }
/// ```
pub fn reader(&self, path: &str) -> Result<BlockingReader> {
self.range_reader(path, ..)
self.reader_with(path).call()
}

/// Create a new reader which can read the specified range.
/// Create a new reader with extra options
///
/// # Examples
///
/// ```no_run
/// # use std::io::Result;
/// # use opendal::BlockingOperator;
/// # use futures::TryStreamExt;
/// # use anyhow::Result;
/// use opendal::BlockingOperator;
/// use opendal::EntryMode;
/// use opendal::Metakey;
/// # fn test(op: BlockingOperator) -> Result<()> {
/// let r = op.range_reader("path/to/file", 1024..2048)?;
/// let r = op.reader_with("path/to/file").range(0..10).call()?;
/// # Ok(())
/// # }
/// ```
pub fn range_reader(&self, path: &str, range: impl RangeBounds<u64>) -> Result<BlockingReader> {
pub fn reader_with(&self, path: &str) -> FunctionReader {
let path = normalize_path(path);

if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "read path is a directory")
.with_operation("BlockingOperator::range_reader")
.with_context("service", self.info().scheme().into_static())
.with_context("path", &path),
);
}

let op = OpRead::new().with_range(range.into());
FunctionReader(OperatorFunction::new(
self.inner().clone(),
path,
OpRead::default(),
|inner, path, args| {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "reader path is a directory")
.with_operation("BlockingOperator::reader_with")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path),
);
}

BlockingReader::create(self.inner().clone(), &path, op)
BlockingReader::create(inner.clone(), &path, args)
},
))
}

/// Write bytes into given path.
Expand Down
40 changes: 40 additions & 0 deletions core/src/types/operator/operator_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
//!
//! By using functions, users can add more options for operation.

use std::ops::RangeBounds;

use bytes::Bytes;
use flagset::FlagSet;

Expand Down Expand Up @@ -189,3 +191,41 @@ impl FunctionLister {
self.0.call()
}
}

/// Function that generated by [`BlockingOperator::read_with`].
///
/// Users can add more options by public functions provided by this struct.
pub struct FunctionRead(pub(crate) OperatorFunction<OpRead, Vec<u8>>);

impl FunctionRead {
/// Set the range for this operation.
pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
self.0 = self.0.map_args(|args| args.with_range(range.into()));
self
}

/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<Vec<u8>> {
self.0.call()
}
}

/// Function that generated by [`BlockingOperator::reader_with`].
///
/// Users can add more options by public functions provided by this struct.
pub struct FunctionReader(pub(crate) OperatorFunction<OpRead, BlockingReader>);

impl FunctionReader {
/// Set the range for this operation.
pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
self.0 = self.0.map_args(|args| args.with_range(range.into()));
self
}

/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<BlockingReader> {
self.0.call()
}
}
6 changes: 3 additions & 3 deletions core/tests/behavior/blocking_read_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn behavior_blocking_read_only_tests(op: &Operator) -> Vec<Trial> {
test_blocking_read_only_stat_special_chars,
test_blocking_read_only_stat_not_exist,
test_blocking_read_only_read_full,
test_blocking_read_only_read_range,
test_blocking_read_only_read_with_range,
test_blocking_read_only_read_not_exist
)
}
Expand Down Expand Up @@ -88,8 +88,8 @@ pub fn test_blocking_read_only_read_full(op: BlockingOperator) -> Result<()> {
}

/// Read full content should match.
pub fn test_blocking_read_only_read_range(op: BlockingOperator) -> Result<()> {
let bs = op.range_read("normal_file", 1024..2048)?;
pub fn test_blocking_read_only_read_with_range(op: BlockingOperator) -> Result<()> {
let bs = op.read_with("normal_file").range(1024..2048).call()?;
assert_eq!(bs.len(), 1024, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs)),
Expand Down
16 changes: 11 additions & 5 deletions core/tests/behavior/blocking_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub fn test_blocking_read_range(op: BlockingOperator) -> Result<()> {
op.write(&path, content.clone())
.expect("write must succeed");

let bs = op.range_read(&path, offset..offset + length)?;
let bs = op.read_with(&path).range(offset..offset + length).call()?;
assert_eq!(bs.len() as u64, length, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs)),
Expand Down Expand Up @@ -258,7 +258,7 @@ pub fn test_blocking_read_large_range(op: BlockingOperator) -> Result<()> {
op.write(&path, content.clone())
.expect("write must succeed");

let bs = op.range_read(&path, offset..u32::MAX as u64)?;
let bs = op.read_with(&path).range(offset..u32::MAX as u64).call()?;
assert_eq!(
bs.len() as u64,
size as u64 - offset,
Expand Down Expand Up @@ -298,7 +298,10 @@ pub fn test_blocking_fuzz_range_reader(op: BlockingOperator) -> Result<()> {
.expect("write must succeed");

let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0, content.len());
let mut o = op.range_reader(&path, 0..content.len() as u64)?;
let mut o = op
.reader_with(&path)
.range(0..content.len() as u64)
.call()?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down Expand Up @@ -335,7 +338,7 @@ pub fn test_blocking_fuzz_offset_reader(op: BlockingOperator) -> Result<()> {
.expect("write must succeed");

let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0, content.len());
let mut o = op.range_reader(&path, 0..)?;
let mut o = op.reader_with(&path).range(0..).call()?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down Expand Up @@ -373,7 +376,10 @@ pub fn test_blocking_fuzz_part_reader(op: BlockingOperator) -> Result<()> {
.expect("write must succeed");

let mut fuzzer = ObjectReaderFuzzer::new(&path, content, offset as usize, length as usize);
let mut o = op.range_reader(&path, offset..offset + length)?;
let mut o = op
.reader_with(&path)
.range(offset..offset + length)
.call()?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down

0 comments on commit 581cd9f

Please sign in to comment.