Skip to content

Commit

Permalink
feat(types): synchronous blocking operator and operator's API (#2924)
Browse files Browse the repository at this point in the history
  • Loading branch information
oowl committed Aug 24, 2023
1 parent c058ee0 commit 8f86153
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 12 deletions.
49 changes: 39 additions & 10 deletions core/src/types/operator/blocking_operator.rs
Expand Up @@ -593,19 +593,48 @@ impl BlockingOperator {
/// # }
/// ```
pub fn writer(&self, path: &str) -> Result<BlockingWriter> {
self.writer_with(path).call()
}

/// Create a new reader with extra options
///
/// # Examples
///
/// ```no_run
/// # use anyhow::Result;
/// use opendal::BlockingOperator;
/// use opendal::EntryMode;
/// use opendal::Metakey;
/// # fn test(op: BlockingOperator) -> Result<()> {
/// let mut w = op.writer_with("path/to/file").call()?;
/// w.write(vec![0; 4096])?;
/// w.write(vec![1; 4096])?;
/// w.close()?;
/// # Ok(())
/// # }
/// ```
pub fn writer_with(&self, path: &str) -> FunctionWriter {
let path = normalize_path(path);

if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "write path is a directory")
.with_operation("BlockingOperator::writer")
.with_context("service", self.info().scheme().into_static())
.with_context("path", &path),
);
}
FunctionWriter(OperatorFunction::new(
self.inner().clone(),
path,
OpWrite::default(),
|inner, path, args| {
let path = normalize_path(&path);

let op = OpWrite::default();
BlockingWriter::create(self.inner().clone(), &path, op)
if !validate_path(&path, EntryMode::FILE) {
return Err(
Error::new(ErrorKind::IsADirectory, "write path is a directory")
.with_operation("BlockingOperator::writer_with")
.with_context("service", inner.info().scheme().into_static())
.with_context("path", &path),
);
}

BlockingWriter::create(inner.clone(), &path, args)
},
))
}

/// Delete given path.
Expand Down
159 changes: 157 additions & 2 deletions core/src/types/operator/operator_functions.rs
Expand Up @@ -76,29 +76,142 @@ pub struct FunctionWrite(
);

impl FunctionWrite {
/// Set the content length for this operation.
/// Set the append mode of op.
///
/// If the append mode is set, the data will be appended to the end of the file.
///
/// # Notes
///
/// Service could return `Unsupported` if the underlying storage does not support append.
pub fn append(mut self, v: bool) -> Self {
self.0 = self.0.map_args(|(args, bs)| (args.with_append(v), bs));
self
}

/// Set the buffer size of op.
///
/// If buffer size is set, the data will be buffered by the underlying writer.
///
/// ## NOTE
///
/// Service could have their own minimum buffer size while perform write operations like
/// multipart uploads. So the buffer size may be larger than the given buffer size.
pub fn buffer_size(mut self, v: usize) -> Self {
self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
self
}

/// Set the content length of op.
///
/// If the content length is not set, the content length will be
/// calculated automatically by buffering part of data.
pub fn content_length(mut self, v: u64) -> Self {
self.0 = self
.0
.map_args(|(args, bs)| (args.with_content_length(v), bs));
self
}

/// Set the content type for this operation.
/// Set the content type of option
pub fn content_type(mut self, v: &str) -> Self {
self.0 = self
.0
.map_args(|(args, bs)| (args.with_content_type(v), bs));
self
}

/// Set the content disposition of option
pub fn content_disposition(mut self, v: &str) -> Self {
self.0 = self
.0
.map_args(|(args, bs)| (args.with_content_disposition(v), bs));
self
}

/// Set the content type of option
pub fn cache_control(mut self, v: &str) -> Self {
self.0 = self
.0
.map_args(|(args, bs)| (args.with_cache_control(v), bs));
self
}

/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<()> {
self.0.call()
}
}

/// Function that generated by [`BlockingOperator::writer_with`].
///
/// Users can add more options by public functions provided by this struct.
pub struct FunctionWriter(
/// The args for FunctionWriter is a bit special because we also
/// need to move the bytes input this function.
pub(crate) OperatorFunction<OpWrite, BlockingWriter>,
);

impl FunctionWriter {
/// Set the append mode of op.
///
/// If the append mode is set, the data will be appended to the end of the file.
///
/// # Notes
///
/// Service could return `Unsupported` if the underlying storage does not support append.
pub fn append(mut self, v: bool) -> Self {
self.0 = self.0.map_args(|args| args.with_append(v));
self
}

/// Set the buffer size of op.
///
/// If buffer size is set, the data will be buffered by the underlying writer.
///
/// ## NOTE
///
/// Service could have their own minimum buffer size while perform write operations like
/// multipart uploads. So the buffer size may be larger than the given buffer size.
pub fn buffer_size(mut self, v: usize) -> Self {
self.0 = self.0.map_args(|args| args.with_buffer_size(v));
self
}

/// Set the content length of op.
///
/// If the content length is not set, the content length will be
/// calculated automatically by buffering part of data.
pub fn content_length(mut self, v: u64) -> Self {
self.0 = self.0.map_args(|args| args.with_content_length(v));
self
}

/// Set the content type of option
pub fn content_type(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_content_type(v));
self
}

/// Set the content disposition of option
pub fn content_disposition(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_content_disposition(v));
self
}

/// Set the content type of option
pub fn cache_control(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_cache_control(v));
self
}

/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<BlockingWriter> {
self.0.call()
}
}

/// Function that generated by [`BlockingOperator::delete_with`].
///
/// Users can add more options by public functions provided by this struct.
Expand Down Expand Up @@ -223,6 +336,48 @@ impl FunctionReader {
self
}

/// Sets the content-disposition header that should be send back by the remote read operation.
pub fn override_content_disposition(mut self, content_disposition: &str) -> Self {
self.0 = self
.0
.map_args(|args| args.with_override_content_disposition(content_disposition));
self
}

/// Sets the cache-control header that should be send back by the remote read operation.
pub fn override_cache_control(mut self, cache_control: &str) -> Self {
self.0 = self
.0
.map_args(|args| args.with_override_cache_control(cache_control));
self
}

/// Sets the content-type header that should be send back by the remote read operation.
pub fn override_content_type(mut self, content_type: &str) -> Self {
self.0 = self
.0
.map_args(|args| args.with_override_content_type(content_type));
self
}

/// Set the If-Match for this operation.
pub fn if_match(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_if_match(v));
self
}

/// Set the If-None-Match for this operation.
pub fn if_none_match(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_if_none_match(v));
self
}

/// Set the version for this operation.
pub fn version(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_version(v));
self
}

/// Call the function to consume all the input and generate a
/// result.
pub fn call(self) -> Result<BlockingReader> {
Expand Down
6 changes: 6 additions & 0 deletions core/src/types/operator/operator_futures.rs
Expand Up @@ -546,6 +546,12 @@ impl Future for FutureDelete {
pub struct FutureList(pub(crate) OperatorFuture<OpList, Vec<Entry>>);

impl FutureList {
/// Change the limit of this list operation.
pub fn limit(mut self, v: usize) -> Self {
self.0 = self.0.map_args(|args| args.with_limit(v));
self
}

/// Change the start_after of this list operation.
pub fn start_after(mut self, v: &str) -> Self {
self.0 = self.0.map_args(|args| args.with_start_after(v));
Expand Down

0 comments on commit 8f86153

Please sign in to comment.