Skip to content

Commit

Permalink
feat(oay): Add read_dir (#2736)
Browse files Browse the repository at this point in the history
* add  for oay-webdav

* port to DavStream
  • Loading branch information
Young-Flash committed Aug 2, 2023
1 parent 4958c84 commit ef1aeeb
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions bin/oay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ default = [

frontends-webdav = [
"dep:dav-server",
"dep:bytes"
"dep:bytes",
"dep:futures-util"
]
frontends-s3 = []

Expand Down Expand Up @@ -65,4 +66,5 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.3.1"
uuid = { version = "1", features = ["v4", "fast-rng"] }
dav-server = { version = "0.5.5", optional = true }
bytes = { version = "1.4.0", optional = true }
bytes = { version = "1.4.0", optional = true }
futures-util = { version = "0.3.16", optional = true }
25 changes: 21 additions & 4 deletions bin/oay/src/services/webdav/webdav_dir_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,33 @@
// specific language governing permissions and limitations
// under the License.

use dav_server::fs::DavDirEntry;
use dav_server::fs::{DavDirEntry, DavMetaData};
use futures::FutureExt;
use opendal::{Entry, Operator};

struct WebDAVDirEntry {}
use super::webdav_metadata::WebdavMetaData;

pub struct WebDAVDirEntry {
dir_entry: Entry,
op: Operator,
}

impl DavDirEntry for WebDAVDirEntry {
fn name(&self) -> Vec<u8> {
todo!()
self.dir_entry.name().as_bytes().to_vec()
}

fn metadata(&self) -> dav_server::fs::FsFuture<Box<dyn dav_server::fs::DavMetaData>> {
todo!()
async move {
let metedata = self.op.stat(self.dir_entry.path()).await.unwrap();
Ok(Box::new(WebdavMetaData::new(metedata)) as Box<dyn DavMetaData>)
}
.boxed()
}
}

impl WebDAVDirEntry {
pub fn new(dir_entry: Entry, op: Operator) -> Self {
WebDAVDirEntry { dir_entry, op }
}
}
6 changes: 3 additions & 3 deletions bin/oay/src/services/webdav/webdav_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ pub struct WebdavFile {
impl DavFile for WebdavFile {
fn read_bytes(&mut self, count: usize) -> FsFuture<bytes::Bytes> {
async move {
let file_path = self.path.as_rel_ospath();
let file_path = self.path.as_url_string();
let content = self
.op
.range_read(file_path.to_str().unwrap(), 0..count as u64)
.range_read(&file_path, 0..count as u64)
.await
.unwrap();
//error handle ?
Expand All @@ -53,7 +53,7 @@ impl DavFile for WebdavFile {
async move {
let opendal_metadata = self
.op
.stat(self.path.as_rel_ospath().to_str().unwrap())
.stat(self.path.as_url_string().as_str())
.await
.unwrap();
Ok(Box::new(WebdavMetaData::new(opendal_metadata)) as Box<dyn DavMetaData>)
Expand Down
7 changes: 5 additions & 2 deletions bin/oay/src/services/webdav/webdav_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use dav_server::fs::DavMetaData;
use dav_server::fs::{DavMetaData, FsError};
use opendal::Metadata;

#[derive(Debug, Clone)]
Expand All @@ -35,7 +35,10 @@ impl DavMetaData for WebdavMetaData {
}

fn modified(&self) -> dav_server::fs::FsResult<std::time::SystemTime> {
Ok(self.metadata.last_modified().unwrap().into())
match self.metadata.last_modified() {
Some(t) => Ok(t.into()),
None => Err(FsError::GeneralFailure),
}
}

fn is_dir(&self) -> bool {
Expand Down
57 changes: 50 additions & 7 deletions bin/oay/src/services/webdav/webdavfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use std::pin::Pin;
use std::task::Poll::{Pending, Ready};

use dav_server::fs::DavDirEntry;
use dav_server::fs::DavFile;
use dav_server::fs::DavFileSystem;
use dav_server::fs::DavMetaData;
use futures::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use opendal::Lister;
use opendal::Operator;

use crate::services::webdav::webdav_dir_entry::WebDAVDirEntry;

use super::webdav_file::WebdavFile;
use super::webdav_metadata::WebdavMetaData;

Expand Down Expand Up @@ -48,23 +57,23 @@ impl DavFileSystem for WebdavFs {

fn read_dir<'a>(
&'a self,
_path: &'a dav_server::davpath::DavPath,
path: &'a dav_server::davpath::DavPath,
_meta: dav_server::fs::ReadDirMeta,
) -> dav_server::fs::FsFuture<dav_server::fs::FsStream<Box<dyn dav_server::fs::DavDirEntry>>>
{
todo!()
async move {
let lister = self.op.list(path.as_url_string().as_str()).await.unwrap();
Ok(DavStream::new(self.op.clone(), lister).boxed())
}
.boxed()
}

fn metadata<'a>(
&'a self,
path: &'a dav_server::davpath::DavPath,
) -> dav_server::fs::FsFuture<Box<dyn dav_server::fs::DavMetaData>> {
async move {
let opendal_metadata = self
.op
.stat(path.as_rel_ospath().to_str().unwrap())
.await
.unwrap();
let opendal_metadata = self.op.stat(path.as_url_string().as_str()).await.unwrap();
Ok(Box::new(WebdavMetaData::new(opendal_metadata)) as Box<dyn DavMetaData>)
}
.boxed()
Expand All @@ -76,3 +85,37 @@ impl WebdavFs {
Box::new(WebdavFs { op })
}
}

struct DavStream {
op: Operator,
lister: Lister,
}

impl Stream for DavStream {
type Item = Box<dyn DavDirEntry>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let dav_stream = self.get_mut();
let lister = Pin::new(&mut dav_stream.lister).get_mut();

match Pin::new(lister).poll_next(cx) {
Ready(entry) => match entry {
Some(entry) => {
let webdav_entry = WebDAVDirEntry::new(entry.unwrap(), dav_stream.op.clone());
Ready(Some(Box::new(webdav_entry) as Box<dyn DavDirEntry>))
}
None => Ready(None),
},
Pending => Pending,
}
}
}

impl DavStream {
fn new(op: Operator, lister: Lister) -> Self {
DavStream { op, lister }
}
}

0 comments on commit ef1aeeb

Please sign in to comment.