From 710fee651195a3bf5e4da0c7299a7c7a431efc13 Mon Sep 17 00:00:00 2001 From: constanca Date: Wed, 17 Apr 2024 11:52:34 +0200 Subject: [PATCH] . --- tests/handlers/aws/test_integrations.py | 1713 ----------------------- 1 file changed, 1713 deletions(-) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index 3390a568..7efffb58 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -672,1716 +672,3 @@ def test_continuing(self) -> None: assert res["hits"]["hits"][7]["_source"]["message"] == logstash_message[7]["message"] assert res["hits"]["hits"][7]["_source"]["tags"] == logstash_message[7]["tags"] - def test_continuing_no_timeout_input_from_originalEventSourceARN_message_attribute(self) -> None: - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - fixtures = [ - _load_file_fixture("cloudwatch-log-1.json"), - _load_file_fixture("cloudwatch-log-2.json"), - _load_file_fixture("cloudwatch-log-3.json"), - ] - - sqs_queue_name = _time_based_id(suffix="source-sqs") - - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - sqs_queue_url_path = sqs_queue["QueueUrlPath"] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, fixtures[0]) - _sqs_send_messages(self.sqs_client, sqs_queue_url, fixtures[1]) - _sqs_send_messages(self.sqs_client, sqs_queue_url, fixtures[2]) - - config_yaml: str = f""" - inputs: - - type: sqs - id: "{sqs_queue_arn}" - tags: {self.default_tags} - outputs: - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "{self.logstash.logstash_user}" - password: "{self.logstash.logstash_password}" - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - - first_message_id = events_sqs["Records"][0]["messageId"] - second_message_id = events_sqs["Records"][1]["messageId"] - - ctx = ContextMock() - first_call = handler(events_sqs, ctx) # type:ignore - - assert first_call == "continuing" - - logstash_message = self.logstash.get_messages(expected=1) - assert len(logstash_message) == 1 - - assert logstash_message[0]["message"] == fixtures[0].rstrip("\n") - assert logstash_message[0]["log"]["offset"] == 0 - assert logstash_message[0]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[0]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[0]["aws"]["sqs"]["message_id"] == first_message_id - assert logstash_message[0]["cloud"]["provider"] == "aws" - assert logstash_message[0]["cloud"]["region"] == "us-east-1" - assert logstash_message[0]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[0]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - continued_events, _ = _sqs_get_messages( - self.sqs_client, os.environ["SQS_CONTINUE_URL"], self.sqs_continue_queue_arn - ) - - continued_events["Records"][2]["messageAttributes"]["originalEventSourceARN"][ - "stringValue" - ] += 
"-not-configured-arn" - second_call = handler(continued_events, ctx) # type:ignore - - assert second_call == "continuing" - - logstash_message = self.logstash.get_messages(expected=2) - assert len(logstash_message) == 2 - - assert logstash_message[1]["message"] == fixtures[1].rstrip("\n") - assert logstash_message[1]["log"]["offset"] == 0 - assert logstash_message[1]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[1]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[1]["aws"]["sqs"]["message_id"] == second_message_id - assert logstash_message[1]["cloud"]["provider"] == "aws" - assert logstash_message[1]["cloud"]["region"] == "us-east-1" - assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - continued_events, _ = _sqs_get_messages( - self.sqs_client, os.environ["SQS_CONTINUE_URL"], self.sqs_continue_queue_arn - ) - - third_call = handler(continued_events, ctx) # type:ignore - - assert third_call == "completed" - - logstash_message = self.logstash.get_messages(expected=2) - assert len(logstash_message) == 2 - - def test_replay(self) -> None: - assert isinstance(self.elasticsearch, ElasticsearchContainer) - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - fixtures = [ - _load_file_fixture("cloudwatch-log-1.json"), - _load_file_fixture("cloudwatch-log-2.json"), - ] - - s3_bucket_name = _time_based_id(suffix="test-bucket") - first_filename = "exportedlog/uuid/yyyy-mm-dd-[$LATEST]hash/000000.gz" - _s3_upload_content_to_bucket( - client=self.s3_client, - content=gzip.compress("".join(fixtures).encode("utf-8")), - content_type="application/x-gzip", - bucket_name=s3_bucket_name, - key=first_filename, - ) - - cloudwatch_group_name = _time_based_id(suffix="source-group") - cloudwatch_group = _logs_create_cloudwatch_logs_group(self.logs_client, group_name=cloudwatch_group_name) - - cloudwatch_stream_name = _time_based_id(suffix="source-stream") - _logs_create_cloudwatch_logs_stream( - self.logs_client, group_name=cloudwatch_group_name, stream_name=cloudwatch_stream_name - ) - - _logs_upload_event_to_cloudwatch_logs( - self.logs_client, - group_name=cloudwatch_group_name, - stream_name=cloudwatch_stream_name, - messages_body=["".join(fixtures)], - ) - - cloudwatch_group_arn = cloudwatch_group["arn"] - - cloudwatch_group_name = cloudwatch_group_name - cloudwatch_stream_name = cloudwatch_stream_name - - sqs_queue_name = _time_based_id(suffix="source-sqs") - s3_sqs_queue_name = _time_based_id(suffix="source-s3-sqs") - - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - s3_sqs_queue = _sqs_create_queue(self.sqs_client, s3_sqs_queue_name, self.localstack.get_url()) - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - sqs_queue_url_path = sqs_queue["QueueUrlPath"] - - s3_sqs_queue_arn = s3_sqs_queue["QueueArn"] - s3_sqs_queue_url = s3_sqs_queue["QueueUrl"] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - _sqs_send_s3_notifications(self.sqs_client, s3_sqs_queue_url, s3_bucket_name, [first_filename]) - - kinesis_stream_name = _time_based_id(suffix="source-kinesis") - kinesis_stream = _kinesis_create_stream(self.kinesis_client, kinesis_stream_name) - kinesis_stream_arn = kinesis_stream["StreamDescription"]["StreamARN"] - - 
_kinesis_put_records(self.kinesis_client, kinesis_stream_name, ["".join(fixtures)]) - - # the way to let logstash fail is to give wrong credentials - config_yaml: str = f""" - inputs: - - type: "kinesis-data-stream" - id: "{kinesis_stream_arn}" - tags: {self.default_tags} - outputs: - - type: "elasticsearch" - args: - elasticsearch_url: "{self.elasticsearch.get_url()}" - ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} - username: "{self.secret_arn}:username" - password: "{self.secret_arn}:password" - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "wrong_username" - password: "wrong_username" - - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}" - tags: {self.default_tags} - outputs: - - type: "elasticsearch" - args: - elasticsearch_url: "{self.elasticsearch.get_url()}" - ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} - username: "{self.secret_arn}:username" - password: "{self.secret_arn}:password" - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "wrong_username" - password: "wrong_username" - - type: sqs - id: "{sqs_queue_arn}" - tags: {self.default_tags} - outputs: - - type: "elasticsearch" - args: - elasticsearch_url: "{self.elasticsearch.get_url()}" - ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} - username: "{self.secret_arn}:username" - password: "{self.secret_arn}:password" - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "wrong_username" - password: "wrong_username" - - type: s3-sqs - id: "{s3_sqs_queue_arn}" - tags: {self.default_tags} - outputs: - - type: "elasticsearch" - args: - elasticsearch_url: "{self.elasticsearch.get_url()}" - ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} - username: "{self.secret_arn}:username" - password: "{self.secret_arn}:password" - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "wrong_username" - password: "wrong_username" - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_s3, _ = _sqs_get_messages(self.sqs_client, s3_sqs_queue_url, s3_sqs_queue_arn) - - bucket_arn: str = f"arn:aws:s3:::{s3_bucket_name}" - event_time = int( - datetime.datetime.strptime(_S3_NOTIFICATION_EVENT_TIME, "%Y-%m-%dT%H:%M:%S.%fZ").timestamp() * 1000 - ) - - hash_first = get_hex_prefix(f"{bucket_arn}-{first_filename}") - prefix_s3_first = f"{event_time}-{hash_first}" - - events_sqs, events_sent_timestamps_sqs = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - - message_id = events_sqs["Records"][0]["messageId"] - hash_sqs = get_hex_prefix(f"{sqs_queue_name}-{message_id}") - prefix_sqs: str = f"{events_sent_timestamps_sqs[0]}-{hash_sqs}" - - ( - events_cloudwatch_logs, - event_ids_cloudwatch_logs, - event_timestamps_cloudwatch_logs, - ) = _logs_retrieve_event_from_cloudwatch_logs(self.logs_client, cloudwatch_group_name, cloudwatch_stream_name) - - 
hash_cw_logs = get_hex_prefix( - f"{cloudwatch_group_name}-{cloudwatch_stream_name}-{event_ids_cloudwatch_logs[0]}" - ) - prefix_cloudwatch_logs = f"{event_timestamps_cloudwatch_logs[0]}-{hash_cw_logs}" - - events_kinesis, event_timestamps_kinesis_records = _kinesis_retrieve_event_from_kinesis_stream( - self.kinesis_client, kinesis_stream_name, kinesis_stream_arn - ) - sequence_number = events_kinesis["Records"][0]["kinesis"]["sequenceNumber"] - hash_kinesis_record = get_hex_prefix(f"stream-{kinesis_stream_name}-PartitionKey-{sequence_number}") - prefix_kinesis = f"{int(float(event_timestamps_kinesis_records[0]) * 1000)}-{hash_kinesis_record}" - - # Create an expected id for s3-sqs so that es.send will fail - self.elasticsearch.index( - index="logs-generic-default", - op_type="create", - id=f"{prefix_s3_first}-000000000000", - document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, - ) - - # Create an expected id for sqs so that es.send will fail - self.elasticsearch.index( - index="logs-generic-default", - op_type="create", - id=f"{prefix_sqs}-000000000000", - document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, - ) - - # Create an expected id for cloudwatch-logs so that es.send will fail - self.elasticsearch.index( - index="logs-generic-default", - op_type="create", - id=f"{prefix_cloudwatch_logs}-000000000000", - document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, - ) - - # Create an expected id for kinesis-data-stream so that es.send will fail - self.elasticsearch.index( - index="logs-generic-default", - op_type="create", - id=f"{prefix_kinesis}-000000000000", - document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, - ) - - self.elasticsearch.refresh(index="logs-generic-default") - - res = self.elasticsearch.search(index="logs-generic-default") - assert res["hits"]["total"] == {"value": 4, "relation": "eq"} - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - - first_call = handler(events_s3, ctx) # type:ignore - - assert first_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default") - res = self.elasticsearch.search( - index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, - sort="_seq_no", - ) - - assert res["hits"]["total"] == {"value": 1, "relation": "eq"} - - assert res["hits"]["hits"][0]["_source"]["message"] == fixtures[1].rstrip("\n") - assert res["hits"]["hits"][0]["_source"]["log"]["offset"] == 94 - assert ( - res["hits"]["hits"][0]["_source"]["log"]["file"]["path"] - == f"https://{s3_bucket_name}.s3.eu-central-1.amazonaws.com/{first_filename}" - ) - assert res["hits"]["hits"][0]["_source"]["aws"]["s3"]["bucket"]["name"] == s3_bucket_name - assert res["hits"]["hits"][0]["_source"]["aws"]["s3"]["bucket"]["arn"] == f"arn:aws:s3:::{s3_bucket_name}" - assert res["hits"]["hits"][0]["_source"]["aws"]["s3"]["object"]["key"] == first_filename - assert res["hits"]["hits"][0]["_source"]["cloud"]["provider"] == "aws" - assert res["hits"]["hits"][0]["_source"]["cloud"]["region"] == "eu-central-1" - assert res["hits"]["hits"][0]["_source"]["cloud"]["account"]["id"] == "000000000000" - assert res["hits"]["hits"][0]["_source"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - 
logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - second_call = handler(events_sqs, ctx) # type:ignore - - assert second_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default") - res = self.elasticsearch.search( - index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, - sort="_seq_no", - ) - - assert res["hits"]["total"] == {"value": 2, "relation": "eq"} - - assert res["hits"]["hits"][1]["_source"]["message"] == fixtures[1].rstrip("\n") - assert res["hits"]["hits"][1]["_source"]["log"]["offset"] == 94 - assert res["hits"]["hits"][1]["_source"]["log"]["file"]["path"] == sqs_queue_url_path - assert res["hits"]["hits"][1]["_source"]["aws"]["sqs"]["name"] == sqs_queue_name - assert res["hits"]["hits"][1]["_source"]["aws"]["sqs"]["message_id"] == message_id - assert res["hits"]["hits"][1]["_source"]["cloud"]["provider"] == "aws" - assert res["hits"]["hits"][1]["_source"]["cloud"]["region"] == "us-east-1" - assert res["hits"]["hits"][1]["_source"]["cloud"]["account"]["id"] == "000000000000" - assert res["hits"]["hits"][1]["_source"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - third_call = handler(events_cloudwatch_logs, ctx) # type:ignore - - assert third_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default") - res = self.elasticsearch.search( - index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, - sort="_seq_no", - ) - - assert res["hits"]["total"] == {"value": 3, "relation": "eq"} - - assert res["hits"]["hits"][2]["_source"]["message"] == fixtures[1].rstrip("\n") - assert res["hits"]["hits"][2]["_source"]["log"]["offset"] == 94 - assert ( - res["hits"]["hits"][2]["_source"]["log"]["file"]["path"] - == f"{cloudwatch_group_name}/{cloudwatch_stream_name}" - ) - assert res["hits"]["hits"][2]["_source"]["aws"]["cloudwatch"]["log_group"] == cloudwatch_group_name - assert res["hits"]["hits"][2]["_source"]["aws"]["cloudwatch"]["log_stream"] == cloudwatch_stream_name - assert res["hits"]["hits"][2]["_source"]["aws"]["cloudwatch"]["event_id"] == event_ids_cloudwatch_logs[0] - assert res["hits"]["hits"][2]["_source"]["cloud"]["provider"] == "aws" - assert res["hits"]["hits"][2]["_source"]["cloud"]["region"] == "us-east-1" - assert res["hits"]["hits"][2]["_source"]["cloud"]["account"]["id"] == "000000000000" - assert res["hits"]["hits"][2]["_source"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - fourth_call = handler(events_kinesis, ctx) # type:ignore - - assert fourth_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default") - res = self.elasticsearch.search( - index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, - sort="_seq_no", - ) - - 
assert res["hits"]["total"] == {"value": 4, "relation": "eq"} - - assert res["hits"]["hits"][3]["_source"]["message"] == fixtures[1].rstrip("\n") - assert res["hits"]["hits"][3]["_source"]["log"]["offset"] == 94 - assert res["hits"]["hits"][3]["_source"]["log"]["file"]["path"] == kinesis_stream_arn - assert res["hits"]["hits"][3]["_source"]["aws"]["kinesis"]["type"] == "stream" - assert res["hits"]["hits"][3]["_source"]["aws"]["kinesis"]["partition_key"] == "PartitionKey" - assert res["hits"]["hits"][3]["_source"]["aws"]["kinesis"]["name"] == kinesis_stream_name - assert ( - res["hits"]["hits"][3]["_source"]["aws"]["kinesis"]["sequence_number"] - == events_kinesis["Records"][0]["kinesis"]["sequenceNumber"] - ) - assert res["hits"]["hits"][3]["_source"]["cloud"]["provider"] == "aws" - assert res["hits"]["hits"][3]["_source"]["cloud"]["region"] == "us-east-1" - assert res["hits"]["hits"][3]["_source"]["cloud"]["account"]["id"] == "000000000000" - assert res["hits"]["hits"][3]["_source"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - with self.assertRaises(ReplayHandlerException): - handler(replayed_events, ctx) # type:ignore - - self.elasticsearch.refresh(index="logs-generic-default") - - # Remove the expected id for s3-sqs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_s3_first}-000000000000"]}}} - ) - - # Remove the expected id for sqs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_sqs}-000000000000"]}}} - ) - - # Remove the expected id for cloudwatch logs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", - body={"query": {"ids": {"values": [f"{prefix_cloudwatch_logs}-000000000000"]}}}, - ) - - # Remove the expected id for kinesis data stream so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", - body={"query": {"ids": {"values": [f"{prefix_kinesis}-000000000000"]}}}, - ) - - self.elasticsearch.refresh(index="logs-generic-default") - - # let's update the config file so that logstash won't fail anymore - config_yaml = f""" - inputs: - - type: "kinesis-data-stream" - id: "{kinesis_stream_arn}" - tags: {self.default_tags} - outputs: {self.default_outputs} - - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}" - tags: {self.default_tags} - outputs: {self.default_outputs} - - type: sqs - id: "{sqs_queue_arn}" - tags: {self.default_tags} - outputs: {self.default_outputs} - - type: s3-sqs - id: "{s3_sqs_queue_arn}" - tags: {self.default_tags} - outputs: {self.default_outputs} - """ - - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - create_bucket=False, - ) - - ctx = ContextMock(remaining_time_in_millis=_REMAINING_TIME_FORCE_CONTINUE_0ms) - - # implicit wait for the message to be back on the queue - time.sleep(35) - replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - fifth_call = handler(replayed_events, ctx) # type:ignore - - assert fifth_call == "replayed" - - 
self.elasticsearch.refresh(index="logs-generic-default") - assert self.elasticsearch.count(index="logs-generic-default")["count"] == 5 - - self.elasticsearch.refresh(index="logs-generic-default") - res = self.elasticsearch.search(index="logs-generic-default", sort="_seq_no") - - assert res["hits"]["total"] == {"value": 5, "relation": "eq"} - - assert res["hits"]["hits"][4]["_source"]["message"] == fixtures[0].rstrip("\n") - assert res["hits"]["hits"][4]["_source"]["log"]["offset"] == 0 - assert ( - res["hits"]["hits"][4]["_source"]["log"]["file"]["path"] - == f"https://{s3_bucket_name}.s3.eu-central-1.amazonaws.com/{first_filename}" - ) - assert res["hits"]["hits"][4]["_source"]["aws"]["s3"]["bucket"]["name"] == s3_bucket_name - assert res["hits"]["hits"][4]["_source"]["aws"]["s3"]["bucket"]["arn"] == f"arn:aws:s3:::{s3_bucket_name}" - assert res["hits"]["hits"][4]["_source"]["aws"]["s3"]["object"]["key"] == first_filename - assert res["hits"]["hits"][4]["_source"]["cloud"]["provider"] == "aws" - assert res["hits"]["hits"][4]["_source"]["cloud"]["region"] == "eu-central-1" - assert res["hits"]["hits"][4]["_source"]["cloud"]["account"]["id"] == "000000000000" - assert res["hits"]["hits"][4]["_source"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - # implicit wait for the message to be back on the queue - time.sleep(35) - replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - sixth_call = handler(replayed_events, ctx) # type:ignore - - assert sixth_call == "replayed" - - self.elasticsearch.refresh(index="logs-generic-default") - assert self.elasticsearch.count(index="logs-generic-default")["count"] == 5 - - res = self.elasticsearch.search(index="logs-generic-default", sort="_seq_no") - assert res["hits"]["total"] == {"value": 5, "relation": "eq"} - - logstash_message = self.logstash.get_messages(expected=1) - assert len(logstash_message) == 1 - # positions on res["hits"]["hits"] are skewed compared to logstash_message - # in elasticsearch we inserted the second event of each input before the first one - res["hits"]["hits"][4]["_source"]["tags"].remove("generic") - assert res["hits"]["hits"][4]["_source"]["aws"] == logstash_message[0]["aws"] - assert res["hits"]["hits"][4]["_source"]["cloud"] == logstash_message[0]["cloud"] - assert res["hits"]["hits"][4]["_source"]["log"] == logstash_message[0]["log"] - assert res["hits"]["hits"][4]["_source"]["message"] == logstash_message[0]["message"] - assert res["hits"]["hits"][4]["_source"]["tags"] == logstash_message[0]["tags"] - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - - # implicit wait for the message to be back on the queue - time.sleep(35) - replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - seventh_call = handler(replayed_events, ctx) # type:ignore - - assert seventh_call == "replayed" - - self.elasticsearch.refresh(index="logs-generic-default") - assert self.elasticsearch.count(index="logs-generic-default")["count"] == 8 - - self.elasticsearch.refresh(index="logs-generic-default") - res = self.elasticsearch.search(index="logs-generic-default", sort="_seq_no") - - assert res["hits"]["total"] == {"value": 8, "relation": "eq"} - - assert res["hits"]["hits"][5]["_source"]["message"] == fixtures[0].rstrip("\n") - assert res["hits"]["hits"][5]["_source"]["log"]["offset"] == 0 - 
assert res["hits"]["hits"][5]["_source"]["log"]["file"]["path"] == sqs_queue_url_path - assert res["hits"]["hits"][5]["_source"]["aws"]["sqs"]["name"] == sqs_queue_name - assert res["hits"]["hits"][5]["_source"]["aws"]["sqs"]["message_id"] == message_id - assert res["hits"]["hits"][5]["_source"]["cloud"]["provider"] == "aws" - assert res["hits"]["hits"][5]["_source"]["cloud"]["region"] == "us-east-1" - assert res["hits"]["hits"][5]["_source"]["cloud"]["account"]["id"] == "000000000000" - assert res["hits"]["hits"][5]["_source"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - assert res["hits"]["hits"][6]["_source"]["message"] == fixtures[0].rstrip("\n") - assert res["hits"]["hits"][6]["_source"]["log"]["offset"] == 0 - assert ( - res["hits"]["hits"][6]["_source"]["log"]["file"]["path"] - == f"{cloudwatch_group_name}/{cloudwatch_stream_name}" - ) - assert res["hits"]["hits"][6]["_source"]["aws"]["cloudwatch"]["log_group"] == cloudwatch_group_name - assert res["hits"]["hits"][6]["_source"]["aws"]["cloudwatch"]["log_stream"] == cloudwatch_stream_name - assert res["hits"]["hits"][6]["_source"]["aws"]["cloudwatch"]["event_id"] == event_ids_cloudwatch_logs[0] - assert res["hits"]["hits"][6]["_source"]["cloud"]["provider"] == "aws" - assert res["hits"]["hits"][6]["_source"]["cloud"]["region"] == "us-east-1" - assert res["hits"]["hits"][6]["_source"]["cloud"]["account"]["id"] == "000000000000" - assert res["hits"]["hits"][6]["_source"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - assert res["hits"]["hits"][7]["_source"]["message"] == fixtures[0].rstrip("\n") - assert res["hits"]["hits"][7]["_source"]["log"]["offset"] == 0 - assert res["hits"]["hits"][7]["_source"]["log"]["file"]["path"] == kinesis_stream_arn - assert res["hits"]["hits"][7]["_source"]["aws"]["kinesis"]["type"] == "stream" - assert res["hits"]["hits"][7]["_source"]["aws"]["kinesis"]["partition_key"] == "PartitionKey" - assert res["hits"]["hits"][7]["_source"]["aws"]["kinesis"]["name"] == kinesis_stream_name - assert ( - res["hits"]["hits"][7]["_source"]["aws"]["kinesis"]["sequence_number"] - == events_kinesis["Records"][0]["kinesis"]["sequenceNumber"] - ) - assert res["hits"]["hits"][7]["_source"]["cloud"]["provider"] == "aws" - assert res["hits"]["hits"][7]["_source"]["cloud"]["region"] == "us-east-1" - assert res["hits"]["hits"][7]["_source"]["cloud"]["account"]["id"] == "000000000000" - assert res["hits"]["hits"][7]["_source"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - logstash_message = self.logstash.get_messages(expected=8) - assert len(logstash_message) == 8 - res["hits"]["hits"][0]["_source"]["tags"].remove("generic") - res["hits"]["hits"][1]["_source"]["tags"].remove("generic") - res["hits"]["hits"][2]["_source"]["tags"].remove("generic") - res["hits"]["hits"][3]["_source"]["tags"].remove("generic") - res["hits"]["hits"][5]["_source"]["tags"].remove("generic") - res["hits"]["hits"][6]["_source"]["tags"].remove("generic") - res["hits"]["hits"][7]["_source"]["tags"].remove("generic") - - # positions on res["hits"]["hits"] are skewed compared to logstash_message - # in elasticsearch we inserted the second event of each input before the first one - assert res["hits"]["hits"][0]["_source"]["aws"] == logstash_message[1]["aws"] - assert res["hits"]["hits"][0]["_source"]["cloud"] == logstash_message[1]["cloud"] - assert res["hits"]["hits"][0]["_source"]["log"] == logstash_message[1]["log"] - assert res["hits"]["hits"][0]["_source"]["message"] == logstash_message[1]["message"] - 
assert res["hits"]["hits"][0]["_source"]["tags"] == logstash_message[1]["tags"] - - assert res["hits"]["hits"][5]["_source"]["aws"] == logstash_message[2]["aws"] - assert res["hits"]["hits"][5]["_source"]["cloud"] == logstash_message[2]["cloud"] - assert res["hits"]["hits"][5]["_source"]["log"] == logstash_message[2]["log"] - assert res["hits"]["hits"][5]["_source"]["message"] == logstash_message[2]["message"] - assert res["hits"]["hits"][5]["_source"]["tags"] == logstash_message[2]["tags"] - - assert res["hits"]["hits"][1]["_source"]["aws"] == logstash_message[3]["aws"] - assert res["hits"]["hits"][1]["_source"]["cloud"] == logstash_message[3]["cloud"] - assert res["hits"]["hits"][1]["_source"]["log"] == logstash_message[3]["log"] - assert res["hits"]["hits"][1]["_source"]["message"] == logstash_message[3]["message"] - assert res["hits"]["hits"][1]["_source"]["tags"] == logstash_message[3]["tags"] - - assert res["hits"]["hits"][6]["_source"]["aws"] == logstash_message[4]["aws"] - assert res["hits"]["hits"][6]["_source"]["cloud"] == logstash_message[4]["cloud"] - assert res["hits"]["hits"][6]["_source"]["log"] == logstash_message[4]["log"] - assert res["hits"]["hits"][6]["_source"]["message"] == logstash_message[4]["message"] - assert res["hits"]["hits"][6]["_source"]["tags"] == logstash_message[4]["tags"] - - assert res["hits"]["hits"][2]["_source"]["aws"] == logstash_message[5]["aws"] - assert res["hits"]["hits"][2]["_source"]["cloud"] == logstash_message[5]["cloud"] - assert res["hits"]["hits"][2]["_source"]["log"] == logstash_message[5]["log"] - assert res["hits"]["hits"][2]["_source"]["message"] == logstash_message[5]["message"] - assert res["hits"]["hits"][2]["_source"]["tags"] == logstash_message[5]["tags"] - - assert res["hits"]["hits"][7]["_source"]["aws"] == logstash_message[6]["aws"] - assert res["hits"]["hits"][7]["_source"]["cloud"] == logstash_message[6]["cloud"] - assert res["hits"]["hits"][7]["_source"]["log"] == logstash_message[6]["log"] - assert res["hits"]["hits"][7]["_source"]["message"] == logstash_message[6]["message"] - assert res["hits"]["hits"][7]["_source"]["tags"] == logstash_message[6]["tags"] - - assert res["hits"]["hits"][3]["_source"]["aws"] == logstash_message[7]["aws"] - assert res["hits"]["hits"][3]["_source"]["cloud"] == logstash_message[7]["cloud"] - assert res["hits"]["hits"][3]["_source"]["log"] == logstash_message[7]["log"] - assert res["hits"]["hits"][3]["_source"]["message"] == logstash_message[7]["message"] - assert res["hits"]["hits"][3]["_source"]["tags"] == logstash_message[7]["tags"] - - def test_empty(self) -> None: - assert isinstance(self.elasticsearch, ElasticsearchContainer) - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - fixtures = [" \n"] # once stripped it is an empty event - - s3_bucket_name = _time_based_id(suffix="test-bucket") - first_filename = "exportedlog/uuid/yyyy-mm-dd-[$LATEST]hash/000000.gz" - _s3_upload_content_to_bucket( - client=self.s3_client, - content=gzip.compress("".join(fixtures).encode("utf-8")), - content_type="application/x-gzip", - bucket_name=s3_bucket_name, - key=first_filename, - ) - - cloudwatch_group_name = _time_based_id(suffix="source-group") - cloudwatch_group = _logs_create_cloudwatch_logs_group(self.logs_client, group_name=cloudwatch_group_name) - - cloudwatch_stream_name = _time_based_id(suffix="source-stream") - _logs_create_cloudwatch_logs_stream( - self.logs_client, group_name=cloudwatch_group_name, 
stream_name=cloudwatch_stream_name - ) - - _logs_upload_event_to_cloudwatch_logs( - self.logs_client, - group_name=cloudwatch_group_name, - stream_name=cloudwatch_stream_name, - messages_body=["".join(fixtures)], - ) - - cloudwatch_group_arn = cloudwatch_group["arn"] - - cloudwatch_group_name = cloudwatch_group_name - cloudwatch_stream_name = cloudwatch_stream_name - - sqs_queue_name = _time_based_id(suffix="source-sqs") - s3_sqs_queue_name = _time_based_id(suffix="source-s3-sqs") - - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - s3_sqs_queue = _sqs_create_queue(self.sqs_client, s3_sqs_queue_name, self.localstack.get_url()) - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - - s3_sqs_queue_arn = s3_sqs_queue["QueueArn"] - s3_sqs_queue_url = s3_sqs_queue["QueueUrl"] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - _sqs_send_s3_notifications(self.sqs_client, s3_sqs_queue_url, s3_bucket_name, [first_filename]) - - kinesis_stream_name = _time_based_id(suffix="source-kinesis") - kinesis_stream = _kinesis_create_stream(self.kinesis_client, kinesis_stream_name) - kinesis_stream_arn = kinesis_stream["StreamDescription"]["StreamARN"] - - _kinesis_put_records(self.kinesis_client, kinesis_stream_name, ["".join(fixtures)]) - - config_yaml: str = f""" - inputs: - - type: "kinesis-data-stream" - id: "{kinesis_stream_arn}" - outputs: {self.default_outputs} - - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}" - outputs: {self.default_outputs} - - type: sqs - id: "{sqs_queue_arn}" - outputs: {self.default_outputs} - - type: s3-sqs - id: "{s3_sqs_queue_arn}" - outputs: {self.default_outputs} - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_s3, _ = _sqs_get_messages(self.sqs_client, s3_sqs_queue_url, s3_sqs_queue_arn) - - events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - - events_cloudwatch_logs, _, _ = _logs_retrieve_event_from_cloudwatch_logs( - self.logs_client, cloudwatch_group_name, cloudwatch_stream_name - ) - - events_kinesis, _ = _kinesis_retrieve_event_from_kinesis_stream( - self.kinesis_client, kinesis_stream_name, kinesis_stream_arn - ) - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - first_call = handler(events_s3, ctx) # type:ignore - - assert first_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default", ignore_unavailable=True) - assert self.elasticsearch.count(index="logs-generic-default", ignore_unavailable=True)["count"] == 0 - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - second_call = handler(events_sqs, ctx) # type:ignore - - assert second_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default", ignore_unavailable=True) - assert self.elasticsearch.count(index="logs-generic-default", ignore_unavailable=True)["count"] == 0 - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - third_call = handler(events_cloudwatch_logs, ctx) # type:ignore - - assert third_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default", 
ignore_unavailable=True) - assert self.elasticsearch.count(index="logs-generic-default", ignore_unavailable=True)["count"] == 0 - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - fourth_call = handler(events_kinesis, ctx) # type:ignore - - assert fourth_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default", ignore_unavailable=True) - assert self.elasticsearch.count(index="logs-generic-default", ignore_unavailable=True)["count"] == 0 - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - def test_filtered(self) -> None: - assert isinstance(self.elasticsearch, ElasticsearchContainer) - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - fixtures = ["excluded"] - - s3_bucket_name = _time_based_id(suffix="test-bucket") - first_filename = "exportedlog/uuid/yyyy-mm-dd-[$LATEST]hash/000000.gz" - _s3_upload_content_to_bucket( - client=self.s3_client, - content=gzip.compress("".join(fixtures).encode("utf-8")), - content_type="application/x-gzip", - bucket_name=s3_bucket_name, - key=first_filename, - ) - - cloudwatch_group_name = _time_based_id(suffix="source-group") - cloudwatch_group = _logs_create_cloudwatch_logs_group(self.logs_client, group_name=cloudwatch_group_name) - - cloudwatch_stream_name = _time_based_id(suffix="source-stream") - _logs_create_cloudwatch_logs_stream( - self.logs_client, group_name=cloudwatch_group_name, stream_name=cloudwatch_stream_name - ) - - _logs_upload_event_to_cloudwatch_logs( - self.logs_client, - group_name=cloudwatch_group_name, - stream_name=cloudwatch_stream_name, - messages_body=["".join(fixtures)], - ) - - cloudwatch_group_arn = cloudwatch_group["arn"] - - cloudwatch_group_name = cloudwatch_group_name - cloudwatch_stream_name = cloudwatch_stream_name - - sqs_queue_name = _time_based_id(suffix="source-sqs") - s3_sqs_queue_name = _time_based_id(suffix="source-s3-sqs") - - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - s3_sqs_queue = _sqs_create_queue(self.sqs_client, s3_sqs_queue_name, self.localstack.get_url()) - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - - s3_sqs_queue_arn = s3_sqs_queue["QueueArn"] - s3_sqs_queue_url = s3_sqs_queue["QueueUrl"] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - _sqs_send_s3_notifications(self.sqs_client, s3_sqs_queue_url, s3_bucket_name, [first_filename]) - - kinesis_stream_name = _time_based_id(suffix="source-kinesis") - kinesis_stream = _kinesis_create_stream(self.kinesis_client, kinesis_stream_name) - kinesis_stream_arn = kinesis_stream["StreamDescription"]["StreamARN"] - - _kinesis_put_records(self.kinesis_client, kinesis_stream_name, ["".join(fixtures)]) - - config_yaml: str = f""" - inputs: - - type: "kinesis-data-stream" - id: "{kinesis_stream_arn}" - exclude: - - "excluded" - outputs: {self.default_outputs} - - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}" - exclude: - - "excluded" - outputs: {self.default_outputs} - - type: sqs - id: "{sqs_queue_arn}" - exclude: - - "excluded" - outputs: {self.default_outputs} - - type: s3-sqs - id: "{s3_sqs_queue_arn}" - exclude: - - "excluded" - outputs: {self.default_outputs} - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - 
content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_s3, _ = _sqs_get_messages(self.sqs_client, s3_sqs_queue_url, s3_sqs_queue_arn) - - events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - - events_cloudwatch_logs, _, _ = _logs_retrieve_event_from_cloudwatch_logs( - self.logs_client, cloudwatch_group_name, cloudwatch_stream_name - ) - - events_kinesis, _ = _kinesis_retrieve_event_from_kinesis_stream( - self.kinesis_client, kinesis_stream_name, kinesis_stream_arn - ) - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - first_call = handler(events_s3, ctx) # type:ignore - - assert first_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default", ignore_unavailable=True) - assert self.elasticsearch.count(index="logs-generic-default", ignore_unavailable=True)["count"] == 0 - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - second_call = handler(events_sqs, ctx) # type:ignore - - assert second_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default", ignore_unavailable=True) - assert self.elasticsearch.count(index="logs-generic-default", ignore_unavailable=True)["count"] == 0 - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - third_call = handler(events_cloudwatch_logs, ctx) # type:ignore - - assert third_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default", ignore_unavailable=True) - assert self.elasticsearch.count(index="logs-generic-default", ignore_unavailable=True)["count"] == 0 - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - fourth_call = handler(events_kinesis, ctx) # type:ignore - - assert fourth_call == "completed" - - self.elasticsearch.refresh(index="logs-generic-default", ignore_unavailable=True) - assert self.elasticsearch.count(index="logs-generic-default", ignore_unavailable=True)["count"] == 0 - - logstash_message = self.logstash.get_messages(expected=0) - assert len(logstash_message) == 0 - - def test_expand_event_from_list_empty_line(self) -> None: - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - first_expanded_event: str = _load_file_fixture("cloudwatch-log-1.json") - second_expanded_event: str = _load_file_fixture("cloudwatch-log-2.json") - third_expanded_event: str = _load_file_fixture("cloudwatch-log-3.json") - - fixtures = [ - f"""{{"aField": [{first_expanded_event},{second_expanded_event}]}}\n""" - f"""\n{{"aField": [{third_expanded_event}]}}""" - ] - - sqs_queue_name = _time_based_id(suffix="source-sqs") - - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - sqs_queue_url_path = sqs_queue["QueueUrlPath"] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - - config_yaml: str = f""" - inputs: - - type: "sqs" - id: "{sqs_queue_arn}" - expand_event_list_from_field: aField - tags: {self.default_tags} - outputs: - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "{self.logstash.logstash_user}" - password: "{self.logstash.logstash_password}" - """ - - 
config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - - message_id = events_sqs["Records"][0]["messageId"] - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - - first_call = handler(events_sqs, ctx) # type:ignore - - assert first_call == "completed" - - logstash_message = self.logstash.get_messages(expected=3) - assert len(logstash_message) == 3 - - assert logstash_message[0]["message"] == json_dumper(json_parser(first_expanded_event)) - assert logstash_message[0]["log"]["offset"] == 0 - assert logstash_message[0]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[0]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[0]["aws"]["sqs"]["message_id"] == message_id - assert logstash_message[0]["cloud"]["provider"] == "aws" - assert logstash_message[0]["cloud"]["region"] == "us-east-1" - assert logstash_message[0]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[0]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - assert logstash_message[1]["message"] == json_dumper(json_parser(second_expanded_event)) - assert logstash_message[1]["log"]["offset"] == 174 - assert logstash_message[1]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[1]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[1]["aws"]["sqs"]["message_id"] == message_id - assert logstash_message[1]["cloud"]["provider"] == "aws" - assert logstash_message[1]["cloud"]["region"] == "us-east-1" - assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - assert logstash_message[2]["message"] == json_dumper(json_parser(third_expanded_event)) - assert logstash_message[2]["log"]["offset"] == 349 - assert logstash_message[2]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[2]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[2]["aws"]["sqs"]["message_id"] == message_id - assert logstash_message[2]["cloud"]["provider"] == "aws" - assert logstash_message[2]["cloud"]["region"] == "us-east-1" - assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - def test_expand_event_from_list_empty_event_not_expanded(self) -> None: - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - first_expanded_event: str = _load_file_fixture("cloudwatch-log-1.json") - second_expanded_event: str = _load_file_fixture("cloudwatch-log-2.json") - - fixtures = [f"""{{"aField": [{first_expanded_event},"",{second_expanded_event}]}}"""] - - sqs_queue_name = _time_based_id(suffix="source-sqs") - - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - sqs_queue_url_path = sqs_queue["QueueUrlPath"] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - - config_yaml: str = f""" - inputs: - - type: "sqs" - id: "{sqs_queue_arn}" - 
expand_event_list_from_field: aField - tags: {self.default_tags} - outputs: - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "{self.logstash.logstash_user}" - password: "{self.logstash.logstash_password}" - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - - message_id = events_sqs["Records"][0]["messageId"] - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - - first_call = handler(events_sqs, ctx) # type:ignore - - assert first_call == "completed" - - logstash_message = self.logstash.get_messages(expected=2) - assert len(logstash_message) == 2 - - assert logstash_message[0]["message"] == json_dumper(json_parser(first_expanded_event)) - assert logstash_message[0]["log"]["offset"] == 0 - assert logstash_message[0]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[0]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[0]["aws"]["sqs"]["message_id"] == message_id - assert logstash_message[0]["cloud"]["provider"] == "aws" - assert logstash_message[0]["cloud"]["region"] == "us-east-1" - assert logstash_message[0]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[0]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - assert logstash_message[1]["message"] == json_dumper(json_parser(second_expanded_event)) - assert logstash_message[1]["log"]["offset"] == 233 - assert logstash_message[1]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[1]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[1]["aws"]["sqs"]["message_id"] == message_id - assert logstash_message[1]["cloud"]["provider"] == "aws" - assert logstash_message[1]["cloud"]["region"] == "us-east-1" - assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - def test_root_fields_to_add_to_expanded_event_no_dict_event(self) -> None: - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - first_expanded_event: str = '"first_expanded_event"' - second_expanded_event: str = '"second_expanded_event"' - third_expanded_event: str = '"third_expanded_event"' - - fixtures = [ - f"""{{"firstRootField": "firstRootField", "secondRootField":"secondRootField", - "aField": [{first_expanded_event},{second_expanded_event},{third_expanded_event}]}}""" - ] - - sqs_queue_name = _time_based_id(suffix="source-sqs") - - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - sqs_queue_url_path = sqs_queue["QueueUrlPath"] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - - config_yaml: str = f""" - inputs: - - type: "sqs" - id: "{sqs_queue_arn}" - expand_event_list_from_field: aField - root_fields_to_add_to_expanded_event: ["secondRootField"] - tags: {self.default_tags} - outputs: - - type: "logstash" - args: - logstash_url: "{self.logstash.get_url()}" - 
ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint} - username: "{self.logstash.logstash_user}" - password: "{self.logstash.logstash_password}" - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - - message_id = events_sqs["Records"][0]["messageId"] - - ctx = ContextMock() - first_call = handler(events_sqs, ctx) # type:ignore - - assert first_call == "continuing" - - logstash_message = self.logstash.get_messages(expected=1) - assert len(logstash_message) == 1 - - assert logstash_message[0]["message"] == first_expanded_event - assert logstash_message[0]["log"]["offset"] == 0 - assert logstash_message[0]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[0]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[0]["aws"]["sqs"]["message_id"] == message_id - assert logstash_message[0]["cloud"]["provider"] == "aws" - assert logstash_message[0]["cloud"]["region"] == "us-east-1" - assert logstash_message[0]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[0]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - - continued_events, _ = _sqs_get_messages( - self.sqs_client, os.environ["SQS_CONTINUE_URL"], self.sqs_continue_queue_arn - ) - second_call = handler(continued_events, ctx) # type:ignore - - assert second_call == "completed" - - logstash_message = self.logstash.get_messages(expected=3) - assert len(logstash_message) == 3 - - assert logstash_message[1]["message"] == second_expanded_event - assert logstash_message[1]["log"]["offset"] == 56 - assert logstash_message[1]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[1]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[1]["aws"]["sqs"]["message_id"] == message_id - assert logstash_message[1]["cloud"]["provider"] == "aws" - assert logstash_message[1]["cloud"]["region"] == "us-east-1" - assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - assert logstash_message[2]["message"] == third_expanded_event - assert logstash_message[2]["log"]["offset"] == 112 - assert logstash_message[2]["log"]["file"]["path"] == sqs_queue_url_path - assert logstash_message[2]["aws"]["sqs"]["name"] == sqs_queue_name - assert logstash_message[2]["aws"]["sqs"]["message_id"] == message_id - assert logstash_message[2]["cloud"]["provider"] == "aws" - assert logstash_message[2]["cloud"]["region"] == "us-east-1" - assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000" - assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] - - def test_root_fields_to_add_to_expanded_event_event_not_expanded(self) -> None: - assert isinstance(self.logstash, LogstashContainer) - assert isinstance(self.localstack, LocalStackContainer) - - first_expanded_event: str = _load_file_fixture("cloudwatch-log-1.json") - first_expanded_with_root_fields: dict[str, Any] = json_parser(first_expanded_event) - first_expanded_with_root_fields["secondRootField"] = "secondRootField" - - 
-        second_expanded_event: str = _load_file_fixture("cloudwatch-log-3.json")
-        second_expanded_with_root_fields: dict[str, Any] = json_parser(second_expanded_event)
-        second_expanded_with_root_fields["secondRootField"] = "secondRootField"
-
-        fixtures = [
-            f"""{{"firstRootField": "firstRootField", "secondRootField":"secondRootField",
-            "aField": [{first_expanded_event},{{}},{second_expanded_event}]}}"""
-        ]
-
-        sqs_queue_name = _time_based_id(suffix="source-sqs")
-
-        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
-
-        sqs_queue_arn = sqs_queue["QueueArn"]
-        sqs_queue_url = sqs_queue["QueueUrl"]
-        sqs_queue_url_path = sqs_queue["QueueUrlPath"]
-
-        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
-
-        config_yaml: str = f"""
-            inputs:
-              - type: "sqs"
-                id: "{sqs_queue_arn}"
-                expand_event_list_from_field: aField
-                root_fields_to_add_to_expanded_event: ["secondRootField"]
-                tags: {self.default_tags}
-                outputs:
-                  - type: "logstash"
-                    args:
-                      logstash_url: "{self.logstash.get_url()}"
-                      ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint}
-                      username: "{self.logstash.logstash_user}"
-                      password: "{self.logstash.logstash_password}"
-        """
-
-        config_file_path = "config.yaml"
-        config_bucket_name = _time_based_id(suffix="config-bucket")
-        _s3_upload_content_to_bucket(
-            client=self.s3_client,
-            content=config_yaml.encode("utf-8"),
-            content_type="text/plain",
-            bucket_name=config_bucket_name,
-            key=config_file_path,
-        )
-
-        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
-
-        events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
-
-        message_id = events_sqs["Records"][0]["messageId"]
-
-        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
-
-        first_call = handler(events_sqs, ctx)  # type:ignore
-
-        assert first_call == "completed"
-
-        logstash_message = self.logstash.get_messages(expected=2)
-        assert len(logstash_message) == 2
-
-        assert logstash_message[0]["message"] == json_dumper(first_expanded_with_root_fields)
-        assert logstash_message[0]["log"]["offset"] == 0
-        assert logstash_message[0]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[0]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[0]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[0]["cloud"]["provider"] == "aws"
-        assert logstash_message[0]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[0]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[0]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-        assert logstash_message[1]["message"] == json_dumper(second_expanded_with_root_fields)
-        assert logstash_message[1]["log"]["offset"] == 180
-        assert logstash_message[1]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[1]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[1]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[1]["cloud"]["provider"] == "aws"
-        assert logstash_message[1]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-    def test_root_fields_to_add_to_expanded_event_list(self) -> None:
-        assert isinstance(self.logstash, LogstashContainer)
-        assert isinstance(self.localstack, LocalStackContainer)
-
-        first_expanded_event: str = _load_file_fixture("cloudwatch-log-1.json")
-        first_expanded_with_root_fields: dict[str, Any] = json_parser(first_expanded_event)
-        first_expanded_with_root_fields["secondRootField"] = "secondRootField"
-
-        second_expanded_event: str = _load_file_fixture("cloudwatch-log-3.json")
-        second_expanded_with_root_fields: dict[str, Any] = json_parser(second_expanded_event)
-        second_expanded_with_root_fields["secondRootField"] = "secondRootField"
-
-        third_expanded_event: str = _load_file_fixture("cloudwatch-log-3.json")
-        third_expanded_event_with_root_fields: dict[str, Any] = json_parser(third_expanded_event)
-        third_expanded_event_with_root_fields["secondRootField"] = "secondRootField"
-
-        fixtures = [
-            f"""{{"firstRootField": "firstRootField", "secondRootField":"secondRootField",
-            "aField": [{first_expanded_event},{second_expanded_event},{third_expanded_event}]}}"""
-        ]
-
-        sqs_queue_name = _time_based_id(suffix="source-sqs")
-
-        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
-
-        sqs_queue_arn = sqs_queue["QueueArn"]
-        sqs_queue_url = sqs_queue["QueueUrl"]
-        sqs_queue_url_path = sqs_queue["QueueUrlPath"]
-
-        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
-
-        config_yaml: str = f"""
-            inputs:
-              - type: "sqs"
-                id: "{sqs_queue_arn}"
-                expand_event_list_from_field: aField
-                root_fields_to_add_to_expanded_event: ["secondRootField"]
-                tags: {self.default_tags}
-                outputs:
-                  - type: "logstash"
-                    args:
-                      logstash_url: "{self.logstash.get_url()}"
-                      ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint}
-                      username: "{self.logstash.logstash_user}"
-                      password: "{self.logstash.logstash_password}"
-        """
-
-        config_file_path = "config.yaml"
-        config_bucket_name = _time_based_id(suffix="config-bucket")
-        _s3_upload_content_to_bucket(
-            client=self.s3_client,
-            content=config_yaml.encode("utf-8"),
-            content_type="text/plain",
-            bucket_name=config_bucket_name,
-            key=config_file_path,
-        )
-
-        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
-
-        events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
-
-        message_id = events_sqs["Records"][0]["messageId"]
-
-        ctx = ContextMock()
-        first_call = handler(events_sqs, ctx)  # type:ignore
-
-        assert first_call == "continuing"
-
-        logstash_message = self.logstash.get_messages(expected=1)
-        assert len(logstash_message) == 1
-
-        assert logstash_message[0]["message"] == json_dumper(first_expanded_with_root_fields)
-        assert logstash_message[0]["log"]["offset"] == 0
-        assert logstash_message[0]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[0]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[0]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[0]["cloud"]["provider"] == "aws"
-        assert logstash_message[0]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[0]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[0]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
-
-        continued_events, _ = _sqs_get_messages(
-            self.sqs_client, os.environ["SQS_CONTINUE_URL"], self.sqs_continue_queue_arn
-        )
-        second_call = handler(continued_events, ctx)  # type:ignore
-
-        assert second_call == "completed"
-
-        logstash_message = self.logstash.get_messages(expected=3)
-        assert len(logstash_message) == 3
-
-        assert logstash_message[1]["message"] == json_dumper(second_expanded_with_root_fields)
-        assert logstash_message[1]["log"]["offset"] == 114
-        assert logstash_message[1]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[1]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[1]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[1]["cloud"]["provider"] == "aws"
-        assert logstash_message[1]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-        assert logstash_message[2]["message"] == json_dumper(third_expanded_event_with_root_fields)
-        assert logstash_message[2]["log"]["offset"] == 228
-        assert logstash_message[2]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[2]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[2]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[2]["cloud"]["provider"] == "aws"
-        assert logstash_message[2]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-    def test_root_fields_to_add_to_expanded_event_list_no_fields_in_root(self) -> None:
-        assert isinstance(self.logstash, LogstashContainer)
-        assert isinstance(self.localstack, LocalStackContainer)
-
-        first_expanded_event: str = _load_file_fixture("cloudwatch-log-1.json")
-        first_expanded_with_root_fields: dict[str, Any] = json_parser(first_expanded_event)
-        first_expanded_with_root_fields["secondRootField"] = "secondRootField"
-
-        second_expanded_event: str = _load_file_fixture("cloudwatch-log-3.json")
-        second_expanded_with_root_fields: dict[str, Any] = json_parser(second_expanded_event)
-        second_expanded_with_root_fields["secondRootField"] = "secondRootField"
-
-        third_expanded_event: str = _load_file_fixture("cloudwatch-log-3.json")
-        third_expanded_event_with_root_fields: dict[str, Any] = json_parser(third_expanded_event)
-        third_expanded_event_with_root_fields["secondRootField"] = "secondRootField"
-
-        fixtures = [
-            f"""{{"firstRootField": "firstRootField", "secondRootField":"secondRootField",
-            "aField": [{first_expanded_event},{second_expanded_event},{third_expanded_event}]}}"""
-        ]
-
-        sqs_queue_name = _time_based_id(suffix="source-sqs")
-
-        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
-
-        sqs_queue_arn = sqs_queue["QueueArn"]
-        sqs_queue_url = sqs_queue["QueueUrl"]
-        sqs_queue_url_path = sqs_queue["QueueUrlPath"]
-
-        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
-
-        config_yaml: str = f"""
-            inputs:
-              - type: "sqs"
-                id: "{sqs_queue_arn}"
-                expand_event_list_from_field: aField
-                root_fields_to_add_to_expanded_event: ["secondRootField", "thirdRootField"]
-                tags: {self.default_tags}
-                outputs:
-                  - type: "logstash"
-                    args:
-                      logstash_url: "{self.logstash.get_url()}"
-                      ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint}
-                      username: "{self.logstash.logstash_user}"
-                      password: "{self.logstash.logstash_password}"
-        """
-
-        config_file_path = "config.yaml"
-        config_bucket_name = _time_based_id(suffix="config-bucket")
-        _s3_upload_content_to_bucket(
-            client=self.s3_client,
-            content=config_yaml.encode("utf-8"),
-            content_type="text/plain",
-            bucket_name=config_bucket_name,
-            key=config_file_path,
-        )
-
-        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
-
-        events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
-
-        message_id = events_sqs["Records"][0]["messageId"]
-
-        ctx = ContextMock()
-        first_call = handler(events_sqs, ctx)  # type:ignore
-
-        assert first_call == "continuing"
-
-        logstash_message = self.logstash.get_messages(expected=1)
-        assert len(logstash_message) == 1
-
-        assert logstash_message[0]["message"] == json_dumper(first_expanded_with_root_fields)
-        assert logstash_message[0]["log"]["offset"] == 0
-        assert logstash_message[0]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[0]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[0]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[0]["cloud"]["provider"] == "aws"
-        assert logstash_message[0]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[0]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[0]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
-
-        continued_events, _ = _sqs_get_messages(
-            self.sqs_client, os.environ["SQS_CONTINUE_URL"], self.sqs_continue_queue_arn
-        )
-        second_call = handler(continued_events, ctx)  # type:ignore
-
-        assert second_call == "completed"
-
-        logstash_message = self.logstash.get_messages(expected=3)
-        assert len(logstash_message) == 3
-
-        assert logstash_message[1]["message"] == json_dumper(second_expanded_with_root_fields)
-        assert logstash_message[1]["log"]["offset"] == 114
-        assert logstash_message[1]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[1]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[1]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[1]["cloud"]["provider"] == "aws"
-        assert logstash_message[1]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-        assert logstash_message[2]["message"] == json_dumper(third_expanded_event_with_root_fields)
-        assert logstash_message[2]["log"]["offset"] == 228
-        assert logstash_message[2]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[2]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[2]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[2]["cloud"]["provider"] == "aws"
-        assert logstash_message[2]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-    def test_root_fields_to_add_to_expanded_event_all(self) -> None:
-        assert isinstance(self.logstash, LogstashContainer)
-        assert isinstance(self.localstack, LocalStackContainer)
-
-        first_expanded_event: str = _load_file_fixture("cloudwatch-log-1.json")
-        first_expanded_with_root_fields: dict[str, Any] = json_parser(first_expanded_event)
-        first_expanded_with_root_fields["firstRootField"] = "firstRootField"
-        first_expanded_with_root_fields["secondRootField"] = "secondRootField"
-
-        second_expanded_event: str = _load_file_fixture("cloudwatch-log-3.json")
-        second_expanded_with_root_fields: dict[str, Any] = json_parser(second_expanded_event)
-        second_expanded_with_root_fields["firstRootField"] = "firstRootField"
-        second_expanded_with_root_fields["secondRootField"] = "secondRootField"
-
-        third_expanded_event: str = _load_file_fixture("cloudwatch-log-3.json")
-        third_expanded_event_with_root_fields: dict[str, Any] = json_parser(third_expanded_event)
-        third_expanded_event_with_root_fields["firstRootField"] = "firstRootField"
-        third_expanded_event_with_root_fields["secondRootField"] = "secondRootField"
-
-        fixtures = [
-            f"""{{"firstRootField": "firstRootField", "secondRootField":"secondRootField",
-            "aField": [{first_expanded_event},{second_expanded_event},{third_expanded_event}]}}"""
-        ]
-
-        sqs_queue_name = _time_based_id(suffix="source-sqs")
-
-        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
-
-        sqs_queue_arn = sqs_queue["QueueArn"]
-        sqs_queue_url = sqs_queue["QueueUrl"]
-        sqs_queue_url_path = sqs_queue["QueueUrlPath"]
-
-        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
-
-        config_yaml: str = f"""
-            inputs:
-              - type: "sqs"
-                id: "{sqs_queue_arn}"
-                expand_event_list_from_field: aField
-                root_fields_to_add_to_expanded_event: all
-                tags: {self.default_tags}
-                outputs:
-                  - type: "logstash"
-                    args:
-                      logstash_url: "{self.logstash.get_url()}"
-                      ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint}
-                      username: "{self.logstash.logstash_user}"
-                      password: "{self.logstash.logstash_password}"
-        """
-
-        config_file_path = "config.yaml"
-        config_bucket_name = _time_based_id(suffix="config-bucket")
-        _s3_upload_content_to_bucket(
-            client=self.s3_client,
-            content=config_yaml.encode("utf-8"),
-            content_type="text/plain",
-            bucket_name=config_bucket_name,
-            key=config_file_path,
-        )
-
-        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
-
-        events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
-
-        message_id = events_sqs["Records"][0]["messageId"]
-
-        ctx = ContextMock()
-        first_call = handler(events_sqs, ctx)  # type:ignore
-
-        assert first_call == "continuing"
-
-        logstash_message = self.logstash.get_messages(expected=1)
-        assert len(logstash_message) == 1
-
-        assert logstash_message[0]["message"] == json_dumper(first_expanded_with_root_fields)
-        assert logstash_message[0]["log"]["offset"] == 0
-        assert logstash_message[0]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[0]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[0]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[0]["cloud"]["provider"] == "aws"
-        assert logstash_message[0]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[0]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[0]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
-
-        continued_events, _ = _sqs_get_messages(
-            self.sqs_client, os.environ["SQS_CONTINUE_URL"], self.sqs_continue_queue_arn
-        )
-        second_call = handler(continued_events, ctx)  # type:ignore
-
-        assert second_call == "completed"
-
-        logstash_message = self.logstash.get_messages(expected=3)
-        assert len(logstash_message) == 3
-
-        assert logstash_message[1]["message"] == json_dumper(second_expanded_with_root_fields)
-        assert logstash_message[1]["log"]["offset"] == 114
-        assert logstash_message[1]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[1]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[1]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[1]["cloud"]["provider"] == "aws"
-        assert logstash_message[1]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-
-        assert logstash_message[2]["message"] == json_dumper(third_expanded_event_with_root_fields)
-        assert logstash_message[2]["log"]["offset"] == 228
-        assert logstash_message[2]["log"]["file"]["path"] == sqs_queue_url_path
-        assert logstash_message[2]["aws"]["sqs"]["name"] == sqs_queue_name
-        assert logstash_message[2]["aws"]["sqs"]["message_id"] == message_id
-        assert logstash_message[2]["cloud"]["provider"] == "aws"
-        assert logstash_message[2]["cloud"]["region"] == "us-east-1"
-        assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000"
-        assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
-