Skip to content

Commit

Permalink
add wildcard support to kafka
Browse files Browse the repository at this point in the history
Signed-off-by: rodneyosodo <[email protected]>
  • Loading branch information
rodneyosodo committed Apr 6, 2023
1 parent 30820ee commit 5bf7d40
Showing 1 changed file with 5 additions and 11 deletions.
16 changes: 5 additions & 11 deletions pkg/messaging/kafka/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"strings"
"sync"
"time"

kf "github.com/confluentinc/confluent-kafka-go/v2/kafka"
log "github.com/mainflux/mainflux/logger"
Expand Down Expand Up @@ -70,18 +71,11 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
ps.mu.Lock()
defer ps.mu.Unlock()

// Check topic
s, ok := ps.subscriptions[topic]
switch ok {
case true:
// Check topic ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
}
default:
s = make(map[string]subscription)
ps.subscriptions[topic] = s
s, err := ps.checkTopic(topic, id, ErrAlreadySubscribed)
if err != nil {
return err
}
ps.configReader(id, topic, s, handler)

consumer, err := kf.NewConsumer(&kf.ConfigMap{
"bootstrap.servers": ps.url,
Expand Down

0 comments on commit 5bf7d40

Please sign in to comment.