Skip to content

Commit

Permalink
feat(config): Update stream name and topic names
Browse files Browse the repository at this point in the history
The code changes involve updating the "thingsStream" variable and modifying configurations and log messages in multiple files across different directories and packages. The changes include adding "events." to the stream name, updating topic names, and modifying log messages for handling NATS and RabbitMQ events.

Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo committed Jan 10, 2024
1 parent 6454963 commit 830eec3
Show file tree
Hide file tree
Showing 10 changed files with 13 additions and 24 deletions.
2 changes: 1 addition & 1 deletion cmd/bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
defDB = "bootstrap"
defSvcHTTPPort = "9013"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
streamID = "magistrala.bootstrap"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connsRMPrefix = "connection"
thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down
2 changes: 1 addition & 1 deletion cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
channelsRMPrefix = "channel"
connectionRMPrefix = "connection"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down
11 changes: 0 additions & 11 deletions pkg/events/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
stream: stream,
}

go es.StartPublishingRoutine(ctx)

return es, nil
}

Expand All @@ -74,15 +72,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
return es.publisher.Publish(ctx, es.stream, record)
}

func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) {
// Nats doesn't need to check for unpublished events
// since the events are published to a buffer.
// The buffer is flushed when the connection is reestablished.
// https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer

<-ctx.Done()
}

func (es *pubEventStore) Close() error {
es.conn.Close()

Expand Down
4 changes: 2 additions & 2 deletions pkg/events/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestPublish(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

cfg := events.SubscriberConfig{
Stream: stream,
Stream: "events." + stream,
Consumer: consumer,
Handler: handler{},
}
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestPubsub(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))

cfg := events.SubscriberConfig{
Stream: pc.stream,
Stream: "events." + pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/events/nats/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon

subCfg := messaging.SubscriberConfig{
ID: cfg.Consumer,
Topic: eventsPrefix + "." + cfg.Stream,
Topic: cfg.Stream,
Handler: &eventHandler{
handler: cfg.Handler,
ctx: ctx,
Expand Down Expand Up @@ -129,7 +129,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error {
}

if err := eh.handler.Handle(eh.ctx, event); err != nil {
eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err))
eh.logger.Warn(fmt.Sprintf("failed to handle nats event: %s", err))
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/events/rabbitmq/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestPublish(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

cfg := events.SubscriberConfig{
Stream: stream,
Stream: "events." + stream,
Consumer: consumer,
Handler: handler{},
}
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestPubsub(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))

cfg := events.SubscriberConfig{
Stream: pc.stream,
Stream: "events." + pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/events/rabbitmq/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (eh *eventHandler) Handle(msg *messaging.Message) error {
}

if err := eh.handler.Handle(eh.ctx, event); err != nil {
eh.logger.Warn(fmt.Sprintf("failed to handle redis event: %s", err))
eh.logger.Warn(fmt.Sprintf("failed to handle rabbitmq event: %s", err))
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/events/redis/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestPublish(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

cfg := events.SubscriberConfig{
Stream: streamName,
Stream: "events." + streamName,
Consumer: consumer,
Handler: handler{},
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestPubsub(t *testing.T) {
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))

cfg := events.SubscriberConfig{
Stream: pc.stream,
Stream: "events." + pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/events/redis/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (es *subEventStore) Subscribe(ctx context.Context, cfg events.SubscriberCon
Count: eventCount,
}).Result()
if err != nil {
es.logger.Warn(fmt.Sprintf("failed to read from Redis stream: %s", err))
es.logger.Warn(fmt.Sprintf("failed to read from redis stream: %s", err))

continue
}
Expand Down

0 comments on commit 830eec3

Please sign in to comment.