Skip to content

Commit

Permalink
Feature: s3 sns notification (getmoto#6838)
Browse files Browse the repository at this point in the history
  • Loading branch information
maederm authored and toshyak committed Oct 26, 2023
1 parent ba19259 commit 45578da
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/docs/services/s3.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ s3
The configuration can be persisted, but at the moment we only send notifications to the following targets:

- AWSLambda
- SNS
- SQS

For the following events:
Expand Down
1 change: 1 addition & 0 deletions moto/s3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2202,6 +2202,7 @@ def put_bucket_notification_configuration(
The configuration can be persisted, but at the moment we only send notifications to the following targets:
- AWSLambda
- SNS
- SQS
For the following events:
Expand Down
30 changes: 30 additions & 0 deletions moto/s3/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ def send_event(account_id: str, event_name: str, bucket: Any, key: Any) -> None:

_send_sqs_message(account_id, event_body, queue_name, region_name)

for notification in bucket.notification_configuration.topic:
if notification.matches(event_name, key.name):
event_body = _get_s3_event(event_name, bucket, key, notification.id)
region_name = _get_region_from_arn(notification.arn)
topic_arn = notification.arn

_send_sns_message(account_id, event_body, topic_arn, region_name)


def _send_sqs_message(
account_id: str, event_body: Any, queue_name: str, region_name: str
Expand All @@ -79,6 +87,22 @@ def _send_sqs_message(
pass


def _send_sns_message(
account_id: str, event_body: Any, topic_arn: str, region_name: str
) -> None:
try:
from moto.sns.models import sns_backends

sns_backend = sns_backends[account_id][region_name]
sns_backend.publish(arn=topic_arn, message=json.dumps(event_body))
except: # noqa
# This is an async action in AWS.
# Even if this part fails, the calling function should pass, so catch all errors
# Possible exceptions that could be thrown:
# - Topic does not exist
pass


def _invoke_awslambda(
account_id: str, event_body: Any, fn_arn: str, region_name: str
) -> None:
Expand Down Expand Up @@ -113,3 +137,9 @@ def send_test_event(account_id: str, bucket: Any) -> None:
queue_name = arn.split(":")[-1]
message_body = _get_test_event(bucket.name)
_send_sqs_message(account_id, message_body, queue_name, region_name)

arns = [n.arn for n in bucket.notification_configuration.topic]
for arn in set(arns):
region_name = _get_region_from_arn(arn)
message_body = _get_test_event(bucket.name)
_send_sns_message(account_id, message_body, arn, region_name)
100 changes: 99 additions & 1 deletion tests/test_s3/test_s3_lambda_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import boto3
import pytest

from moto import mock_lambda, mock_logs, mock_s3, mock_sqs
from moto import mock_lambda, mock_logs, mock_s3, mock_sns, mock_sqs
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests.markers import requires_docker
from tests.test_awslambda.utilities import (
Expand Down Expand Up @@ -285,3 +285,101 @@ def test_object_put__sends_to_queue__using_filter():
messages = queue.receive_messages()
assert not messages
_ = [m.delete() for m in messages]


@mock_s3
@mock_sns
@mock_sqs
def test_put_bucket_notification_sns_sqs():
s3_client = boto3.client("s3", region_name=REGION_NAME)
s3_client.create_bucket(Bucket="bucket")

sqs_client = boto3.client("sqs", region_name=REGION_NAME)
sqs_queue = sqs_client.create_queue(QueueName="queue")
sqs_queue_arn = sqs_client.get_queue_attributes(
QueueUrl=sqs_queue["QueueUrl"], AttributeNames=["QueueArn"]
)

sns_client = boto3.client("sns", region_name=REGION_NAME)
sns_topic = sns_client.create_topic(Name="topic")

# Subscribe SQS queue to SNS topic
sns_client.subscribe(
TopicArn=sns_topic["TopicArn"],
Protocol="sqs",
Endpoint=sqs_queue_arn["Attributes"]["QueueArn"],
)

# Set S3 to send ObjectCreated to SNS
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"TopicConfigurations": [
{
"Id": "SomeID",
"TopicArn": sns_topic["TopicArn"],
"Events": ["s3:ObjectCreated:*"],
}
]
},
)

# We should receive a test message
messages = sqs_client.receive_message(
QueueUrl=sqs_queue["QueueUrl"], MaxNumberOfMessages=10
)
assert len(messages["Messages"]) == 1

sqs_client.delete_message(
QueueUrl=sqs_queue["QueueUrl"],
ReceiptHandle=messages["Messages"][0]["ReceiptHandle"],
)

message_body = messages["Messages"][0]["Body"]
sns_message = json.loads(message_body)
assert sns_message["Type"] == "Notification"

# Get S3 notification from SNS message
s3_message_body = json.loads(sns_message["Message"])
assert s3_message_body["Event"] == "s3:TestEvent"

# Upload file to trigger notification
s3_client.put_object(Bucket="bucket", Key="myfile", Body=b"asdf1324")

# Verify queue not empty
messages = sqs_client.receive_message(
QueueUrl=sqs_queue["QueueUrl"], MaxNumberOfMessages=10
)
assert len(messages["Messages"]) == 1

# Get SNS message from SQS
message_body = messages["Messages"][0]["Body"]
sns_message = json.loads(message_body)
assert sns_message["Type"] == "Notification"

# Get S3 notification from SNS message
s3_message_body = json.loads(sns_message["Message"])
assert s3_message_body["Records"][0]["eventName"] == "ObjectCreated:Put"


@mock_s3
def test_put_bucket_notification_sns_error():
s3_client = boto3.client("s3", region_name=REGION_NAME)
s3_client.create_bucket(Bucket="bucket")

# Set S3 to send ObjectCreated to SNS
s3_client.put_bucket_notification_configuration(
Bucket="bucket",
NotificationConfiguration={
"TopicConfigurations": [
{
"Id": "SomeID",
"TopicArn": "arn:aws:sns:us-east-1:012345678910:notexistingtopic",
"Events": ["s3:ObjectCreated:*"],
}
]
},
)

# This should not throw an exception
s3_client.put_object(Bucket="bucket", Key="myfile", Body=b"asdf1324")

0 comments on commit 45578da

Please sign in to comment.