Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add async list with metakey support #2803

Merged
merged 12 commits into from
Aug 7, 2023
18 changes: 8 additions & 10 deletions bin/oay/src/services/s3/service.rs
Expand Up @@ -25,6 +25,7 @@ use axum::response::Response;
use axum::routing::get;
use axum::Router;
use chrono::SecondsFormat;
use futures_util::StreamExt;
use opendal::Metakey;
use opendal::Operator;
use serde::Deserialize;
Expand Down Expand Up @@ -103,21 +104,18 @@ async fn handle_list_objects(
.op
.lister_with(&params.prefix)
.start_after(&params.start_after)
.await?;
.metakey(Metakey::Mode | Metakey::LastModified | Metakey::Etag | Metakey::ContentLength)
.await?
.chunks(1000);

let page = lister.next_page().await?.unwrap_or_default();
let page = lister.next().await.unwrap_or_default();

let is_truncated = lister.has_next().await?;
let is_truncated = page.len() >= 1000;
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved

let (mut common_prefixes, mut contents) = (vec![], vec![]);
for v in page {
let meta = state
.op
.metadata(
&v,
Metakey::Mode | Metakey::LastModified | Metakey::Etag | Metakey::ContentLength,
)
.await?;
let v = v?;
let meta = v.metadata();

if meta.is_dir() {
common_prefixes.push(CommonPrefix {
Expand Down
3 changes: 1 addition & 2 deletions bin/oli/src/commands/cp.rs
Expand Up @@ -25,7 +25,6 @@ use clap::ArgAction;
use clap::ArgMatches;
use clap::Command;
use futures::TryStreamExt;
use opendal::Metakey;

use crate::config::Config;

Expand Down Expand Up @@ -59,7 +58,7 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
let dst_root = Path::new(&dst_path);
let mut ds = src_op.lister_with(&src_path).delimiter("").await?;
while let Some(de) = ds.try_next().await? {
let meta = src_op.metadata(&de, Metakey::Mode).await?;
let meta = de.metadata();
if meta.mode().is_dir() {
continue;
}
Expand Down
43 changes: 41 additions & 2 deletions bindings/c/include/opendal.h
Expand Up @@ -85,7 +85,7 @@ typedef enum opendal_code {
* BlockingLister is designed to list entries at given path in a blocking
* manner.
*
* Users can construct Lister by `blocking_list` or `blocking_scan`.
* Users can construct Lister by `blocking_lister`.
*/
typedef struct BlockingLister BlockingLister;

Expand Down Expand Up @@ -122,7 +122,46 @@ typedef struct BlockingLister BlockingLister;
typedef struct BlockingOperator BlockingOperator;

/**
* Entry is the file/dir entry returned by `Lister`.
* Entry returned by [`Lister`] or [`BlockingLister`] to represent a path and it's relative metadata.
*
* # Notes
*
* Entry returned by [`Lister`] or [`BlockingLister`] may carry some already known metadata.
* Lister by default only make sure that `Mode` is fetched. To make sure the entry contains
* metadata you want, please use `list_with` or `lister_with` and `metakey`.
*
* For example:
*
* ```no_run
* # use anyhow::Result;
* use opendal::EntryMode;
* use opendal::Metakey;
* use opendal::Operator;
* # #[tokio::main]
* # async fn test(op: Operator) -> Result<()> {
* let mut entries = op
* .list_with("dir/")
* .metakey(Metakey::ContentLength | Metakey::LastModified)
* .await?;
* for entry in entries {
* let meta = entry.metadata();
* match meta.mode() {
* EntryMode::FILE => {
* println!(
* "Handling file {} with size {}",
* entry.path(),
* meta.content_length()
* )
* }
* EntryMode::DIR => {
* println!("Handling dir {}", entry.path())
* }
* EntryMode::Unknown => continue,
* }
* }
* # Ok(())
* # }
* ```
*/
typedef struct Entry Entry;

Expand Down
23 changes: 7 additions & 16 deletions bindings/object_store/src/lib.rs
Expand Up @@ -149,19 +149,16 @@ impl ObjectStore for OpendalStore {
let stream = self
.inner
.lister_with(&path)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.delimiter("")
.await
.map_err(|err| format_object_store_error(err, &path))?;

let stream = stream.then(|res| async {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
let meta = self
.inner
.metadata(&entry, Metakey::ContentLength | Metakey::LastModified)
.await
.map_err(|err| format_object_store_error(err, entry.path()))?;
let meta = entry.metadata();

Ok(format_object_meta(entry.path(), &meta))
Ok(format_object_meta(entry.path(), meta))
});

Ok(stream.boxed())
Expand All @@ -171,7 +168,8 @@ impl ObjectStore for OpendalStore {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let mut stream = self
.inner
.lister(&path)
.lister_with(&path)
.metakey(Metakey::Mode | Metakey::ContentLength | Metakey::LastModified)
.await
.map_err(|err| format_object_store_error(err, &path))?;

Expand All @@ -180,19 +178,12 @@ impl ObjectStore for OpendalStore {

while let Some(res) = stream.next().await {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
let meta = self
.inner
.metadata(
&entry,
Metakey::Mode | Metakey::ContentLength | Metakey::LastModified,
)
.await
.map_err(|err| format_object_store_error(err, entry.path()))?;
let meta = entry.metadata();

if meta.is_dir() {
common_prefixes.push(entry.path().into());
} else {
objects.push(format_object_meta(entry.path(), &meta));
objects.push(format_object_meta(entry.path(), meta));
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/async_backtrace.rs
Expand Up @@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use async_trait::async_trait;

use crate::raw::*;
use crate::*;

use async_trait::async_trait;

/// Add Efficient, logical 'stack' traces of async functions for the underlying services.
///
/// # Async Backtrace
Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/await_tree.rs
Expand Up @@ -15,13 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use crate::raw::*;
use crate::*;

use async_trait::async_trait;

use await_tree::InstrumentAwait;

use crate::raw::*;
use crate::*;

/// Add a Instrument await-tree for actor-based applications to the underlying services.
///
/// # AwaitTree
Expand Down
25 changes: 5 additions & 20 deletions core/src/layers/immutable_index.rs
Expand Up @@ -310,10 +310,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
map.insert(
entry.path().to_string(),
op.metadata(&entry, Metakey::Mode).await?.mode(),
);
map.insert(entry.path().to_string(), entry.metadata().mode());
}

assert_eq!(map["file"], EntryMode::FILE);
Expand Down Expand Up @@ -351,10 +348,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
map.insert(
entry.path().to_string(),
op.metadata(&entry, Metakey::Mode).await?.mode(),
);
map.insert(entry.path().to_string(), entry.metadata().mode());
}

debug!("current files: {:?}", map);
Expand Down Expand Up @@ -398,10 +392,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
map.insert(
entry.path().to_string(),
op.metadata(&entry, Metakey::Mode).await?.mode(),
);
map.insert(entry.path().to_string(), entry.metadata().mode());
}

assert_eq!(map.len(), 1);
Expand All @@ -417,10 +408,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
map.insert(
entry.path().to_string(),
op.metadata(&entry, Metakey::Mode).await?.mode(),
);
map.insert(entry.path().to_string(), entry.metadata().mode());
}

assert_eq!(map["dataset/stateful/ontime_2007_200.csv"], EntryMode::FILE);
Expand Down Expand Up @@ -462,10 +450,7 @@ mod tests {
"duplicated value: {}",
entry.path()
);
map.insert(
entry.path().to_string(),
op.metadata(&entry, Metakey::Mode).await?.mode(),
);
map.insert(entry.path().to_string(), entry.metadata().mode());
}

debug!("current files: {:?}", map);
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/oio/entry.rs
Expand Up @@ -79,6 +79,6 @@ impl Entry {
///
/// NOTE: implement this by hand to avoid leaking raw entry to end-users.
pub(crate) fn into_entry(self) -> crate::Entry {
crate::Entry::new_with(self.path, self.meta)
crate::Entry::new(self.path, self.meta)
}
}
2 changes: 1 addition & 1 deletion core/src/raw/oio/read/into_seekable_read_by_range.rs
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Bytes;
use std::future::Future;
use std::io::SeekFrom;
use std::pin::Pin;
Expand All @@ -24,6 +23,7 @@ use std::task::ready;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::future::BoxFuture;

use crate::raw::*;
Expand Down
3 changes: 2 additions & 1 deletion core/src/raw/oio/write/multipart_upload_write.rs
Expand Up @@ -18,7 +18,8 @@
use async_trait::async_trait;
use bytes::Bytes;

use crate::{raw::*, *};
use crate::raw::*;
use crate::*;

const DEFAULT_WRITE_MIN_SIZE: usize = 8 * 1024 * 1024;

Expand Down
20 changes: 20 additions & 0 deletions core/src/raw/ops.rs
Expand Up @@ -21,7 +21,10 @@

use std::time::Duration;

use flagset::FlagSet;

use crate::raw::*;
use crate::Metakey;

/// Args for `create` operation.
///
Expand Down Expand Up @@ -77,6 +80,8 @@ pub struct OpList {

/// The delimiter used to for the list operation. Default to be `/`
delimiter: String,

metakey: FlagSet<Metakey>,
}

impl Default for OpList {
Expand All @@ -85,6 +90,8 @@ impl Default for OpList {
limit: None,
start_after: None,
delimiter: "/".to_string(),
// By default, we want to know what's the mode of this entry.
metakey: Metakey::Mode.into(),
}
}
}
Expand Down Expand Up @@ -127,6 +134,19 @@ impl OpList {
pub fn delimiter(&self) -> &str {
&self.delimiter
}

/// Change the metakey of this list operation.
///
/// The default metakey is `Metakey::Mode`.
pub fn with_metakey(mut self, metakey: impl Into<FlagSet<Metakey>>) -> Self {
self.metakey = metakey.into();
self
}

/// Get the current metakey.
pub fn metakey(&self) -> FlagSet<Metakey> {
self.metakey
}
}

/// Args for `presign` operation.
Expand Down
15 changes: 7 additions & 8 deletions core/src/services/tikv/backend.rs
Expand Up @@ -16,23 +16,22 @@
// under the License.

use std::collections::HashMap;
use tikv_client::Config;
use tikv_client::RawClient;
use std::fmt::Debug;
use std::fmt::Formatter;

use crate::raw::adapters::kv;
use crate::Capability;
use crate::Scheme;
use async_trait::async_trait;
use tikv_client::Config;
use tikv_client::RawClient;
use tokio::sync::OnceCell;

use crate::raw::adapters::kv;
use crate::Builder;
use crate::Capability;
use crate::Error;
use crate::ErrorKind;
use crate::Scheme;
use crate::*;

use std::fmt::Debug;
use std::fmt::Formatter;

/// TiKV backend builder
#[derive(Clone, Default)]
pub struct TikvBuilder {
Expand Down