Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB ConditionalPut #5247

Merged
merged 5 commits into from Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/object_store.yml
Expand Up @@ -113,6 +113,7 @@ jobs:
AWS_ENDPOINT: http://localhost:4566
AWS_ALLOW_HTTP: true
AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
AWS_CONDITIONAL_PUT: dynamo:test-table:2000
HTTP_URL: "http://localhost:8080"
GOOGLE_BUCKET: test-bucket
GOOGLE_SERVICE_ACCOUNT: "/tmp/gcs.json"
Expand All @@ -137,7 +138,7 @@ jobs:
docker run -d -p 4566:4566 localstack/localstack:3.0.1
docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=key,KeyType=HASH --attribute-definitions AttributeName=key,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

- name: Configure Azurite (Azure emulation)
# the magical connection string is from
Expand Down
152 changes: 120 additions & 32 deletions object_store/src/aws/dynamo.rs
Expand Up @@ -17,7 +17,9 @@

//! A DynamoDB based lock system

use std::borrow::Cow;
use std::collections::HashMap;
use std::future::Future;
use std::time::{Duration, Instant};

use chrono::Utc;
Expand Down Expand Up @@ -61,16 +63,24 @@ const STORE: &str = "DynamoDB";
///
/// The DynamoDB schema is as follows:
///
/// * A string hash key named `"key"`
/// * A string partition key named `"path"`
/// * A string sort key named `"etag"`
Comment on lines +66 to +67
Copy link
Contributor Author

@tustvold tustvold Dec 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change, but given #4918 was only merged this morning and hasn't been released I think that is fine.

Before I had planned to encode both the path and the etag in the partition key, as was documented here, but this requires a delimiter, which after #5020 can't be #. This new approach avoids this, whilst also being I think easier to understand.

/// * A numeric [TTL] attribute named `"ttl"`
/// * A numeric attribute named `"generation"`
/// * A numeric attribute named `"timeout"`
///
/// To perform a conditional operation on an object with a given `path` and `etag` (if exists),
/// An appropriate DynamoDB table can be created with the CLI as follows:
///
/// ```bash
/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> --time-to-live-specification Enabled=true,AttributeName=ttl
/// ```
///
/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DynamoDB doesn't let you use an empty string for a key attribute, so we use *, which is an illegal ETAG value (as If-Match wildcards would then be ambiguous)

/// the commit protocol is as follows:
///
/// 1. Perform HEAD request on `path` and error on precondition mismatch
/// 2. Create record in DynamoDB with key `{path}#{etag}` with the configured timeout
/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout
/// 1. On Success: Perform operation with the configured timeout
/// 2. On Conflict:
/// 1. Periodically re-perform HEAD request on `path` and error on precondition mismatch
Expand Down Expand Up @@ -154,6 +164,16 @@ impl DynamoCommit {
self
}

/// Parse [`DynamoCommit`] from a string
pub(crate) fn from_str(value: &str) -> Option<Self> {
Some(match value.split_once(':') {
Some((table_name, timeout)) => {
DynamoCommit::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
}
None => DynamoCommit::new(value.trim().to_string()),
})
}

/// Returns the name of the DynamoDB table.
pub(crate) fn table_name(&self) -> &str {
&self.table_name
Expand All @@ -165,23 +185,40 @@ impl DynamoCommit {
from: &Path,
to: &Path,
) -> Result<()> {
check_not_exists(client, to).await?;
self.conditional_op(client, to, None, || async {
client.copy_request(from, to).send().await?;
Ok(())
})
.await
}

pub(crate) async fn conditional_op<F, Fut, T>(
&self,
client: &S3Client,
to: &Path,
etag: Option<&str>,
op: F,
) -> Result<T>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<T, Error>>,
{
check_precondition(client, to, etag).await?;

let mut previous_lease = None;

loop {
let existing = previous_lease.as_ref();
match self.try_lock(client, to.as_ref(), existing).await? {
match self.try_lock(client, to.as_ref(), etag, existing).await? {
TryLockResult::Ok(lease) => {
let fut = client.copy_request(from, to).send();
let expiry = lease.acquire + lease.timeout;
return match tokio::time::timeout_at(expiry.into(), fut).await {
Ok(Ok(_)) => Ok(()),
return match tokio::time::timeout_at(expiry.into(), op()).await {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => Err(e.into()),
Err(_) => Err(Error::Generic {
store: "DynamoDB",
source: format!(
"Failed to perform copy operation in {} milliseconds",
"Failed to perform conditional operation in {} milliseconds",
self.timeout
)
.into(),
Expand All @@ -193,7 +230,7 @@ impl DynamoCommit {
let expiry = conflict.timeout * self.max_clock_skew_rate;
loop {
interval.tick().await;
check_not_exists(client, to).await?;
check_precondition(client, to, etag).await?;
if conflict.acquire.elapsed() > expiry {
previous_lease = Some(conflict);
break;
Expand All @@ -205,8 +242,11 @@ impl DynamoCommit {
}

/// Retrieve a lock, returning an error if it doesn't exist
async fn get_lock(&self, s3: &S3Client, key: &str) -> Result<Lease> {
let key_attributes = [("key", AttributeValue::String(key))];
async fn get_lock(&self, s3: &S3Client, path: &str, etag: Option<&str>) -> Result<Lease> {
let key_attributes = [
("path", AttributeValue::from(path)),
("etag", AttributeValue::from(etag.unwrap_or("*"))),
];
let req = GetItem {
table_name: &self.table_name,
key: Map(&key_attributes),
Expand All @@ -216,7 +256,7 @@ impl DynamoCommit {
let resp = self
.request(s3, credential.as_deref(), "DynamoDB_20120810.GetItem", req)
.await
.map_err(|e| e.error(STORE, key.to_string()))?;
.map_err(|e| e.error(STORE, path.to_string()))?;

let body = resp.bytes().await.map_err(|e| Error::Generic {
store: STORE,
Expand All @@ -230,7 +270,7 @@ impl DynamoCommit {
})?;

extract_lease(&response.item).ok_or_else(|| Error::NotFound {
path: key.into(),
path: path.into(),
source: "DynamoDB GetItem returned no items".to_string().into(),
})
}
Expand All @@ -239,7 +279,8 @@ impl DynamoCommit {
async fn try_lock(
&self,
s3: &S3Client,
key: &str,
path: &str,
etag: Option<&str>,
existing: Option<&Lease>,
) -> Result<TryLockResult> {
let attributes;
Expand All @@ -257,12 +298,13 @@ impl DynamoCommit {

let ttl = (Utc::now() + self.ttl).timestamp();
let items = [
("key", AttributeValue::String(key)),
("path", AttributeValue::from(path)),
("etag", AttributeValue::from(etag.unwrap_or("*"))),
("generation", AttributeValue::Number(next_gen)),
("timeout", AttributeValue::Number(self.timeout)),
("ttl", AttributeValue::Number(ttl as _)),
];
let names = [("#pk", "key")];
let names = [("#pk", "path")];

let req = PutItem {
table_name: &self.table_name,
Expand Down Expand Up @@ -302,7 +344,9 @@ impl DynamoCommit {
// <https://aws.amazon.com/about-aws/whats-new/2023/06/amazon-dynamodb-cost-failed-conditional-writes/>
// <https://repost.aws/questions/QUNfADrK4RT6WHe61RzTK8aw/dynamodblocal-support-for-returnvaluesonconditioncheckfailure-for-single-write-operations>
// <https://github.com/localstack/localstack/issues/9040>
None => Ok(TryLockResult::Conflict(self.get_lock(s3, key).await?)),
None => Ok(TryLockResult::Conflict(
self.get_lock(s3, path, etag).await?,
)),
},
_ => Err(Error::Generic {
store: STORE,
Expand Down Expand Up @@ -347,19 +391,37 @@ enum TryLockResult {
Conflict(Lease),
}

/// Returns an [`Error::AlreadyExists`] if `path` exists
async fn check_not_exists(client: &S3Client, path: &Path) -> Result<()> {
/// Validates that `path` has the given `etag` or doesn't exist if `None`
async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> {
let options = GetOptions {
head: true,
..Default::default()
};
match client.get_opts(path, options).await {
Ok(_) => Err(Error::AlreadyExists {
path: path.to_string(),
source: "Already Exists".to_string().into(),
}),
Err(Error::NotFound { .. }) => Ok(()),
Err(e) => Err(e),

match etag {
Some(expected) => match client.get_opts(path, options).await {
Ok(r) => match r.meta.e_tag {
Some(actual) if expected == actual => Ok(()),
actual => Err(Error::Precondition {
path: path.to_string(),
source: format!("{} does not match {expected}", actual.unwrap_or_default())
.into(),
}),
},
Err(Error::NotFound { .. }) => Err(Error::Precondition {
path: path.to_string(),
source: format!("Object at location {path} not found").into(),
}),
Err(e) => Err(e),
},
None => match client.get_opts(path, options).await {
Ok(_) => Err(Error::AlreadyExists {
path: path.to_string(),
source: "Already Exists".to_string().into(),
}),
Err(Error::NotFound { .. }) => Ok(()),
Err(e) => Err(e),
},
}
}

Expand Down Expand Up @@ -493,11 +555,17 @@ impl<'a, K: Serialize, V: Serialize> Serialize for Map<'a, K, V> {
#[derive(Debug, Serialize, Deserialize)]
enum AttributeValue<'a> {
#[serde(rename = "S")]
String(&'a str),
String(Cow<'a, str>),
#[serde(rename = "N", with = "number")]
Number(u64),
}

impl<'a> From<&'a str> for AttributeValue<'a> {
fn from(value: &'a str) -> Self {
Self::String(Cow::Borrowed(value))
}
}

/// Numbers are serialized as strings
mod number {
use serde::{Deserialize, Deserializer, Serializer};
Expand All @@ -518,10 +586,11 @@ pub(crate) use tests::integration_test;

#[cfg(test)]
mod tests {

use super::*;
use crate::aws::AmazonS3;
use crate::ObjectStore;
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};

#[test]
fn test_attribute_serde() {
Expand All @@ -544,24 +613,43 @@ mod tests {
let _ = integration.delete(&dst).await; // Delete if present

// Create a lock if not already exists
let existing = match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
TryLockResult::Conflict(l) => l,
TryLockResult::Ok(l) => l,
};

// Should not be able to acquire a lock again
let r = d.try_lock(client, dst.as_ref(), None).await;
let r = d.try_lock(client, dst.as_ref(), None, None).await;
assert!(matches!(r, Ok(TryLockResult::Conflict(_))));

// But should still be able to reclaim lock and perform copy
d.copy_if_not_exists(client, &src, &dst).await.unwrap();

match d.try_lock(client, dst.as_ref(), None).await.unwrap() {
match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
TryLockResult::Conflict(new) => {
// Should have incremented generation to do so
assert_eq!(new.generation, existing.generation + 1);
}
_ => panic!("Should conflict"),
}

let rng = thread_rng();
let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
let t = Some(etag.as_str());

let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
TryLockResult::Ok(l) => l,
_ => panic!("should not conflict"),
};

match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation),
_ => panic!("should conflict"),
}

match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1),
_ => panic!("should not conflict"),
}
}
}
18 changes: 16 additions & 2 deletions object_store/src/aws/mod.rs
Expand Up @@ -187,12 +187,26 @@ impl ObjectStore for AmazonS3 {
r => r,
}
}
(PutMode::Update(v), Some(S3ConditionalPut::ETagMatch)) => {
(PutMode::Create, Some(S3ConditionalPut::Dynamo(d))) => {
d.conditional_op(&self.client, location, None, move || request.do_put())
.await
}
(PutMode::Update(v), Some(put)) => {
let etag = v.e_tag.ok_or_else(|| Error::Generic {
store: STORE,
source: "ETag required for conditional put".to_string().into(),
})?;
request.header(&IF_MATCH, etag.as_str()).do_put().await
match put {
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
}
S3ConditionalPut::Dynamo(d) => {
d.conditional_op(&self.client, location, Some(&etag), move || {
request.do_put()
})
.await
}
}
}
}
}
Expand Down