Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(types): remove blocking operation range_read and range_reader API #2912

Merged
merged 3 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
116 changes: 64 additions & 52 deletions core/src/types/operator/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use std::io::Read;
use std::ops::RangeBounds;

use bytes::Bytes;

Expand Down Expand Up @@ -230,54 +229,61 @@ impl BlockingOperator {
/// # }
/// ```
pub fn read(&self, path: &str) -> Result<Vec<u8>> {
self.range_read(path, ..)
self.read_with(path).call()
}

/// Read the specified range of path into a bytes.
/// Read the whole path into a bytes with extra options.
///
/// This function will allocate a new bytes internally. For more precise memory control or
/// reading data lazily, please use [`BlockingOperator::range_reader`]
/// reading data lazily, please use [`BlockingOperator::reader`]
///
/// # Examples
///
/// ```no_run
/// # use std::io::Result;
/// # use opendal::BlockingOperator;
/// # use futures::TryStreamExt;
/// # use opendal::Scheme;
/// # use anyhow::Result;
/// use opendal::BlockingOperator;
/// use opendal::EntryMode;
/// use opendal::Metakey;
/// # fn test(op: BlockingOperator) -> Result<()> {
/// let bs = op.range_read("path/to/file", 1024..2048)?;
/// let bs = op.read_with("path/to/file").range(0..10).call()?;
/// # Ok(())
/// # }
/// ```
pub fn range_read(&self, path: &str, range: impl RangeBounds<u64>) -> Result<Vec<u8>> {
pub fn read_with(&self, path: &str) -> FunctionRead {
let path = normalize_path(path);

if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "read path is a directory")
.with_operation("BlockingOperator::range_read")
.with_context("service", self.info().scheme().into_static())
.with_context("path", &path),
);
}
FunctionRead(OperatorFunction::new(
self.inner().clone(),
path,
OpRead::default(),
|inner, path, args| {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "read path is a directory")
.with_operation("BlockingOperator::read_with")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path),
);
}

let br = BytesRange::from(range);
let (rp, mut s) = self
.inner()
.blocking_read(&path, OpRead::new().with_range(br))?;

let mut buffer = Vec::with_capacity(rp.into_metadata().content_length() as usize);
s.read_to_end(&mut buffer).map_err(|err| {
Error::new(ErrorKind::Unexpected, "blocking range read failed")
.with_operation("BlockingOperator::range_read")
.with_context("service", self.info().scheme().into_static())
.with_context("path", path)
.with_context("range", br.to_string())
.set_source(err)
})?;

Ok(buffer)
let (rp, mut s) = inner.blocking_read(&path, args)?;
let mut buffer = Vec::with_capacity(rp.into_metadata().content_length() as usize);

match s.read_to_end(&mut buffer) {
Ok(n) => {
buffer.truncate(n);
Ok(buffer)
}
Err(err) => Err(
Error::new(ErrorKind::Unexpected, "blocking read_with failed")
.with_operation("BlockingOperator::read_with")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path)
.set_source(err),
),
}
},
))
}

/// Create a new reader which can read the whole path.
Expand All @@ -294,37 +300,43 @@ impl BlockingOperator {
/// # }
/// ```
pub fn reader(&self, path: &str) -> Result<BlockingReader> {
self.range_reader(path, ..)
self.reader_with(path).call()
}

/// Create a new reader which can read the specified range.
/// Create a new reader with extra options
///
/// # Examples
///
/// ```no_run
/// # use std::io::Result;
/// # use opendal::BlockingOperator;
/// # use futures::TryStreamExt;
/// # use anyhow::Result;
/// use opendal::BlockingOperator;
/// use opendal::EntryMode;
/// use opendal::Metakey;
/// # fn test(op: BlockingOperator) -> Result<()> {
/// let r = op.range_reader("path/to/file", 1024..2048)?;
/// let r = op.reader_with("path/to/file").range(0..10).call()?;
/// # Ok(())
/// # }
/// ```
pub fn range_reader(&self, path: &str, range: impl RangeBounds<u64>) -> Result<BlockingReader> {
pub fn reader_with(&self, path: &str) -> FunctionReader {
let path = normalize_path(path);

if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "read path is a directory")
.with_operation("BlockingOperator::range_reader")
.with_context("service", self.info().scheme().into_static())
.with_context("path", &path),
);
}

let op = OpRead::new().with_range(range.into());
FunctionReader(OperatorFunction::new(
self.inner().clone(),
path,
OpRead::default(),
|inner, path, args| {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "reader path is a directory")
.with_operation("BlockingOperator::reader_with")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path),
);
}

BlockingReader::create(self.inner().clone(), &path, op)
BlockingReader::create(inner.clone(), &path, args)
},
))
}

/// Write bytes into given path.
Expand Down
40 changes: 40 additions & 0 deletions core/src/types/operator/operator_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
//!
//! By using functions, users can add more options for operation.

use std::ops::RangeBounds;

use bytes::Bytes;
use flagset::FlagSet;

Expand Down Expand Up @@ -189,3 +191,41 @@ impl FunctionLister {
self.0.call()
}
}

/// Function that generated by [`BlockingOperator::read_with`].
///
/// Users can add more options by public functions provided by this struct.
pub struct FunctionRead(pub(crate) OperatorFunction<OpRead, Vec<u8>>);

impl FunctionRead {
/// Set the range for this operation.
pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
self.0 = self.0.map_args(|args| args.with_range(range.into()));
self
}

/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<Vec<u8>> {
self.0.call()
}
}

/// Function that generated by [`BlockingOperator::reader_with`].
///
/// Users can add more options by public functions provided by this struct.
pub struct FunctionReader(pub(crate) OperatorFunction<OpRead, BlockingReader>);

impl FunctionReader {
/// Set the range for this operation.
pub fn range(mut self, range: impl RangeBounds<u64>) -> Self {
self.0 = self.0.map_args(|args| args.with_range(range.into()));
self
}

/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<BlockingReader> {
self.0.call()
}
}
6 changes: 3 additions & 3 deletions core/tests/behavior/blocking_read_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn behavior_blocking_read_only_tests(op: &Operator) -> Vec<Trial> {
test_blocking_read_only_stat_special_chars,
test_blocking_read_only_stat_not_exist,
test_blocking_read_only_read_full,
test_blocking_read_only_read_range,
test_blocking_read_only_read_with_range,
test_blocking_read_only_read_not_exist
)
}
Expand Down Expand Up @@ -88,8 +88,8 @@ pub fn test_blocking_read_only_read_full(op: BlockingOperator) -> Result<()> {
}

/// Read full content should match.
pub fn test_blocking_read_only_read_range(op: BlockingOperator) -> Result<()> {
let bs = op.range_read("normal_file", 1024..2048)?;
pub fn test_blocking_read_only_read_with_range(op: BlockingOperator) -> Result<()> {
let bs = op.read_with("normal_file").range(1024..2048).call()?;
assert_eq!(bs.len(), 1024, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs)),
Expand Down
16 changes: 11 additions & 5 deletions core/tests/behavior/blocking_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub fn test_blocking_read_range(op: BlockingOperator) -> Result<()> {
op.write(&path, content.clone())
.expect("write must succeed");

let bs = op.range_read(&path, offset..offset + length)?;
let bs = op.read_with(&path).range(offset..offset + length).call()?;
assert_eq!(bs.len() as u64, length, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs)),
Expand Down Expand Up @@ -258,7 +258,7 @@ pub fn test_blocking_read_large_range(op: BlockingOperator) -> Result<()> {
op.write(&path, content.clone())
.expect("write must succeed");

let bs = op.range_read(&path, offset..u32::MAX as u64)?;
let bs = op.read_with(&path).range(offset..u32::MAX as u64).call()?;
assert_eq!(
bs.len() as u64,
size as u64 - offset,
Expand Down Expand Up @@ -298,7 +298,10 @@ pub fn test_blocking_fuzz_range_reader(op: BlockingOperator) -> Result<()> {
.expect("write must succeed");

let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0, content.len());
let mut o = op.range_reader(&path, 0..content.len() as u64)?;
let mut o = op
.reader_with(&path)
.range(0..content.len() as u64)
.call()?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down Expand Up @@ -335,7 +338,7 @@ pub fn test_blocking_fuzz_offset_reader(op: BlockingOperator) -> Result<()> {
.expect("write must succeed");

let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0, content.len());
let mut o = op.range_reader(&path, 0..)?;
let mut o = op.reader_with(&path).range(0..).call()?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down Expand Up @@ -373,7 +376,10 @@ pub fn test_blocking_fuzz_part_reader(op: BlockingOperator) -> Result<()> {
.expect("write must succeed");

let mut fuzzer = ObjectReaderFuzzer::new(&path, content, offset as usize, length as usize);
let mut o = op.range_reader(&path, offset..offset + length)?;
let mut o = op
.reader_with(&path)
.range(offset..offset + length)
.call()?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down