From 6fd9d127ad5b4cbd9fac1ffca6db917141b56a1e Mon Sep 17 00:00:00 2001
From: emilioalvap
Date: Tue, 11 Jun 2024 12:28:17 +0200
Subject: [PATCH 1/9] Skip duplicate events when sending to replay queue

---
 shippers/es.py                          |  7 ++-
 tests/handlers/aws/test_integrations.py | 62 +++++++++++++++++++++++++
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/shippers/es.py b/shippers/es.py
index 7b7997d6..9ac2aea4 100644
--- a/shippers/es.py
+++ b/shippers/es.py
@@ -19,7 +19,7 @@
 _EVENT_BUFFERED = "_EVENT_BUFFERED"
 _EVENT_SENT = "_EVENT_SENT"
-
+_VERSION_CONFLICT = 409
 
 class JSONSerializer(Serializer):
     mimetype = "application/json"
@@ -164,6 +164,11 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
             shared_logger.warning(
                 "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
             )
+
+            if "status" in error["create"] and error["create"]["status"] == 409:
+                # Skip duplicate events on replay queue
+                continue
+
             shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
             if self._replay_handler is not None:
                 self._replay_handler("elasticsearch", self._replay_args, action_failed[0])
diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py
index 0b75847a..d6e8c8f5 100644
--- a/tests/handlers/aws/test_integrations.py
+++ b/tests/handlers/aws/test_integrations.py
@@ -4202,3 +4202,65 @@ def test_ls_wrong_auth_creds(self) -> None:
         assert second_body["event_payload"]["cloud"]["region"] == "us-east-1"
         assert second_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
         assert second_body["event_payload"]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
+
+    def test_es_version_conflict_exception(self) -> None:
+        assert isinstance(self.elasticsearch, ElasticsearchContainer)
+        assert isinstance(self.localstack, LocalStackContainer)
+
+        sqs_queue_name = _time_based_id(suffix="source-sqs")
+        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
+
+        sqs_queue_arn = sqs_queue["QueueArn"]
+        sqs_queue_url = sqs_queue["QueueUrl"]
+
+        config_yaml: str = f"""
+            inputs:
+              - type: sqs
+                id: "{sqs_queue_arn}"
+                tags: {self.default_tags}
+                outputs:
+                  - type: "elasticsearch"
+                    args:
+                      elasticsearch_url: "{self.elasticsearch.get_url()}"
+                      ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
+                      username: "{self.secret_arn}:username"
+                      password: "{self.secret_arn}:password"
+        """
+
+        config_file_path = "config.yaml"
+        config_bucket_name = _time_based_id(suffix="config-bucket")
+        _s3_upload_content_to_bucket(
+            client=self.s3_client,
+            content=config_yaml.encode("utf-8"),
+            content_type="text/plain",
+            bucket_name=config_bucket_name,
+            key=config_file_path,
+        )
+
+        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
+
+        fixtures = [
+            _load_file_fixture("cloudwatch-log-1.json"),
+        ]
+
+        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
+
+        event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
+
+        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
+        first_call = handler(event, ctx)  # type:ignore
+
+        assert first_call == "completed"
+
+        # Index event a second time to trigger version conflict
+        second_call = handler(event, ctx)  # type:ignore
+
+        assert second_call == "completed"
+
+        self.elasticsearch.refresh(index="logs-generic-default")
+
+        assert self.elasticsearch.count(index="logs-generic-default")["count"] == 1
+
+        # Test that no duplicate events end up in the replay queue
+        events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
+        assert len(events["Records"]) == 0

From 8d19486b026ee8c27c18c456ed9ed8243516dbd7 Mon Sep 17 00:00:00 2001
From: emilioalvap
Date: Wed, 12 Jun 2024 21:17:47 +0200
Subject: [PATCH 2/9] Version conflict global id

---
 shippers/es.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/shippers/es.py b/shippers/es.py
index 9ac2aea4..f328e9aa 100644
--- a/shippers/es.py
+++ b/shippers/es.py
@@ -165,7 +165,7 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
                 "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
             )
 
-            if "status" in error["create"] and error["create"]["status"] == 409:
+            if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
                 # Skip duplicate events on replay queue
                 continue
 

From 148cbd605e7913c441d4d2db0003dbd113403ec3 Mon Sep 17 00:00:00 2001
From: emilioalvap
Date: Wed, 12 Jun 2024 21:28:31 +0200
Subject: [PATCH 3/9] Fix formatter

---
 shippers/es.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/shippers/es.py b/shippers/es.py
index f328e9aa..3fa2a178 100644
--- a/shippers/es.py
+++ b/shippers/es.py
@@ -166,8 +166,8 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
                 "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
             )
 
             if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
-                # Skip duplicate events on replay queue
-                continue
+                # Skip duplicate events on replay queue
+                continue
 
             shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
             if self._replay_handler is not None:
                 self._replay_handler("elasticsearch", self._replay_args, action_failed[0])

From 818b99755b5063eb863cb874e0c59c5cb188a15c Mon Sep 17 00:00:00 2001
From: emilioalvap
Date: Wed, 12 Jun 2024 21:51:28 +0200
Subject: [PATCH 4/9] Fix formatter v2

---
 shippers/es.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/shippers/es.py b/shippers/es.py
index 3fa2a178..aa2c0de3 100644
--- a/shippers/es.py
+++ b/shippers/es.py
@@ -21,6 +21,7 @@
 _EVENT_BUFFERED = "_EVENT_BUFFERED"
 _EVENT_SENT = "_EVENT_SENT"
 _VERSION_CONFLICT = 409
+
 
 class JSONSerializer(Serializer):
     mimetype = "application/json"
@@ -165,10 +166,10 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
                 "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
             )
 
-            if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
+            if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
                 # Skip duplicate events on replay queue
                 continue
-
+
             shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
             if self._replay_handler is not None:
                 self._replay_handler("elasticsearch", self._replay_args, action_failed[0])

From ed7dda1fcf2bfc44e955de9efa4c007d4e8739cc Mon Sep 17 00:00:00 2001
From: emilioalvap
Date: Thu, 13 Jun 2024 17:07:50 +0200
Subject: [PATCH 5/9] Fix integration test [test_replay] to not use duplicated ids

---
 tests/handlers/aws/test_integrations.py | 147 +++++++-----------------
 tests/testcontainers/es.py              |   6 +
 2 files changed, 46 insertions(+), 107 deletions(-)

diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py
index d6e8c8f5..a957754f 100644
--- a/tests/handlers/aws/test_integrations.py
+++ b/tests/handlers/aws/test_integrations.py
@@ -956,42 +956,47 @@ def test_replay(self) -> None:
         hash_kinesis_record = get_hex_prefix(f"stream-{kinesis_stream_name}-PartitionKey-{sequence_number}")
         prefix_kinesis = f"{int(float(event_timestamps_kinesis_records[0]) * 1000)}-{hash_kinesis_record}"
 
-        # Create an expected id for s3-sqs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_s3_first}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
-
-        # Create an expected id for sqs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_sqs}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
-
-        # Create an expected id for cloudwatch-logs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_cloudwatch_logs}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
+        # Create pipeline to reject documents
+        processors = {
+            "processors": [
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_s3",
+                        "if": f"ctx[\"_id\"] == \"{prefix_s3_first}-000000000000\"",
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_sqs",
+                        "if": f"ctx[\"_id\"] == \"{prefix_sqs}-000000000000\"",
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_cloudwatch",
+                        "if": f"ctx[\"_id\"] == \"{prefix_cloudwatch_logs}-000000000000\"",
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_kinesis",
+                        "if": f"ctx[\"_id\"] == \"{prefix_kinesis}-000000000000\"",
+                    }
+                },
+            ]
+        }
 
-        # Create an expected id for kinesis-data-stream so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_kinesis}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
+        self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)
+
+        self.elasticsearch.create_data_stream(index='logs-generic-default')
+        self.elasticsearch.put_settings(
+            index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"}
         )
 
         self.elasticsearch.refresh(index="logs-generic-default")
 
         res = self.elasticsearch.search(index="logs-generic-default")
-        assert res["hits"]["total"] == {"value": 4, "relation": "eq"}
+        assert res["hits"]["total"] == {"value": 0, "relation": "eq"}
 
         ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
@@ -1002,20 +1007,6 @@ def test_replay(self) -> None:
 
         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(
             index="logs-generic-default",
-            query={
-                "bool": {
-                    "must_not": {
-                        "ids": {
-                            "values": [
-                                f"{prefix_s3_first}-000000000000",
-                                f"{prefix_sqs}-000000000000",
-                                f"{prefix_cloudwatch_logs}-000000000000",
-                                f"{prefix_kinesis}-000000000000",
-                            ]
-                        }
-                    }
-                }
-            },
             sort="_seq_no",
         )
@@ -1045,20 +1036,6 @@ def test_replay(self) -> None:
 
         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(
             index="logs-generic-default",
-            query={
-                "bool": {
-                    "must_not": {
-                        "ids": {
-                            "values": [
-                                f"{prefix_s3_first}-000000000000",
-                                f"{prefix_sqs}-000000000000",
-                                f"{prefix_cloudwatch_logs}-000000000000",
-                                f"{prefix_kinesis}-000000000000",
-                            ]
-                        }
-                    }
-                }
-            },
             sort="_seq_no",
         )
@@ -1084,20 +1061,6 @@ def test_replay(self) -> None:
 
         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(
             index="logs-generic-default",
-            query={
-                "bool": {
-                    "must_not": {
-                        "ids": {
-                            "values": [
f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1127,20 +1090,6 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search( index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1170,28 +1119,12 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") - # Remove the expected id for s3-sqs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_s3_first}-000000000000"]}}} - ) - - # Remove the expected id for sqs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_sqs}-000000000000"]}}} - ) - - # Remove the expected id for cloudwatch logs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", - body={"query": {"ids": {"values": [f"{prefix_cloudwatch_logs}-000000000000"]}}}, - ) - - # Remove the expected id for kinesis data stream so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", - body={"query": {"ids": {"values": [f"{prefix_kinesis}-000000000000"]}}}, - ) + # Remove pipeline processors + processors = { + "processors": [] + } + self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) self.elasticsearch.refresh(index="logs-generic-default") # let's update the config file so that logstash won't fail anymore diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 50a8abba..149b2c07 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -209,3 +209,9 @@ def index(self, **kwargs: Any) -> dict[str, Any]: self._index_indices.add(kwargs["index"]) return self.es_client.index(**kwargs) + + def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: + if "index" in kwargs: + self._index_indices.add(kwargs["index"]) + + return self.es_client.indices.create_data_stream(kwargs["index"]) From ca4ff05bb83cc3c32aa468ca7ed8c15e79472ca0 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Thu, 13 Jun 2024 17:15:55 +0200 Subject: [PATCH 6/9] Fix linter --- tests/handlers/aws/test_integrations.py | 16 +++++++--------- tests/testcontainers/es.py | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index a957754f..e545138b 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -962,33 +962,33 @@ def test_replay(self) -> None: { "fail": { "message": "test_replay_fail_pipeline_s3", - "if": f"ctx[\"_id\"] == \"{prefix_s3_first}-000000000000\"", + "if": f'ctx["_id"] == "{prefix_s3_first}-000000000000"', } }, { "fail": { "message": "test_replay_fail_pipeline_sqs", - "if": f"ctx[\"_id\"] == \"{prefix_sqs}-000000000000\"", + "if": f'ctx["_id"] == "{prefix_sqs}-000000000000"', } }, { "fail": { "message": "test_replay_fail_pipeline_cloudwatch", - "if": f"ctx[\"_id\"] == \"{prefix_cloudwatch_logs}-000000000000\"", + "if": f'ctx["_id"] == "{prefix_cloudwatch_logs}-000000000000"', } }, { "fail": { 
"message": "test_replay_fail_pipeline_kinesis", - "if": f"ctx[\"_id\"] == \"{prefix_kinesis}-000000000000\"", + "if": f'ctx["_id"] == "{prefix_kinesis}-000000000000"', } }, ] } self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) - - self.elasticsearch.create_data_stream(index='logs-generic-default') + + self.elasticsearch.create_data_stream(index="logs-generic-default") self.elasticsearch.put_settings( index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"} ) @@ -1120,9 +1120,7 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") # Remove pipeline processors - processors = { - "processors": [] - } + processors = {"processors": []} self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) self.elasticsearch.refresh(index="logs-generic-default") diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 149b2c07..57f6fe82 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -209,7 +209,7 @@ def index(self, **kwargs: Any) -> dict[str, Any]: self._index_indices.add(kwargs["index"]) return self.es_client.index(**kwargs) - + def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: if "index" in kwargs: self._index_indices.add(kwargs["index"]) From f5a61f1c49dc0ab2a850e0584ca4607d2176c6e2 Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Thu, 13 Jun 2024 17:57:07 +0200 Subject: [PATCH 7/9] Fin linter v2 --- tests/testcontainers/es.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 57f6fe82..350b406b 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -214,4 +214,4 @@ def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: if "index" in kwargs: self._index_indices.add(kwargs["index"]) - return self.es_client.indices.create_data_stream(kwargs["index"]) + return self.es_client.indices.create_data_stream(**kwargs) From b4e1c8fb3335f6a10173e77f15b06b1e3bd8234d Mon Sep 17 00:00:00 2001 From: emilioalvap Date: Thu, 13 Jun 2024 18:23:09 +0200 Subject: [PATCH 8/9] Fix integration tests --- tests/handlers/aws/test_integrations.py | 2 +- tests/testcontainers/es.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index e545138b..d5d0e9b1 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -988,7 +988,7 @@ def test_replay(self) -> None: self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) - self.elasticsearch.create_data_stream(index="logs-generic-default") + self.elasticsearch.create_data_stream(name="logs-generic-default") self.elasticsearch.put_settings( index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"} ) diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 350b406b..612cf4c4 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -211,7 +211,7 @@ def index(self, **kwargs: Any) -> dict[str, Any]: return self.es_client.index(**kwargs) def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: - if "index" in kwargs: - self._index_indices.add(kwargs["index"]) + if "name" in kwargs: + self._index_indices.add(kwargs["name"]) return self.es_client.indices.create_data_stream(**kwargs) From 625a3ca66ee557ba1cd254d3263c9cfbf940055f Mon Sep 17 00:00:00 2001 From: emilioalvap 
Date: Tue, 9 Jul 2024 18:09:50 +0200
Subject: [PATCH 9/9] Add changelog and update version

---
 CHANGELOG.md     | 4 ++++
 share/version.py | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index be8f0180..7ab40f15 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+### v1.16.0 - 2024/07/09
+##### Features
+* Prevent duplicate _id events from reaching the replay queue [729](https://github.com/elastic/elastic-serverless-forwarder/pull/729).
+
 ### v1.15.0 - 2024/05/29
 ##### Features
 * Enable multiple outputs for each input [725](https://github.com/elastic/elastic-serverless-forwarder/pull/725).
diff --git a/share/version.py b/share/version.py
index 8f38540c..aec73094 100644
--- a/share/version.py
+++ b/share/version.py
@@ -2,4 +2,4 @@
 # or more contributor license agreements. Licensed under the Elastic License 2.0;
 # you may not use this file except in compliance with the Elastic License 2.0.
 
-version = "1.15.0"
+version = "1.16.0"