Skip to content

Commit

Permalink
MF-1582 - Fix lora-adapter MQTT client (#1583)
Browse files Browse the repository at this point in the history
* MF-1582 - Fix lora-adapter MQTT clien

Signed-off-by: Manuel Imperiale <[email protected]>

* Add timeout config to the mqtt subscriber

Signed-off-by: Manuel Imperiale <[email protected]>

* Rm comment

Signed-off-by: Manuel Imperiale <[email protected]>

* Add sub timeout

Signed-off-by: Manuel Imperiale <[email protected]>
  • Loading branch information
manuio authored Apr 11, 2022
1 parent c6f7c69 commit 48d6a95
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 49 deletions.
88 changes: 53 additions & 35 deletions cmd/lora/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package main

import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
Expand All @@ -15,13 +14,13 @@ import (
"syscall"
"time"

mqttPaho "github.com/eclipse/paho.mqtt.golang"
r "github.com/go-redis/redis/v8"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/lora"
"github.com/mainflux/mainflux/lora/api"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/pkg/messaging/mqtt"
"github.com/mainflux/mainflux/lora/mqtt"
"github.com/mainflux/mainflux/pkg/messaging/nats"

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
Expand All @@ -33,7 +32,10 @@ const (
defLogLevel = "error"
defHTTPPort = "8180"
defLoraMsgURL = "tcp://localhost:1883"
defSubTimeout = "30s" // 30 seconds
defLoraMsgTopic = "application/+/device/+/event/up"
defLoraMsgUser = ""
defLoraMsgPass = ""
defLoraMsgTimeout = "30s"
defNatsURL = "nats://localhost:4222"
defESURL = "localhost:6379"
defESPass = ""
Expand All @@ -45,7 +47,10 @@ const (

envHTTPPort = "MF_LORA_ADAPTER_HTTP_PORT"
envLoraMsgURL = "MF_LORA_ADAPTER_MESSAGES_URL"
envSubTimeout = "MF_LORA_ADAPTER_SUBSCRIBER_TIMEOUT"
envLoraMsgTopic = "MF_LORA_ADAPTER_MESSAGES_TOPIC"
envLoraMsgUser = "MF_LORA_ADAPTER_MESSAGES_USER"
envLoraMsgPass = "MF_LORA_ADAPTER_MESSAGES_PASS"
envLoraMsgTimeout = "MF_LORA_ADAPTER_MESSAGES_TIMEOUT"
envNatsURL = "MF_NATS_URL"
envLogLevel = "MF_LORA_ADAPTER_LOG_LEVEL"
envESURL = "MF_THINGS_ES_URL"
Expand All @@ -56,8 +61,6 @@ const (
envRouteMapPass = "MF_LORA_ADAPTER_ROUTE_MAP_PASS"
envRouteMapDB = "MF_LORA_ADAPTER_ROUTE_MAP_DB"

loraServerTopic = "application/+/device/+/rx"

thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connsRMPrefix = "connection"
Expand All @@ -66,8 +69,11 @@ const (
type config struct {
httpPort string
loraMsgURL string
loraMsgUser string
loraMsgPass string
loraMsgTopic string
loraMsgTimeout time.Duration
natsURL string
subTimeout time.Duration
logLevel string
esURL string
esPass string
Expand Down Expand Up @@ -121,14 +127,9 @@ func main() {
}, []string{"method"}),
)

msub, err := mqtt.NewSubscriber(cfg.loraMsgURL, cfg.subTimeout, logger)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create MQTT subscriber: %s", err))
os.Exit(1)
}

go subscribeToLoRaBroker(svc, msub, logger)
mqttConn := connectToMQTTBroker(cfg.loraMsgURL, cfg.loraMsgUser, cfg.loraMsgPass, cfg.loraMsgTimeout, logger)

go subscribeToLoRaBroker(svc, mqttConn, cfg.loraMsgTimeout, cfg.loraMsgTopic, logger)
go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger)

errs := make(chan error, 2)
Expand All @@ -146,14 +147,18 @@ func main() {
}

func loadConfig() config {
mqttTimeout, err := time.ParseDuration(mainflux.Env(envSubTimeout, defSubTimeout))
mqttTimeout, err := time.ParseDuration(mainflux.Env(envLoraMsgTimeout, defLoraMsgTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envSubTimeout, err.Error())
log.Fatalf("Invalid %s value: %s", envLoraMsgTimeout, err.Error())
}

return config{
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
loraMsgURL: mainflux.Env(envLoraMsgURL, defLoraMsgURL),
subTimeout: mqttTimeout,
loraMsgTopic: mainflux.Env(envLoraMsgTopic, defLoraMsgTopic),
loraMsgUser: mainflux.Env(envLoraMsgUser, defLoraMsgUser),
loraMsgPass: mainflux.Env(envLoraMsgPass, defLoraMsgPass),
loraMsgTimeout: mqttTimeout,
natsURL: mainflux.Env(envNatsURL, defNatsURL),
logLevel: mainflux.Env(envLogLevel, defLogLevel),
esURL: mainflux.Env(envESURL, defESURL),
Expand All @@ -166,6 +171,29 @@ func loadConfig() config {
}
}

func connectToMQTTBroker(url, user, password string, timeout time.Duration, logger logger.Logger) mqttPaho.Client {
opts := mqttPaho.NewClientOptions()
opts.AddBroker(url)
opts.SetUsername(user)
opts.SetPassword(password)
opts.SetOnConnectHandler(func(c mqttPaho.Client) {
logger.Info("Connected to Lora MQTT broker")
})
opts.SetConnectionLostHandler(func(c mqttPaho.Client, err error) {
logger.Error(fmt.Sprintf("MQTT connection lost: %s", err.Error()))
os.Exit(1)
})

client := mqttPaho.NewClient(opts)

if token := client.Connect(); token.WaitTimeout(timeout) && token.Error() != nil {
logger.Error(fmt.Sprintf("Failed to connect to Lora MQTT broker: %s", token.Error()))
os.Exit(1)
}

return client
}

func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *r.Client {
db, err := strconv.Atoi(redisDB)
if err != nil {
Expand All @@ -180,30 +208,20 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *
})
}

func subscribeToLoRaBroker(svc lora.Service, msub messaging.Subscriber, logger logger.Logger) {
err := msub.Subscribe(loraServerTopic, func(msg messaging.Message) error {
var m lora.Message
if err := json.Unmarshal(msg.Payload, &m); err != nil {
logger.Warn(fmt.Sprintf("Failed to Unmarshal message: %s", err.Error()))
return err
}
if err := svc.Publish(context.Background(), m); err != nil {
return err
}
return nil
})
if err != nil {
logger.Error(fmt.Sprintf("Failed to subscribe to LoRa MQTT broker: %s", err))
func subscribeToLoRaBroker(svc lora.Service, mc mqttPaho.Client, timeout time.Duration, topic string, logger logger.Logger) {
mqtt := mqtt.NewBroker(svc, mc, timeout, logger)
logger.Info("Subscribed to Lora MQTT broker")
if err := mqtt.Subscribe(topic); err != nil {
logger.Error(fmt.Sprintf("Failed to subscribe to Lora MQTT broker: %s", err))
os.Exit(1)
}
logger.Info("Subscribed to LoRa MQTT broker")
}

func subscribeToThingsES(svc lora.Service, client *r.Client, consumer string, logger logger.Logger) {
eventStore := redis.NewEventStore(svc, client, consumer, logger)
logger.Info("Subscribed to Redis Event Store")
if err := eventStore.Subscribe(context.Background(), "mainflux.things"); err != nil {
logger.Warn(fmt.Sprintf("LoRa-adapter service failed to subscribe to Redis event source: %s", err))
logger.Warn(fmt.Sprintf("Lora-adapter service failed to subscribe to Redis event source: %s", err))
}
}

Expand All @@ -214,6 +232,6 @@ func newRouteMapRepository(client *r.Client, prefix string, logger logger.Logger

func startHTTPServer(cfg config, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", cfg.httpPort)
logger.Info(fmt.Sprintf("LoRa-adapter service started, exposed port %s", cfg.httpPort))
logger.Info(fmt.Sprintf("lora-adapter service started, exposed port %s", cfg.httpPort))
errs <- http.ListenAndServe(p, api.MakeHandler())
}
4 changes: 4 additions & 0 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ MF_VAULT_CA_L=Belgrade
### LoRa
MF_LORA_ADAPTER_LOG_LEVEL=debug
MF_LORA_ADAPTER_MESSAGES_URL=tcp://lora.mqtt.mainflux.io:1883
MF_LORA_ADAPTER_MESSAGES_TOPIC=application/+/device/+/event/up
MF_LORA_ADAPTER_MESSAGES_USER=
MF_LORA_ADAPTER_MESSAGES_PASS=
MF_LORA_ADAPTER_MESSAGES_TIMEOUT=30s
MF_LORA_ADAPTER_HTTP_PORT=8187
MF_LORA_ADAPTER_ROUTE_MAP_URL=localhost:6379
MF_LORA_ADAPTER_ROUTE_MAP_PASS=
Expand Down
4 changes: 4 additions & 0 deletions docker/addons/lora-adapter/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ services:
MF_THINGS_ES_URL: es-redis:${MF_REDIS_TCP_PORT}
MF_LORA_ADAPTER_ROUTE_MAP_URL: lora-redis:${MF_REDIS_TCP_PORT}
MF_LORA_ADAPTER_MESSAGES_URL: ${MF_LORA_ADAPTER_MESSAGES_URL}
MF_LORA_ADAPTER_MESSAGES_TOPIC: ${MF_LORA_ADAPTER_MESSAGES_TOPIC}
MF_LORA_ADAPTER_MESSAGES_USER: ${MF_LORA_ADAPTER_MESSAGES_USER}
MF_LORA_ADAPTER_MESSAGES_PASS: ${MF_LORA_ADAPTER_MESSAGES_PASS}
MF_LORA_ADAPTER_MESSAGES_TIMEOUT: ${MF_LORA_ADAPTER_MESSAGES_TIMEOUT}
MF_LORA_ADAPTER_HTTP_PORT: ${MF_LORA_ADAPTER_HTTP_PORT}
MF_NATS_URL: ${MF_NATS_URL}
ports:
Expand Down
36 changes: 22 additions & 14 deletions lora/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.

| Variable | Description | Default |
|----------------------------------|--------------------------------------|-----------------------|
| MF_LORA_ADAPTER_HTTP_PORT | Service HTTP port | 8180 |
| MF_LORA_ADAPTER_LOG_LEVEL | Service Log level | error |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_LORA_ADAPTER_MESSAGES_URL | LoRa Server MQTT broker URL | tcp://localhost:1883 |
| MF_LORA_ADAPTER_ROUTE_MAP_URL | Route-map database URL | localhost:6379 |
| MF_LORA_ADAPTER_ROUTE_MAP_PASS | Route-map database password | |
| MF_LORA_ADAPTER_ROUTE_MAP_DB | Route-map instance | 0 |
| MF_THINGS_ES_URL | Things service event source URL | localhost:6379 |
| MF_THINGS_ES_PASS | Things service event source password | |
| MF_THINGS_ES_DB | Things service event source DB | 0 |
| MF_LORA_ADAPTER_EVENT_CONSUMER | Service event consumer name | lora |
| Variable | Description | Default |
|----------------------------------|---------------------------------------|---------------------------------|
| MF_LORA_ADAPTER_HTTP_PORT | Service HTTP port | 8180 |
| MF_LORA_ADAPTER_LOG_LEVEL | Service Log level | error |
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_LORA_ADAPTER_MESSAGES_URL | LoRa adapter MQTT broker URL | tcp://localhost:1883 |
| MF_LORA_ADAPTER_MESSAGES_TOPIC | LoRa adapter MQTT subscriber Topic | application/+/device/+/event/up |
| MF_LORA_ADAPTER_MESSAGES_USER | LoRa adapter MQTT subscriber Username | |
| MF_LORA_ADAPTER_MESSAGES_PASS | LoRa adapter MQTT subscriber Password | |
| MF_LORA_ADAPTER_MESSAGES_TIMEOUT | LoRa adapter MQTT subscriber Timeout | 30s |
| MF_LORA_ADAPTER_ROUTE_MAP_URL | Route-map database URL | localhost:6379 |
| MF_LORA_ADAPTER_ROUTE_MAP_PASS | Route-map database password | |
| MF_LORA_ADAPTER_ROUTE_MAP_DB | Route-map instance | 0 |
| MF_THINGS_ES_URL | Things service event source URL | localhost:6379 |
| MF_THINGS_ES_PASS | Things service event source password | |
| MF_THINGS_ES_DB | Things service event source DB | 0 |
| MF_LORA_ADAPTER_EVENT_CONSUMER | Service event consumer name | lora |

## Deployment

Expand All @@ -47,7 +51,11 @@ make install
# set the environment variables and run the service
MF_LORA_ADAPTER_LOG_LEVEL=[Lora Adapter Log Level] \
MF_NATS_URL=[NATS instance URL] \
MF_LORA_ADAPTER_MESSAGES_URL=[LoRa Server mqtt broker URL] \
MF_LORA_ADAPTER_MESSAGES_URL=[LoRa adapter MQTT broker URL] \
MF_LORA_ADAPTER_MESSAGES_TOPIC=[LoRa adapter MQTT subscriber Topic] \
MF_LORA_ADAPTER_MESSAGES_USER=[LoRa adapter MQTT subscriber Username] \
MF_LORA_ADAPTER_MESSAGES_PASS=[LoRa adapter MQTT subscriber Password] \
MF_LORA_ADAPTER_MESSAGES_TIMEOUT=[LoRa adapter MQTT subscriber Timeout]
MF_LORA_ADAPTER_ROUTE_MAP_URL=[Lora adapter routemap URL] \
MF_LORA_ADAPTER_ROUTE_MAP_PASS=[Lora adapter routemap password] \
MF_LORA_ADAPTER_ROUTE_MAP_DB=[Lora adapter routemap instance] \
Expand Down
59 changes: 59 additions & 0 deletions lora/mqtt/sub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package mqtt

// LoraSubscribe subscribe to lora server messages
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/lora"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

// Subscriber represents the MQTT broker.
type Subscriber interface {
// Subscribes to given subject and receives events.
Subscribe(string) error
}

type broker struct {
svc lora.Service
client mqtt.Client
logger logger.Logger
timeout time.Duration
}

// NewBroker returns new MQTT broker instance.
func NewBroker(svc lora.Service, client mqtt.Client, t time.Duration, log logger.Logger) Subscriber {
return broker{
svc: svc,
client: client,
logger: log,
timeout: t,
}
}

// Subscribe subscribes to the Lora MQTT message broker
func (b broker) Subscribe(subject string) error {
s := b.client.Subscribe(subject, 0, b.handleMsg)
if err := s.Error(); s.WaitTimeout(b.timeout) && err != nil {
return err
}

return nil
}

// handleMsg triggered when new message is received on Lora MQTT broker
func (b broker) handleMsg(c mqtt.Client, msg mqtt.Message) {
m := lora.Message{}
if err := json.Unmarshal(msg.Payload(), &m); err != nil {
b.logger.Warn(fmt.Sprintf("Failed to unmarshal message: %s", err.Error()))
return
}

b.svc.Publish(context.Background(), m)
return
}

0 comments on commit 48d6a95

Please sign in to comment.