From 5bf7d40fdb2edcfc006909819afe182af11b48fe Mon Sep 17 00:00:00 2001 From: rodneyosodo Date: Wed, 15 Feb 2023 11:39:47 +0300 Subject: [PATCH] add wildcard support to kafka Signed-off-by: rodneyosodo --- pkg/messaging/kafka/pubsub.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/pkg/messaging/kafka/pubsub.go b/pkg/messaging/kafka/pubsub.go index 92439aafdee..190f15de109 100644 --- a/pkg/messaging/kafka/pubsub.go +++ b/pkg/messaging/kafka/pubsub.go @@ -8,6 +8,7 @@ import ( "fmt" "strings" "sync" + "time" kf "github.com/confluentinc/confluent-kafka-go/v2/kafka" log "github.com/mainflux/mainflux/logger" @@ -70,18 +71,11 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) ps.mu.Lock() defer ps.mu.Unlock() - // Check topic - s, ok := ps.subscriptions[topic] - switch ok { - case true: - // Check topic ID - if _, ok := s[id]; ok { - return ErrAlreadySubscribed - } - default: - s = make(map[string]subscription) - ps.subscriptions[topic] = s + s, err := ps.checkTopic(topic, id, ErrAlreadySubscribed) + if err != nil { + return err } + ps.configReader(id, topic, s, handler) consumer, err := kf.NewConsumer(&kf.ConfigMap{ "bootstrap.servers": ps.url,