Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement RFC-2758 Merge Append Into Write #2880

Merged
merged 10 commits into from
Aug 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bindings/java/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ fn intern_append(
}

async fn do_append(op: &mut Operator, path: String, content: Vec<u8>) -> Result<()> {
Ok(op.append(&path, content).await?)
Ok(op.write_with(&path, content).append(true).await?)
}

/// # Safety
Expand Down
6 changes: 5 additions & 1 deletion bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,11 @@ impl Operator {
Either::B(s) => s.into_bytes(),
};

self.0.append(&path, c).await.map_err(format_napi_error)
self.0
.write_with(&path, c)
.append(true)
.await
.map_err(format_napi_error)
}

/// Copy file according to given `from` and `to` path.
Expand Down
6 changes: 0 additions & 6 deletions core/src/layers/async_backtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl<A: Accessor> LayeredAccessor for AsyncBacktraceAccessor<A> {
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;

Expand All @@ -83,11 +82,6 @@ impl<A: Accessor> LayeredAccessor for AsyncBacktraceAccessor<A> {
self.inner.write(path, args).await
}

#[async_backtrace::framed]
async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

#[async_backtrace::framed]
async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner.copy(from, to, args).await
Expand Down
8 changes: 0 additions & 8 deletions core/src/layers/await_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ impl<A: Accessor> LayeredAccessor for AwaitTreeAccessor<A> {
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;

Expand All @@ -95,13 +94,6 @@ impl<A: Accessor> LayeredAccessor for AwaitTreeAccessor<A> {
.await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner
.append(path, args)
.instrument_await(format!("opendal::{}", Operation::Append))
.await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner()
.copy(from, to, args)
Expand Down
5 changes: 0 additions & 5 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl<A: Accessor> LayeredAccessor for BlockingAccessor<A> {
type BlockingReader = BlockingWrapper<A::Reader>;
type Writer = A::Writer;
type BlockingWriter = BlockingWrapper<A::Writer>;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = BlockingWrapper<A::Pager>;

Expand All @@ -97,10 +96,6 @@ impl<A: Accessor> LayeredAccessor for BlockingAccessor<A> {
self.inner.write(path, args).await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner.copy(from, to, args).await
}
Expand Down
5 changes: 0 additions & 5 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
type BlockingReader = ChaosReader<A::BlockingReader>;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = A::Pager;
type BlockingPager = A::BlockingPager;

Expand Down Expand Up @@ -135,10 +134,6 @@ impl<A: Accessor> LayeredAccessor for ChaosAccessor<A> {
self.inner.blocking_write(path, args)
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
self.inner.list(path, args).await
}
Expand Down
67 changes: 0 additions & 67 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
type BlockingReader = CompleteReader<A, A::BlockingReader>;
type Writer = CompleteWriter<A::Writer>;
type BlockingWriter = CompleteWriter<A::BlockingWriter>;
type Appender = CompleteAppender<A::Appender>;
type Pager = CompletePager<A, A::Pager>;
type BlockingPager = CompletePager<A, A::BlockingPager>;

Expand Down Expand Up @@ -446,18 +445,6 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
.map(|(rp, w)| (rp, CompleteWriter::new(w, size)))
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
let capability = self.meta.full_capability();
if !capability.append {
return new_capability_unsupported_error(Operation::Append);
}

self.inner
.append(path, args)
.await
.map(|(rp, a)| (rp, CompleteAppender::new(a)))
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let capability = self.meta.full_capability();
if !capability.create_dir {
Expand Down Expand Up @@ -854,54 +841,6 @@ where
}
}

pub struct CompleteAppender<A> {
inner: Option<A>,
}

impl<A> CompleteAppender<A> {
pub fn new(inner: A) -> CompleteAppender<A> {
CompleteAppender { inner: Some(inner) }
}
}

/// Check if the appender has been closed while debug_assertions enabled.
/// This code will never be executed in release mode.
#[cfg(debug_assertions)]
impl<A> Drop for CompleteAppender<A> {
fn drop(&mut self) {
if self.inner.is_some() {
// Do we need to panic here?
log::warn!("appender has not been closed, must be a bug")
}
}
}

#[async_trait]
impl<A> oio::Append for CompleteAppender<A>
where
A: oio::Append,
{
async fn append(&mut self, bs: Bytes) -> Result<()> {
let a = self
.inner
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has been closed"))?;

a.append(bs).await
}

async fn close(&mut self) -> Result<()> {
let a = self
.inner
.as_mut()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "appender has been closed"))?;

a.close().await?;
self.inner = None;
Ok(())
}
}

fn new_capability_unsupported_error<R>(operation: Operation) -> Result<R> {
Err(Error::new(ErrorKind::Unsupported, "operation is not supported").with_operation(operation))
}
Expand Down Expand Up @@ -955,7 +894,6 @@ mod tests {
type BlockingReader = ();
type Writer = ();
type BlockingWriter = ();
type Appender = ();
type Pager = ();
type BlockingPager = ();

Expand All @@ -978,10 +916,6 @@ mod tests {
Ok((RpWrite::new(), ()))
}

async fn append(&self, _: &str, _: OpAppend) -> Result<(RpAppend, Self::Appender)> {
Ok((RpAppend::new(), ()))
}

async fn copy(&self, _: &str, _: &str, _: OpCopy) -> Result<RpCopy> {
Ok(RpCopy {})
}
Expand Down Expand Up @@ -1045,7 +979,6 @@ mod tests {
capability_test!(stat, |op| { op.stat("/path/to/mock_file") });
capability_test!(read, |op| { op.read("/path/to/mock_file") });
capability_test!(write, |op| { op.writer("/path/to/mock_file") });
capability_test!(append, |op| { op.appender("/path/to/mock_file") });
capability_test!(create_dir, |op| { op.create_dir("/path/to/mock_dir/") });
capability_test!(delete, |op| { op.delete("/path/to/mock_file") });
capability_test!(copy, |op| {
Expand Down
26 changes: 0 additions & 26 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
type BlockingReader = ConcurrentLimitWrapper<A::BlockingReader>;
type Writer = ConcurrentLimitWrapper<A::Writer>;
type BlockingWriter = ConcurrentLimitWrapper<A::BlockingWriter>;
type Appender = ConcurrentLimitWrapper<A::Appender>;
type Pager = ConcurrentLimitWrapper<A::Pager>;
type BlockingPager = ConcurrentLimitWrapper<A::BlockingPager>;

Expand Down Expand Up @@ -132,20 +131,6 @@ impl<A: Accessor> LayeredAccessor for ConcurrentLimitAccessor<A> {
.map(|(rp, w)| (rp, ConcurrentLimitWrapper::new(w, permit)))
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
let permit = self
.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore must be valid");

self.inner
.append(path, args)
.await
.map(|(rp, a)| (rp, ConcurrentLimitWrapper::new(a, permit)))
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let _permit = self
.semaphore
Expand Down Expand Up @@ -327,17 +312,6 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
}
}

#[async_trait]
impl<R: oio::Append> oio::Append for ConcurrentLimitWrapper<R> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.inner.append(bs).await
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await
}
}

#[async_trait]
impl<R: oio::Page> oio::Page for ConcurrentLimitWrapper<R> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Expand Down
42 changes: 0 additions & 42 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::TryFutureExt;

use crate::raw::oio::AppendOperation;
use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
use crate::raw::oio::WriteOperation;
Expand Down Expand Up @@ -71,7 +70,6 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
type BlockingReader = ErrorContextWrapper<A::BlockingReader>;
type Writer = ErrorContextWrapper<A::Writer>;
type BlockingWriter = ErrorContextWrapper<A::BlockingWriter>;
type Appender = ErrorContextWrapper<A::Appender>;
type Pager = ErrorContextWrapper<A::Pager>;
type BlockingPager = ErrorContextWrapper<A::BlockingPager>;

Expand Down Expand Up @@ -139,27 +137,6 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
.await
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner
.append(path, args)
.map_ok(|(rp, os)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
inner: os,
},
)
})
.map_err(|err| {
err.with_operation(Operation::Append)
.with_context("service", self.meta.scheme())
.with_context("path", path)
})
.await
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
self.inner
.copy(from, to, args)
Expand Down Expand Up @@ -477,25 +454,6 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
}
}

#[async_trait::async_trait]
impl<T: oio::Append> oio::Append for ErrorContextWrapper<T> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
self.inner.append(bs).await.map_err(|err| {
err.with_operation(AppendOperation::Append)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}

async fn close(&mut self) -> Result<()> {
self.inner.close().await.map_err(|err| {
err.with_operation(AppendOperation::Close)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}
}

#[async_trait::async_trait]
impl<T: oio::Page> oio::Page for ErrorContextWrapper<T> {
async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
Expand Down
5 changes: 0 additions & 5 deletions core/src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
type BlockingReader = A::BlockingReader;
type Writer = A::Writer;
type BlockingWriter = A::BlockingWriter;
type Appender = A::Appender;
type Pager = ImmutableDir;
type BlockingPager = ImmutableDir;

Expand Down Expand Up @@ -196,10 +195,6 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
self.inner.blocking_write(path, args)
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
self.inner.append(path, args).await
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
let mut path = path;
if path == "/" {
Expand Down