Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(object_store): Azure url signing #5259

Merged
merged 8 commits into from Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -14,6 +14,7 @@ parquet/data.parquet
justfile
.prettierignore
.env
.editorconfig
# local azurite file
__azurite*
__blobstorage__
Expand Down
6 changes: 3 additions & 3 deletions object_store/src/aws/credential.rs
Expand Up @@ -177,7 +177,7 @@ impl<'a> AwsAuthorizer<'a> {
request.headers_mut().insert(AUTH_HEADER, authorization_val);
}

pub(crate) fn sign(&self, method: Method, url: &mut Url, expires_in: Duration) {
pub(crate) fn sign(&self, method: &Method, url: &mut Url, expires_in: Duration) {
let date = self.date.unwrap_or_else(Utc::now);
let scope = self.scope(date);

Expand Down Expand Up @@ -212,7 +212,7 @@ impl<'a> AwsAuthorizer<'a> {
let string_to_sign = self.string_to_sign(
date,
&scope,
&method,
method,
url,
&canonical_headers,
&signed_headers,
Expand Down Expand Up @@ -766,7 +766,7 @@ mod tests {
};

let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400));
authorizer.sign(&Method::GET, &mut url, Duration::from_secs(86400));

assert_eq!(
url,
Expand Down
4 changes: 2 additions & 2 deletions object_store/src/aws/mod.rs
Expand Up @@ -136,14 +136,14 @@ impl Signer for AmazonS3 {
/// .build()?;
///
/// let url = s3.signed_url(
/// Method::PUT,
/// &Method::PUT,
/// &Path::from("some-folder/some-file.txt"),
/// Duration::from_secs(60 * 60)
/// ).await?;
/// # Ok(())
/// # }
/// ```
async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result<Url> {
let credential = self.credentials().get_credential().await?;
let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config().region);

Expand Down
113 changes: 111 additions & 2 deletions object_store/src/azure/client.rs
Expand Up @@ -46,6 +46,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use url::Url;

const VERSION_HEADER: &str = "x-ms-version-id";
Expand Down Expand Up @@ -101,6 +102,18 @@ pub(crate) enum Error {

#[snafu(display("ETag required for conditional update"))]
MissingETag,

#[snafu(display("Error requesting user delegation key: {}", source))]
DelegationKeyRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting user delegation key response body: {}", source))]
DelegationKeyResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid user delegation key response: {}", source))]
DelegationKeyResponse { source: quick_xml::de::DeError },

#[snafu(display("Generating SAS keys with SAS tokens auth is not supported"))]
SASforSASNotSupported,
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -324,6 +337,74 @@ impl AzureClient {
Ok(())
}

/// Make a Get User Delegation Key request
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-user-delegation-key>
async fn get_user_delegation_key(
&self,
start: &DateTime<Utc>,
tustvold marked this conversation as resolved.
Show resolved Hide resolved
end: &DateTime<Utc>,
) -> Result<UserDelegationKey> {
let credential = self.get_credential().await?;
let url = self.config.service.clone();

let start = start.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let expiry = end.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);

let mut body = String::new();
body.push_str("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<KeyInfo>\n");
body.push_str(&format!(
"\t<Start>{start}</Start>\n\t<Expiry>{expiry}</Expiry>\n"
));
body.push_str("</KeyInfo>");

let response = self
.client
.request(Method::POST, url)
.body(body)
.query(&[("restype", "service"), ("comp", "userdelegationkey")])
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(DelegationKeyRequestSnafu)?
.bytes()
.await
.context(DelegationKeyResponseBodySnafu)?;

let response: UserDelegationKey =
quick_xml::de::from_reader(response.reader()).context(DelegationKeyResponseSnafu)?;

Ok(response)
}

pub async fn signer(&self, expires_in: Duration) -> Result<AzureSigner> {
roeap marked this conversation as resolved.
Show resolved Hide resolved
let credential = self.get_credential().await?;
let signed_start = chrono::Utc::now();
let signed_expiry = signed_start + expires_in;
match credential.as_ref() {
AzureCredential::BearerToken(_) => {
let key = self
.get_user_delegation_key(&signed_start, &signed_expiry)
.await?;
let signing_key = AzureAccessKey::try_new(&key.value)?;
Ok(AzureSigner::new(
signing_key,
self.config.account.clone(),
signed_start,
signed_expiry,
Some(key),
))
}
AzureCredential::AccessKey(key) => Ok(AzureSigner::new(
key.to_owned(),
self.config.account.clone(),
signed_start,
signed_expiry,
None,
)),
_ => Err(Error::SASforSASNotSupported.into()),
}
}

#[cfg(test)]
pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.get_credential().await?;
Expand Down Expand Up @@ -600,6 +681,18 @@ impl BlockList {
}
}

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct UserDelegationKey {
pub signed_oid: String,
pub signed_tid: String,
pub signed_start: String,
pub signed_expiry: String,
pub signed_service: String,
pub signed_version: String,
pub value: String,
Comment on lines +691 to +697
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these fields need to be public? Do we expect users to use this directly?

}

#[cfg(test)]
mod tests {
use bytes::Bytes;
Expand Down Expand Up @@ -757,8 +850,7 @@ mod tests {
<NextMarker/>
</EnumerationResults>";

let mut _list_blobs_response_internal: ListResultInternal =
quick_xml::de::from_str(S).unwrap();
let _list_blobs_response_internal: ListResultInternal = quick_xml::de::from_str(S).unwrap();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive by: remove unnecessary mut

}

#[test]
Expand All @@ -778,4 +870,21 @@ mod tests {

assert_eq!(res, S)
}

#[test]
fn test_delegated_key_response() {
const S: &str = r#"<?xml version="1.0" encoding="utf-8"?>
<UserDelegationKey>
<SignedOid>String containing a GUID value</SignedOid>
<SignedTid>String containing a GUID value</SignedTid>
<SignedStart>String formatted as ISO date</SignedStart>
<SignedExpiry>String formatted as ISO date</SignedExpiry>
<SignedService>b</SignedService>
<SignedVersion>String specifying REST api version to use to create the user delegation key</SignedVersion>
<Value>String containing the user delegation key</Value>
</UserDelegationKey>"#;

let _delegated_key_response_internal: UserDelegationKey =
quick_xml::de::from_str(S).unwrap();
}
}