Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(blocking): tests for blocking append #3023

Merged
merged 5 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 38 additions & 8 deletions core/src/services/fs/backend.rs
Expand Up @@ -55,6 +55,11 @@ impl FsBuilder {
}

/// Set temp dir for atomic write.
///
/// # Notes
///
/// - When append is enabled, we will not use atomic write
/// to avoid data loss and performance issue.
pub fn atomic_write_dir(&mut self, dir: &str) -> &mut Self {
self.atomic_write_dir = if dir.is_empty() {
None
Expand Down Expand Up @@ -367,15 +372,24 @@ impl Accessor for FsBackend {
let target_path = Self::ensure_write_abs_path(&self.root, path).await?;
let tmp_path =
Self::ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path)).await?;
(target_path, Some(tmp_path))

// If the target file exists, we should append to the end of it directly.
if op.append()
&& tokio::fs::try_exists(&target_path)
.await
.map_err(parse_io_error)?
{
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = Self::ensure_write_abs_path(&self.root, path).await?;

(p, None)
};

let mut open_options = tokio::fs::OpenOptions::new();

open_options.create(true).write(true);
if op.append() {
open_options.append(true);
Expand Down Expand Up @@ -554,22 +568,38 @@ impl Accessor for FsBackend {
Ok((RpRead::new(end - start), r))
}

fn blocking_write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
fn blocking_write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir {
let target_path = Self::blocking_ensure_write_abs_path(&self.root, path)?;
let tmp_path =
Self::blocking_ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))?;
(target_path, Some(tmp_path))

// If the target file exists, we should append to the end of it directly.
if op.append()
&& Path::new(&target_path)
.try_exists()
.map_err(parse_io_error)?
{
(target_path, None)
} else {
(target_path, Some(tmp_path))
}
} else {
let p = Self::blocking_ensure_write_abs_path(&self.root, path)?;

(p, None)
};

let f = std::fs::OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
let mut f = std::fs::OpenOptions::new();
f.create(true).write(true);

if op.append() {
f.append(true);
} else {
f.truncate(true);
}

let f = f
.open(tmp_path.as_ref().unwrap_or(&target_path))
.map_err(parse_io_error)?;

Expand Down
1 change: 1 addition & 0 deletions core/src/services/fs/writer.rs
Expand Up @@ -45,6 +45,7 @@ impl<F> FsWriter<F> {
Self {
target_path,
tmp_path,

f: Some(f),
fut: None,
}
Expand Down
220 changes: 220 additions & 0 deletions core/tests/behavior/blocking_append.rs
@@ -0,0 +1,220 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::vec;

use anyhow::Result;
use sha2::Digest;
use sha2::Sha256;
use std::io::BufReader;
use std::io::Cursor;

use crate::*;

pub fn behavior_blocking_append_tests(op: &Operator) -> Vec<Trial> {
let cap = op.info().full_capability();

if !(cap.read && cap.write && cap.blocking && cap.write_can_append) {
return vec![];
}

blocking_trials!(
op,
test_blocking_append_create_append,
test_blocking_append_with_dir_path,
test_blocking_append_with_cache_control,
test_blocking_append_with_content_type,
test_blocking_append_with_content_disposition,
test_blocking_appender_std_copy,
test_blocking_fuzz_appender
)
}

/// Test append to a file must success.
pub fn test_blocking_append_create_append(op: BlockingOperator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let (content_one, size_one) = gen_bytes();
let (content_two, size_two) = gen_bytes();

op.write_with(&path, content_one.clone())
.append(true)
.call()
.expect("append file first time must success");

op.write_with(&path, content_two.clone())
.append(true)
.call()
.expect("append to an existing file must success");

let bs = op.read(&path).expect("read file must success");

assert_eq!(bs.len(), size_one + size_two);
assert_eq!(bs[..size_one], content_one);
assert_eq!(bs[size_one..], content_two);

op.delete(&path).expect("delete file must success");

Ok(())
}

/// Test append to a directory path must fail.
pub fn test_blocking_append_with_dir_path(op: BlockingOperator) -> Result<()> {
let path = format!("{}/", uuid::Uuid::new_v4());
let (content, _) = gen_bytes();

let res = op.write_with(&path, content).append(true).call();
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), ErrorKind::IsADirectory);

Ok(())
}

/// Test append with cache control must success.
pub fn test_blocking_append_with_cache_control(op: BlockingOperator) -> Result<()> {
if !op.info().full_capability().write_with_cache_control {
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();
let (content, _) = gen_bytes();

let target_cache_control = "no-cache, no-store, max-age=300";
op.write_with(&path, content)
.append(true)
.cache_control(target_cache_control)
.call()?;

let meta = op.stat(&path).expect("stat must succeed");
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(
meta.cache_control().expect("cache control must exist"),
target_cache_control
);

op.delete(&path).expect("delete must succeed");

Ok(())
}

/// Test append with content type must success.
pub fn test_blocking_append_with_content_type(op: BlockingOperator) -> Result<()> {
if !op.info().full_capability().write_with_content_type {
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();
let (content, size) = gen_bytes();

let target_content_type = "application/json";
op.write_with(&path, content)
.append(true)
.content_type(target_content_type)
.call()?;

let meta = op.stat(&path).expect("stat must succeed");
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(
meta.content_type().expect("content type must exist"),
target_content_type
);
assert_eq!(meta.content_length(), size as u64);

op.delete(&path).expect("delete must succeed");

Ok(())
}

/// Write a single file with content disposition should succeed.
pub fn test_blocking_append_with_content_disposition(op: BlockingOperator) -> Result<()> {
if !op.info().full_capability().write_with_content_disposition {
return Ok(());
}

let path = uuid::Uuid::new_v4().to_string();
let (content, size) = gen_bytes();

let target_content_disposition = "attachment; filename=\"filename.jpg\"";
op.write_with(&path, content)
.append(true)
.content_disposition(target_content_disposition)
.call()?;

let meta = op.stat(&path).expect("stat must succeed");
assert_eq!(meta.mode(), EntryMode::FILE);
assert_eq!(
meta.content_disposition().expect("content type must exist"),
target_content_disposition
);
assert_eq!(meta.content_length(), size as u64);

op.delete(&path).expect("delete must succeed");

Ok(())
}

/// Copy data from reader to writer
pub fn test_blocking_appender_std_copy(op: BlockingOperator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();
let (content, size): (Vec<u8>, usize) =
gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);

let mut a = op.writer_with(&path).append(true).call()?;

// Wrap a buf reader here to make sure content is read in 1MiB chunks.
let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone()));
std::io::copy(&mut cursor, &mut a)?;
a.close()?;

let meta = op.stat(&path).expect("stat must succeed");
assert_eq!(meta.content_length(), size as u64);

let bs = op.read(&path)?;
assert_eq!(bs.len(), size, "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs[..size])),
format!("{:x}", Sha256::digest(content)),
"read content"
);

op.delete(&path).expect("delete must succeed");
Ok(())
}

/// Test for fuzzing appender.
pub fn test_blocking_fuzz_appender(op: BlockingOperator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();

let mut fuzzer = ObjectWriterFuzzer::new(&path, None);

let mut a = op.writer_with(&path).append(true).call()?;

for _ in 0..100 {
match fuzzer.fuzz() {
ObjectWriterAction::Write(bs) => {
a.write(bs)?;
}
}
}
a.close()?;

let content = op.read(&path)?;
fuzzer.check(&content);

op.delete(&path).expect("delete file must success");

Ok(())
}
3 changes: 3 additions & 0 deletions core/tests/behavior/main.rs
Expand Up @@ -42,12 +42,14 @@ use rename::behavior_rename_tests;
use write::behavior_write_tests;

// Blocking test cases
mod blocking_append;
mod blocking_copy;
mod blocking_list;
mod blocking_read_only;
mod blocking_rename;
mod blocking_write;

use blocking_append::behavior_blocking_append_tests;
use blocking_copy::behavior_blocking_copy_tests;
use blocking_list::behavior_blocking_list_tests;
use blocking_read_only::behavior_blocking_read_only_tests;
Expand All @@ -73,6 +75,7 @@ fn behavior_test<B: Builder>() -> Vec<Trial> {

let mut trials = vec![];
// Blocking tests
trials.extend(behavior_blocking_append_tests(&operator));
trials.extend(behavior_blocking_copy_tests(&operator));
trials.extend(behavior_blocking_list_tests(&operator));
trials.extend(behavior_blocking_read_only_tests(&operator));
Expand Down