Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: full HTTP range support #5222

Merged
merged 15 commits into from Jan 5, 2024
1 change: 1 addition & 0 deletions object_store/Cargo.toml
Expand Up @@ -54,6 +54,7 @@ reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-
ring = { version = "0.17", default-features = false, features = ["std"], optional = true }
rustls-pemfile = { version = "1.0", default-features = false, optional = true }
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }
http-content-range = "0.1.2"
clbarnes marked this conversation as resolved.
Show resolved Hide resolved

[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.27.1", features = ["fs"] }
Expand Down
44 changes: 33 additions & 11 deletions object_store/src/client/get.rs
Expand Up @@ -15,13 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult};
use crate::util::{as_generic_err, response_range};
use crate::{GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use reqwest::Response;
use snafu::Snafu;

/// A client that can perform a get request
#[async_trait]
Expand All @@ -34,6 +38,13 @@ pub trait GetClient: Send + Sync + 'static {
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response>;
}

/// Error describing a mismatch between the byte range that was requested
/// and the byte range that was received.
///
/// Its display form is `Requested range {expected:?}, got {actual:?}`.
// NOTE(review): confirm that every construction site fills the fields in the
// orientation the display message implies (`expected` = requested,
// `actual` = received).
#[derive(Debug, Snafu)]
#[snafu(display("Requested range {expected:?}, got {actual:?}"))]
pub struct UnexpectedRange {
/// The range that was requested (shown after "Requested range").
expected: Range<usize>,
/// The range that was received instead (shown after "got").
actual: Range<usize>,
}

/// Extension trait for [`GetClient`] that adds common retrieval functionality
#[async_trait]
pub trait GetClientExt {
Expand All @@ -45,23 +56,34 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)
.map_err(|e| as_generic_err(T::STORE, e))?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
// `expected` is the range we asked for, resolved against the object size
// taken from the response metadata; `actual` is the range read back from
// the response. The names must line up with `UnexpectedRange`, whose
// message reads "Requested range {expected:?}, got {actual:?}".
let expected = r
    .as_range(meta.size)
    .map_err(|source| as_generic_err(T::STORE, source))?;
let actual =
    response_range(&response).map_err(|source| as_generic_err(T::STORE, source))?;
// Refuse to hand back data for a different range than the caller requested.
// Past this check both ranges are equal, so whichever one is returned
// afterwards carries the same value.
if actual != expected {
    return Err(as_generic_err(
        T::STORE,
        UnexpectedRange { expected, actual },
    ));
}
})?;
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.map_err(|source| as_generic_err(T::STORE, source))
.boxed();

Ok(GetResult {
range: range.unwrap_or(0..meta.size),
range: out_range,
payload: GetResultPayload::Stream(stream),
meta,
})
Expand Down
3 changes: 1 addition & 2 deletions object_store/src/client/mod.rs
Expand Up @@ -580,8 +580,7 @@ impl GetOptionsExt for RequestBuilder {
use hyper::header::*;

if let Some(range) = options.range {
let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
self = self.header(RANGE, range);
self = self.header(RANGE, format!("bytes={range}"));
}

if let Some(tag) = options.if_match {
Expand Down
5 changes: 3 additions & 2 deletions object_store/src/lib.rs
Expand Up @@ -497,6 +497,7 @@ mod parse;
mod util;

pub use parse::{parse_url, parse_url_opts};
use util::GetRange;

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -581,7 +582,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.clone()),
range: Some(range.into()),
..Default::default()
};
self.get_opts(location, options).await?.bytes().await
Expand Down Expand Up @@ -911,7 +912,7 @@ pub struct GetOptions {
/// otherwise returning [`Error::NotModified`]
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
pub range: Option<GetRange>,
/// Request a particular object version
pub version: Option<String>,
/// Request transfer of no content
Expand Down
12 changes: 11 additions & 1 deletion object_store/src/local.rs
Expand Up @@ -19,6 +19,7 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::as_generic_err,
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutResult, Result,
};
Expand All @@ -43,6 +44,8 @@ use tokio::io::AsyncWrite;
use url::Url;
use walkdir::{DirEntry, WalkDir};

/// Store name passed to `as_generic_err` when wrapping errors raised by this
/// module.
// `'static` is implied for references in `const` items, so it is not spelled out.
const STORE: &str = "LocalFileSystem";

/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -416,9 +419,16 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;

let range = if let Some(r) = options.range {
r.as_range(meta.size)
.map_err(|e| as_generic_err(STORE, e))?
clbarnes marked this conversation as resolved.
Show resolved Hide resolved
} else {
0..meta.size
};

Ok(GetResult {
payload: GetResultPayload::File(file, path),
range: options.range.unwrap_or(0..meta.size),
range,
meta,
})
})
Expand Down
24 changes: 21 additions & 3 deletions object_store/src/memory.rs
Expand Up @@ -51,6 +51,9 @@ enum Error {
#[snafu(display("Invalid range: {}..{}", range.start, range.end))]
BadRange { range: Range<usize> },

#[snafu(display("Invalid suffix: {} bytes", nbytes))]
BadSuffix { nbytes: usize },

#[snafu(display("Object already exists at that location: {path}"))]
AlreadyExists { path: String },

Expand Down Expand Up @@ -206,6 +209,8 @@ impl ObjectStore for InMemory {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
use crate::util::GetRange::*;

let entry = self.entry(location).await?;
let e_tag = entry.e_tag.to_string();

Expand All @@ -221,9 +226,22 @@ impl ObjectStore for InMemory {
let (range, data) = match options.range {
Some(range) => {
let len = entry.data.len();
ensure!(range.end <= len, OutOfRangeSnafu { range, len });
ensure!(range.start <= range.end, BadRangeSnafu { range });
(range.clone(), entry.data.slice(range))
// Resolve the requested range against the object's length `len`, producing
// the concrete byte range together with the matching slice of the data.
match range {
    // Explicit `start..end`: must end inside the object and be well-ordered.
    Bounded(r) => {
        ensure!(r.end <= len, OutOfRangeSnafu { range: r, len });
        ensure!(r.start <= r.end, BadRangeSnafu { range: r });
        (r.clone(), entry.data.slice(r))
    }
    // Everything from byte `o` to the end: `o` must address an existing byte.
    Offset(o) => {
        ensure!(o < len, OutOfRangeSnafu { range: o..len, len });
        (o..len, entry.data.slice(o..len))
    }
    // The final `n` bytes of the object.
    Suffix(n) => {
        // Per RFC 9110 (byte ranges), only a zero-length suffix is
        // unsatisfiable. A suffix equal to or longer than the object selects
        // the whole object, so clamp instead of rejecting `n >= len`.
        ensure!(n > 0, BadSuffixSnafu { nbytes: n });
        let start = len.saturating_sub(n);
        (start..len, entry.data.slice(start..len))
    }
}
}
None => (0..entry.data.len(), entry.data),
};
Expand Down