Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add retry for Writer::sink operation #2896

Merged
merged 1 commit into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion bindings/ocaml/src/operator/mod.rs
Expand Up @@ -19,9 +19,10 @@ mod _type;
mod metadata;
mod reader;

use super::*;
use _type::*;

use super::*;

#[ocaml::func]
#[ocaml::sig("string -> (string * string) list -> (operator, string) Result.t ")]
pub fn operator(
Expand Down
4 changes: 2 additions & 2 deletions bindings/ocaml/src/operator/reader.rs
Expand Up @@ -17,10 +17,10 @@

use std::io;

use super::*;

use opendal::raw::oio::BlockingRead;

use super::*;

#[ocaml::func]
#[ocaml::sig("reader -> bytes -> (int, string) Result.t ")]
pub fn reader_read(reader: &mut Reader, buf: &mut [u8]) -> Result<usize, String> {
Expand Down
60 changes: 58 additions & 2 deletions core/src/layers/retry.rs
Expand Up @@ -34,6 +34,7 @@ use backon::Retryable;
use bytes::Bytes;
use futures::FutureExt;
use log::warn;
use tokio::sync::Mutex;

use crate::raw::oio::PageOperation;
use crate::raw::oio::ReadOperation;
Expand Down Expand Up @@ -898,9 +899,64 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
}
}

/// Sink will move the input stream, so we can't retry it.
/// > Ooooooooooops, are you crazy!? Why do we need to do `Arc<Mutex<S>>` here? Adding a lock has
/// a lot of overhead!
///
/// Yes, you are right. But we have no choice. This is the only safe way for us to add retry
/// support for streams.
///
/// And the overhead is acceptable. Based on our benchmark, adding a lock
/// that has no conflicts will only cost about 5ns.
///
/// ```shell
/// stream/without_arc_mutex
/// time: [10.715 ns 10.729 ns 10.744 ns]
/// thrpt: [ 90896 GiB/s 91019 GiB/s 91139 GiB/s]
/// stream/with_arc_mutex time: [14.891 ns 14.905 ns 14.928 ns]
/// thrpt: [ 65418 GiB/s 65517 GiB/s 65581 GiB/s]
/// ```
///
/// The overhead is constant, which means it will not increase with the size of the
/// stream. For example, if every `next` call costs 1ms, then the overhead will only be about
/// 0.0005% (5ns out of 1ms), which is acceptable.
async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
self.inner.sink(size, s).await
// Share the stream behind `Arc<Mutex<_>>` so that every attempt can be handed a
// fresh boxed handle (`s.clone()`) to the same underlying stream.
let s = Arc::new(Mutex::new(s));

// A new backoff sequence is built for each `sink` call.
let mut backoff = self.builder.build();

loop {
match self.inner.sink(size, Box::new(s.clone())).await {
Ok(_) => return Ok(()),
// Errors that are not temporary are returned immediately, without retrying.
Err(e) if !e.is_temporary() => return Err(e),
Err(e) => match backoff.next() {
// The backoff is exhausted: give up and surface the last sink error.
None => return Err(e),
Some(dur) => {
// Inner scope: the lock guard is dropped before we notify and sleep.
{
use oio::StreamExt;

let mut stream = s.lock().await;
// Try to reset this stream.
//
// If an error happens, we return the sink error directly and stop retrying.
if stream.reset().await.is_err() {
return Err(e);
}
}

// Report the failed attempt and the upcoming delay to the interceptor.
self.notify.intercept(
&e,
dur,
&[
("operation", WriteOperation::Sink.into_static()),
("path", &self.path),
],
);
tokio::time::sleep(dur).await;
continue;
}
},
}
}
}

async fn abort(&mut self) -> Result<()> {
Expand Down
118 changes: 114 additions & 4 deletions core/src/raw/oio/stream/api.rs
Expand Up @@ -15,10 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use pin_project::pin_project;

use crate::*;

Expand All @@ -32,6 +36,9 @@ pub type Streamer = Box<dyn Stream>;
pub trait Stream: Unpin + Send + Sync {
/// Poll next item `Result<Bytes>` from the stream.
///
/// `Poll::Ready(None)` presumably signals the end of the stream, mirroring
/// `futures::Stream` — NOTE(review): confirm against implementors.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>>;

/// Reset this stream to the beginning.
///
/// Resolves to `Ok(())` when the reset succeeded. Implementations may resolve to
/// an error instead, for example when the underlying source cannot be replayed.
fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
}

impl Stream for () {
Expand All @@ -40,6 +47,12 @@ impl Stream for () {

unimplemented!("poll_next is required to be implemented for oio::Stream")
}

/// Placeholder implementation: the unit type is not a usable stream, so any call
/// to `poll_reset` panics with an explanatory message.
fn poll_reset(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
    unimplemented!("poll_reset is required to be implemented for oio::Stream")
}
}

/// `Box<dyn Stream>` won't implement `Stream` automatically.
Expand All @@ -48,17 +61,114 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
(**self).poll_next(cx)
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
(**self).poll_reset(cx)
}
}

impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
match self.try_lock() {
Ok(mut this) => this.poll_next(cx),
Err(_) => Poll::Ready(Some(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
)))),
}
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.try_lock() {
Ok(mut this) => this.poll_reset(cx),
Err(_) => Poll::Ready(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
))),
}
}
}

impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
match self.try_lock() {
Ok(mut this) => this.poll_next(cx),
Err(_) => Poll::Ready(Some(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
)))),
}
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.try_lock() {
Ok(mut this) => this.poll_reset(cx),
Err(_) => Poll::Ready(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
))),
}
}
}

// Bridge so that a `dyn Stream` trait object can be used where a
// `futures::Stream` yielding `Result<Bytes>` is expected.
impl futures::Stream for dyn Stream {
type Item = Result<Bytes>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Reborrow the pinned object as a plain `&mut dyn Stream` (allowed because
// `Stream` requires `Unpin`), so the call below is `Stream::poll_next`.
let this: &mut dyn Stream = &mut *self;

this.poll_next(cx)
}
}

/// Impl StreamExt for all T: Stream
///
/// Every sized type implementing [`Stream`] gets the helper methods below for free.
impl<T: Stream> StreamExt for T {}

/// Extension of [`Stream`] to make it easier for use.
///
/// Wraps the poll-based methods of [`Stream`] into futures so they can be `.await`ed.
pub trait StreamExt: Stream {
/// Build a future for `poll_next`.
///
/// The returned [`NextFuture`] mutably borrows the stream and resolves to
/// `Option<Result<Bytes>>`.
fn next(&mut self) -> NextFuture<'_, Self> {
NextFuture { inner: self }
}

/// Build a future for `poll_reset`.
///
/// The returned [`ResetFuture`] mutably borrows the stream and resolves to
/// `Result<()>`.
fn reset(&mut self) -> ResetFuture<'_, Self> {
ResetFuture { inner: self }
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct NextFuture<'a, T: Stream + Unpin + ?Sized> {
inner: &'a mut T,
}

impl<T> Future for NextFuture<'_, T>
where
T: Stream + Unpin + ?Sized,
{
type Output = Option<Result<Bytes>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
let this = self.project();
Pin::new(this.inner).poll_next(cx)
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct ResetFuture<'a, T: Stream + Unpin + ?Sized> {
inner: &'a mut T,
}

impl<T> Future for ResetFuture<'_, T>
where
T: Stream + Unpin + ?Sized,
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
Pin::new(this.inner).poll_reset(cx)
}
}
7 changes: 7 additions & 0 deletions core/src/raw/oio/stream/into_stream.rs
Expand Up @@ -43,4 +43,11 @@ where
// Delegates straight to the wrapped `inner` stream.
// NOTE(review): `try_poll_next_unpin` is presumably the `futures` `TryStreamExt`
// helper — confirm against this file's imports.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.inner.try_poll_next_unpin(cx)
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"IntoStream doesn't support reset",
)))
}
}
7 changes: 7 additions & 0 deletions core/src/raw/oio/stream/into_stream_from_reader.rs
Expand Up @@ -89,4 +89,11 @@ where
.set_source(err)))),
}
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"FromReaderStream doesn't support reset",
)))
}
}
1 change: 1 addition & 0 deletions core/src/raw/oio/stream/mod.rs
Expand Up @@ -17,6 +17,7 @@

mod api;
pub use api::Stream;
pub use api::StreamExt;
pub use api::Streamer;

mod into_stream_from_reader;
Expand Down