
Prevent duplicate _id events from reaching the replay queue #729

Merged (10 commits into elastic:main on Jul 10, 2024)

Conversation

@emilioalvap (Contributor) commented Jun 12, 2024

What does this PR do?

Fixes #677.

Check the status codes of _bulk request responses to detect _id collisions and prevent the affected events from going into the replay queue.
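
For illustration only, a minimal sketch of this kind of check (the helper name and exact shape are assumptions, not the actual ESF code; the discussion below refers to the relevant constant as _VERSION_CONFLICT):

from typing import Any

_VERSION_CONFLICT_STATUS = 409  # hypothetical constant name

def split_bulk_failures(bulk_response: dict[str, Any]) -> tuple[list[dict], list[dict]]:
    """Separate genuine failures (replay candidates) from _id conflicts (skipped)."""
    to_replay: list[dict] = []
    skipped_conflicts: list[dict] = []
    for item in bulk_response.get("items", []):
        # Each item in a _bulk response is keyed by its action, e.g. {"create": {...}}.
        action = next(iter(item.values()))
        status = action.get("status", 200)
        if status < 300:
            continue  # indexed successfully
        if status == _VERSION_CONFLICT_STATUS:
            skipped_conflicts.append(action)  # duplicate _id: do not replay
        else:
            to_replay.append(action)  # genuine ingestion failure: replay later
    return to_replay, skipped_conflicts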

Why is it important?

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.md

@emilioalvap added the bug (Something isn't working) label on Jun 12, 2024
@emilioalvap requested a review from a team on Jun 12, 2024 at 19:21
@emilioalvap (Contributor Author)

cc @bturquet

@emilioalvap changed the title from "Esf fix duplicate events" to "Prevent duplicate _id events from reaching the replay queue" on Jun 12, 2024

self.elasticsearch.refresh(index="logs-generic-default")

assert self.elasticsearch.count(index="logs-generic-default")["count"] == 1
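
For context, a standalone sketch of the behaviour this assertion relies on, written against the elasticsearch 7.x Python client with a placeholder endpoint (it is not part of the PR's test code):

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

doc = {"message": "same event delivered twice"}
es.index(index="logs-generic-default", id="duplicate-id", body=doc)
# Indexing again with the same _id overwrites the existing document instead of
# adding a second one; with op_type="create" (what _bulk create actions use)
# the second call would instead fail with a 409 version conflict.
es.index(index="logs-generic-default", id="duplicate-id", body=doc)

es.indices.refresh(index="logs-generic-default")
assert es.count(index="logs-generic-default")["count"] == 1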
Contributor

From my understanding, this means that if we send two documents X and Y, and both documents have the same id, then only one of them goes through? And the other does not?

If that is the case, I think the problem here is that we have different documents being assigned the same ID, in which case both documents should be sent to ES. I did some testing and shared the results in #677 (comment), and from what I saw the issue was in the way we assign IDs, not that the documents were the same.

Contributor

There is also the situation where AWS might create two event notifications for the same CloudWatch log.
So this means that we try to enrich the same doc twice.

This is the situation described in #677.
So I guess the _VERSION_CONFLICT check does what we want here.

@emilioalvap I guess you have tested this only with integration tests, right? I'm wondering how else we can trigger the same effect.

@gizas (Contributor) commented Jun 13, 2024

I am a little sceptical about checking only this error; we might miss the actual cases where, for some other reason, we cannot ingest!

@emilioalvap (Contributor Author) commented Jun 13, 2024

I am a little sceptical about checking only this error; we might miss the actual cases where, for some other reason, we cannot ingest!

@gizas does any other error come to mind that might be in the same category? IMO, the other errors likely to be returned by ES would warrant a replay, mostly mapping errors and such. But I'm happy to play out the scenario if you think otherwise. I think the main concern here is the disproportionate amount of duplicate ids generated under a certain amount of load.

@constanca-m great point, I reviewed the event id generators and they seem to have enough uniqueness as they are, but I can review again. My understanding aligns with @gizas's, in that these collisions are mostly due to duplicate events/delivery guarantees. As ESF does not handle duplicates, there is always the potential to receive the same event trigger more than once.
From your tests, did you have the opportunity to check whether any of the duplicate-ID errors were actually for different documents? How did you set up those tests? What type of data were they ingesting?

I've been mostly going by integration tests, but I will do some tests on AWS next.

Contributor

great point, I reviewed the event id generators and they seem to have enough uniqueness as they are, but I can review again.

When I was doing the tests that gave me this graph (#677 (comment)), I was increasing the number of events further in each period. I believe at some point I started to send events so close to each other that they were given the same timestamp. The content of the documents was indeed the same, but what if a user is receiving multiple logs at the same time? Are those logs different from each other? I see this is the function for the IDs:

def cloudwatch_logs_object_id(event_payload: dict[str, Any]) -> str:

Is it not possible for all these fields to be the same?
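
A purely hypothetical sketch of the kind of fields such a generator might combine, to make the question concrete (the field names and structure are assumptions, not the actual cloudwatch_logs_object_id implementation):

import hashlib
from typing import Any

def hypothetical_cloudwatch_logs_object_id(event_payload: dict[str, Any]) -> str:
    # Field names are illustrative assumptions, not ESF's actual payload keys.
    parts = [
        event_payload["log_group"],
        event_payload["log_stream"],
        event_payload["event_id"],  # per-event CloudWatch id
        str(event_payload["event_timestamp"]),
    ]
    return hashlib.sha384(":".join(parts).encode("utf-8")).hexdigest()

If a per-event unique identifier is among the inputs, identical timestamps alone cannot produce identical ids; the question is whether every input field can coincide at once.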

Contributor Author

@constanca-m in your tests, what is the timeout configured for the ESF Lambda function? I was able to replicate the duplicate-id error rate when the function would take longer to execute than the configured timeout. This seems to be related to AWS retrying the Lambda invocation by default when a timeout happens:
[screenshots omitted]
From the docs:

Even if your function doesn't return an error, it's possible for it to receive the same event from Lambda multiple times because the queue itself is eventually consistent. If the function can't keep up with incoming events, events might also be deleted from the queue without being sent to the function. Ensure that your function code gracefully handles duplicate events, and that you have enough concurrency available to handle all invocations.

Contributor

@constanca-m in your tests, what is the timeout configured for the ESF Lambda function?

I unfortunately did not pay attention to that. I deployed ESF with the Terraform files though, so the default is being used (3 seconds?).

@constanca-m (Contributor)

From my understanding of this PR, if ESF receives a document that was already sent to ES, it will just skip it and continue execution, as opposed to stopping and returning an error, as it does now. Can you confirm this? This seems OK to me.

Additionally, this PR needs to increase the ESF version and add an entry to the CHANGELOG.

@gizas (Contributor) commented Jul 3, 2024

From the implementation side, all is OK. The key point is this comment:
events so close to each other that they were given the same timestamp

If we can verify that this does not happen and we can guarantee the uniqueness of the timestamp creation here, then I think we are OK.

What is the timestamp's precision? I mean, we include milliseconds, right, as in here?

@axw (Member) commented Jul 4, 2024

If the timestamp is non-unique, then we would need to update how the _id field is computed. An option for that would be to hash the entire document, using something fast and non-cryptographic, like xxhash or murmur3.
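
A minimal sketch of that option, assuming the third-party xxhash package (the function name is illustrative; murmur3 via the mmh3 package would work similarly):

import json

import xxhash

def document_hash_id(document: dict) -> str:
    # Canonicalise so logically identical documents always hash to the same _id.
    canonical = json.dumps(document, sort_keys=True, separators=(",", ":"))
    return xxhash.xxh64(canonical.encode("utf-8")).hexdigest()

Note that hashing the whole document deduplicates only exact duplicates; two genuinely different documents would always receive different ids (up to hash collisions).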

@gizas (Contributor) commented Jul 8, 2024

@emilioalvap please check the comment above.

I think the main concern here is the disproportionate amount of duplicate ids generated under a certain amount of load.

So, in order neither to hide the error right away nor to produce an extra amount of duplicates, can we consider including only one doc in the replay queue and then continuing? Sorry if I sound persistent here, I just don't want to hide an error right away and miss something important. Of course, this might not show users the actual impact if they still see duplicates, but we will need to document this.

@axw, @constanca-m any objections to the above?

@@ -209,3 +209,9 @@ def index(self, **kwargs: Any) -> dict[str, Any]:
self._index_indices.add(kwargs["index"])
Contributor

This file also has _DEFAULT_VERSION = "7.17.20" in here.

Can we update this as well to 8.14?

Contributor Author

I'd rather not include upgrading the testing infrastructure as part of this PR, since the logic here is not strictly dependent on the ES version. ESF is still using the elasticsearch 7.17 client, so it matches the lowest supported version as is.

@constanca-m (Contributor)

So, in order neither to hide the error right away nor to produce an extra amount of duplicates, can we consider including only one doc in the replay queue and then continuing?

This sounds good to me, @gizas.

@axw (Member) commented Jul 9, 2024

So, in order neither to hide the error right away nor to produce an extra amount of duplicates, can we consider including only one doc in the replay queue and then continuing? Sorry if I sound persistent here, I just don't want to hide an error right away and miss something important. Of course, this might not show users the actual impact if they still see duplicates, but we will need to document this.

@axw, @constanca-m any objections to the above?

What would be the purpose of sending a conflicted doc to the replay queue? I mean, what would you do next? Would it be onerous to make this behaviour configurable?

My preference would be to guarantee the uniqueness of document IDs and always ignore 409s, but if we're unsure of this then I think it would be OK to make the behaviour configurable.

@gizas (Contributor) commented Jul 9, 2024

I was thinking that if at least one doc reaches the replay queue, and eventually the dead-letter queue, it would be a notification for those who monitor the system that at least something happened. If the phenomenon persists, then it should be checked. Yes, OK, making the behaviour configurable is, I guess, the better option.

@emilioalvap (Contributor Author)

As opposed to stopping and returning an error, as it does now. Can you confirm this? This seems OK to me.

@constanca-m, to clarify, the current implementation does not stop when an indexing error is found; it logs the error and adds the event to the replay queue. The only difference with this PR is that duplicate-_id events will skip being enqueued for replay.

So, in order neither to hide the error right away nor to produce an extra amount of duplicates, can we consider including only one doc in the replay queue and then continuing?

@gizas, I have some trouble seeing how, from the perspective of a single Lambda instance processing an event, we could validate that no duplicate already exists in the replay queue; could you elaborate on that? It would probably require an external synchronization mechanism. It's also worth pointing out that these duplicate responses are getting included in the ESF logs.

Like @axw mentions, I think the effort should go into validating that the ESF id generators are unique enough. And so far I think they are: shared timestamps should not be enough to cause an id collision, as all of the generators seem to include a unique identifier. E.g., CloudWatch event ids are likely guaranteed to be unique. If we factor in that ESF is partitioning the id per timestamp, the possibility of an id collision (not duplication) within the same millisecond should be close to zero. The cryptographic hashing seed seems to be correctly initialized, and SHA384 is not particularly known to produce collisions, afaik.
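
To put "close to zero" in perspective, a back-of-envelope birthday-bound estimate, assuming the ids behave like uniformly random SHA384 digests:

def collision_probability(n_events: int, digest_bits: int = 384) -> float:
    # Birthday bound: P(at least one collision) is roughly n^2 / 2^(bits + 1)
    # for n uniformly random digests of the given width.
    return n_events**2 / 2 ** (digest_bits + 1)

# Even a billion ids landing in the same timestamp partition stays negligible:
print(collision_probability(10**9))  # ~1.3e-98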

Please check my previous comment on how the increased rates of duplicate ids that @constanca-m and I were seeing are likely related to the default Lambda timeout applied by the TF template in terraform-elastic-esf. In my tests, after applying the same defaults as the SAR template, I didn't get a single duplicate-ID error after ingesting north of 1M CloudWatch events, whereas previously 50 events would be enough to trigger it. So far I haven't been able to get a single event collision or duplicate. The default timeout has already been increased here: elastic/terraform-elastic-esf#13.

@gizas (Contributor) commented Jul 9, 2024

It's also worth pointing out that these duplicate responses are getting included in the ESF logs.

Thanks @emilioalvap, which logs are those?

Yes, I see it will be difficult to check with the existing implementation. Maybe keep a record of ids that have duplicates? But that means we would increase memory usage and probably need to search and delete from a Python list.

Yes, I was reading the above discussion; it is fine then, as at least we did some load testing to verify uniqueness.
Reminder before merging:

Please update the ESF version and add an entry to the CHANGELOG.

@gizas previously approved these changes Jul 9, 2024
@emilioalvap merged commit cce7939 into elastic:main on Jul 10, 2024
5 checks passed
Labels: bug (Something isn't working)
Projects: none yet
Development: successfully merging this pull request may close the issue "Investigate ingestion of files with same IDs" (#677)
4 participants