Skip to content

Commit

Permalink
Improve packet pack performance (#91)
Browse files Browse the repository at this point in the history
* Add benchmark
* Improve packet pack performance
  • Loading branch information
at-wat authored Feb 7, 2020
1 parent 787af7a commit 6cf9aa0
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 39 deletions.
15 changes: 8 additions & 7 deletions connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ type pktConnect struct {
Will *Message
}

func (p *pktConnect) pack() []byte {
payload := packString(p.ClientID)
func (p *pktConnect) Pack() []byte {
payload := make([]byte, 0, packetBufferCap)
payload = appendString(payload, p.ClientID)

var flag byte
if p.CleanSession {
Expand All @@ -71,16 +72,16 @@ func (p *pktConnect) pack() []byte {
if p.Will.Retain {
flag |= byte(connectFlagWillRetain)
}
payload = append(payload, packString(p.Will.Topic)...)
payload = append(payload, packBytes(p.Will.Payload)...)
payload = appendString(payload, p.Will.Topic)
payload = appendBytes(payload, p.Will.Payload)
}
if p.UserName != "" {
flag |= byte(connectFlagUserName)
payload = append(payload, packString(p.UserName)...)
payload = appendString(payload, p.UserName)
}
if p.Password != "" {
flag |= byte(connectFlagPassword)
payload = append(payload, packString(p.Password)...)
payload = appendString(payload, p.Password)
}
return pack(
packetConnect.b(),
Expand Down Expand Up @@ -143,7 +144,7 @@ func (c *BaseClient) Connect(ctx context.Context, clientID string, opts ...Conne
UserName: o.UserName,
Password: o.Password,
Will: o.Will,
}).pack()
}).Pack()

if err := c.write(pkt); err != nil {
return false, wrapError(err, "sending CONNECT")
Expand Down
35 changes: 25 additions & 10 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,25 +115,38 @@ func remainingLength(n int) []byte {
panic("remaining length overflow")
}

func packString(s string) []byte {
return packBytes([]byte(s))
func appendString(b []byte, s string) []byte {
return appendBytes(b, []byte(s))
}

func packBytes(s []byte) []byte {
func appendBytes(b, s []byte) []byte {
n := len(s)
if n > 0xFFFF {
panic("string length overflow")
}
ret := packUint16(uint16(n))
ret = append(ret, s...)
return ret
b = appendUint16(b, uint16(n))
return append(b, s...)
}

func packUint16(v uint16) []byte {
return []byte{
byte(v >> 8),
func appendUint16(b []byte, v uint16) []byte {
return append(b,
byte(v>>8),
byte(v),
}
)
}

func packString(s string) []byte {
return packBytes([]byte(s))
}

func packBytes(s []byte) []byte {
ret := make([]byte, 0, len(s)+2)
return appendBytes(ret, s)
}

func packUint16(v uint16) []byte {
ret := make([]byte, 0, 2)
return appendUint16(ret, v)
}

func unpackUint16(b []byte) (int, uint16) {
Expand All @@ -158,3 +171,5 @@ func unpackString(b []byte) (int, string, error) {
}
return int(n) + nHeader, string(rs), nil
}

const packetBufferCap = 256
29 changes: 29 additions & 0 deletions packet_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package mqtt

import (
"testing"
)

func BenchmarkPack(b *testing.B) {
packets := map[string]interface{ Pack() []byte }{
"Connect": &pktConnect{
ProtocolLevel: ProtocolLevel4, ClientID: "client",
UserName: "user", Password: "pass",
Will: &Message{Topic: "topic", Payload: make([]byte, 128)},
},
"PubAck": &pktPubAck{},
"PubComp": &pktPubComp{},
"Publish": &pktPublish{&Message{Topic: "topic", Payload: make([]byte, 128)}},
"PubRec": &pktPubRec{},
"PubRel": &pktPubRel{},
"Subscribe": &pktSubscribe{Subscriptions: []Subscription{{Topic: "topic"}}},
"Unsubscribe": &pktUnsubscribe{Topics: []string{"topic"}},
}
for name, p := range packets {
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = p.Pack()
}
})
}
}
2 changes: 1 addition & 1 deletion puback.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (p *pktPubAck) Parse(flag byte, contents []byte) (*pktPubAck, error) {
return p, nil
}

func (p *pktPubAck) pack() []byte {
func (p *pktPubAck) Pack() []byte {
return pack(
packetPubAck.b(),
packUint16(p.ID),
Expand Down
2 changes: 1 addition & 1 deletion pubcomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (p *pktPubComp) Parse(flag byte, contents []byte) (*pktPubComp, error) {
return p, nil
}

func (p *pktPubComp) pack() []byte {
func (p *pktPubComp) Pack() []byte {
return pack(
packetPubComp.b(),
packUint16(p.ID),
Expand Down
11 changes: 6 additions & 5 deletions publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (p *pktPublish) Parse(flag byte, contents []byte) (*pktPublish, error) {
return p, nil
}

func (p *pktPublish) pack() []byte {
func (p *pktPublish) Pack() []byte {
pktHeader := packetPublish.b()

if p.Message.Retain {
Expand All @@ -88,9 +88,10 @@ func (p *pktPublish) pack() []byte {
pktHeader |= byte(publishFlagDup)
}

header := packString(p.Message.Topic)
header := make([]byte, 0, packetBufferCap)
header = appendString(header, p.Message.Topic)
if p.Message.QoS != QoS0 {
header = append(header, packUint16(p.Message.ID)...)
header = appendUint16(header, p.Message.ID)
}

return pack(
Expand Down Expand Up @@ -137,7 +138,7 @@ func (c *BaseClient) Publish(ctx context.Context, message *Message) error {
c.sig.mu.Unlock()
}

pkt := (&pktPublish{Message: message}).pack()
pkt := (&pktPublish{Message: message}).Pack()
if err := c.write(pkt); err != nil {
return wrapError(err, "sending PUBLISH")
}
Expand All @@ -158,7 +159,7 @@ func (c *BaseClient) Publish(ctx context.Context, message *Message) error {
return ctx.Err()
case <-chPubRec:
}
pktPubRel := (&pktPubRel{ID: message.ID}).pack()
pktPubRel := (&pktPubRel{ID: message.ID}).Pack()
if err := c.write(pktPubRel); err != nil {
return wrapError(err, "sending PUBREL")
}
Expand Down
2 changes: 1 addition & 1 deletion pubrec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (p *pktPubRec) Parse(flag byte, contents []byte) (*pktPubRec, error) {
return p, nil
}

func (p *pktPubRec) pack() []byte {
func (p *pktPubRec) Pack() []byte {
return pack(
packetPubRec.b(),
packUint16(p.ID),
Expand Down
2 changes: 1 addition & 1 deletion pubrel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (p *pktPubRel) Parse(flag byte, contents []byte) (*pktPubRel, error) {
return p, nil
}

func (p *pktPubRel) pack() []byte {
func (p *pktPubRel) Pack() []byte {
return pack(
packetPubRel.b()|packetFromClient.b(),
packUint16(p.ID),
Expand Down
6 changes: 3 additions & 3 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ func (c *BaseClient) serve() error {
if handler != nil {
handler.Serve(publish.Message)
}
pktPubAck := (&pktPubAck{ID: publish.Message.ID}).pack()
pktPubAck := (&pktPubAck{ID: publish.Message.ID}).Pack()
if err := c.write(pktPubAck); err != nil {
return wrapError(err, "sending PUBACK")
}
case QoS2:
pktPubRec := (&pktPubRec{ID: publish.Message.ID}).pack()
pktPubRec := (&pktPubRec{ID: publish.Message.ID}).Pack()
if err := c.write(pktPubRec); err != nil {
return wrapError(err, "sending PUBREC")
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (c *BaseClient) serve() error {
delete(subBuffer, pubRel.ID)
}

pktPubComp := (&pktPubComp{ID: pubRel.ID}).pack()
pktPubComp := (&pktPubComp{ID: pubRel.ID}).Pack()
if err := c.write(pktPubComp); err != nil {
return wrapError(err, "sending PUBCOMP")
}
Expand Down
2 changes: 1 addition & 1 deletion serve_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func BenchmarkReadPacket(b *testing.B) {
data := (&pktPublish{Message: &Message{
Topic: "topicString",
Payload: make([]byte, l),
}}).pack()
}}).Pack()

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
2 changes: 1 addition & 1 deletion serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestRemainingLengthParse(t *testing.T) {

// Send PUBLISH from broker.
if _, err := ca.Write(
(&pktPublish{Message: &Message{Topic: "a", Payload: make([]byte, 256)}}).pack(),
(&pktPublish{Message: &Message{Topic: "a", Payload: make([]byte, 256)}}).Pack(),
); err != nil {
t.Fatalf("Unexpected error: ''%v''", err)
}
Expand Down
8 changes: 4 additions & 4 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ type pktSubscribe struct {
Subscriptions []Subscription
}

func (p *pktSubscribe) pack() []byte {
var payload []byte
func (p *pktSubscribe) Pack() []byte {
payload := make([]byte, 0, packetBufferCap)
for _, sub := range p.Subscriptions {
payload = append(payload, packString(sub.Topic)...)
payload = appendString(payload, sub.Topic)

var flag byte
switch sub.QoS {
Expand Down Expand Up @@ -75,7 +75,7 @@ func (c *BaseClient) Subscribe(ctx context.Context, subs ...Subscription) error
c.sig.chSubAck[id] = chSubAck
c.sig.mu.Unlock()

pkt := (&pktSubscribe{ID: id, Subscriptions: subs}).pack()
pkt := (&pktSubscribe{ID: id, Subscriptions: subs}).Pack()
if err := c.write(pkt); err != nil {
return wrapError(err, "sending SUBSCRIBE")
}
Expand Down
8 changes: 4 additions & 4 deletions unsubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type pktUnsubscribe struct {
Topics []string
}

func (p *pktUnsubscribe) pack() []byte {
var payload []byte
func (p *pktUnsubscribe) Pack() []byte {
payload := make([]byte, 0, packetBufferCap)
for _, sub := range p.Topics {
payload = append(payload, packString(sub)...)
payload = appendString(payload, sub)
}

return pack(
Expand All @@ -51,7 +51,7 @@ func (c *BaseClient) Unsubscribe(ctx context.Context, subs ...string) error {
c.sig.chUnsubAck[id] = chUnsubAck
c.sig.mu.Unlock()

pkt := (&pktUnsubscribe{ID: id, Topics: subs}).pack()
pkt := (&pktUnsubscribe{ID: id, Topics: subs}).Pack()
if err := c.write(pkt); err != nil {
return wrapError(err, "sending UNSUBSCRIBE")
}
Expand Down

0 comments on commit 6cf9aa0

Please sign in to comment.