From ed7dda1fcf2bfc44e955de9efa4c007d4e8739cc Mon Sep 17 00:00:00 2001
From: emilioalvap
Date: Thu, 13 Jun 2024 17:07:50 +0200
Subject: [PATCH] Fix integration test [test_replay] to not use duplicated ids

---
 tests/handlers/aws/test_integrations.py | 147 +++++++-----------------
 tests/testcontainers/es.py              |   6 +
 2 files changed, 46 insertions(+), 107 deletions(-)

diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py
index d6e8c8f5..a957754f 100644
--- a/tests/handlers/aws/test_integrations.py
+++ b/tests/handlers/aws/test_integrations.py
@@ -956,42 +956,47 @@ def test_replay(self) -> None:
         hash_kinesis_record = get_hex_prefix(f"stream-{kinesis_stream_name}-PartitionKey-{sequence_number}")
         prefix_kinesis = f"{int(float(event_timestamps_kinesis_records[0]) * 1000)}-{hash_kinesis_record}"
 
-        # Create an expected id for s3-sqs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_s3_first}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
-
-        # Create an expected id for sqs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_sqs}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
-
-        # Create an expected id for cloudwatch-logs so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_cloudwatch_logs}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
-        )
+        # Create an ingest pipeline that rejects the expected document ids so that es.send will fail
+        processors = {
+            "processors": [
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_s3",
+                        "if": f'ctx["_id"] == "{prefix_s3_first}-000000000000"',
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_sqs",
+                        "if": f'ctx["_id"] == "{prefix_sqs}-000000000000"',
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_cloudwatch",
+                        "if": f'ctx["_id"] == "{prefix_cloudwatch_logs}-000000000000"',
+                    }
+                },
+                {
+                    "fail": {
+                        "message": "test_replay_fail_pipeline_kinesis",
+                        "if": f'ctx["_id"] == "{prefix_kinesis}-000000000000"',
+                    }
+                },
+            ]
+        }
 
-        # Create an expected id for kinesis-data-stream so that es.send will fail
-        self.elasticsearch.index(
-            index="logs-generic-default",
-            op_type="create",
-            id=f"{prefix_kinesis}-000000000000",
-            document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
+        self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)
+
+        self.elasticsearch.create_data_stream(index="logs-generic-default")
+        self.elasticsearch.put_settings(
+            index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"}
         )
 
         self.elasticsearch.refresh(index="logs-generic-default")
 
         res = self.elasticsearch.search(index="logs-generic-default")
-        assert res["hits"]["total"] == {"value": 4, "relation": "eq"}
+        assert res["hits"]["total"] == {"value": 0, "relation": "eq"}
 
         ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
 
@@ -1002,20 +1007,6 @@ def test_replay(self) -> None:
         self.elasticsearch.refresh(index="logs-generic-default")
         res = self.elasticsearch.search(
             index="logs-generic-default",
-            query={
-                "bool": {
-                    "must_not": {
-                        "ids": {
-                            "values": [
-                                f"{prefix_s3_first}-000000000000",
-                                f"{prefix_sqs}-000000000000",
-                                f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1045,20 +1036,6 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search( index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1084,20 +1061,6 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search( index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1127,20 +1090,6 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") res = self.elasticsearch.search( index="logs-generic-default", - query={ - "bool": { - "must_not": { - "ids": { - "values": [ - f"{prefix_s3_first}-000000000000", - f"{prefix_sqs}-000000000000", - f"{prefix_cloudwatch_logs}-000000000000", - f"{prefix_kinesis}-000000000000", - ] - } - } - } - }, sort="_seq_no", ) @@ -1170,28 +1119,12 @@ def test_replay(self) -> None: self.elasticsearch.refresh(index="logs-generic-default") - # Remove the expected id for s3-sqs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_s3_first}-000000000000"]}}} - ) - - # Remove the expected id for sqs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_sqs}-000000000000"]}}} - ) - - # Remove the expected id for cloudwatch logs so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", - body={"query": {"ids": {"values": [f"{prefix_cloudwatch_logs}-000000000000"]}}}, - ) - - # Remove the expected id for kinesis data stream so that it can be replayed - self.elasticsearch.delete_by_query( - index="logs-generic-default", - body={"query": {"ids": {"values": [f"{prefix_kinesis}-000000000000"]}}}, - ) + # Remove pipeline processors + processors = { + "processors": [] + } + self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors) self.elasticsearch.refresh(index="logs-generic-default") # let's update the config file so that logstash won't fail anymore diff --git a/tests/testcontainers/es.py b/tests/testcontainers/es.py index 50a8abba..149b2c07 100644 --- a/tests/testcontainers/es.py +++ b/tests/testcontainers/es.py @@ -209,3 +209,9 @@ def index(self, **kwargs: Any) -> dict[str, Any]: self._index_indices.add(kwargs["index"]) return self.es_client.index(**kwargs) + + def create_data_stream(self, **kwargs: Any) -> dict[str, Any]: + if "index" in kwargs: + self._index_indices.add(kwargs["index"]) + + return self.es_client.indices.create_data_stream(kwargs["index"])