Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Metakeys are not propagated with the blocking operators #3116

Merged
merged 2 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 37 additions & 9 deletions core/src/types/list.rs
Expand Up @@ -142,7 +142,11 @@ impl Stream for Lister {
///
/// Users can construct Lister by `blocking_lister`.
pub struct BlockingLister {
pager: oio::BlockingPager,
acc: FusedAccessor,
/// required_metakey is the metakey required by users.
required_metakey: FlagSet<Metakey>,

pager: Option<oio::BlockingPager>,
buf: VecDeque<oio::Entry>,
}

Expand All @@ -153,11 +157,17 @@ unsafe impl Sync for BlockingLister {}

impl BlockingLister {
/// Create a new lister.
pub(crate) fn new(pager: oio::BlockingPager) -> Self {
Self {
pager,
buf: VecDeque::default(),
}
pub(crate) fn create(acc: FusedAccessor, path: &str, args: OpList) -> Result<Self> {
let required_metakey = args.metakey();
let (_, pager) = acc.blocking_list(path, args)?;

Ok(Self {
acc,
required_metakey,

buf: VecDeque::new(),
pager: Some(pager),
})
}
}

Expand All @@ -167,15 +177,33 @@ impl Iterator for BlockingLister {

fn next(&mut self) -> Option<Self::Item> {
if let Some(oe) = self.buf.pop_front() {
return Some(Ok(oe.into_entry()));
let (path, metadata) = oe.into_entry().into_parts();
// TODO: we can optimize this by checking the provided metakey provided by services.
if metadata.contains_bit(self.required_metakey) {
return Some(Ok(Entry::new(path, metadata)));
}

let metadata = match self.acc.blocking_stat(&path, OpStat::default()) {
Ok(rp) => rp.into_metadata(),
Err(err) => return Some(Err(err)),
};
return Some(Ok(Entry::new(path, metadata)));
}

self.buf = match self.pager.next() {
let pager = match self.pager.as_mut() {
Some(pager) => pager,
None => return None,
};

self.buf = match pager.next() {
// Ideally, the convert from `Vec` to `VecDeque` will not do reallocation.
//
// However, this could be changed as described in [impl<T, A> From<Vec<T, A>> for VecDeque<T, A>](https://doc.rust-lang.org/std/collections/struct.VecDeque.html#impl-From%3CVec%3CT%2C%20A%3E%3E-for-VecDeque%3CT%2C%20A%3E)
Ok(Some(entries)) => entries.into(),
Ok(None) => return None,
Ok(None) => {
self.pager = None;
return None;
}
Err(err) => return Some(Err(err)),
};

Expand Down
7 changes: 2 additions & 5 deletions core/src/types/operator/blocking_operator.rs
Expand Up @@ -941,8 +941,7 @@ impl BlockingOperator {
.with_context("path", &path));
}

let (_, pager) = inner.blocking_list(&path, args)?;
let lister = BlockingLister::new(pager);
let lister = BlockingLister::create(inner, &path, args)?;

lister.collect()
},
Expand Down Expand Up @@ -1116,9 +1115,7 @@ impl BlockingOperator {
.with_context("path", &path));
}

let (_, pager) = inner.blocking_list(&path, args)?;

Ok(BlockingLister::new(pager))
BlockingLister::create(inner, &path, args)
},
))
}
Expand Down
91 changes: 91 additions & 0 deletions core/tests/behavior/blocking_list.rs
Expand Up @@ -33,6 +33,8 @@ pub fn behavior_blocking_list_tests(op: &Operator) -> Vec<Trial> {
blocking_trials!(
op,
test_blocking_list_dir,
test_blocking_list_dir_with_metakey,
test_blocking_list_dir_with_metakey_complete,
test_blocking_list_non_exist_dir,
test_blocking_scan,
test_blocking_remove_all
Expand Down Expand Up @@ -67,6 +69,95 @@ pub fn test_blocking_list_dir(op: BlockingOperator) -> Result<()> {
Ok(())
}

/// List dir with metakey
pub fn test_blocking_list_dir_with_metakey(op: BlockingOperator) -> Result<()> {
let parent = uuid::Uuid::new_v4().to_string();
let path = format!("{parent}/{}", uuid::Uuid::new_v4());
debug!("Generate a random file: {}", &path);
let (content, size) = gen_bytes();

op.write(&path, content).expect("write must succeed");

let mut obs = op
.lister_with(&format!("{parent}/"))
.metakey(
Metakey::Mode
| Metakey::CacheControl
| Metakey::ContentDisposition
| Metakey::ContentLength
| Metakey::ContentMd5
| Metakey::ContentRange
| Metakey::ContentType
| Metakey::Etag
| Metakey::LastModified
| Metakey::Version,
)
.call()?;
let mut found = false;
while let Some(de) = obs.next().transpose()? {
let meta = de.metadata();
if de.path() == path {
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(meta.content_length(), size as u64);

// We don't care about the value, we just to check there is no panic.
let _ = meta.cache_control();
let _ = meta.content_disposition();
let _ = meta.content_md5();
let _ = meta.content_range();
let _ = meta.content_type();
let _ = meta.etag();
let _ = meta.last_modified();
let _ = meta.version();

found = true
}
}
assert!(found, "file should be found in list");

op.delete(&path).expect("delete must succeed");
Ok(())
}

/// List dir with metakey complete
pub fn test_blocking_list_dir_with_metakey_complete(op: BlockingOperator) -> Result<()> {
let parent = uuid::Uuid::new_v4().to_string();
let path = format!("{parent}/{}", uuid::Uuid::new_v4());
debug!("Generate a random file: {}", &path);
let (content, size) = gen_bytes();

op.write(&path, content).expect("write must succeed");

let mut obs = op
.lister_with(&format!("{parent}/"))
.metakey(Metakey::Complete)
.call()?;
let mut found = false;
while let Some(de) = obs.next().transpose()? {
let meta = de.metadata();
if de.path() == path {
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(meta.content_length(), size as u64);

// We don't care about the value, we just to check there is no panic.
let _ = meta.cache_control();
let _ = meta.content_disposition();
let _ = meta.content_md5();
let _ = meta.content_range();
let _ = meta.content_type();
let _ = meta.etag();
let _ = meta.last_modified();
let _ = meta.version();

found = true
}
}
assert!(found, "file should be found in list");

op.delete(&path).expect("delete must succeed");
Ok(())
}

/// List non exist dir should return nothing.
pub fn test_blocking_list_non_exist_dir(op: BlockingOperator) -> Result<()> {
let dir = format!("{}/", uuid::Uuid::new_v4());
Expand Down