Skip to content

Commit

Permalink
Add lambda event summary for aws:sqs events
Browse files Browse the repository at this point in the history
  • Loading branch information
zmoog committed Nov 29, 2024
1 parent 276560a commit 0757e7b
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 3 deletions.
76 changes: 75 additions & 1 deletion handlers/aws/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,87 @@ def wrapper(lambda_event: dict[str, Any], lambda_context: context_.Context) -> s
if apm_client:
apm_client.capture_exception()

shared_logger.exception("exception raised", exc_info=e)
shared_logger.exception(
"exception raised",
exc_info=e,
extra={
"summary": summarize_lambda_event(lambda_event),
},
)

return f"exception raised: {e.__repr__()}"

return wrapper


def summarize_lambda_event(event: dict[str, Any], max_records: int = 10) -> dict[str, Any]:
    """
    Summarize the lambda event to include only the most relevant information.

    Only `aws:sqs` records are currently summarized: each SQS message body is
    expected to be an S3 notification event encoded as a JSON string, and the
    summary collects the `s3` section (bucket and object key) of up to
    `max_records` notification records.

    Args:
        event: the raw lambda trigger event.
        max_records: cap on the number of notification records included in
            the summary, to avoid large log payloads. The total count is
            always reported so users know if only a subset was included.

    Returns:
        A summary dict keyed by event source (e.g. "aws:sqs"). Empty if the
        event has no supported records or if summarization fails; this
        function is best-effort and never raises.
    """
    summary: dict[str, Any] = {}

    try:
        for record in event.get("Records", []):
            # Guard clause: only aws:sqs records are supported for now.
            if record.get("eventSource") != "aws:sqs":
                continue

            aws_sqs_summary = summary.get(
                "aws:sqs",
                # if the key does not exist, we initialize the summary
                {
                    "total_records": 0,
                    "records": [],
                },
            )

            # The body contains the SQS message payload, which is the S3
            # notification event encoded as a JSON string. Use a distinct
            # name so we don't clobber the `event` parameter.
            notification = json_parser(record["body"])

            # So users know if we included only a subset of the records.
            aws_sqs_summary["total_records"] += len(notification["Records"])

            # Here is an example of a notification record:
            #
            # {
            #     "Records": [
            #         {
            #             "awsRegion": "eu-west-1",
            #             "eventName": "ObjectCreated:Put",
            #             "eventSource": "aws:s3",
            #             "eventVersion": "2.1",
            #             "s3": {
            #                 "bucket": {
            #                     "arn": "arn:aws:s3:::mbranca-esf-data",
            #                     "name": "mbranca-esf-data"
            #                 },
            #                 "object": {
            #                     "key": "AWSLogs/1234567890/CloudTrail-Digest/"
            #                 }
            #             }
            #         }
            #     ]
            # }
            for s3_record in notification["Records"]:
                # We only include the s3 object info in the summary, and
                # limit the number of records to `max_records` to avoid
                # large log payloads.
                if len(aws_sqs_summary["records"]) < max_records:
                    aws_sqs_summary["records"].append(s3_record.get("s3"))

            # Only committed after the body parsed successfully, so an
            # invalid message leaves the summary unchanged.
            summary["aws:sqs"] = aws_sqs_summary

    except Exception as e:
        # Best-effort: summarization must never break error reporting.
        shared_logger.exception("error summarizing lambda event", exc_info=e)

    return summary


def discover_integration_scope(s3_object_key: str) -> str:
if s3_object_key == "":
shared_logger.debug("s3 object key is empty, dataset set to `generic`")
Expand Down
96 changes: 94 additions & 2 deletions tests/handlers/aws/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ def test_get_trigger_type_and_config_source(self) -> None:
assert get_trigger_type_and_config_source(event=event) == ("cloudwatch-logs", CONFIG_FROM_S3FILE)

with self.subTest("no Records"):
with self.assertRaisesRegexp(Exception, "Not supported trigger"):
with self.assertRaisesRegex(Exception, "Not supported trigger"):
event = {}

get_trigger_type_and_config_source(event=event)

with self.subTest("len(Records) < 1"):
with self.assertRaisesRegexp(Exception, "Not supported trigger"):
with self.assertRaisesRegex(Exception, "Not supported trigger"):
event = {"Records": []}

get_trigger_type_and_config_source(event=event)
Expand Down Expand Up @@ -460,3 +460,95 @@ def test_without_variables(self) -> None:

with pytest.raises(ValueError):
get_lambda_region()


@pytest.mark.unit
class TestSummarizeLambdaEvent(TestCase):
    """Unit tests for `summarize_lambda_event`."""

    def test_with_single_s3_sqs_record(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        lambda_event = {
            "Records": [
                {
                    "body": '{"Records":[{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/627286350134/CloudTrail-Digest/"}}}]}',
                    "eventSource": "aws:sqs",
                }
            ]
        }

        expected_bucket = {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"}

        self.assertEqual(
            summarize_lambda_event(event=lambda_event),
            {
                "aws:sqs": {
                    "total_records": 1,
                    "records": [
                        {
                            "bucket": expected_bucket,
                            "object": {"key": "AWSLogs/627286350134/CloudTrail-Digest/"},
                        }
                    ],
                }
            },
        )

    def test_with_multiple_s3_sqs_records(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        lambda_event = {
            "Records": [
                {
                    "body": '{"Records":[{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/123456789012/1.log"}}},{"awsRegion":"eu-west-1","eventName":"ObjectCreated:Put","eventSource":"aws:s3","eventVersion":"2.1","s3":{"bucket":{"arn":"arn:aws:s3:::mbranca-esf-data","name":"mbranca-esf-data"},"object":{"key":"AWSLogs/123456789012/2.log"}}}]}',
                    "eventSource": "aws:sqs",
                }
            ]
        }

        expected_bucket = {"arn": "arn:aws:s3:::mbranca-esf-data", "name": "mbranca-esf-data"}
        first_record = {"bucket": expected_bucket, "object": {"key": "AWSLogs/123456789012/1.log"}}
        second_record = {"bucket": expected_bucket, "object": {"key": "AWSLogs/123456789012/2.log"}}

        with self.subTest("no limits"):
            self.assertEqual(
                summarize_lambda_event(event=lambda_event),
                {
                    "aws:sqs": {
                        "total_records": 2,
                        "records": [first_record, second_record],
                    }
                },
            )

        with self.subTest("with limits"):
            # `total_records` still reports the full count even when the
            # included records are capped at `max_records`.
            self.assertEqual(
                summarize_lambda_event(event=lambda_event, max_records=1),
                {
                    "aws:sqs": {
                        "total_records": 2,
                        "records": [first_record],
                    }
                },
            )

    def test_with_invalid_s3_sqs_notification(self) -> None:
        from handlers.aws.utils import summarize_lambda_event

        lambda_event = {
            "Records": [
                {
                    "body": "I am not a valid JSON string.",
                    "eventSource": "aws:sqs",
                }
            ]
        }

        # Summarization is best-effort: a malformed body yields an empty
        # summary instead of raising.
        self.assertEqual(summarize_lambda_event(lambda_event), {})

0 comments on commit 0757e7b

Please sign in to comment.