Skip to content

Commit

Permalink
fix: event tests
Browse files Browse the repository at this point in the history
Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo committed Feb 12, 2024
1 parent 79bc9f9 commit 299e53b
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 75 deletions.
19 changes: 9 additions & 10 deletions pkg/events/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func TestPublish(t *testing.T) {
assert.Equal(t, tc.event["status"], receivedEvent["status"])
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, tc.event["operation"], receivedEvent["operation"])

default:
assert.ErrorContains(t, err, tc.err.Error())
}
Expand Down Expand Up @@ -184,7 +183,7 @@ func TestPubsub(t *testing.T) {
desc: "Subscribe to an empty stream with an empty consumer",
stream: "",
consumer: "",
err: nats.ErrEmptyConsumer,
err: nats.ErrEmptyStream,
handler: handler{false},
},
{
Expand Down Expand Up @@ -217,25 +216,25 @@ func TestPubsub(t *testing.T) {
},
}

for _, pc := range cases {
t.Run(pc.desc, func(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger)
if err != nil {
assert.Equal(t, err, pc.err)
assert.Equal(t, err, tc.err)

return
}

cfg := events.SubscriberConfig{
Stream: pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
Stream: tc.stream,
Consumer: tc.consumer,
Handler: tc.handler,
}
switch err := subcriber.Subscribe(context.Background(), cfg); {
case err == nil:
assert.Nil(t, err)
default:
assert.Equal(t, err, pc.err)
assert.Equal(t, err, tc.err)
}

err = subcriber.Close()
Expand All @@ -252,7 +251,7 @@ func TestUnavailablePublish(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

cfg := events.SubscriberConfig{
Stream: stream,
Stream: "events." + stream,
Consumer: consumer,
Handler: handler{},
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/events/nats/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func NewSubscriber(ctx context.Context, url string, logger *slog.Logger) (events
}

func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error {
if cfg.Consumer == "" {
return ErrEmptyConsumer
}
if cfg.Stream == "" {
return ErrEmptyStream
}
if cfg.Consumer == "" {
return ErrEmptyConsumer
}

subCfg := messaging.SubscriberConfig{
ID: cfg.Consumer,
Expand Down
44 changes: 21 additions & 23 deletions pkg/events/rabbitmq/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,30 +129,30 @@ func TestPublish(t *testing.T) {
},
}

for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
event := testEvent{Data: c.event}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
event := testEvent{Data: tc.event}

err := publisher.Publish(context.Background(), event)
switch c.err {
switch tc.err {
case nil:
receivedEvent := <-eventsChan

val := int64(receivedEvent["occurred_at"].(float64))
if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) {
delete(receivedEvent, "occurred_at")
delete(c.event, "occurred_at")
delete(tc.event, "occurred_at")
}

assert.Equal(t, c.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, c.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, c.event["status"], receivedEvent["status"])
assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, c.event["operation"], receivedEvent["operation"])
assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, tc.event["status"], receivedEvent["status"])
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, tc.event["operation"], receivedEvent["operation"])

default:
assert.ErrorContains(t, err, c.err.Error())
assert.ErrorContains(t, err, tc.err.Error())
}
})
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestPubsub(t *testing.T) {
desc: "Subscribe to an empty stream with an empty consumer",
stream: "",
consumer: "",
err: rabbitmq.ErrEmptyConsumer,
err: rabbitmq.ErrEmptyStream,
handler: handler{false},
},
{
Expand Down Expand Up @@ -217,27 +217,25 @@ func TestPubsub(t *testing.T) {
},
}

for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger)
if err != nil {
assert.Equal(t, err, c.err)
assert.Equal(t, err, tc.err)

return
}

assert.Nil(t, err)

cfg := events.SubscriberConfig{
Stream: c.stream,
Consumer: c.consumer,
Handler: c.handler,
Stream: tc.stream,
Consumer: tc.consumer,
Handler: tc.handler,
}
switch err := subcriber.Subscribe(context.Background(), cfg); {
case err == nil:
assert.Nil(t, err)
default:
assert.Equal(t, err, c.err)
assert.Equal(t, err, tc.err)
}

err = subcriber.Close()
Expand All @@ -254,7 +252,7 @@ func TestUnavailablePublish(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

cfg := events.SubscriberConfig{
Stream: stream,
Stream: "events." + stream,
Consumer: consumer,
Handler: handler{},
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/events/rabbitmq/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func NewSubscriber(url string, logger *slog.Logger) (events.Subscriber, error) {
}

func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error {
if cfg.Consumer == "" {
return ErrEmptyConsumer
}
if cfg.Stream == "" {
return ErrEmptyStream
}
if cfg.Consumer == "" {
return ErrEmptyConsumer
}

subCfg := messaging.SubscriberConfig{
ID: cfg.Consumer,
Expand Down
2 changes: 1 addition & 1 deletion pkg/events/redis/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewPublisher(ctx context.Context, url, stream string, flushPeriod time.Dura
es := &pubEventStore{
client: redis.NewClient(opts),
unpublishedEvents: make(chan *redis.XAddArgs, events.MaxUnpublishedEvents),
stream: stream,
stream: eventsPrefix + stream,
flushPeriod: flushPeriod,
}

Expand Down
55 changes: 26 additions & 29 deletions pkg/events/redis/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
)

var (
streamName = "magistrala.eventstest"
stream = "tests.events"
consumer = "test-consumer"
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
numEvents = 100
stream = "tests.events"
)

type testEvent struct {
Expand Down Expand Up @@ -57,10 +56,10 @@ func TestPublish(t *testing.T) {
err := redisClient.FlushAll(context.Background()).Err()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err))

_, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", streamName, events.UnpublishedEventsCheckInterval)
_, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", stream, events.UnpublishedEventsCheckInterval)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, events.UnpublishedEventsCheckInterval)
publisher, err := redis.NewPublisher(context.Background(), redisURL, stream, events.UnpublishedEventsCheckInterval)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
defer publisher.Close()

Expand Down Expand Up @@ -136,33 +135,31 @@ func TestPublish(t *testing.T) {
},
}

for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
event := testEvent{Data: c.event}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
event := testEvent{Data: tc.event}

err := publisher.Publish(context.Background(), event)
switch c.err {
switch tc.err {
case nil:
assert.Nil(t, err)

receivedEvent := <-eventsChan

roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64)
assert.Nil(t, err)
if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) {
delete(receivedEvent, "occurred_at")
delete(c.event, "occurred_at")
delete(tc.event, "occurred_at")
}

assert.Equal(t, c.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, c.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, c.event["status"], receivedEvent["status"])
assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, c.event["operation"], receivedEvent["operation"])
assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, tc.event["status"], receivedEvent["status"])
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, tc.event["operation"], receivedEvent["operation"])

default:
assert.ErrorContains(t, err, c.err.Error())
assert.ErrorContains(t, err, tc.err.Error())
}
})
}
Expand Down Expand Up @@ -197,7 +194,7 @@ func TestPubsub(t *testing.T) {
desc: "Subscribe to an empty stream with an empty consumer",
stream: "",
consumer: "",
err: redis.ErrEmptyConsumer,
err: redis.ErrEmptyStream,
handler: handler{false},
},
{
Expand Down Expand Up @@ -230,25 +227,25 @@ func TestPubsub(t *testing.T) {
},
}

for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
subcriber, err := redis.NewSubscriber(redisURL, logger)
if err != nil {
assert.Equal(t, err, c.err)
assert.Equal(t, err, tc.err)

return
}

cfg := events.SubscriberConfig{
Stream: c.stream,
Consumer: c.consumer,
Handler: c.handler,
Stream: tc.stream,
Consumer: tc.consumer,
Handler: tc.handler,
}
switch err := subcriber.Subscribe(context.TODO(), cfg); {
switch err := subcriber.Subscribe(context.Background(), cfg); {
case err == nil:
assert.Nil(t, err)
default:
assert.Equal(t, err, c.err)
assert.Equal(t, err, tc.err)
}

err = subcriber.Close()
Expand All @@ -258,14 +255,14 @@ func TestPubsub(t *testing.T) {
}

func TestUnavailablePublish(t *testing.T) {
publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, time.Second)
publisher, err := redis.NewPublisher(context.Background(), redisURL, stream, time.Second)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

subcriber, err := redis.NewSubscriber(redisURL, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

cfg := events.SubscriberConfig{
Stream: streamName,
Stream: "events." + stream,
Consumer: consumer,
Handler: handler{},
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/events/redis/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
)

const (
eventCount = 100
exists = "BUSYGROUP Consumer Group name already exists"
group = "magistrala"
eventsPrefix = "events."
eventCount = 100
exists = "BUSYGROUP Consumer Group name already exists"
group = "magistrala"
)

var _ events.Subscriber = (*subEventStore)(nil)
Expand Down Expand Up @@ -47,12 +48,12 @@ func NewSubscriber(url string, logger *slog.Logger) (events.Subscriber, error) {
}

func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error {
if cfg.Consumer == "" {
return ErrEmptyConsumer
}
if cfg.Stream == "" {
return ErrEmptyStream
}
if cfg.Consumer == "" {
return ErrEmptyConsumer
}

err := es.client.XGroupCreateMkStream(ctx, cfg.Stream, group, "$").Err()
if err != nil && err.Error() != exists {
Expand Down

0 comments on commit 299e53b

Please sign in to comment.