From aea7db14b7bfcde2ecf23d34f7007d0064371fec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Novakovi=C4=87?= <anovakovic01@gmail.com> Date: Tue, 24 Apr 2018 13:56:13 +0200 Subject: [PATCH] NOISSUE - Raise code coverage in ws adapter (#242) * Fix failed subscription handling in ws adapter Fix unsubscribing bug in ws adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add subscription fail and publish fail test cases Update mock implementation to receive publish error in order to support these test cases. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update mainflux version to 0.2.3 Update project version and load tests version to 0.2.3. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update version endpoint response format Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> --- load-test/build.sbt | 2 +- version.go | 8 ++++---- ws/adapter_test.go | 3 ++- ws/api/transport.go | 5 +++-- ws/api/transport_test.go | 16 ++++++++++------ ws/channel_test.go | 18 ++++++++++++++++++ ws/mocks/messages.go | 8 ++++---- ws/nats/pubsub.go | 9 ++++++++- 8 files changed, 50 insertions(+), 19 deletions(-) create mode 100644 ws/channel_test.go diff --git a/load-test/build.sbt b/load-test/build.sbt index 77f7f82b4e..2c2e1d035c 100644 --- a/load-test/build.sbt +++ b/load-test/build.sbt @@ -1,7 +1,7 @@ enablePlugins(GatlingPlugin) name := "load-test" -version := "0.2.2" +version := "0.2.3" scalaVersion := "2.12.4" diff --git a/version.go b/version.go index a8ef484e3d..0f7eafe7e1 100644 --- a/version.go +++ b/version.go @@ -5,17 +5,17 @@ import ( "net/http" ) -const version string = "0.2.2" +const version string = "0.2.3" type response struct { - Version string - Service string + Service string `json:"service"` + Version string `json:"version"` } // Version exposes an HTTP handler for retrieving service version. func Version(service string) http.HandlerFunc { return http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) { - res := response{Version: version, Service: service} + res := response{service, version} data, _ := json.Marshal(res) diff --git a/ws/adapter_test.go b/ws/adapter_test.go index 343812dbd4..cacba6dfd6 100644 --- a/ws/adapter_test.go +++ b/ws/adapter_test.go @@ -6,6 +6,7 @@ import ( "github.com/mainflux/mainflux/ws" "github.com/mainflux/mainflux/ws/mocks" + broker "github.com/nats-io/go-nats" "github.com/stretchr/testify/assert" "github.com/mainflux/mainflux" @@ -29,7 +30,7 @@ var ( func newService() ws.Service { subs := map[string]ws.Channel{chanID: channel} - pubsub := mocks.NewService(subs) + pubsub := mocks.NewService(subs, broker.ErrInvalidMsg) return ws.New(pubsub) } diff --git a/ws/api/transport.go b/ws/api/transport.go index ecf2056ed3..d441612c19 100644 --- a/ws/api/transport.go +++ b/ws/api/transport.go @@ -69,9 +69,9 @@ func handshake(svc ws.Service) http.HandlerFunc { // Subscribe to channel channel := ws.Channel{make(chan mainflux.RawMessage), make(chan bool)} sub.channel = channel - if err = svc.Subscribe(sub.chanID, sub.channel); err != nil { + if err := svc.Subscribe(sub.chanID, sub.channel); err != nil { logger.Warn(fmt.Sprintf("Failed to subscribe to NATS subject: %s", err)) - w.WriteHeader(http.StatusExpectationFailed) + conn.Close() return } go sub.listen() @@ -137,6 +137,7 @@ func (sub subscription) broadcast(svc ws.Service) { if err := svc.Publish(msg); err != nil { logger.Warn(fmt.Sprintf("Failed to publish message to NATS: %s", err)) if err == ws.ErrFailedConnection { + sub.conn.Close() sub.channel.Closed <- true return } diff --git a/ws/api/transport_test.go b/ws/api/transport_test.go index dd88dd5254..1ab86df75f 100644 --- a/ws/api/transport_test.go +++ b/ws/api/transport_test.go @@ -15,6 +15,7 @@ import ( "github.com/mainflux/mainflux/ws" "github.com/mainflux/mainflux/ws/api" "github.com/mainflux/mainflux/ws/mocks" + broker "github.com/nats-io/go-nats" "github.com/stretchr/testify/assert" ) @@ -31,7 +32,7 @@ var ( func newService() ws.Service { subs := map[string]ws.Channel{chanID: channel} - pubsub := mocks.NewService(subs) + pubsub := mocks.NewService(subs, broker.ErrConnectionClosed) return ws.New(pubsub) } @@ -88,11 +89,14 @@ func TestHandshake(t *testing.T) { header bool token string status int + msg []byte }{ - {"connect and send message", chanID, true, token, http.StatusSwitchingProtocols}, - {"connect with invalid token", chanID, true, "", http.StatusForbidden}, - {"connect with invalid channel id", "1", true, token, http.StatusNotFound}, - {"connect and send message with token as query parameter", chanID, false, token, http.StatusSwitchingProtocols}, + {"connect and send message", chanID, true, token, http.StatusSwitchingProtocols, msg}, + {"connect to non-existent channel", "123e4567-e89b-12d3-a456-000000000042", true, token, http.StatusSwitchingProtocols, []byte{}}, + {"connect with invalid token", chanID, true, "", http.StatusForbidden, []byte{}}, + {"connect with invalid channel id", "1", true, token, http.StatusNotFound, []byte{}}, + {"connect and send message with token as query parameter", chanID, false, token, http.StatusSwitchingProtocols, msg}, + {"connect and send message that cannot be published", chanID, true, token, http.StatusSwitchingProtocols, []byte{}}, } for _, tc := range cases { @@ -101,7 +105,7 @@ func TestHandshake(t *testing.T) { if err != nil { continue } - err = conn.WriteMessage(websocket.TextMessage, msg) + err = conn.WriteMessage(websocket.TextMessage, tc.msg) assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s\n", tc.desc, err)) } } diff --git a/ws/channel_test.go b/ws/channel_test.go new file mode 100644 index 0000000000..f0bccdb89c --- /dev/null +++ b/ws/channel_test.go @@ -0,0 +1,18 @@ +package ws_test + +import ( + "testing" + + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/ws" + "github.com/stretchr/testify/assert" +) + +func TestClose(t *testing.T) { + channel := ws.Channel{make(chan mainflux.RawMessage), make(chan bool)} + channel.Close() + _, closed := <-channel.Closed + _, messagesClosed := <-channel.Messages + assert.False(t, closed, "channel closed stayed open") + assert.False(t, messagesClosed, "channel messages stayed open") +} diff --git a/ws/mocks/messages.go b/ws/mocks/messages.go index 5e7a5da746..7ba132397b 100644 --- a/ws/mocks/messages.go +++ b/ws/mocks/messages.go @@ -3,23 +3,23 @@ package mocks import ( "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/ws" - broker "github.com/nats-io/go-nats" ) var _ ws.Service = (*mockService)(nil) type mockService struct { subscriptions map[string]ws.Channel + pubError error } // NewService returns mock message publisher. -func NewService(subs map[string]ws.Channel) ws.Service { - return mockService{subs} +func NewService(subs map[string]ws.Channel, pubError error) ws.Service { + return mockService{subs, pubError} } func (svc mockService) Publish(msg mainflux.RawMessage) error { if len(msg.Payload) == 0 { - return broker.ErrInvalidMsg + return svc.pubError } svc.subscriptions[msg.Channel].Messages <- msg return nil diff --git a/ws/nats/pubsub.go b/ws/nats/pubsub.go index cf668fc08b..c27911cf00 100644 --- a/ws/nats/pubsub.go +++ b/ws/nats/pubsub.go @@ -59,13 +59,20 @@ func (pubsub *natsPubSub) Subscribe(chanID string, channel ws.Channel) error { return } + // Prevents sending message to closed channel select { case channel.Messages <- rawMsg: case <-channel.Closed: sub.Unsubscribe() - channel.Close() } }) + // Check if subscription should be closed + go func() { + <-channel.Closed + sub.Unsubscribe() + channel.Close() + }() + return err }