From 45e6c3586edd0b0513b79f355a26bc18cb00ed9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Constan=C3=A7a=20Manteigas?= <113898685+constanca-m@users.noreply.github.com> Date: Tue, 14 May 2024 14:12:42 +0200 Subject: [PATCH] Place messages from misconfigured IDs in the replay queue (#711) * Fix misconfigured id Signed-off-by: constanca * Fix lint errors Signed-off-by: constanca * Fix lint errors Signed-off-by: constanca * Fix coverage tests Signed-off-by: constanca * Update CHANGELOG.md Signed-off-by: constanca * Fix mismatched region for queues in test_handler Signed-off-by: constanca * Add architecture note Signed-off-by: constanca * Add architecture note Signed-off-by: constanca --------- Signed-off-by: constanca --- CHANGELOG.md | 4 + handlers/aws/cloudwatch_logs_trigger.py | 47 +++--- handlers/aws/handler.py | 133 ++++++++++++---- handlers/aws/kinesis_trigger.py | 58 ++++--- handlers/aws/s3_sqs_trigger.py | 59 ++++--- handlers/aws/sqs_trigger.py | 57 ++++--- handlers/aws/utils.py | 5 +- how-to-test-locally/.env | 11 ++ how-to-test-locally/README.md | 69 ++++++++ how-to-test-locally/Taskfile.yaml | 49 ++++++ share/version.py | 2 +- tests/handlers/aws/test_handler.py | 16 +- tests/handlers/aws/test_integrations.py | 200 ++++++++++++++++++++++++ 13 files changed, 582 insertions(+), 128 deletions(-) create mode 100644 how-to-test-locally/.env create mode 100644 how-to-test-locally/README.md create mode 100644 how-to-test-locally/Taskfile.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f3ca873..02c81b06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +### v1.14.0 - 2024/05/07 +##### Bug fixes +* Report misconfigured input ids as an error instead of warning, and place those messages in the replaying queue [#711](https://github.com/elastic/elastic-serverless-forwarder/pull/711). 
+ ### v1.13.1 - 2024/03/07 ##### Features * Add documentation and optimise performance for `root_fields_to_add_to_expanded_event` [#642](https://github.com/elastic/elastic-serverless-forwarder/pull/642) diff --git a/handlers/aws/cloudwatch_logs_trigger.py b/handlers/aws/cloudwatch_logs_trigger.py index b90eb0b1..129978cb 100644 --- a/handlers/aws/cloudwatch_logs_trigger.py +++ b/handlers/aws/cloudwatch_logs_trigger.py @@ -22,15 +22,16 @@ def _from_awslogs_data_to_event(awslogs_data: str) -> Any: return json_parser(cloudwatch_logs_payload_plain) -def _handle_cloudwatch_logs_continuation( +def _handle_cloudwatch_logs_move( sqs_client: BotoBaseClient, - sqs_continuing_queue: str, - last_ending_offset: Optional[int], - last_event_expanded_offset: Optional[int], + sqs_destination_queue: str, cloudwatch_logs_event: dict[str, Any], - current_log_event: int, - event_input_id: str, + input_id: str, config_yaml: str, + continuing_queue: bool = True, + current_log_event: int = 0, + last_ending_offset: Optional[int] = None, + last_event_expanded_offset: Optional[int] = None, ) -> None: """ Handler of the continuation queue for cloudwatch logs inputs @@ -51,7 +52,7 @@ def _handle_cloudwatch_logs_continuation( message_attributes = { "config": {"StringValue": config_yaml, "DataType": "String"}, "originalEventId": {"StringValue": log_event["id"], "DataType": "String"}, - "originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"}, + "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"}, "originalLogGroup": {"StringValue": log_group_name, "DataType": "String"}, "originalLogStream": {"StringValue": log_stream_name, "DataType": "String"}, "originalEventTimestamp": {"StringValue": str(log_event["timestamp"]), "DataType": "Number"}, @@ -70,21 +71,31 @@ def _handle_cloudwatch_logs_continuation( } sqs_client.send_message( - QueueUrl=sqs_continuing_queue, + QueueUrl=sqs_destination_queue, MessageBody=log_event["message"], 
MessageAttributes=message_attributes, ) - shared_logger.debug( - "continuing", - extra={ - "sqs_continuing_queue": sqs_continuing_queue, - "last_ending_offset": last_ending_offset, - "last_event_expanded_offset": last_event_expanded_offset, - "event_id": log_event["id"], - "event_timestamp": log_event["timestamp"], - }, - ) + if continuing_queue: + shared_logger.debug( + "continuing", + extra={ + "sqs_continuing_queue": sqs_destination_queue, + "last_ending_offset": last_ending_offset, + "last_event_expanded_offset": last_event_expanded_offset, + "event_id": log_event["id"], + "event_timestamp": log_event["timestamp"], + }, + ) + else: + shared_logger.debug( + "replaying", + extra={ + "sqs_replaying_queue": sqs_destination_queue, + "event_id": log_event["id"], + "event_timestamp": log_event["timestamp"], + }, + ) def _handle_cloudwatch_logs_event( diff --git a/handlers/aws/handler.py b/handlers/aws/handler.py index 626fc8ad..913a6469 100644 --- a/handlers/aws/handler.py +++ b/handlers/aws/handler.py @@ -13,13 +13,13 @@ from .cloudwatch_logs_trigger import ( _from_awslogs_data_to_event, - _handle_cloudwatch_logs_continuation, _handle_cloudwatch_logs_event, + _handle_cloudwatch_logs_move, ) -from .kinesis_trigger import _handle_kinesis_continuation, _handle_kinesis_record +from .kinesis_trigger import _handle_kinesis_move, _handle_kinesis_record from .replay_trigger import ReplayedEventReplayHandler, get_shipper_for_replay_event -from .s3_sqs_trigger import _handle_s3_sqs_continuation, _handle_s3_sqs_event -from .sqs_trigger import _handle_sqs_continuation, _handle_sqs_event +from .s3_sqs_trigger import _handle_s3_sqs_event, _handle_s3_sqs_move +from .sqs_trigger import _handle_sqs_event, handle_sqs_move from .utils import ( CONFIG_FROM_PAYLOAD, INTEGRATION_SCOPE_GENERIC, @@ -130,6 +130,10 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex sent_events: int = 0 empty_events: int = 0 skipped_events: int = 0 + error_events: int = 0 + + 
sqs_replaying_queue = os.environ["SQS_REPLAY_URL"] + sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"] if trigger_type == "cloudwatch-logs": cloudwatch_logs_event = _from_awslogs_data_to_event(lambda_event["awslogs"]["data"]) @@ -144,8 +148,25 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex ) if event_input is None: - shared_logger.warning("no input defined", extra={"input_type": trigger_type, "input_id": input_id}) - + shared_logger.error("no input defined", extra={"input_id": input_id}) + error_events += 1 + _handle_cloudwatch_logs_move( + sqs_client=sqs_client, + sqs_destination_queue=sqs_replaying_queue, + cloudwatch_logs_event=cloudwatch_logs_event, + input_id=input_id, + config_yaml=config_yaml, + continuing_queue=False, + ) + shared_logger.info( + "lambda is going to shutdown", + extra={ + "error_events": error_events, + "sent_events": sent_events, + "empty_events": empty_events, + "skipped_events": skipped_events, + }, + ) return "completed" aws_region = input_id.split(":")[3] @@ -180,8 +201,6 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex empty_events += 1 if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period: - sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"] - shared_logger.info( "lambda is going to shutdown, continuing on dedicated sqs queue", extra={ @@ -194,14 +213,14 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex composite_shipper.flush() - _handle_cloudwatch_logs_continuation( + _handle_cloudwatch_logs_move( sqs_client=sqs_client, - sqs_continuing_queue=sqs_continuing_queue, + sqs_destination_queue=sqs_continuing_queue, last_ending_offset=last_ending_offset, last_event_expanded_offset=last_event_expanded_offset, cloudwatch_logs_event=cloudwatch_logs_event, current_log_event=current_log_event_n, - event_input_id=input_id, + input_id=input_id, config_yaml=config_yaml, ) @@ -210,7 
+229,12 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex composite_shipper.flush() shared_logger.info( "lambda processed all the events", - extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events}, + extra={ + "sent_events": sent_events, + "empty_events": empty_events, + "skipped_events": skipped_events, + "error_events": error_events, + }, ) if trigger_type == "kinesis-data-stream": @@ -218,9 +242,30 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex input_id = lambda_event["Records"][0]["eventSourceARN"] event_input = config.get_input_by_id(input_id) + if event_input is None: - shared_logger.warning("no input defined", extra={"input_id": input_id}) + shared_logger.error("no input defined", extra={"input_id": input_id}) + error_events += len(lambda_event["Records"]) + + for kinesis_record in lambda_event["Records"]: + _handle_kinesis_move( + sqs_client=sqs_client, + sqs_destination_queue=sqs_replaying_queue, + kinesis_record=kinesis_record, + event_input_id=input_id, + config_yaml=config_yaml, + continuing_queue=False, + ) + shared_logger.info( + "lambda is going to shutdown", + extra={ + "sent_events": sent_events, + "empty_events": empty_events, + "skipped_events": skipped_events, + "error_events": error_events, + }, + ) return "completed" composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml) @@ -253,8 +298,6 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex empty_events += 1 if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period: - sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"] - shared_logger.info( "lambda is going to shutdown, continuing on dedicated sqs queue", extra={ @@ -262,6 +305,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex "sent_events": sent_events, "empty_events": 
empty_events, "skipped_events": skipped_events, + "error_events": error_events, }, ) @@ -275,9 +319,9 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex continuing_last_ending_offset = None continuing_last_event_expanded_offset = None - _handle_kinesis_continuation( + _handle_kinesis_move( sqs_client=sqs_client, - sqs_continuing_queue=sqs_continuing_queue, + sqs_destination_queue=sqs_continuing_queue, last_ending_offset=continuing_last_ending_offset, last_event_expanded_offset=continuing_last_event_expanded_offset, kinesis_record=kinesis_record, @@ -290,7 +334,12 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex composite_shipper.flush() shared_logger.info( "lambda processed all the events", - extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events}, + extra={ + "sent_events": sent_events, + "empty_events": empty_events, + "skipped_events": skipped_events, + "error_events": error_events, + }, ) if trigger_type == "s3-sqs" or trigger_type == "sqs": @@ -318,12 +367,10 @@ def handle_timeout( timeout_config_yaml: str, timeout_current_s3_record: int = 0, ) -> None: - timeout_sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"] - shared_logger.info( "lambda is going to shutdown, continuing on dedicated sqs queue", extra={ - "sqs_queue": timeout_sqs_continuing_queue, + "sqs_queue": sqs_continuing_queue, "sent_events": timeout_sent_events, "empty_events": timeout_empty_events, "skipped_events": timeout_skipped_events, @@ -349,24 +396,24 @@ def handle_timeout( continue if timeout_input.type == "s3-sqs": - _handle_s3_sqs_continuation( + _handle_s3_sqs_move( sqs_client=sqs_client, - sqs_continuing_queue=timeout_sqs_continuing_queue, + sqs_destination_queue=sqs_continuing_queue, last_ending_offset=timeout_last_ending_offset, last_event_expanded_offset=timeout_last_event_expanded_offset, sqs_record=timeout_sqs_record, current_s3_record=timeout_current_s3_record, - 
event_input_id=timeout_input_id, + input_id=timeout_input_id, config_yaml=timeout_config_yaml, ) else: - _handle_sqs_continuation( + handle_sqs_move( sqs_client=sqs_client, - sqs_continuing_queue=timeout_sqs_continuing_queue, + sqs_destination_queue=sqs_continuing_queue, last_ending_offset=timeout_last_ending_offset, last_event_expanded_offset=timeout_last_event_expanded_offset, sqs_record=timeout_sqs_record, - event_input_id=timeout_input_id, + input_id=timeout_input_id, config_yaml=timeout_config_yaml, ) @@ -382,15 +429,36 @@ def handle_timeout( input_id = sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"] event_input = config.get_input_by_id(input_id) + if event_input is None: - shared_logger.warning("no input defined", extra={"input_id": input_id}) + # This could happen if aws_lambda_event_source_mapping is set correctly, but + # the id in the config.yaml was written incorrectly. + shared_logger.error("no input defined", extra={"input_id": input_id}) + if trigger_type == "s3-sqs": + _handle_s3_sqs_move( + sqs_client=sqs_client, + sqs_destination_queue=sqs_replaying_queue, + sqs_record=sqs_record, + input_id=input_id, + config_yaml=config_yaml, + continuing_queue=False, + ) + elif trigger_type == "sqs": + handle_sqs_move( + sqs_client=sqs_client, + sqs_destination_queue=sqs_replaying_queue, + sqs_record=sqs_record, + input_id=input_id, + config_yaml=config_yaml, + continuing_queue=False, + ) + error_events += 1 continue if input_id in composite_shipper_cache: composite_shipper = composite_shipper_cache[input_id] else: composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml) - composite_shipper_cache[event_input.id] = composite_shipper continuing_event_expanded_offset: Optional[int] = None @@ -493,7 +561,12 @@ def handle_timeout( shared_logger.info( "lambda processed all the events", - extra={"sent_events": sent_events, "empty_events": empty_events, "skipped_events": skipped_events}, + extra={ + 
"sent_events": sent_events, + "empty_events": empty_events, + "skipped_events": skipped_events, + "error_events": error_events, + }, ) return "completed" diff --git a/handlers/aws/kinesis_trigger.py b/handlers/aws/kinesis_trigger.py index f9627dd7..dac715ec 100644 --- a/handlers/aws/kinesis_trigger.py +++ b/handlers/aws/kinesis_trigger.py @@ -13,21 +13,26 @@ from .utils import get_account_id_from_arn, get_kinesis_stream_name_type_and_region_from_arn -def _handle_kinesis_continuation( +def _handle_kinesis_move( sqs_client: BotoBaseClient, - sqs_continuing_queue: str, - last_ending_offset: Optional[int], - last_event_expanded_offset: Optional[int], + sqs_destination_queue: str, kinesis_record: dict[str, Any], event_input_id: str, config_yaml: str, + continuing_queue: bool = True, + last_ending_offset: Optional[int] = None, + last_event_expanded_offset: Optional[int] = None, ) -> None: """ - Handler of the continuation queue for kinesis data stream inputs - If a kinesis data stream records batch cannot be fully processed before the - timeout of the lambda this handler will be called: it will - send new sqs messages for the unprocessed records in the batch to the - internal continuing sqs queue + Handler of the continuation/replay queue for kinesis data stream inputs. + If a kinesis data stream records batch cannot be fully processed before the timeout of the lambda, the handler will + be called for the continuation queue: it will send new sqs messages for the unprocessed records to the + internal continuing sqs queue. + If a sqs message has an eventSourceARN not present in the config.yaml ids, then the handler should be called, + so it can get placed in the internal replay queue. + + :param continuing_queue: should be set to true if the sqs message is going to be placed in the continuing + queue. Otherwise, we assume it will be placed in the replaying queue, and, in that case, it should be set to false. 
""" sequence_number = kinesis_record["kinesis"]["sequenceNumber"] @@ -60,22 +65,33 @@ def _handle_kinesis_continuation( kinesis_data: str = kinesis_record["kinesis"]["data"] sqs_client.send_message( - QueueUrl=sqs_continuing_queue, + QueueUrl=sqs_destination_queue, MessageBody=kinesis_data, MessageAttributes=message_attributes, ) - shared_logger.debug( - "continuing", - extra={ - "sqs_continuing_queue": sqs_continuing_queue, - "last_ending_offset": last_ending_offset, - "last_event_expanded_offset": last_event_expanded_offset, - "partition_key": partition_key, - "approximate_arrival_timestamp": approximate_arrival_timestamp, - "sequence_number": sequence_number, - }, - ) + if continuing_queue: + shared_logger.debug( + "continuing", + extra={ + "sqs_continuing_queue": sqs_destination_queue, + "last_ending_offset": last_ending_offset, + "last_event_expanded_offset": last_event_expanded_offset, + "partition_key": partition_key, + "approximate_arrival_timestamp": approximate_arrival_timestamp, + "sequence_number": sequence_number, + }, + ) + else: + shared_logger.debug( + "replaying", + extra={ + "sqs_replaying_queue": sqs_destination_queue, + "partition_key": partition_key, + "approximate_arrival_timestamp": approximate_arrival_timestamp, + "sequence_number": sequence_number, + }, + ) def _handle_kinesis_record( diff --git a/handlers/aws/s3_sqs_trigger.py b/handlers/aws/s3_sqs_trigger.py index c981efe1..c23c4296 100644 --- a/handlers/aws/s3_sqs_trigger.py +++ b/handlers/aws/s3_sqs_trigger.py @@ -20,22 +20,27 @@ ) -def _handle_s3_sqs_continuation( +def _handle_s3_sqs_move( sqs_client: BotoBaseClient, - sqs_continuing_queue: str, - last_ending_offset: Optional[int], - last_event_expanded_offset: Optional[int], + sqs_destination_queue: str, sqs_record: dict[str, Any], - current_s3_record: int, - event_input_id: str, + input_id: str, config_yaml: str, + current_s3_record: int = 0, + continuing_queue: bool = True, + last_ending_offset: Optional[int] = None, + 
last_event_expanded_offset: Optional[int] = None, ) -> None: """ - Handler of the continuation queue for s3-sqs inputs - If a sqs message cannot be fully processed before the - timeout of the lambda this handler will be called: it will - send new sqs messages for the unprocessed records to the - internal continuing sqs queue + Handler of the continuation/replay queue for s3-sqs inputs. + If a sqs message cannot be fully processed before the timeout of the lambda, the handler will be called + for the continuation queue: it will send new sqs messages for the unprocessed records to the + internal continuing sqs queue. + If a sqs message has an eventSourceARN not present in the config.yaml ids, then the handler should be called, + so it can get placed in the internal replay queue. + + :param continuing_queue: should be set to true if the sqs message is going to be placed in the continuing + queue. Otherwise, we assume it will be placed in the replaying queue, and, in that case, it should be set to false. 
""" body = json_parser(sqs_record["body"]) @@ -51,23 +56,33 @@ def _handle_s3_sqs_continuation( sqs_record["body"] = json_dumper(body) sqs_client.send_message( - QueueUrl=sqs_continuing_queue, + QueueUrl=sqs_destination_queue, MessageBody=sqs_record["body"], MessageAttributes={ "config": {"StringValue": config_yaml, "DataType": "String"}, - "originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"}, + "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"}, }, ) - shared_logger.debug( - "continuing", - extra={ - "sqs_continuing_queue": sqs_continuing_queue, - "last_ending_offset": last_ending_offset, - "last_event_expanded_offset": last_event_expanded_offset, - "current_s3_record": current_s3_record, - }, - ) + if continuing_queue: + shared_logger.debug( + "continuing", + extra={ + "sqs_continuing_queue": sqs_destination_queue, + "last_ending_offset": last_ending_offset, + "last_event_expanded_offset": last_event_expanded_offset, + "current_s3_record": current_s3_record, + }, + ) + else: + shared_logger.debug( + "replaying", + extra={ + "sqs_replaying_queue": sqs_destination_queue, + "input_id": input_id, + "message_id": sqs_record["messageId"], + }, + ) def _handle_s3_sqs_event( diff --git a/handlers/aws/sqs_trigger.py b/handlers/aws/sqs_trigger.py index c8fbb01f..fdb4faf9 100644 --- a/handlers/aws/sqs_trigger.py +++ b/handlers/aws/sqs_trigger.py @@ -13,21 +13,26 @@ from .utils import get_account_id_from_arn, get_queue_url_from_sqs_arn, get_sqs_queue_name_and_region_from_arn -def _handle_sqs_continuation( +def handle_sqs_move( sqs_client: BotoBaseClient, - sqs_continuing_queue: str, - last_ending_offset: Optional[int], - last_event_expanded_offset: Optional[int], + sqs_destination_queue: str, sqs_record: dict[str, Any], - event_input_id: str, + input_id: str, config_yaml: str, + continuing_queue: bool = True, + last_ending_offset: Optional[int] = None, + last_event_expanded_offset: Optional[int] = None, ) -> None: """ - 
Handler of the continuation queue for sqs inputs - If a sqs message cannot be fully processed before the - timeout of the lambda this handler will be called: it will - send new sqs messages for the unprocessed records to the - internal continuing sqs queue + Handler of the continuation/replay queue for sqs inputs. + If a sqs message cannot be fully processed before the timeout of the lambda, the handler will be called + for the continuation queue: it will send new sqs messages for the unprocessed records to the + internal continuing sqs queue. + If a sqs message has an eventSourceARN not present in the config.yaml ids, then the handler should be called, + so it can get placed in the internal replay queue. + + :param continuing_queue: should be set to true if the sqs message is going to be placed in the continuing + queue. Otherwise, we assume it will be placed in the replaying queue, and, in that case, it should be set to false. """ message_attributes = {} @@ -48,7 +53,7 @@ def _handle_sqs_continuation( "StringValue": str(sqs_record["attributes"]["SentTimestamp"]), "DataType": "Number", }, - "originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"}, + "originalEventSourceARN": {"StringValue": input_id, "DataType": "String"}, } if last_ending_offset is not None: @@ -61,20 +66,30 @@ def _handle_sqs_continuation( } sqs_client.send_message( - QueueUrl=sqs_continuing_queue, + QueueUrl=sqs_destination_queue, MessageBody=sqs_record["body"], MessageAttributes=message_attributes, ) - shared_logger.debug( - "continuing", - extra={ - "sqs_continuing_queue": sqs_continuing_queue, - "last_ending_offset": last_ending_offset, - "last_event_expanded_offset": last_event_expanded_offset, - "message_id": sqs_record["messageId"], - }, - ) + if continuing_queue: + shared_logger.debug( + "continuing", + extra={ + "sqs_continuing_queue": sqs_destination_queue, + "last_ending_offset": last_ending_offset, + "last_event_expanded_offset": last_event_expanded_offset, + 
"message_id": sqs_record["messageId"], + }, + ) + else: + shared_logger.debug( + "replaying", + extra={ + "sqs_replaying_queue": sqs_destination_queue, + "input_id": input_id, + "message_id": sqs_record["messageId"], + }, + ) def _handle_sqs_event( diff --git a/handlers/aws/utils.py b/handlers/aws/utils.py index 4f0cf9b7..f952ef53 100644 --- a/handlers/aws/utils.py +++ b/handlers/aws/utils.py @@ -397,6 +397,9 @@ def get_input_from_log_group_subscription_data( all_regions = get_ec2_client().describe_regions(AllRegions=True) assert "Regions" in all_regions for region_data in all_regions["Regions"]: + + # arn:aws:logs:region:account-id:log-group:log_group_name:* + region = region_data["RegionName"] aws_or_gov = "aws" @@ -418,7 +421,7 @@ def get_input_from_log_group_subscription_data( if event_input is not None: return log_group_arn, event_input - return "", None + return f"arn:aws:logs:%AWS_REGION%:{account_id}:log-group:{log_group_name}:*", None def delete_sqs_record(sqs_arn: str, receipt_handle: str) -> None: diff --git a/how-to-test-locally/.env b/how-to-test-locally/.env new file mode 100644 index 00000000..f9ab295d --- /dev/null +++ b/how-to-test-locally/.env @@ -0,0 +1,11 @@ +# List of requirement files. +# Split them with , and without space, like this: example1.txt,example2.txt +REQUIREMENTS=requirements.txt + +# List of python files/directories to add to the zip file. +# Split them with , and without space, like this: example1.txt,example2.txt +DEPENDENCIES=main_aws.py,handlers,share,storage,shippers + +# Zip filename +FILENAME=local_esf.zip + diff --git a/how-to-test-locally/README.md b/how-to-test-locally/README.md new file mode 100644 index 00000000..f693886d --- /dev/null +++ b/how-to-test-locally/README.md @@ -0,0 +1,69 @@ +This is just an example of how to build and run ESF locally. 
+ +## Requirements + +- [Terraform](https://www.terraform.io/) +- (Optional) [Taskfile](https://taskfile.dev/installation/) + + +## Steps + +**Important note**: ESF dependencies have been tested on architecture `x86_64`. Make sure to use it as well. + +### Step 1: Build your dependencies zip file + +You can build your own, or you can choose to run: +```bash +task +``` +This command builds the zip file automatically. + +You can update the task variables in the `.env` file: +- The list of python dependencies, `DEPENDENCIES`. +- The list of python requirement files, `REQUIREMENTS`. +- The name of the zip file, `FILENAME`. + + +### Step 2: Run ESF terraform + +Use the code in the [ESF terraform repository](https://github.com/elastic/terraform-elastic-esf). + +> **NOTE**: ESF lambda function is using architecture `x86_64`. + + +Place your `local_esf.zip` (or the file named by `FILENAME`, if you changed the value) in the same directory as ESF terraform. + +Go to the `esf.tf` file and edit: + +```terraform +locals { + ... + dependencies-file = "local_esf.zip" # value of FILENAME in .env + ... +} +``` + +Remove/comment these lines from the `esf.tf` file: + +```terraform +#resource "terraform_data" "curl-dependencies-zip" { +# provisioner "local-exec" { +# command = "curl -L -O ${local.dependencies-bucket-url}/${local.dependencies-file}" +# } +#} +``` + +And fix the now missing dependency in `dependencies-file`: + +```terraform +resource "aws_s3_object" "dependencies-file" { + bucket = local.config-bucket-name + key = local.dependencies-file + source = local.dependencies-file + + depends_on = [aws_s3_bucket.esf-config-bucket] #, terraform_data.curl-dependencies-zip] +} +``` + +Now follow the README file from the [ESF terraform repository](https://github.com/elastic/terraform-elastic-esf) on how to configure the remaining necessary variables. You will have to configure the `release-version` variable, but its value is not relevant here, so you can set it to any value you want. 
+ diff --git a/how-to-test-locally/Taskfile.yaml b/how-to-test-locally/Taskfile.yaml new file mode 100644 index 00000000..16c96bd1 --- /dev/null +++ b/how-to-test-locally/Taskfile.yaml @@ -0,0 +1,49 @@ +version: '3' + +env: + # Directory to place the dependencies - just internal to this taskfile + DIR: dependencies + +dotenv: ['.env'] + +tasks: + default: + cmds: + - task: install-requirements + - task: build-zip-file + - task: remove-dependencies-dir + - task: add-to-zip + + install-requirements: + desc: "Install requirements from $REQUIREMENTS." + internal: true + requires: + var: REQUIREMENTS + cmds: + - rm -rf $DIR + - for: + var: REQUIREMENTS + split: ',' + cmd: pip3.9 install -r ../{{ .ITEM }} -t $DIR + + build-zip-file: + desc: "Zip $DIR to build $FILENAME." + internal: true + cmds: + - rm -rf $FILENAME + - cd $DIR && zip -r ../$FILENAME . + + remove-dependencies-dir: + desc: "Delete $DIR." + internal: true + cmds: + - rm -rf $DIR + + add-to-zip: + desc: "Add $DEPENDENCIES to zip file." + internal: true + cmds: + - for: + var: DEPENDENCIES + split: ',' + cmd: zip -r $FILENAME ../{{ .ITEM }} diff --git a/share/version.py b/share/version.py index db306ffb..dcd8436d 100644 --- a/share/version.py +++ b/share/version.py @@ -2,4 +2,4 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. 
-version = "1.13.1" +version = "1.14.0" diff --git a/tests/handlers/aws/test_handler.py b/tests/handlers/aws/test_handler.py index 6858d0bd..de9c83b7 100644 --- a/tests/handlers/aws/test_handler.py +++ b/tests/handlers/aws/test_handler.py @@ -399,6 +399,8 @@ def test_lambda_handler_noop(self) -> None: with self.subTest("no originalEventSourceARN in messageAttributes"): ctx = ContextMock() os.environ["S3_CONFIG_FILE"] = "s3://s3_config_file_bucket/s3_config_file_object_key" + os.environ["SQS_REPLAY_URL"] = "https://sqs.eu-central-1.amazonaws.com/123456789012/replay_queue" + os.environ["SQS_CONTINUE_URL"] = "https://sqs.eu-central-1.amazonaws.com/123456789012/continue_queue" lambda_event = deepcopy(_dummy_lambda_event) del lambda_event["Records"][0]["messageAttributes"]["originalEventSourceARN"] assert handler(lambda_event, ctx) == "completed" # type:ignore @@ -462,20 +464,6 @@ def test_lambda_handler_noop(self) -> None: del lambda_event["Records"][0]["messageAttributes"]["originalEventSourceARN"] assert handler(lambda_event, ctx) == "completed" # type:ignore - with self.subTest("no input defined for kinesis-data-stream"): - ctx = ContextMock() - os.environ["S3_CONFIG_FILE"] = "s3://s3_config_file_bucket/s3_config_file_object_key" - lambda_event = { - "Records": [ - { - "eventSource": "aws:kinesis", - "kinesis": {"data": ""}, - "eventSourceARN": "arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream", - } - ] - } - assert handler(lambda_event, ctx) == "completed" # type:ignore - with self.subTest("body is neither replay queue nor s3-sqs"): ctx = ContextMock() os.environ["S3_CONFIG_FILE"] = "s3://s3_config_file_bucket/s3_config_file_object_key" diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index bf2a632c..0b75847a 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -2602,6 +2602,82 @@ def test_cloudwatch_logs_stream_as_input_instead_of_group(self) -> 
None:
         assert logstash_message[1]["cloud"]["account"]["id"] == "000000000000"
         assert logstash_message[1]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
 
+    def test_cloudwatch_logs_no_input_defined(self) -> None:
+        """CloudWatch Logs events matching no configured input id must be placed in the replay queue."""
+        assert isinstance(self.logstash, LogstashContainer)
+        assert isinstance(self.localstack, LocalStackContainer)
+
+        fixtures = [
+            _load_file_fixture("cloudwatch-log-1.json"),
+            _load_file_fixture("cloudwatch-log-2.json"),
+            _load_file_fixture("cloudwatch-log-3.json"),
+        ]
+
+        cloudwatch_group_name = _time_based_id(suffix="source-group")
+        cloudwatch_group = _logs_create_cloudwatch_logs_group(self.logs_client, group_name=cloudwatch_group_name)
+
+        cloudwatch_stream_name = _time_based_id(suffix="source-stream")
+        _logs_create_cloudwatch_logs_stream(
+            self.logs_client, group_name=cloudwatch_group_name, stream_name=cloudwatch_stream_name
+        )
+
+        _logs_upload_event_to_cloudwatch_logs(
+            self.logs_client,
+            group_name=cloudwatch_group_name,
+            stream_name=cloudwatch_stream_name,
+            messages_body=fixtures,
+        )
+
+        cloudwatch_group_arn = cloudwatch_group["arn"]
+
+        # The input id is deliberately not the log group ARN, so the handler should find no input for the event.
+        config_yaml: str = f"""
+            inputs:
+              - type: "cloudwatch-logs"
+                id: "misconfigured-cloudwatch-logs"
+                tags: {self.default_tags}
+                outputs:
+                  - type: "logstash"
+                    args:
+                      logstash_url: "{self.logstash.get_url()}"
+                      ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint}
+                      username: "{self.logstash.logstash_user}"
+                      password: "{self.logstash.logstash_password}"
+        """
+
+        config_file_path = "config.yaml"
+        config_bucket_name = _time_based_id(suffix="config-bucket")
+        _s3_upload_content_to_bucket(
+            client=self.s3_client,
+            content=config_yaml.encode("utf-8"),
+            content_type="text/plain",
+            bucket_name=config_bucket_name,
+            key=config_file_path,
+        )
+
+        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
+
+        events_cloudwatch_logs, _, _ = _logs_retrieve_event_from_cloudwatch_logs(
+            self.logs_client, cloudwatch_group_name, cloudwatch_stream_name
+        )
+
+        ctx = ContextMock()
+        first_call = handler(events_cloudwatch_logs, ctx)  # type:ignore
+        assert first_call == "completed"
+
+        replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
+        replayed_messages = replayed_events["Records"]
+
+        assert len(replayed_messages) == 3
+
+        # The replay message is expected to carry the ARN with its region field (index 3) set to "%AWS_REGION%".
+        arn_components = cloudwatch_group_arn.split(":")
+        arn_components[3] = "%AWS_REGION%"
+        cloudwatch_group_arn = ":".join(arn_components)
+
+        for message in replayed_messages:
+            assert message["messageAttributes"]["originalEventSourceARN"]["stringValue"] == cloudwatch_group_arn
+
     def test_cloudwatch_logs_last_ending_offset_reset(self) -> None:
         assert isinstance(self.logstash, LogstashContainer)
         assert isinstance(self.localstack, LocalStackContainer)
@@ -2829,6 +2905,65 @@ def test_cloudwatch_logs_last_event_expanded_offset_continue(self) -> None:
         assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000"
         assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
 
+    def test_kinesis_data_stream_no_input_defined(self) -> None:
+        """Kinesis records matching no configured input id must be placed in the replay queue."""
+        assert isinstance(self.logstash, LogstashContainer)
+        assert isinstance(self.localstack, LocalStackContainer)
+
+        fixtures = [
+            _load_file_fixture("cloudwatch-log-1.json"),
+            _load_file_fixture("cloudwatch-log-2.json"),
+            _load_file_fixture("cloudwatch-log-3.json"),
+        ]
+
+        kinesis_stream_name = _time_based_id(suffix="source-kinesis")
+        kinesis_stream = _kinesis_create_stream(self.kinesis_client, kinesis_stream_name)
+        kinesis_stream_arn = kinesis_stream["StreamDescription"]["StreamARN"]
+
+        _kinesis_put_records(self.kinesis_client, kinesis_stream_name, fixtures)
+
+        config_yaml: str = f"""
+            inputs:
+              - type: "kinesis-data-stream"
+                id: "misconfigured-id"
+                tags: {self.default_tags}
+                outputs:
+                  - type: "logstash"
+                    args:
+                      logstash_url: "{self.logstash.get_url()}"
+                      ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint}
+                      username: "{self.logstash.logstash_user}"
+                      password: "{self.logstash.logstash_password}"
+        """
+
+        config_file_path = "config.yaml"
+        config_bucket_name = _time_based_id(suffix="config-bucket")
+        _s3_upload_content_to_bucket(
+            client=self.s3_client,
+            content=config_yaml.encode("utf-8"),
+            content_type="text/plain",
+            bucket_name=config_bucket_name,
+            key=config_file_path,
+        )
+
+        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
+
+        events_kinesis, _ = _kinesis_retrieve_event_from_kinesis_stream(
+            self.kinesis_client, kinesis_stream_name, kinesis_stream_arn
+        )
+
+        ctx = ContextMock()
+        first_call = handler(events_kinesis, ctx)  # type:ignore
+        assert first_call == "completed"
+
+        replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
+        replayed_messages = replayed_events["Records"]
+
+        assert len(replayed_messages) == 3
+
+        for message in replayed_messages:
+            assert message["messageAttributes"]["originalEventSourceARN"]["stringValue"] == kinesis_stream_arn
+
     def test_kinesis_data_stream_last_ending_offset_reset(self) -> None:
         assert isinstance(self.logstash, LogstashContainer)
         assert isinstance(self.localstack, LocalStackContainer)
@@ -3052,6 +3187,71 @@ def test_kinesis_data_stream_last_event_expanded_offset_continue(self) -> None:
         assert logstash_message[2]["cloud"]["account"]["id"] == "000000000000"
         assert logstash_message[2]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
 
+    def test_sqs_no_input_defined(self) -> None:
+        """SQS messages matching no configured input id must be placed in the replay queue."""
+        assert isinstance(self.logstash, LogstashContainer)
+        assert isinstance(self.localstack, LocalStackContainer)
+
+        fixtures = [
+            _load_file_fixture("cloudwatch-log-1.json"),
+            _load_file_fixture("cloudwatch-log-2.json"),
+            _load_file_fixture("cloudwatch-log-3.json"),
+        ]
+
+        sqs_queue_name = _time_based_id(suffix="source-sqs")
+
+        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
+
+        sqs_queue_arn = sqs_queue["QueueArn"]
+        sqs_queue_url = sqs_queue["QueueUrl"]
+
+        for fixture in fixtures:
+            _sqs_send_messages(self.sqs_client, sqs_queue_url, fixture)
+
+        config_yaml: str = f"""
+            inputs:
+              - type: "sqs"
+                id: "misconfigured-id"
+                tags: {self.default_tags}
+                outputs:
+                  - type: "logstash"
+                    args:
+                      logstash_url: "{self.logstash.get_url()}"
+                      ssl_assert_fingerprint: {self.logstash.ssl_assert_fingerprint}
+                      username: "{self.logstash.logstash_user}"
+                      password: "{self.logstash.logstash_password}"
+        """
+
+        config_file_path = "config.yaml"
+        config_bucket_name = _time_based_id(suffix="config-bucket")
+        _s3_upload_content_to_bucket(
+            client=self.s3_client,
+            content=config_yaml.encode("utf-8"),
+            content_type="text/plain",
+            bucket_name=config_bucket_name,
+            key=config_file_path,
+        )
+
+        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
+
+        events_sqs, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
+        messages_sqs = events_sqs["Records"]
+
+        ctx = ContextMock()
+        first_call = handler(events_sqs, ctx)  # type:ignore
+        assert first_call == "completed"
+
+        replayed_events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
+        replayed_messages = replayed_events["Records"]
+
+        assert len(messages_sqs) == 3
+        assert len(replayed_messages) == 3
+        for original, replayed in zip(messages_sqs, replayed_messages):
+            assert (
+                original["eventSourceARN"]
+                == replayed["messageAttributes"]["originalEventSourceARN"]["stringValue"]
+            )
+
     def test_sqs_last_ending_offset_reset(self) -> None:
         assert isinstance(self.logstash, LogstashContainer)
         assert isinstance(self.localstack, LocalStackContainer)