Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(oay): Add read_dir #2736

Merged
merged 2 commits into from Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions bin/oay/Cargo.toml
Expand Up @@ -37,7 +37,8 @@ default = [

frontends-webdav = [
"dep:dav-server",
"dep:bytes"
"dep:bytes",
"dep:futures-util"
]
frontends-s3 = []

Expand Down Expand Up @@ -65,4 +66,5 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.3.1"
uuid = { version = "1", features = ["v4", "fast-rng"] }
dav-server = { version = "0.5.5", optional = true }
bytes = { version = "1.4.0", optional = true }
bytes = { version = "1.4.0", optional = true }
futures-util = { version = "0.3.16", optional = true }
25 changes: 21 additions & 4 deletions bin/oay/src/services/webdav/webdav_dir_entry.rs
Expand Up @@ -15,16 +15,33 @@
// specific language governing permissions and limitations
// under the License.

use dav_server::fs::DavDirEntry;
use dav_server::fs::{DavDirEntry, DavMetaData};
use futures::FutureExt;
use opendal::{Entry, Operator};

struct WebDAVDirEntry {}
use super::webdav_metadata::WebdavMetaData;

pub struct WebDAVDirEntry {
dir_entry: Entry,
op: Operator,
}

impl DavDirEntry for WebDAVDirEntry {
fn name(&self) -> Vec<u8> {
todo!()
self.dir_entry.name().as_bytes().to_vec()
}

fn metadata(&self) -> dav_server::fs::FsFuture<Box<dyn dav_server::fs::DavMetaData>> {
todo!()
async move {
let metedata = self.op.stat(self.dir_entry.path()).await.unwrap();
Ok(Box::new(WebdavMetaData::new(metedata)) as Box<dyn DavMetaData>)
}
.boxed()
}
}

impl WebDAVDirEntry {
pub fn new(dir_entry: Entry, op: Operator) -> Self {
WebDAVDirEntry { dir_entry, op }
}
}
6 changes: 3 additions & 3 deletions bin/oay/src/services/webdav/webdav_file.rs
Expand Up @@ -37,10 +37,10 @@ pub struct WebdavFile {
impl DavFile for WebdavFile {
fn read_bytes(&mut self, count: usize) -> FsFuture<bytes::Bytes> {
async move {
let file_path = self.path.as_rel_ospath();
let file_path = self.path.as_url_string();
let content = self
.op
.range_read(file_path.to_str().unwrap(), 0..count as u64)
.range_read(&file_path, 0..count as u64)
.await
.unwrap();
//error handle ?
Expand All @@ -53,7 +53,7 @@ impl DavFile for WebdavFile {
async move {
let opendal_metadata = self
.op
.stat(self.path.as_rel_ospath().to_str().unwrap())
.stat(self.path.as_url_string().as_str())
.await
.unwrap();
Ok(Box::new(WebdavMetaData::new(opendal_metadata)) as Box<dyn DavMetaData>)
Expand Down
7 changes: 5 additions & 2 deletions bin/oay/src/services/webdav/webdav_metadata.rs
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use dav_server::fs::DavMetaData;
use dav_server::fs::{DavMetaData, FsError};
use opendal::Metadata;

#[derive(Debug, Clone)]
Expand All @@ -35,7 +35,10 @@ impl DavMetaData for WebdavMetaData {
}

fn modified(&self) -> dav_server::fs::FsResult<std::time::SystemTime> {
Ok(self.metadata.last_modified().unwrap().into())
match self.metadata.last_modified() {
Some(t) => Ok(t.into()),
None => Err(FsError::GeneralFailure),
}
}

fn is_dir(&self) -> bool {
Expand Down
57 changes: 50 additions & 7 deletions bin/oay/src/services/webdav/webdavfs.rs
Expand Up @@ -15,12 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use std::pin::Pin;
use std::task::Poll::{Pending, Ready};

use dav_server::fs::DavDirEntry;
use dav_server::fs::DavFile;
use dav_server::fs::DavFileSystem;
use dav_server::fs::DavMetaData;
use futures::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use opendal::Lister;
use opendal::Operator;

use crate::services::webdav::webdav_dir_entry::WebDAVDirEntry;

use super::webdav_file::WebdavFile;
use super::webdav_metadata::WebdavMetaData;

Expand Down Expand Up @@ -48,23 +57,23 @@ impl DavFileSystem for WebdavFs {

fn read_dir<'a>(
&'a self,
_path: &'a dav_server::davpath::DavPath,
path: &'a dav_server::davpath::DavPath,
_meta: dav_server::fs::ReadDirMeta,
) -> dav_server::fs::FsFuture<dav_server::fs::FsStream<Box<dyn dav_server::fs::DavDirEntry>>>
{
todo!()
async move {
let lister = self.op.list(path.as_url_string().as_str()).await.unwrap();
Ok(DavStream::new(self.op.clone(), lister).boxed())
}
.boxed()
}

fn metadata<'a>(
&'a self,
path: &'a dav_server::davpath::DavPath,
) -> dav_server::fs::FsFuture<Box<dyn dav_server::fs::DavMetaData>> {
async move {
let opendal_metadata = self
.op
.stat(path.as_rel_ospath().to_str().unwrap())
.await
.unwrap();
let opendal_metadata = self.op.stat(path.as_url_string().as_str()).await.unwrap();
Ok(Box::new(WebdavMetaData::new(opendal_metadata)) as Box<dyn DavMetaData>)
}
.boxed()
Expand All @@ -76,3 +85,37 @@ impl WebdavFs {
Box::new(WebdavFs { op })
}
}

struct DavStream {
op: Operator,
lister: Lister,
}

impl Stream for DavStream {
type Item = Box<dyn DavDirEntry>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let dav_stream = self.get_mut();
let lister = Pin::new(&mut dav_stream.lister).get_mut();

match Pin::new(lister).poll_next(cx) {
Ready(entry) => match entry {
Some(entry) => {
let webdav_entry = WebDAVDirEntry::new(entry.unwrap(), dav_stream.op.clone());
Ready(Some(Box::new(webdav_entry) as Box<dyn DavDirEntry>))
}
None => Ready(None),
},
Pending => Pending,
}
}
}

impl DavStream {
fn new(op: Operator, lister: Lister) -> Self {
DavStream { op, lister }
}
}