Skip to content

Commit

Permalink
fix append close
Browse files Browse the repository at this point in the history
Signed-off-by: suyanhanx <suyanhanx@gmail.com>
  • Loading branch information
suyanhanx committed Sep 10, 2023
1 parent 8a49a69 commit f11f868
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
2 changes: 1 addition & 1 deletion core/src/raw/oio/write/exact_buf_write.rs
Expand Up @@ -226,7 +226,7 @@ mod tests {
let mut bs = Bytes::from(content.clone());
while !bs.is_empty() {
let n = writer.write(&bs).await?;
bs.advance(n as usize);
bs.advance(n);
}
}
writer.close().await?;
Expand Down
7 changes: 5 additions & 2 deletions core/src/services/fs/backend.rs
Expand Up @@ -388,7 +388,7 @@ impl Accessor for FsBackend {
.await
.map_err(parse_io_error)?;

Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f, op)))
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
Expand Down Expand Up @@ -578,7 +578,10 @@ impl Accessor for FsBackend {
.open(tmp_path.as_ref().unwrap_or(&target_path))
.map_err(parse_io_error)?;

Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
Ok((
RpWrite::new(),
FsWriter::new(target_path, tmp_path, f, args),
))
}

fn blocking_copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
Expand Down
39 changes: 34 additions & 5 deletions core/src/services/fs/writer.rs
Expand Up @@ -29,14 +29,18 @@ pub struct FsWriter<F> {
target_path: PathBuf,
tmp_path: Option<PathBuf>,
f: F,

op: OpWrite,
}

impl<F> FsWriter<F> {
pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F) -> Self {
pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F, op: OpWrite) -> Self {
Self {
target_path,
tmp_path,
f,

op,
}
}
}
Expand All @@ -58,9 +62,23 @@ impl oio::Write for FsWriter<tokio::fs::File> {
self.f.sync_all().await.map_err(parse_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
tokio::fs::rename(tmp_path, &self.target_path)
.await
.map_err(parse_io_error)?;
if self.op.append() {
let mut target = tokio::fs::OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&self.target_path)
.await
.map_err(parse_io_error)?;

tokio::io::copy(&mut self.f, &mut target)
.await
.map_err(parse_io_error)?;
} else {
tokio::fs::rename(tmp_path, &self.target_path)
.await
.map_err(parse_io_error)?;
}
}

Ok(())
Expand All @@ -76,7 +94,18 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> {
self.f.sync_all().map_err(parse_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
std::fs::rename(tmp_path, &self.target_path).map_err(parse_io_error)?;
if self.op.append() {
let mut target = std::fs::OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&self.target_path)
.map_err(parse_io_error)?;

std::io::copy(&mut self.f, &mut target).map_err(parse_io_error)?;
} else {
std::fs::rename(tmp_path, &self.target_path).map_err(parse_io_error)?;
}
}

Ok(())
Expand Down

0 comments on commit f11f868

Please sign in to comment.