Skip to content

Commit

Permalink
style: run golangci-lint
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Oct 19, 2020
1 parent 3e00ff1 commit c1a88b9
Show file tree
Hide file tree
Showing 34 changed files with 248 additions and 201 deletions.
47 changes: 31 additions & 16 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,38 @@ linters-settings:


linters:
enable:
- goimports
- golint
- gosimple
- structcheck
- deadcode
- staticcheck
- errcheck
- unused
- gosec
- dupl
- gocyclo
- misspell
- unparam
enable-all: false
# enable:
# - goimports
# - golint
# - gosimple
# - structcheck
# - deadcode
# - staticcheck
# - errcheck
# - unused
# - gosec
# - dupl
# - gocyclo
# - misspell
# - unparam
enable-all: true
disable:
disable-all: true
- nlreturn
- wsl
- nestif
- gochecknoglobals
- godot
- gomnd
- exhaustive
- nakedret
- funlen
- lll
- gocognit
- goerr113
- gochecknoinits
- gci
- godox
- gofumpt
presets:
fast: false

Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.13.6-alpine3.11 as builder
FROM golang:1.15.3-alpine3.12 as builder
LABEL stage=intermediate

#compile linux only
Expand Down
4 changes: 2 additions & 2 deletions auth/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/VolantMQ/vlapi/vlauth"
)

// Manager auth
// Manager auth manager
type Manager struct {
p []vlauth.IFace
}
Expand Down Expand Up @@ -53,7 +53,7 @@ func NewManager(p []string) (*Manager, error) {
// Password authentication
func (m *Manager) Password(clientID, user, password string) (vlauth.Permissions, error) {
for _, p := range m.p {
if status := p.Password(clientID, user, password); status == vlauth.StatusAllow {
if status := p.Password(clientID, user, password); errors.Is(status, vlauth.StatusAllow) {
return p, status
}
}
Expand Down
20 changes: 11 additions & 9 deletions auth/manager_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package auth
package auth_test

import (
"crypto/sha256"
Expand All @@ -7,6 +7,8 @@ import (

"github.com/VolantMQ/vlapi/vlauth"
"github.com/stretchr/testify/require"

"github.com/VolantMQ/volantmq/auth"
)

type testAuth struct {
Expand Down Expand Up @@ -49,31 +51,31 @@ func (a *testAuth) Shutdown() error {
}

func TestAuthRegister(t *testing.T) {
err := Register("basic", newSimpleAuth())
err := auth.Register("basic", newSimpleAuth())
require.NoError(t, err)
}

func TestAuthRegisterDupe(t *testing.T) {
err := Register("basic", newSimpleAuth())
err := auth.Register("basic", newSimpleAuth())
require.Error(t, err, "already exists")
}

func TestAuthRegisterInvalidArgs(t *testing.T) {
err := Register("", newSimpleAuth())
err := auth.Register("", newSimpleAuth())
require.Error(t, err, "invalid args")

err = Register("basic", nil)
err = auth.Register("basic", nil)
require.Error(t, err, "invalid args")
}

func TestNewManagerUnknownProvider(t *testing.T) {
p, err := NewManager([]string{"bla"})
p, err := auth.NewManager([]string{"bla"})
require.Error(t, err)
require.Nil(t, p)
}

func TestNewManagerKnownProvider(t *testing.T) {
p, err := NewManager([]string{"basic"})
p, err := auth.NewManager([]string{"basic"})
require.NoError(t, err)
require.NotNil(t, p)

Expand All @@ -97,9 +99,9 @@ func TestNewManagerKnownProvider(t *testing.T) {
}

func TestAuthUnregister(t *testing.T) {
UnRegister("basic")
auth.UnRegister("basic")

p, err := NewManager([]string{"basic"})
p, err := auth.NewManager([]string{"basic"})
require.Error(t, err)
require.Nil(t, p)
}
5 changes: 3 additions & 2 deletions clients/expiry.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *expiry) start() {
// if will delay is set before and value less than expiration
// then timer should fire 2 times
if (timerPeriod > 0) && (timerPeriod < *s.expireIn) {
*s.expireIn = *s.expireIn - timerPeriod
*s.expireIn -= timerPeriod
} else {
timerPeriod = *s.expireIn
*s.expireIn = 0
Expand Down Expand Up @@ -101,11 +101,12 @@ func (s *expiry) timerCallback() {
// 1. check for will message available
if s.will != nil {
// publish if exists and wipe state
_ = s.messenger.Publish(s.will) // nolint: errcheck
_ = s.messenger.Publish(s.will)
s.will = nil
s.willIn = 0
}

// nolint: godox, gocritic
if s.expireIn == nil {
// 2.a session has processed delayed will and there is nothing to do
// completely shutdown the session
Expand Down
8 changes: 4 additions & 4 deletions clients/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/VolantMQ/vlapi/vlpersistence"
"github.com/VolantMQ/vlapi/vlsubscriber"
"github.com/VolantMQ/vlapi/vltypes"
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/VolantMQ/volantmq/configuration"
Expand Down Expand Up @@ -38,7 +39,7 @@ type sessionConfig struct {
will *mqttp.Publish
expireIn *uint32
durable bool
sharedSubscriptions bool // nolint:structcheck
sharedSubscriptions bool
subscriptionIDAllowed bool
version mqttp.ProtocolVersion
}
Expand Down Expand Up @@ -145,7 +146,7 @@ func (s *session) SignalSubscribe(pkt *mqttp.Subscribe) (mqttp.IFace, error) {

var reason mqttp.ReasonCode

if e := s.permissions.ACL(s.id, s.username, t.Filter(), vlauth.AccessRead); e == vlauth.StatusAllow {
if e := s.permissions.ACL(s.id, s.username, t.Filter(), vlauth.AccessRead); errors.Is(e, vlauth.StatusAllow) {
params := vlsubscriber.SubscriptionParams{
ID: subsID,
Ops: t.Ops(),
Expand Down Expand Up @@ -207,7 +208,7 @@ func (s *session) SignalUnSubscribe(pkt *mqttp.UnSubscribe) (mqttp.IFace, error)

_ = pkt.ForEachTopic(func(t *mqttp.Topic) error {
reason := mqttp.CodeSuccess
if e := s.permissions.ACL(s.id, s.username, t.Full(), vlauth.AccessRead); e == vlauth.StatusAllow {
if e := s.permissions.ACL(s.id, s.username, t.Full(), vlauth.AccessRead); errors.Is(e, vlauth.StatusAllow) {
if e = s.subscriber.UnSubscribe(t.Full()); e != nil {
s.log.Error("unsubscribe from topic", zap.String("clientId", s.id), zap.Error(e))
reason = mqttp.CodeNoSubscriptionExisted
Expand Down Expand Up @@ -252,7 +253,6 @@ func (s *session) SignalDisconnect(pkt *mqttp.Disconnect) error {
var err error

if s.version == mqttp.ProtocolV50 {
// FIXME: CodeRefusedBadUsernameOrPassword has same id as CodeDisconnectWithWill
if pkt.ReasonCode() != mqttp.CodeRefusedBadUsernameOrPassword {
s.will = nil
}
Expand Down
27 changes: 14 additions & 13 deletions clients/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type containerInfo struct {

type loadContext struct {
bar *mpb.Bar
startTs time.Time
startTS time.Time
preloadConfigs map[string]*preloadConfig
delayedWills []mqttp.IFace
}
Expand Down Expand Up @@ -257,7 +257,7 @@ func (m *Manager) LoadSession(context interface{}, id []byte, state *vlpersisten
ctx := context.(*loadContext)

defer func() {
ctx.bar.IncrBy(1, time.Since(ctx.startTs))
ctx.bar.IncrBy(1, time.Since(ctx.startTS))
}()

if len(state.Errors) != 0 {
Expand All @@ -277,7 +277,7 @@ func (m *Manager) LoadSession(context interface{}, id []byte, state *vlpersisten

if err = m.decodeSubscriber(ctx, sID, state.Subscriptions); err != nil {
m.log.Error("Decode subscriber", zap.String("ClientID", sID), zap.Error(err))
if err = m.persistence.SubscriptionsDelete(id); err != nil && err != vlpersistence.ErrNotFound {
if err = m.persistence.SubscriptionsDelete(id); err != nil && !errors.Is(err, vlpersistence.ErrNotFound) {
m.log.Error("Persisted subscriber delete", zap.Error(err))
}
}
Expand Down Expand Up @@ -365,7 +365,7 @@ func (m *Manager) processConnect(params *connection.ConnectParams, authMngr *aut
_ = pkt.SetReturnCode(e)
resp = pkt
}
return nil, nil, params.Error
return resp, nil, params.Error
}

if allowed, ok := m.allowedVersions[params.Version]; !ok || !allowed {
Expand All @@ -383,7 +383,7 @@ func (m *Manager) processConnect(params *connection.ConnectParams, authMngr *aut
} else {
var reason mqttp.ReasonCode

if perm, status := authMngr.Password(params.ID, string(params.Username), string(params.Password)); status == vlauth.StatusAllow {
if perm, status := authMngr.Password(params.ID, string(params.Username), string(params.Password)); errors.Is(status, vlauth.StatusAllow) {
reason = mqttp.CodeSuccess
acl = perm
} else {
Expand Down Expand Up @@ -420,10 +420,7 @@ func (m *Manager) newSession(cn connection.Initial, params *connection.ConnectPa
}
}

if cn.Acknowledge(ack,
connection.KeepAlive(keepAlive),
connection.Permissions(acl)) == nil {

if cn.Acknowledge(ack, connection.KeepAlive(keepAlive), connection.Permissions(acl)) == nil {
ses.start()

m.Metrics.Clients().OnConnected()
Expand Down Expand Up @@ -537,7 +534,11 @@ func (m *Manager) allocContainer(id string, username string, acl vlauth.Permissi
return cont
}

func (m *Manager) loadContainer(cn connection.Session, params *connection.ConnectParams, acl vlauth.Permissions) (cont *containerInfo, err error) {
func (m *Manager) loadContainer(
cn connection.Session,
params *connection.ConnectParams,
acl vlauth.Permissions,
) (cont *containerInfo, err error) {
newContainer := m.allocContainer(params.ID, string(params.Username), acl, time.Now(), cn)

// search for existing container with given id
Expand Down Expand Up @@ -627,7 +628,7 @@ func (m *Manager) loadContainer(cn connection.Session, params *connection.Connec
if params.CleanStart && persisted {
params.Cleaned = true
persisted = false
if err = m.persistence.Delete([]byte(params.ID)); err != nil && err != vlpersistence.ErrNotFound {
if err = m.persistence.Delete([]byte(params.ID)); err != nil && !errors.Is(err, vlpersistence.ErrNotFound) {
m.log.Error("Couldn't wipe session", zap.String("clientId", params.ID), zap.Error(err))
}

Expand Down Expand Up @@ -856,7 +857,7 @@ func (m *Manager) decodeSessionExpiry(ctx *loadContext, id string, state *vlpers
since, err = time.Parse(time.RFC3339, state.Expire.Since)
if err != nil {
m.log.Error("parse expiration value", zap.String("clientId", id), zap.Error(err))
if e := m.persistence.SubscriptionsDelete([]byte(id)); e != nil && e != vlpersistence.ErrNotFound {
if e := m.persistence.SubscriptionsDelete([]byte(id)); e != nil && !errors.Is(e, vlpersistence.ErrNotFound) {
m.log.Error("Persisted subscriber delete", zap.Error(e))
}

Expand Down Expand Up @@ -892,7 +893,7 @@ func (m *Manager) decodeSessionExpiry(ctx *loadContext, id string, state *vlpers

if time.Now().After(expireAt) {
// persisted session has expired, wipe it
if err = m.persistence.Delete([]byte(id)); err != nil && err != vlpersistence.ErrNotFound {
if err = m.persistence.Delete([]byte(id)); err != nil && !errors.Is(err, vlpersistence.ErrNotFound) {
m.log.Error("Delete expired session", zap.Error(err))
}
return nil
Expand Down
12 changes: 4 additions & 8 deletions cmd/volantmq/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,14 @@ func newSimpleAuth(cfg authConfig) (*simpleAuth, error) {

if entry.ACL.Read == "" {
acl.read = s.read
} else {
if acl.read, e = regexp.Compile(entry.ACL.Read); e != nil {
return e
}
} else if acl.read, e = regexp.Compile(entry.ACL.Read); e != nil {
return e
}

if entry.ACL.Write == "" {
acl.read = s.write
} else {
if acl.write, e = regexp.Compile(entry.ACL.Write); e != nil {
return e
}
} else if acl.write, e = regexp.Compile(entry.ACL.Write); e != nil {
return e
}

s.creds[entry.User] = creds{
Expand Down
15 changes: 5 additions & 10 deletions cmd/volantmq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"net/http"
"os"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/VolantMQ/vlapi/vlsubscriber"
"github.com/VolantMQ/vlapi/vltypes"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/troian/healthcheck"
persistenceMem "gitlab.com/VolantMQ/vlplugin/persistence/mem"
"go.uber.org/zap"
Expand Down Expand Up @@ -157,15 +157,15 @@ func loadMqttListeners(defaultAuth *auth.Manager, lCfg *configuration.ListenersC

if cfg.TLS != nil {
if _, err := cfg.TLS.Validate(); err != nil {
return nil, fmt.Errorf("listeners.mqtt.wss: %w", err)
return nil, errors.Wrap(err, "listeners.mqtt.wss")
}
configWs.CertFile = cfg.TLS.Cert
configWs.KeyFile = cfg.TLS.Key
}

listeners = append(listeners, configWs)
default:
return nil, fmt.Errorf("unknown mqtt listener type %s", name)
return nil, errors.Errorf("unknown mqtt listener type %s", name)
}
}
}
Expand Down Expand Up @@ -197,6 +197,7 @@ func (ctx *appContext) loadPlugins(cfg *configuration.PluginsConfig) {

if len(plugins) > 0 {
for name, pl := range plugins {
// nolint: gocritic
if len(pl.Errors) > 0 {
logger.Info("\t", name, pl.Errors)
} else if pl.Plugin == nil {
Expand All @@ -222,12 +223,6 @@ func (ctx *appContext) loadPlugins(cfg *configuration.PluginsConfig) {
}

func configureSimpleAuth(cfg interface{}) (vlauth.IFace, error) {
// authConfig, err := vltypes.NormalizeConfig(cfg)
// if err != nil {
// return nil, err
// }
//

var c authConfig
var err error
if err = mapstructure.Decode(cfg, &c); err != nil {
Expand Down Expand Up @@ -553,7 +548,7 @@ func (ctx *appContext) configurePlugin(pl vlplugin.Plugin, c interface{}) (inter
var plObject interface{}

if plObject, err = pl.Load(c, sysParams); err != nil {
return nil, errors.New(name + ": acquire failed : " + err.Error())
return nil, errors.Wrapf(err, "%s: acquire failed", name)
}

ctx.plugins.configured = append(ctx.plugins.configured, plObject)
Expand Down
Loading

0 comments on commit c1a88b9

Please sign in to comment.