Skip to content

Commit

Permalink
NOISSUE - Improve graceful shutdown and code quality (absmach#1821)
Browse files Browse the repository at this point in the history
* remove single case switch
remove duplicate cases

Signed-off-by: SammyOina <[email protected]>

* remove single case switch \n format comments

Signed-off-by: SammyOina <[email protected]>

* graceful exit on main func

Signed-off-by: SammyOina <[email protected]>

* remove fatal from imported packages

Signed-off-by: SammyOina <[email protected]>

* reuse exit function

Signed-off-by: SammyOina <[email protected]>

* return nill for empty configs

Signed-off-by: SammyOina <[email protected]>

* return nil for config file not found

Signed-off-by: SammyOina <[email protected]>

---------

Signed-off-by: SammyOina <[email protected]>
Co-authored-by: Drasko DRASKOVIC <[email protected]>
  • Loading branch information
SammyOina and drasko authored Jul 28, 2023
1 parent 33eb8d8 commit 7758f42
Show file tree
Hide file tree
Showing 50 changed files with 418 additions and 182 deletions.
10 changes: 5 additions & 5 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package cli

import (
"fmt"
"log"
"os"

"github.com/mainflux/mainflux/pkg/errors"
Expand Down Expand Up @@ -33,21 +32,21 @@ func read(file string) (Config, error) {
return c, nil
}

func ParseConfig() {
func ParseConfig() error {
if ConfigPath == "" {
// No config file
return
return nil
}

if _, err := os.Stat(ConfigPath); os.IsNotExist(err) {
errConfigNotFound := errors.Wrap(errors.New("config file was not found"), err)
logError(errConfigNotFound)
return
return nil
}

config, err := read(ConfigPath)
if err != nil {
log.Fatal(err)
return err
}

if config.Offset != 0 {
Expand All @@ -65,4 +64,5 @@ func ParseConfig() {
if config.RawOutput {
RawOutput = config.RawOutput
}
return nil
}
18 changes: 14 additions & 4 deletions cmd/bootstrap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,25 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()

// Create new redis client for bootstrap event store
esClient, err := redisClient.Setup(envPrefixES)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to setup %s bootstrap event store redis client : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to setup %s bootstrap event store redis client : %s", svcName, err))
exitCode = 1
return
}
defer esClient.Close()

// Create new auth grpc client api
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
Expand All @@ -121,7 +127,9 @@ func main() {
// Create an new HTTP server
httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, bootstrap.NewConfigReader([]byte(cfg.EncKey)), logger, instanceID), logger)
Expand All @@ -142,7 +150,9 @@ func main() {
// Subscribe to things event store
thingsESClient, err := redisClient.Setup(envPrefixES)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer thingsESClient.Close()

Expand Down
14 changes: 11 additions & 3 deletions cmd/cassandra-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,27 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())

// Create new auth grpc client
auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())

// Create new cassandra client
csdSession, err := cassandraClient.Setup(envPrefix)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer csdSession.Close()

Expand All @@ -95,7 +101,9 @@ func main() {
httpServerConfig := server.Config{Port: defSvcHttpPort}

if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName, instanceID), logger)
Expand Down
10 changes: 8 additions & 2 deletions cmd/cassandra-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer csdSession.Close()

tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
Expand All @@ -90,7 +92,9 @@ func main() {

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefix, AltPrefix: envPrefixHttp}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}

// Create new cassandra-writer repo
Expand All @@ -100,7 +104,9 @@ func main() {
// Create new pub sub broker
pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = tracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()
Expand Down
6 changes: 5 additions & 1 deletion cmd/certs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,15 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer db.Close()

auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
Expand Down
4 changes: 3 additions & 1 deletion cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ func main() {
var rootCmd = &cobra.Command{
Use: "mainflux-cli",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
cli.ParseConfig()
if err := cli.ParseConfig(); err != nil {
log.Fatal(err)
}

sdkConf.MsgContentType = sdk.ContentType(msgContentType)
s := sdk.NewSDK(sdkConf)
Expand Down
14 changes: 11 additions & 3 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())

Expand All @@ -88,7 +90,9 @@ func main() {

nps, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
nps = pstracing.NewPubSub(tracer, nps)
defer nps.Close()
Expand All @@ -104,13 +108,17 @@ func main() {

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(instanceID), logger)

coapServerConfig := server.Config{Port: defSvcCoapPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixCoap, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s CoAP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s CoAP server configuration : %s", svcName, err))
exitCode = 1
return
}
cs := coapserver.New(ctx, cancel, svcName, coapServerConfig, api.MakeCoAPHandler(svc, logger), logger)

Expand Down
10 changes: 8 additions & 2 deletions cmd/http/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())

Expand All @@ -89,7 +91,9 @@ func main() {

pub, err := brokers.NewPublisher(cfg.BrokerURL)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pub = pstracing.New(tracer, pub)
defer pub.Close()
Expand All @@ -98,7 +102,9 @@ func main() {

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, instanceID), logger)

Expand Down
18 changes: 14 additions & 4 deletions cmd/influxdb-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,25 @@ func main() {
if err != nil {
logger.Fatal(err.Error())
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer tcHandler.Close()
logger.Info("Successfully connected to things grpc server " + tcHandler.Secure())

auth, authHandler, err := authClient.Setup(envPrefix, svcName)
if err != nil {
logger.Fatal(err.Error())
logger.Error(err.Error())
exitCode = 1
return
}
defer authHandler.Close()
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())

influxDBConfig := influxDBClient.Config{}
if err := env.Parse(&influxDBConfig, env.Options{Prefix: envPrefixInfluxdb}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
logger.Error(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
exitCode = 1
return
}
influxDBConfig.DBUrl = fmt.Sprintf("%s://%s:%s", influxDBConfig.Protocol, influxDBConfig.Host, influxDBConfig.Port)

Expand All @@ -92,15 +98,19 @@ func main() {

client, err := influxDBClient.Connect(influxDBConfig, ctx)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to InfluxDB : %s", err))
logger.Error(fmt.Sprintf("failed to connect to InfluxDB : %s", err))
exitCode = 1
return
}
defer client.Close()

repo := newService(client, repocfg, logger)

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, tc, auth, svcName, instanceID), logger)

Expand Down
22 changes: 17 additions & 5 deletions cmd/influxdb-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func main() {
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger: %s", err))
}
var exitCode int
defer mflog.ExitWithError(&exitCode)
defer func() {
if err := tp.Shutdown(ctx); err != nil {
logger.Error(fmt.Sprintf("Error shutting down tracer provider: %v", err))
Expand All @@ -80,14 +82,18 @@ func main() {

pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err))
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
pubSub = tracing.NewPubSub(tracer, pubSub)
defer pubSub.Close()

influxDBConfig := influxDBClient.Config{}
if err := env.Parse(&influxDBConfig, env.Options{Prefix: envPrefixInfluxdb}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
logger.Error(fmt.Sprintf("failed to load InfluxDB client configuration from environment variable : %s", err))
exitCode = 1
return
}
influxDBConfig.DBUrl = fmt.Sprintf("%s://%s:%s", influxDBConfig.Protocol, influxDBConfig.Host, influxDBConfig.Port)

Expand All @@ -98,13 +104,17 @@ func main() {

client, err := influxDBClient.Connect(influxDBConfig, ctx)
if err != nil {
logger.Fatal(fmt.Sprintf("failed to connect to InfluxDB : %s", err))
logger.Error(fmt.Sprintf("failed to connect to InfluxDB : %s", err))
exitCode = 1
return
}
defer client.Close()

httpServerConfig := server.Config{Port: defSvcHttpPort}
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
logger.Error(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
exitCode = 1
return
}

repo := influxdb.NewAsync(client, repocfg)
Expand All @@ -120,7 +130,9 @@ func main() {
}(logger)

if err := consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil {
logger.Fatal(fmt.Sprintf("failed to start InfluxDB writer: %s", err))
logger.Error(fmt.Sprintf("failed to start InfluxDB writer: %s", err))
exitCode = 1
return
}

hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svcName, instanceID), logger)
Expand Down
Loading

0 comments on commit 7758f42

Please sign in to comment.