diff --git a/cmd/ws/main.go b/cmd/ws/main.go index 19f6a0fb05..5df57232c1 100644 --- a/cmd/ws/main.go +++ b/cmd/ws/main.go @@ -111,7 +111,7 @@ func main() { svc := newService(tc, nps, logger, tracer) - hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger) + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(ctx, svc, logger, cfg.InstanceID), logger) if cfg.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) diff --git a/coap/adapter.go b/coap/adapter.go index c5cb2f1509..44ec4b40f9 100644 --- a/coap/adapter.go +++ b/coap/adapter.go @@ -79,7 +79,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic Subject: key, Object: chanID, Action: policies.ReadAction, - EntityType: policies.GroupEntityType, + EntityType: policies.ThingEntityType, } res, err := svc.auth.Authorize(ctx, ar) if err != nil { @@ -100,7 +100,7 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopi Subject: key, Object: chanID, Action: policies.ReadAction, - EntityType: policies.GroupEntityType, + EntityType: policies.ThingEntityType, } res, err := svc.auth.Authorize(ctx, ar) if err != nil { diff --git a/mqtt/handler.go b/mqtt/handler.go index fac018588f..7e8289ff7b 100644 --- a/mqtt/handler.go +++ b/mqtt/handler.go @@ -97,7 +97,7 @@ func (h *handler) AuthConnect(ctx context.Context) error { return errors.ErrAuthentication } - if err := h.es.Connect(ctx, string(s.Password)); err != nil { + if err := h.es.Connect(ctx, pwd); err != nil { h.logger.Error(errors.Wrap(ErrFailedPublishConnectEvent, err).Error()) } @@ -249,7 +249,7 @@ func (h *handler) authAccess(ctx context.Context, password, topic, action string return errors.ErrAuthorization } - return err + return nil } func parseSubtopic(subtopic string) (string, error) { diff --git a/ws/adapter.go b/ws/adapter.go index ab6aa89425..182cb5c5f1 100644 --- a/ws/adapter.go +++ b/ws/adapter.go @@ -75,7 +75,7 @@ func New(auth policies.AuthServiceClient, pubsub messaging.PubSub) Service { } func (svc *adapterService) Publish(ctx context.Context, thingKey string, msg *messaging.Message) error { - thid, err := svc.authorize(ctx, thingKey, msg.GetChannel()) + thid, err := svc.authorize(ctx, thingKey, msg.GetChannel(), policies.WriteAction) if err != nil { return ErrUnauthorizedAccess } @@ -98,7 +98,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, thingKey, chanID, subt return ErrUnauthorizedAccess } - thid, err := svc.authorize(ctx, thingKey, chanID) + thid, err := svc.authorize(ctx, thingKey, chanID, policies.ReadAction) if err != nil { return ErrUnauthorizedAccess } @@ -122,7 +122,7 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, thingKey, chanID, su return ErrUnauthorizedAccess } - thid, err := svc.authorize(ctx, thingKey, chanID) + thid, err := svc.authorize(ctx, thingKey, chanID, policies.ReadAction) if err != nil { return ErrUnauthorizedAccess } @@ -137,11 +137,11 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, thingKey, chanID, su // authorize checks if the thingKey is authorized to access the channel // and returns the thingID if it is. -func (svc *adapterService) authorize(ctx context.Context, thingKey, chanID string) (string, error) { +func (svc *adapterService) authorize(ctx context.Context, thingKey, chanID, action string) (string, error) { ar := &policies.AuthorizeReq{ Subject: thingKey, Object: chanID, - Action: policies.ReadAction, + Action: action, EntityType: policies.ThingEntityType, } res, err := svc.auth.Authorize(ctx, ar) diff --git a/ws/api/endpoint_test.go b/ws/api/endpoint_test.go index 9bed0ec4e0..e0b81a7b73 100644 --- a/ws/api/endpoint_test.go +++ b/ws/api/endpoint_test.go @@ -4,6 +4,7 @@ package api_test import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -37,7 +38,7 @@ func newService(cc policies.AuthServiceClient) (ws.Service, mocks.MockPubSub) { func newHTTPServer(svc ws.Service) *httptest.Server { logger := mflog.NewMock() - mux := api.MakeHandler(svc, logger, instanceID) + mux := api.MakeHandler(context.Background(), svc, logger, instanceID) return httptest.NewServer(mux) } diff --git a/ws/api/endpoints.go b/ws/api/endpoints.go index 4958f7ae40..f5d1fb9710 100644 --- a/ws/api/endpoints.go +++ b/ws/api/endpoints.go @@ -21,9 +21,8 @@ import ( var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`) -func handshake(svc ws.Service) http.HandlerFunc { +func handshake(ctx context.Context, svc ws.Service) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() req, err := decodeRequest(r) if err != nil { encodeError(w, err) @@ -146,10 +145,10 @@ func process(ctx context.Context, svc ws.Service, req connReq, msgs <-chan []byt Payload: msg, Created: time.Now().UnixNano(), } - _ = svc.Publish(ctx, req.thingKey, &m) - } - if err := svc.Unsubscribe(ctx, req.thingKey, req.chanID, req.subtopic); err != nil { - req.conn.Close() + + if err := svc.Publish(ctx, req.thingKey, &m); err != nil { + logger.Warn(fmt.Sprintf("Failed to publish message: %s", err.Error())) + } } } diff --git a/ws/api/transport.go b/ws/api/transport.go index 3fa963e9f0..8eddc88cbf 100644 --- a/ws/api/transport.go +++ b/ws/api/transport.go @@ -4,6 +4,7 @@ package api import ( + "context" "errors" "net/http" @@ -35,12 +36,12 @@ var ( ) // MakeHandler returns http handler with handshake endpoint. -func MakeHandler(svc ws.Service, l mflog.Logger, instanceID string) http.Handler { +func MakeHandler(ctx context.Context, svc ws.Service, l mflog.Logger, instanceID string) http.Handler { logger = l mux := bone.New() - mux.GetFunc("/channels/:chanID/messages", handshake(svc)) - mux.GetFunc("/channels/:chanID/messages/*", handshake(svc)) + mux.GetFunc("/channels/:chanID/messages", handshake(ctx, svc)) + mux.GetFunc("/channels/:chanID/messages/*", handshake(ctx, svc)) mux.GetFunc("/version", mainflux.Health(protocol, instanceID)) mux.Handle("/metrics", promhttp.Handler())