Skip to content

Commit

Permalink
Fix bug in GCP event bus with no event data
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Oct 8, 2020
1 parent 955276f commit fef4e07
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 19 deletions.
29 changes: 25 additions & 4 deletions eventbus/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,46 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)
observerName = "observer"
)

// Event without data (tested in its own handler).
otherHandler := mocks.NewEventHandler("other-handler")
bus1.AddHandler(eh.MatchEvent(mocks.EventOtherType), otherHandler)
eventWithoutData := eh.NewEventForAggregate(mocks.EventOtherType, nil, timestamp,
mocks.AggregateType, uuid.New(), 1)
if err := bus1.HandleEvent(ctx, eventWithoutData); err != nil {
t.Error("there should be no error:", err)
}
expectedEvents := []eh.Event{eventWithoutData}
if !otherHandler.Wait(timeout) {
t.Error("did not receive event in time")
}
if !mocks.EqualEvents(otherHandler.Events, expectedEvents) {
t.Error("the events were incorrect:")
t.Log(otherHandler.Events)
if len(otherHandler.Events) == 1 {
t.Log(pretty.Sprint(otherHandler.Events[0]))
}
}

// Add handlers and observers.
handlerBus1 := mocks.NewEventHandler(handlerName)
handlerBus2 := mocks.NewEventHandler(handlerName)
anotherHandlerBus2 := mocks.NewEventHandler("another_handler")
observerBus1 := mocks.NewEventHandler(observerName)
observerBus2 := mocks.NewEventHandler(observerName)
bus1.AddHandler(eh.MatchAny(), handlerBus1)
bus2.AddHandler(eh.MatchAny(), handlerBus2)
bus2.AddHandler(eh.MatchAny(), anotherHandlerBus2)
bus1.AddHandler(eh.MatchEvent(mocks.EventType), handlerBus1)
bus2.AddHandler(eh.MatchEvent(mocks.EventType), handlerBus2)
bus2.AddHandler(eh.MatchEvent(mocks.EventType), anotherHandlerBus2)
// Add observers using the observer middleware.
bus1.AddHandler(eh.MatchAny(), eh.UseEventHandlerMiddleware(observerBus1, observer.Middleware))
bus2.AddHandler(eh.MatchAny(), eh.UseEventHandlerMiddleware(observerBus2, observer.Middleware))

// Event with data.
if err := bus1.HandleEvent(ctx, event1); err != nil {
t.Error("there should be no error:", err)
}

// Check for correct event in handler 1 or 2.
expectedEvents := []eh.Event{event1}
expectedEvents = []eh.Event{event1}
if !(handlerBus1.Wait(timeout) || handlerBus2.Wait(timeout)) {
t.Error("did not receive event in time")
}
Expand Down
32 changes: 17 additions & 15 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,26 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
}

// Create an event of the correct type and decode from raw BSON.
var err error
if e.data, err = eh.CreateEventData(e.EventType); err != nil {
select {
case b.errCh <- eh.EventBusError{Err: errors.New("could not create event data: " + err.Error()), Ctx: ctx}:
default:
if len(e.RawData) > 0 {
var err error
if e.data, err = eh.CreateEventData(e.EventType); err != nil {
select {
case b.errCh <- eh.EventBusError{Err: errors.New("could not create event data: " + err.Error()), Ctx: ctx}:
default:
}
msg.Nack()
return
}
msg.Nack()
return
}
if err := bson.Unmarshal(e.RawData, e.data); err != nil {
select {
case b.errCh <- eh.EventBusError{Err: errors.New("could not unmarshal event data: " + err.Error()), Ctx: ctx}:
default:
if err := bson.Unmarshal(e.RawData, e.data); err != nil {
select {
case b.errCh <- eh.EventBusError{Err: errors.New("could not unmarshal event data: " + err.Error()), Ctx: ctx}:
default:
}
msg.Nack()
return
}
msg.Nack()
return
e.RawData = nil
}
e.RawData = nil

event := event{evt: e}
ctx = eh.UnmarshalContext(e.Context)
Expand Down

0 comments on commit fef4e07

Please sign in to comment.