Prevent duplicate _id events from reaching the replay queue #729

Merged · 10 commits · Jul 10, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
### v1.16.0 - 2024/07/09
##### Features
* Prevent duplicate _id events from reaching the replay queue [729](https://github.com/elastic/elastic-serverless-forwarder/pull/729).

### v1.15.0 - 2024/05/29
##### Features
* Enable multiple outputs for each input [725](https://github.com/elastic/elastic-serverless-forwarder/pull/725).
2 changes: 1 addition & 1 deletion share/version.py
@@ -2,4 +2,4 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

version = "1.15.0"
version = "1.16.0"
6 changes: 6 additions & 0 deletions shippers/es.py
@@ -19,6 +19,7 @@

_EVENT_BUFFERED = "_EVENT_BUFFERED"
_EVENT_SENT = "_EVENT_SENT"
_VERSION_CONFLICT = 409


class JSONSerializer(Serializer):
@@ -166,6 +167,11 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
shared_logger.warning(
"elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
)

if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
# Skip duplicate events on replay queue
continue

shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
if self._replay_handler is not None:
self._replay_handler(self._output_destination, self._replay_args, action_failed[0])
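For context, a minimal, self-contained sketch (not the shipper code itself) of how a duplicate _id surfaces in a bulk create response and why a 409 can be skipped instead of replayed; the error shape mirrors the Elasticsearch bulk API:

```python
# Sketch only: field names mirror the Elasticsearch bulk API error format,
# not the actual es.py shipper internals.
_VERSION_CONFLICT = 409


def should_replay(error_item: dict) -> bool:
    """Return True when a failed bulk action should go to the replay queue."""
    create_outcome = error_item.get("create", {})
    # A duplicate _id with op_type=create comes back as a 409 version conflict:
    # the document is already indexed, so replaying it would only duplicate work.
    if create_outcome.get("status") == _VERSION_CONFLICT:
        return False
    return True


duplicate = {
    "create": {
        "_id": "prefix-000000000000",
        "status": 409,
        "error": {"type": "version_conflict_engine_exception"},
    }
}
mapping_failure = {
    "create": {
        "_id": "prefix-000000000001",
        "status": 400,
        "error": {"type": "mapper_parsing_exception"},
    }
}

assert should_replay(duplicate) is False       # skipped, never reaches the replay queue
assert should_replay(mapping_failure) is True  # still sent to the replay handler
```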
205 changes: 99 additions & 106 deletions tests/handlers/aws/test_integrations.py
@@ -956,42 +956,47 @@ def test_replay(self) -> None:
hash_kinesis_record = get_hex_prefix(f"stream-{kinesis_stream_name}-PartitionKey-{sequence_number}")
prefix_kinesis = f"{int(float(event_timestamps_kinesis_records[0]) * 1000)}-{hash_kinesis_record}"

# Create an expected id for s3-sqs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_s3_first}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)

# Create an expected id for sqs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_sqs}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)
# Create pipeline to reject documents
processors = {
"processors": [
{
"fail": {
"message": "test_replay_fail_pipeline_s3",
"if": f'ctx["_id"] == "{prefix_s3_first}-000000000000"',
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_sqs",
"if": f'ctx["_id"] == "{prefix_sqs}-000000000000"',
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_cloudwatch",
"if": f'ctx["_id"] == "{prefix_cloudwatch_logs}-000000000000"',
}
},
{
"fail": {
"message": "test_replay_fail_pipeline_kinesis",
"if": f'ctx["_id"] == "{prefix_kinesis}-000000000000"',
}
},
]
}

# Create an expected id for cloudwatch-logs so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_cloudwatch_logs}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
)
self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)

# Create an expected id for kinesis-data-stream so that es.send will fail
self.elasticsearch.index(
index="logs-generic-default",
op_type="create",
id=f"{prefix_kinesis}-000000000000",
document={"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ")},
self.elasticsearch.create_data_stream(name="logs-generic-default")
self.elasticsearch.put_settings(
index="logs-generic-default", body={"index.default_pipeline": "test_replay_fail_pipeline"}
)

self.elasticsearch.refresh(index="logs-generic-default")

res = self.elasticsearch.search(index="logs-generic-default")
assert res["hits"]["total"] == {"value": 4, "relation": "eq"}
assert res["hits"]["total"] == {"value": 0, "relation": "eq"}

ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)

@@ -1002,20 +1007,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1045,20 +1036,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

Expand All @@ -1084,20 +1061,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1127,20 +1090,6 @@ def test_replay(self) -> None:
self.elasticsearch.refresh(index="logs-generic-default")
res = self.elasticsearch.search(
index="logs-generic-default",
query={
"bool": {
"must_not": {
"ids": {
"values": [
f"{prefix_s3_first}-000000000000",
f"{prefix_sqs}-000000000000",
f"{prefix_cloudwatch_logs}-000000000000",
f"{prefix_kinesis}-000000000000",
]
}
}
}
},
sort="_seq_no",
)

@@ -1170,28 +1119,10 @@ def test_replay(self) -> None:

self.elasticsearch.refresh(index="logs-generic-default")

# Remove the expected id for s3-sqs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_s3_first}-000000000000"]}}}
)

# Remove the expected id for sqs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default", body={"query": {"ids": {"values": [f"{prefix_sqs}-000000000000"]}}}
)

# Remove the expected id for cloudwatch logs so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default",
body={"query": {"ids": {"values": [f"{prefix_cloudwatch_logs}-000000000000"]}}},
)

# Remove the expected id for kinesis data stream so that it can be replayed
self.elasticsearch.delete_by_query(
index="logs-generic-default",
body={"query": {"ids": {"values": [f"{prefix_kinesis}-000000000000"]}}},
)
# Remove pipeline processors
processors = {"processors": []}

self.elasticsearch.put_pipeline(id="test_replay_fail_pipeline", body=processors)
self.elasticsearch.refresh(index="logs-generic-default")

# let's update the config file so that logstash won't fail anymore
@@ -4202,3 +4133,65 @@ def test_ls_wrong_auth_creds(self) -> None:
assert second_body["event_payload"]["cloud"]["region"] == "us-east-1"
assert second_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
assert second_body["event_payload"]["tags"] == ["forwarded", "tag1", "tag2", "tag3"]

def test_es_version_conflict_exception(self) -> None:
assert isinstance(self.elasticsearch, ElasticsearchContainer)
assert isinstance(self.localstack, LocalStackContainer)

sqs_queue_name = _time_based_id(suffix="source-sqs")
sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())

sqs_queue_arn = sqs_queue["QueueArn"]
sqs_queue_url = sqs_queue["QueueUrl"]

config_yaml: str = f"""
inputs:
- type: sqs
id: "{sqs_queue_arn}"
tags: {self.default_tags}
outputs:
- type: "elasticsearch"
args:
elasticsearch_url: "{self.elasticsearch.get_url()}"
ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
username: "{self.secret_arn}:username"
password: "{self.secret_arn}:password"
"""

config_file_path = "config.yaml"
config_bucket_name = _time_based_id(suffix="config-bucket")
_s3_upload_content_to_bucket(
client=self.s3_client,
content=config_yaml.encode("utf-8"),
content_type="text/plain",
bucket_name=config_bucket_name,
key=config_file_path,
)

os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"

fixtures = [
_load_file_fixture("cloudwatch-log-1.json"),
]

_sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))

event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)

ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
first_call = handler(event, ctx) # type:ignore

assert first_call == "completed"

# Index event a second time to trigger version conflict
second_call = handler(event, ctx) # type:ignore

assert second_call == "completed"

self.elasticsearch.refresh(index="logs-generic-default")

assert self.elasticsearch.count(index="logs-generic-default")["count"] == 1
Contributor:

From my understanding, this means that if we send two documents X and Y, and both documents have the same id, then only one of them goes through? And the other does not?

If that is the case, I think the problem here is that we have different documents being assigned the same ID, and in that case both documents should be sent to ES. I did some testing and shared the results in #677 (comment), and from what I saw the issue was in the way we assign IDs, not that the documents were the same.

Contributor:

There is also the situation where AWS might create two event notifications for the same CloudWatch log, which means that we try to enrich the same doc twice.

This is the situation described in #677, so I guess the _VERSION_CONFLICT check does what we want here.

@emilioalvap I guess you have tested this only with the integration test, right? I'm wondering how else we can trigger the same effect.

@gizas (Contributor), Jun 13, 2024:

I am a little sceptical about checking only this error; we might miss the actual cases where, for some other reason, we cannot ingest!

@emilioalvap (Contributor, Author), Jun 13, 2024:

> I am a little sceptical about checking only this error; we might miss the actual cases where, for some other reason, we cannot ingest!

@gizas does any other error come to mind that might be in the same category? IMO, other errors likely to be returned by ES would warrant a replay, mostly mapping errors and such. But I'm happy to play out the scenario if you think otherwise. I think the main concern here is the disproportionate number of duplicate ids generated under a certain amount of load.

@constanca-m great point, I reviewed the event id generators and they seem to have enough uniqueness as they are, but I can review again. My understanding aligns with @gizas's: these collisions are mostly due to duplicate events/delivery guarantees. As ESF does not deduplicate, there is always the potential to receive the same event trigger more than once.
From your tests, did you have the opportunity to check whether any of the duplicate-id errors were actually different documents? How did you set up those tests? What type of data are they ingesting?

I've been mostly going by integration tests, but I will do some tests on AWS next.

Contributor:

> great point, I reviewed the event id generators and they seem to have enough uniqueness as they are, but I can review again.

When I was doing the tests that gave me this graph in #677 (comment), I was increasing the number of events in each period even more. I believe at some point I started to send events so close to each other that they were given the same timestamp. The content of the documents was indeed the same, but what if a user is receiving multiple logs at the same time? Are they different from each other? I see this is the function for the IDs:

def cloudwatch_logs_object_id(event_payload: dict[str, Any]) -> str:

Is it not possible for all these fields to be the same?

@emilioalvap (Contributor, Author):

@constanca-m in your tests, what is the timeout configured for the ESF Lambda function? I was able to replicate the duplicate id error rate when the function took longer to execute than the configured timeout. This seems to be related to AWS retrying the Lambda invocation by default when a timeout happens.
From the docs:

> Even if your function doesn't return an error, it's possible for it to receive the same event from Lambda multiple times because the queue itself is eventually consistent. If the function can't keep up with incoming events, events might also be deleted from the queue without being sent to the function. Ensure that your function code gracefully handles duplicate events, and that you have enough concurrency available to handle all invocations.

Contributor:

> @constanca-m in your tests, what is the timeout configured for the ESF Lambda function?

I unfortunately did not pay attention to that. I deployed ESF with the terraform files though, so the default is being used (3 sec?)
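
To make the duplicate-delivery scenario discussed in this thread concrete, here is a small hypothetical sketch (the field names and hashing below are illustrative, not the actual cloudwatch_logs_object_id implementation): because the id is derived deterministically from the event coordinates, a retried delivery of the same event reproduces the same _id, and the second op_type="create" request fails with the 409 that this PR now keeps out of the replay queue.

```python
# Hypothetical id generator: the real ESF implementation differs, but any
# deterministic scheme behaves the same way under a duplicate delivery.
import hashlib


def deterministic_event_id(log_group: str, log_stream: str, event_id: str, offset: int) -> str:
    # Hash the event coordinates into a short prefix and append the offset,
    # loosely mirroring the "<prefix>-<offset>" ids used in the tests above.
    prefix = hashlib.sha256(f"{log_group}-{log_stream}-{event_id}".encode()).hexdigest()[:10]
    return f"{prefix}-{offset:012d}"


first_delivery = deterministic_event_id("my-log-group", "my-log-stream", "event-1", 0)
retried_delivery = deterministic_event_id("my-log-group", "my-log-stream", "event-1", 0)

# The same trigger delivered twice yields the same _id, so the second
# op_type="create" returns 409 instead of indexing a duplicate document.
assert first_delivery == retried_delivery
```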


# Test that no duplicate events end up in the replay queue
events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
assert len(events["Records"]) == 0
6 changes: 6 additions & 0 deletions tests/testcontainers/es.py
@@ -209,3 +209,9 @@ def index(self, **kwargs: Any) -> dict[str, Any]:
self._index_indices.add(kwargs["index"])
Contributor:

This file also has _DEFAULT_VERSION = "7.17.20" in here.

Can we update this as well to 8.14?

@emilioalvap (Contributor, Author):

I'd rather not include upgrading the testing infrastructure as part of this PR, since the logic here is not strictly dependent on the ES version. ESF is still using elasticsearch client 7.17, so it matches the lowest supported version as is.


return self.es_client.index(**kwargs)

def create_data_stream(self, **kwargs: Any) -> dict[str, Any]:
if "name" in kwargs:
self._index_indices.add(kwargs["name"])

return self.es_client.indices.create_data_stream(**kwargs)