From 48d6a95a2178851292d22ce5ff2fd483c5b4d23c Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Mon, 11 Apr 2022 13:52:20 +0200 Subject: [PATCH] MF-1582 - Fix lora-adapter MQTT client (#1583) * MF-1582 - Fix lora-adapter MQTT clien Signed-off-by: Manuel Imperiale * Add timeout config to the mqtt subscriber Signed-off-by: Manuel Imperiale * Rm comment Signed-off-by: Manuel Imperiale * Add sub timeout Signed-off-by: Manuel Imperiale --- cmd/lora/main.go | 88 +++++++++++-------- docker/.env | 4 + docker/addons/lora-adapter/docker-compose.yml | 4 + lora/README.md | 36 +++++--- lora/mqtt/sub.go | 59 +++++++++++++ 5 files changed, 142 insertions(+), 49 deletions(-) create mode 100644 lora/mqtt/sub.go diff --git a/cmd/lora/main.go b/cmd/lora/main.go index d15e629c86..1df76ec7bc 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -5,7 +5,6 @@ package main import ( "context" - "encoding/json" "fmt" "log" "net/http" @@ -15,13 +14,13 @@ import ( "syscall" "time" + mqttPaho "github.com/eclipse/paho.mqtt.golang" r "github.com/go-redis/redis/v8" "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/lora" "github.com/mainflux/mainflux/lora/api" - "github.com/mainflux/mainflux/pkg/messaging" - "github.com/mainflux/mainflux/pkg/messaging/mqtt" + "github.com/mainflux/mainflux/lora/mqtt" "github.com/mainflux/mainflux/pkg/messaging/nats" kitprometheus "github.com/go-kit/kit/metrics/prometheus" @@ -33,7 +32,10 @@ const ( defLogLevel = "error" defHTTPPort = "8180" defLoraMsgURL = "tcp://localhost:1883" - defSubTimeout = "30s" // 30 seconds + defLoraMsgTopic = "application/+/device/+/event/up" + defLoraMsgUser = "" + defLoraMsgPass = "" + defLoraMsgTimeout = "30s" defNatsURL = "nats://localhost:4222" defESURL = "localhost:6379" defESPass = "" @@ -45,7 +47,10 @@ const ( envHTTPPort = "MF_LORA_ADAPTER_HTTP_PORT" envLoraMsgURL = "MF_LORA_ADAPTER_MESSAGES_URL" - envSubTimeout = "MF_LORA_ADAPTER_SUBSCRIBER_TIMEOUT" + envLoraMsgTopic = "MF_LORA_ADAPTER_MESSAGES_TOPIC" + envLoraMsgUser = "MF_LORA_ADAPTER_MESSAGES_USER" + envLoraMsgPass = "MF_LORA_ADAPTER_MESSAGES_PASS" + envLoraMsgTimeout = "MF_LORA_ADAPTER_MESSAGES_TIMEOUT" envNatsURL = "MF_NATS_URL" envLogLevel = "MF_LORA_ADAPTER_LOG_LEVEL" envESURL = "MF_THINGS_ES_URL" @@ -56,8 +61,6 @@ const ( envRouteMapPass = "MF_LORA_ADAPTER_ROUTE_MAP_PASS" envRouteMapDB = "MF_LORA_ADAPTER_ROUTE_MAP_DB" - loraServerTopic = "application/+/device/+/rx" - thingsRMPrefix = "thing" channelsRMPrefix = "channel" connsRMPrefix = "connection" @@ -66,8 +69,11 @@ const ( type config struct { httpPort string loraMsgURL string + loraMsgUser string + loraMsgPass string + loraMsgTopic string + loraMsgTimeout time.Duration natsURL string - subTimeout time.Duration logLevel string esURL string esPass string @@ -121,14 +127,9 @@ func main() { }, []string{"method"}), ) - msub, err := mqtt.NewSubscriber(cfg.loraMsgURL, cfg.subTimeout, logger) - if err != nil { - logger.Error(fmt.Sprintf("Failed to create MQTT subscriber: %s", err)) - os.Exit(1) - } - - go subscribeToLoRaBroker(svc, msub, logger) + mqttConn := connectToMQTTBroker(cfg.loraMsgURL, cfg.loraMsgUser, cfg.loraMsgPass, cfg.loraMsgTimeout, logger) + go subscribeToLoRaBroker(svc, mqttConn, cfg.loraMsgTimeout, cfg.loraMsgTopic, logger) go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger) errs := make(chan error, 2) @@ -146,14 +147,18 @@ func main() { } func loadConfig() config { - mqttTimeout, err := time.ParseDuration(mainflux.Env(envSubTimeout, defSubTimeout)) + mqttTimeout, err := time.ParseDuration(mainflux.Env(envLoraMsgTimeout, defLoraMsgTimeout)) if err != nil { - log.Fatalf("Invalid %s value: %s", envSubTimeout, err.Error()) + log.Fatalf("Invalid %s value: %s", envLoraMsgTimeout, err.Error()) } + return config{ httpPort: mainflux.Env(envHTTPPort, defHTTPPort), loraMsgURL: mainflux.Env(envLoraMsgURL, defLoraMsgURL), - subTimeout: mqttTimeout, + loraMsgTopic: mainflux.Env(envLoraMsgTopic, defLoraMsgTopic), + loraMsgUser: mainflux.Env(envLoraMsgUser, defLoraMsgUser), + loraMsgPass: mainflux.Env(envLoraMsgPass, defLoraMsgPass), + loraMsgTimeout: mqttTimeout, natsURL: mainflux.Env(envNatsURL, defNatsURL), logLevel: mainflux.Env(envLogLevel, defLogLevel), esURL: mainflux.Env(envESURL, defESURL), @@ -166,6 +171,29 @@ func loadConfig() config { } } +func connectToMQTTBroker(url, user, password string, timeout time.Duration, logger logger.Logger) mqttPaho.Client { + opts := mqttPaho.NewClientOptions() + opts.AddBroker(url) + opts.SetUsername(user) + opts.SetPassword(password) + opts.SetOnConnectHandler(func(c mqttPaho.Client) { + logger.Info("Connected to Lora MQTT broker") + }) + opts.SetConnectionLostHandler(func(c mqttPaho.Client, err error) { + logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error())) + os.Exit(1) + }) + + client := mqttPaho.NewClient(opts) + + if token := client.Connect(); token.WaitTimeout(timeout) && token.Error() != nil { + logger.Error(fmt.Sprintf("Failed to connect to Lora MQTT broker: %s", token.Error())) + os.Exit(1) + } + + return client +} + func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *r.Client { db, err := strconv.Atoi(redisDB) if err != nil { @@ -180,30 +208,20 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) * }) } -func subscribeToLoRaBroker(svc lora.Service, msub messaging.Subscriber, logger logger.Logger) { - err := msub.Subscribe(loraServerTopic, func(msg messaging.Message) error { - var m lora.Message - if err := json.Unmarshal(msg.Payload, &m); err != nil { - logger.Warn(fmt.Sprintf("Failed to Unmarshal message: %s", err.Error())) - return err - } - if err := svc.Publish(context.Background(), m); err != nil { - return err - } - return nil - }) - if err != nil { - logger.Error(fmt.Sprintf("Failed to subscribe to LoRa MQTT broker: %s", err)) +func subscribeToLoRaBroker(svc lora.Service, mc mqttPaho.Client, timeout time.Duration, topic string, logger logger.Logger) { + mqtt := mqtt.NewBroker(svc, mc, timeout, logger) + logger.Info("Subscribed to Lora MQTT broker") + if err := mqtt.Subscribe(topic); err != nil { + logger.Error(fmt.Sprintf("Failed to subscribe to Lora MQTT broker: %s", err)) os.Exit(1) } - logger.Info("Subscribed to LoRa MQTT broker") } func subscribeToThingsES(svc lora.Service, client *r.Client, consumer string, logger logger.Logger) { eventStore := redis.NewEventStore(svc, client, consumer, logger) logger.Info("Subscribed to Redis Event Store") if err := eventStore.Subscribe(context.Background(), "mainflux.things"); err != nil { - logger.Warn(fmt.Sprintf("LoRa-adapter service failed to subscribe to Redis event source: %s", err)) + logger.Warn(fmt.Sprintf("Lora-adapter service failed to subscribe to Redis event source: %s", err)) } } @@ -214,6 +232,6 @@ func newRouteMapRepository(client *r.Client, prefix string, logger logger.Logger func startHTTPServer(cfg config, logger logger.Logger, errs chan error) { p := fmt.Sprintf(":%s", cfg.httpPort) - logger.Info(fmt.Sprintf("LoRa-adapter service started, exposed port %s", cfg.httpPort)) + logger.Info(fmt.Sprintf("lora-adapter service started, exposed port %s", cfg.httpPort)) errs <- http.ListenAndServe(p, api.MakeHandler()) } diff --git a/docker/.env b/docker/.env index ef620b7708..86bf4be593 100644 --- a/docker/.env +++ b/docker/.env @@ -185,6 +185,10 @@ MF_VAULT_CA_L=Belgrade ### LoRa MF_LORA_ADAPTER_LOG_LEVEL=debug MF_LORA_ADAPTER_MESSAGES_URL=tcp://lora.mqtt.mainflux.io:1883 +MF_LORA_ADAPTER_MESSAGES_TOPIC=application/+/device/+/event/up +MF_LORA_ADAPTER_MESSAGES_USER= +MF_LORA_ADAPTER_MESSAGES_PASS= +MF_LORA_ADAPTER_MESSAGES_TIMEOUT=30s MF_LORA_ADAPTER_HTTP_PORT=8187 MF_LORA_ADAPTER_ROUTE_MAP_URL=localhost:6379 MF_LORA_ADAPTER_ROUTE_MAP_PASS= diff --git a/docker/addons/lora-adapter/docker-compose.yml b/docker/addons/lora-adapter/docker-compose.yml index 11a59b3348..177bfb218d 100644 --- a/docker/addons/lora-adapter/docker-compose.yml +++ b/docker/addons/lora-adapter/docker-compose.yml @@ -29,6 +29,10 @@ services: MF_THINGS_ES_URL: es-redis:${MF_REDIS_TCP_PORT} MF_LORA_ADAPTER_ROUTE_MAP_URL: lora-redis:${MF_REDIS_TCP_PORT} MF_LORA_ADAPTER_MESSAGES_URL: ${MF_LORA_ADAPTER_MESSAGES_URL} + MF_LORA_ADAPTER_MESSAGES_TOPIC: ${MF_LORA_ADAPTER_MESSAGES_TOPIC} + MF_LORA_ADAPTER_MESSAGES_USER: ${MF_LORA_ADAPTER_MESSAGES_USER} + MF_LORA_ADAPTER_MESSAGES_PASS: ${MF_LORA_ADAPTER_MESSAGES_PASS} + MF_LORA_ADAPTER_MESSAGES_TIMEOUT: ${MF_LORA_ADAPTER_MESSAGES_TIMEOUT} MF_LORA_ADAPTER_HTTP_PORT: ${MF_LORA_ADAPTER_HTTP_PORT} MF_NATS_URL: ${MF_NATS_URL} ports: diff --git a/lora/README.md b/lora/README.md index 9779e4f4a2..2ff68b7815 100644 --- a/lora/README.md +++ b/lora/README.md @@ -11,19 +11,23 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -|----------------------------------|--------------------------------------|-----------------------| -| MF_LORA_ADAPTER_HTTP_PORT | Service HTTP port | 8180 | -| MF_LORA_ADAPTER_LOG_LEVEL | Service Log level | error | -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_LORA_ADAPTER_MESSAGES_URL | LoRa Server MQTT broker URL | tcp://localhost:1883 | -| MF_LORA_ADAPTER_ROUTE_MAP_URL | Route-map database URL | localhost:6379 | -| MF_LORA_ADAPTER_ROUTE_MAP_PASS | Route-map database password | | -| MF_LORA_ADAPTER_ROUTE_MAP_DB | Route-map instance | 0 | -| MF_THINGS_ES_URL | Things service event source URL | localhost:6379 | -| MF_THINGS_ES_PASS | Things service event source password | | -| MF_THINGS_ES_DB | Things service event source DB | 0 | -| MF_LORA_ADAPTER_EVENT_CONSUMER | Service event consumer name | lora | +| Variable | Description | Default | +|----------------------------------|---------------------------------------|---------------------------------| +| MF_LORA_ADAPTER_HTTP_PORT | Service HTTP port | 8180 | +| MF_LORA_ADAPTER_LOG_LEVEL | Service Log level | error | +| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_LORA_ADAPTER_MESSAGES_URL | LoRa adapter MQTT broker URL | tcp://localhost:1883 | +| MF_LORA_ADAPTER_MESSAGES_TOPIC | LoRa adapter MQTT subscriber Topic | application/+/device/+/event/up | +| MF_LORA_ADAPTER_MESSAGES_USER | LoRa adapter MQTT subscriber Username | | +| MF_LORA_ADAPTER_MESSAGES_PASS | LoRa adapter MQTT subscriber Password | | +| MF_LORA_ADAPTER_MESSAGES_TIMEOUT | LoRa adapter MQTT subscriber Timeout | 30s | +| MF_LORA_ADAPTER_ROUTE_MAP_URL | Route-map database URL | localhost:6379 | +| MF_LORA_ADAPTER_ROUTE_MAP_PASS | Route-map database password | | +| MF_LORA_ADAPTER_ROUTE_MAP_DB | Route-map instance | 0 | +| MF_THINGS_ES_URL | Things service event source URL | localhost:6379 | +| MF_THINGS_ES_PASS | Things service event source password | | +| MF_THINGS_ES_DB | Things service event source DB | 0 | +| MF_LORA_ADAPTER_EVENT_CONSUMER | Service event consumer name | lora | ## Deployment @@ -47,7 +51,11 @@ make install # set the environment variables and run the service MF_LORA_ADAPTER_LOG_LEVEL=[Lora Adapter Log Level] \ MF_NATS_URL=[NATS instance URL] \ -MF_LORA_ADAPTER_MESSAGES_URL=[LoRa Server mqtt broker URL] \ +MF_LORA_ADAPTER_MESSAGES_URL=[LoRa adapter MQTT broker URL] \ +MF_LORA_ADAPTER_MESSAGES_TOPIC=[LoRa adapter MQTT subscriber Topic] \ +MF_LORA_ADAPTER_MESSAGES_USER=[LoRa adapter MQTT subscriber Username] \ +MF_LORA_ADAPTER_MESSAGES_PASS=[LoRa adapter MQTT subscriber Password] \ +MF_LORA_ADAPTER_MESSAGES_TIMEOUT=[LoRa adapter MQTT subscriber Timeout] MF_LORA_ADAPTER_ROUTE_MAP_URL=[Lora adapter routemap URL] \ MF_LORA_ADAPTER_ROUTE_MAP_PASS=[Lora adapter routemap password] \ MF_LORA_ADAPTER_ROUTE_MAP_DB=[Lora adapter routemap instance] \ diff --git a/lora/mqtt/sub.go b/lora/mqtt/sub.go new file mode 100644 index 0000000000..3ccbf9fe94 --- /dev/null +++ b/lora/mqtt/sub.go @@ -0,0 +1,59 @@ +package mqtt + +// LoraSubscribe subscribe to lora server messages +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/lora" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// Subscriber represents the MQTT broker. +type Subscriber interface { + // Subscribes to given subject and receives events. + Subscribe(string) error +} + +type broker struct { + svc lora.Service + client mqtt.Client + logger logger.Logger + timeout time.Duration +} + +// NewBroker returns new MQTT broker instance. +func NewBroker(svc lora.Service, client mqtt.Client, t time.Duration, log logger.Logger) Subscriber { + return broker{ + svc: svc, + client: client, + logger: log, + timeout: t, + } +} + +// Subscribe subscribes to the Lora MQTT message broker +func (b broker) Subscribe(subject string) error { + s := b.client.Subscribe(subject, 0, b.handleMsg) + if err := s.Error(); s.WaitTimeout(b.timeout) && err != nil { + return err + } + + return nil +} + +// handleMsg triggered when new message is received on Lora MQTT broker +func (b broker) handleMsg(c mqtt.Client, msg mqtt.Message) { + m := lora.Message{} + if err := json.Unmarshal(msg.Payload(), &m); err != nil { + b.logger.Warn(fmt.Sprintf("Failed to unmarshal message: %s", err.Error())) + return + } + + b.svc.Publish(context.Background(), m) + return +}