Skip to content

Commit

Permalink
feat: sign with user delegated keys
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Dec 29, 2023
1 parent 1c0f33b commit 5d68d75
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -14,6 +14,7 @@ parquet/data.parquet
justfile
.prettierignore
.env
.editorconfig
# local azurite file
__azurite*
__blobstorage__
Expand Down
80 changes: 78 additions & 2 deletions object_store/src/azure/client.rs
Expand Up @@ -101,6 +101,15 @@ pub(crate) enum Error {

#[snafu(display("ETag required for conditional update"))]
MissingETag,

#[snafu(display("Error requesting user delegation key: {}", source))]
DelegationKeyRequest { source: crate::client::retry::Error },

#[snafu(display("Error getting user delegation key response body: {}", source))]
DelegationKeyResponseBody { source: reqwest::Error },

#[snafu(display("Got invalid user delegation key response: {}", source))]
DelegationKeyResponse { source: quick_xml::de::DeError },
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -324,6 +333,45 @@ impl AzureClient {
Ok(())
}

/// Make a Get User Delegation Key request
/// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-user-delegation-key>
pub async fn get_user_delegation_key(
&self,
start: &DateTime<Utc>,
end: &DateTime<Utc>,
) -> Result<UserDelegationKey> {
let credential = self.get_credential().await?;
let url = self.config.service.clone();

let start = start.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
let expiry = end.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);

let mut body = String::new();
body.push_str("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<KeyInfo>\n");
body.push_str(&format!(
"\t<Start>{start}</Start>\n\t<Expiry>{expiry}</Expiry>\n"
));
body.push_str("</KeyInfo>");

let response = self
.client
.request(Method::POST, url)
.body(body)
.query(&[("restype", "service"), ("comp", "userdelegationkey")])
.with_azure_authorization(&credential, &self.config.account)
.send_retry(&self.config.retry_config)
.await
.context(DelegationKeyRequestSnafu)?
.bytes()
.await
.context(DelegationKeyResponseBodySnafu)?;

let response: UserDelegationKey =
quick_xml::de::from_reader(response.reader()).context(DelegationKeyResponseSnafu)?;

Ok(response)
}

#[cfg(test)]
pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.get_credential().await?;
Expand Down Expand Up @@ -600,6 +648,18 @@ impl BlockList {
}
}

#[derive(Debug, Clone, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct UserDelegationKey {
pub signed_oid: String,
pub signed_tid: String,
pub signed_start: String,
pub signed_expiry: String,
pub signed_service: String,
pub signed_version: String,
pub value: String,
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
Expand Down Expand Up @@ -757,8 +817,7 @@ mod tests {
<NextMarker/>
</EnumerationResults>";

let mut _list_blobs_response_internal: ListResultInternal =
quick_xml::de::from_str(S).unwrap();
let _list_blobs_response_internal: ListResultInternal = quick_xml::de::from_str(S).unwrap();
}

#[test]
Expand All @@ -778,4 +837,21 @@ mod tests {

assert_eq!(res, S)
}

#[test]
fn test_delegated_key_response() {
const S: &str = r#"<?xml version="1.0" encoding="utf-8"?>
<UserDelegationKey>
<SignedOid>String containing a GUID value</SignedOid>
<SignedTid>String containing a GUID value</SignedTid>
<SignedStart>String formatted as ISO date</SignedStart>
<SignedExpiry>String formatted as ISO date</SignedExpiry>
<SignedService>b</SignedService>
<SignedVersion>String specifying REST api version to use to create the user delegation key</SignedVersion>
<Value>String containing the user delegation key</Value>
</UserDelegationKey>"#;

let _delegated_key_response_internal: UserDelegationKey =
quick_xml::de::from_str(S).unwrap();
}
}
130 changes: 117 additions & 13 deletions object_store/src/azure/credential.rs
Expand Up @@ -42,6 +42,8 @@ use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use url::Url;

use super::client::UserDelegationKey;

static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type");
Expand Down Expand Up @@ -143,14 +145,20 @@ pub mod authority_hosts {
#[derive(Debug)]
pub struct AzureAuthorizer<'a> {
credential: &'a AzureCredential,
delegation_key: Option<&'a UserDelegationKey>,
account: &'a str,
}

impl<'a> AzureAuthorizer<'a> {
/// Create a new [`AzureAuthorizer`]
pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self {
pub fn new(
credential: &'a AzureCredential,
delegation_key: Option<&'a UserDelegationKey>,
account: &'a str,
) -> Self {
AzureAuthorizer {
credential,
delegation_key,
account,
}
}
Expand Down Expand Up @@ -199,20 +207,40 @@ impl<'a> AzureAuthorizer<'a> {
}
}

pub(crate) fn sign(&self, method: Method, url: &mut Url, expires_in: Duration) -> Result<()> {
/// Sign a url with a shared access signature (SAS).
pub(crate) fn sign(
&self,
method: Method,
url: &mut Url,
start: &DateTime<Utc>,
end: &DateTime<Utc>,
) -> Result<()> {
if let Some(delegation_key) = self.delegation_key {
let (str_to_sign, query_pairs) = string_to_sign_user_delegation_sas(
url,
&method,
self.account,
start,
end,
delegation_key,
);
let signing_key = AzureAccessKey::try_new(&delegation_key.value)?;
let auth = hmac_sha256(signing_key.0, str_to_sign);
url.query_pairs_mut().extend_pairs(query_pairs);
url.query_pairs_mut()
.append_pair("sig", BASE64_STANDARD.encode(auth).as_str());
return Ok(());
}
match self.credential {
AzureCredential::AccessKey(key) => {
let (str_to_sign, query_pairs) =
string_to_sign_service_sas(url, &method, self.account, expires_in);
string_to_sign_service_sas(url, &method, self.account, start, end);
let auth = hmac_sha256(&key.0, str_to_sign);
url.query_pairs_mut().extend_pairs(query_pairs);
url.query_pairs_mut()
.append_pair("sig", BASE64_STANDARD.encode(auth).as_str());
}
AzureCredential::BearerToken(token) => {
todo!()
}
AzureCredential::SASToken(_) => return Err(Error::SASforSASNotSupported),
_ => return Err(Error::SASforSASNotSupported),
};
Ok(())
}
Expand All @@ -229,7 +257,7 @@ impl CredentialExt for RequestBuilder {
let (client, request) = self.build_split();
let mut request = request.expect("request valid");

AzureAuthorizer::new(credential, account).authorize(&mut request);
AzureAuthorizer::new(credential, None, account).authorize(&mut request);

Self::from_parts(client, request)
}
Expand Down Expand Up @@ -258,12 +286,13 @@ fn add_if_exists<'a>(h: &'a HeaderMap, key: &HeaderName) -> &'a str {
.unwrap_or_default()
}

fn string_to_sign_service_sas(
fn string_to_sign_sas(
u: &Url,
method: &Method,
account: &str,
expires_in: Duration,
) -> (String, HashMap<&'static str, String>) {
start: &DateTime<Utc>,
end: &DateTime<Utc>,
) -> (String, String, String, String, String) {
let signed_resource = if u
.query()
.map(|q| q.contains("comp=list"))
Expand Down Expand Up @@ -291,8 +320,8 @@ fn string_to_sign_service_sas(
_ => "",
}
.to_string();
let signed_start = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true);
let signed_expiry = (Utc::now() + expires_in).to_rfc3339_opts(SecondsFormat::Secs, true);
let signed_start = start.to_rfc3339_opts(SecondsFormat::Secs, true);
let signed_expiry = end.to_rfc3339_opts(SecondsFormat::Secs, true);
let canonicalized_resource = if u.host_str().unwrap_or_default().contains(account) {
format!("/blob/{}{}", account, u.path())
} else {
Expand All @@ -301,6 +330,25 @@ fn string_to_sign_service_sas(
format!("/blob{}", u.path())
};

(
signed_resource,
signed_permissions,
signed_start,
signed_expiry,
canonicalized_resource,
)
}

fn string_to_sign_service_sas(
u: &Url,
method: &Method,
account: &str,
start: &DateTime<Utc>,
end: &DateTime<Utc>,
) -> (String, HashMap<&'static str, String>) {
let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) =
string_to_sign_sas(u, method, account, start, end);

// https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later
let string_to_sign = format!(
"{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
Expand Down Expand Up @@ -332,6 +380,62 @@ fn string_to_sign_service_sas(
(string_to_sign, pairs)
}

fn string_to_sign_user_delegation_sas(
u: &Url,
method: &Method,
account: &str,
start: &DateTime<Utc>,
end: &DateTime<Utc>,
delegation_key: &UserDelegationKey,
) -> (String, HashMap<&'static str, String>) {
let (signed_resource, signed_permissions, signed_start, signed_expiry, canonicalized_resource) =
string_to_sign_sas(u, method, account, start, end);

// https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later
let string_to_sign = format!(
"{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
signed_permissions,
signed_start,
signed_expiry,
canonicalized_resource,
delegation_key.signed_oid, // signed key object id
delegation_key.signed_tid, // signed key tenant id
delegation_key.signed_start, // signed key start
delegation_key.signed_expiry, // signed key expiry
delegation_key.signed_service, // signed key service
delegation_key.signed_version, // signed key version
"", // signed authorized user object id
"", // signed unauthorized user object id
"", // signed correlation id
"", // signed ip
"", // signed protocol
&AZURE_VERSION.to_str().unwrap(), // signed version
signed_resource, // signed resource
"", // signed snapshot time
"", // signed encryption scope
"", // rscc - response header: Cache-Control
"", // rscd - response header: Content-Disposition
"", // rsce - response header: Content-Encoding
"", // rscl - response header: Content-Language
"", // rsct - response header: Content-Type
);

let mut pairs = HashMap::new();
pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string());
pairs.insert("sp", signed_permissions);
pairs.insert("st", signed_start);
pairs.insert("se", signed_expiry);
pairs.insert("sr", signed_resource);
pairs.insert("skoid", delegation_key.signed_oid.clone());
pairs.insert("sktid", delegation_key.signed_tid.clone());
pairs.insert("skt", delegation_key.signed_start.clone());
pairs.insert("ske", delegation_key.signed_expiry.clone());
pairs.insert("sks", delegation_key.signed_service.clone());
pairs.insert("skv", delegation_key.signed_version.clone());

(string_to_sign, pairs)
}

/// <https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#constructing-the-signature-string>
fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> String {
// content length must only be specified if != 0
Expand Down

0 comments on commit 5d68d75

Please sign in to comment.