From 973b5eccd5e4e2217244934aab8e62f2ac2d5ee4 Mon Sep 17 00:00:00 2001 From: Max Ekman Date: Fri, 3 Dec 2021 14:51:00 +0100 Subject: [PATCH] fix: missing event in Redis event bus error --- eventbus/redis/eventbus.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/eventbus/redis/eventbus.go b/eventbus/redis/eventbus.go index 56629796..a11a739f 100644 --- a/eventbus/redis/eventbus.go +++ b/eventbus/redis/eventbus.go @@ -269,11 +269,10 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin // Ignore non-matching events. if !m.Match(event) { - _, err := b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result() - if err != nil { - err = fmt.Errorf("could not ack event: %w", err) + if _, err := b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result(); err != nil { + err = fmt.Errorf("could not ack non-matching event: %w", err) select { - case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) } @@ -297,9 +296,9 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin _, err = b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result() if err != nil { - err = fmt.Errorf("could not ack event: %w", err) + err = fmt.Errorf("could not ack handled event: %w", err) select { - case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) }