Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: s3 sns notification #6838

Merged
merged 4 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/services/s3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ s3
The configuration can be persisted, but at the moment we only send notifications to the following targets:

- AWSLambda
- SNS
- SQS

For the following events:
Expand Down
1 change: 1 addition & 0 deletions moto/s3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2202,6 +2202,7 @@ def put_bucket_notification_configuration(
The configuration can be persisted, but at the moment we only send notifications to the following targets:

- AWSLambda
- SNS
- SQS

For the following events:
Expand Down
30 changes: 30 additions & 0 deletions moto/s3/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ def send_event(account_id: str, event_name: str, bucket: Any, key: Any) -> None:

_send_sqs_message(account_id, event_body, queue_name, region_name)

for notification in bucket.notification_configuration.topic:
if notification.matches(event_name, key.name):
event_body = _get_s3_event(event_name, bucket, key, notification.id)
region_name = _get_region_from_arn(notification.arn)
topic_arn = notification.arn

_send_sns_message(account_id, event_body, topic_arn, region_name)


def _send_sqs_message(
account_id: str, event_body: Any, queue_name: str, region_name: str
Expand All @@ -79,6 +87,22 @@ def _send_sqs_message(
pass


def _send_sns_message(
account_id: str, event_body: Any, topic_arn: str, region_name: str
) -> None:
try:
from moto.sns.models import sns_backends

sns_backend = sns_backends[account_id][region_name]
sns_backend.publish(arn=topic_arn, message=json.dumps(event_body))
except: # noqa
# This is an async action in AWS.
# Even if this part fails, the calling function should pass, so catch all errors
# Possible exceptions that could be thrown:
# - Topic does not exist
pass


def _invoke_awslambda(
account_id: str, event_body: Any, fn_arn: str, region_name: str
) -> None:
Expand Down Expand Up @@ -113,3 +137,9 @@ def send_test_event(account_id: str, bucket: Any) -> None:
queue_name = arn.split(":")[-1]
message_body = _get_test_event(bucket.name)
_send_sqs_message(account_id, message_body, queue_name, region_name)

arns = [n.arn for n in bucket.notification_configuration.topic]
for arn in set(arns):
region_name = _get_region_from_arn(arn)
message_body = _get_test_event(bucket.name)
_send_sns_message(account_id, message_body, arn, region_name)
100 changes: 99 additions & 1 deletion tests/test_s3/test_s3_lambda_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import boto3
import pytest

from moto import mock_lambda, mock_logs, mock_s3, mock_sqs
from moto import mock_lambda, mock_logs, mock_s3, mock_sns, mock_sqs
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests.markers import requires_docker
from tests.test_awslambda.utilities import (
Expand Down Expand Up @@ -285,3 +285,101 @@ def test_object_put__sends_to_queue__using_filter():
messages = queue.receive_messages()
assert not messages
_ = [m.delete() for m in messages]


@mock_s3
@mock_sns
@mock_sqs
def test_put_bucket_notification_sns_sqs():
s3_client = boto3.client("s3", region_name=REGION_NAME)
s3_client.create_bucket(Bucket="bucket")

sqs_client = boto3.client("sqs", region_name=REGION_NAME)
sqs_queue = sqs_client.create_queue(QueueName="queue")
sqs_queue_arn = sqs_client.get_queue_attributes(
QueueUrl=sqs_queue["QueueUrl"], AttributeNames=["QueueArn"]
)

sns_client = boto3.client("sns", region_name=REGION_NAME)
sns_topic = sns_client.create_topic(Name="topic")

# Subscribe SQS queue to SNS topic
sns_client.subscribe(
TopicArn=sns_topic["TopicArn"],
Protocol="sqs",
Endpoint=sqs_queue_arn["Attributes"]["QueueArn"],
)

# Set S3 to send ObjectCreated to SNS
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"TopicConfigurations": [
{
"Id": "SomeID",
"TopicArn": sns_topic["TopicArn"],
"Events": ["s3:ObjectCreated:*"],
}
]
},
)

# We should receive a test message
messages = sqs_client.receive_message(
QueueUrl=sqs_queue["QueueUrl"], MaxNumberOfMessages=10
)
assert len(messages["Messages"]) == 1

sqs_client.delete_message(
QueueUrl=sqs_queue["QueueUrl"],
ReceiptHandle=messages["Messages"][0]["ReceiptHandle"],
)

message_body = messages["Messages"][0]["Body"]
sns_message = json.loads(message_body)
assert sns_message["Type"] == "Notification"

# Get S3 notification from SNS message
s3_message_body = json.loads(sns_message["Message"])
assert s3_message_body["Event"] == "s3:TestEvent"

# Upload file to trigger notification
s3_client.put_object(Bucket="bucket", Key="myfile", Body=b"asdf1324")

# Verify queue not empty
messages = sqs_client.receive_message(
QueueUrl=sqs_queue["QueueUrl"], MaxNumberOfMessages=10
)
assert len(messages["Messages"]) == 1

# Get SNS message from SQS
message_body = messages["Messages"][0]["Body"]
sns_message = json.loads(message_body)
assert sns_message["Type"] == "Notification"

# Get S3 notification from SNS message
s3_message_body = json.loads(sns_message["Message"])
assert s3_message_body["Records"][0]["eventName"] == "ObjectCreated:Put"


@mock_s3
def test_put_bucket_notification_sns_error():
s3_client = boto3.client("s3", region_name=REGION_NAME)
s3_client.create_bucket(Bucket="bucket")

# Set S3 to send ObjectCreated to SNS
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"TopicConfigurations": [
{
"Id": "SomeID",
"TopicArn": "arn:aws:sns:us-east-1:012345678910:notexistingtopic",
"Events": ["s3:ObjectCreated:*"],
}
]
},
)

# This should not throw an exception
s3_client.put_object(Bucket="bucket", Key="myfile", Body=b"asdf1324")