From cbe2f2fc3f6e13ac0301df05d128cd253907ad6a Mon Sep 17 00:00:00 2001 From: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Date: Wed, 10 Jan 2024 16:04:29 +0300 Subject: [PATCH 1/3] refactor(event): Add subscriber config This commit adds event subscriber which combines stream, consumer and handler to a single struct to the codebase. The code is related to event handling, subscribing to event stores, and using message brokers like RabbitMQ, Redis, and NATS. Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> --- cmd/bootstrap/main.go | 16 ++++--- cmd/lora/main.go | 20 +++++---- cmd/opcua/main.go | 20 +++++---- pkg/events/events.go | 9 +++- pkg/events/nats/publisher_test.go | 30 ++++++++++--- pkg/events/nats/setup_test.go | 2 +- pkg/events/nats/subscriber.go | 41 ++++++++---------- pkg/events/rabbitmq/publisher_test.go | 30 ++++++++++--- pkg/events/rabbitmq/setup_test.go | 2 +- pkg/events/rabbitmq/subscriber.go | 41 ++++++++---------- pkg/events/redis/publisher_test.go | 29 ++++++++++--- pkg/events/redis/subscriber.go | 43 ++++++++----------- .../store/{brokers_nats.go => store_nats.go} | 4 +- ...{brokers_rabbitmq.go => store_rabbitmq.go} | 4 +- .../{brokers_redis.go => store_redis.go} | 4 +- 15 files changed, 173 insertions(+), 122 deletions(-) rename pkg/events/store/{brokers_nats.go => store_nats.go} (76%) rename pkg/events/store/{brokers_rabbitmq.go => store_rabbitmq.go} (76%) rename pkg/events/store/{brokers_redis.go => store_redis.go} (78%) diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 78200f43cc..7fe956f30a 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -28,6 +28,7 @@ import ( httpserver "github.com/absmach/magistrala/internal/server/http" mglog "github.com/absmach/magistrala/logger" "github.com/absmach/magistrala/pkg/auth" + "github.com/absmach/magistrala/pkg/events" "github.com/absmach/magistrala/pkg/events/store" mgsdk "github.com/absmach/magistrala/pkg/sdk/go" "github.com/absmach/magistrala/pkg/uuid" @@ -142,6 +143,8 @@ func main() { return } + logger.Info("Subscribed to Event Store") + httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.ParseWithOptions(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) @@ -197,14 +200,15 @@ func newService(ctx context.Context, authClient magistrala.AuthServiceClient, db } func subscribeToThingsES(ctx context.Context, svc bootstrap.Service, cfg config, logger *slog.Logger) error { - subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger) if err != nil { return err } - handler := consumer.NewEventHandler(svc) - - logger.Info("Subscribed to Redis Event Store") - - return subscriber.Subscribe(ctx, handler) + subConfig := events.SubscriberConfig{ + Stream: thingsStream, + Consumer: cfg.ESConsumerName, + Handler: consumer.NewEventHandler(svc), + } + return subscriber.Subscribe(ctx, subConfig) } diff --git a/cmd/lora/main.go b/cmd/lora/main.go index 348029afc9..ed45c3721a 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -23,8 +23,9 @@ import ( mglog "github.com/absmach/magistrala/logger" "github.com/absmach/magistrala/lora" "github.com/absmach/magistrala/lora/api" - "github.com/absmach/magistrala/lora/events" + loraevents "github.com/absmach/magistrala/lora/events" "github.com/absmach/magistrala/lora/mqtt" + "github.com/absmach/magistrala/pkg/events" "github.com/absmach/magistrala/pkg/events/store" "github.com/absmach/magistrala/pkg/messaging" "github.com/absmach/magistrala/pkg/messaging/brokers" @@ -147,6 +148,8 @@ func main() { return } + logger.Info("Subscribed to Event Store") + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(cfg.InstanceID), logger) if cfg.SendTelemetry { @@ -198,21 +201,22 @@ func subscribeToLoRaBroker(svc lora.Service, mc mqttpaho.Client, timeout time.Du } func subscribeToThingsES(ctx context.Context, svc lora.Service, cfg config, logger *slog.Logger) error { - subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger) if err != nil { return err } - handler := events.NewEventHandler(svc) - - logger.Info("Subscribed to Redis Event Store") - - return subscriber.Subscribe(ctx, handler) + subConfig := events.SubscriberConfig{ + Stream: thingsStream, + Consumer: cfg.ESConsumerName, + Handler: loraevents.NewEventHandler(svc), + } + return subscriber.Subscribe(ctx, subConfig) } func newRouteMapRepository(client *redis.Client, prefix string, logger *slog.Logger) lora.RouteMapRepository { logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix)) - return events.NewRouteMapRepository(client, prefix) + return loraevents.NewRouteMapRepository(client, prefix) } func newService(pub messaging.Publisher, rmConn *redis.Client, thingsRMPrefix, channelsRMPrefix, connsRMPrefix string, logger *slog.Logger) lora.Service { diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index 2f4f63af3e..f38da90330 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -23,8 +23,9 @@ import ( "github.com/absmach/magistrala/opcua" "github.com/absmach/magistrala/opcua/api" "github.com/absmach/magistrala/opcua/db" - "github.com/absmach/magistrala/opcua/events" + opcuaevents "github.com/absmach/magistrala/opcua/events" "github.com/absmach/magistrala/opcua/gopcua" + "github.com/absmach/magistrala/pkg/events" "github.com/absmach/magistrala/pkg/events/store" "github.com/absmach/magistrala/pkg/messaging/brokers" brokerstracing "github.com/absmach/magistrala/pkg/messaging/brokers/tracing" @@ -142,6 +143,8 @@ func main() { return } + logger.Info("Subscribed to Event Store") + hs := httpserver.New(ctx, httpCancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger) if cfg.SendTelemetry { @@ -181,21 +184,22 @@ func subscribeToStoredSubs(ctx context.Context, sub opcua.Subscriber, cfg opcua. } func subscribeToThingsES(ctx context.Context, svc opcua.Service, cfg config, logger *slog.Logger) error { - subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, thingsStream, cfg.ESConsumerName, logger) + subscriber, err := store.NewSubscriber(ctx, cfg.ESURL, logger) if err != nil { return err } - handler := events.NewEventHandler(svc) - - logger.Info("Subscribed to Redis Event Store") - - return subscriber.Subscribe(ctx, handler) + subConfig := events.SubscriberConfig{ + Stream: thingsStream, + Consumer: cfg.ESConsumerName, + Handler: opcuaevents.NewEventHandler(svc), + } + return subscriber.Subscribe(ctx, subConfig) } func newRouteMapRepositoy(client *redis.Client, prefix string, logger *slog.Logger) opcua.RouteMapRepository { logger.Info(fmt.Sprintf("Connected to %s Redis Route-map", prefix)) - return events.NewRouteMapRepository(client, prefix) + return opcuaevents.NewRouteMapRepository(client, prefix) } func newService(sub opcua.Subscriber, browser opcua.Browser, thingRM, chanRM, connRM opcua.RouteMapRepository, opcuaConfig opcua.Config, logger *slog.Logger) opcua.Service { diff --git a/pkg/events/events.go b/pkg/events/events.go index f0281cd061..626bef11d3 100644 --- a/pkg/events/events.go +++ b/pkg/events/events.go @@ -38,12 +38,19 @@ type EventHandler interface { Handle(ctx context.Context, event Event) error } +// SubscriberConfig represents event subscriber configuration. +type SubscriberConfig struct { + Consumer string + Stream string + Handler EventHandler +} + // Subscriber specifies event subscription API. // //go:generate mockery --name Subscriber --output=./mocks --filename subscriber.go --quiet --note "Copyright (c) Abstract Machines" type Subscriber interface { // Subscribe subscribes to the event stream and consumes events. - Subscribe(ctx context.Context, handler EventHandler) error + Subscribe(ctx context.Context, cfg SubscriberConfig) error // Close gracefully closes event subscriber's connection. Close() error diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go index ef69550644..30f14cab39 100644 --- a/pkg/events/nats/publisher_test.go +++ b/pkg/events/nats/publisher_test.go @@ -57,13 +57,18 @@ func TestPublish(t *testing.T) { publisher, err := nats.NewPublisher(context.Background(), natsURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - _, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", stream, consumer, logger) + _, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -209,14 +214,20 @@ func TestPubsub(t *testing.T) { } for _, pc := range subcases { - subcriber, err := nats.NewSubscriber(context.Background(), natsURL, pc.stream, pc.consumer, logger) + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) if err != nil { assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) continue } + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - switch err := subcriber.Subscribe(context.Background(), pc.handler); { + cfg := events.SubscriberConfig{ + Stream: pc.stream, + Consumer: pc.consumer, + Handler: pc.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { case err == nil: assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) default: @@ -232,10 +243,15 @@ func TestUnavailablePublish(t *testing.T) { publisher, err := nats.NewPublisher(context.Background(), natsURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) err = pool.Client.PauseContainer(container.Container.ID) diff --git a/pkg/events/nats/setup_test.go b/pkg/events/nats/setup_test.go index 143e4863b9..e539aca537 100644 --- a/pkg/events/nats/setup_test.go +++ b/pkg/events/nats/setup_test.go @@ -52,7 +52,7 @@ func TestMain(m *testing.M) { } if err := pool.Retry(func() error { - _, err = nats.NewSubscriber(context.Background(), natsURL, stream, consumer, logger) + _, err = nats.NewSubscriber(context.Background(), natsURL, logger) return err }); err != nil { log.Fatalf("Could not connect to docker: %s", err) diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 9154678fb5..8df9607077 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -47,22 +47,12 @@ var ( ) type subEventStore struct { - conn *nats.Conn - pubsub messaging.PubSub - stream string - consumer string - logger *slog.Logger + conn *nats.Conn + pubsub messaging.PubSub + logger *slog.Logger } -func NewSubscriber(ctx context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - if stream == "" { - return nil, ErrEmptyStream - } - - if consumer == "" { - return nil, ErrEmptyConsumer - } - +func NewSubscriber(ctx context.Context, url string, logger *slog.Logger) (events.Subscriber, error) { conn, err := nats.Connect(url, nats.MaxReconnects(maxReconnects)) if err != nil { return nil, err @@ -82,20 +72,25 @@ func NewSubscriber(ctx context.Context, url, stream, consumer string, logger *sl } return &subEventStore{ - conn: conn, - pubsub: pubsub, - stream: stream, - consumer: consumer, - logger: logger, + conn: conn, + pubsub: pubsub, + logger: logger, }, nil } -func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { +func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + if cfg.Consumer == "" { + return ErrEmptyConsumer + } + if cfg.Stream == "" { + return ErrEmptyStream + } + subCfg := messaging.SubscriberConfig{ - ID: es.consumer, - Topic: eventsPrefix + "." + es.stream, + ID: cfg.Consumer, + Topic: eventsPrefix + "." + cfg.Stream, Handler: &eventHandler{ - handler: handler, + handler: cfg.Handler, ctx: ctx, logger: es.logger, }, diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go index 252784a3c0..e8a2c65de8 100644 --- a/pkg/events/rabbitmq/publisher_test.go +++ b/pkg/events/rabbitmq/publisher_test.go @@ -57,13 +57,18 @@ func TestPublish(t *testing.T) { publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - _, err = rabbitmq.NewSubscriber("http://invaliurl.com", stream, consumer, logger) + _, err = rabbitmq.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -210,14 +215,20 @@ func TestPubsub(t *testing.T) { } for _, pc := range subcases { - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, pc.stream, pc.consumer, logger) + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) if err != nil { assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) continue } + assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - switch err := subcriber.Subscribe(context.Background(), pc.handler); { + cfg := events.SubscriberConfig{ + Stream: pc.stream, + Consumer: pc.consumer, + Handler: pc.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { case err == nil: assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) default: @@ -233,10 +244,15 @@ func TestUnavailablePublish(t *testing.T) { publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: stream, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) err = pool.Client.PauseContainer(container.Container.ID) diff --git a/pkg/events/rabbitmq/setup_test.go b/pkg/events/rabbitmq/setup_test.go index 4db3c7c254..dcbf066afd 100644 --- a/pkg/events/rabbitmq/setup_test.go +++ b/pkg/events/rabbitmq/setup_test.go @@ -51,7 +51,7 @@ func TestMain(m *testing.M) { } if err := pool.Retry(func() error { - _, err = rabbitmq.NewSubscriber(rabbitmqURL, stream, consumer, logger) + _, err = rabbitmq.NewSubscriber(rabbitmqURL, logger) return err }); err != nil { log.Fatalf("Could not connect to docker: %s", err) diff --git a/pkg/events/rabbitmq/subscriber.go b/pkg/events/rabbitmq/subscriber.go index 88d2713f44..4c78ad88d0 100644 --- a/pkg/events/rabbitmq/subscriber.go +++ b/pkg/events/rabbitmq/subscriber.go @@ -30,22 +30,12 @@ var ( ) type subEventStore struct { - conn *amqp.Connection - pubsub messaging.PubSub - stream string - consumer string - logger *slog.Logger + conn *amqp.Connection + pubsub messaging.PubSub + logger *slog.Logger } -func NewSubscriber(url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - if stream == "" { - return nil, ErrEmptyStream - } - - if consumer == "" { - return nil, ErrEmptyConsumer - } - +func NewSubscriber(url string, logger *slog.Logger) (events.Subscriber, error) { conn, err := amqp.Dial(url) if err != nil { return nil, err @@ -64,20 +54,25 @@ func NewSubscriber(url, stream, consumer string, logger *slog.Logger) (events.Su } return &subEventStore{ - conn: conn, - pubsub: pubsub, - stream: stream, - consumer: consumer, - logger: logger, + conn: conn, + pubsub: pubsub, + logger: logger, }, nil } -func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { +func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + if cfg.Consumer == "" { + return ErrEmptyConsumer + } + if cfg.Stream == "" { + return ErrEmptyStream + } + subCfg := messaging.SubscriberConfig{ - ID: es.consumer, - Topic: eventsPrefix + "." + es.stream, + ID: cfg.Consumer, + Topic: eventsPrefix + "." + cfg.Stream, Handler: &eventHandler{ - handler: handler, + handler: cfg.Handler, ctx: ctx, logger: es.logger, }, diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index 9bf226dd76..cad5fd7a89 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -66,13 +66,18 @@ func TestPublish(t *testing.T) { publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, events.UnpublishedEventsCheckInterval) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - _, err = redis.NewSubscriber("http://invaliurl.com", streamName, consumer, logger) + _, err = redis.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - subcriber, err := redis.NewSubscriber(redisURL, streamName, consumer, logger) + subcriber, err := redis.NewSubscriber(redisURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: streamName, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) cases := []struct { @@ -223,7 +228,7 @@ func TestPubsub(t *testing.T) { } for _, pc := range subcases { - subcriber, err := redis.NewSubscriber(redisURL, pc.stream, pc.consumer, logger) + subcriber, err := redis.NewSubscriber(redisURL, logger) if err != nil { assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) @@ -232,7 +237,12 @@ func TestPubsub(t *testing.T) { assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - switch err := subcriber.Subscribe(context.Background(), pc.handler); { + cfg := events.SubscriberConfig{ + Stream: pc.stream, + Consumer: pc.consumer, + Handler: pc.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { case err == nil: assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) default: @@ -248,10 +258,15 @@ func TestUnavailablePublish(t *testing.T) { publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, time.Second) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - subcriber, err := redis.NewSubscriber(redisURL, streamName, consumer, logger) + subcriber, err := redis.NewSubscriber(redisURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) - err = subcriber.Subscribe(context.Background(), handler{}) + cfg := events.SubscriberConfig{ + Stream: streamName, + Consumer: consumer, + Handler: handler{}, + } + err = subcriber.Subscribe(context.Background(), cfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error on subscribing to event store: %s", err)) err = pool.Client.PauseContainer(container.Container.ID) diff --git a/pkg/events/redis/subscriber.go b/pkg/events/redis/subscriber.go index 64d1724d2e..ad3ce62f1d 100644 --- a/pkg/events/redis/subscriber.go +++ b/pkg/events/redis/subscriber.go @@ -33,36 +33,31 @@ var ( ) type subEventStore struct { - client *redis.Client - stream string - consumer string - logger *slog.Logger + client *redis.Client + logger *slog.Logger } -func NewSubscriber(url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - if stream == "" { - return nil, ErrEmptyStream - } - - if consumer == "" { - return nil, ErrEmptyConsumer - } - +func NewSubscriber(url string, logger *slog.Logger) (events.Subscriber, error) { opts, err := redis.ParseURL(url) if err != nil { return nil, err } return &subEventStore{ - client: redis.NewClient(opts), - stream: stream, - consumer: consumer, - logger: logger, + client: redis.NewClient(opts), + logger: logger, }, nil } -func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHandler) error { - err := es.client.XGroupCreateMkStream(ctx, es.stream, group, "$").Err() +func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + if cfg.Consumer == "" { + return ErrEmptyConsumer + } + if cfg.Stream == "" { + return ErrEmptyStream + } + + err := es.client.XGroupCreateMkStream(ctx, cfg.Stream, group, "$").Err() if err != nil && err.Error() != exists { return err } @@ -71,8 +66,8 @@ func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHand for { msgs, err := es.client.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: group, - Consumer: es.consumer, - Streams: []string{es.stream, ">"}, + Consumer: cfg.Consumer, + Streams: []string{cfg.Stream, ">"}, Count: eventCount, }).Result() if err != nil { @@ -84,7 +79,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, handler events.EventHand continue } - es.handle(ctx, msgs[0].Messages, handler) + es.handle(ctx, cfg.Stream, msgs[0].Messages, cfg.Handler) } }() @@ -103,7 +98,7 @@ func (re redisEvent) Encode() (map[string]interface{}, error) { return re.Data, nil } -func (es *subEventStore) handle(ctx context.Context, msgs []redis.XMessage, h events.EventHandler) { +func (es *subEventStore) handle(ctx context.Context, stream string, msgs []redis.XMessage, h events.EventHandler) { for _, msg := range msgs { event := redisEvent{ Data: msg.Values, @@ -115,7 +110,7 @@ func (es *subEventStore) handle(ctx context.Context, msgs []redis.XMessage, h ev return } - if err := es.client.XAck(ctx, es.stream, group, msg.ID).Err(); err != nil { + if err := es.client.XAck(ctx, stream, group, msg.ID).Err(); err != nil { es.logger.Warn(fmt.Sprintf("failed to ack redis event: %s", err)) return diff --git a/pkg/events/store/brokers_nats.go b/pkg/events/store/store_nats.go similarity index 76% rename from pkg/events/store/brokers_nats.go rename to pkg/events/store/store_nats.go index 282fc6ad5e..e344253bb2 100644 --- a/pkg/events/store/brokers_nats.go +++ b/pkg/events/store/store_nats.go @@ -28,8 +28,8 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er return pb, nil } -func NewSubscriber(ctx context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - pb, err := nats.NewSubscriber(ctx, url, stream, consumer, logger) +func NewSubscriber(ctx context.Context, url string, logger *slog.Logger) (events.Subscriber, error) { + pb, err := nats.NewSubscriber(ctx, url, logger) if err != nil { return nil, err } diff --git a/pkg/events/store/brokers_rabbitmq.go b/pkg/events/store/store_rabbitmq.go similarity index 76% rename from pkg/events/store/brokers_rabbitmq.go rename to pkg/events/store/store_rabbitmq.go index bf895502f2..0af15e0d70 100644 --- a/pkg/events/store/brokers_rabbitmq.go +++ b/pkg/events/store/store_rabbitmq.go @@ -28,8 +28,8 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er return pb, nil } -func NewSubscriber(_ context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - pb, err := rabbitmq.NewSubscriber(url, stream, consumer, logger) +func NewSubscriber(_ context.Context, url string, logger *slog.Logger) (events.Subscriber, error) { + pb, err := rabbitmq.NewSubscriber(url, logger) if err != nil { return nil, err } diff --git a/pkg/events/store/brokers_redis.go b/pkg/events/store/store_redis.go similarity index 78% rename from pkg/events/store/brokers_redis.go rename to pkg/events/store/store_redis.go index 711310123e..136d01b794 100644 --- a/pkg/events/store/brokers_redis.go +++ b/pkg/events/store/store_redis.go @@ -28,8 +28,8 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er return pb, nil } -func NewSubscriber(_ context.Context, url, stream, consumer string, logger *slog.Logger) (events.Subscriber, error) { - pb, err := redis.NewSubscriber(url, stream, consumer, logger) +func NewSubscriber(_ context.Context, url string, logger *slog.Logger) (events.Subscriber, error) { + pb, err := redis.NewSubscriber(url, logger) if err != nil { return nil, err } From 0519afd257abe1bb06c7a8acf13cb2e63dff83d0 Mon Sep 17 00:00:00 2001 From: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Date: Wed, 10 Jan 2024 16:18:48 +0300 Subject: [PATCH 2/3] feat(config): Update stream name and topic names The code changes involve updating the "thingsStream" variable and modifying configurations and log messages in multiple files across different directories and packages. The changes include adding "events." to the stream name, updating topic names, and modifying log messages for handling NATS and RabbitMQ events. Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> --- cmd/bootstrap/main.go | 2 +- cmd/lora/main.go | 2 +- cmd/opcua/main.go | 2 +- pkg/events/mocks/subscriber.go | 10 +- pkg/events/nats/publisher_test.go | 181 +++++++++++------------ pkg/events/nats/subscriber.go | 8 +- pkg/events/rabbitmq/publisher_test.go | 186 ++++++++++++------------ pkg/events/rabbitmq/subscriber.go | 4 +- pkg/events/redis/publisher.go | 3 - pkg/events/redis/publisher_test.go | 197 +++++++++++++------------- pkg/events/redis/setup_test.go | 3 - pkg/events/redis/subscriber.go | 5 +- 12 files changed, 303 insertions(+), 300 deletions(-) diff --git a/cmd/bootstrap/main.go b/cmd/bootstrap/main.go index 7fe956f30a..61f56096fc 100644 --- a/cmd/bootstrap/main.go +++ b/cmd/bootstrap/main.go @@ -46,7 +46,7 @@ const ( defDB = "bootstrap" defSvcHTTPPort = "9013" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" streamID = "magistrala.bootstrap" ) diff --git a/cmd/lora/main.go b/cmd/lora/main.go index ed45c3721a..78105c3e29 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -45,7 +45,7 @@ const ( thingsRMPrefix = "thing" channelsRMPrefix = "channel" connsRMPrefix = "connection" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" ) type config struct { diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index f38da90330..54607d0dd9 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -44,7 +44,7 @@ const ( channelsRMPrefix = "channel" connectionRMPrefix = "connection" - thingsStream = "magistrala.things" + thingsStream = "events.magistrala.things" ) type config struct { diff --git a/pkg/events/mocks/subscriber.go b/pkg/events/mocks/subscriber.go index 38d4002387..4de1ceb50f 100644 --- a/pkg/events/mocks/subscriber.go +++ b/pkg/events/mocks/subscriber.go @@ -34,17 +34,17 @@ func (_m *Subscriber) Close() error { return r0 } -// Subscribe provides a mock function with given fields: ctx, handler -func (_m *Subscriber) Subscribe(ctx context.Context, handler events.EventHandler) error { - ret := _m.Called(ctx, handler) +// Subscribe provides a mock function with given fields: ctx, cfg +func (_m *Subscriber) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { + ret := _m.Called(ctx, cfg) if len(ret) == 0 { panic("no return value specified for Subscribe") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, events.EventHandler) error); ok { - r0 = rf(ctx, handler) + if rf, ok := ret.Get(0).(func(context.Context, events.SubscriberConfig) error); ok { + r0 = rf(ctx, cfg) } else { r0 = ret.Error(0) } diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go index 30f14cab39..a53d855d0a 100644 --- a/pkg/events/nats/publisher_test.go +++ b/pkg/events/nats/publisher_test.go @@ -19,11 +19,10 @@ import ( ) var ( - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -56,15 +55,17 @@ func TestPublish(t *testing.T) { publisher, err := nats.NewPublisher(context.Background(), natsURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() _, err = nats.NewSubscriber(context.Background(), "http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: stream, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -129,113 +130,117 @@ func TestPublish(t *testing.T) { } for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - val := int64(receivedEvent["occurred_at"].(float64)) - if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + t.Run(tc.desc, func(t *testing.T) { + event := testEvent{Data: tc.event} + + err := publisher.Publish(context.Background(), event) + switch tc.err { + case nil: + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(tc.event, "occurred_at") + } + + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, tc.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - default: - assert.ErrorContains(t, err, tc.err.Error()) - } + }) } } func TestPubsub(t *testing.T) { - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: nats.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: nats.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: nats.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: nats.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: "", - errorMessage: nats.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: nats.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + for _, pc := range cases { + t.Run(pc.desc, func(t *testing.T) { + subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) + if err != nil { + assert.Equal(t, err, pc.err) - continue - } - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(context.Background(), cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: pc.stream, + Consumer: pc.consumer, + Handler: pc.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, pc.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 8df9607077..182f426023 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -18,9 +18,7 @@ import ( "github.com/nats-io/nats.go/jetstream" ) -const ( - maxReconnects = -1 -) +const maxReconnects = -1 var _ events.Subscriber = (*subEventStore)(nil) @@ -88,7 +86,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon subCfg := messaging.SubscriberConfig{ ID: cfg.Consumer, - Topic: eventsPrefix + "." + cfg.Stream, + Topic: cfg.Stream, Handler: &eventHandler{ handler: cfg.Handler, ctx: ctx, @@ -129,7 +127,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error { } if err := eh.handler.Handle(eh.ctx, event); err != nil { - eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + eh.logger.Warn(fmt.Sprintf("failed to handle nats event: %s", err)) } return nil diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go index e8a2c65de8..15763ac1cb 100644 --- a/pkg/events/rabbitmq/publisher_test.go +++ b/pkg/events/rabbitmq/publisher_test.go @@ -19,11 +19,10 @@ import ( ) var ( - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 ) type testEvent struct { @@ -56,15 +55,17 @@ func TestPublish(t *testing.T) { publisher, err := rabbitmq.NewPublisher(context.Background(), rabbitmqURL, stream) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() _, err = rabbitmq.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: stream, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -128,115 +129,120 @@ func TestPublish(t *testing.T) { }, } - for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - val := int64(receivedEvent["occurred_at"].(float64)) - if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + event := testEvent{Data: c.event} + + err := publisher.Publish(context.Background(), event) + switch c.err { + case nil: + receivedEvent := <-eventsChan + + val := int64(receivedEvent["occurred_at"].(float64)) + if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(c.event, "occurred_at") + } + + assert.Equal(t, c.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, c.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, c.event["status"], receivedEvent["status"]) + assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, c.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, c.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } + }) } } func TestPubsub(t *testing.T) { - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: rabbitmq.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: rabbitmq.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: rabbitmq.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: rabbitmq.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: "", - errorMessage: rabbitmq.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: rabbitmq.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", stream, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) + if err != nil { + assert.Equal(t, err, c.err) - continue - } - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(context.Background(), cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + assert.Nil(t, err) + + cfg := events.SubscriberConfig{ + Stream: c.stream, + Consumer: c.consumer, + Handler: c.handler, + } + switch err := subcriber.Subscribe(context.Background(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, c.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/rabbitmq/subscriber.go b/pkg/events/rabbitmq/subscriber.go index 4c78ad88d0..dab686c103 100644 --- a/pkg/events/rabbitmq/subscriber.go +++ b/pkg/events/rabbitmq/subscriber.go @@ -70,7 +70,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon subCfg := messaging.SubscriberConfig{ ID: cfg.Consumer, - Topic: eventsPrefix + "." + cfg.Stream, + Topic: cfg.Stream, Handler: &eventHandler{ handler: cfg.Handler, ctx: ctx, @@ -111,7 +111,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error { } if err := eh.handler.Handle(eh.ctx, event); err != nil { - eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err)) + eh.logger.Warn(fmt.Sprintf("failed to handle rabbitmq event: %s", err)) } return nil diff --git a/pkg/events/redis/publisher.go b/pkg/events/redis/publisher.go index a6f5c90aca..57f74758c3 100644 --- a/pkg/events/redis/publisher.go +++ b/pkg/events/redis/publisher.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis import ( diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index cad5fd7a89..2b503e7372 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis_test import ( @@ -23,13 +20,13 @@ import ( ) var ( - streamName = "magistrala.eventstest" - consumer = "test-consumer" - streamTopic = "test-topic" - eventsChan = make(chan map[string]interface{}) - logger = mglog.NewMock() - errFailed = errors.New("failed") - numEvents = 100 + streamName = "magistrala.eventstest" + consumer = "test-consumer" + eventsChan = make(chan map[string]interface{}) + logger = mglog.NewMock() + errFailed = errors.New("failed") + numEvents = 100 + stream = "tests.events" ) type testEvent struct { @@ -65,15 +62,17 @@ func TestPublish(t *testing.T) { publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, events.UnpublishedEventsCheckInterval) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer publisher.Close() _, err = redis.NewSubscriber("http://invaliurl.com", logger) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) subcriber, err := redis.NewSubscriber(redisURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) + defer subcriber.Close() cfg := events.SubscriberConfig{ - Stream: streamName, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } @@ -137,31 +136,35 @@ func TestPublish(t *testing.T) { }, } - for _, tc := range cases { - event := testEvent{Data: tc.event} - - err := publisher.Publish(context.Background(), event) - switch tc.err { - case nil: - receivedEvent := <-eventsChan - - roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) - assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err)) - if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { - delete(receivedEvent, "occurred_at") - delete(tc.event, "occurred_at") + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + event := testEvent{Data: c.event} + + err := publisher.Publish(context.Background(), event) + switch c.err { + case nil: + assert.Nil(t, err) + + receivedEvent := <-eventsChan + + roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) + assert.Nil(t, err) + if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { + delete(receivedEvent, "occurred_at") + delete(c.event, "occurred_at") + } + + assert.Equal(t, c.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, c.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, c.event["status"], receivedEvent["status"]) + assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, c.event["operation"], receivedEvent["operation"]) + + default: + assert.ErrorContains(t, err, c.err.Error()) } - - assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, tc.event["status"], receivedEvent["status"]) - assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - - default: - assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err)) - } + }) } } @@ -169,88 +172,88 @@ func TestPubsub(t *testing.T) { err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - subcases := []struct { - desc string - stream string - consumer string - errorMessage error - handler events.EventHandler + cases := []struct { + desc string + stream string + consumer string + err error + handler events.EventHandler }{ { - desc: "Subscribe to a stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to a stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to the same stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to the same stream", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with an empty consumer", - stream: "", - consumer: "", - errorMessage: redis.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with an empty consumer", + stream: "", + consumer: "", + err: redis.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to an empty stream with a valid consumer", - stream: "", - consumer: consumer, - errorMessage: redis.ErrEmptyStream, - handler: handler{false}, + desc: "Subscribe to an empty stream with a valid consumer", + stream: "", + consumer: consumer, + err: redis.ErrEmptyStream, + handler: handler{false}, }, { - desc: "Subscribe to a valid stream with an empty consumer", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: "", - errorMessage: redis.ErrEmptyConsumer, - handler: handler{false}, + desc: "Subscribe to a valid stream with an empty consumer", + stream: fmt.Sprintf("events.%s", stream), + consumer: "", + err: redis.ErrEmptyConsumer, + handler: handler{false}, }, { - desc: "Subscribe to another stream", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic+"1"), - consumer: consumer, - errorMessage: nil, - handler: handler{false}, + desc: "Subscribe to another stream", + stream: fmt.Sprintf("events.%s.%d", stream, 1), + consumer: consumer, + err: nil, + handler: handler{false}, }, { - desc: "Subscribe to a stream with malformed handler", - stream: fmt.Sprintf("%s.%s", streamName, streamTopic), - consumer: consumer, - errorMessage: nil, - handler: handler{true}, + desc: "Subscribe to a stream with malformed handler", + stream: fmt.Sprintf("events.%s", stream), + consumer: consumer, + err: nil, + handler: handler{true}, }, } - for _, pc := range subcases { - subcriber, err := redis.NewSubscriber(redisURL, logger) - if err != nil { - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - - continue - } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + subcriber, err := redis.NewSubscriber(redisURL, logger) + if err != nil { + assert.Equal(t, err, c.err) - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + return + } - cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, - } - switch err := subcriber.Subscribe(context.Background(), cfg); { - case err == nil: - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) - default: - assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err)) - } + cfg := events.SubscriberConfig{ + Stream: c.stream, + Consumer: c.consumer, + Handler: c.handler, + } + switch err := subcriber.Subscribe(context.TODO(), cfg); { + case err == nil: + assert.Nil(t, err) + default: + assert.Equal(t, err, c.err) + } - err = subcriber.Close() - assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) + err = subcriber.Close() + assert.Nil(t, err) + }) } } diff --git a/pkg/events/redis/setup_test.go b/pkg/events/redis/setup_test.go index 541cb2a3d1..719e0996c3 100644 --- a/pkg/events/redis/setup_test.go +++ b/pkg/events/redis/setup_test.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis_test import ( diff --git a/pkg/events/redis/subscriber.go b/pkg/events/redis/subscriber.go index ad3ce62f1d..43898075b3 100644 --- a/pkg/events/redis/subscriber.go +++ b/pkg/events/redis/subscriber.go @@ -1,9 +1,6 @@ // Copyright (c) Abstract Machines // SPDX-License-Identifier: Apache-2.0 -//go:build !nats && !rabbitmq -// +build !nats,!rabbitmq - package redis import ( @@ -71,7 +68,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon Count: eventCount, }).Result() if err != nil { - es.logger.Warn(fmt.Sprintf("failed to read from Redis stream: %s", err)) + es.logger.Warn(fmt.Sprintf("failed to read from redis stream: %s", err)) continue } From fa6da0a113186cbc5d42b4b2f678bf4212f6b09c Mon Sep 17 00:00:00 2001 From: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> Date: Mon, 12 Feb 2024 22:43:26 +0300 Subject: [PATCH 3/3] fix: event tests Signed-off-by: Rodney Osodo <28790446+rodneyosodo@users.noreply.github.com> --- pkg/events/nats/publisher_test.go | 19 +++++---- pkg/events/nats/subscriber.go | 6 +-- pkg/events/rabbitmq/publisher_test.go | 44 ++++++++++----------- pkg/events/rabbitmq/subscriber.go | 6 +-- pkg/events/redis/publisher.go | 2 +- pkg/events/redis/publisher_test.go | 55 +++++++++++++-------------- pkg/events/redis/subscriber.go | 13 ++++--- 7 files changed, 70 insertions(+), 75 deletions(-) diff --git a/pkg/events/nats/publisher_test.go b/pkg/events/nats/publisher_test.go index a53d855d0a..20086ea552 100644 --- a/pkg/events/nats/publisher_test.go +++ b/pkg/events/nats/publisher_test.go @@ -150,7 +150,6 @@ func TestPublish(t *testing.T) { assert.Equal(t, tc.event["status"], receivedEvent["status"]) assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) - default: assert.ErrorContains(t, err, tc.err.Error()) } @@ -184,7 +183,7 @@ func TestPubsub(t *testing.T) { desc: "Subscribe to an empty stream with an empty consumer", stream: "", consumer: "", - err: nats.ErrEmptyConsumer, + err: nats.ErrEmptyStream, handler: handler{false}, }, { @@ -217,25 +216,25 @@ func TestPubsub(t *testing.T) { }, } - for _, pc := range cases { - t.Run(pc.desc, func(t *testing.T) { + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { subcriber, err := nats.NewSubscriber(context.Background(), natsURL, logger) if err != nil { - assert.Equal(t, err, pc.err) + assert.Equal(t, err, tc.err) return } cfg := events.SubscriberConfig{ - Stream: pc.stream, - Consumer: pc.consumer, - Handler: pc.handler, + Stream: tc.stream, + Consumer: tc.consumer, + Handler: tc.handler, } switch err := subcriber.Subscribe(context.Background(), cfg); { case err == nil: assert.Nil(t, err) default: - assert.Equal(t, err, pc.err) + assert.Equal(t, err, tc.err) } err = subcriber.Close() @@ -252,7 +251,7 @@ func TestUnavailablePublish(t *testing.T) { assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) cfg := events.SubscriberConfig{ - Stream: stream, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } diff --git a/pkg/events/nats/subscriber.go b/pkg/events/nats/subscriber.go index 182f426023..ca99f83123 100644 --- a/pkg/events/nats/subscriber.go +++ b/pkg/events/nats/subscriber.go @@ -77,12 +77,12 @@ func NewSubscriber(ctx context.Context, url string, logger *slog.Logger) (events } func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { - if cfg.Consumer == "" { - return ErrEmptyConsumer - } if cfg.Stream == "" { return ErrEmptyStream } + if cfg.Consumer == "" { + return ErrEmptyConsumer + } subCfg := messaging.SubscriberConfig{ ID: cfg.Consumer, diff --git a/pkg/events/rabbitmq/publisher_test.go b/pkg/events/rabbitmq/publisher_test.go index 15763ac1cb..f14534654b 100644 --- a/pkg/events/rabbitmq/publisher_test.go +++ b/pkg/events/rabbitmq/publisher_test.go @@ -129,30 +129,30 @@ func TestPublish(t *testing.T) { }, } - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { - event := testEvent{Data: c.event} + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + event := testEvent{Data: tc.event} err := publisher.Publish(context.Background(), event) - switch c.err { + switch tc.err { case nil: receivedEvent := <-eventsChan val := int64(receivedEvent["occurred_at"].(float64)) if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { delete(receivedEvent, "occurred_at") - delete(c.event, "occurred_at") + delete(tc.event, "occurred_at") } - assert.Equal(t, c.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, c.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, c.event["status"], receivedEvent["status"]) - assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, c.event["operation"], receivedEvent["operation"]) + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) default: - assert.ErrorContains(t, err, c.err.Error()) + assert.ErrorContains(t, err, tc.err.Error()) } }) } @@ -184,7 +184,7 @@ func TestPubsub(t *testing.T) { desc: "Subscribe to an empty stream with an empty consumer", stream: "", consumer: "", - err: rabbitmq.ErrEmptyConsumer, + err: rabbitmq.ErrEmptyStream, handler: handler{false}, }, { @@ -217,27 +217,25 @@ func TestPubsub(t *testing.T) { }, } - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { subcriber, err := rabbitmq.NewSubscriber(rabbitmqURL, logger) if err != nil { - assert.Equal(t, err, c.err) + assert.Equal(t, err, tc.err) return } - assert.Nil(t, err) - cfg := events.SubscriberConfig{ - Stream: c.stream, - Consumer: c.consumer, - Handler: c.handler, + Stream: tc.stream, + Consumer: tc.consumer, + Handler: tc.handler, } switch err := subcriber.Subscribe(context.Background(), cfg); { case err == nil: assert.Nil(t, err) default: - assert.Equal(t, err, c.err) + assert.Equal(t, err, tc.err) } err = subcriber.Close() @@ -254,7 +252,7 @@ func TestUnavailablePublish(t *testing.T) { assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) cfg := events.SubscriberConfig{ - Stream: stream, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } diff --git a/pkg/events/rabbitmq/subscriber.go b/pkg/events/rabbitmq/subscriber.go index dab686c103..bba6b16317 100644 --- a/pkg/events/rabbitmq/subscriber.go +++ b/pkg/events/rabbitmq/subscriber.go @@ -61,12 +61,12 @@ func NewSubscriber(url string, logger *slog.Logger) (events.Subscriber, error) { } func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { - if cfg.Consumer == "" { - return ErrEmptyConsumer - } if cfg.Stream == "" { return ErrEmptyStream } + if cfg.Consumer == "" { + return ErrEmptyConsumer + } subCfg := messaging.SubscriberConfig{ ID: cfg.Consumer, diff --git a/pkg/events/redis/publisher.go b/pkg/events/redis/publisher.go index 57f74758c3..e6a29626c0 100644 --- a/pkg/events/redis/publisher.go +++ b/pkg/events/redis/publisher.go @@ -29,7 +29,7 @@ func NewPublisher(ctx context.Context, url, stream string, flushPeriod time.Dura es := &pubEventStore{ client: redis.NewClient(opts), unpublishedEvents: make(chan *redis.XAddArgs, events.MaxUnpublishedEvents), - stream: stream, + stream: eventsPrefix + stream, flushPeriod: flushPeriod, } diff --git a/pkg/events/redis/publisher_test.go b/pkg/events/redis/publisher_test.go index 2b503e7372..ca1320a4b0 100644 --- a/pkg/events/redis/publisher_test.go +++ b/pkg/events/redis/publisher_test.go @@ -20,13 +20,12 @@ import ( ) var ( - streamName = "magistrala.eventstest" + stream = "tests.events" consumer = "test-consumer" eventsChan = make(chan map[string]interface{}) logger = mglog.NewMock() errFailed = errors.New("failed") numEvents = 100 - stream = "tests.events" ) type testEvent struct { @@ -57,10 +56,10 @@ func TestPublish(t *testing.T) { err := redisClient.FlushAll(context.Background()).Err() assert.Nil(t, err, fmt.Sprintf("got unexpected error on flushing redis: %s", err)) - _, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", streamName, events.UnpublishedEventsCheckInterval) + _, err = redis.NewPublisher(context.Background(), "http://invaliurl.com", stream, events.UnpublishedEventsCheckInterval) assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err) - publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, events.UnpublishedEventsCheckInterval) + publisher, err := redis.NewPublisher(context.Background(), redisURL, stream, events.UnpublishedEventsCheckInterval) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) defer publisher.Close() @@ -136,33 +135,31 @@ func TestPublish(t *testing.T) { }, } - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { - event := testEvent{Data: c.event} + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + event := testEvent{Data: tc.event} err := publisher.Publish(context.Background(), event) - switch c.err { + switch tc.err { case nil: - assert.Nil(t, err) - receivedEvent := <-eventsChan roa, err := strconv.ParseInt(receivedEvent["occurred_at"].(string), 10, 64) assert.Nil(t, err) if assert.WithinRange(t, time.Unix(0, roa), time.Now().Add(-time.Second), time.Now().Add(time.Second)) { delete(receivedEvent, "occurred_at") - delete(c.event, "occurred_at") + delete(tc.event, "occurred_at") } - assert.Equal(t, c.event["temperature"], receivedEvent["temperature"]) - assert.Equal(t, c.event["humidity"], receivedEvent["humidity"]) - assert.Equal(t, c.event["sensor_id"], receivedEvent["sensor_id"]) - assert.Equal(t, c.event["status"], receivedEvent["status"]) - assert.Equal(t, c.event["timestamp"], receivedEvent["timestamp"]) - assert.Equal(t, c.event["operation"], receivedEvent["operation"]) + assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"]) + assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"]) + assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"]) + assert.Equal(t, tc.event["status"], receivedEvent["status"]) + assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"]) + assert.Equal(t, tc.event["operation"], receivedEvent["operation"]) default: - assert.ErrorContains(t, err, c.err.Error()) + assert.ErrorContains(t, err, tc.err.Error()) } }) } @@ -197,7 +194,7 @@ func TestPubsub(t *testing.T) { desc: "Subscribe to an empty stream with an empty consumer", stream: "", consumer: "", - err: redis.ErrEmptyConsumer, + err: redis.ErrEmptyStream, handler: handler{false}, }, { @@ -230,25 +227,25 @@ func TestPubsub(t *testing.T) { }, } - for _, c := range cases { - t.Run(c.desc, func(t *testing.T) { + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { subcriber, err := redis.NewSubscriber(redisURL, logger) if err != nil { - assert.Equal(t, err, c.err) + assert.Equal(t, err, tc.err) return } cfg := events.SubscriberConfig{ - Stream: c.stream, - Consumer: c.consumer, - Handler: c.handler, + Stream: tc.stream, + Consumer: tc.consumer, + Handler: tc.handler, } - switch err := subcriber.Subscribe(context.TODO(), cfg); { + switch err := subcriber.Subscribe(context.Background(), cfg); { case err == nil: assert.Nil(t, err) default: - assert.Equal(t, err, c.err) + assert.Equal(t, err, tc.err) } err = subcriber.Close() @@ -258,14 +255,14 @@ func TestPubsub(t *testing.T) { } func TestUnavailablePublish(t *testing.T) { - publisher, err := redis.NewPublisher(context.Background(), redisURL, streamName, time.Second) + publisher, err := redis.NewPublisher(context.Background(), redisURL, stream, time.Second) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) subcriber, err := redis.NewSubscriber(redisURL, logger) assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err)) cfg := events.SubscriberConfig{ - Stream: streamName, + Stream: "events." + stream, Consumer: consumer, Handler: handler{}, } diff --git a/pkg/events/redis/subscriber.go b/pkg/events/redis/subscriber.go index 43898075b3..910ecca348 100644 --- a/pkg/events/redis/subscriber.go +++ b/pkg/events/redis/subscriber.go @@ -14,9 +14,10 @@ import ( ) const ( - eventCount = 100 - exists = "BUSYGROUP Consumer Group name already exists" - group = "magistrala" + eventsPrefix = "events." + eventCount = 100 + exists = "BUSYGROUP Consumer Group name already exists" + group = "magistrala" ) var _ events.Subscriber = (*subEventStore)(nil) @@ -47,12 +48,12 @@ func NewSubscriber(url string, logger *slog.Logger) (events.Subscriber, error) { } func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberConfig) error { - if cfg.Consumer == "" { - return ErrEmptyConsumer - } if cfg.Stream == "" { return ErrEmptyStream } + if cfg.Consumer == "" { + return ErrEmptyConsumer + } err := es.client.XGroupCreateMkStream(ctx, cfg.Stream, group, "$").Err() if err != nil && err.Error() != exists {