Skip to content

Commit

Permalink
S3: Adding notification for eventbridge (#7252)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsugumi-sys committed Feb 16, 2024
1 parent 59248f3 commit b9d7c20
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 1 deletion.
7 changes: 6 additions & 1 deletion moto/s3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,7 @@ def __init__(
topic: Optional[List[Dict[str, Any]]] = None,
queue: Optional[List[Dict[str, Any]]] = None,
cloud_function: Optional[List[Dict[str, Any]]] = None,
event_bridge: Optional[Dict[str, Any]] = None,
):
self.topic = (
[
Expand Down Expand Up @@ -923,6 +924,7 @@ def __init__(
if cloud_function
else []
)
self.event_bridge = event_bridge

def to_config_dict(self) -> Dict[str, Any]:
data: Dict[str, Any] = {"configurations": {}}
Expand All @@ -945,6 +947,8 @@ def to_config_dict(self) -> Dict[str, Any]:
cf_config["type"] = "LambdaConfiguration"
data["configurations"][cloud_function.id] = cf_config

if self.event_bridge is not None:
data["configurations"]["EventBridgeConfiguration"] = self.event_bridge
return data


Expand Down Expand Up @@ -1325,6 +1329,7 @@ def set_notification_configuration(
topic=notification_config.get("TopicConfiguration"),
queue=notification_config.get("QueueConfiguration"),
cloud_function=notification_config.get("CloudFunctionConfiguration"),
event_bridge=notification_config.get("EventBridgeConfiguration"),
)

# Validate that the region is correct:
Expand Down Expand Up @@ -2315,9 +2320,9 @@ def put_bucket_notification_configuration(
- AWSLambda
- SNS
- SQS
- EventBridge
For the following events:
- 's3:ObjectCreated:Copy'
- 's3:ObjectCreated:Put'
"""
Expand Down
98 changes: 98 additions & 0 deletions moto/s3/notifications.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import copy
import json
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List

from moto.core.utils import unix_time

_EVENT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"


Expand Down Expand Up @@ -122,6 +125,9 @@ def send_event(

_send_sns_message(account_id, event_body, topic_arn, region_name)

if bucket.notification_configuration.event_bridge is not None:
_send_event_bridge_message(account_id, bucket, event_name, key)


def _send_sqs_message(
account_id: str, event_body: Any, queue_name: str, region_name: str
Expand Down Expand Up @@ -157,6 +163,98 @@ def _send_sns_message(
pass


def _send_event_bridge_message(
account_id: str,
bucket: Any,
event_name: str,
key: Any,
) -> None:
try:
from moto.events.models import events_backends
from moto.events.utils import _BASE_EVENT_MESSAGE

event = copy.deepcopy(_BASE_EVENT_MESSAGE)
event["detail-type"] = _detail_type(event_name)
event["source"] = "aws.s3"
event["account"] = account_id
event["time"] = unix_time()
event["region"] = bucket.region_name
event["resources"] = [f"arn:aws:s3:::{bucket.name}"]
event["detail"] = {
"version": "0",
"bucket": {"name": bucket.name},
"object": {
"key": key.name,
"size": key.size,
"eTag": key.etag.replace('"', ""),
"version-id": key.version_id,
"sequencer": "617f08299329d189",
},
"request-id": "N4N7GDK58NMKJ12R",
"requester": "123456789012",
"source-ip-address": "1.2.3.4",
# ex) s3:ObjectCreated:Put -> ObjectCreated
"reason": event_name.split(":")[1],
}

events_backend = events_backends[account_id][bucket.region_name]
for event_bus in events_backend.event_buses.values():
for rule in event_bus.rules.values():
rule.send_to_targets(event)

except: # noqa
# This is an async action in AWS.
# Even if this part fails, the calling function should pass, so catch all errors
# Possible exceptions that could be thrown:
# - EventBridge does not exist
pass


def _detail_type(event_name: str) -> str:
"""Detail type field values for event messages of s3 EventBridge notification
document: https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html
"""
if event_name in [e for e in S3NotificationEvent.events() if "ObjectCreated" in e]:
return "Object Created"
elif event_name in [
e
for e in S3NotificationEvent.events()
if "ObjectRemoved" in e or "LifecycleExpiration" in e
]:
return "Object Deleted"
elif event_name in [
e for e in S3NotificationEvent.events() if "ObjectRestore" in e
]:
if event_name == S3NotificationEvent.OBJECT_RESTORE_POST_EVENT:
return "Object Restore Initiated"
elif event_name == S3NotificationEvent.OBJECT_RESTORE_COMPLETED_EVENT:
return "Object Restore Completed"
else:
# s3:ObjectRestore:Delete event
return "Object Restore Expired"
elif event_name in [
e for e in S3NotificationEvent.events() if "LifecycleTransition" in e
]:
return "Object Storage Class Changed"
elif event_name in [
e for e in S3NotificationEvent.events() if "IntelligentTiering" in e
]:
return "Object Access Tier Changed"
elif event_name in [e for e in S3NotificationEvent.events() if "ObjectAcl" in e]:
return "Object ACL Updated"
elif event_name in [e for e in S3NotificationEvent.events() if "ObjectTagging"]:
if event_name == S3NotificationEvent.OBJECT_TAGGING_PUT_EVENT:
return "Object Tags Added"
else:
# s3:ObjectTagging:Delete event
return "Object Tags Deleted"
else:
raise ValueError(
f"unsupported event `{event_name}` for s3 eventbridge notification (https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html)"
)


def _invoke_awslambda(
account_id: str, event_body: Any, fn_arn: str, region_name: str
) -> None:
Expand Down
7 changes: 7 additions & 0 deletions moto/s3/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -2092,12 +2092,19 @@ def _notification_config_from_body(self) -> Dict[str, Any]:
("Topic", "sns"),
("Queue", "sqs"),
("CloudFunction", "lambda"),
("EventBridge", "events"),
]

found_notifications = (
0 # Tripwire -- if this is not ever set, then there were no notifications
)
for name, arn_string in notification_fields:
# EventBridgeConfiguration is passed as an empty dict.
if name == "EventBridge":
events_field = f"{name}Configuration"
if events_field in parsed_xml["NotificationConfiguration"]:
parsed_xml["NotificationConfiguration"][events_field] = {}
found_notifications += 1
# 1st verify that the proper notification configuration has been passed in (with an ARN that is close
# to being correct -- nothing too complex in the ARN logic):
the_notification = parsed_xml["NotificationConfiguration"].get(
Expand Down
2 changes: 2 additions & 0 deletions tests/test_s3/test_s3_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def test_s3_notification_config_dict():
},
}
],
"EventBridgeConfiguration": {},
}

s3_config_query.backends[DEFAULT_ACCOUNT_ID][
Expand Down Expand Up @@ -389,6 +390,7 @@ def test_s3_notification_config_dict():
"queueARN": "arn:aws:lambda:us-west-2:012345678910:function:mylambda",
"type": "LambdaConfiguration",
},
"EventBridgeConfiguration": {},
}
}

Expand Down
59 changes: 59 additions & 0 deletions tests/test_s3/test_s3_eventbridge_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import json
from uuid import uuid4

import boto3

from moto import mock_aws
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID

REGION_NAME = "us-east-1"


@mock_aws
def test_pub_object_notification():
s3_res = boto3.resource("s3", region_name=REGION_NAME)
s3_client = boto3.client("s3", region_name=REGION_NAME)
events_client = boto3.client("events", region_name=REGION_NAME)
logs_client = boto3.client("logs", region_name=REGION_NAME)

rule_name = "test-rule"
events_client.put_rule(
Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})
)
log_group_name = "/test-group"
logs_client.create_log_group(logGroupName=log_group_name)
events_client.put_targets(
Rule=rule_name,
Targets=[
{
"Id": "test",
"Arn": f"arn:aws:logs:{REGION_NAME}:{ACCOUNT_ID}:log-group:{log_group_name}",
}
],
)

# Create S3 bucket
bucket_name = str(uuid4())
s3_res.create_bucket(Bucket=bucket_name)

# Put Notification
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={"EventBridgeConfiguration": {}},
)

# Put Object
s3_client.put_object(Bucket=bucket_name, Key="keyname", Body="bodyofnewobject")

events = sorted(
logs_client.filter_log_events(logGroupName=log_group_name)["events"],
key=lambda item: item["eventId"],
)
assert len(events) == 1
event_message = json.loads(events[0]["message"])
assert event_message["detail-type"] == "Object Created"
assert event_message["source"] == "aws.s3"
assert event_message["account"] == ACCOUNT_ID
assert event_message["region"] == REGION_NAME
assert event_message["detail"]["bucket"]["name"] == bucket_name
assert event_message["detail"]["reason"] == "ObjectCreated"
135 changes: 135 additions & 0 deletions tests/test_s3/test_s3_notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import json
from typing import List
from unittest import SkipTest
from uuid import uuid4

import boto3
import pytest

from moto import mock_aws, settings
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from moto.s3.models import FakeBucket, FakeKey
from moto.s3.notifications import (
S3NotificationEvent,
_detail_type,
_send_event_bridge_message,
)

REGION_NAME = "us-east-1"


@pytest.mark.parametrize(
"event_names, expected_event_message",
[
(
[
S3NotificationEvent.OBJECT_CREATED_PUT_EVENT,
S3NotificationEvent.OBJECT_CREATED_POST_EVENT,
S3NotificationEvent.OBJECT_CREATED_COPY_EVENT,
S3NotificationEvent.OBJECT_CREATED_COMPLETE_MULTIPART_UPLOAD_EVENT,
],
"Object Created",
),
(
[
S3NotificationEvent.OBJECT_REMOVED_DELETE_EVENT,
S3NotificationEvent.OBJECT_REMOVED_DELETE_MARKER_CREATED_EVENT,
],
"Object Deleted",
),
([S3NotificationEvent.OBJECT_RESTORE_POST_EVENT], "Object Restore Initiated"),
(
[S3NotificationEvent.OBJECT_RESTORE_COMPLETED_EVENT],
"Object Restore Completed",
),
(
[S3NotificationEvent.OBJECT_RESTORE_DELETE_EVENT],
"Object Restore Expired",
),
(
[S3NotificationEvent.LIFECYCLE_TRANSITION_EVENT],
"Object Storage Class Changed",
),
([S3NotificationEvent.INTELLIGENT_TIERING_EVENT], "Object Access Tier Changed"),
([S3NotificationEvent.OBJECT_ACL_EVENT], "Object ACL Updated"),
([S3NotificationEvent.OBJECT_TAGGING_PUT_EVENT], "Object Tags Added"),
([S3NotificationEvent.OBJECT_TAGGING_DELETE_EVENT], "Object Tags Deleted"),
],
)
def test_detail_type(event_names: List[str], expected_event_message: str):
for event_name in event_names:
assert _detail_type(event_name) == expected_event_message


def test_detail_type_unknown_event():
with pytest.raises(ValueError) as ex:
_detail_type("unknown event")
assert (
str(ex.value)
== "unsupported event `unknown event` for s3 eventbridge notification (https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html)"
)


@mock_aws
def test_send_event_bridge_message():
# setup mocks
events_client = boto3.client("events", region_name=REGION_NAME)
logs_client = boto3.client("logs", region_name=REGION_NAME)
rule_name = "test-rule"
events_client.put_rule(
Name=rule_name, EventPattern=json.dumps({"account": [ACCOUNT_ID]})
)
log_group_name = "/test-group"
logs_client.create_log_group(logGroupName=log_group_name)
mocked_bucket = FakeBucket(str(uuid4()), ACCOUNT_ID, REGION_NAME)
mocked_key = FakeKey(
"test-key", bytes("test content", encoding="utf-8"), ACCOUNT_ID
)

# do nothing if event target does not exists.
_send_event_bridge_message(
ACCOUNT_ID,
mocked_bucket,
S3NotificationEvent.OBJECT_CREATED_PUT_EVENT,
mocked_key,
)
assert (
len(logs_client.filter_log_events(logGroupName=log_group_name)["events"]) == 0
)

# do nothing even if an error is raised while sending events.
events_client.put_targets(
Rule=rule_name,
Targets=[
{
"Id": "test",
"Arn": f"arn:aws:logs:{REGION_NAME}:{ACCOUNT_ID}:log-group:{log_group_name}",
}
],
)

_send_event_bridge_message(ACCOUNT_ID, mocked_bucket, "unknown-event", mocked_key)
assert (
len(logs_client.filter_log_events(logGroupName=log_group_name)["events"]) == 0
)

if not settings.TEST_DECORATOR_MODE:
raise SkipTest(("Doesn't quite work right with the Proxy or Server"))
# an event is correctly sent to the log group.
_send_event_bridge_message(
ACCOUNT_ID,
mocked_bucket,
S3NotificationEvent.OBJECT_CREATED_PUT_EVENT,
mocked_key,
)
events = logs_client.filter_log_events(logGroupName=log_group_name)["events"]
assert len(events) == 1
event_msg = json.loads(events[0]["message"])
assert event_msg["detail-type"] == "Object Created"
assert event_msg["source"] == "aws.s3"
assert event_msg["region"] == REGION_NAME
assert event_msg["resources"] == [f"arn:aws:s3:::{mocked_bucket.name}"]
event_detail = event_msg["detail"]
assert event_detail["bucket"] == {"name": mocked_bucket.name}
assert event_detail["object"]["key"] == mocked_key.name
assert event_detail["reason"] == "ObjectCreated"

0 comments on commit b9d7c20

Please sign in to comment.