From bb2748ba5c6f489bc53e485e194a72a0ad081468 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Tue, 11 Jun 2024 12:28:17 +0200 Subject: [PATCH 01/17] Skip duplicate events when sending to replay queue --- shippers/es.py | 7 ++- tests/handlers/aws/test_integrations.py | 62 +++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index 123f6f99..815038b5 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -19,7 +19,7 @@ _EVENT_BUFFERED = "_EVENT_BUFFERED" _EVENT_SENT = "_EVENT_SENT" - +_VERSION_CONFLICT = 409 class JSONSerializer(Serializer): mimetype = "application/json" @@ -166,6 +166,11 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None: shared_logger.warning( "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]} ) + + if "status" in error["create"] and error["create"]["status"] == 409: + # Skip duplicate events on replay queue + continue + shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]}) if self._replay_handler is not None: self._replay_handler(self._output_destination, self._replay_args, action_failed[0]) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index 0b75847a..d6e8c8f5 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4202,3 +4202,65 @@ def test_ls_wrong_auth_creds(self) -> None: assert second_body["event_payload"]["cloud"]["region"] == "us-east-1" assert second_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" assert second_body["event_payload"]["tags"] == ["forwarded", "tag1", "tag2", "tag3"] + + def test_es_version_conflict_exception(self) -> None: + assert isinstance(self.elasticsearch, ElasticsearchContainer) + assert isinstance(self.localstack, LocalStackContainer) + + sqs_queue_name = _time_based_id(suffix="source-sqs") + sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + + sqs_queue_arn = sqs_queue["QueueArn"] + sqs_queue_url = sqs_queue["QueueUrl"] + + config_yaml: str = f""" + inputs: + - type: sqs + id: "{sqs_queue_arn}" + tags: {self.default_tags} + outputs: + - type: "elasticsearch" + args: + elasticsearch_url: "{self.elasticsearch.get_url()}" + ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} + username: "{self.secret_arn}:username" + password: "{self.secret_arn}:password" + """ + + config_file_path = "config.yaml" + config_bucket_name = _time_based_id(suffix="config-bucket") + _s3_upload_content_to_bucket( + client=self.s3_client, + content=config_yaml.encode("utf-8"), + content_type="text/plain", + bucket_name=config_bucket_name, + key=config_file_path, + ) + + os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" + + fixtures = [ + _load_file_fixture("cloudwatch-log-1.json"), + ] + + _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) + + event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) + + ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) + first_call = handler(event, ctx) # type:ignore + + assert first_call == "completed" + + # Index event a second time to trigger version conflict + second_call = handler(event, ctx) # type:ignore + + assert second_call == "completed" + + self.elasticsearch.refresh(index="logs-generic-default") + + assert self.elasticsearch.count(index="logs-generic-default")["count"] == 1 + + # Test no duplicate events end in the replay queue + events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) + assert len(events["Records"]) == 0 From c3f07dd21432076b38a37b53822c984fc8baca43 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 12 Jun 2024 21:17:47 +0200 Subject: [PATCH 02/17] Version conflict global id --- shippers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index 815038b5..7823333f 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -167,7 +167,7 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None: "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]} ) - if "status" in error["create"] and error["create"]["status"] == 409: + if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT: # Skip duplicate events on replay queue continue From 86f6edde93372487c45cd41dbc2017ca63d56fdc Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 12 Jun 2024 21:28:31 +0200 Subject: [PATCH 03/17] Fix formatter --- shippers/es.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 7823333f..e52ad9d7 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -168,8 +168,8 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None: ) if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT: - # Skip duplicate events on replay queue - continue + # Skip duplicate events on replay queue + continue shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]}) if self._replay_handler is not None: From 5f06e2c08839891eb79a2cffa23c3921d6f827bd Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 12 Jun 2024 21:51:28 +0200 Subject: [PATCH 04/17] Fix formatter v2 --- shippers/es.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index e52ad9d7..71b07782 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -21,6 +21,7 @@ _EVENT_SENT = "_EVENT_SENT" _VERSION_CONFLICT = 409 + class JSONSerializer(Serializer): mimetype = "application/json" @@ -167,10 +168,10 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None: "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]} ) - if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT: + if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT: # Skip duplicate events on replay queue continue - + shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]}) if self._replay_handler is not None: self._replay_handler(self._output_destination, self._replay_args, action_failed[0]) From 0eff012cc6c8e887fe27fe922e6c93b50cbb8d91 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Thu, 13 Jun 2024 17:07:50 +0200 Subject: [PATCH 05/17] Fix integration test [test_replay] to not use duplicated ids --- tests/handlers/aws/test_integrations.py | 147 +++++++----------------- tests/testcontainers/es.py | 6 + 2 files changed, 46 insertions(+), 107 deletions(-) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index d6e8c8f5..a957754f 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -956,42 +956,47 @@ def test_replay(self) -> None: hash_kinesis_record = get_hex_prefix(f"stream-{kinesis_stream_name}-PartitionKey-{sequence_number}") prefix_kinesis = f"{int(float(event_timestamps_kinesis_records[0]) * 1000)}-{hash_kinesis_record}" - # Create an expected id for s3-sqs so that es.send will fail - self.elasticsearch.index( - index="logs-generic-default", - op_type="create", - id=f"{prefix_s3_first}-000000000000", - document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, - ) - - # Create an expected id for sqs so that es.send will fail - self.elasticsearch.index( - index="logs-generic-default", - op_type="create", - id=f"{prefix_sqs}-000000000000", - document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, - ) - - # Create an expected id for cloudwatch-logs so that es.send will fail - self.elasticsearch.index( - index="logs-generic-default", - op_type="create", - id=f"{prefix_cloudwatch_logs}-000000000000", - document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, - ) + # Create pipeline to reject documents + processors = { + "processors": [ + { + "fail": { + "message": "test_replay_fail_pipeline_s3", + "if": f"ctx[\"_id\"] == \"{prefix_s3_first}-000000000000\"", + } + }, + { + "fail": { + "message": "test_replay_fail_pipeline_sqs", + "if": f"ctx[\"_id\"] == \"{prefix_sqs}-000000000000\"", + } + }, + { + "fail": { + "message": "test_replay_fail_pipeline_cloudwatch", + "if": f"ctx[\"_id\"] == \"{prefix_cloudwatch_logs}-000000000000\"", + } + }, + { + "fail": { + "message": "test_replay_fail_pipeline_kinesis", + "if": f"ctx[\"_id\"] == \"{prefix_kinesis}-000000000000\"", + } + }, + ] + } - # Create an expected id for kinesis-data-stream so that es.send will fail - self.elasticsearch.index( - index="logs-generic-default", - op_type="create", - id=f"{prefix_kinesis}-000000000000", - document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}, + self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) + + self.elasticsearch.create_data_stream(index='logs-generic-default') + self.elasticsearch.put_settings( + index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"} ) self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search(index="logs-generic-default") - assert res["hits"]["total"] == {"value": 4, "relation": "eq"} + assert res["hits"]["total"] == {"value": 0, "relation": "eq"} ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) @@ -1002,20 +1007,6 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search( index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1045,20 +1036,6 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search( index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1084,20 +1061,6 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search( index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1127,20 +1090,6 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search( index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1170,28 +1119,12 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") - # Remove the expected id for s3-sqs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_s3_first}-000000000000"]}}} - ) - - # Remove the expected id for sqs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_sqs}-000000000000"]}}} - ) - - # Remove the expected id for cloudwatch logs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", - body={"query": {"ids": {"values": [f"{prefix_cloudwatch_logs}-000000000000"]}}}, - ) - - # Remove the expected id for kinesis data stream so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", - body={"query": {"ids": {"values": [f"{prefix_kinesis}-000000000000"]}}}, - ) + # Remove pipeline processors + processors = { + "processors": [] + } + self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) self.elasticsearch.refresh(index="logs-generic-default") # let's update the config file so that logstash won't fail anymore diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 50a8abba..149b2c07 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -209,3 +209,9 @@ def index(self, **kwargs: Any) -> dict[str, Any]: self._index_indices.add(kwargs["index"]) return self.es_client.index(**kwargs) + + def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: + if "index" in kwargs: + self._index_indices.add(kwargs["index"]) + + return self.es_client.indices.create_data_stream(kwargs["index"]) From fefac09c92f878a8ae3f417ad37937812f2ea2e0 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Thu, 13 Jun 2024 17:15:55 +0200 Subject: [PATCH 06/17] Fix linter --- tests/handlers/aws/test_integrations.py | 16 +++++++--------- tests/testcontainers/es.py | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index a957754f..e545138b 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -962,33 +962,33 @@ def test_replay(self) -> None: { "fail": { "message": "test_replay_fail_pipeline_s3", - "if": f"ctx[\"_id\"] == \"{prefix_s3_first}-000000000000\"", + "if": f'ctx["_id"] == "{prefix_s3_first}-000000000000"', } }, { "fail": { "message": "test_replay_fail_pipeline_sqs", - "if": f"ctx[\"_id\"] == \"{prefix_sqs}-000000000000\"", + "if": f'ctx["_id"] == "{prefix_sqs}-000000000000"', } }, { "fail": { "message": "test_replay_fail_pipeline_cloudwatch", - "if": f"ctx[\"_id\"] == \"{prefix_cloudwatch_logs}-000000000000\"", + "if": f'ctx["_id"] == "{prefix_cloudwatch_logs}-000000000000"', } }, { "fail": { "message": "test_replay_fail_pipeline_kinesis", - "if": f"ctx[\"_id\"] == \"{prefix_kinesis}-000000000000\"", + "if": f'ctx["_id"] == "{prefix_kinesis}-000000000000"', } }, ] } self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) - - self.elasticsearch.create_data_stream(index='logs-generic-default') + + self.elasticsearch.create_data_stream(index="logs-generic-default") self.elasticsearch.put_settings( index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"} ) @@ -1120,9 +1120,7 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") # Remove pipeline processors - processors = { - "processors": [] - } + processors = {"processors": []} self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) self.elasticsearch.refresh(index="logs-generic-default") diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 149b2c07..57f6fe82 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -209,7 +209,7 @@ def index(self, **kwargs: Any) -> dict[str, Any]: self._index_indices.add(kwargs["index"]) return self.es_client.index(**kwargs) - + def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: if "index" in kwargs: self._index_indices.add(kwargs["index"]) From 936a2019aa137b4feaa7cb802eeacfda9c35304d Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Thu, 13 Jun 2024 17:57:07 +0200 Subject: [PATCH 07/17] Fin linter v2 --- tests/testcontainers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 57f6fe82..350b406b 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -214,4 +214,4 @@ def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: if "index" in kwargs: self._index_indices.add(kwargs["index"]) - return self.es_client.indices.create_data_stream(kwargs["index"]) + return self.es_client.indices.create_data_stream(**kwargs) From 3f97f369550bc7957f5b2785cef14287494ab4fd Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Thu, 13 Jun 2024 18:23:09 +0200 Subject: [PATCH 08/17] Fix integration tests --- tests/handlers/aws/test_integrations.py | 2 +- tests/testcontainers/es.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index e545138b..d5d0e9b1 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -988,7 +988,7 @@ def test_replay(self) -> None: self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) - self.elasticsearch.create_data_stream(index="logs-generic-default") + self.elasticsearch.create_data_stream(name="logs-generic-default") self.elasticsearch.put_settings( index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"} ) diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 350b406b..612cf4c4 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -211,7 +211,7 @@ def index(self, **kwargs: Any) -> dict[str, Any]: return self.es_client.index(**kwargs) def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: - if "index" in kwargs: - self._index_indices.add(kwargs["index"]) + if "name" in kwargs: + self._index_indices.add(kwargs["name"]) return self.es_client.indices.create_data_stream(**kwargs) From 55685c48f1ad0edd0fdbdc3d5ef21574de23b001 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 3 Jul 2024 11:21:42 +0200 Subject: [PATCH 09/17] Add dead letter index redirection for es bulk failed requests --- share/config.py | 13 ++ shippers/es.py | 105 +++++++++-- shippers/factory.py | 1 + tests/handlers/aws/test_integrations.py | 222 ++++++++++++++++++++++++ 4 files changed, 322 insertions(+), 19 deletions(-) diff --git a/share/config.py b/share/config.py index f9ce3fe2..dc38a398 100644 --- a/share/config.py +++ b/share/config.py @@ -50,6 +50,7 @@ def __init__( batch_max_actions: int = 500, batch_max_bytes: int = 10 * 1024 * 1024, ssl_assert_fingerprint: str = "", + es_dead_letter_index: str = "", ): super().__init__(output_type="elasticsearch") self.elasticsearch_url = elasticsearch_url @@ -62,6 +63,7 @@ def __init__( self.batch_max_actions = batch_max_actions self.batch_max_bytes = batch_max_bytes self.ssl_assert_fingerprint = ssl_assert_fingerprint + self.es_dead_letter_index = es_dead_letter_index if self.cloud_id and self.elasticsearch_url: shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`") @@ -182,6 +184,17 @@ def ssl_assert_fingerprint(self, value: str) -> None: self._ssl_assert_fingerprint = value + @property + def es_dead_letter_index(self) -> str: + return self._es_dead_letter_index + + @es_dead_letter_index.setter + def es_dead_letter_index(self, value: str) -> None: + if not isinstance(value, str): + raise ValueError("`es_dead_letter_index` must be provided as string") + + self._es_dead_letter_index = value + class LogstashOutput(Output): def __init__( diff --git a/shippers/es.py b/shippers/es.py index 71b07782..68bbf535 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -2,7 +2,9 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. +import datetime from typing import Any, Dict, Optional, Union +import uuid import elasticapm # noqa: F401 from elasticsearch import Elasticsearch @@ -58,6 +60,7 @@ def __init__( cloud_id: str = "", api_key: str = "", es_datastream_name: str = "", + es_dead_letter_index: str = "", tags: list[str] = [], batch_max_actions: int = 500, batch_max_bytes: int = 10 * 1024 * 1024, @@ -108,6 +111,7 @@ def __init__( self._event_id_generator: Optional[EventIdGeneratorCallable] = None self._es_datastream_name = es_datastream_name + self._es_dead_letter_index = es_dead_letter_index self._tags = tags self._es_index = "" @@ -153,13 +157,13 @@ def _enrich_event(self, event_payload: dict[str, Any]) -> None: event_payload["tags"] += self._tags - def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None: + def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Union[int, list[Any]]]) -> list[Any]: assert isinstance(errors[1], list) success = errors[0] - failed = len(errors[1]) + failed: list[Any] = [] for error in errors[1]: - action_failed = [action for action in self._bulk_actions if action["_id"] == error["create"]["_id"]] + action_failed = [action for action in actions if action["_id"] == error["create"]["_id"]] # an ingestion pipeline might override the _id, we can only skip in this case if len(action_failed) != 1: continue @@ -169,20 +173,17 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None: ) if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT: - # Skip duplicate events on replay queue + # Skip duplicate events on dead letter index and replay queue continue - shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]}) - if self._replay_handler is not None: - self._replay_handler(self._output_destination, self._replay_args, action_failed[0]) + failed.append({"error": error["create"]["error"], "action": action_failed[0]}) - if failed > 0: - shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": failed}) - return - - shared_logger.info("elasticsearch shipper", extra={"success": success, "failed": failed}) + if len(failed) > 0: + shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)}) + else: + shared_logger.info("elasticsearch shipper", extra={"success": success, "failed": len(failed)}) - return + return failed def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None: self._event_id_generator = event_id_generator @@ -213,21 +214,87 @@ def send(self, event: dict[str, Any]) -> str: if len(self._bulk_actions) < self._bulk_batch_size: return _EVENT_BUFFERED - errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs) - self._handle_outcome(errors=errors) - self._bulk_actions = [] + self.flush() return _EVENT_SENT def flush(self) -> None: - if len(self._bulk_actions) > 0: - errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs) - self._handle_outcome(errors=errors) + if len(self._bulk_actions) == 0: + return + + errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs) + failed = self._handle_outcome(actions=self._bulk_actions, errors=errors) + + # Send failed requests to dead letter index, if enabled + if len(failed) > 0 and self._es_dead_letter_index: + failed = self._send_dead_letter_index(failed) + + # Send remaining failed requests to replay queue, if enabled + if isinstance(failed, list) and len(failed) > 0 and self._replay_handler is not None: + for outcome in failed: + if "action" not in outcome: + shared_logger.error("action could not be extracted to be replayed", extra={"outcome": outcome}) + continue + + self._replay_handler(self._output_destination, self._replay_args, outcome["action"]) self._bulk_actions = [] return + def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: + encoded_actions = [] + dead_letter_errors = [] + for action in actions: + # Reshape event to dead letter index + encoded = self._encode_dead_letter(action) + if encoded is None: + shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action}) + dead_letter_errors.append(action) + + encoded_actions.append(encoded) + + # If no action can be encoded, return original action list as failed + if len(encoded_actions) == 0: + return dead_letter_errors + + errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs) + failed = self._handle_outcome(actions=encoded_actions, errors=errors) + + if not isinstance(failed, list) or len(failed) == 0: + return [] + + for action in failed: + event_payload = self._decode_dead_letter(action) + + if event_payload is None: + continue + + dead_letter_errors.append(event_payload) + + return dead_letter_errors + + def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: + if "action" not in outcome or "error" not in outcome: + return + + # Assign random id in case bulk() results in error, it can be matched to the original + # action + return { + "@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "_id": f'{uuid.uuid4()}', + "_index": self._es_dead_letter_index, + "_op_type": "create", + "message": json_dumper(outcome["action"]), + "error": outcome["error"], + } + + def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]: + if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]: + return + + return {"action": json_parser(dead_letter_outcome["action"]["message"])} + def _discover_dataset(self, event_payload: Dict[str, Any]) -> None: if self._es_datastream_name != "": if self._es_datastream_name.startswith("logs-"): diff --git a/shippers/factory.py b/shippers/factory.py index ecd02919..0f444434 100644 --- a/shippers/factory.py +++ b/shippers/factory.py @@ -48,6 +48,7 @@ def create_from_output(output_type: str, output: Output) -> ProtocolShipper: batch_max_actions=output.batch_max_actions, batch_max_bytes=output.batch_max_bytes, ssl_assert_fingerprint=output.ssl_assert_fingerprint, + es_dead_letter_index=output.es_dead_letter_index, ) if output_type == "logstash": diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index d5d0e9b1..f56beb43 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -4195,3 +4195,225 @@ def test_es_version_conflict_exception(self) -> None: # Test no duplicate events end in the replay queue events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) assert len(events["Records"]) == 0 + + def test_es_dead_letter_index(self) -> None: + assert isinstance(self.elasticsearch, ElasticsearchContainer) + assert isinstance(self.localstack, LocalStackContainer) + + sqs_queue_name = _time_based_id(suffix="source-sqs") + sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + + dead_letter_index_name = "logs-generic-default-dli" + + sqs_queue_arn = sqs_queue["QueueArn"] + sqs_queue_url = sqs_queue["QueueUrl"] + sqs_queue_url_path = sqs_queue["QueueUrlPath"] + + config_yaml: str = f""" + inputs: + - type: sqs + id: "{sqs_queue_arn}" + tags: {self.default_tags} + outputs: + - type: "elasticsearch" + args: + elasticsearch_url: "{self.elasticsearch.get_url()}" + es_dead_letter_index: "{dead_letter_index_name}" + ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} + username: "{self.secret_arn}:username" + password: "{self.secret_arn}:password" + """ + + config_file_path = "config.yaml" + config_bucket_name = _time_based_id(suffix="config-bucket") + _s3_upload_content_to_bucket( + client=self.s3_client, + content=config_yaml.encode("utf-8"), + content_type="text/plain", + bucket_name=config_bucket_name, + key=config_file_path, + ) + + os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" + + fixtures = [ + _load_file_fixture("cloudwatch-log-1.json"), + ] + + _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) + + event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) + message_id = event["Records"][0]["messageId"] + + # Create pipeline to reject documents + processors = { + "processors": [ + { + "fail": { + "message": "test_es_non_indexable_dead_letter_index fail message", + } + }, + ] + } + + self.elasticsearch.put_pipeline(id="test_es_non_indexable_dead_letter_index_fail_pipeline", body=processors) + + self.elasticsearch.create_data_stream(name="logs-generic-default") + self.elasticsearch.put_settings( + index="logs-generic-default", + body={"index.default_pipeline": "test_es_non_indexable_dead_letter_index_fail_pipeline"}, + ) + + self.elasticsearch.refresh(index="logs-generic-default") + + self.elasticsearch.create_data_stream(name=dead_letter_index_name) + + ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) + first_call = handler(event, ctx) # type:ignore + + assert first_call == "completed" + + # Test document has been rejected from target index + self.elasticsearch.refresh(index="logs-generic-default") + + assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 + + # Test document has been redirected to dli + assert self.elasticsearch.exists(index=dead_letter_index_name) is True + + self.elasticsearch.refresh(index=dead_letter_index_name) + + assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 1 + + res = self.elasticsearch.search(index=dead_letter_index_name, sort="_seq_no") + + assert res["hits"]["total"] == {"value": 1, "relation": "eq"} + + assert ( + res["hits"]["hits"][0]["_source"]["error"]["reason"] + == "test_es_non_indexable_dead_letter_index fail message" + ) + assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception" + dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"]) + assert dead_letter_message["log"]["offset"] == 0 + assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path + assert dead_letter_message["aws"]["sqs"]["name"] == sqs_queue_name + assert dead_letter_message["aws"]["sqs"]["message_id"] == message_id + assert dead_letter_message["cloud"]["provider"] == "aws" + assert dead_letter_message["cloud"]["region"] == "us-east-1" + assert dead_letter_message["cloud"]["account"]["id"] == "000000000000" + assert dead_letter_message["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] + + # Test event does not go into the replay queue + events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) + + assert len(events["Records"]) == 0 + + def test_es_non_indexable_dead_letter_index(self) -> None: + assert isinstance(self.elasticsearch, ElasticsearchContainer) + assert isinstance(self.localstack, LocalStackContainer) + + sqs_queue_name = _time_based_id(suffix="source-sqs") + sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url()) + + dead_letter_index_name = "logs-generic-default-dli" + + sqs_queue_arn = sqs_queue["QueueArn"] + sqs_queue_url = sqs_queue["QueueUrl"] + sqs_queue_url_path = sqs_queue["QueueUrlPath"] + + config_yaml: str = f""" + inputs: + - type: sqs + id: "{sqs_queue_arn}" + tags: {self.default_tags} + outputs: + - type: "elasticsearch" + args: + elasticsearch_url: "{self.elasticsearch.get_url()}" + es_dead_letter_index: "{dead_letter_index_name}" + ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint} + username: "{self.secret_arn}:username" + password: "{self.secret_arn}:password" + """ + + config_file_path = "config.yaml" + config_bucket_name = _time_based_id(suffix="config-bucket") + _s3_upload_content_to_bucket( + client=self.s3_client, + content=config_yaml.encode("utf-8"), + content_type="text/plain", + bucket_name=config_bucket_name, + key=config_file_path, + ) + + os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}" + + fixtures = [ + _load_file_fixture("cloudwatch-log-1.json"), + ] + + _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures)) + + event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn) + message_id = event["Records"][0]["messageId"] + + # Create pipeline to reject documents + processors = { + "processors": [ + { + "fail": { + "message": "test_es_non_indexable_dead_letter_index", + } + }, + ] + } + + self.elasticsearch.put_pipeline(id="test_es_non_indexable_dead_letter_index_fail_pipeline", body=processors) + + self.elasticsearch.create_data_stream(name="logs-generic-default") + self.elasticsearch.put_settings( + index="logs-generic-default", + body={"index.default_pipeline": "test_es_non_indexable_dead_letter_index_fail_pipeline"}, + ) + + self.elasticsearch.refresh(index="logs-generic-default") + + self.elasticsearch.create_data_stream(name=dead_letter_index_name) + self.elasticsearch.put_settings( + index=dead_letter_index_name, + body={"index.default_pipeline": "test_es_non_indexable_dead_letter_index_fail_pipeline"}, + ) + + ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m) + first_call = handler(event, ctx) # type:ignore + + assert first_call == "completed" + + # Test document has been rejected from target index + self.elasticsearch.refresh(index="logs-generic-default") + + assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0 + + # Test event does not go into the dead letter queue + assert self.elasticsearch.exists(index=dead_letter_index_name) is True + + self.elasticsearch.refresh(index=dead_letter_index_name) + + assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0 + + # Test event has been redirected into the replay queue + events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn) + assert len(events["Records"]) == 1 + + first_body: dict[str, Any] = json_parser(events["Records"][0]["body"]) + + assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n") + assert first_body["event_payload"]["log"]["offset"] == 0 + assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path + assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name + assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id + assert first_body["event_payload"]["cloud"]["provider"] == "aws" + assert first_body["event_payload"]["cloud"]["region"] == "us-east-1" + assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000" + assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"] From 16e2cce2a09711d74e9e5468a967dd5b303112f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Emilio=20Alvarez=20Pi=C3=B1eiro?= <95703246+emilioalvap@users.noreply.github.com> Date: Wed, 3 Jul 2024 11:51:01 +0200 Subject: [PATCH 10/17] Update shippers/es.py --- shippers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index 68bbf535..d89e7034 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -262,7 +262,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: failed = self._handle_outcome(actions=encoded_actions, errors=errors) if not isinstance(failed, list) or len(failed) == 0: - return [] + return dead_letter_errors for action in failed: event_payload = self._decode_dead_letter(action) From 23cbcbf09ae81dd8543167d973de0ddc2179af87 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 3 Jul 2024 12:21:44 +0200 Subject: [PATCH 11/17] Fix lint --- shippers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index d89e7034..ac84c44c 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -282,7 +282,7 @@ def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: # action return { "@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "_id": f'{uuid.uuid4()}', + "_id": f"{uuid.uuid4()}", "_index": self._es_dead_letter_index, "_op_type": "create", "message": json_dumper(outcome["action"]), From a850c0f046089de20c43b8a355abf4f0055581ef Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 3 Jul 2024 13:40:42 +0200 Subject: [PATCH 12/17] Fix linter --- shippers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index ac84c44c..0ef16f82 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -3,8 +3,8 @@ # you may not use this file except in compliance with the Elastic License 2.0. import datetime -from typing import Any, Dict, Optional, Union import uuid +from typing import Any, Dict, Optional, Union import elasticapm # noqa: F401 from elasticsearch import Elasticsearch From ef0e8752553a88b1c0a528a6702c73f9fa13f0f2 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 3 Jul 2024 14:14:57 +0200 Subject: [PATCH 13/17] Fix linter --- shippers/es.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 0ef16f82..c635eb25 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -244,7 +244,7 @@ def flush(self) -> None: def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: encoded_actions = [] - dead_letter_errors = [] + dead_letter_errors: list[Any] = [] for action in actions: # Reshape event to dead letter index encoded = self._encode_dead_letter(action) @@ -291,7 +291,7 @@ def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]: - return + return {} return {"action": json_parser(dead_letter_outcome["action"]["message"])} From d25b5ed4528d6ed99e6daad5876edb37f15958fc Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 3 Jul 2024 14:48:40 +0200 Subject: [PATCH 14/17] Fix linter --- shippers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shippers/es.py b/shippers/es.py index c635eb25..16518e1f 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -276,7 +276,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in outcome or "error" not in outcome: - return + return {} # Assign random id in case bulk() results in error, it can be matched to the original # action From 7b2653a397d4a913bd8080b8396c79cbcd4b7452 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 10 Jul 2024 17:53:48 +0200 Subject: [PATCH 15/17] Fix dead letter event null result --- shippers/es.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 16518e1f..5de35b6d 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -276,7 +276,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in outcome or "error" not in outcome: - return {} + return None # Assign random id in case bulk() results in error, it can be matched to the original # action @@ -291,7 +291,7 @@ def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]: - return {} + return None return {"action": json_parser(dead_letter_outcome["action"]["message"])} From cb8410fefd11ba32e6a2f1ce6e79a4705cef2f36 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 10 Jul 2024 21:07:50 +0200 Subject: [PATCH 16/17] Fix empty dead letter encoding conditions --- shippers/es.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/shippers/es.py b/shippers/es.py index 5de35b6d..43d86e1b 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -248,7 +248,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: for action in actions: # Reshape event to dead letter index encoded = self._encode_dead_letter(action) - if encoded is None: + if not encoded: shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action}) dead_letter_errors.append(action) @@ -267,7 +267,8 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: for action in failed: event_payload = self._decode_dead_letter(action) - if event_payload is None: + if not event_payload: + shared_logger.error("cannot decode dead letter index event from payload", extra={"action": action}) continue dead_letter_errors.append(event_payload) @@ -276,7 +277,7 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]: def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in outcome or "error" not in outcome: - return None + return {} # Assign random id in case bulk() results in error, it can be matched to the original # action @@ -291,7 +292,7 @@ def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]: def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]: if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]: - return None + return {} return {"action": json_parser(dead_letter_outcome["action"]["message"])} From cf8fbca8830d709edbe55ac84929cf0603982d51 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Wed, 10 Jul 2024 21:14:26 +0200 Subject: [PATCH 17/17] Add changelog, update version and docs --- CHANGELOG.md | 4 ++++ docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc | 6 ++++++ share/version.py | 2 +- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ab40f15..d2a73c1a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +### v1.17.0 - 2024/07/10 +##### Features +* Add dead letter index for ES outputs [733](https://github.com/elastic/elastic-serverless-forwarder/pull/733). + ### v1.16.0 - 2024/07/09 ##### Features * Prevent duplicate _id events from reaching the replay queue [729](https://github.com/elastic/elastic-serverless-forwarder/pull/729). diff --git a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc index f34e2364..f16e75b0 100644 --- a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc +++ b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc @@ -75,6 +75,7 @@ inputs: username: "username" password: "password" es_datastream_name: "logs-generic-default" + es_dead_letter_index: "esf-dead-letter-index" # optional batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -98,6 +99,7 @@ inputs: username: "username" password: "password" es_datastream_name: "logs-generic-default" + es_dead_letter_index: "esf-dead-letter-index" # optional batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -121,6 +123,7 @@ inputs: username: "username" password: "password" es_datastream_name: "logs-generic-default" + es_dead_letter_index: "esf-dead-letter-index" # optional batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -144,6 +147,7 @@ inputs: username: "username" password: "password" es_datastream_name: "logs-generic-default" + es_dead_letter_index: "esf-dead-letter-index" # optional batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -167,6 +171,7 @@ inputs: username: "username" password: "password" es_datastream_name: "logs-generic-default" + es_dead_letter_index: "esf-dead-letter-index" # optional batch_max_actions: 500 # optional: default value is 500 batch_max_bytes: 10485760 # optional: default value is 10485760 - type: "logstash" @@ -224,6 +229,7 @@ For `elasticsearch` the following arguments are supported: * `args.password` Password of the elasticsearch instance to connect to. Mandatory when `args.api_key` is not provided. Will take precedence over `args.api_key` if both are defined. * `args.api_key`: API key of elasticsearch endpoint in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined. * `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If the `es_datastream_name` is not specified and it cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in the future version. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**. The related backward compatibility code is removed from version **v1.0.0**. + * `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. * `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500. * `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB). * `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate. diff --git a/share/version.py b/share/version.py index aec73094..95f01911 100644 --- a/share/version.py +++ b/share/version.py @@ -2,4 +2,4 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. -version = "1.16.0" +version = "1.17.0"