Skip to content

Commit

Permalink
Remove netpool for a while
Browse files Browse the repository at this point in the history
Use rx goroutine per each connection

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Apr 22, 2018
1 parent 2d6bd59 commit 52606c6
Show file tree
Hide file tree
Showing 25 changed files with 948 additions and 736 deletions.
6 changes: 1 addition & 5 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ _testmain.go
*.test
*.out

*.sublime-project
*.sublime-workspace

persistence/examples/bolt/test.db
examples/tcp/persist.db
persistence
coverage.txt
vendor

Expand Down
14 changes: 7 additions & 7 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@

[[constraint]]
name = "go.uber.org/zap"
version = "1.7.1"
version = "1.8.0"

[[constraint]]
name = "gopkg.in/yaml.v2"
version = "2.1.1"
version = "2.2.1"

[prune]
go-tests = true
Expand Down
5 changes: 4 additions & 1 deletion auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"crypto/sha256"
"encoding/hex"

"github.com/VolantMQ/vlauth"
)
Expand All @@ -25,7 +26,9 @@ func (a *simpleAuth) addUser(u, p string) {
// nolint: golint
func (a *simpleAuth) Password(user, password string) error {
if hash, ok := a.creds[user]; ok {
if string(sha256.New().Sum([]byte(password))) == hash {
algo := sha256.New()
algo.Write([]byte(password))
if hex.EncodeToString(algo.Sum(nil)) == hash {
return vlauth.StatusAllow
}
}
Expand Down
27 changes: 21 additions & 6 deletions auth/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (

// Manager auth
type Manager struct {
p []vlauth.Iface
p []vlauth.Iface
anonymous bool
}

var providers = make(map[string]vlauth.Iface)
Expand All @@ -35,8 +36,10 @@ func UnRegister(name string) {
}

// NewManager new auth manager
func NewManager(p []string) (*Manager, error) {
m := Manager{}
func NewManager(p []string, allowAnonymous bool) (*Manager, error) {
m := Manager{
anonymous: allowAnonymous,
}

for _, pa := range p {
pvd, ok := providers[pa]
Expand All @@ -50,11 +53,23 @@ func NewManager(p []string) (*Manager, error) {
return &m, nil
}

func (m *Manager) AllowAnonymous() error {
if m.anonymous {
return vlauth.StatusAllow
}

return vlauth.StatusDeny
}

// Password authentication
func (m *Manager) Password(user, password string) error {
for _, p := range m.p {
if status := p.Password(user, password); status == vlauth.StatusAllow {
return status
if user == "" && m.anonymous {
return vlauth.StatusAllow
} else {
for _, p := range m.p {
if status := p.Password(user, password); status == vlauth.StatusAllow {
return status
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions clients/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type container struct {
expiry atomic.Value
sub *subscriber.Type
removable bool
removed bool
}

//func (s *container) shutdown() bool {
Expand All @@ -37,11 +38,14 @@ func (s *container) release() {
}

func (s *container) session() *session {
defer s.rmLock.Unlock()
s.rmLock.Lock()
return s.ses
}

func (s *container) swap(from *container) *container {
s.ses = from.ses
//s.sub = from.sub

s.ses.idLock = &s.lock

Expand Down
105 changes: 42 additions & 63 deletions clients/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/VolantMQ/mqttp"
"github.com/VolantMQ/persistence"
"github.com/VolantMQ/vlauth"
"github.com/VolantMQ/volantmq/configuration"
"github.com/VolantMQ/volantmq/connection"
"github.com/VolantMQ/volantmq/subscriber"
Expand All @@ -27,6 +28,8 @@ type sessionPreConfig struct {
messenger types.TopicMessenger
conn connection.Session
persistence persistence.Packets
permissions vlauth.Permissions
username string
}

type sessionConfig struct {
Expand Down Expand Up @@ -87,51 +90,22 @@ func (s *session) configure(c sessionConfig, clean bool) {
s.subscriber.Online(tmp)
s.persistence.PacketsForEach([]byte(s.id), s.conn)
s.subscriber.Online(s.conn)
s.persistence.PacketsDelete([]byte(s.id))
s.conn.LoadRemaining(tmp.gList, tmp.qList)
} else {
s.subscriber.Online(s.conn)
}
}

func (s *session) start() {
s.conn.Start()
s.idLock.Unlock()
}

func (s *session) stop(reason packet.ReasonCode) {
s.stopReq.Do(func() {
s.conn.Stop(reason)
})

//s.wgDisconnected.Wait()

//state := &persistence.SessionState{
// Timestamp: s.createdAt.Format(time.RFC3339),
//}

//if s.expireIn != nil || (s.willDelay > 0 && s.will != nil) {
// state.Expire = &persistence.SessionDelays{
// Since: s.expiringSince.Format(time.RFC3339),
// }
//
// elapsed := uint32(time.Since(s.expiringSince) / time.Second)
//
// if (s.willDelay > 0 && s.will != nil) && (s.willDelay-elapsed) > 0 {
// s.willDelay = s.willDelay - elapsed
// s.will.SetPacketID(0)
// if buf, err := packet.Encode(s.will); err != nil {
//
// } else {
// state.Expire.WillIn = strconv.Itoa(int(s.willDelay))
// state.Expire.WillData = buf
// }
// }
//
// if s.expireIn != nil && *s.expireIn > 0 && (*s.expireIn-elapsed) > 0 {
// *s.expireIn = *s.expireIn - elapsed
// }
//}

//return state
}

// SignalPublish process PUBLISH packet from client
Expand All @@ -147,9 +121,12 @@ func (s *session) SignalPublish(pkt *packet.Publish) error {
// [MQTT-3.3.1-7]
if pkt.QoS() == packet.QoS0 {
retained := packet.NewPublish(s.version)
retained.SetQoS(pkt.QoS()) // nolint: errcheck
retained.SetTopic(pkt.Topic()) // nolint: errcheck
//s.retained.list = append(s.retained.list, m)
if err := retained.SetQoS(pkt.QoS()); err != nil {
s.log.Error("set retained QoS", zap.String("ClientID", s.id), zap.Error(err))
}
if err := retained.SetTopic(pkt.Topic()); err != nil {
s.log.Error("set retained topic", zap.String("ClientID", s.id), zap.Error(err))
}
}
}

Expand All @@ -173,34 +150,35 @@ func (s *session) SignalSubscribe(pkt *packet.Subscribe) (packet.Provider, error

pkt.RangeTopics(func(t string, ops packet.SubscriptionOptions) {
reason := packet.CodeSuccess // nolint: ineffassign
//authorized := true
// TODO: check permissions here

//if authorized {
subsID := uint32(0)
if err := s.permissions.ACL(s.id, s.username, t, vlauth.AccessRead); err == vlauth.StatusAllow {
subsID := uint32(0)

// V5.0 [MQTT-3.8.2.1.2]
if prop := pkt.PropertyGet(packet.PropertySubscriptionIdentifier); prop != nil {
if v, e := prop.AsInt(); e == nil {
subsID = v
// V5.0 [MQTT-3.8.2.1.2]
if prop := pkt.PropertyGet(packet.PropertySubscriptionIdentifier); prop != nil {
if v, e := prop.AsInt(); e == nil {
subsID = v
}
}
}

subsParams := topicsTypes.SubscriptionParams{
ID: subsID,
Ops: ops,
}
subsParams := topicsTypes.SubscriptionParams{
ID: subsID,
Ops: ops,
}

if grantedQoS, retained, err := s.subscriber.Subscribe(t, &subsParams); err != nil {
if grantedQoS, retained, err := s.subscriber.Subscribe(t, &subsParams); err != nil {
reason = packet.QosFailure
} else {
reason = packet.ReasonCode(grantedQoS)
retainedPublishes = append(retainedPublishes, retained...)
}
} else {
// [MQTT-3.9.3]
if s.version == packet.ProtocolV50 {
reason = packet.CodeUnspecifiedError
reason = packet.CodeNotAuthorized
} else {
reason = packet.QosFailure
}
} else {
reason = packet.ReasonCode(grantedQoS)
retainedPublishes = append(retainedPublishes, retained...)
}

retCodes = append(retCodes, reason)
Expand All @@ -212,9 +190,9 @@ func (s *session) SignalSubscribe(pkt *packet.Subscribe) (packet.Provider, error

// Now put retained messages into publish queue
for _, rp := range retainedPublishes {
if pkt, err := rp.Clone(s.version); err == nil {
pkt.SetRetain(true)
s.conn.Publish(s.id, pkt)
if p, err := rp.Clone(s.version); err == nil {
p.SetRetain(true)
s.conn.Publish(s.id, p)
} else {
s.log.Error("Couldn't clone PUBLISH message", zap.String("ClientID", s.id), zap.Error(err))
}
Expand All @@ -228,12 +206,9 @@ func (s *session) SignalUnSubscribe(pkt *packet.UnSubscribe) (packet.Provider, e
var retCodes []packet.ReasonCode

for _, t := range pkt.Topics() {
// TODO: check permissions here
authorized := true
reason := packet.CodeSuccess

if authorized {
if err := s.subscriber.UnSubscribe(t); err != nil {
if err := s.permissions.ACL(s.id, s.username, t, vlauth.AccessRead); err == vlauth.StatusAllow {
if err = s.subscriber.UnSubscribe(t); err != nil {
s.log.Error("Couldn't unsubscribe from topic", zap.Error(err))
reason = packet.CodeNoSubscriptionExisted
}
Expand All @@ -249,7 +224,9 @@ func (s *session) SignalUnSubscribe(pkt *packet.UnSubscribe) (packet.Provider, e

id, _ := pkt.ID()
resp.SetPacketID(id)
resp.AddReturnCodes(retCodes) // nolint: errcheck
if err := resp.AddReturnCodes(retCodes); err != nil {
s.log.Error("unsubscribe set return codes", zap.String("ClientID", s.id), zap.Error(err))
}

return resp, nil
}
Expand Down Expand Up @@ -310,12 +287,14 @@ func (s *session) SignalConnectionClose(params connection.DisconnectParams) {
s.connectionClosed(s.id, params.Reason)

if s.durable && len(params.Packets) > 0 {
s.persistence.PacketsStore([]byte(s.id), params.Packets)
if err := s.persistence.PacketsStore([]byte(s.id), params.Packets); err != nil {
s.log.Error("persisting packets", zap.String("ClientID", s.id), zap.Error(err))
}
}

keepContainer := s.durable && s.subscriber.HasSubscriptions()

if !s.durable || !s.subscriber.HasSubscriptions() {
if !keepContainer {
s.subscriberShutdown(s.id, s.subscriber)
s.subscriber = nil
}
Expand Down
Loading

0 comments on commit 52606c6

Please sign in to comment.