forked from absmach/supermq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadapter.go
156 lines (121 loc) · 4.74 KB
/
adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package ws
import (
"context"
"fmt"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
"github.com/mainflux/mainflux/things/policies"
)
// chansPrefix is the leading segment of every broker subject built by
// Subscribe and Unsubscribe: "channels.<chanID>" or, when a subtopic is
// given, "channels.<chanID>.<subtopic>".
const chansPrefix = "channels"
var (
// ErrFailedMessagePublish indicates that message publishing failed.
// Publish returns it for an empty payload or when the broker rejects the message.
ErrFailedMessagePublish = errors.New("failed to publish message")
// ErrFailedSubscription indicates that the client couldn't subscribe to the specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")
// ErrFailedUnsubscribe indicates that the client couldn't unsubscribe from the specified channel.
// NOTE(review): Unsubscribe in this file returns the broker error as-is and does not
// use this value; presumably it is consumed by callers elsewhere - confirm.
ErrFailedUnsubscribe = errors.New("failed to unsubscribe from a channel")
// ErrFailedConnection indicates that the service couldn't connect to the message broker.
ErrFailedConnection = errors.New("failed to connect to message broker")
// ErrInvalidConnection indicates that the connection to the message broker is invalid.
// NOTE(review): the text presumably mirrors the NATS client's own error - confirm.
ErrInvalidConnection = errors.New("nats: invalid connection")
// ErrUnauthorizedAccess indicates that the client provided missing or invalid credentials.
// It is returned when the thing key or channel ID is empty, or when authorization fails.
ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided")
// ErrEmptyTopic indicates that the request carried an empty topic.
ErrEmptyTopic = errors.New("empty topic")
// ErrEmptyID indicates that the request carried an empty ID
// (presumably the channel ID - not used in this file, verify against callers).
ErrEmptyID = errors.New("empty id")
)
// Service specifies the WebSocket adapter API: publishing to, subscribing to
// and unsubscribing from channels of the message broker on behalf of a thing
// identified by its key.
type Service interface {
// Publish publishes msg to the internal message broker.
// thingKey is used to authorize write access to the message's channel.
// It returns nil when the message is published successfully and an
// error otherwise.
Publish(ctx context.Context, thingKey string, msg *messaging.Message) error
// Subscribe subscribes client to messages from the broker, using thingKey
// for authorization and chanID to select the channel. subtopic is optional;
// an empty string subscribes to the channel itself.
// It returns nil when the subscription succeeds and an error otherwise.
Subscribe(ctx context.Context, thingKey, chanID, subtopic string, client *Client) error
// Unsubscribe removes a subscription from the broker, using thingKey for
// authorization and chanID to select the channel. subtopic is optional and
// should match the value used when subscribing.
// It returns nil when the unsubscription succeeds and an error otherwise.
Unsubscribe(ctx context.Context, thingKey, chanID, subtopic string) error
}
// Compile-time check that *adapterService satisfies Service.
var _ Service = (*adapterService)(nil)
// adapterService is the Service implementation. Every operation is first
// authorized through auth and then delegated to pubsub.
type adapterService struct {
// auth is the client used by authorize to check a thing key against a channel.
auth policies.AuthServiceClient
// pubsub is the message broker used to publish, subscribe and unsubscribe.
pubsub messaging.PubSub
}
// New returns a Service for the WS adapter that authorizes requests through
// auth and exchanges messages through pubsub.
func New(auth policies.AuthServiceClient, pubsub messaging.PubSub) Service {
	svc := new(adapterService)
	svc.auth = auth
	svc.pubsub = pubsub

	return svc
}
// Publish authorizes thingKey for write access to the message's channel and,
// on success, forwards msg to the broker with its Publisher field set to the
// thing ID returned by the authorization step.
//
// It returns ErrUnauthorizedAccess when authorization fails, and
// ErrFailedMessagePublish when msg is nil, has an empty payload, or the broker
// fails to publish it. The underlying error is not propagated to the caller.
func (svc *adapterService) Publish(ctx context.Context, thingKey string, msg *messaging.Message) error {
	// Authorization runs first so that an unauthorized caller always gets
	// ErrUnauthorizedAccess, regardless of the message contents.
	// NOTE(review): GetChannel looks like a generated getter that tolerates a
	// nil receiver - confirm against the messaging package.
	thid, err := svc.authorize(ctx, thingKey, msg.GetChannel(), policies.WriteAction)
	if err != nil {
		return ErrUnauthorizedAccess
	}

	// The fields below are accessed directly, so a nil message must be
	// rejected here instead of panicking on the dereference.
	if msg == nil || len(msg.Payload) == 0 {
		return ErrFailedMessagePublish
	}

	msg.Publisher = thid

	if err := svc.pubsub.Publish(ctx, msg.GetChannel(), msg); err != nil {
		return ErrFailedMessagePublish
	}

	return nil
}
// Subscribe authorizes thingKey for read access to chanID and registers c as
// the handler for the matching broker subject. The thing ID obtained from
// authorization is stored on c and used as the subscriber ID.
//
// It returns ErrUnauthorizedAccess when thingKey or chanID is empty or the
// authorization fails, and ErrFailedSubscription when the broker refuses the
// subscription.
func (svc *adapterService) Subscribe(ctx context.Context, thingKey, chanID, subtopic string, c *Client) error {
	switch {
	case chanID == "", thingKey == "":
		return ErrUnauthorizedAccess
	}

	thingID, authErr := svc.authorize(ctx, thingKey, chanID, policies.ReadAction)
	if authErr != nil {
		return ErrUnauthorizedAccess
	}
	c.id = thingID

	// Subject is "<prefix>.<chanID>", extended with ".<subtopic>" when one is given.
	var subject string
	switch subtopic {
	case "":
		subject = fmt.Sprintf("%s.%s", chansPrefix, chanID)
	default:
		subject = fmt.Sprintf("%s.%s.%s", chansPrefix, chanID, subtopic)
	}

	if subErr := svc.pubsub.Subscribe(ctx, thingID, subject, c); subErr != nil {
		return ErrFailedSubscription
	}

	return nil
}
// Unsubscribe authorizes thingKey for read access to chanID and removes the
// subscription held under the resulting thing ID for the matching broker
// subject.
//
// It returns ErrUnauthorizedAccess when thingKey or chanID is empty or the
// authorization fails. Any error from the broker is returned unchanged.
func (svc *adapterService) Unsubscribe(ctx context.Context, thingKey, chanID, subtopic string) error {
	if missing := chanID == "" || thingKey == ""; missing {
		return ErrUnauthorizedAccess
	}

	thingID, authErr := svc.authorize(ctx, thingKey, chanID, policies.ReadAction)
	if authErr != nil {
		return ErrUnauthorizedAccess
	}

	// Subject is "<prefix>.<chanID>", extended with ".<subtopic>" when one is given.
	topic := fmt.Sprintf("%s.%s", chansPrefix, chanID)
	if len(subtopic) > 0 {
		topic = fmt.Sprintf("%s.%s.%s", chansPrefix, chanID, subtopic)
	}

	return svc.pubsub.Unsubscribe(ctx, thingID, topic)
}
// authorize asks the auth service whether the thing identified by thingKey
// may perform action on the channel chanID.
//
// On success it returns the thing ID reported by the auth service. It returns
// errors.ErrAuthorization wrapping the transport error when the call itself
// fails, and plain errors.ErrAuthorization when the call succeeds but access
// is denied.
func (svc *adapterService) authorize(ctx context.Context, thingKey, chanID, action string) (string, error) {
	req := &policies.AuthorizeReq{
		Subject:    thingKey,
		Object:     chanID,
		Action:     action,
		EntityType: policies.ThingEntityType,
	}

	res, err := svc.auth.Authorize(ctx, req)
	if err != nil {
		return "", errors.Wrap(errors.ErrAuthorization, err)
	}

	if !res.GetAuthorized() {
		// err is nil on this path, so there is nothing to wrap. Returning the
		// sentinel directly guarantees a non-nil error for a denied request,
		// whatever Wrap does with a nil inner error.
		return "", errors.ErrAuthorization
	}

	return res.GetThingID(), nil
}