Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement RFC-2774 Lister API #2787

Merged
merged 11 commits into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/oay/src/services/s3/service.rs
Expand Up @@ -101,7 +101,7 @@ async fn handle_list_objects(

let mut lister = state
.op
.list_with(&params.prefix)
.lister_with(&params.prefix)
.start_after(&params.start_after)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion bin/oay/src/services/webdav/webdavfs.rs
Expand Up @@ -62,7 +62,7 @@ impl DavFileSystem for WebdavFs {
) -> dav_server::fs::FsFuture<dav_server::fs::FsStream<Box<dyn dav_server::fs::DavDirEntry>>>
{
async move {
let lister = self.op.list(path.as_url_string().as_str()).await.unwrap();
let lister = self.op.lister(path.as_url_string().as_str()).await.unwrap();
Ok(DavStream::new(self.op.clone(), lister).boxed())
}
.boxed()
Expand Down
2 changes: 1 addition & 1 deletion bin/oli/src/commands/cp.rs
Expand Up @@ -57,7 +57,7 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
}

let dst_root = Path::new(&dst_path);
let mut ds = src_op.scan(&src_path).await?;
let mut ds = src_op.lister_with(&src_path).delimiter("").await?;
while let Some(de) = ds.try_next().await? {
let meta = src_op.metadata(&de, Metakey::Mode).await?;
if meta.mode().is_dir() {
Expand Down
4 changes: 2 additions & 2 deletions bin/oli/src/commands/ls.rs
Expand Up @@ -41,14 +41,14 @@ pub async fn main(args: &ArgMatches) -> Result<()> {
let (op, path) = cfg.parse_location(target)?;

if !recursive {
let mut ds = op.list(&path).await?;
let mut ds = op.lister(&path).await?;
while let Some(de) = ds.try_next().await? {
println!("{}", de.name());
}
return Ok(());
}

let mut ds = op.scan(&path).await?;
let mut ds = op.lister_with(&path).delimiter("").await?;
while let Some(de) = ds.try_next().await? {
println!("{}", de.path());
}
Expand Down
1 change: 1 addition & 0 deletions bindings/nodejs/index.d.ts
Expand Up @@ -216,6 +216,7 @@ export class Operator {
* An error will be returned if given path doesn't end with /.
*
* ### Example
*
* ```javascript
* const lister = await op.scan("/path/to/dir/");
* while (true) {
Expand Down
13 changes: 11 additions & 2 deletions bindings/nodejs/src/lib.rs
Expand Up @@ -288,6 +288,7 @@ impl Operator {
/// An error will be returned if given path doesn't end with /.
///
/// ### Example
///
/// ```javascript
/// const lister = await op.scan("/path/to/dir/");
/// while (true) {
Expand All @@ -303,7 +304,13 @@ impl Operator {
/// `````
#[napi]
pub async fn scan(&self, path: String) -> Result<Lister> {
Ok(Lister(self.0.scan(&path).await.map_err(format_napi_error)?))
Ok(Lister(
self.0
.lister_with(&path)
.delimiter("")
.await
.map_err(format_napi_error)?,
))
}

/// List dir in flat way synchronously.
Expand Down Expand Up @@ -408,7 +415,9 @@ impl Operator {
/// ```
#[napi]
pub async fn list(&self, path: String) -> Result<Lister> {
Ok(Lister(self.0.list(&path).await.map_err(format_napi_error)?))
Ok(Lister(
self.0.lister(&path).await.map_err(format_napi_error)?,
))
}

/// List given path synchronously.
Expand Down
5 changes: 3 additions & 2 deletions bindings/object_store/src/lib.rs
Expand Up @@ -148,7 +148,8 @@ impl ObjectStore for OpendalStore {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let stream = self
.inner
.scan(&path)
.lister_with(&path)
.delimiter("")
.await
.map_err(|err| format_object_store_error(err, &path))?;

Expand All @@ -170,7 +171,7 @@ impl ObjectStore for OpendalStore {
let path = prefix.map_or("".into(), |x| format!("{}/", x));
let mut stream = self
.inner
.list(&path)
.lister(&path)
.await
.map_err(|err| format_object_store_error(err, &path))?;

Expand Down
8 changes: 6 additions & 2 deletions bindings/python/src/asyncio.rs
Expand Up @@ -139,7 +139,7 @@ impl AsyncOperator {
pub fn list<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let lister = this.list(&path).await.map_err(format_pyerr)?;
let lister = this.lister(&path).await.map_err(format_pyerr)?;
let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py));
Ok(pylister)
})
Expand All @@ -149,7 +149,11 @@ impl AsyncOperator {
pub fn scan<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> {
let this = self.0.clone();
future_into_py(py, async move {
let lister = this.scan(&path).await.map_err(format_pyerr)?;
let lister = this
.lister_with(&path)
.delimiter("")
.await
.map_err(format_pyerr)?;
let pylister: PyObject = Python::with_gil(|py| AsyncLister::new(lister).into_py(py));
Ok(pylister)
})
Expand Down
12 changes: 12 additions & 0 deletions core/src/docs/upgrade.md
@@ -1,3 +1,15 @@
# Unreleased

## Public API

### RFC-2774 Lister API

RFC-2774 proposes a new `lister` API to replace current `list` and `scan`. And we add a new API `list` to return entries directly.
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved

- For listing a directory at once, please use `list()` for convenience.
- For listing a directory recursively, please use `list_with().delimiter("")` or `lister_with().delimiter("")` instead.
- For listing in streaming, please use `lister()` or `lister_with()` instead.
Xuanwo marked this conversation as resolved.
Show resolved Hide resolved

# Upgrade to v0.39

## Public API
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/complete.rs
Expand Up @@ -1014,7 +1014,7 @@ mod tests {
capability_test!(rename, |op| {
op.rename("/path/to/mock_file", "/path/to/mock_file_2")
});
capability_test!(list, |op| { op.list("/path/to/mock_dir/") });
capability_test!(list, |op| { op.lister("/path/to/mock_dir/") });
capability_test!(presign, |op| {
op.presign_read("/path/to/mock_file", Duration::from_secs(1))
});
Expand Down
10 changes: 5 additions & 5 deletions core/src/layers/immutable_index.rs
Expand Up @@ -302,7 +302,7 @@ mod tests {

let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.list("").await?;
let mut ds = op.lister("").await?;
while let Some(entry) = ds.try_next().await? {
debug!("got entry: {}", entry.path());
assert!(
Expand Down Expand Up @@ -341,7 +341,7 @@ mod tests {
.layer(iil)
.finish();

let mut ds = op.scan("/").await?;
let mut ds = op.lister_with("/").delimiter("").await?;
let mut set = HashSet::new();
let mut map = HashMap::new();
while let Some(entry) = ds.try_next().await? {
Expand Down Expand Up @@ -391,7 +391,7 @@ mod tests {
// List /
let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.list("/").await?;
let mut ds = op.lister("/").await?;
while let Some(entry) = ds.try_next().await? {
assert!(
set.insert(entry.path().to_string()),
Expand All @@ -410,7 +410,7 @@ mod tests {
// List dataset/stateful/
let mut map = HashMap::new();
let mut set = HashSet::new();
let mut ds = op.list("dataset/stateful/").await?;
let mut ds = op.lister("dataset/stateful/").await?;
while let Some(entry) = ds.try_next().await? {
assert!(
set.insert(entry.path().to_string()),
Expand Down Expand Up @@ -452,7 +452,7 @@ mod tests {
.layer(iil)
.finish();

let mut ds = op.scan("/").await?;
let mut ds = op.lister_with("/").delimiter("").await?;

let mut map = HashMap::new();
let mut set = HashSet::new();
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/retry.rs
Expand Up @@ -1356,7 +1356,7 @@ mod tests {
let expected = vec!["hello", "world", "2023/", "0208/"];

let mut lister = op
.list("retryable_error/")
.lister("retryable_error/")
.await
.expect("service must support list");
let mut actual = Vec::new();
Expand Down