diff --git a/CHANGELOG.md b/CHANGELOG.md
index be8f0180..7ab40f15 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+### v1.16.0 - 2024/07/09
+##### Features
+* Prevent duplicate _id events from reaching the replay queue [729](https://github.com/elastic/elastic-serverless-forwarder/pull/729).
+
 ### v1.15.0 - 2024/05/29
 ##### Features
 * Enable multiple outputs for each input [725](https://github.com/elastic/elastic-serverless-forwarder/pull/725).
diff --git a/share/version.py b/share/version.py
index 8f38540c..aec73094 100644
--- a/share/version.py
+++ b/share/version.py
@@ -2,4 +2,4 @@
 # or more contributor license agreements. Licensed under the Elastic License 2.0;
 # you may not use this file except in compliance with the Elastic License 2.0.

-version = "1.15.0"
+version = "1.16.0"
diff --git a/shippers/es.py b/shippers/es.py
index 123f6f99..71b07782 100644
--- a/shippers/es.py
+++ b/shippers/es.py
@@ -19,6 +19,7 @@

 _EVENT_BUFFERED = "_EVENT_BUFFERED"
 _EVENT_SENT = "_EVENT_SENT"
+_VERSION_CONFLICT = 409


 class JSONSerializer(Serializer):
@@ -166,6 +167,11 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
             shared_logger.warning(
                 "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
             )
+
+            if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
+                # Skip duplicate events on replay queue
+                continue
+
             shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
             if self._replay_handler is not None:
                 self._replay_handler(self._output_destination, self._replay_args, action_failed[0])
diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py
index 0b75847a..d5d0e9b1 100644
--- a/tests/handlers/aws/test_integrations.py
+++ b/tests/handlers/aws/test_integrations.py
@@ -956,42 +956,47 @@ def test_replay(self) -> None:
         hash_kinesis_record = get_hex_prefix(f"stream-{kinesis_stream_name}-PartitionKey-{sequence_number}")
         prefix_kinesis = f"{int(float(event_timestamps_kinesis_records[0]) * 1000)}-{hash_kinesis_record}"

-        # Create an expected id for s3-sqs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_s3_first}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
-
-        # Create an expected id for sqs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_sqs}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
+        # Create pipeline to reject documents
+        processors = {
+            "processors": [
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_s3",
+                        "if": f'ctx["_id"] == "{prefix_s3_first}-000000000000"',
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_sqs",
+                        "if": f'ctx["_id"] == "{prefix_sqs}-000000000000"',
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_cloudwatch",
+                        "if": f'ctx["_id"] == "{prefix_cloudwatch_logs}-000000000000"',
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_kinesis",
+                        "if": f'ctx["_id"] == "{prefix_kinesis}-000000000000"',
+                    }
+                },
+            ]
+        }

-        # Create an expected id for cloudwatch-logs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_cloudwatch_logs}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
+        self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)

-        # Create an expected id for kinesis-data-stream so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_kinesis}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
+        self.elasticsearch.create_data_stream(name="logs-generic-default")
+        self.elasticsearch.put_settings(
+            index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"}
         )

         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(index="logs-generic-default")
-        assert res["hits"]["total"] == {"value": 4, "relation": "eq"}
+        assert res["hits"]["total"] == {"value": 0, "relation": "eq"}

         ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
@@ -1002,20 +1007,6 @@ def test_replay(self) -> None:
         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(
             index="logs-generic-default",
-            query={
-                "bool": {
-                    "must_not": {
-                        "ids": {
-                            "values": [
-                                f"{prefix_s3_first}-000000000000",
-                                f"{prefix_sqs}-000000000000",
-                                f"{prefix_cloudwatch_logs}-000000000000",
-                                f"{prefix_kinesis}-000000000000",
-                            ]
-                        }
-                    }
-                }
-            },
             sort="_seq_no",
         )
@@ -1045,20 +1036,6 @@ def test_replay(self) -> None:
         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(
             index="logs-generic-default",
-            query={
-                "bool": {
-                    "must_not": {
-                        "ids": {
-                            "values": [
-                                f"{prefix_s3_first}-000000000000",
-                                f"{prefix_sqs}-000000000000",
-                                f"{prefix_cloudwatch_logs}-000000000000",
-                                f"{prefix_kinesis}-000000000000",
-                            ]
-                        }
-                    }
-                }
-            },
             sort="_seq_no",
         )
@@ -1084,20 +1061,6 @@ def test_replay(self) -> None:
         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(
             index="logs-generic-default",
-            query={
-                "bool": {
-                    "must_not": {
-                        "ids": {
-                            "values": [
-                                f"{prefix_s3_first}-000000000000",
-                                f"{prefix_sqs}-000000000000",
-                                f"{prefix_cloudwatch_logs}-000000000000",
-                                f"{prefix_kinesis}-000000000000",
-                            ]
-                        }
-                    }
-                }
-            },
             sort="_seq_no",
         )
@@ -1127,20 +1090,6 @@ def test_replay(self) -> None:
         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(
             index="logs-generic-default",
-            query={
-                "bool": {
-                    "must_not": {
-                        "ids": {
-                            "values": [
-                                f"{prefix_s3_first}-000000000000",
-                                f"{prefix_sqs}-000000000000",
-                                f"{prefix_cloudwatch_logs}-000000000000",
-                                f"{prefix_kinesis}-000000000000",
-                            ]
-                        }
-                    }
-                }
-            },
             sort="_seq_no",
         )
@@ -1170,28 +1119,10 @@ def test_replay(self) -> None:

         self.elasticsearch.refresh(index="logs-generic-default")

-        # Remove the expected id for s3-sqs so that it can be replayed
-        self.elasticsearch.delete_by_query(
-            index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_s3_first}-000000000000"]}}}
-        )
-
-        # Remove the expected id for sqs so that it can be replayed
-        self.elasticsearch.delete_by_query(
-            index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_sqs}-000000000000"]}}}
-        )
-
-        # Remove the expected id for cloudwatch logs so that it can be replayed
-        self.elasticsearch.delete_by_query(
-            index="logs-generic-default",
-            body={"query": {"ids": {"values": [f"{prefix_cloudwatch_logs}-000000000000"]}}},
-        )
-
-        # Remove the expected id for kinesis data stream so that it can be replayed
-        self.elasticsearch.delete_by_query(
-            index="logs-generic-default",
-            body={"query": {"ids": {"values": [f"{prefix_kinesis}-000000000000"]}}},
-        )
-
+        # Remove pipeline processors
+        processors = {"processors": []}
+        self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)
+
         self.elasticsearch.refresh(index="logs-generic-default")

         # let's update the config file so that logstash won't fail anymore
@@ -4202,3 +4133,65 @@ def test_ls_wrong_auth_creds(self) -> None:
         assert second_body["event_payload"]["cloud"]["region"] == "us-east-1"
         assert second_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
         assert second_body["event_payload"]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]
+
+    def test_es_version_conflict_exception(self) -> None:
+        assert isinstance(self.elasticsearch, ElasticsearchContainer)
+        assert isinstance(self.localstack, LocalStackContainer)
+
+        sqs_queue_name = _time_based_id(suffix="source-sqs")
+        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
+
+        sqs_queue_arn = sqs_queue["QueueArn"]
+        sqs_queue_url = sqs_queue["QueueUrl"]
+
+        config_yaml: str = f"""
+            inputs:
+              - type: sqs
+                id: "{sqs_queue_arn}"
+                tags: {self.default_tags}
+                outputs:
+                  - type: "elasticsearch"
+                    args:
+                      elasticsearch_url: "{self.elasticsearch.get_url()}"
+                      ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
+                      username: "{self.secret_arn}:username"
+                      password: "{self.secret_arn}:password"
+        """
+
+        config_file_path = "config.yaml"
+        config_bucket_name = _time_based_id(suffix="config-bucket")
+        _s3_upload_content_to_bucket(
+            client=self.s3_client,
+            content=config_yaml.encode("utf-8"),
+            content_type="text/plain",
+            bucket_name=config_bucket_name,
+            key=config_file_path,
+        )
+
+        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
+
+        fixtures = [
+            _load_file_fixture("cloudwatch-log-1.json"),
+        ]
+
+        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
+
+        event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
+
+        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
+        first_call = handler(event, ctx)  # type:ignore
+
+        assert first_call == "completed"
+
+        # Index event a second time to trigger version conflict
+        second_call = handler(event, ctx)  # type:ignore
+
+        assert second_call == "completed"
+
+        self.elasticsearch.refresh(index="logs-generic-default")
+
+        assert self.elasticsearch.count(index="logs-generic-default")["count"] == 1
+
+        # Test no duplicate events end in the replay queue
+        events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
+        assert len(events["Records"]) == 0
diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py
index 50a8abba..612cf4c4 100644
--- a/tests/testcontainers/es.py
+++ b/tests/testcontainers/es.py
@@ -209,3 +209,9 @@ def index(self, **kwargs: Any) -> dict[str, Any]:
             self._index_indices.add(kwargs["index"])

         return self.es_client.index(**kwargs)
+
+    def create_data_stream(self, **kwargs: Any) -> dict[str, Any]:
+        if "name" in kwargs:
+            self._index_indices.add(kwargs["name"])
+
+        return self.es_client.indices.create_data_stream(**kwargs)