Skip to content

Commit

Permalink
Migrate from standard library flags package to kingpin.v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Elbandi committed Jan 3, 2025
1 parent 6bd07d1 commit 8abcfe5
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 833 deletions.
205 changes: 91 additions & 114 deletions cmd/mqtt2prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,86 +3,81 @@ package main
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
"os/user"
"runtime"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/prometheus/common/promlog"
"github.com/prometheus/common/promlog/flag"

"github.com/alecthomas/kingpin/v2"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/go-kit/kit/log"
kitzap "github.com/go-kit/kit/log/zap"
"github.com/go-kit/log/level"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
"github.com/hikhvar/mqtt2prometheus/pkg/metrics"
"github.com/hikhvar/mqtt2prometheus/pkg/mqttclient"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
"github.com/prometheus/exporter-toolkit/web"
"github.com/prometheus/exporter-toolkit/web/kingpinflag"
)

// These variables are set by goreleaser at linking time.
var (
version string
commit string
date string
)

var (
configFlag = flag.String(
"config",
"config.yaml",
"config file",
)
portFlag = flag.String(
"listen-port",
"9641",
"HTTP port used to expose metrics",
)
addressFlag = flag.String(
"listen-address",
"0.0.0.0",
"listen address for HTTP server used to expose metrics",
)
versionFlag = flag.Bool(
"version",
false,
"show the builds version, date and commit",
)
logLevelFlag = zap.LevelFlag("log-level", zap.InfoLevel, "sets the default loglevel (default: \"info\")")
logEncodingFlag = flag.String(
"log-format",
"console",
"set the desired log output format. Valid values are 'console' and 'json'",
)
webConfigFlag = flag.String(
"web-config-file",
"",
"[EXPERIMENTAL] Path to configuration file that can enable TLS or authentication for metric scraping.",
)
usePasswordFromFile = flag.Bool(
"treat-mqtt-password-as-file-name",
false,
"treat MQTT2PROM_MQTT_PASSWORD as a secret file path e.g. /var/run/secrets/mqtt-credential",
func main() {
var (
metricsPath = kingpin.Flag(
"web.telemetry-path",
"Path under which to expose metrics.",
).Default("/metrics").String()
configFlag = kingpin.Flag(
"config",
"config file",
).Default("config.yaml").String()
/*
maxRequests = kingpin.Flag(
"web.max-requests",
"Maximum number of parallel scrape requests. Use 0 to disable.",
).Default("40").Int()
*/
usePasswordFromFile = kingpin.Flag(
"treat-mqtt-password-as-file-name",
"treat MQTT2PROM_MQTT_PASSWORD as a secret file path e.g. /var/run/secrets/mqtt-credential",
).Default("false").Bool()
maxProcs = kingpin.Flag(
"runtime.gomaxprocs", "The target number of CPUs Go will run on (GOMAXPROCS)",
).Envar("GOMAXPROCS").Default("1").Int()
toolkitFlags = kingpinflag.AddFlags(kingpin.CommandLine, ":9641")
)
)

func main() {
flag.Parse()
if *versionFlag {
mustShowVersion()
os.Exit(0)
promlogConfig := &promlog.Config{}
flag.AddFlags(kingpin.CommandLine, promlogConfig)
kingpin.Version(version.Print("mqtt2prometheus_exporter"))
kingpin.CommandLine.UsageWriter(os.Stdout)
kingpin.HelpFlag.Short('h')
kingpin.Parse()
logger := promlog.New(promlogConfig)

level.Info(logger).Log("msg", "Starting mqtt2prometheus_exporter", "version", version.Info())
level.Info(logger).Log("msg", "Build context", "build_context", version.BuildContext())
if user, err := user.Current(); err == nil && user.Uid == "0" {
level.Warn(logger).Log("msg", "MQTT2Prometheus Exporter is running as root user. This exporter is designed to run as unprivileged user, root is not required.")
}
logger := mustSetupLogger()
defer logger.Sync() //nolint:errcheck
runtime.GOMAXPROCS(*maxProcs)
level.Debug(logger).Log("msg", "Go MAXPROCS", "procs", runtime.GOMAXPROCS(0))

prometheus.MustRegister(
version.NewCollector("mqtt2prometheus_exporter"),
)
c := make(chan os.Signal, 1)
cfg, err := config.LoadConfig(*configFlag, logger)
if err != nil {
logger.Fatal("Could not load config", zap.Error(err))
level.Error(logger).Log("msg", "Could not load config", "err", err)
os.Exit(1)
}

mqtt_user := os.Getenv("MQTT2PROM_MQTT_USER")
Expand All @@ -93,11 +88,13 @@ func main() {
mqtt_password := os.Getenv("MQTT2PROM_MQTT_PASSWORD")
if *usePasswordFromFile {
if mqtt_password == "" {
logger.Fatal("MQTT2PROM_MQTT_PASSWORD is required")
level.Error(logger).Log("msg", "MQTT2PROM_MQTT_PASSWORD is required")
os.Exit(1)
}
secret, err := ioutil.ReadFile(mqtt_password)
if err != nil {
logger.Fatal("unable to read mqtt password from secret file", zap.Error(err))
level.Error(logger).Log("msg", "unable to read mqtt password from secret file", "err", err)
os.Exit(1)
}
cfg.MQTT.Password = string(secret)
} else {
Expand All @@ -121,17 +118,19 @@ func main() {
if cfg.MQTT.ClientCert != "" || cfg.MQTT.ClientKey != "" {
tlsconfig, err := newTLSConfig(cfg)
if err != nil {
logger.Fatal("Invalid tls certificate settings", zap.Error(err))
level.Error(logger).Log("msg", "Invalid tls certificate settings", "err", err)
os.Exit(1)
}
mqttClientOptions.SetTLSConfig(tlsconfig)
}

collector := metrics.NewCollector(cfg.Cache.Timeout, cfg.Metrics, logger)
extractor, err := setupExtractor(cfg)
if err != nil {
logger.Fatal("could not setup a metric extractor", zap.Error(err))
level.Error(logger).Log("msg", "could not setup a metric extractor", "err", err)
os.Exit(1)
}
ingest := metrics.NewIngest(collector, extractor, cfg.MQTT.DeviceIDRegex)
ingest := metrics.NewIngest(collector, extractor, cfg.MQTT.DeviceIDRegex, logger)
mqttClientOptions.SetOnConnectHandler(ingest.OnConnectHandler)
mqttClientOptions.SetConnectionLostHandler(ingest.ConnectionLostHandler)
errorChan := make(chan error, 1)
Expand All @@ -147,7 +146,7 @@ func main() {
// connected, break loop
break
}
logger.Warn("could not connect to mqtt broker, sleep 10 second", zap.Error(err))
level.Warn(logger).Log("msg", "could not connect to mqtt broker, sleep 10 second", "err", err)
time.Sleep(10 * time.Second)
}

Expand All @@ -160,50 +159,48 @@ func main() {
reg.MustRegister(collector)
gatherer = reg
}
http.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))
s := &http.Server{
Addr: getListenAddress(),
Handler: http.DefaultServeMux,

http.Handle(*metricsPath, promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))
if *metricsPath != "/" {
landingConfig := web.LandingConfig{
Name: "MQTT2Prometheus Exporter",
Description: "Prometheus MQTT2Prometheus Exporter",
Version: version.Info(),
Links: []web.LandingLinks{
{
Address: *metricsPath,
Text: "Metrics",
},
},
}
landingPage, err := web.NewLandingPage(landingConfig)
if err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
http.Handle("/", landingPage)
}

server := &http.Server{}

go func() {
err = web.ListenAndServe(s, *webConfigFlag, setupGoKitLogger(logger))
if err != nil {
logger.Fatal("Error while serving http", zap.Error(err))
if err := web.ListenAndServe(server, toolkitFlags, logger); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
}()

for {
select {
case <-c:
logger.Info("Terminated via Signal. Stop.")
level.Info(logger).Log("msg", "Terminated via Signal. Stop.")
os.Exit(0)
case err = <-errorChan:
logger.Error("Error while processing message", zap.Error(err))
level.Error(logger).Log("msg", "Error while processing message", "err", err)
}
}
}

func getListenAddress() string {
return fmt.Sprintf("%s:%s", *addressFlag, *portFlag)
}

func mustShowVersion() {
versionInfo := struct {
Version string
Commit string
Date string
}{
Version: version,
Commit: commit,
Date: date,
}

err := json.NewEncoder(os.Stdout).Encode(versionInfo)
if err != nil {
panic(err)
}
}

func mustMQTTClientID() string {
host, err := os.Hostname()
if err != nil {
Expand All @@ -213,26 +210,6 @@ func mustMQTTClientID() string {
return fmt.Sprintf("%s-%d", host, pid)
}

func mustSetupLogger() *zap.Logger {
cfg := zap.NewProductionConfig()
cfg.Level = zap.NewAtomicLevelAt(*logLevelFlag)
cfg.Encoding = *logEncodingFlag
if cfg.Encoding == "console" {
cfg.EncoderConfig.EncodeTime = zapcore.RFC3339TimeEncoder
}
logger, err := cfg.Build()
if err != nil {
panic(fmt.Sprintf("failed to build logger: %v", err))
}

config.SetProcessContext(logger)
return logger
}

func setupGoKitLogger(l *zap.Logger) log.Logger {
return kitzap.NewZapSugarLogger(l, zap.NewAtomicLevelAt(*logLevelFlag).Level())
}

func setupExtractor(cfg config.Config) (metrics.Extractor, error) {
parser := metrics.NewParser(cfg.Metrics, cfg.JsonParsing.Separator, cfg.Cache.StateDir)
if cfg.MQTT.ObjectPerTopicConfig != nil {
Expand Down
34 changes: 17 additions & 17 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,38 @@ module github.com/hikhvar/mqtt2prometheus
go 1.18

require (
github.com/alecthomas/kingpin/v2 v2.4.0
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/expr-lang/expr v1.16.9
github.com/go-kit/kit v0.10.0
github.com/go-kit/log v0.2.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/exporter-toolkit v0.7.3
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/common v0.45.0
github.com/prometheus/exporter-toolkit v0.11.0
github.com/thedevsaddam/gojsonq/v2 v2.5.2
go.uber.org/zap v1.16.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/go-kit/log v0.1.0 // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/golang/protobuf v1.5.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.29.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/multierr v1.5.0 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
golang.org/x/oauth2 v0.12.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.33.0 // indirect
)
Loading

0 comments on commit 8abcfe5

Please sign in to comment.