Skip to content

Commit

Permalink
feat(config): Update stream name and topic names
Browse files Browse the repository at this point in the history
The code changes involve updating the "thingsStream" variable and modifying configurations and log messages in multiple files across different directories and packages. The changes include adding "events." to the stream name, updating topic names, and modifying log messages for handling NATS and RabbitMQ events.

Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo committed Jan 15, 2024
1 parent 982b158 commit ec560a0
Show file tree
Hide file tree
Showing 15 changed files with 306 additions and 427 deletions.
2 changes: 1 addition & 1 deletion cmd/bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
defDB = "bootstrap"
defSvcHTTPPort = "9013"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
streamID = "magistrala.bootstrap"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connsRMPrefix = "connection"
thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down
2 changes: 1 addition & 1 deletion cmd/opcua/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
channelsRMPrefix = "channel"
connectionRMPrefix = "connection"

thingsStream = "magistrala.things"
thingsStream = "events.magistrala.things"
)

type config struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/events/mocks/subscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions pkg/events/nats/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ func NewPublisher(ctx context.Context, url, stream string) (events.Publisher, er
stream: stream,
}

go es.StartPublishingRoutine(ctx)

return es, nil
}

Expand All @@ -74,15 +72,6 @@ func (es *pubEventStore) Publish(ctx context.Context, event events.Event) error
return es.publisher.Publish(ctx, es.stream, record)
}

func (es *pubEventStore) StartPublishingRoutine(ctx context.Context) {
// Nats doesn't need to check for unpublished events
// since the events are published to a buffer.
// The buffer is flushed when the connection is reestablished.
// https://docs.nats.io/using-nats/developer/connecting/reconnect/buffer

<-ctx.Done()
}

func (es *pubEventStore) Close() error {
es.conn.Close()

Expand Down
231 changes: 92 additions & 139 deletions pkg/events/nats/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (
)

var (
streamTopic = "test-topic"
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
eventsChan = make(chan map[string]interface{})
logger = mglog.NewMock()
errFailed = errors.New("failed")
)

type testEvent struct {
Expand Down Expand Up @@ -52,15 +51,17 @@ func (te testEvent) Encode() (map[string]interface{}, error) {
func TestPublish(t *testing.T) {
publisher, err := nats.NewPublisher(ctx, natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
defer publisher.Close()

_, err = nats.NewSubscriber(ctx, "http://invaliurl.com", logger)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

subcriber, err := nats.NewSubscriber(ctx, natsURL, logger)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))
defer subcriber.Close()

cfg := events.SubscriberConfig{
Stream: stream,
Stream: "events." + stream,
Consumer: consumer,
Handler: handler{},
}
Expand Down Expand Up @@ -125,165 +126,117 @@ func TestPublish(t *testing.T) {
}

for _, tc := range cases {
event := testEvent{Data: tc.event}

err := publisher.Publish(ctx, event)
switch tc.err {
case nil:
assert.Nil(t, err, fmt.Sprintf("%s - got unexpected error: %s", tc.desc, err))

receivedEvent := <-eventsChan

val := int64(receivedEvent["occurred_at"].(float64))
if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) {
delete(receivedEvent, "occurred_at")
delete(tc.event, "occurred_at")
}

assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"], fmt.Sprintf("%s - expected temperature: %s, got: %s", tc.desc, tc.event["temperature"], receivedEvent["temperature"]))
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"], fmt.Sprintf("%s - expected humidity: %s, got: %s", tc.desc, tc.event["humidity"], receivedEvent["humidity"]))
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"], fmt.Sprintf("%s - expected sensor_id: %s, got: %s", tc.desc, tc.event["sensor_id"], receivedEvent["sensor_id"]))
assert.Equal(t, tc.event["status"], receivedEvent["status"], fmt.Sprintf("%s - expected status: %s, got: %s", tc.desc, tc.event["status"], receivedEvent["status"]))
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"], fmt.Sprintf("%s - expected timestamp: %s, got: %s", tc.desc, tc.event["timestamp"], receivedEvent["timestamp"]))
assert.Equal(t, tc.event["operation"], receivedEvent["operation"], fmt.Sprintf("%s - expected operation: %s, got: %s", tc.desc, tc.event["operation"], receivedEvent["operation"]))

default:
assert.ErrorContains(t, err, tc.err.Error(), fmt.Sprintf("%s - expected error: %s", tc.desc, tc.err))
}
}
}

func TestUnavailablePublish(t *testing.T) {
_, err := nats.NewPublisher(ctx, "http://invaliurl.com", stream)
assert.NotNilf(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err), err)

publisher, err := nats.NewPublisher(ctx, natsURL, stream)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on creating event store: %s", err))

err = pool.Client.PauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on pausing container: %s", err))

spawnGoroutines(publisher, t)

err = pool.Client.UnpauseContainer(container.Container.ID)
assert.Nil(t, err, fmt.Sprintf("got unexpected error on unpausing container: %s", err))

// Wait for the events to be published.
time.Sleep(events.UnpublishedEventsCheckInterval)

err = publisher.Close()
assert.Nil(t, err, fmt.Sprintf("got unexpected error on closing publisher: %s", err))
}

func generateRandomEvent() testEvent {
return testEvent{
Data: map[string]interface{}{
"temperature": fmt.Sprintf("%f", rand.Float64()),
"humidity": fmt.Sprintf("%f", rand.Float64()),
"sensor_id": fmt.Sprintf("%d", rand.Intn(1000)),
"location": fmt.Sprintf("%f", rand.Float64()),
"status": fmt.Sprintf("%d", rand.Intn(1000)),
"timestamp": fmt.Sprintf("%d", time.Now().UnixNano()),
"operation": "create",
},
}
}

func spawnGoroutines(publisher events.Publisher, t *testing.T) {
for i := 0; i < 1e4; i++ {
go func() {
for i := 0; i < 10; i++ {
event := generateRandomEvent()
err := publisher.Publish(ctx, event)
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
t.Run(tc.desc, func(t *testing.T) {
event := testEvent{Data: tc.event}

err := publisher.Publish(ctx, event)
switch tc.err {
case nil:
receivedEvent := <-eventsChan

val := int64(receivedEvent["occurred_at"].(float64))
if assert.WithinRange(t, time.Unix(0, val), time.Now().Add(-time.Second), time.Now().Add(time.Second)) {
delete(receivedEvent, "occurred_at")
delete(tc.event, "occurred_at")
}

assert.Equal(t, tc.event["temperature"], receivedEvent["temperature"])
assert.Equal(t, tc.event["humidity"], receivedEvent["humidity"])
assert.Equal(t, tc.event["sensor_id"], receivedEvent["sensor_id"])
assert.Equal(t, tc.event["status"], receivedEvent["status"])
assert.Equal(t, tc.event["timestamp"], receivedEvent["timestamp"])
assert.Equal(t, tc.event["operation"], receivedEvent["operation"])

default:
assert.ErrorContains(t, err, tc.err.Error())
}
}()
})
}
}

func TestPubsub(t *testing.T) {
subcases := []struct {
desc string
stream string
consumer string
errorMessage error
handler events.EventHandler
cases := []struct {
desc string
stream string
consumer string
err error
handler events.EventHandler
}{
{
desc: "Subscribe to a stream",
stream: fmt.Sprintf("%s.%s", stream, streamTopic),
consumer: consumer,
errorMessage: nil,
handler: handler{false},
desc: "Subscribe to a stream",
stream: fmt.Sprintf("events.%s", stream),
consumer: consumer,
err: nil,
handler: handler{false},
},
{
desc: "Subscribe to the same stream",
stream: fmt.Sprintf("%s.%s", stream, streamTopic),
consumer: consumer,
errorMessage: nil,
handler: handler{false},
desc: "Subscribe to the same stream",
stream: fmt.Sprintf("events.%s", stream),
consumer: consumer,
err: nil,
handler: handler{false},
},
{
desc: "Subscribe to an empty stream with an empty consumer",
stream: "",
consumer: "",
errorMessage: nats.ErrEmptyStream,
handler: handler{false},
desc: "Subscribe to an empty stream with an empty consumer",
stream: "",
consumer: "",
err: nats.ErrEmptyConsumer,
handler: handler{false},
},
{
desc: "Subscribe to an empty stream with a valid consumer",
stream: "",
consumer: consumer,
errorMessage: nats.ErrEmptyStream,
handler: handler{false},
desc: "Subscribe to an empty stream with a valid consumer",
stream: "",
consumer: consumer,
err: nats.ErrEmptyStream,
handler: handler{false},
},
{
desc: "Subscribe to a valid stream with an empty consumer",
stream: fmt.Sprintf("%s.%s", stream, streamTopic),
consumer: "",
errorMessage: nats.ErrEmptyConsumer,
handler: handler{false},
desc: "Subscribe to a valid stream with an empty consumer",
stream: fmt.Sprintf("events.%s", stream),
consumer: "",
err: nats.ErrEmptyConsumer,
handler: handler{false},
},
{
desc: "Subscribe to another stream",
stream: fmt.Sprintf("%s.%s", stream, streamTopic+"1"),
consumer: consumer,
errorMessage: nil,
handler: handler{false},
desc: "Subscribe to another stream",
stream: fmt.Sprintf("events.%s.%d", stream, 1),
consumer: consumer,
err: nil,
handler: handler{false},
},
{
desc: "Subscribe to a stream with malformed handler",
stream: fmt.Sprintf("%s.%s", stream, streamTopic),
consumer: consumer,
errorMessage: nil,
handler: handler{true},
desc: "Subscribe to a stream with malformed handler",
stream: fmt.Sprintf("events.%s", stream),
consumer: consumer,
err: nil,
handler: handler{true},
},
}

for _, pc := range subcases {
subcriber, err := nats.NewSubscriber(ctx, natsURL, logger)
if err != nil {
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err))

continue
}
for _, pc := range cases {
t.Run(pc.desc, func(t *testing.T) {
subcriber, err := nats.NewSubscriber(ctx, natsURL, logger)
if err != nil {
assert.Equal(t, err, pc.err)

assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
return
}

cfg := events.SubscriberConfig{
Stream: pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
switch err := subcriber.Subscribe(context.TODO(), cfg); {
case err == nil:
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
default:
assert.Equal(t, err, pc.errorMessage, fmt.Sprintf("%s got expected error: %s - got: %s", pc.desc, pc.errorMessage, err))
}
cfg := events.SubscriberConfig{
Stream: pc.stream,
Consumer: pc.consumer,
Handler: pc.handler,
}
switch err := subcriber.Subscribe(context.TODO(), cfg); {
case err == nil:
assert.Nil(t, err)
default:
assert.Equal(t, err, pc.err)
}

err = subcriber.Close()
assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
err = subcriber.Close()
assert.Nil(t, err)
})
}
}

Expand Down
15 changes: 6 additions & 9 deletions pkg/events/nats/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@ import (
)

var (
natsURL string
stream = "tests.events"
consumer = "tests-consumer"
ctx = context.Background()
pool *dockertest.Pool
container *dockertest.Resource
natsURL string
stream = "tests.events"
consumer = "tests-consumer"
ctx = context.Background()
)

func TestMain(m *testing.M) {
var err error
pool, err = dockertest.NewPool("")
pool, err := dockertest.NewPool("")
if err != nil {
log.Fatalf("Could not connect to docker: %s", err)
}

container, err = pool.RunWithOptions(&dockertest.RunOptions{
container, err := pool.RunWithOptions(&dockertest.RunOptions{
Name: "test-nats-events",
Repository: "nats",
Tag: "2.9.21-alpine",
Expand Down
Loading

0 comments on commit ec560a0

Please sign in to comment.