Skip to content

Commit

Permalink
Tons of fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
troian committed Dec 8, 2017
1 parent 34d2e1e commit b359ea5
Show file tree
Hide file tree
Showing 64 changed files with 2,545 additions and 1,979 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ _testmain.go
persistence/examples/bolt/test.db
examples/tcp/persist.db
coverage.txt
vendor
vendor

persist.db
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
- repo: https://github.com/troian/pre-commit-golang
sha: c43dbbf0704e4c722e2b64672aaee689145e17a6
hooks:
- id: go-imports
- id: go-build
- id: go-metalinter
args:
Expand All @@ -11,3 +10,5 @@
- --dupl-threshold=100
- --disable=gotype
- --disable=govendor
- --vendor
- --enable=goimports
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
os: linux
script:
- go build -race -v -gcflags "-N -l" ./examples/...
- gometalinter --fast --exclude=corefoundation.go --deadline=360s --enable-gc --sort=path --vendor --cyclo-over=40 --dupl-threshold=100 --disable=gotype ./...
- gometalinter --fast --exclude=corefoundation.go --deadline=360s --enable-gc --sort=path --vendor --cyclo-over=40 --dupl-threshold=100 --disable=gotype --vendor --enable=goimports./...
- ./go.test.sh
after_success:
- bash <(curl -s https://codecov.io/bash)
Expand Down
9 changes: 9 additions & 0 deletions auth/basic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package auth

type Simple interface {
Password(u, p string) Status
}

type Anonymous interface {
Is() Status
}
4 changes: 2 additions & 2 deletions auth/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ var providers = make(map[string]Provider)
// Register auth provider
func Register(name string, provider Provider) error {
if name == "" && provider == nil {
return errors.New("Invalid args")
return errors.New("invalid args")
}

if _, dup := providers[name]; dup {
return errors.New("Already exists")
return errors.New("already exists")
}

providers[name] = provider
Expand Down
52 changes: 52 additions & 0 deletions clients/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package clients

import (
"sync"
"sync/atomic"

"github.com/VolantMQ/volantmq/subscriber"
)

type container struct {
lock sync.Mutex
ses atomic.Value
expiry atomic.Value
sub *subscriber.Type
}

func (s *container) acquire() {
s.lock.Lock()
}

func (s *container) release() {
s.lock.Unlock()
}

func (s *container) session() *session {
return s.ses.Load().(*session)
}

func (s *container) swap(w *container) *container {
s.ses = w.ses

ses := s.ses.Load().(*session)
ses.idLock = &s.lock

return s
}

func (s *container) subscriber(cleanStart bool, c subscriber.Config) (*subscriber.Type, bool) {
if cleanStart && s.sub != nil {
s.sub.Offline(true)
s.sub = nil
}

if s.sub == nil {
s.sub = subscriber.New(c)
cleanStart = true
} else {
cleanStart = false
}

return s.sub, !cleanStart
}
99 changes: 99 additions & 0 deletions clients/expiry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package clients

import (
"sync"
"time"

"github.com/VolantMQ/volantmq/packet"
"github.com/VolantMQ/volantmq/types"
)

type expiryEvent interface {
sessionTimer(string, bool)
}

type expiryConfig struct {
expiryEvent
id string
createdAt time.Time
messenger types.TopicMessenger
will *packet.Publish
expireIn *uint32
willDelay uint32
}

type expiry struct {
expiryConfig
expiringSince time.Time
timerLock sync.Mutex
timer *time.Timer
}

func newExpiry(c expiryConfig) *expiry {
return &expiry{
expiryConfig: c,
}
}

func (s *expiry) start() {
var timerPeriod uint32

// if meet will requirements point that
if s.will != nil && s.willDelay > 0 {
timerPeriod = s.willDelay
} else {
s.will = nil
}

if s.expireIn != nil {
// if will delay is set before and value less than expiration
// then timer should fire 2 times
if (timerPeriod > 0) && (timerPeriod < *s.expireIn) {
*s.expireIn = *s.expireIn - timerPeriod
} else {
timerPeriod = *s.expireIn
*s.expireIn = 0
}
}

s.expiringSince = time.Now()
s.timer = time.NewTimer(time.Duration(timerPeriod) * time.Second)
}

func (s *expiry) cancel() {
if !s.timer.Stop() {
s.timerLock.Lock()
s.timerLock.Unlock() // nolint: megacheck
}
}

func (s *expiry) timerCallback() {
defer s.timerLock.Unlock()
s.timerLock.Lock()

// 1. check for will message available
if s.will != nil {
// publish if exists and wipe state
s.messenger.Publish(s.will) // nolint: errcheck
s.will = nil
s.willDelay = 0
}

if s.expireIn == nil {
// 2.a session has processed delayed will and there is nothing to do
// completely shutdown the session
s.sessionTimer(s.id, false)
} else if *s.expireIn == 0 {
// session has expired. WIPE IT
//if s.subscriber != nil {
// s.shutdownSubscriber(s.subscriber)
//}
s.sessionTimer(s.id, true)
} else {
// restart timer and wait again
val := *s.expireIn
// clear value pointed by expireIn so when next fire comes we signal session is expired
*s.expireIn = 0
s.timer.Reset(time.Duration(val) * time.Second)
}
}
Loading

0 comments on commit b359ea5

Please sign in to comment.