Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object_store: full HTTP range support #5222

Merged
merged 15 commits into from Jan 5, 2024
10 changes: 9 additions & 1 deletion object_store/src/azure/client.rs
Expand Up @@ -25,7 +25,7 @@ use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::deserialize_rfc1123;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, PutOptions, PutResult,
Result, RetryConfig,
Expand Down Expand Up @@ -356,6 +356,14 @@ impl GetClient for AzureClient {
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
// As of 2024-01-02, Azure does not support suffix requests,
// so we should fail fast here rather than sending one
if let Some(GetRange::Suffix(_)) = options.range.as_ref() {
return Err(crate::Error::NotSupported {
source: "Azure does not support suffix range requests".into(),
});
}

let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let method = match options.head {
Expand Down
120 changes: 99 additions & 21 deletions object_store/src/client/get.rs
Expand Up @@ -15,13 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::ops::Range;

use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
use crate::{Error, GetOptions, GetResult};
use crate::{GetResultPayload, Result};
use crate::{GetOptions, GetRange, GetResult, GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use hyper::header::CONTENT_RANGE;
use hyper::StatusCode;
use reqwest::header::ToStrError;
use reqwest::Response;
use snafu::{ensure, OptionExt, ResultExt, Snafu};

/// A client that can perform a get request
#[async_trait]
Expand All @@ -45,25 +50,98 @@ impl<T: GetClient> GetClientExt for T {
async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
let range = options.range.clone();
let response = self.get_request(location, options).await?;
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG).map_err(|e| {
Error::Generic {
store: T::STORE,
source: Box::new(e),
}
})?;

let stream = response
.bytes_stream()
.map_err(|source| Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: range.unwrap_or(0..meta.size),
payload: GetResultPayload::Stream(stream),
meta,
get_result::<T>(location, range, response).map_err(|e| crate::Error::Generic {
store: T::STORE,
source: Box::new(e),
})
}
}
/// Parses the byte range out of an HTTP `Content-Range` header value.
///
/// Accepts values of the form `bytes <first>-<last>/<rest>`, where `<first>`
/// and `<last>` are inclusive byte positions. Whatever follows the `/` is
/// ignored. Surrounding whitespace is trimmed before parsing.
///
/// Returns the equivalent half-open range `first..last + 1`, or `None` when:
/// - the value does not have the shape above (this includes the
///   unsatisfied-range form `bytes */<len>`),
/// - either position does not parse as a `usize`,
/// - `last` is smaller than `first`, or
/// - `last + 1` is not representable as a `usize`.
fn parse_content_range(s: &str) -> Option<Range<usize>> {
    let rem = s.trim().strip_prefix("bytes ")?;
    let (range, _) = rem.split_once('/')?;
    let (start_s, end_s) = range.split_once('-')?;

    let start: usize = start_s.parse().ok()?;
    let end: usize = end_s.parse().ok()?;

    // A last-pos below the first-pos is an invalid Content-Range; reject it
    // instead of handing back an empty or reversed range.
    if end < start {
        return None;
    }

    // `end` is inclusive on the wire; convert to an exclusive bound without
    // panicking (debug) or wrapping (release) when it is `usize::MAX`.
    let end_exclusive = end.checked_add(1)?;

    Some(start..end_exclusive)
}

/// A specialized `Error` for get-related errors
///
/// Returned by `get_result` while turning an HTTP response into a `GetResult`.
// NOTE(review): variants are documented with plain `//` comments on purpose.
// snafu may use a variant's doc comment as its Display text when no `display`
// attribute is present (as on `Header` and `InvalidRangeRequest`) — confirm
// before converting these to `///`.
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum GetResultError {
// Failure reported by the header module; `get_result` propagates it with a
// bare `?` from `header_meta`, so no context selector is used.
#[snafu(context(false))]
Header {
source: crate::client::header::Error,
},

// The requested `GetRange` could not be resolved by `as_range`; propagated
// with a bare `?` in `get_result`.
#[snafu(context(false))]
InvalidRangeRequest {
source: crate::util::InvalidGetRange,
},

// A range was requested but the response status was not `PARTIAL_CONTENT`.
#[snafu(display("Received non-partial response when range requested"))]
NotPartial,

// A range was requested but the response carried no `Content-Range` header.
#[snafu(display("Content-Range header not present"))]
NoContentRange,

// `parse_content_range` returned `None`; `value` is the header text that
// failed to parse.
#[snafu(display("Failed to parse value for CONTENT_RANGE header: {value}"))]
ParseContentRange { value: String },

// The `Content-Range` header value could not be converted with `to_str`.
#[snafu(display("Content-Range header contained non UTF-8 characters"))]
InvalidContentRange { source: ToStrError },

// The range reported in `Content-Range` (`actual`) differs from the range
// that was requested (`expected`).
#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
expected: Range<usize>,
actual: Range<usize>,
},
}

fn get_result<T: GetClient>(
location: &Path,
range: Option<GetRange>,
response: Response,
) -> Result<GetResult, GetResultError> {
let meta = header_meta(location, response.headers(), T::HEADER_CONFIG)?;

// ensure that we receive the range we asked for
let out_range = if let Some(r) = range {
let expected = r.as_range(meta.size)?;

ensure!(
response.status() == StatusCode::PARTIAL_CONTENT,
NotPartialSnafu
);
let val = response
.headers()
.get(CONTENT_RANGE)
.context(NoContentRangeSnafu)?;

let value = val.to_str().context(InvalidContentRangeSnafu)?;
let actual = parse_content_range(value).context(ParseContentRangeSnafu { value })?;
ensure!(
actual == expected,
UnexpectedRangeSnafu { expected, actual }
);
actual
} else {
0..meta.size
};

let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
store: T::STORE,
source: Box::new(source),
})
.boxed();

Ok(GetResult {
range: out_range,
payload: GetResultPayload::Stream(stream),
meta,
})
}
3 changes: 1 addition & 2 deletions object_store/src/client/mod.rs
Expand Up @@ -580,8 +580,7 @@ impl GetOptionsExt for RequestBuilder {
use hyper::header::*;

if let Some(range) = options.range {
let range = format!("bytes={}-{}", range.start, range.end.saturating_sub(1));
self = self.header(RANGE, range);
self = self.header(RANGE, format!("bytes={range}"));
}

if let Some(tag) = options.if_match {
Expand Down
7 changes: 4 additions & 3 deletions object_store/src/lib.rs
Expand Up @@ -497,6 +497,7 @@ mod parse;
mod util;

pub use parse::{parse_url, parse_url_opts};
pub use util::GetRange;

use crate::path::Path;
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -581,7 +582,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// in the given byte range
async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let options = GetOptions {
range: Some(range.clone()),
range: Some(range.into()),
..Default::default()
};
self.get_opts(location, options).await?.bytes().await
Expand Down Expand Up @@ -911,7 +912,7 @@ pub struct GetOptions {
/// otherwise returning [`Error::NotModified`]
///
/// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
pub range: Option<Range<usize>>,
pub range: Option<GetRange>,
/// Request a particular object version
pub version: Option<String>,
/// Request transfer of no content
Expand Down Expand Up @@ -1888,7 +1889,7 @@ mod tests {

// We can abort an in-progress write
let (upload_id, mut writer) = storage.put_multipart(&location).await.unwrap();
if let Some(chunk) = data.get(0) {
if let Some(chunk) = data.first() {
writer.write_all(chunk).await.unwrap();
let _ = writer.write(chunk).await.unwrap();
}
Expand Down
13 changes: 12 additions & 1 deletion object_store/src/local.rs
Expand Up @@ -19,6 +19,7 @@
use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::InvalidGetRange,
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutMode, PutOptions, PutResult, Result,
};
Expand Down Expand Up @@ -111,6 +112,11 @@ pub(crate) enum Error {
actual: usize,
},

#[snafu(display("Requested range was invalid"))]
InvalidRange {
source: InvalidGetRange,
},

#[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
UnableToCopyFile {
from: PathBuf,
Expand Down Expand Up @@ -416,9 +422,14 @@ impl ObjectStore for LocalFileSystem {
let meta = convert_metadata(metadata, location)?;
options.check_preconditions(&meta)?;

let range = match options.range {
Some(r) => r.as_range(meta.size).context(InvalidRangeSnafu)?,
None => 0..meta.size,
};

Ok(GetResult {
payload: GetResultPayload::File(file, path),
range: options.range.unwrap_or(0..meta.size),
range,
meta,
})
})
Expand Down
24 changes: 21 additions & 3 deletions object_store/src/memory.rs
Expand Up @@ -51,6 +51,9 @@ enum Error {
#[snafu(display("Invalid range: {}..{}", range.start, range.end))]
BadRange { range: Range<usize> },

#[snafu(display("Invalid suffix: {} bytes", nbytes))]
BadSuffix { nbytes: usize },

#[snafu(display("Object already exists at that location: {path}"))]
AlreadyExists { path: String },

Expand Down Expand Up @@ -206,6 +209,8 @@ impl ObjectStore for InMemory {
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
use crate::util::GetRange::*;

let entry = self.entry(location).await?;
let e_tag = entry.e_tag.to_string();

Expand All @@ -221,9 +226,22 @@ impl ObjectStore for InMemory {
let (range, data) = match options.range {
Some(range) => {
let len = entry.data.len();
ensure!(range.end <= len, OutOfRangeSnafu { range, len });
ensure!(range.start <= range.end, BadRangeSnafu { range });
(range.clone(), entry.data.slice(range))
match range {
Bounded(r) => {
ensure!(r.end <= len, OutOfRangeSnafu { range: r, len });
ensure!(r.start <= r.end, BadRangeSnafu { range: r });
(r.clone(), entry.data.slice(r))
}
Offset(o) => {
ensure!(o < len, OutOfRangeSnafu { range: o..len, len });
(o..len, entry.data.slice(o..len))
}
Suffix(n) => {
ensure!(n < len, BadSuffixSnafu { nbytes: n });
let start = len - n;
(start..len, entry.data.slice(start..len))
}
}
}
None => (0..entry.data.len(), entry.data),
};
Expand Down