Skip to content

Commit

Permalink
NOISSUE - Raise code coverage in ws adapter (#242)
Browse files Browse the repository at this point in the history
* Fix failed subscription handling in ws adapter

Fix unsubscribing bug in ws adapter.

Signed-off-by: Aleksandar Novakovic <[email protected]>

* Add subscription fail and publish fail test cases

Update mock implementation to receive publish error in order to
support these test cases.

Signed-off-by: Aleksandar Novakovic <[email protected]>

* Update mainflux version to 0.2.3

Update project version and load tests version to 0.2.3.

Signed-off-by: Aleksandar Novakovic <[email protected]>

* Update version endpoint response format

Signed-off-by: Aleksandar Novakovic <[email protected]>
  • Loading branch information
anovakovic01 authored and mijicd committed Apr 24, 2018
1 parent 58f3c73 commit aea7db1
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 19 deletions.
2 changes: 1 addition & 1 deletion load-test/build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
enablePlugins(GatlingPlugin)

name := "load-test"
version := "0.2.2"
version := "0.2.3"

scalaVersion := "2.12.4"

Expand Down
8 changes: 4 additions & 4 deletions version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import (
"net/http"
)

const version string = "0.2.2"
const version string = "0.2.3"

type response struct {
Version string
Service string
Service string `json:"service"`
Version string `json:"version"`
}

// Version exposes an HTTP handler for retrieving service version.
func Version(service string) http.HandlerFunc {
return http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
res := response{Version: version, Service: service}
res := response{service, version}

data, _ := json.Marshal(res)

Expand Down
3 changes: 2 additions & 1 deletion ws/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/mainflux/mainflux/ws"
"github.com/mainflux/mainflux/ws/mocks"
broker "github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"

"github.com/mainflux/mainflux"
Expand All @@ -29,7 +30,7 @@ var (

func newService() ws.Service {
subs := map[string]ws.Channel{chanID: channel}
pubsub := mocks.NewService(subs)
pubsub := mocks.NewService(subs, broker.ErrInvalidMsg)
return ws.New(pubsub)
}

Expand Down
5 changes: 3 additions & 2 deletions ws/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ func handshake(svc ws.Service) http.HandlerFunc {
// Subscribe to channel
channel := ws.Channel{make(chan mainflux.RawMessage), make(chan bool)}
sub.channel = channel
if err = svc.Subscribe(sub.chanID, sub.channel); err != nil {
if err := svc.Subscribe(sub.chanID, sub.channel); err != nil {
logger.Warn(fmt.Sprintf("Failed to subscribe to NATS subject: %s", err))
w.WriteHeader(http.StatusExpectationFailed)
conn.Close()
return
}
go sub.listen()
Expand Down Expand Up @@ -137,6 +137,7 @@ func (sub subscription) broadcast(svc ws.Service) {
if err := svc.Publish(msg); err != nil {
logger.Warn(fmt.Sprintf("Failed to publish message to NATS: %s", err))
if err == ws.ErrFailedConnection {
sub.conn.Close()
sub.channel.Closed <- true
return
}
Expand Down
16 changes: 10 additions & 6 deletions ws/api/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/mainflux/mainflux/ws"
"github.com/mainflux/mainflux/ws/api"
"github.com/mainflux/mainflux/ws/mocks"
broker "github.com/nats-io/go-nats"
"github.com/stretchr/testify/assert"
)

Expand All @@ -31,7 +32,7 @@ var (

func newService() ws.Service {
subs := map[string]ws.Channel{chanID: channel}
pubsub := mocks.NewService(subs)
pubsub := mocks.NewService(subs, broker.ErrConnectionClosed)
return ws.New(pubsub)
}

Expand Down Expand Up @@ -88,11 +89,14 @@ func TestHandshake(t *testing.T) {
header bool
token string
status int
msg []byte
}{
{"connect and send message", chanID, true, token, http.StatusSwitchingProtocols},
{"connect with invalid token", chanID, true, "", http.StatusForbidden},
{"connect with invalid channel id", "1", true, token, http.StatusNotFound},
{"connect and send message with token as query parameter", chanID, false, token, http.StatusSwitchingProtocols},
{"connect and send message", chanID, true, token, http.StatusSwitchingProtocols, msg},
{"connect to non-existent channel", "123e4567-e89b-12d3-a456-000000000042", true, token, http.StatusSwitchingProtocols, []byte{}},
{"connect with invalid token", chanID, true, "", http.StatusForbidden, []byte{}},
{"connect with invalid channel id", "1", true, token, http.StatusNotFound, []byte{}},
{"connect and send message with token as query parameter", chanID, false, token, http.StatusSwitchingProtocols, msg},
{"connect and send message that cannot be published", chanID, true, token, http.StatusSwitchingProtocols, []byte{}},
}

for _, tc := range cases {
Expand All @@ -101,7 +105,7 @@ func TestHandshake(t *testing.T) {
if err != nil {
continue
}
err = conn.WriteMessage(websocket.TextMessage, msg)
err = conn.WriteMessage(websocket.TextMessage, tc.msg)
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s\n", tc.desc, err))
}
}
18 changes: 18 additions & 0 deletions ws/channel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package ws_test

import (
"testing"

"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/ws"
"github.com/stretchr/testify/assert"
)

func TestClose(t *testing.T) {
channel := ws.Channel{make(chan mainflux.RawMessage), make(chan bool)}
channel.Close()
_, closed := <-channel.Closed
_, messagesClosed := <-channel.Messages
assert.False(t, closed, "channel closed stayed open")
assert.False(t, messagesClosed, "channel messages stayed open")
}
8 changes: 4 additions & 4 deletions ws/mocks/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ package mocks
import (
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/ws"
broker "github.com/nats-io/go-nats"
)

var _ ws.Service = (*mockService)(nil)

type mockService struct {
subscriptions map[string]ws.Channel
pubError error
}

// NewService returns mock message publisher.
func NewService(subs map[string]ws.Channel) ws.Service {
return mockService{subs}
func NewService(subs map[string]ws.Channel, pubError error) ws.Service {
return mockService{subs, pubError}
}

func (svc mockService) Publish(msg mainflux.RawMessage) error {
if len(msg.Payload) == 0 {
return broker.ErrInvalidMsg
return svc.pubError
}
svc.subscriptions[msg.Channel].Messages <- msg
return nil
Expand Down
9 changes: 8 additions & 1 deletion ws/nats/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,20 @@ func (pubsub *natsPubSub) Subscribe(chanID string, channel ws.Channel) error {
return
}

// Prevents sending message to closed channel
select {
case channel.Messages <- rawMsg:
case <-channel.Closed:
sub.Unsubscribe()
channel.Close()
}
})

// Check if subscription should be closed
go func() {
<-channel.Closed
sub.Unsubscribe()
channel.Close()
}()

return err
}

0 comments on commit aea7db1

Please sign in to comment.