Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SMQ-2605: Groups replication with groups events consumer & listing of things and channels #2639

Merged
merged 7 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions api/http/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
UserKey = "user"
DomainKey = "domain"
ChannelKey = "channel"
ConnTypeKey = "connection_type"
DefPermission = "read_permission"
DefTotal = uint64(100)
DefOffset = 0
Expand Down
108 changes: 78 additions & 30 deletions channels/api/http/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
smqclients "github.com/absmach/supermq/clients"
"github.com/absmach/supermq/clients"
"github.com/absmach/supermq/pkg/errors"
"github.com/go-chi/chi/v5"
)
Expand Down Expand Up @@ -51,58 +51,106 @@
}

func decodeListChannels(_ context.Context, r *http.Request) (interface{}, error) {
s, err := apiutil.ReadStringQuery(r, api.StatusKey, api.DefClientStatus)
name, err := apiutil.ReadStringQuery(r, api.NameKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
o, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset)

tag, err := apiutil.ReadStringQuery(r, api.TagKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}
l, err := apiutil.ReadNumQuery[uint64](r, api.LimitKey, api.DefLimit)

s, err := apiutil.ReadStringQuery(r, api.StatusKey, api.DefGroupStatus)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
m, err := apiutil.ReadMetadataQuery(r, api.MetadataKey, nil)
status, err := clients.ToStatus(s)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
n, err := apiutil.ReadStringQuery(r, api.NameKey, "")

meta, err := apiutil.ReadMetadataQuery(r, api.MetadataKey, nil)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
t, err := apiutil.ReadStringQuery(r, api.TagKey, "")

offset, err := apiutil.ReadNumQuery[uint64](r, api.OffsetKey, api.DefOffset)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
id, err := apiutil.ReadStringQuery(r, api.IDOrder, "")
limit, err := apiutil.ReadNumQuery[uint64](r, api.LimitKey, api.DefLimit)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}
p, err := apiutil.ReadStringQuery(r, api.PermissionKey, api.DefPermission)

dir, err := apiutil.ReadStringQuery(r, api.DirKey, api.DefDir)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)

Check warning on line 89 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L89

Added line #L89 was not covered by tests
}

lp, err := apiutil.ReadBoolQuery(r, api.ListPerms, api.DefListPerms)
order, err := apiutil.ReadStringQuery(r, api.OrderKey, api.DefOrder)
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)

Check warning on line 94 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L94

Added line #L94 was not covered by tests
}
st, err := smqclients.ToStatus(s)

allActions, err := apiutil.ReadStringQuery(r, api.ActionsKey, "")
if err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

Check warning on line 100 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L99-L100

Added lines #L99 - L100 were not covered by tests

actions := []string{}

allActions = strings.TrimSpace(allActions)
if allActions != "" {
actions = strings.Split(allActions, ",")

Check warning on line 106 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L106

Added line #L106 was not covered by tests
}
roleID, err := apiutil.ReadStringQuery(r, api.RoleIDKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

Check warning on line 111 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L110-L111

Added lines #L110 - L111 were not covered by tests

roleName, err := apiutil.ReadStringQuery(r, api.RoleNameKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

Check warning on line 116 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L115-L116

Added lines #L115 - L116 were not covered by tests

accessType, err := apiutil.ReadStringQuery(r, api.AccessTypeKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

Check warning on line 121 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L120-L121

Added lines #L120 - L121 were not covered by tests

userID, err := apiutil.ReadStringQuery(r, api.UserKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

Check warning on line 126 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L125-L126

Added lines #L125 - L126 were not covered by tests

groupID, err := apiutil.ReadStringQuery(r, api.GroupKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

Check warning on line 131 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L130-L131

Added lines #L130 - L131 were not covered by tests

clientID, err := apiutil.ReadStringQuery(r, api.ClientKey, "")
if err != nil {
return listChannelsReq{}, errors.Wrap(apiutil.ErrValidation, err)
}

Check warning on line 136 in channels/api/http/decode.go

View check run for this annotation

Codecov / codecov/patch

channels/api/http/decode.go#L135-L136

Added lines #L135 - L136 were not covered by tests

req := listChannelsReq{
status: st,
offset: o,
limit: l,
metadata: m,
name: n,
tag: t,
permission: p,
listPerms: lp,
userID: chi.URLParam(r, "userID"),
id: id,
name: name,
tag: tag,
status: status,
metadata: meta,
roleName: roleName,
roleID: roleID,
actions: actions,
accessType: accessType,
order: order,
dir: dir,
offset: offset,
limit: limit,
groupID: groupID,
clientID: clientID,
userID: userID,
}
return req, nil
}
Expand Down
24 changes: 15 additions & 9 deletions channels/api/http/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,21 @@ func listChannelsEndpoint(svc channels.Service) endpoint.Endpoint {
}

pm := channels.PageMetadata{
Status: req.status,
Offset: req.offset,
Limit: req.limit,
Name: req.name,
Tag: req.tag,
Permission: req.permission,
Metadata: req.metadata,
ListPerms: req.listPerms,
Id: req.id,
Offset: req.offset,
Limit: req.limit,
Name: req.name,
Order: req.order,
Dir: req.dir,
Metadata: req.metadata,
Tag: req.tag,
Status: req.status,
Group: req.groupID,
Client: req.clientID,
ConnectionType: req.connType,
RoleName: req.roleName,
RoleID: req.roleID,
Actions: req.actions,
AccessType: req.accessType,
}
page, err := svc.ListChannels(ctx, session, pm)
if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions channels/api/http/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
api "github.com/absmach/supermq/api/http"
apiutil "github.com/absmach/supermq/api/http/util"
"github.com/absmach/supermq/channels"
smqclients "github.com/absmach/supermq/clients"
"github.com/absmach/supermq/clients"
"github.com/absmach/supermq/pkg/connections"
)

Expand Down Expand Up @@ -64,29 +64,29 @@ func (req viewChannelReq) validate() error {
}

type listChannelsReq struct {
status smqclients.Status
offset uint64
limit uint64
name string
tag string
permission string
visibility string
status clients.Status
metadata clients.Metadata
roleName string
roleID string
actions []string
accessType string
order string
dir string
offset uint64
limit uint64
groupID string
clientID string
connType string
userID string
listPerms bool
metadata smqclients.Metadata
id string
}

func (req listChannelsReq) validate() error {
if req.limit > api.MaxLimitSize || req.limit < 1 {
return apiutil.ErrLimitSize
}
if req.visibility != "" &&
req.visibility != api.AllVisibility &&
req.visibility != api.MyVisibility &&
req.visibility != api.SharedVisibility {
return apiutil.ErrInvalidVisibilityType
}

if len(req.name) > api.MaxNameSize {
return apiutil.ErrNameSize
}
Expand Down
8 changes: 0 additions & 8 deletions channels/api/http/requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,6 @@ func TestListChannelsReqValidation(t *testing.T) {
},
err: apiutil.ErrNameSize,
},
{
desc: "invalid visibility",
req: listChannelsReq{
limit: 10,
visibility: "invalid",
},
err: apiutil.ErrInvalidVisibilityType,
},
}
for _, tc := range cases {
err := tc.req.validate()
Expand Down
68 changes: 41 additions & 27 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,43 @@ type Channel struct {
ParentGroup string `json:"parent_group_id,omitempty"`
Domain string `json:"domain_id,omitempty"`
Metadata clients.Metadata `json:"metadata,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
Status clients.Status `json:"status,omitempty"` // 1 for enabled, 0 for disabled
Permissions []string `json:"permissions,omitempty"` // 1 for enabled, 0 for disabled
Status clients.Status `json:"status,omitempty"` // 1 for enabled, 0 for disabled
// Extended
ParentGroupPath string `json:"parent_group_path"`
RoleID string `json:"role_id"`
RoleName string `json:"role_name"`
Actions []string `json:"actions"`
AccessType string `json:"access_type"`
AccessProviderId string `json:"access_provider_id"`
AccessProviderRoleId string `json:"access_provider_role_id"`
AccessProviderRoleName string `json:"access_provider_role_name"`
AccessProviderRoleActions []string `json:"access_provider_role_actions"`
}

type PageMetadata struct {
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Name string `json:"name,omitempty"`
Id string `json:"id,omitempty"`
Order string `json:"order,omitempty"`
Dir string `json:"dir,omitempty"`
Metadata clients.Metadata `json:"metadata,omitempty"`
Domain string `json:"domain,omitempty"`
Tag string `json:"tag,omitempty"`
Permission string `json:"permission,omitempty"`
Status clients.Status `json:"status,omitempty"`
IDs []string `json:"ids,omitempty"`
ListPerms bool `json:"-"`
ClientID string `json:"-"`
Total uint64 `json:"total"`
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Order string `json:"order,omitempty"`
Dir string `json:"dir,omitempty"`
Id string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Metadata clients.Metadata `json:"metadata,omitempty"`
Domain string `json:"domain,omitempty"`
Tag string `json:"tag,omitempty"`
Status clients.Status `json:"status,omitempty"`
Group string `json:"group,omitempty"`
Client string `json:"client,omitempty"`
ConnectionType string `json:"connection_type,omitempty"`
RoleName string `json:"role_name,omitempty"`
RoleID string `json:"role_id,omitempty"`
Actions []string `json:"actions,omitempty"`
AccessType string `json:"access_type,omitempty"`
IDs []string `json:"-"`
}

// ChannelsPage contains page related metadata as well as list of channels that
Expand All @@ -71,15 +85,15 @@ type AuthzReq struct {

//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
type Service interface {
// CreateChannels adds channels to the user identified by the provided key.
// CreateChannels adds channels to the user.
CreateChannels(ctx context.Context, session authn.Session, channels ...Channel) ([]Channel, []roles.RoleProvision, error)

// ViewChannel retrieves data about the channel identified by the provided
// ID, that belongs to the user identified by the provided key.
// ID, that belongs to the user.
ViewChannel(ctx context.Context, session authn.Session, id string) (Channel, error)

// UpdateChannel updates the channel identified by the provided ID, that
// belongs to the user identified by the provided key.
// belongs to the user.
UpdateChannel(ctx context.Context, session authn.Session, channel Channel) (Channel, error)

// UpdateChannelTags updates the channel's tags.
Expand All @@ -89,17 +103,14 @@ type Service interface {

DisableChannel(ctx context.Context, session authn.Session, id string) (Channel, error)

// ListChannels retrieves data about subset of channels that belongs to the
// user identified by the provided key.
// ListChannels retrieves data about subset of channels that belongs to the user.
ListChannels(ctx context.Context, session authn.Session, pm PageMetadata) (Page, error)

// ListChannelsByClient retrieves data about subset of channels that have
// specified client connected or not connected to them and belong to the user identified by
// the provided key.
ListChannelsByClient(ctx context.Context, session authn.Session, id string, pm PageMetadata) (Page, error)
// ListUserChannels retrieves data about subset of channels that belong to the specified user.
ListUserChannels(ctx context.Context, session authn.Session, userID string, pm PageMetadata) (Page, error)

// RemoveChannel removes the client identified by the provided ID, that
// belongs to the user identified by the provided key.
// belongs to the user.
RemoveChannel(ctx context.Context, session authn.Session, id string) error

// Connect adds clients to the channels list of connected clients.
Expand Down Expand Up @@ -131,6 +142,9 @@ type Repository interface {

ChangeStatus(ctx context.Context, channel Channel) (Channel, error)

// RetrieveUserChannels retrieves the channel of given domainID and userID.
RetrieveUserChannels(ctx context.Context, domainID, userID string, pm PageMetadata) (Page, error)

// RetrieveByID retrieves the channel having the provided identifier
RetrieveByID(ctx context.Context, id string) (Channel, error)

Expand Down
Loading
Loading