From 30512d3fafbd99668af4dfdb75c191d6b250162d Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Wed, 11 Sep 2024 21:28:35 -0600 Subject: [PATCH 01/27] Move error field to error.message and error.type --- shippers/es.py | 15 ++++++++++++++- tests/handlers/aws/test_integrations.py | 2 +- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 43d86e1b..11d243ce 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -176,7 +176,20 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio # Skip duplicate events on dead letter index and replay queue continue - failed.append({"error": error["create"]["error"], "action": action_failed[0]}) + failed_error = {"action": action_failed[0], "error": {}} + error_field = error.get("create", {}).get("error", None) + if error_field: + if "reason" in error_field: + failed_error["error"]["message"] = error_field["reason"] + if "type" in error_field: + failed_error["error"]["type"] = error_field["type"] + else: + failed_error["error"]["message"] = error_field + + if "exception" in error["create"]: + failed_error["error"]["stack_trace"] = error["create"]["exception"] + + failed.append(failed_error) if len(failed) > 0: shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)}) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index fc15d227..b23fe3a4 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4292,7 +4292,7 @@ def test_es_dead_letter_index(self) -> None: assert res["hits"]["total"] == {"value": 1, "relation": "eq"} assert ( - res["hits"]["hits"][0]["_source"]["error"]["reason"] + res["hits"]["hits"][0]["_source"]["error"]["message"] == "test_es_non_indexable_dead_letter_index fail message" ) assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception" From f08b49ab3ad8e83f42e3aa11f4283b5abc082d81 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 17 Sep 2024 18:35:19 +0200 Subject: [PATCH 02/27] Move error parsing into a helper --- shippers/es.py | 56 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 11d243ce..6c2c331e 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -176,18 +176,21 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio # Skip duplicate events on dead letter index and replay queue continue - failed_error = {"action": action_failed[0], "error": {}} - error_field = error.get("create", {}).get("error", None) - if error_field: - if "reason" in error_field: - failed_error["error"]["message"] = error_field["reason"] - if "type" in error_field: - failed_error["error"]["type"] = error_field["type"] - else: - failed_error["error"]["message"] = error_field - - if "exception" in error["create"]: - failed_error["error"]["stack_trace"] = error["create"]["exception"] + failed_error = { + "action": action_failed[0], + "error": self._parse_error(error["create"]), + } + # error_field = error.get("create", {}).get("error", None) + # if error_field: + # if "reason" in error_field: + # failed_error["error"]["message"] = error_field["reason"] + # if "type" in error_field: + # failed_error["error"]["type"] = error_field["type"] + # else: + # failed_error["error"]["message"] = error_field + # + # if "exception" in error["create"]: + # failed_error["error"]["stack_trace"] = 
error["create"]["exception"] failed.append(failed_error) @@ -198,6 +201,35 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio return failed + def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]: + """ + Parses the error response from Elasticsearch and returns a dictionary. + """ + error_field: dict[str, Any] = {"error": {"message": "Unknown error"}} + + if "status" in error and isinstance(error["status"], int): + error_field["http"] = {"status_code": error["status"]} + + if "error" not in error: + return error_field + + if isinstance(error["error"], str): + # Can happen with connection errors. + error_field["error"]["message"] = error["error"] + if "exception" in error: + # The exception field is usually an Exception object, + # so we convert it to a string. + error_field["error"]["type"] = str(error["exception"]) + elif isinstance(error["error"], dict): + # Can happen with status 5xx errors. + # In this case, we look for the "reason" and "type" fields. + if "reason" in error["error"]: + error_field["error"]["message"] = error["error"]["reason"] + if "type" in error["error"]: + error_field["error"]["type"] = error["error"]["type"] + + return error_field + def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None: self._event_id_generator = event_id_generator From 089b19e218a0195d192cc4f39c9e06d880436e43 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 17 Sep 2024 18:44:28 +0200 Subject: [PATCH 03/27] merge action and error into one dict --- shippers/es.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 6c2c331e..1597137c 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -176,10 +176,8 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio # Skip duplicate events on dead letter index and replay queue continue - failed_error = { - "action": action_failed[0], - "error": self._parse_error(error["create"]), - } + failed_error = {"action": action_failed[0]} | self._parse_error(error["create"]) + # error_field = error.get("create", {}).get("error", None) # if error_field: # if "reason" in error_field: From 108081492b1d05286b57c5b5b6599fead9538f3e Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 26 Sep 2024 13:05:43 +0200 Subject: [PATCH 04/27] Cleanup --- shippers/es.py | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 1597137c..0472bfea 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -201,32 +201,48 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]: """ - Parses the error response from Elasticsearch and returns a dictionary. + Parses the error response from Elasticsearch and returns a + standardised error field. + + The error field is a dictionary with the following keys: + + - message: The error message + - type: The error type + + If the error is not recognised, the message is set to "Unknown error". + + It also sets the status code in the http field if it is present + as a number in the response. 
""" - error_field: dict[str, Any] = {"error": {"message": "Unknown error"}} + field: dict[str, Any] = {"error": {"message": "Unknown error"}, "type": "unknown"} if "status" in error and isinstance(error["status"], int): - error_field["http"] = {"status_code": error["status"]} + # Collecting the HTTP status code in the error field, + # if present and the type is an integer. + # + # Sometimes the status code is a string, for example, + # when the connection to the server fails. + field["http"] = {"status_code": error["status"]} if "error" not in error: - return error_field + return field if isinstance(error["error"], str): # Can happen with connection errors. - error_field["error"]["message"] = error["error"] + field["error"]["message"] = error["error"] if "exception" in error: # The exception field is usually an Exception object, # so we convert it to a string. - error_field["error"]["type"] = str(error["exception"]) + field["error"]["type"] = str(error["exception"]) elif isinstance(error["error"], dict): # Can happen with status 5xx errors. # In this case, we look for the "reason" and "type" fields. if "reason" in error["error"]: - error_field["error"]["message"] = error["error"]["reason"] + field["error"]["message"] = error["error"]["reason"] if "type" in error["error"]: - error_field["error"]["type"] = error["error"]["type"] + field["error"]["type"] = error["error"]["type"] - return error_field + return field def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None: self._event_id_generator = event_id_generator From 2ac4cdfe48a3109b27812f72d95e1ac95ce9964c Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 26 Sep 2024 14:07:23 +0200 Subject: [PATCH 05/27] Fix http.response.status_code and clean up --- shippers/es.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 0472bfea..0dbfb9f7 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -206,10 +206,11 @@ def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]: The error field is a dictionary with the following keys: - - message: The error message - - type: The error type + - `message`: The error message + - `type`: The error type - If the error is not recognised, the message is set to "Unknown error". + If the error is not recognised, the `message` key is set + to "Unknown error". It also sets the status code in the http field if it is present as a number in the response. @@ -217,12 +218,12 @@ def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]: field: dict[str, Any] = {"error": {"message": "Unknown error"}, "type": "unknown"} if "status" in error and isinstance(error["status"], int): - # Collecting the HTTP status code in the error field, - # if present and the type is an integer. + # Collecting the HTTP response status code in the + # error field, if present, and the type is an integer. # # Sometimes the status code is a string, for example, # when the connection to the server fails. 
- field["http"] = {"status_code": error["status"]} + field["http"] = {"response": {"status_code": error["status"]}} if "error" not in error: return field @@ -340,15 +341,22 @@ def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: # Assign random id in case bulk() results in error, it can be matched to the original # action - return { + encoded = { "@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "_id": f"{uuid.uuid4()}", + "_id": str(uuid.uuid4()), "_index": self._es_dead_letter_index, "_op_type": "create", "message": json_dumper(outcome["action"]), "error": outcome["error"], } + if "http" in outcome: + # the `http.response.status_code` is not + # always present in the error field. + encoded["http"] = outcome["http"] + + return encoded + def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]: return {} From feae60c6aa2d0f64cf091b9684d522c7f33157b5 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 26 Sep 2024 18:03:40 +0200 Subject: [PATCH 06/27] Set error.type with the exception type name as str --- shippers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index 0dbfb9f7..dda7a1f3 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -234,7 +234,7 @@ def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]: if "exception" in error: # The exception field is usually an Exception object, # so we convert it to a string. - field["error"]["type"] = str(error["exception"]) + field["error"]["type"] = str(type(error["exception"])) elif isinstance(error["error"], dict): # Can happen with status 5xx errors. # In this case, we look for the "reason" and "type" fields. From f37a3c6aa7d03d5173aeb29d52d26ef5fe4564ee Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 26 Sep 2024 18:04:05 +0200 Subject: [PATCH 07/27] Fix error.type --- shippers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index dda7a1f3..5dd13cb4 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -215,7 +215,7 @@ def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]: It also sets the status code in the http field if it is present as a number in the response. 
""" - field: dict[str, Any] = {"error": {"message": "Unknown error"}, "type": "unknown"} + field: dict[str, Any] = {"error": {"message": "Unknown error", "type": "unknown"}} if "status" in error and isinstance(error["status"], int): # Collecting the HTTP response status code in the From 8c937b70734d6b3da2fac380b127b3b01293c38b Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Thu, 26 Sep 2024 18:54:16 +0200 Subject: [PATCH 08/27] Test http.response.status_code --- tests/handlers/aws/test_integrations.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index b23fe3a4..e461913d 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4296,6 +4296,7 @@ def test_es_dead_letter_index(self) -> None: == "test_es_non_indexable_dead_letter_index fail message" ) assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception" + assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500 dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"]) assert dead_letter_message["log"]["offset"] == 0 assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path From b3a85b2ef498b0d5900c7b2d7d5467765f9c0ccf Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 30 Sep 2024 22:29:50 +0200 Subject: [PATCH 09/27] Do not index connection errors and filter by type --- shippers/es.py | 57 ++++++++---- tests/handlers/aws/test_integrations.py | 110 ++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 19 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 5dd13cb4..41e62441 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -4,7 +4,7 @@ import datetime import uuid -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, List, Optional, Union import elasticapm # noqa: F401 from elasticsearch import Elasticsearch @@ -61,6 +61,7 @@ def __init__( api_key: str = "", es_datastream_name: str = "", es_dead_letter_index: str = "", + es_dead_letter_forward_errors: List[str] = [], tags: list[str] = [], batch_max_actions: int = 500, batch_max_bytes: int = 10 * 1024 * 1024, @@ -112,6 +113,7 @@ def __init__( self._es_datastream_name = es_datastream_name self._es_dead_letter_index = es_dead_letter_index + self.es_dead_letter_forward_errors = es_dead_letter_forward_errors self._tags = tags self._es_index = "" @@ -178,18 +180,6 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio failed_error = {"action": action_failed[0]} | self._parse_error(error["create"]) - # error_field = error.get("create", {}).get("error", None) - # if error_field: - # if "reason" in error_field: - # failed_error["error"]["message"] = error_field["reason"] - # if "type" in error_field: - # failed_error["error"]["type"] = error_field["type"] - # else: - # failed_error["error"]["message"] = error_field - # - # if "exception" in error["create"]: - # failed_error["error"]["stack_trace"] = error["create"]["exception"] - failed.append(failed_error) if len(failed) > 0: @@ -303,26 +293,55 @@ def flush(self) -> None: return def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: + """ + Send the failed actions to the dead letter index (DLI). + + This function attempts to forward failed actions to the DLI, but may not do so + for one of the following reasons: + + 1. The action response does not have an HTTP status (e.g., the connection failed). + 2. 
The list of action errors to forward is not empty, and the action error type is not in the list. + 3. The action could not be encoded for indexing in the DLI. + 4. The action failed indexing attempt in the DLI. + + Args: + actions (list[Any]): A list of actions to be processed. + + Returns: + list[Any]: A list of actions that were not indexed in the DLI. + """ + non_indexed_actions: list[Any] = [] encoded_actions = [] - dead_letter_errors: list[Any] = [] + for action in actions: + if "http" not in action or ( + self.es_dead_letter_forward_errors and action["error"]["type"] not in self.es_dead_letter_forward_errors + ): + # We don't want to forward this action to + # the dead letter index. + # + # Add it to the list of non-indexed actions + # and continue to the next. + non_indexed_actions.append(action) + continue + # Reshape event to dead letter index encoded = self._encode_dead_letter(action) if not encoded: shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action}) - dead_letter_errors.append(action) + non_indexed_actions.append(action) encoded_actions.append(encoded) # If no action can be encoded, return original action list as failed if len(encoded_actions) == 0: - return dead_letter_errors + return non_indexed_actions errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs) failed = self._handle_outcome(actions=encoded_actions, errors=errors) if not isinstance(failed, list) or len(failed) == 0: - return dead_letter_errors + return non_indexed_actions for action in failed: event_payload = self._decode_dead_letter(action) @@ -331,9 +350,9 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: shared_logger.error("cannot decode dead letter index event from payload", extra={"action": action}) continue - dead_letter_errors.append(event_payload) + non_indexed_actions.append(event_payload) - return dead_letter_errors + return non_indexed_actions def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in outcome or "error" not in outcome: diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index e461913d..8e3bbd92 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4420,3 +4420,113 @@ def test_es_non_indexable_dead_letter_index(self) -> None: assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] + + def test_es_dead_letter_index_with_excluded_action_error(self) -> None: + assert isinstance(self.elasticsearch, ElasticsearchContainer) + assert isinstance(self.localstack, LocalStackContainer) + + sqs_queue_name = _time_based_id(suffix="source-sqs") + sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + + dead_letter_index_name = "logs-generic-default-dli" + + sqs_queue_arn = sqs_queue["QueueArn"] + sqs_queue_url = sqs_queue["QueueUrl"] + sqs_queue_url_path = sqs_queue["QueueUrlPath"] + + config_yaml: str = f""" + inputs: + - type: sqs + id: "{sqs_queue_arn}" + tags: {self.default_tags} + outputs: + - type: "elasticsearch" + args: + elasticsearch_url: "{self.elasticsearch.get_url()}" + es_dead_letter_index: "{dead_letter_index_name}" + es_dead_letter_forward_errors: + - fail_processor_exception + ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} + username: 
"{self.secret_arn}:username" + password: "{self.secret_arn}:password" + """ + + config_file_path = "config.yaml" + config_bucket_name = _time_based_id(suffix="config-bucket") + _s3_upload_content_to_bucket( + client=self.s3_client, + content=config_yaml.encode("utf-8"), + content_type="text/plain", + bucket_name=config_bucket_name, + key=config_file_path, + ) + + os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" + + fixtures = [ + _load_file_fixture("cloudwatch-log-1.json"), + ] + + _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) + + event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) + message_id = event["Records"][0]["messageId"] + + # Create pipeline to reject documents + processors = { + "processors": [ + { + "fail": { + "message": "test_es_dead_letter_index_with_excluded_action_error fail message", + } + }, + ] + } + + self.elasticsearch.put_pipeline( + id="test_es_dead_letter_index_with_excluded_action_error_fail_pipeline", + body=processors, + ) + + self.elasticsearch.create_data_stream(name="logs-generic-default") + self.elasticsearch.put_settings( + index="logs-generic-default", + body={"index.default_pipeline": "test_es_dead_letter_index_with_excluded_action_error_fail_pipeline"}, + ) + + self.elasticsearch.refresh(index="logs-generic-default") + + self.elasticsearch.create_data_stream(name=dead_letter_index_name) + + ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) + first_call = handler(event, ctx) # type:ignore + + assert first_call == "completed" + + # Test document has been rejected from target index + self.elasticsearch.refresh(index="logs-generic-default") + + assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 + + # Test event does not go into the dead letter queue + assert self.elasticsearch.exists(index=dead_letter_index_name) is True + + self.elasticsearch.refresh(index=dead_letter_index_name) + + assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0 + + # Test event has been redirected into the replay queue + events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) + assert len(events["Records"]) == 1 + + first_body: dict[str, Any] = json_parser(events["Records"][0]["body"]) + + assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n") + assert first_body["event_payload"]["log"]["offset"] == 0 + assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path + assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name + assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id + assert first_body["event_payload"]["cloud"]["provider"] == "aws" + assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" + assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" + assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] From cf2e156d1f8a126c476665f065f36b51aa8f9bbb Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 30 Sep 2024 22:46:13 +0200 Subject: [PATCH 10/27] Add es_dead_letter_forward_errors to config --- share/config.py | 15 ++++++++++++++- shippers/es.py | 2 +- shippers/factory.py | 1 + 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/share/config.py b/share/config.py index dc38a398..f08719ae 100644 --- a/share/config.py +++ b/share/config.py @@ -2,7 +2,7 @@ # or more contributor license agreements. 
Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. -from typing import Any, Callable, Optional, Union +from typing import Any, Callable, List, Optional, Union import yaml @@ -51,6 +51,7 @@ def __init__( batch_max_bytes: int = 10 * 1024 * 1024, ssl_assert_fingerprint: str = "", es_dead_letter_index: str = "", + es_dead_letter_forward_errors: List[str] = [], ): super().__init__(output_type="elasticsearch") self.elasticsearch_url = elasticsearch_url @@ -64,6 +65,7 @@ def __init__( self.batch_max_bytes = batch_max_bytes self.ssl_assert_fingerprint = ssl_assert_fingerprint self.es_dead_letter_index = es_dead_letter_index + self.es_dead_letter_forward_errors = es_dead_letter_forward_errors if self.cloud_id and self.elasticsearch_url: shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`") @@ -195,6 +197,17 @@ def es_dead_letter_index(self, value: str) -> None: self._es_dead_letter_index = value + @property + def es_dead_letter_forward_errors(self) -> str: + return self._es_dead_letter_forward_errors + + @es_dead_letter_forward_errors.setter + def es_dead_letter_forward_errors(self, value: List[str]) -> None: + if not isinstance(value, list): + raise ValueError("`es_dead_letter_forward_errors` must be provided as list") + + self._es_dead_letter_forward_errors = value + class LogstashOutput(Output): def __init__( diff --git a/shippers/es.py b/shippers/es.py index 41e62441..7d642528 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -113,7 +113,7 @@ def __init__( self._es_datastream_name = es_datastream_name self._es_dead_letter_index = es_dead_letter_index - self.es_dead_letter_forward_errors = es_dead_letter_forward_errors + self._es_dead_letter_forward_errors = es_dead_letter_forward_errors self._tags = tags self._es_index = "" diff --git a/shippers/factory.py b/shippers/factory.py index 0f444434..65e4ed18 100644 --- a/shippers/factory.py +++ b/shippers/factory.py @@ -49,6 +49,7 @@ def create_from_output(output_type: str, output: Output) -> ProtocolShipper: batch_max_bytes=output.batch_max_bytes, ssl_assert_fingerprint=output.ssl_assert_fingerprint, es_dead_letter_index=output.es_dead_letter_index, + es_dead_letter_forward_errors=output.es_dead_letter_forward_errors, ) if output_type == "logstash": From e2f1abc754319794d1bb43b978b4766f19280d56 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 30 Sep 2024 22:53:32 +0200 Subject: [PATCH 11/27] Fix private var name --- share/config.py | 2 +- shippers/es.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/share/config.py b/share/config.py index f08719ae..4ee0497f 100644 --- a/share/config.py +++ b/share/config.py @@ -200,7 +200,7 @@ def es_dead_letter_index(self, value: str) -> None: @property def es_dead_letter_forward_errors(self) -> str: return self._es_dead_letter_forward_errors - + @es_dead_letter_forward_errors.setter def es_dead_letter_forward_errors(self, value: List[str]) -> None: if not isinstance(value, list): diff --git a/shippers/es.py b/shippers/es.py index 7d642528..5d80dd60 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -315,7 +315,8 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: for action in actions: if "http" not in action or ( - self.es_dead_letter_forward_errors and action["error"]["type"] not in self.es_dead_letter_forward_errors + self._es_dead_letter_forward_errors + and action["error"]["type"] not in self._es_dead_letter_forward_errors ): # We don't 
want to forward this action to # the dead letter index. From 0375d2fb85ce3b3df5921316c38c1d0e43b52c40 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 30 Sep 2024 23:35:30 +0200 Subject: [PATCH 12/27] Add logs to DLI --- shippers/es.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/shippers/es.py b/shippers/es.py index 5d80dd60..01a26a1d 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -313,6 +313,8 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: non_indexed_actions: list[Any] = [] encoded_actions = [] + shared_logger.debug(f"forwarding {len(actions)} actions to dead letter index") + for action in actions: if "http" not in action or ( self._es_dead_letter_forward_errors @@ -323,6 +325,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: # # Add it to the list of non-indexed actions # and continue to the next. + shared_logger.debug("action not forwarded to dead letter index") non_indexed_actions.append(action) continue @@ -336,12 +339,14 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: # If no action can be encoded, return original action list as failed if len(encoded_actions) == 0: + shared_logger.error("no action can be encoded for dead letter index") return non_indexed_actions errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs) failed = self._handle_outcome(actions=encoded_actions, errors=errors) if not isinstance(failed, list) or len(failed) == 0: + shared_logger.info("all actions forwarded to dead letter index") return non_indexed_actions for action in failed: @@ -353,6 +358,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: non_indexed_actions.append(event_payload) + shared_logger.info(f"{len(failed)} actions failed to be forwarded to dead letter index") return non_indexed_actions def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: From 2ff886dcab51a135b32c04bd13a594819cb0a9ab Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 30 Sep 2024 23:46:49 +0200 Subject: [PATCH 13/27] Logging --- shippers/es.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 01a26a1d..e435e461 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -313,7 +313,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: non_indexed_actions: list[Any] = [] encoded_actions = [] - shared_logger.debug(f"forwarding {len(actions)} actions to dead letter index") + shared_logger.info(f"forwarding {len(actions)} actions to dead letter index") for action in actions: if "http" not in action or ( @@ -325,7 +325,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: # # Add it to the list of non-indexed actions # and continue to the next. 
- shared_logger.debug("action not forwarded to dead letter index") + shared_logger.info("action not forwarded to dead letter index") non_indexed_actions.append(action) continue @@ -339,7 +339,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: # If no action can be encoded, return original action list as failed if len(encoded_actions) == 0: - shared_logger.error("no action can be encoded for dead letter index") + shared_logger.info("no actions to forward to dead letter index") return non_indexed_actions errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs) From 227d8a53390086ebe01a40be30366ce7b9abf31f Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 30 Sep 2024 23:55:20 +0200 Subject: [PATCH 14/27] More logs --- shippers/es.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index e435e461..97b48863 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -279,13 +279,17 @@ def flush(self) -> None: if len(failed) > 0 and self._es_dead_letter_index: failed = self._send_dead_letter_index(failed) + shared_logger.info(f"there are {len(failed)} failed actions") + # Send remaining failed requests to replay queue, if enabled if isinstance(failed, list) and len(failed) > 0 and self._replay_handler is not None: + shared_logger.info(f"replaying {len(failed)} failed actions") for outcome in failed: if "action" not in outcome: shared_logger.error("action could not be extracted to be replayed", extra={"outcome": outcome}) continue + shared_logger.info("replaying action", extra={"action": outcome["action"]}) self._replay_handler(self._output_destination, self._replay_args, outcome["action"]) self._bulk_actions = [] @@ -339,7 +343,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: # If no action can be encoded, return original action list as failed if len(encoded_actions) == 0: - shared_logger.info("no actions to forward to dead letter index") + shared_logger.info(f"no actions to forward to dead letter index; returning {len(non_indexed_actions)} non_indexed_actions") return non_indexed_actions errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs) From 3109e90e4fdad97d18c14906a0adf5c256435253 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 1 Oct 2024 00:22:23 +0200 Subject: [PATCH 15/27] Fix tests --- shippers/es.py | 1 - tests/handlers/aws/test_integrations.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 97b48863..ce5de6c3 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -343,7 +343,6 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: # If no action can be encoded, return original action list as failed if len(encoded_actions) == 0: - shared_logger.info(f"no actions to forward to dead letter index; returning {len(non_indexed_actions)} non_indexed_actions") return non_indexed_actions errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index 8e3bbd92..3c075109 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4445,7 +4445,7 @@ def test_es_dead_letter_index_with_excluded_action_error(self) -> None: elasticsearch_url: "{self.elasticsearch.get_url()}" es_dead_letter_index: "{dead_letter_index_name}" es_dead_letter_forward_errors: - - fail_processor_exception + - non_existent_error ssl_assert_fingerprint: 
{self.elasticsearch.ssl_assert_fingerprint} username: "{self.secret_arn}:username" password: "{self.secret_arn}:password" From afd9e40330e380f0c6e11dc410a52ae400547a7d Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 1 Oct 2024 00:30:15 +0200 Subject: [PATCH 16/27] Add test_es_dead_letter_index_with_included_action_error test --- tests/handlers/aws/test_integrations.py | 112 ++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index 3c075109..6e9821dc 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4530,3 +4530,115 @@ def test_es_dead_letter_index_with_excluded_action_error(self) -> None: assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] + + def test_es_dead_letter_index_with_included_action_error(self) -> None: + assert isinstance(self.elasticsearch, ElasticsearchContainer) + assert isinstance(self.localstack, LocalStackContainer) + + sqs_queue_name = _time_based_id(suffix="source-sqs") + sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + + dead_letter_index_name = "logs-generic-default-dli" + + sqs_queue_arn = sqs_queue["QueueArn"] + sqs_queue_url = sqs_queue["QueueUrl"] + sqs_queue_url_path = sqs_queue["QueueUrlPath"] + + config_yaml: str = f""" + inputs: + - type: sqs + id: "{sqs_queue_arn}" + tags: {self.default_tags} + outputs: + - type: "elasticsearch" + args: + elasticsearch_url: "{self.elasticsearch.get_url()}" + es_dead_letter_index: "{dead_letter_index_name}" + es_dead_letter_forward_errors: + - fail_processor_exception + ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} + username: "{self.secret_arn}:username" + password: "{self.secret_arn}:password" + """ + + config_file_path = "config.yaml" + config_bucket_name = _time_based_id(suffix="config-bucket") + _s3_upload_content_to_bucket( + client=self.s3_client, + content=config_yaml.encode("utf-8"), + content_type="text/plain", + bucket_name=config_bucket_name, + key=config_file_path, + ) + + os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" + + fixtures = [ + _load_file_fixture("cloudwatch-log-1.json"), + ] + + _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) + + event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) + message_id = event["Records"][0]["messageId"] + + # Create pipeline to reject documents + processors = { + "processors": [ + { + "fail": { + "message": "test_es_dead_letter_index_with_included_action_error fail message", + } + }, + ] + } + + self.elasticsearch.put_pipeline( + id="test_es_dead_letter_index_with_included_action_error_fail_pipeline", + body=processors, + ) + + self.elasticsearch.create_data_stream(name="logs-generic-default") + self.elasticsearch.put_settings( + index="logs-generic-default", + body={"index.default_pipeline": "test_es_dead_letter_index_with_included_action_error_fail_pipeline"}, + ) + + self.elasticsearch.refresh(index="logs-generic-default") + + self.elasticsearch.create_data_stream(name=dead_letter_index_name) + + ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) + first_call = handler(event, ctx) # type:ignore + + assert first_call == "completed" + + # Test document has been 
rejected from target index + self.elasticsearch.refresh(index="logs-generic-default") + + assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 + + res = self.elasticsearch.search(index=dead_letter_index_name, sort="_seq_no") + + assert res["hits"]["total"] == {"value": 1, "relation": "eq"} + + assert ( + res["hits"]["hits"][0]["_source"]["error"]["message"] + == "test_es_dead_letter_index_with_included_action_error fail message" + ) + assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception" + assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500 + dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"]) + assert dead_letter_message["log"]["offset"] == 0 + assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path + assert dead_letter_message["aws"]["sqs"]["name"] == sqs_queue_name + assert dead_letter_message["aws"]["sqs"]["message_id"] == message_id + assert dead_letter_message["cloud"]["provider"] == "aws" + assert dead_letter_message["cloud"]["region"] == "us-east-1" + assert dead_letter_message["cloud"]["account"]["id"] == "000000000000" + assert dead_letter_message["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] + + # Test event does not go into the replay queue + events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) + + assert len(events["Records"]) == 0 From 1fd2c035048da78dc62d003bc947ce6c5623e6a9 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 1 Oct 2024 00:31:24 +0200 Subject: [PATCH 17/27] Fix linter objections --- share/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/share/config.py b/share/config.py index 4ee0497f..98f84122 100644 --- a/share/config.py +++ b/share/config.py @@ -198,7 +198,7 @@ def es_dead_letter_index(self, value: str) -> None: self._es_dead_letter_index = value @property - def es_dead_letter_forward_errors(self) -> str: + def es_dead_letter_forward_errors(self) -> List[str]: return self._es_dead_letter_forward_errors @es_dead_letter_forward_errors.setter From 6fdce15218d84eeb08314ccb282401e1f2014d2e Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 1 Oct 2024 01:05:50 +0200 Subject: [PATCH 18/27] Add _parse_error() tests --- tests/shippers/test_es.py | 44 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/shippers/test_es.py b/tests/shippers/test_es.py index aab8c345..87944ae2 100644 --- a/tests/shippers/test_es.py +++ b/tests/shippers/test_es.py @@ -511,3 +511,47 @@ def test_dumps(self) -> None: with self.subTest("dumps dict"): dumped = json_serializer.dumps({"key": "value"}) assert '{"key":"value"}' == dumped + + +@pytest.mark.unit +class TestParseError(TestCase): + + def test_parse_error(self) -> None: + shipper = ElasticsearchShipper( + elasticsearch_url="elasticsearch_url", + username="username", + password="password", + tags=["tag1", "tag2", "tag3"], + ) + + with self.subTest("fail_processor_exception"): + error = shipper._parse_error( + { + "status": 500, + "error": { + "type": "fail_processor_exception", + "reason": "Fail message", + }, + }, + ) + + assert error["error"]["type"] == "fail_processor_exception" + assert error["error"]["message"] == "Fail message" + + with self.subTest("connection_error"): + error = shipper._parse_error( + { + "status": "N/A", + "error": "whatever", + "exception": elasticsearch.exceptions.ConnectionError("Connection error"), + } + ) + + assert 
error["error"]["type"] == "" + assert error["error"]["message"] == "whatever" + + with self.subTest("unknown_error"): + error = shipper._parse_error({}) + + assert error["error"]["type"] == "unknown" + assert error["error"]["message"] == "Unknown error" From 49d7b019c80121c6fb56edd735707bb170196be4 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 1 Oct 2024 01:06:08 +0200 Subject: [PATCH 19/27] Fix test_es_dead_letter_index_with_included_action_error --- tests/handlers/aws/test_integrations.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index 6e9821dc..6b1a603c 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4618,6 +4618,13 @@ def test_es_dead_letter_index_with_included_action_error(self) -> None: assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 + # Test document has been redirected to dli + assert self.elasticsearch.exists(index=dead_letter_index_name) is True + + self.elasticsearch.refresh(index=dead_letter_index_name) + + assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 1 + res = self.elasticsearch.search(index=dead_letter_index_name, sort="_seq_no") assert res["hits"]["total"] == {"value": 1, "relation": "eq"} From 82e663f72b28b4ffcdcab96f4da259bfcb0d7ff4 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 1 Oct 2024 16:20:02 +0200 Subject: [PATCH 20/27] Clean up debug loggers --- shippers/es.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index ce5de6c3..73e0d4aa 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -279,17 +279,13 @@ def flush(self) -> None: if len(failed) > 0 and self._es_dead_letter_index: failed = self._send_dead_letter_index(failed) - shared_logger.info(f"there are {len(failed)} failed actions") - # Send remaining failed requests to replay queue, if enabled if isinstance(failed, list) and len(failed) > 0 and self._replay_handler is not None: - shared_logger.info(f"replaying {len(failed)} failed actions") for outcome in failed: if "action" not in outcome: shared_logger.error("action could not be extracted to be replayed", extra={"outcome": outcome}) continue - shared_logger.info("replaying action", extra={"action": outcome["action"]}) self._replay_handler(self._output_destination, self._replay_args, outcome["action"]) self._bulk_actions = [] @@ -317,19 +313,16 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: non_indexed_actions: list[Any] = [] encoded_actions = [] - shared_logger.info(f"forwarding {len(actions)} actions to dead letter index") - for action in actions: - if "http" not in action or ( + if "http" not in action or ( # no http status: connection error self._es_dead_letter_forward_errors and action["error"]["type"] not in self._es_dead_letter_forward_errors ): # We don't want to forward this action to # the dead letter index. # - # Add it to the list of non-indexed actions - # and continue to the next. - shared_logger.info("action not forwarded to dead letter index") + # Add the action to the list of non-indexed + # actions and continue with the next one. 
non_indexed_actions.append(action) continue @@ -349,7 +342,6 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: failed = self._handle_outcome(actions=encoded_actions, errors=errors) if not isinstance(failed, list) or len(failed) == 0: - shared_logger.info("all actions forwarded to dead letter index") return non_indexed_actions for action in failed: @@ -361,7 +353,6 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: non_indexed_actions.append(event_payload) - shared_logger.info(f"{len(failed)} actions failed to be forwarded to dead letter index") return non_indexed_actions def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: From 572e7644f645dbea728d8f679ba2316c12b2f6fe Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Tue, 1 Oct 2024 16:20:53 +0200 Subject: [PATCH 21/27] Add http.response.status_code check on parse_error --- tests/shippers/test_es.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/shippers/test_es.py b/tests/shippers/test_es.py index 87944ae2..dcf774ca 100644 --- a/tests/shippers/test_es.py +++ b/tests/shippers/test_es.py @@ -537,6 +537,7 @@ def test_parse_error(self) -> None: assert error["error"]["type"] == "fail_processor_exception" assert error["error"]["message"] == "Fail message" + assert error["http"]["response"]["status_code"] == 500 with self.subTest("connection_error"): error = shipper._parse_error( @@ -549,9 +550,11 @@ def test_parse_error(self) -> None: assert error["error"]["type"] == "" assert error["error"]["message"] == "whatever" + assert "http" not in error with self.subTest("unknown_error"): error = shipper._parse_error({}) assert error["error"]["type"] == "unknown" assert error["error"]["message"] == "Unknown error" + assert "http" not in error From 0e01d22f2cbeffe6d5e2a3634a5a9afd03a784a4 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Wed, 2 Oct 2024 17:08:09 +0200 Subject: [PATCH 22/27] Update docs --- ...ploy-elastic-serverless-forwarder.asciidoc | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc index cbcd100b..bf9b2acb 100644 --- a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc +++ b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc @@ -76,6 +76,9 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional + es_dead_letter_forward_errors: # optional (default: empty list) + - document_parsing_exception + - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -100,6 +103,9 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional + es_dead_letter_forward_errors: # optional (default: empty list) + - document_parsing_exception + - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -124,6 +130,9 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional + es_dead_letter_forward_errors: # optional (default: empty list) + - document_parsing_exception + - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default 
value is 10485760 - type: "logstash" @@ -148,6 +157,9 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional + es_dead_letter_forward_errors: # optional (default: empty list) + - document_parsing_exception + - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -172,6 +184,9 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional + es_dead_letter_forward_errors: # optional (default: empty list) + - document_parsing_exception + - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -230,6 +245,10 @@ For `elasticsearch` the following arguments are supported: * `args.api_key`: API key of elasticsearch endpoint in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined. * `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If the `es_datastream_name` is not specified and it cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in the future version. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**. The related backward compatibility code is removed from version **v1.0.0**. * `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. + * `es_dead_letter_forward_errors`: List of errors that should be forwarded to the dead letter index. The default value is an empty list (forward all errors). If the list is not empty, only the errors in the list will be forwarded to the dead letter index. + In general, you can use values from the `error.type` field in the Elasticseach API response. Here is a few examples of errors that can be forwarded to the dead letter index: + - `document_parsing_exception`: The document could not be parsed correctly. + - `index_not_found_exception`: The index does not exist. * `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500. * `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB). * `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate. 
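The `es_dead_letter_forward_errors` option documented above acts as an allow-list over the parsed `error.type` of each failed action: only matching failures are written to the dead letter index, while everything else falls through to the replay queue. A minimal sketch of that gate, assuming the parsed action shape introduced earlier in this series (`error.type`, `error.message`, optional `http.response.status_code`); the helper name is illustrative and not the actual `shippers/es.py` method:

from typing import Any

def forward_to_dead_letter_index(action: dict[str, Any], forward_errors: list[str]) -> bool:
    # Actions without an HTTP status never reached Elasticsearch (connection
    # errors), so they are replayed instead of being indexed in the DLI.
    if "http" not in action:
        return False
    # An empty list keeps the default behaviour: forward every indexing error.
    if not forward_errors:
        return True
    # Otherwise only the listed error types reach the dead letter index.
    return action["error"]["type"] in forward_errors

failed = {
    "error": {"type": "fail_processor_exception", "message": "fail message"},
    "http": {"response": {"status_code": 500}},
}
assert forward_to_dead_letter_index(failed, [])
assert forward_to_dead_letter_index(failed, ["fail_processor_exception"])
assert not forward_to_dead_letter_index(failed, ["document_parsing_exception"])
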
From 6b83e5c424a77088992c860d1e62124f090a7d1f Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Fri, 4 Oct 2024 16:16:27 +0200 Subject: [PATCH 23/27] Clarify es_dead_letter_forward_errors docs I'm trying! --- docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc index bf9b2acb..b81ffd15 100644 --- a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc +++ b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc @@ -246,7 +246,7 @@ For `elasticsearch` the following arguments are supported: * `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If the `es_datastream_name` is not specified and it cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in the future version. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**. The related backward compatibility code is removed from version **v1.0.0**. * `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. * `es_dead_letter_forward_errors`: List of errors that should be forwarded to the dead letter index. The default value is an empty list (forward all errors). If the list is not empty, only the errors in the list will be forwarded to the dead letter index. - In general, you can use values from the `error.type` field in the Elasticseach API response. Here is a few examples of errors that can be forwarded to the dead letter index: + You can use the error types from the Elasticseach API (see `error.type` in API response). Here is a few examples: - `document_parsing_exception`: The document could not be parsed correctly. - `index_not_found_exception`: The index does not exist. * `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500. From b1d464259eeaaa6573c7a15c6ab79b157e38f181 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 7 Oct 2024 06:36:22 +0200 Subject: [PATCH 24/27] Exclude retryable errors from DLI ES shipper should only forward to DLI persistent errors like mapping exceptions. 
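A minimal sketch of the intended gate, assuming the retryable status codes introduced in the diff below (429, 502, 503 and 504); the helper name is illustrative rather than the actual shipper method:

_retryable_http_status_codes = [429, 502, 503, 504]

def is_dead_letter_candidate(action: dict) -> bool:
    # Connection errors carry no HTTP status and are replayed, not indexed.
    if "http" not in action:
        return False
    # Throttling and gateway errors are transient, so they are replayed too;
    # only persistent failures such as mapping exceptions reach the DLI.
    return action["http"]["response"]["status_code"] not in _retryable_http_status_codes

assert is_dead_letter_candidate({"http": {"response": {"status_code": 400}}})
assert not is_dead_letter_candidate({"http": {"response": {"status_code": 429}}})
assert not is_dead_letter_candidate({})
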
--- ...ploy-elastic-serverless-forwarder.asciidoc | 19 - share/config.py | 13 - shippers/es.py | 10 +- shippers/factory.py | 1 - tests/handlers/aws/test_integrations.py | 454 +++++++++--------- 5 files changed, 231 insertions(+), 266 deletions(-) diff --git a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc index b81ffd15..cbcd100b 100644 --- a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc +++ b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc @@ -76,9 +76,6 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional - es_dead_letter_forward_errors: # optional (default: empty list) - - document_parsing_exception - - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -103,9 +100,6 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional - es_dead_letter_forward_errors: # optional (default: empty list) - - document_parsing_exception - - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -130,9 +124,6 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional - es_dead_letter_forward_errors: # optional (default: empty list) - - document_parsing_exception - - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -157,9 +148,6 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional - es_dead_letter_forward_errors: # optional (default: empty list) - - document_parsing_exception - - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -184,9 +172,6 @@ inputs: password: "password" es_datastream_name: "logs-generic-default" es_dead_letter_index: "esf-dead-letter-index" # optional - es_dead_letter_forward_errors: # optional (default: empty list) - - document_parsing_exception - - index_not_found_exception batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -245,10 +230,6 @@ For `elasticsearch` the following arguments are supported: * `args.api_key`: API key of elasticsearch endpoint in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined. * `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. 
If the `es_datastream_name` is not specified and it cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in the future version. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**. The related backward compatibility code is removed from version **v1.0.0**. * `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. - * `es_dead_letter_forward_errors`: List of errors that should be forwarded to the dead letter index. The default value is an empty list (forward all errors). If the list is not empty, only the errors in the list will be forwarded to the dead letter index. - You can use the error types from the Elasticseach API (see `error.type` in API response). Here is a few examples: - - `document_parsing_exception`: The document could not be parsed correctly. - - `index_not_found_exception`: The index does not exist. * `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500. * `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB). * `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate. diff --git a/share/config.py b/share/config.py index 98f84122..de25b91a 100644 --- a/share/config.py +++ b/share/config.py @@ -51,7 +51,6 @@ def __init__( batch_max_bytes: int = 10 * 1024 * 1024, ssl_assert_fingerprint: str = "", es_dead_letter_index: str = "", - es_dead_letter_forward_errors: List[str] = [], ): super().__init__(output_type="elasticsearch") self.elasticsearch_url = elasticsearch_url @@ -65,7 +64,6 @@ def __init__( self.batch_max_bytes = batch_max_bytes self.ssl_assert_fingerprint = ssl_assert_fingerprint self.es_dead_letter_index = es_dead_letter_index - self.es_dead_letter_forward_errors = es_dead_letter_forward_errors if self.cloud_id and self.elasticsearch_url: shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`") @@ -197,17 +195,6 @@ def es_dead_letter_index(self, value: str) -> None: self._es_dead_letter_index = value - @property - def es_dead_letter_forward_errors(self) -> List[str]: - return self._es_dead_letter_forward_errors - - @es_dead_letter_forward_errors.setter - def es_dead_letter_forward_errors(self, value: List[str]) -> None: - if not isinstance(value, list): - raise ValueError("`es_dead_letter_forward_errors` must be provided as list") - - self._es_dead_letter_forward_errors = value - class LogstashOutput(Output): def __init__( diff --git a/shippers/es.py b/shippers/es.py index 73e0d4aa..b6e65d57 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -22,6 +22,8 @@ _EVENT_BUFFERED = "_EVENT_BUFFERED" _EVENT_SENT = "_EVENT_SENT" _VERSION_CONFLICT = 409 +# List of HTTP status codes that are considered retryable +_retryable_http_status_codes = [429, 502, 503, 504] class JSONSerializer(Serializer): @@ -61,7 +63,6 @@ def __init__( api_key: str = "", es_datastream_name: str = "", es_dead_letter_index: str = "", - es_dead_letter_forward_errors: 
List[str] = [], tags: list[str] = [], batch_max_actions: int = 500, batch_max_bytes: int = 10 * 1024 * 1024, @@ -113,7 +114,6 @@ def __init__( self._es_datastream_name = es_datastream_name self._es_dead_letter_index = es_dead_letter_index - self._es_dead_letter_forward_errors = es_dead_letter_forward_errors self._tags = tags self._es_index = "" @@ -314,9 +314,9 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: encoded_actions = [] for action in actions: - if "http" not in action or ( # no http status: connection error - self._es_dead_letter_forward_errors - and action["error"]["type"] not in self._es_dead_letter_forward_errors + if ( + "http" not in action # no http status: connection error + or action["http"]["response"]["status_code"] in _retryable_http_status_codes ): # We don't want to forward this action to # the dead letter index. diff --git a/shippers/factory.py b/shippers/factory.py index 65e4ed18..0f444434 100644 --- a/shippers/factory.py +++ b/shippers/factory.py @@ -49,7 +49,6 @@ def create_from_output(output_type: str, output: Output) -> ProtocolShipper: batch_max_bytes=output.batch_max_bytes, ssl_assert_fingerprint=output.ssl_assert_fingerprint, es_dead_letter_index=output.es_dead_letter_index, - es_dead_letter_forward_errors=output.es_dead_letter_forward_errors, ) if output_type == "logstash": diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index 6b1a603c..8877c0b7 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4421,231 +4421,229 @@ def test_es_non_indexable_dead_letter_index(self) -> None: assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - def test_es_dead_letter_index_with_excluded_action_error(self) -> None: - assert isinstance(self.elasticsearch, ElasticsearchContainer) - assert isinstance(self.localstack, LocalStackContainer) - - sqs_queue_name = _time_based_id(suffix="source-sqs") - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - - dead_letter_index_name = "logs-generic-default-dli" - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - sqs_queue_url_path = sqs_queue["QueueUrlPath"] - - config_yaml: str = f""" - inputs: - - type: sqs - id: "{sqs_queue_arn}" - tags: {self.default_tags} - outputs: - - type: "elasticsearch" - args: - elasticsearch_url: "{self.elasticsearch.get_url()}" - es_dead_letter_index: "{dead_letter_index_name}" - es_dead_letter_forward_errors: - - non_existent_error - ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} - username: "{self.secret_arn}:username" - password: "{self.secret_arn}:password" - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - - fixtures = [ - _load_file_fixture("cloudwatch-log-1.json"), - ] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - - event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - message_id = event["Records"][0]["messageId"] - - # Create pipeline to reject documents - processors = { - "processors": [ - { - 
"fail": { - "message": "test_es_dead_letter_index_with_excluded_action_error fail message", - } - }, - ] - } - - self.elasticsearch.put_pipeline( - id="test_es_dead_letter_index_with_excluded_action_error_fail_pipeline", - body=processors, - ) - - self.elasticsearch.create_data_stream(name="logs-generic-default") - self.elasticsearch.put_settings( - index="logs-generic-default", - body={"index.default_pipeline": "test_es_dead_letter_index_with_excluded_action_error_fail_pipeline"}, - ) - - self.elasticsearch.refresh(index="logs-generic-default") - - self.elasticsearch.create_data_stream(name=dead_letter_index_name) - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - first_call = handler(event, ctx) # type:ignore - - assert first_call == "completed" - - # Test document has been rejected from target index - self.elasticsearch.refresh(index="logs-generic-default") - - assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 - - # Test event does not go into the dead letter queue - assert self.elasticsearch.exists(index=dead_letter_index_name) is True - - self.elasticsearch.refresh(index=dead_letter_index_name) - - assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0 - - # Test event has been redirected into the replay queue - events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - assert len(events["Records"]) == 1 - - first_body: dict[str, Any] = json_parser(events["Records"][0]["body"]) - - assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n") - assert first_body["event_payload"]["log"]["offset"] == 0 - assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path - assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name - assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id - assert first_body["event_payload"]["cloud"]["provider"] == "aws" - assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" - assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" - assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - def test_es_dead_letter_index_with_included_action_error(self) -> None: - assert isinstance(self.elasticsearch, ElasticsearchContainer) - assert isinstance(self.localstack, LocalStackContainer) - - sqs_queue_name = _time_based_id(suffix="source-sqs") - sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - - dead_letter_index_name = "logs-generic-default-dli" - - sqs_queue_arn = sqs_queue["QueueArn"] - sqs_queue_url = sqs_queue["QueueUrl"] - sqs_queue_url_path = sqs_queue["QueueUrlPath"] - - config_yaml: str = f""" - inputs: - - type: sqs - id: "{sqs_queue_arn}" - tags: {self.default_tags} - outputs: - - type: "elasticsearch" - args: - elasticsearch_url: "{self.elasticsearch.get_url()}" - es_dead_letter_index: "{dead_letter_index_name}" - es_dead_letter_forward_errors: - - fail_processor_exception - ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} - username: "{self.secret_arn}:username" - password: "{self.secret_arn}:password" - """ - - config_file_path = "config.yaml" - config_bucket_name = _time_based_id(suffix="config-bucket") - _s3_upload_content_to_bucket( - client=self.s3_client, - content=config_yaml.encode("utf-8"), - content_type="text/plain", - bucket_name=config_bucket_name, - key=config_file_path, - ) - - os.environ["S3_CONFIG_FILE"] = 
f"s3://{config_bucket_name}/{config_file_path}" - - fixtures = [ - _load_file_fixture("cloudwatch-log-1.json"), - ] - - _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - - event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - message_id = event["Records"][0]["messageId"] - - # Create pipeline to reject documents - processors = { - "processors": [ - { - "fail": { - "message": "test_es_dead_letter_index_with_included_action_error fail message", - } - }, - ] - } - - self.elasticsearch.put_pipeline( - id="test_es_dead_letter_index_with_included_action_error_fail_pipeline", - body=processors, - ) - - self.elasticsearch.create_data_stream(name="logs-generic-default") - self.elasticsearch.put_settings( - index="logs-generic-default", - body={"index.default_pipeline": "test_es_dead_letter_index_with_included_action_error_fail_pipeline"}, - ) - - self.elasticsearch.refresh(index="logs-generic-default") - - self.elasticsearch.create_data_stream(name=dead_letter_index_name) - - ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - first_call = handler(event, ctx) # type:ignore - - assert first_call == "completed" - - # Test document has been rejected from target index - self.elasticsearch.refresh(index="logs-generic-default") - - assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 - - # Test document has been redirected to dli - assert self.elasticsearch.exists(index=dead_letter_index_name) is True - - self.elasticsearch.refresh(index=dead_letter_index_name) - - assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 1 - - res = self.elasticsearch.search(index=dead_letter_index_name, sort="_seq_no") - - assert res["hits"]["total"] == {"value": 1, "relation": "eq"} - - assert ( - res["hits"]["hits"][0]["_source"]["error"]["message"] - == "test_es_dead_letter_index_with_included_action_error fail message" - ) - assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception" - assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500 - dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"]) - assert dead_letter_message["log"]["offset"] == 0 - assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path - assert dead_letter_message["aws"]["sqs"]["name"] == sqs_queue_name - assert dead_letter_message["aws"]["sqs"]["message_id"] == message_id - assert dead_letter_message["cloud"]["provider"] == "aws" - assert dead_letter_message["cloud"]["region"] == "us-east-1" - assert dead_letter_message["cloud"]["account"]["id"] == "000000000000" - assert dead_letter_message["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - - # Test event does not go into the replay queue - events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - - assert len(events["Records"]) == 0 + # def test_es_dead_letter_index_with_excluded_action_error(self) -> None: + # assert isinstance(self.elasticsearch, ElasticsearchContainer) + # assert isinstance(self.localstack, LocalStackContainer) + # + # sqs_queue_name = _time_based_id(suffix="source-sqs") + # sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + # + # dead_letter_index_name = "logs-generic-default-dli" + # + # sqs_queue_arn = sqs_queue["QueueArn"] + # sqs_queue_url = sqs_queue["QueueUrl"] + # sqs_queue_url_path = sqs_queue["QueueUrlPath"] + # + # config_yaml: str = f""" + # inputs: + # - type: 
sqs + # id: "{sqs_queue_arn}" + # tags: {self.default_tags} + # outputs: + # - type: "elasticsearch" + # args: + # elasticsearch_url: "{self.elasticsearch.get_url()}" + # es_dead_letter_index: "{dead_letter_index_name}" + # ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} + # username: "{self.secret_arn}:username" + # password: "{self.secret_arn}:password" + # """ + # + # config_file_path = "config.yaml" + # config_bucket_name = _time_based_id(suffix="config-bucket") + # _s3_upload_content_to_bucket( + # client=self.s3_client, + # content=config_yaml.encode("utf-8"), + # content_type="text/plain", + # bucket_name=config_bucket_name, + # key=config_file_path, + # ) + # + # os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" + # + # fixtures = [ + # _load_file_fixture("cloudwatch-log-1.json"), + # ] + # + # _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) + # + # event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) + # message_id = event["Records"][0]["messageId"] + # + # # Create pipeline to reject documents + # processors = { + # "processors": [ + # { + # "fail": { + # "message": "test_es_dead_letter_index_with_excluded_action_error fail message", + # } + # }, + # ] + # } + # + # self.elasticsearch.put_pipeline( + # id="test_es_dead_letter_index_with_excluded_action_error_fail_pipeline", + # body=processors, + # ) + # + # self.elasticsearch.create_data_stream(name="logs-generic-default") + # self.elasticsearch.put_settings( + # index="logs-generic-default", + # body={"index.default_pipeline": "test_es_dead_letter_index_with_excluded_action_error_fail_pipeline"}, + # ) + # + # self.elasticsearch.refresh(index="logs-generic-default") + # + # self.elasticsearch.create_data_stream(name=dead_letter_index_name) + # + # ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) + # first_call = handler(event, ctx) # type:ignore + # + # assert first_call == "completed" + # + # # Test document has been rejected from target index + # self.elasticsearch.refresh(index="logs-generic-default") + # + # assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 + # + # # Test event does not go into the dead letter queue + # assert self.elasticsearch.exists(index=dead_letter_index_name) is True + # + # self.elasticsearch.refresh(index=dead_letter_index_name) + # + # assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0 + # + # # Test event has been redirected into the replay queue + # events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) + # assert len(events["Records"]) == 1 + # + # first_body: dict[str, Any] = json_parser(events["Records"][0]["body"]) + # + # assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n") + # assert first_body["event_payload"]["log"]["offset"] == 0 + # assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path + # assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name + # assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id + # assert first_body["event_payload"]["cloud"]["provider"] == "aws" + # assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" + # assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" + # assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] + # + # def 
test_es_dead_letter_index_with_included_action_error(self) -> None: + # assert isinstance(self.elasticsearch, ElasticsearchContainer) + # assert isinstance(self.localstack, LocalStackContainer) + # + # sqs_queue_name = _time_based_id(suffix="source-sqs") + # sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + # + # dead_letter_index_name = "logs-generic-default-dli" + # + # sqs_queue_arn = sqs_queue["QueueArn"] + # sqs_queue_url = sqs_queue["QueueUrl"] + # sqs_queue_url_path = sqs_queue["QueueUrlPath"] + # + # config_yaml: str = f""" + # inputs: + # - type: sqs + # id: "{sqs_queue_arn}" + # tags: {self.default_tags} + # outputs: + # - type: "elasticsearch" + # args: + # elasticsearch_url: "{self.elasticsearch.get_url()}" + # es_dead_letter_index: "{dead_letter_index_name}" + # es_dead_letter_forward_errors: + # - fail_processor_exception + # ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} + # username: "{self.secret_arn}:username" + # password: "{self.secret_arn}:password" + # """ + # + # config_file_path = "config.yaml" + # config_bucket_name = _time_based_id(suffix="config-bucket") + # _s3_upload_content_to_bucket( + # client=self.s3_client, + # content=config_yaml.encode("utf-8"), + # content_type="text/plain", + # bucket_name=config_bucket_name, + # key=config_file_path, + # ) + # + # os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" + # + # fixtures = [ + # _load_file_fixture("cloudwatch-log-1.json"), + # ] + # + # _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) + # + # event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) + # message_id = event["Records"][0]["messageId"] + # + # # Create pipeline to reject documents + # processors = { + # "processors": [ + # { + # "fail": { + # "message": "test_es_dead_letter_index_with_included_action_error fail message", + # } + # }, + # ] + # } + # + # self.elasticsearch.put_pipeline( + # id="test_es_dead_letter_index_with_included_action_error_fail_pipeline", + # body=processors, + # ) + # + # self.elasticsearch.create_data_stream(name="logs-generic-default") + # self.elasticsearch.put_settings( + # index="logs-generic-default", + # body={"index.default_pipeline": "test_es_dead_letter_index_with_included_action_error_fail_pipeline"}, + # ) + # + # self.elasticsearch.refresh(index="logs-generic-default") + # + # self.elasticsearch.create_data_stream(name=dead_letter_index_name) + # + # ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) + # first_call = handler(event, ctx) # type:ignore + # + # assert first_call == "completed" + # + # # Test document has been rejected from target index + # self.elasticsearch.refresh(index="logs-generic-default") + # + # assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 + # + # # Test document has been redirected to dli + # assert self.elasticsearch.exists(index=dead_letter_index_name) is True + # + # self.elasticsearch.refresh(index=dead_letter_index_name) + # + # assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 1 + # + # res = self.elasticsearch.search(index=dead_letter_index_name, sort="_seq_no") + # + # assert res["hits"]["total"] == {"value": 1, "relation": "eq"} + # + # assert ( + # res["hits"]["hits"][0]["_source"]["error"]["message"] + # == "test_es_dead_letter_index_with_included_action_error fail message" + # ) + # assert res["hits"]["hits"][0]["_source"]["error"]["type"] == 
"fail_processor_exception" + # assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500 + # dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"]) + # assert dead_letter_message["log"]["offset"] == 0 + # assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path + # assert dead_letter_message["aws"]["sqs"]["name"] == sqs_queue_name + # assert dead_letter_message["aws"]["sqs"]["message_id"] == message_id + # assert dead_letter_message["cloud"]["provider"] == "aws" + # assert dead_letter_message["cloud"]["region"] == "us-east-1" + # assert dead_letter_message["cloud"]["account"]["id"] == "000000000000" + # assert dead_letter_message["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] + # + # # Test event does not go into the replay queue + # events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) + # + # assert len(events["Records"]) == 0 From e485725bfed5c92219787b29d150ebf5662aec16 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 7 Oct 2024 07:48:53 +0200 Subject: [PATCH 25/27] Add a non-retryable error test and fix linter --- share/config.py | 2 +- shippers/es.py | 2 +- tests/handlers/aws/test_integrations.py | 338 ++++++++---------------- 3 files changed, 114 insertions(+), 228 deletions(-) diff --git a/share/config.py b/share/config.py index de25b91a..dc38a398 100644 --- a/share/config.py +++ b/share/config.py @@ -2,7 +2,7 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. -from typing import Any, Callable, List, Optional, Union +from typing import Any, Callable, Optional, Union import yaml diff --git a/shippers/es.py b/shippers/es.py index b6e65d57..004aa793 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -4,7 +4,7 @@ import datetime import uuid -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, Optional, Union import elasticapm # noqa: F401 from elasticsearch import Elasticsearch diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index 8877c0b7..c6654adb 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4421,229 +4421,115 @@ def test_es_non_indexable_dead_letter_index(self) -> None: assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - # def test_es_dead_letter_index_with_excluded_action_error(self) -> None: - # assert isinstance(self.elasticsearch, ElasticsearchContainer) - # assert isinstance(self.localstack, LocalStackContainer) - # - # sqs_queue_name = _time_based_id(suffix="source-sqs") - # sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - # - # dead_letter_index_name = "logs-generic-default-dli" - # - # sqs_queue_arn = sqs_queue["QueueArn"] - # sqs_queue_url = sqs_queue["QueueUrl"] - # sqs_queue_url_path = sqs_queue["QueueUrlPath"] - # - # config_yaml: str = f""" - # inputs: - # - type: sqs - # id: "{sqs_queue_arn}" - # tags: {self.default_tags} - # outputs: - # - type: "elasticsearch" - # args: - # elasticsearch_url: "{self.elasticsearch.get_url()}" - # es_dead_letter_index: "{dead_letter_index_name}" - # ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} - # username: "{self.secret_arn}:username" - # password: 
"{self.secret_arn}:password" - # """ - # - # config_file_path = "config.yaml" - # config_bucket_name = _time_based_id(suffix="config-bucket") - # _s3_upload_content_to_bucket( - # client=self.s3_client, - # content=config_yaml.encode("utf-8"), - # content_type="text/plain", - # bucket_name=config_bucket_name, - # key=config_file_path, - # ) - # - # os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - # - # fixtures = [ - # _load_file_fixture("cloudwatch-log-1.json"), - # ] - # - # _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - # - # event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - # message_id = event["Records"][0]["messageId"] - # - # # Create pipeline to reject documents - # processors = { - # "processors": [ - # { - # "fail": { - # "message": "test_es_dead_letter_index_with_excluded_action_error fail message", - # } - # }, - # ] - # } - # - # self.elasticsearch.put_pipeline( - # id="test_es_dead_letter_index_with_excluded_action_error_fail_pipeline", - # body=processors, - # ) - # - # self.elasticsearch.create_data_stream(name="logs-generic-default") - # self.elasticsearch.put_settings( - # index="logs-generic-default", - # body={"index.default_pipeline": "test_es_dead_letter_index_with_excluded_action_error_fail_pipeline"}, - # ) - # - # self.elasticsearch.refresh(index="logs-generic-default") - # - # self.elasticsearch.create_data_stream(name=dead_letter_index_name) - # - # ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - # first_call = handler(event, ctx) # type:ignore - # - # assert first_call == "completed" - # - # # Test document has been rejected from target index - # self.elasticsearch.refresh(index="logs-generic-default") - # - # assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 - # - # # Test event does not go into the dead letter queue - # assert self.elasticsearch.exists(index=dead_letter_index_name) is True - # - # self.elasticsearch.refresh(index=dead_letter_index_name) - # - # assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0 - # - # # Test event has been redirected into the replay queue - # events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - # assert len(events["Records"]) == 1 - # - # first_body: dict[str, Any] = json_parser(events["Records"][0]["body"]) - # - # assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n") - # assert first_body["event_payload"]["log"]["offset"] == 0 - # assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path - # assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name - # assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id - # assert first_body["event_payload"]["cloud"]["provider"] == "aws" - # assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" - # assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" - # assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - # - # def test_es_dead_letter_index_with_included_action_error(self) -> None: - # assert isinstance(self.elasticsearch, ElasticsearchContainer) - # assert isinstance(self.localstack, LocalStackContainer) - # - # sqs_queue_name = _time_based_id(suffix="source-sqs") - # sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) - # - # dead_letter_index_name = 
"logs-generic-default-dli" - # - # sqs_queue_arn = sqs_queue["QueueArn"] - # sqs_queue_url = sqs_queue["QueueUrl"] - # sqs_queue_url_path = sqs_queue["QueueUrlPath"] - # - # config_yaml: str = f""" - # inputs: - # - type: sqs - # id: "{sqs_queue_arn}" - # tags: {self.default_tags} - # outputs: - # - type: "elasticsearch" - # args: - # elasticsearch_url: "{self.elasticsearch.get_url()}" - # es_dead_letter_index: "{dead_letter_index_name}" - # es_dead_letter_forward_errors: - # - fail_processor_exception - # ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} - # username: "{self.secret_arn}:username" - # password: "{self.secret_arn}:password" - # """ - # - # config_file_path = "config.yaml" - # config_bucket_name = _time_based_id(suffix="config-bucket") - # _s3_upload_content_to_bucket( - # client=self.s3_client, - # content=config_yaml.encode("utf-8"), - # content_type="text/plain", - # bucket_name=config_bucket_name, - # key=config_file_path, - # ) - # - # os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" - # - # fixtures = [ - # _load_file_fixture("cloudwatch-log-1.json"), - # ] - # - # _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) - # - # event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) - # message_id = event["Records"][0]["messageId"] - # - # # Create pipeline to reject documents - # processors = { - # "processors": [ - # { - # "fail": { - # "message": "test_es_dead_letter_index_with_included_action_error fail message", - # } - # }, - # ] - # } - # - # self.elasticsearch.put_pipeline( - # id="test_es_dead_letter_index_with_included_action_error_fail_pipeline", - # body=processors, - # ) - # - # self.elasticsearch.create_data_stream(name="logs-generic-default") - # self.elasticsearch.put_settings( - # index="logs-generic-default", - # body={"index.default_pipeline": "test_es_dead_letter_index_with_included_action_error_fail_pipeline"}, - # ) - # - # self.elasticsearch.refresh(index="logs-generic-default") - # - # self.elasticsearch.create_data_stream(name=dead_letter_index_name) - # - # ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) - # first_call = handler(event, ctx) # type:ignore - # - # assert first_call == "completed" - # - # # Test document has been rejected from target index - # self.elasticsearch.refresh(index="logs-generic-default") - # - # assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 - # - # # Test document has been redirected to dli - # assert self.elasticsearch.exists(index=dead_letter_index_name) is True - # - # self.elasticsearch.refresh(index=dead_letter_index_name) - # - # assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 1 - # - # res = self.elasticsearch.search(index=dead_letter_index_name, sort="_seq_no") - # - # assert res["hits"]["total"] == {"value": 1, "relation": "eq"} - # - # assert ( - # res["hits"]["hits"][0]["_source"]["error"]["message"] - # == "test_es_dead_letter_index_with_included_action_error fail message" - # ) - # assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception" - # assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500 - # dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"]) - # assert dead_letter_message["log"]["offset"] == 0 - # assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path - # assert dead_letter_message["aws"]["sqs"]["name"] == sqs_queue_name - # 
assert dead_letter_message["aws"]["sqs"]["message_id"] == message_id - # assert dead_letter_message["cloud"]["provider"] == "aws" - # assert dead_letter_message["cloud"]["region"] == "us-east-1" - # assert dead_letter_message["cloud"]["account"]["id"] == "000000000000" - # assert dead_letter_message["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] - # - # # Test event does not go into the replay queue - # events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) - # - # assert len(events["Records"]) == 0 + def test_es_dead_letter_index_with_retryable_errors(self) -> None: + """ + Test that retryable errors are not redirected to the dead letter index (DLI). + """ + assert isinstance(self.elasticsearch, ElasticsearchContainer) + assert isinstance(self.localstack, LocalStackContainer) + + sqs_queue_name = _time_based_id(suffix="source-sqs") + sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + + dead_letter_index_name = "logs-generic-default-dli" + + sqs_queue_arn = sqs_queue["QueueArn"] + sqs_queue_url = sqs_queue["QueueUrl"] + sqs_queue_url_path = sqs_queue["QueueUrlPath"] + + config_yaml: str = f""" + inputs: + - type: sqs + id: "{sqs_queue_arn}" + tags: {self.default_tags} + outputs: + - type: "elasticsearch" + args: + # This IP address is non-routable and + # will always result in a connection failure. + elasticsearch_url: "0.0.0.0:9200" + es_dead_letter_index: "{dead_letter_index_name}" + ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} + username: "{self.secret_arn}:username" + password: "{self.secret_arn}:password" + """ + + config_file_path = "config.yaml" + config_bucket_name = _time_based_id(suffix="config-bucket") + _s3_upload_content_to_bucket( + client=self.s3_client, + content=config_yaml.encode("utf-8"), + content_type="text/plain", + bucket_name=config_bucket_name, + key=config_file_path, + ) + + os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" + + fixtures = [ + _load_file_fixture("cloudwatch-log-1.json"), + ] + + _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) + + event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) + message_id = event["Records"][0]["messageId"] + + # Create pipeline to reject documents + processors = { + "processors": [ + { + "fail": { + "message": "test_es_dead_letter_index_with_retryable_errors fail message", + } + }, + ] + } + + self.elasticsearch.put_pipeline( + id="test_es_dead_letter_index_with_retryable_errors_fail_pipeline", + body=processors, + ) + + self.elasticsearch.create_data_stream(name="logs-generic-default") + self.elasticsearch.put_settings( + index="logs-generic-default", + body={"index.default_pipeline": "test_es_dead_letter_index_with_retryable_errors_fail_pipeline"}, + ) + + self.elasticsearch.refresh(index="logs-generic-default") + + self.elasticsearch.create_data_stream(name=dead_letter_index_name) + + ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) + first_call = handler(event, ctx) # type:ignore + + assert first_call == "completed" + + # Test document has been rejected from target index + self.elasticsearch.refresh(index="logs-generic-default") + + assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 + + # Test event does not go into the dead letter queue + assert self.elasticsearch.exists(index=dead_letter_index_name) is True + + self.elasticsearch.refresh(index=dead_letter_index_name) + + 
assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0 + + # Test event has been redirected into the replay queue + events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) + assert len(events["Records"]) == 1 + + first_body: dict[str, Any] = json_parser(events["Records"][0]["body"]) + + assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n") + assert first_body["event_payload"]["log"]["offset"] == 0 + assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path + assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name + assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id + assert first_body["event_payload"]["cloud"]["provider"] == "aws" + assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" + assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" + assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] From 05f32902b4762f187d2cfce8a66eb8cadda32023 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 7 Oct 2024 08:36:36 +0200 Subject: [PATCH 26/27] Update retryable status codes list --- shippers/es.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 004aa793..6737acd0 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -3,6 +3,7 @@ # you may not use this file except in compliance with the Elastic License 2.0. import datetime +import http import uuid from typing import Any, Dict, Optional, Union @@ -21,9 +22,10 @@ _EVENT_BUFFERED = "_EVENT_BUFFERED" _EVENT_SENT = "_EVENT_SENT" -_VERSION_CONFLICT = 409 # List of HTTP status codes that are considered retryable -_retryable_http_status_codes = [429, 502, 503, 504] +_retryable_http_status_codes = [ + http.HTTPStatus.TOO_MANY_REQUESTS, +] class JSONSerializer(Serializer): @@ -174,7 +176,7 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]} ) - if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT: + if "status" in error["create"] and error["create"]["status"] == http.HTTPStatus.CONFLICT: # Skip duplicate events on dead letter index and replay queue continue From f41a5f97bf54ce0ea958c12d3c331f4f11f13fe0 Mon Sep 17 00:00:00 2001 From: Maurizio Branca Date: Mon, 7 Oct 2024 10:26:04 +0200 Subject: [PATCH 27/27] Update DLI docs and docstring --- ...ploy-elastic-serverless-forwarder.asciidoc | 21 ++++++++++++++++++- shippers/es.py | 19 ++++++++++------- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc index cbcd100b..bfff569b 100644 --- a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc +++ b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc @@ -229,11 +229,30 @@ For `elasticsearch` the following arguments are supported: * `args.password` Password of the elasticsearch instance to connect to. Mandatory when `args.api_key` is not provided. Will take precedence over `args.api_key` if both are defined. * `args.api_key`: API key of elasticsearch endpoint in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined. 
* `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If the `es_datastream_name` is not specified and it cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in the future version. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**. The related backward compatibility code is removed from version **v1.0.0**. - * `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. + * `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. The elasticsearch output will NOT forward retryable errors (connection failures, HTTP status code 429) to the dead letter index. * `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500. * `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB). * `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate. +. Here is a sample error indexed in the dead letter index: ++ +[source, json] +---- +{ + "@timestamp": "2024-10-07T05:57:59.448925Z", + "message": "{\"hey\":{\"message\":\"hey there\"},\"_id\":\"e6542822-4583-438d-9b4d-1a3023b5eeb9\",\"_op_type\":\"create\",\"_index\":\"logs-succeed.pr793-default\"}", + "error": { + "message": "[1:30] failed to parse field [hey] of type [keyword] in document with id 'e6542822-4583-438d-9b4d-1a3023b5eeb9'. Preview of field's value: '{message=hey there}'", + "type": "document_parsing_exception" + }, + "http": { + "response": { + "status_code": 400 + } + } +} +---- + For `logstash` the following arguments are supported: * `args.logstash_url`: URL of {ls} endpoint in the format `http(s)://host:port` diff --git a/shippers/es.py b/shippers/es.py index 6737acd0..68f7f59a 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -296,21 +296,24 @@ def flush(self) -> None: def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: """ - Send the failed actions to the dead letter index (DLI). + Index the failed actions in the dead letter index (DLI). This function attempts to index failed actions to the DLI, but may not do so for one of the following reasons: - 1. The action response does not have an HTTP status (e.g., the connection failed). - 2. The list of action errors to forward is not empty, and the action error type is not in the list. - 3.
The action could not be encoded for indexing in the DLI. - 4. The action failed indexing attempt in the DLI. + 1. The failed action could not be encoded for indexing in the DLI. + 2. ES returned an error on the attempt to index the failed action in the DLI. + 3. The failed action error is retryable (connection error or status code 429). + + Retryable errors are not indexed in the DLI, as they are expected to be + sent again to the data stream at `es_datastream_name` by the replay handler. Args: - actions (list[Any]): A list of actions to be processed. + actions (list[Any]): A list of actions to index in the DLI. Returns: - list[Any]: A list of actions that were not indexed in the DLI. + list[Any]: A list of actions that were not indexed in the DLI due to one of + the reasons mentioned above. """ non_indexed_actions: list[Any] = [] encoded_actions = []
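# --- illustrative sketch (editor's addition, not part of the patch series) ---
# A minimal standalone version of the routing rule the docstring above
# describes: a failed bulk action is replayed rather than indexed in the dead
# letter index when its failure looks transient. The helper name
# `is_retryable_failure` is hypothetical and is introduced here only for
# illustration; the shipper itself performs this check inline, keying on the
# absence of an "http" field (connection errors) or on a status code listed in
# `_retryable_http_status_codes`.
import http
from typing import Any

_retryable_http_status_codes = [http.HTTPStatus.TOO_MANY_REQUESTS]

def is_retryable_failure(action: dict[str, Any]) -> bool:
    # No "http" field means the bulk request never received a response
    # (connection error), so the event should go to the replay queue.
    if "http" not in action:
        return True
    # HTTP 429 signals back-pressure from Elasticsearch; replay the event
    # later instead of parking it in the dead letter index.
    return action["http"]["response"]["status_code"] in _retryable_http_status_codes
# ------------------------------------------------------------------------------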