From e7b02e83097c7f2cbd5df7ea79d9e1bc473ca33d Mon Sep 17 00:00:00 2001 From: Tobias Rauter Date: Fri, 6 Mar 2020 14:59:05 +0100 Subject: [PATCH] enable offset resetting to 0 (used for recovery) --- faust/transport/drivers/aiokafka.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/faust/transport/drivers/aiokafka.py b/faust/transport/drivers/aiokafka.py index 4987de721..29a9bc95f 100644 --- a/faust/transport/drivers/aiokafka.py +++ b/faust/transport/drivers/aiokafka.py @@ -741,6 +741,8 @@ async def _seek_wait(self, consumer.seek(tp, offset) if offset > 0: self.consumer._read_offset[tp] = offset + elif tp in self.consumer._read_offset.keys(): + del self.consumer._read_offset[tp] await asyncio.gather(*[ consumer.position(tp) for tp in partitions ])