Skip to content

Commit

Permalink
Merge pull request #2 from pulsar256/feature/auth-and-fixes
Browse files Browse the repository at this point in the history
MQTT auth, fixed error handling, less strict json parsing
  • Loading branch information
hikhvar authored Feb 3, 2020
2 parents 3bbf170 + e0e3cbf commit 776bc49
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 15 deletions.
13 changes: 6 additions & 7 deletions cmd/mqtt2prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@ package main
import (
"log"
"net/http"

"os"

"time"

"flag"

"github.com/eclipse/paho.mqtt.golang"
"github.com/hikhvar/mqtt2prometheus/pkg/metrics"
"github.com/hikhvar/mqtt2prometheus/pkg/mqttclient"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

"fmt"

"github.com/hikhvar/mqtt2prometheus/pkg/config"
)

Expand Down Expand Up @@ -52,10 +46,14 @@ func main() {
}
mqttClientOptions := mqtt.NewClientOptions()
mqttClientOptions.AddBroker(cfg.MQTT.Server).SetClientID(hostName).SetCleanSession(true)
mqttClientOptions.SetUsername(cfg.MQTT.User)
mqttClientOptions.SetPassword(cfg.MQTT.Password)

collector := metrics.NewCollector(2*time.Minute, cfg.Metrics)
ingest := metrics.NewIngest(collector, cfg.Metrics)

var errorChan chan error
errorChan := make(chan error,1)

err = mqttclient.Subscribe(mqttClientOptions, mqttclient.SubscribeOptions{
Topic: cfg.MQTT.TopicPath + "/+",
QoS: cfg.MQTT.QoS,
Expand All @@ -73,6 +71,7 @@ func main() {
log.Fatalf("Error while serving http: %s", err.Error())
}
}()

for {
select {
case <-c:
Expand Down
3 changes: 3 additions & 0 deletions config.yaml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
mqtt:
# The MQTT broker to connect to
server: tcp://127.0.0.1:1883
# Optional: Username and Password for authenticating with the MQTT Server
# user: bob
# password: happylittleclouds
# The Topic path to subscripe to. Actually this will become `$topic_path/+`
topic_path: v1/devices/me
# The MQTT QoS level
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type CacheConfig struct {
type MQTTConfig struct {
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
User string `yaml:"user"`
Password string `yaml:"password"`
QoS byte `yaml:"qos"`
}

Expand Down
19 changes: 11 additions & 8 deletions pkg/metrics/ingest.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
package metrics

import (
"errors"

"fmt"
"path/filepath"

"encoding/json"

"fmt"

"log"

"github.com/eclipse/paho.mqtt.golang"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
"github.com/prometheus/client_golang/prometheus"
)

var NoValidPayload = errors.New("no valid MQTT payload")

type Ingest struct {
validMetrics map[string]config.MetricConfig
collector Collector
Expand All @@ -42,15 +37,21 @@ func NewIngest(collector Collector, metrics []config.MetricConfig) *Ingest {
}
}

type MQTTPayload map[string]float64
type MQTTPayload map[string]interface{}

func (i *Ingest) store(deviceID string, rawMetrics MQTTPayload) error {
var mc MetricCollection
for metricName, value := range rawMetrics {
if cfg, found := i.validMetrics[metricName]; found {

floatValue, ok := value.(float64)
if !ok {
return fmt.Errorf("got data with unexpectd type: %T ('%s') but wanted float64", value, value)
}

mc = append(mc, Metric{
Description: cfg.PrometheusDescription(),
Value: value,
Value: floatValue,
ValueType: cfg.PrometheusValueType(),
})
}
Expand All @@ -68,11 +69,13 @@ func (i *Ingest) SetupSubscriptionHandler(errChan chan<- error) mqtt.MessageHand
if err != nil {
errChan <- fmt.Errorf("could not decode message '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
i.MessageMetric.WithLabelValues("decodeError", m.Topic()).Desc()
return
}
err = i.store(deviceId, rawMetrics)
if err != nil {
errChan <- fmt.Errorf("could not store metrics '%s' on topic %s: %s", string(m.Payload()), m.Topic(), err.Error())
i.MessageMetric.WithLabelValues("storeError", m.Topic()).Inc()
return
}
i.MessageMetric.WithLabelValues("success", m.Topic()).Inc()
}
Expand Down

0 comments on commit 776bc49

Please sign in to comment.