Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Jan 4, 2024
1 parent 156ab15 commit 475219a
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 89 deletions.
4 changes: 2 additions & 2 deletions object_store/src/aws/credential.rs
Expand Up @@ -177,7 +177,7 @@ impl<'a> AwsAuthorizer<'a> {
request.headers_mut().insert(AUTH_HEADER, authorization_val);
}

pub(crate) fn sign(&self, method: Method, url: &mut Url, expires_in: Duration) {
pub(crate) fn sign(&self, method: &Method, url: &mut Url, expires_in: Duration) {
let date = self.date.unwrap_or_else(Utc::now);
let scope = self.scope(date);

Expand Down Expand Up @@ -766,7 +766,7 @@ mod tests {
};

let mut url = Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400));
authorizer.sign(&Method::GET, &mut url, Duration::from_secs(86400));

assert_eq!(
url,
Expand Down
2 changes: 1 addition & 1 deletion object_store/src/aws/mod.rs
Expand Up @@ -143,7 +143,7 @@ impl Signer for AmazonS3 {
/// # Ok(())
/// # }
/// ```
async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result<Url> {
let credential = self.credentials().get_credential().await?;
let authorizer = AwsAuthorizer::new(&credential, "s3", &self.client.config().region);

Expand Down
37 changes: 35 additions & 2 deletions object_store/src/azure/client.rs
Expand Up @@ -46,6 +46,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use url::Url;

const VERSION_HEADER: &str = "x-ms-version-id";
Expand Down Expand Up @@ -110,6 +111,9 @@ pub(crate) enum Error {

#[snafu(display("Got invalid user delegation key response: {}", source))]
DelegationKeyResponse { source: quick_xml::de::DeError },

#[snafu(display("Generating SAS keys with SAS tokens auth is not supported"))]
SASforSASNotSupported,
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -335,7 +339,7 @@ impl AzureClient {

/// Make a Get User Delegation Key request
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-user-delegation-key>
pub async fn get_user_delegation_key(
async fn get_user_delegation_key(
&self,
start: &DateTime<Utc>,
end: &DateTime<Utc>,
Expand Down Expand Up @@ -372,6 +376,35 @@ impl AzureClient {
Ok(response)
}

pub async fn signer(&self, expires_in: Duration) -> Result<AzureSigner> {
let credential = self.get_credential().await?;
let signed_start = chrono::Utc::now();
let signed_expiry = signed_start + expires_in;
match credential.as_ref() {
AzureCredential::BearerToken(_) => {
let key = self
.get_user_delegation_key(&signed_start, &signed_expiry)
.await?;
let signing_key = AzureAccessKey::try_new(&key.value)?;
Ok(AzureSigner::new(
signing_key,
self.config.account.clone(),
signed_start,
signed_expiry,
Some(key),
))
}
AzureCredential::AccessKey(key) => Ok(AzureSigner::new(
key.to_owned(),
self.config.account.clone(),
signed_start,
signed_expiry,
None,
)),
_ => Err(Error::SASforSASNotSupported.into()),
}
}

#[cfg(test)]
pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.get_credential().await?;
Expand Down Expand Up @@ -650,7 +683,7 @@ impl BlockList {

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct UserDelegationKey {
pub(crate) struct UserDelegationKey {
pub signed_oid: String,
pub signed_tid: String,
pub signed_start: String,
Expand Down
105 changes: 55 additions & 50 deletions object_store/src/azure/credential.rs
Expand Up @@ -101,7 +101,7 @@ impl From<Error> for crate::Error {
}

/// A shared Azure Storage Account Key
#[derive(Debug, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct AzureAccessKey(Vec<u8>);

impl AzureAccessKey {
Expand Down Expand Up @@ -141,24 +141,63 @@ pub mod authority_hosts {
pub const AZURE_PUBLIC_CLOUD: &str = "https://login.microsoftonline.com";
}

pub(crate) struct AzureSigner {
signing_key: AzureAccessKey,
start: DateTime<Utc>,
end: DateTime<Utc>,
account: String,
delegation_key: Option<UserDelegationKey>,
}

impl AzureSigner {
pub fn new(
signing_key: AzureAccessKey,
account: String,
start: DateTime<Utc>,
end: DateTime<Utc>,
delegation_key: Option<UserDelegationKey>,
) -> Self {
Self {
signing_key,
account,
start,
end,
delegation_key,
}
}

pub fn sign(&self, method: &Method, url: &mut Url) -> Result<()> {
let (str_to_sign, query_pairs) = match &self.delegation_key {
Some(delegation_key) => string_to_sign_user_delegation_sas(
url,
&method,
&self.account,
&self.start,
&self.end,
delegation_key,
),
None => string_to_sign_service_sas(url, method, &self.account, &self.start, &self.end),
};
let auth = hmac_sha256(&self.signing_key.0, str_to_sign);
url.query_pairs_mut().extend_pairs(query_pairs);
url.query_pairs_mut()
.append_pair("sig", BASE64_STANDARD.encode(auth).as_str());
Ok(())
}
}

/// Authorize a [`Request`] with an [`AzureAuthorizer`]
#[derive(Debug)]
pub struct AzureAuthorizer<'a> {
credential: &'a AzureCredential,
delegation_key: Option<&'a UserDelegationKey>,
account: &'a str,
}

impl<'a> AzureAuthorizer<'a> {
/// Create a new [`AzureAuthorizer`]
pub fn new(
credential: &'a AzureCredential,
delegation_key: Option<&'a UserDelegationKey>,
account: &'a str,
) -> Self {
pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self {
AzureAuthorizer {
credential,
delegation_key,
account,
}
}
Expand Down Expand Up @@ -206,44 +245,6 @@ impl<'a> AzureAuthorizer<'a> {
}
}
}

/// Sign a url with a shared access signature (SAS).
pub(crate) fn sign(
&self,
method: Method,
url: &mut Url,
start: &DateTime<Utc>,
end: &DateTime<Utc>,
) -> Result<()> {
if let Some(delegation_key) = self.delegation_key {
let (str_to_sign, query_pairs) = string_to_sign_user_delegation_sas(
url,
&method,
self.account,
start,
end,
delegation_key,
);
let signing_key = AzureAccessKey::try_new(&delegation_key.value)?;
let auth = hmac_sha256(signing_key.0, str_to_sign);
url.query_pairs_mut().extend_pairs(query_pairs);
url.query_pairs_mut()
.append_pair("sig", BASE64_STANDARD.encode(auth).as_str());
return Ok(());
}
match self.credential {
AzureCredential::AccessKey(key) => {
let (str_to_sign, query_pairs) =
string_to_sign_service_sas(url, &method, self.account, start, end);
let auth = hmac_sha256(&key.0, str_to_sign);
url.query_pairs_mut().extend_pairs(query_pairs);
url.query_pairs_mut()
.append_pair("sig", BASE64_STANDARD.encode(auth).as_str());
}
_ => return Err(Error::SASforSASNotSupported),
};
Ok(())
}
}

pub(crate) trait CredentialExt {
Expand All @@ -257,7 +258,7 @@ impl CredentialExt for RequestBuilder {
let (client, request) = self.build_split();
let mut request = request.expect("request valid");

AzureAuthorizer::new(credential, None, account).authorize(&mut request);
AzureAuthorizer::new(credential, account).authorize(&mut request);

Self::from_parts(client, request)
}
Expand Down Expand Up @@ -339,6 +340,9 @@ fn string_to_sign_sas(
)
}

/// Create a string to be signed for authorization via [service sas].
///
/// [service sas]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later
fn string_to_sign_service_sas(
u: &Url,
method: &Method,
Expand All @@ -349,7 +353,6 @@ fn string_to_sign_service_sas(
let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) =
string_to_sign_sas(u, method, account, start, end);

// https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later
let string_to_sign = format!(
"{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
signed_permissions,
Expand Down Expand Up @@ -380,6 +383,9 @@ fn string_to_sign_service_sas(
(string_to_sign, pairs)
}

/// Create a string to be signed for authorization via [user delegation sas].
///
/// [user delegation sas]: https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later
fn string_to_sign_user_delegation_sas(
u: &Url,
method: &Method,
Expand All @@ -391,7 +397,6 @@ fn string_to_sign_user_delegation_sas(
let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) =
string_to_sign_sas(u, method, account, start, end);

// https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later
let string_to_sign = format!(
"{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
signed_permissions,
Expand Down Expand Up @@ -1047,7 +1052,7 @@ mod tests {
integration.put(&path, data.clone()).await.unwrap();

let signed = integration
.signed_url(Method::GET, &path, Duration::from_secs(60))
.signed_url(&Method::GET, &path, Duration::from_secs(60))
.await
.unwrap();

Expand Down
53 changes: 20 additions & 33 deletions object_store/src/azure/mod.rs
Expand Up @@ -168,29 +168,28 @@ impl Signer for MicrosoftAzure {
/// # Ok(())
/// # }
/// ```
async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
let credential = self.credentials().get_credential().await?;
let signed_start = chrono::Utc::now();
let signed_expiry = signed_start + expires_in;
let delegation_key = match credential.as_ref() {
AzureCredential::BearerToken(_) => Some(
self.client
.get_user_delegation_key(&signed_start, &signed_expiry)
.await?,
),
_ => None,
};

let authorizer = AzureAuthorizer::new(
&credential,
delegation_key.as_ref(),
&self.client.config().account,
);
async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result<Url> {
let mut url = self.path_url(path);
authorizer.sign(method, &mut url, &signed_start, &signed_expiry)?;

let signer = self.client.signer(expires_in).await?;
signer.sign(method, &mut url)?;
Ok(url)
}

async fn signed_urls(
&self,
method: &Method,
paths: &[Path],
expires_in: Duration,
) -> Result<Vec<Url>> {
let mut urls = Vec::with_capacity(paths.len());
let signer = self.client.signer(expires_in).await?;
for path in paths {
let mut url = self.path_url(path);
signer.sign(&method, &mut url)?;
urls.push(url);
}
Ok(urls)
}
}

/// Relevant docs: <https://azure.github.io/Storage/docs/application-and-user-data/basics/azure-blob-storage-upload-apis/>
Expand Down Expand Up @@ -293,24 +292,12 @@ mod tests {
.build()
.unwrap();

let start = chrono::Utc::now();
let end = start + chrono::Duration::days(1);

let key = integration
.client
.get_user_delegation_key(&start, &end)
.await
.unwrap();

assert!(!key.value.is_empty());
assert_eq!(key.signed_tid, tenant_id);

let data = Bytes::from("hello world");
let path = Path::from("file.txt");
integration.put(&path, data.clone()).await.unwrap();

let signed = integration
.signed_url(Method::GET, &path, Duration::from_secs(60))
.signed_url(&Method::GET, &path, Duration::from_secs(60))
.await
.unwrap();

Expand Down
18 changes: 17 additions & 1 deletion object_store/src/signer.rs
Expand Up @@ -30,5 +30,21 @@ pub trait Signer: Send + Sync + fmt::Debug + 'static {
/// the URL should be valid, return a signed [`Url`] created with the object store
/// implementation's credentials such that the URL can be handed to something that doesn't have
/// access to the object store's credentials, to allow limited access to the object store.
async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url>;
async fn signed_url(&self, method: &Method, path: &Path, expires_in: Duration) -> Result<Url>;

/// Generate signed urls for multiple paths.
///
/// See [`Signer::signed_url`] for more details.
async fn signed_urls(
&self,
method: &Method,
paths: &[Path],
expires_in: Duration,
) -> Result<Vec<Url>> {
let mut urls = Vec::with_capacity(paths.len());
for path in paths {
urls.push(self.signed_url(method, path, expires_in).await?);
}
Ok(urls)
}
}

0 comments on commit 475219a

Please sign in to comment.