Skip to content

Commit

Permalink
feat: add native & full capability (#2874)
Browse files Browse the repository at this point in the history
* feat: add native & full capability

Signed-off-by: yah01 <yah2er0ne@outlook.com>

* fix move meta out

Signed-off-by: yah01 <yah2er0ne@outlook.com>

* fix tests

Signed-off-by: yah01 <yah2er0ne@outlook.com>

---------

Signed-off-by: yah01 <yah2er0ne@outlook.com>
  • Loading branch information
yah01 committed Aug 19, 2023
1 parent 18c5a18 commit f4cd502
Show file tree
Hide file tree
Showing 52 changed files with 151 additions and 172 deletions.
2 changes: 1 addition & 1 deletion bin/oay/src/services/s3/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn handle_list_objects(
) -> Result<OkResponse, ErrorResponse> {
debug!("got params: {:?}", params);

if !state.op.info().capability().list_with_start_after {
if !state.op.info().full_capability().list_with_start_after {
return Err(ErrorResponse {
code: StatusCode::NOT_IMPLEMENTED,
err: Error {
Expand Down
7 changes: 3 additions & 4 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ impl<A: Accessor> LayeredAccessor for BlockingAccessor<A> {
}

fn metadata(&self) -> AccessorInfo {
let mut info = self.inner.info();
let cap = info.capability_mut();
cap.blocking = true;
info
let mut meta = self.inner.info();
meta.full_capability_mut().blocking = true;
meta
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
Expand Down
57 changes: 33 additions & 24 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,8 @@ impl<A: Accessor> Layer<A> for CompleteLayer {
type LayeredAccessor = CompleteReaderAccessor<A>;

fn layer(&self, inner: A) -> Self::LayeredAccessor {
let meta = inner.info();
CompleteReaderAccessor {
meta,
meta: inner.info(),
inner: Arc::new(inner),
}
}
Expand All @@ -157,7 +156,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpRead,
) -> Result<(RpRead, CompleteReader<A, A::Reader>)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.read {
return new_capability_unsupported_error(Operation::Read);
}
Expand Down Expand Up @@ -210,7 +209,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpRead,
) -> Result<(RpRead, CompleteReader<A, A::BlockingReader>)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.read || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingRead);
}
Expand Down Expand Up @@ -266,7 +265,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::Pager>)> {
let cap = self.meta.capability();
let cap = self.meta.full_capability();
if !cap.list {
return Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
Expand Down Expand Up @@ -315,7 +314,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
path: &str,
args: OpList,
) -> Result<(RpList, CompletePager<A, A::BlockingPager>)> {
let cap = self.meta.capability();
let cap = self.meta.full_capability();
if !cap.list {
return Err(
Error::new(ErrorKind::Unsupported, "operation is not supported")
Expand Down Expand Up @@ -376,6 +375,16 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
&self.inner
}

fn metadata(&self) -> AccessorInfo {
let mut meta = self.meta.clone();
let cap = meta.full_capability_mut();
if cap.read {
cap.read_can_next = true;
cap.read_can_seek = true;
}
meta
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
self.complete_reader(path, args).await
}
Expand All @@ -385,7 +394,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.stat {
return new_capability_unsupported_error(Operation::Stat);
}
Expand All @@ -399,7 +408,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.stat || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingStat);
}
Expand All @@ -413,7 +422,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.write {
return new_capability_unsupported_error(Operation::Write);
}
Expand All @@ -426,7 +435,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.write || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingWrite);
}
Expand All @@ -438,7 +447,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.append {
return new_capability_unsupported_error(Operation::Append);
}
Expand All @@ -450,7 +459,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.create_dir {
return new_capability_unsupported_error(Operation::CreateDir);
}
Expand All @@ -459,7 +468,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.create_dir || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingCreateDir);
}
Expand All @@ -468,7 +477,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.delete {
return new_capability_unsupported_error(Operation::Delete);
}
Expand All @@ -477,7 +486,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.delete || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingDelete);
}
Expand All @@ -486,7 +495,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.copy {
return new_capability_unsupported_error(Operation::Copy);
}
Expand All @@ -495,7 +504,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.copy || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingCopy);
}
Expand All @@ -504,7 +513,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.rename {
return new_capability_unsupported_error(Operation::Rename);
}
Expand All @@ -513,7 +522,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.rename || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingRename);
}
Expand All @@ -522,7 +531,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.list {
return new_capability_unsupported_error(Operation::List);
}
Expand All @@ -531,7 +540,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.list || !capability.blocking {
return new_capability_unsupported_error(Operation::BlockingList);
}
Expand All @@ -540,7 +549,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.presign {
return new_capability_unsupported_error(Operation::Presign);
}
Expand All @@ -549,7 +558,7 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
}

async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let capability = self.meta.capability();
let capability = self.meta.full_capability();
if !capability.batch {
return new_capability_unsupported_error(Operation::Batch);
}
Expand Down Expand Up @@ -952,7 +961,7 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut info = AccessorInfo::default();
info.set_capability(self.capability);
info.set_full_capability(self.capability);

info
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/immutable_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl<A: Accessor> LayeredAccessor for ImmutableIndexAccessor<A> {
fn metadata(&self) -> AccessorInfo {
let mut meta = self.inner.info();

let cap = meta.capability_mut();
let cap = meta.full_capability_mut();
cap.list = true;
cap.list_with_delimiter_slash = true;
cap.list_without_delimiter = true;
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl LayeredAccessor for MadsimAccessor {
let mut info = AccessorInfo::default();
info.set_name("madsim");

info.set_capability(Capability {
info.set_full_capability(Capability {
read: true,
write: true,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion core/src/layers/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,7 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_capability(Capability {
am.set_full_capability(Capability {
read: true,
list: true,
list_with_delimiter_slash: true,
Expand Down
29 changes: 18 additions & 11 deletions core/src/raw/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,8 @@ impl Accessor for () {
scheme: Scheme::Custom("dummy"),
root: "".to_string(),
name: "dummy".to_string(),
capability: Capability::default(),
native_capability: Capability::default(),
full_capability: Capability::default(),
}
}
}
Expand Down Expand Up @@ -509,7 +510,8 @@ pub struct AccessorInfo {
root: String,
name: String,

capability: Capability,
native_capability: Capability,
full_capability: Capability,
}

impl AccessorInfo {
Expand Down Expand Up @@ -553,19 +555,24 @@ impl AccessorInfo {
self
}

/// Get backend's capabilities.
pub fn capability(&self) -> Capability {
self.capability
/// Get backend's native capabilities.
pub fn native_capability(&self) -> Capability {
self.native_capability
}

/// Get backend's capabilities.
pub fn capability_mut(&mut self) -> &mut Capability {
&mut self.capability
/// Get service's full capabilities.
pub fn full_capability(&self) -> Capability {
self.full_capability
}

/// Set capabilities for backend.
pub fn set_capability(&mut self, capability: Capability) -> &mut Self {
self.capability = capability;
/// Get service's full capabilities.
pub fn full_capability_mut(&mut self) -> &mut Capability {
&mut self.full_capability
}

/// Set full capabilities for service.
pub fn set_full_capability(&mut self, capability: Capability) -> &mut Self {
self.full_capability = capability;
self
}
}
2 changes: 1 addition & 1 deletion core/src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl From<Metadata> for AccessorInfo {
let mut am = AccessorInfo::default();
am.set_name(m.name());
am.set_scheme(m.scheme());
am.set_capability(m.capabilities());
am.set_full_capability(m.capabilities());

am
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<S: Adapter> Accessor for Backend<S> {
let mut am: AccessorInfo = self.kv.metadata().into();
am.set_root(&self.root);

let cap = am.capability_mut();
let cap = am.full_capability_mut();
if cap.read {
cap.read_can_seek = true;
cap.read_can_next = true;
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/adapters/typed_kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<S: Adapter> Accessor for Backend<S> {
am.set_scheme(kv_info.scheme());
am.set_name(kv_info.name());
let kv_cap = kv_info.capabilities();
let cap = am.capability_mut();
let cap = am.full_capability_mut();
if kv_cap.get {
cap.read = true;
cap.read_can_seek = true;
Expand Down
6 changes: 3 additions & 3 deletions core/src/raw/oio/page/into_flat_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn into_flat_page<A: Accessor, P>(acc: A, path: &str, size: usize) -> FlatPa
{
let meta = acc.info();
debug_assert!(
meta.capability().list_with_delimiter_slash,
meta.full_capability().list_with_delimiter_slash,
"service doesn't support list hierarchy, it must be a bug"
);
}
Expand Down Expand Up @@ -254,8 +254,8 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.capability_mut().list = true;
am.capability_mut().list_with_delimiter_slash = true;
am.full_capability_mut().list = true;
am.full_capability_mut().list_with_delimiter_slash = true;

am
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/raw/oio/read/into_seekable_read_by_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ mod tests {

fn info(&self) -> AccessorInfo {
let mut am = AccessorInfo::default();
am.set_capability(Capability {
am.set_full_capability(Capability {
read: true,
..Default::default()
});
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/azblob/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ impl Accessor for AzblobBackend {
am.set_scheme(Scheme::Azblob)
.set_root(&self.core.root)
.set_name(&self.core.container)
.set_capability(Capability {
.set_full_capability(Capability {
stat: true,
stat_with_if_match: true,
stat_with_if_none_match: true,
Expand Down

0 comments on commit f4cd502

Please sign in to comment.