Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: full HTTP range support #5222

Merged
merged 15 commits into from
Jan 5, 2024
19 changes: 19 additions & 0 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub(crate) enum Error {
source: crate::client::header::Error,
},

#[snafu(display("Operation not supported by this store: {reason}"))]
NotSupported { reason: &'static str },

#[snafu(display("ETag required for conditional update"))]
MissingETag,
}
Expand All @@ -109,6 +112,9 @@ impl From<Error> for crate::Error {
Error::GetRequest { source, path }
| Error::DeleteRequest { source, path }
| Error::PutRequest { source, path } => source.error(STORE, path),
Error::NotSupported { .. } => Self::NotSupported {
source: Box::new(err),
},
_ => Self::Generic {
store: STORE,
source: Box::new(err),
Expand Down Expand Up @@ -356,6 +362,19 @@ impl GetClient for AzureClient {
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
// As of 2024-01-02, Azure does not support suffix requests,
// so we should fail fast here rather than sending one
if let Some(r) = options.range.as_ref() {
match r {
crate::util::GetRange::Suffix(_) => {
Err(Error::NotSupported {
reason: "suffix request",
})?;
}
_ => (),
}
}

let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match options.head {
Expand Down
85 changes: 74 additions & 11 deletions object_store/src/client/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,62 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult};
use crate::util::{as_generic_err, response_range};
use crate::{GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use reqwest::Response;
use snafu::{ResultExt, Snafu};

/// A specialized `Error` for get-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
#[snafu(display("Could not extract metadata from response headers"))]
Header {
store: &'static str,
source: crate::client::header::Error,
},

#[snafu(display("Requested an invalid range"))]
InvalidRangeRequest {
store: &'static str,
source: crate::util::InvalidGetRange,
},

#[snafu(display("Got an invalid range response"))]
InvalidRangeResponse {
store: &'static str,
source: crate::util::InvalidRangeResponse,
},

#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
store: &'static str,
expected: Range<usize>,
actual: Range<usize>,
},
}

impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
let store = match err {
Error::Header { store, .. } => store,
Error::InvalidRangeRequest { store, .. } => store,
Error::InvalidRangeResponse { store, .. } => store,
Error::UnexpectedRange { store, .. } => store,
};
Self::Generic {
store: store,
source: Box::new(err),
}
}
}

/// A client that can perform a get request
#[async_trait]
Expand All @@ -45,23 +94,37 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)
.context(HeaderSnafu { store: T::STORE })?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
let actual = r
.as_range(meta.size)
.context(InvalidRangeRequestSnafu { store: T::STORE })?;

let expected =
response_range(&response).context(InvalidRangeResponseSnafu { store: T::STORE })?;

if actual != expected {
Err(Error::UnexpectedRange {
store: T::STORE,
expected,
actual: actual.clone(),
})?;
}
})?;
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.map_err(|source| as_generic_err(T::STORE, source))
.boxed();

Ok(GetResult {
range: range.unwrap_or(0..meta.size),
range: out_range,
payload: GetResultPayload::Stream(stream),
meta,
})
Expand Down
3 changes: 1 addition & 2 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,8 +580,7 @@ impl GetOptionsExt for RequestBuilder {
use hyper::header::*;

if let Some(range) = options.range {
let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
self = self.header(RANGE, range);
self = self.header(RANGE, format!("bytes={range}"));
}

if let Some(tag) = options.if_match {
Expand Down
5 changes: 3 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ mod parse;
mod util;

pub use parse::{parse_url, parse_url_opts};
use util::GetRange;

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -581,7 +582,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.clone()),
range: Some(range.into()),
..Default::default()
};
self.get_opts(location, options).await?.bytes().await
Expand Down Expand Up @@ -911,7 +912,7 @@ pub struct GetOptions {
/// otherwise returning [`Error::NotModified`]
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
pub range: Option<GetRange>,
/// Request a particular object version
pub version: Option<String>,
/// Request transfer of no content
Expand Down
14 changes: 13 additions & 1 deletion object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::InvalidGetRange,
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutResult, Result,
};
Expand Down Expand Up @@ -111,6 +112,11 @@ pub(crate) enum Error {
actual: usize,
},

#[snafu(display("Requested range was invalid"))]
InvalidRange {
source: InvalidGetRange,
},

#[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
UnableToCopyFile {
from: PathBuf,
Expand Down Expand Up @@ -416,9 +422,15 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;

let range = if let Some(r) = options.range {
r.as_range(meta.size).context(InvalidRangeSnafu)?
} else {
0..meta.size
};

Ok(GetResult {
payload: GetResultPayload::File(file, path),
range: options.range.unwrap_or(0..meta.size),
range,
meta,
})
})
Expand Down
24 changes: 21 additions & 3 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ enum Error {
#[snafu(display("Invalid range: {}..{}", range.start, range.end))]
BadRange { range: Range<usize> },

#[snafu(display("Invalid suffix: {} bytes", nbytes))]
BadSuffix { nbytes: usize },

#[snafu(display("Object already exists at that location: {path}"))]
AlreadyExists { path: String },

Expand Down Expand Up @@ -206,6 +209,8 @@ impl ObjectStore for InMemory {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
use crate::util::GetRange::*;

let entry = self.entry(location).await?;
let e_tag = entry.e_tag.to_string();

Expand All @@ -221,9 +226,22 @@ impl ObjectStore for InMemory {
let (range, data) = match options.range {
Some(range) => {
let len = entry.data.len();
ensure!(range.end <= len, OutOfRangeSnafu { range, len });
ensure!(range.start <= range.end, BadRangeSnafu { range });
(range.clone(), entry.data.slice(range))
match range {
Bounded(r) => {
ensure!(r.end <= len, OutOfRangeSnafu { range: r, len });
ensure!(r.start <= r.end, BadRangeSnafu { range: r });
(r.clone(), entry.data.slice(r))
}
Offset(o) => {
ensure!(o < len, OutOfRangeSnafu { range: o..len, len });
(o..len, entry.data.slice(o..len))
}
Suffix(n) => {
ensure!(n < len, BadSuffixSnafu { nbytes: n });
let start = len - n;
(start..len, entry.data.slice(start..len))
}
}
}
None => (0..entry.data.len(), entry.data),
};
Expand Down