Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove operator range_read and range_reader API #2898

Merged
merged 4 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion bin/oay/src/services/webdav/webdav_file.rs
Expand Up @@ -40,7 +40,8 @@ impl DavFile for WebdavFile {
async move {
let file_path = self.path.as_url_string();
self.op
.range_read(&file_path, 0..count as u64)
.read_with(&file_path)
.range(0..count as u64)
.await
.map(Bytes::from)
.map_err(convert_error)
Expand Down
8 changes: 5 additions & 3 deletions core/benches/ops/read.rs
Expand Up @@ -58,7 +58,8 @@ fn bench_read_full(c: &mut Criterion, name: &str, op: Operator) {
group.bench_with_input(size.to_string(), &(op.clone(), &path), |b, (op, path)| {
b.to_async(&*TOKIO).iter(|| async {
let r = op
.range_reader(path, 0..=size.bytes() as u64)
.reader_with(path)
.range(0..=size.bytes() as u64)
.await
.unwrap();
io::copy(r, &mut io::sink()).await.unwrap();
Expand Down Expand Up @@ -91,7 +92,7 @@ fn bench_read_part(c: &mut Criterion, name: &str, op: Operator) {
group.throughput(criterion::Throughput::Bytes(size.bytes() as u64));
group.bench_with_input(size.to_string(), &(op.clone(), &path), |b, (op, path)| {
b.to_async(&*TOKIO).iter(|| async {
let r = op.range_reader(path, offset..).await.unwrap();
let r = op.reader_with(path).range(offset..).await.unwrap();
io::copy(r, &mut io::sink()).await.unwrap();
})
});
Expand Down Expand Up @@ -130,7 +131,8 @@ fn bench_read_parallel(c: &mut Criterion, name: &str, op: Operator) {
.map(|_| async {
let mut buf = buf.clone();
let mut r = op
.range_reader(path, offset..=offset + size.bytes() as u64)
.reader_with(path)
.range(offset..=offset + size.bytes() as u64)
.await
.unwrap();
r.read_exact(&mut buf).await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion core/fuzz/fuzz_reader.rs
Expand Up @@ -226,7 +226,7 @@ async fn fuzz_reader(op: Operator, input: FuzzInput) -> Result<()> {
let mut checker = ReadChecker::new(input.size, input.range);
op.write(&path, checker.raw_data.clone()).await?;

let mut o = op.range_reader(&path, input.range.to_range()).await?;
let mut o = op.reader_with(&path).range(input.range.to_range()).await?;

for action in input.actions {
match action {
Expand Down
11 changes: 8 additions & 3 deletions core/src/layers/logging.rs
Expand Up @@ -401,7 +401,6 @@ impl<A: Accessor> LayeredAccessor for LoggingAccessor<A> {
from,
to,
self.ctx.error_print(&err),

)
};
err
Expand Down Expand Up @@ -1512,7 +1511,10 @@ impl<P: oio::Page> oio::Page for LoggingPager<P> {
Ok(None) => {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} -> finished", self.ctx.scheme, self.op, self.path
"service={} operation={} path={} -> finished",
self.ctx.scheme,
self.op,
self.path
);
self.finished = true;
}
Expand Down Expand Up @@ -1553,7 +1555,10 @@ impl<P: oio::BlockingPage> oio::BlockingPage for LoggingPager<P> {
Ok(None) => {
debug!(
target: LOGGING_TARGET,
"service={} operation={} path={} -> finished", self.ctx.scheme, self.op, self.path
"service={} operation={} path={} -> finished",
self.ctx.scheme,
self.op,
self.path
);
self.finished = true;
}
Expand Down
57 changes: 5 additions & 52 deletions core/src/types/operator/operator.rs
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::RangeBounds;
use std::time::Duration;

use bytes::Bytes;
Expand Down Expand Up @@ -327,7 +326,7 @@ impl Operator {
/// # }
/// ```
pub async fn read(&self, path: &str) -> Result<Vec<u8>> {
self.range_read(path, ..).await
self.read_with(path).await
}

/// Read the whole path into a bytes with extra options.
Expand All @@ -344,6 +343,7 @@ impl Operator {
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let bs = op.read_with("path/to/file").await?;
/// let bs = op.read_with("path/to/file").range(0..10).await?;
/// # Ok(())
/// # }
/// ```
Expand All @@ -361,7 +361,7 @@ impl Operator {
ErrorKind::IsADirectory,
"read path is a directory",
)
.with_operation("range_read")
.with_operation("read")
.with_context("service", inner.info().scheme())
.with_context("path", &path));
}
Expand All @@ -381,7 +381,7 @@ impl Operator {
// TODO: use native read api
s.read_exact(buf.initialized_mut()).await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "read from storage")
.with_operation("range_read")
.with_operation("read")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path)
.with_context("range", br.to_string())
Expand All @@ -401,31 +401,6 @@ impl Operator {
fut
}

/// Read the specified range of path into a bytes.
oowl marked this conversation as resolved.
Show resolved Hide resolved
///
/// This function will allocate a new bytes internally. For more precise memory control or
/// reading data lazily, please use [`Operator::range_reader`]
///
/// # Notes
///
/// - The returning content's length may be smaller than the range specified.
///
/// # Examples
///
/// ```
/// # use std::io::Result;
/// # use opendal::Operator;
/// # use futures::TryStreamExt;
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let bs = op.range_read("path/to/file", 1024..2048).await?;
/// # Ok(())
/// # }
/// ```
pub async fn range_read(&self, path: &str, range: impl RangeBounds<u64>) -> Result<Vec<u8>> {
self.read_with(path).range(range).await
}

/// Create a new reader which can read the whole path.
///
/// # Examples
Expand All @@ -445,28 +420,6 @@ impl Operator {
self.reader_with(path).await
}

/// Create a new reader which can read the specified range.
oowl marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Notes
///
/// - The returning content's length may be smaller than the range specified.
///
/// # Examples
///
/// ```no_run
/// # use std::io::Result;
/// # use opendal::Operator;
/// # use futures::TryStreamExt;
/// # #[tokio::main]
/// # async fn test(op: Operator) -> Result<()> {
/// let r = op.range_reader("path/to/file", 1024..2048).await?;
/// # Ok(())
/// # }
/// ```
pub async fn range_reader(&self, path: &str, range: impl RangeBounds<u64>) -> Result<Reader> {
self.reader_with(path).range(range).await
}

/// Create a new reader with extra options
///
/// # Examples
Expand Down Expand Up @@ -496,7 +449,7 @@ impl Operator {
ErrorKind::IsADirectory,
"read path is a directory",
)
.with_operation("Operator::range_reader")
.with_operation("Operator::reader")
.with_context("service", inner.info().scheme())
.with_context("path", path));
}
Expand Down
2 changes: 1 addition & 1 deletion core/tests/behavior/fuzz.rs
Expand Up @@ -81,7 +81,7 @@ pub async fn test_fuzz_issue_2717(op: Operator) -> Result<()> {
.await
.expect("write must succeed");

let mut r = op.range_reader(&path, 1..2).await?;
let mut r = op.reader_with(&path).range(1..2).await?;

// Perform a seek
let result = r.seek(SeekFrom::End(-2)).await;
Expand Down
16 changes: 8 additions & 8 deletions core/tests/behavior/read_only.rs
Expand Up @@ -40,8 +40,8 @@ pub fn behavior_read_only_tests(op: &Operator) -> Vec<Trial> {
test_read_only_stat_root,
test_read_only_read_full,
test_read_only_read_full_with_special_chars,
test_read_only_read_range,
test_read_only_reader_range,
test_read_only_read_with_range,
test_read_only_reader_with_range,
test_read_only_reader_from,
test_read_only_reader_tail,
test_read_only_read_not_exist,
Expand Down Expand Up @@ -184,8 +184,8 @@ pub async fn test_read_only_read_full_with_special_chars(op: Operator) -> Result
}

/// Read full content should match.
pub async fn test_read_only_read_range(op: Operator) -> Result<()> {
let bs = op.range_read("normal_file", 1024..2048).await?;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved
pub async fn test_read_only_read_with_range(op: Operator) -> Result<()> {
let bs = op.read_with("normal_file").range(1024..2048).await?;
assert_eq!(bs.len(), 1024, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs)),
Expand All @@ -197,8 +197,8 @@ pub async fn test_read_only_read_range(op: Operator) -> Result<()> {
}

/// Read range should match.
pub async fn test_read_only_reader_range(op: Operator) -> Result<()> {
let mut r = op.range_reader("normal_file", 1024..2048).await?;
pub async fn test_read_only_reader_with_range(op: Operator) -> Result<()> {
let mut r = op.reader_with("normal_file").range(1024..2048).await?;

let mut bs = Vec::new();
r.read_to_end(&mut bs).await?;
Expand All @@ -215,7 +215,7 @@ pub async fn test_read_only_reader_range(op: Operator) -> Result<()> {

/// Read from should match.
pub async fn test_read_only_reader_from(op: Operator) -> Result<()> {
let mut r = op.range_reader("normal_file", 261120..).await?;
let mut r = op.reader_with("normal_file").range(261120..).await?;

let mut bs = Vec::new();
r.read_to_end(&mut bs).await?;
Expand All @@ -232,7 +232,7 @@ pub async fn test_read_only_reader_from(op: Operator) -> Result<()> {

/// Read tail should match.
pub async fn test_read_only_reader_tail(op: Operator) -> Result<()> {
let mut r = op.range_reader("normal_file", ..1024).await?;
let mut r = op.reader_with("normal_file").range(..1024).await?;

let mut bs = Vec::new();
r.read_to_end(&mut bs).await?;
Expand Down
20 changes: 10 additions & 10 deletions core/tests/behavior/write.rs
Expand Up @@ -68,7 +68,7 @@ pub fn behavior_write_tests(op: &Operator) -> Vec<Trial> {
test_read_not_exist,
test_read_with_if_match,
test_read_with_if_none_match,
test_fuzz_range_reader,
test_fuzz_reader_with_range,
test_fuzz_offset_reader,
test_fuzz_part_reader,
test_read_with_dir_path,
Expand Down Expand Up @@ -438,7 +438,7 @@ pub async fn test_read_range(op: Operator) -> Result<()> {
.await
.expect("write must succeed");

let bs = op.range_read(&path, offset..offset + length).await?;
let bs = op.read_with(&path).range(offset..offset + length).await?;
assert_eq!(bs.len() as u64, length, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs)),
Expand Down Expand Up @@ -468,7 +468,7 @@ pub async fn test_read_large_range(op: Operator) -> Result<()> {
.await
.expect("write must succeed");

let bs = op.range_read(&path, offset..u32::MAX as u64).await?;
let bs = op.read_with(&path).range(offset..u32::MAX as u64).await?;
assert_eq!(
bs.len() as u64,
size as u64 - offset,
Expand Down Expand Up @@ -499,7 +499,7 @@ pub async fn test_reader_range(op: Operator) -> Result<()> {
.await
.expect("write must succeed");

let mut r = op.range_reader(&path, offset..offset + length).await?;
let mut r = op.reader_with(&path).range(offset..offset + length).await?;

let mut bs = Vec::new();
r.read_to_end(&mut bs).await?;
Expand Down Expand Up @@ -532,7 +532,7 @@ pub async fn test_reader_from(op: Operator) -> Result<()> {
.await
.expect("write must succeed");

let mut r = op.range_reader(&path, offset..).await?;
let mut r = op.reader_with(&path).range(offset..).await?;

let mut bs = Vec::new();
r.read_to_end(&mut bs).await?;
Expand Down Expand Up @@ -563,7 +563,7 @@ pub async fn test_reader_tail(op: Operator) -> Result<()> {
.await
.expect("write must succeed");

let mut r = match op.range_reader(&path, ..length).await {
let mut r = match op.reader_with(&path).range(..length).await {
Ok(r) => r,
// Not all services support range with tail range, let's tolerate this.
Err(err) if err.kind() == ErrorKind::Unsupported => {
Expand Down Expand Up @@ -663,7 +663,7 @@ pub async fn test_read_with_if_none_match(op: Operator) -> Result<()> {
Ok(())
}

pub async fn test_fuzz_range_reader(op: Operator) -> Result<()> {
pub async fn test_fuzz_reader_with_range(op: Operator) -> Result<()> {
if !op.info().full_capability().read_with_range {
return Ok(());
}
Expand All @@ -677,7 +677,7 @@ pub async fn test_fuzz_range_reader(op: Operator) -> Result<()> {
.expect("write must succeed");

let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0, content.len());
let mut o = op.range_reader(&path, 0..content.len() as u64).await?;
let mut o = op.reader_with(&path).range(0..content.len() as u64).await?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down Expand Up @@ -718,7 +718,7 @@ pub async fn test_fuzz_offset_reader(op: Operator) -> Result<()> {
.expect("write must succeed");

let mut fuzzer = ObjectReaderFuzzer::new(&path, content.clone(), 0, content.len());
let mut o = op.range_reader(&path, 0..).await?;
let mut o = op.reader_with(&path).range(0..).await?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down Expand Up @@ -761,7 +761,7 @@ pub async fn test_fuzz_part_reader(op: Operator) -> Result<()> {

let mut fuzzer =
ObjectReaderFuzzer::new(&path, content.clone(), offset as usize, length as usize);
let mut o = op.range_reader(&path, offset..offset + length).await?;
let mut o = op.reader_with(&path).range(offset..offset + length).await?;

for _ in 0..100 {
match fuzzer.fuzz() {
Expand Down
3 changes: 2 additions & 1 deletion integrations/object_store/src/lib.rs
Expand Up @@ -111,7 +111,8 @@ impl ObjectStore for OpendalStore {
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let bs = self
.inner
.range_read(location.as_ref(), range.start as u64..range.end as u64)
.read_with(location.as_ref())
.range(range.start as u64..range.end as u64)
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Expand Down