Skip to content

Commit

Permalink
update tests to subscribe on wildcard
Browse files Browse the repository at this point in the history
Signed-off-by: rodneyosodo <[email protected]>
  • Loading branch information
rodneyosodo committed Apr 6, 2023
1 parent ed14de1 commit 30820ee
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 35 deletions.
34 changes: 26 additions & 8 deletions pkg/messaging/kafka/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,44 +33,62 @@ func TestPubsub(t *testing.T) {
// require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
cases := []struct {
desc string
channel string
topic string
subtopic string
payload []byte
err error
}{
{
desc: "publish message with empty topic",
topic: "",
err: kafka.ErrEmptyTopic,
payload: data,
},
{
desc: "publish message with nil payload",
topic: channel,
payload: nil,
err: nil,
},
{
desc: "publish message with string payload",
topic: channel,
payload: data,
err: nil,
},
{
desc: "publish message with channel",
desc: "publish message with topic",
payload: data,
channel: channel,
topic: channel,
err: nil,
},
{
desc: "publish message with subtopic",
payload: data,
subtopic: subtopic,
err: kafka.ErrEmptyTopic,
},
{
desc: "publish message with channel and subtopic",
desc: "publish message with topic and subtopic",
payload: data,
channel: channel,
topic: channel,
subtopic: subtopic,
err: nil,
},
}

for _, tc := range cases {
expectedMsg := messaging.Message{
Channel: tc.channel,
Subtopic: tc.subtopic,
Payload: tc.payload,
}
err := publisher.Publish(topic, &expectedMsg)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
err := publisher.Publish(tc.topic, &expectedMsg)
if tc.err == nil {
require.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", tc.desc, err))
} else {
assert.Equal(t, err, tc.err)
}

}

// Test Subscribe and Unsubscribe
Expand Down
40 changes: 23 additions & 17 deletions pkg/messaging/nats/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,56 +28,62 @@ var (
)

func TestPublisher(t *testing.T) {
err := pubsub.Subscribe(clientID, fmt.Sprintf("%s.%s", chansPrefix, topic), handler{})
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))
err = pubsub.Subscribe(clientID, fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), handler{})
err := pubsub.Subscribe(clientID, fmt.Sprintf("%s.>", chansPrefix), handler{})
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

cases := []struct {
desc string
channel string
topic string
subtopic string
payload []byte
err error
}{
{
desc: "publish message with nil payload",
topic: channel,
payload: nil,
err: nil,
},
{
desc: "publish message with string payload",
desc: "publish message with empty topic",
topic: "",
payload: data,
err: nats.ErrEmptyTopic,
},
{
desc: "publish message with channel",
desc: "publish message with string payload",
topic: channel,
payload: data,
channel: channel,
err: nil,
},
{
desc: "publish message with subtopic",
payload: data,
subtopic: subtopic,
err: nats.ErrEmptyTopic,
},
{
desc: "publish message with channel and subtopic",
desc: "publish message with topic and subtopic",
payload: data,
channel: channel,
topic: channel,
subtopic: subtopic,
err: nil,
},
}

for _, tc := range cases {
expectedMsg := messaging.Message{
Channel: tc.channel,
Subtopic: tc.subtopic,
Payload: tc.payload,
}
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

err = pubsub.Publish(topic, &expectedMsg)
require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

receivedMsg := <-msgChan
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg.Payload, receivedMsg.Payload))
err = pubsub.Publish(tc.topic, &expectedMsg)
if tc.err == nil {
require.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", tc.desc, err))
receivedMsg := <-msgChan
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg.Payload, receivedMsg.Payload))
} else {
assert.Equal(t, err, tc.err)
}
}
}

Expand Down
19 changes: 9 additions & 10 deletions pkg/messaging/rabbitmq/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@ func TestPublisher(t *testing.T) {
conn, ch, err := newConn()
assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err))

topicChan := subscribe(t, ch, fmt.Sprintf("%s.%s", chansPrefix, topic))
subtopicChan := subscribe(t, ch, fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic))

topicChan := subscribe(t, ch, fmt.Sprintf("%s.#", chansPrefix))
go rabbitHandler(topicChan, handler{})
go rabbitHandler(subtopicChan, handler{})

t.Cleanup(func() {
conn.Close()
Expand Down Expand Up @@ -78,15 +75,17 @@ func TestPublisher(t *testing.T) {
for _, tc := range cases {
expectedMsg := messaging.Message{
Publisher: clientID,
Channel: tc.channel,
Subtopic: tc.subtopic,
Payload: tc.payload,
}
err = pubsub.Publish(topic, &expectedMsg)
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s", tc.desc, err))

receivedMsg := <-msgChan
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg.Payload, receivedMsg.Payload))
err := publisher.Publish(tc.topic, &expectedMsg)
if tc.err == nil {
assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s", tc.desc, err))
receivedMsg := <-msgChan
assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg.Payload, receivedMsg.Payload))
} else {
assert.Equal(t, err, tc.err)
}
}
expectedMsg := messaging.Message{
Channel: channel,
Expand Down

0 comments on commit 30820ee

Please sign in to comment.