Version Information
Akka 1.4.49
Akka.Streams.Kafka 1.4.49
Describe the bug
When an exception is thrown within the KafkaConsumerActor and handled by BaseSingleSourceLogic under the Restart directive, the Committer still keeps a reference to the previous ActorRef. As a result, all offset commits are sent to the stopped actor instead of the new one, so no offsets are committed to Kafka anymore.
To Reproduce
Override OnException from the DefaultConsumerDecider to issue a Directive.Restart on an exception.
Throw an exception in KafkaConsumerActor which gets handled by the ProcessExceptions method.
The actor will get recreated in BaseSingleSourceLogic but the Committer will still keep a reference to the previous actor.
Expected behavior
The reference to the consumer actor in the Committer should be updated. I found that recreating the messageBuilder using the factory in BaseSingleSourceLogic after creating the new KafkaConsumerActor seems to fix it, but I'm not sure if that is the best way to go about it.
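To make the suspected mechanism concrete, here is a minimal, dependency-free sketch of the stale-reference pattern described above. It does not use Akka or Akka.Streams.Kafka, and every type in it (`FakeConsumer`, `FakeCommitter`, `FakeSourceLogic`) is a hypothetical stand-in rather than the library's actual code. It only illustrates why rebuilding the committer from its factory after the consumer is recreated would restore commits, assuming the real Committer captures the consumer reference once, at construction time.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the consumer actor: records the offsets it is asked to commit.
public sealed class FakeConsumer
{
    public bool Alive { get; private set; } = true;
    public List<long> Committed { get; } = new List<long>();

    public void Stop() => Alive = false;

    // A stopped consumer silently drops commits, much like a message sent to a dead actor.
    public void Commit(long offset)
    {
        if (Alive) Committed.Add(offset);
    }
}

// Hypothetical stand-in for the committer: captures the consumer reference once.
public sealed class FakeCommitter
{
    private readonly FakeConsumer _consumer;

    public FakeCommitter(FakeConsumer consumer) => _consumer = consumer;

    public void Commit(long offset) => _consumer.Commit(offset);
}

// Hypothetical stand-in for the stage logic that owns the consumer and the committer.
public sealed class FakeSourceLogic
{
    private readonly Func<FakeConsumer, FakeCommitter> _committerFactory;

    public FakeConsumer Consumer { get; private set; }
    public FakeCommitter Committer { get; private set; }

    public FakeSourceLogic(Func<FakeConsumer, FakeCommitter> committerFactory)
    {
        _committerFactory = committerFactory;
        Consumer = new FakeConsumer();
        Committer = _committerFactory(Consumer);
    }

    // Models the Restart directive: the old consumer is stopped and a new one is created.
    // With rebuildCommitter == false the committer keeps pointing at the stopped consumer.
    public void Restart(bool rebuildCommitter)
    {
        Consumer.Stop();
        Consumer = new FakeConsumer();
        if (rebuildCommitter) Committer = _committerFactory(Consumer);
    }
}
```

In this model, calling `Restart(rebuildCommitter: false)` and then `Committer.Commit(42)` leaves the new consumer's `Committed` list empty, while `Restart(rebuildCommitter: true)` routes the commit to the new consumer. Whether rebuilding is the right fix in the real stage, as opposed to having the Committer resolve the current actor on each commit, remains an open question.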
Actual behavior
After the KafkaConsumerActor is recreated under the Restart directive in BaseSingleSourceLogic, offsets are no longer committed to Kafka.
Environment
Windows, .NET 6?
Additional context
Related to the work done in this PR and raised in this ticket.
The consumer actor now gets recreated as it should but with side effects.