Fix integration test [test_replay] to not use duplicated ids
emilioalvap committed Jun 13, 2024
1 parent 818b997 commit ed7dda1
Showing 2 changed files with 46 additions and 107 deletions.
147 changes: 40 additions & 107 deletions tests/handlers/aws/test_integrations.py
@@ -956,42 +956,47 @@ def test_replay(self) -> None:
hash_kinesis_record = get_hex_prefix(f"stream-{kinesis_stream_name}-PartitionKey-{sequence_number}")
prefix_kinesis = f"{int(float(event_timestamps_kinesis_records[0]) * 1000)}-{hash_kinesis_record}"

# Create an expected id for s3-sqs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_s3_first}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)

# Create an expected id for sqs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_sqs}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)

# Create an expected id for cloudwatch-logs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_cloudwatch_logs}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)
# Create pipeline to reject documents
processors = {
"processors": [
{
"fail": {
"message": "test_replay_fail_pipeline_s3",
"if": f"ctx[\"_id\"] == \"{prefix_s3_first}-000000000000\"",
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_sqs",
"if": f"ctx[\"_id\"] == \"{prefix_sqs}-000000000000\"",
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_cloudwatch",
"if": f"ctx[\"_id\"] == \"{prefix_cloudwatch_logs}-000000000000\"",
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_kinesis",
"if": f"ctx[\"_id\"] == \"{prefix_kinesis}-000000000000\"",
}
},
]
}

# Create an expected id for kinesis-data-stream so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_kinesis}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)

self.elasticsearch.create_data_stream(index='logs-generic-default')
self.elasticsearch.put_settings(
index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"}
)

self.elasticsearch.refresh(index="logs-generic-default")

res = self.elasticsearch.search(index="logs-generic-default")
assert res["hits"]["total"] == {"value": 4, "relation": "eq"}
assert res["hits"]["total"] == {"value": 0, "relation": "eq"}

ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)

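For reference, the mechanism the hunk above switches to can be reproduced outside the test harness. The sketch below is a minimal, hypothetical example assuming a 7.x/8.x-compatible elasticsearch-py client; the client instance, pipeline id, and document id are illustrative and not part of the test suite.

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# A "fail" processor rejects any document whose _id matches the id the forwarder
# is expected to generate, simulating an indexing failure for exactly that event.
es.ingest.put_pipeline(
    id="reject-expected-ids",
    body={
        "processors": [
            {
                "fail": {
                    "message": "simulated indexing failure",
                    "if": 'ctx["_id"] == "expected-id-000000000000"',
                }
            }
        ]
    },
)

# Attach the pipeline to the data stream so every indexing request goes through it.
es.indices.create_data_stream(name="logs-generic-default")
es.indices.put_settings(
    index="logs-generic-default",
    body={"index.default_pipeline": "reject-expected-ids"},
)

Unlike pre-indexing documents with the expected ids (the approach removed above), the pipeline leaves the data stream empty, so the later assertions count only the documents the forwarder actually ships.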
@@ -1002,20 +1007,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1045,20 +1036,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1084,20 +1061,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1127,20 +1090,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1170,28 +1119,12 @@ def test_replay(self) -> None:

self.elasticsearch.refresh(index="logs-generic-default")

# Remove the expected id for s3-sqs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_s3_first}-000000000000"]}}}
)

# Remove the expected id for sqs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_sqs}-000000000000"]}}}
)

# Remove the expected id for cloudwatch logs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default",
body={"query": {"ids": {"values": [f"{prefix_cloudwatch_logs}-000000000000"]}}},
)

# Remove the expected id for kinesis data stream so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default",
body={"query": {"ids": {"values": [f"{prefix_kinesis}-000000000000"]}}},
)
# Remove pipeline processors
processors = {
"processors": []
}

self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)
self.elasticsearch.refresh(index="logs-generic-default")

# let's update the config file so that logstash won't fail anymore
6 changes: 6 additions & 0 deletions tests/testcontainers/es.py
@@ -209,3 +209,9 @@ def index(self, **kwargs: Any) -> dict[str, Any]:
self._index_indices.add(kwargs["index"])

return self.es_client.index(**kwargs)

def create_data_stream(self, **kwargs: Any) -> dict[str, Any]:
if "index" in kwargs:
self._index_indices.add(kwargs["index"])

return self.es_client.indices.create_data_stream(kwargs["index"])
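The new helper mirrors index(): it records the name in _index_indices so the container tracks the data stream alongside the indices created through index(), then delegates to the underlying client. A hypothetical usage sketch, assuming the wrapper instance is exposed as self.elasticsearch as in test_replay above — create the data stream first, then point its default pipeline at the failing ingest pipeline:

self.elasticsearch.create_data_stream(index="logs-generic-default")
self.elasticsearch.put_settings(
    index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"}
)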
