Skip to content

Commit

Permalink
Merge pull request #3 from dusanb94/optimize-logs
Browse files Browse the repository at this point in the history
NOISSUE - Optimize logs output
  • Loading branch information
drasko authored Feb 25, 2020
2 parents 5d4b727 + 0cd1ae0 commit f520546
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 34 deletions.
14 changes: 6 additions & 8 deletions examples/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,26 @@ func (e *Event) AuthPublish(username, clientID string, topic *string, payload *[
// AuthSubscribe is called on device publish,
// prior forwarding to the MQTT broker
func (e *Event) AuthSubscribe(username, clientID string, topics *[]string) error {

e.logger.Info(fmt.Sprintf("AuthSubscribe() - clientID: %s, topics: %s", clientID, strings.Join(*topics, ",")))
return nil
}

// Register - after client sucesfully connected
// Register - after client successfully connected
func (e *Event) Register(clientID string) {
e.logger.Info(fmt.Sprintf("Register() - clientID: %s", clientID))
}

// Publish - after client sucesfully published
// Publish - after client successfully published
func (e *Event) Publish(clientID, topic string, payload []byte) {
e.logger.Info(fmt.Sprintf("Publish() - clientID: %s, topic: %s, payload: %s", clientID, topic, string(payload)))
}

// Subscribe - after client sucesfully subscribed
// Subscribe - after client successfully subscribed
func (e *Event) Subscribe(clientID string, topics []string) {
e.logger.Info(fmt.Sprintf("Subscribe() - clientID: %s, topics: %s", clientID, strings.Join(topics, ",")))
}

// Unubscribe - after client unsubscribed
func (e *Event) Unubscribe(clientID string, topics []string) {

e.logger.Info(fmt.Sprintf("Unubscribe() - clientID: %s, topics: %s", clientID, strings.Join(topics, ",")))
// Unsubscribe - after client unsubscribed
func (e *Event) Unsubscribe(clientID string, topics []string) {
e.logger.Info(fmt.Sprintf("Unsubscribe() - clientID: %s, topics: %s", clientID, strings.Join(topics, ",")))
}
14 changes: 7 additions & 7 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ package events

// Event is an interface for mProxy hooks
type Event interface {
// Athorization on client `CONNECT`
// Authorization on client `CONNECT`
// Each of the params are passed by reference, so that it can be changed
AuthRegister(username, clientID *string, password *[]byte) error

// Athorization on client `PUBLISH`
// Authorization on client `PUBLISH`
// Topic is passed by reference, so that it can be modified
AuthPublish(username, clientID string, topic *string, payload *[]byte) error

// Athorization on client `SUBSCRIBE`
// Authorization on client `SUBSCRIBE`
// Topics are passed by reference, so that they can be modified
AuthSubscribe(username, clientID string, topics *[]string) error

// After client sucesfully connected
// After client successfully connected
Register(clientID string)

// After client sucesfully published
// After client successfully published
Publish(clientID, topic string, payload []byte)

// After client sucesfully subscribed
// After client successfully subscribed
Subscribe(clientID string, topics []string)

// After client unsubscribed
Unubscribe(clientID string, topics []string)
Unsubscribe(clientID string, topics []string)
}
33 changes: 15 additions & 18 deletions pkg/mqtt/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,29 @@ import (

// Proxy is main MQTT proxy struct
type Proxy struct {
host string
port string
targetHost string
targetPort string
event events.Event
logger logger.Logger
host string
port string
target string
event events.Event
logger logger.Logger
}

// New will setup a new Proxy struct after parsing the options
func New(host, port, targetHost, targetPort string, event events.Event, logger logger.Logger) *Proxy {
return &Proxy{
host: host,
port: port,
targetHost: targetHost,
targetPort: targetPort,
event: event,
logger: logger,
host: host,
port: port,
target: fmt.Sprintf("%s:%s", targetHost, targetPort),
event: event,
logger: logger,
}
}

func (p *Proxy) accept(l net.Listener) {
for {
conn, err := l.Accept()
if err != nil {
p.logger.Warn(fmt.Sprintf("Accept error %s", err))
p.logger.Warn("Accept error " + err.Error())
continue
}

Expand All @@ -48,10 +46,9 @@ func (p *Proxy) accept(l net.Listener) {
func (p *Proxy) handleConnection(inbound net.Conn) {
defer inbound.Close()

addr := fmt.Sprintf("%s:%s", p.targetHost, p.targetPort)
outbound, err := net.Dial("tcp", addr)
outbound, err := net.Dial("tcp", p.target)
if err != nil {
p.logger.Error(fmt.Sprintf("Cannot connect to remote broker %s", addr))
p.logger.Error("Cannot connect to remote broker " + p.target)
return
}
defer outbound.Close()
Expand All @@ -63,9 +60,9 @@ func (p *Proxy) handleConnection(inbound net.Conn) {

s := newSession(uuid.String(), inbound, outbound, p.event, p.logger)
if err := s.stream(); err != io.EOF {
p.logger.Warn(fmt.Sprintf("Exited session %s with error: %s", s.id, err))
p.logger.Warn("Exited session " + s.id + "with error: " + err.Error())
}
s.logger.Info(fmt.Sprintf("Session %s closed: %s", s.id, s.outbound.LocalAddr().String()))
s.logger.Info("Session " + s.id + "closed: " + s.outbound.LocalAddr().String())
}

// Proxy of the server, this will block.
Expand Down
2 changes: 1 addition & 1 deletion pkg/mqtt/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (s *session) notify(pkt packets.ControlPacket) {
case *packets.SubscribePacket:
s.event.Subscribe(s.client.ID, p.Topics)
case *packets.UnsubscribePacket:
s.event.Unubscribe(s.client.ID, p.Topics)
s.event.Unsubscribe(s.client.ID, p.Topics)
default:
return
}
Expand Down

0 comments on commit f520546

Please sign in to comment.