Skip to content

Commit

Permalink
SMQ-2648 - Add API and Repository implementation for the Client stats (
Browse files Browse the repository at this point in the history
…#2647)

Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru authored Jan 17, 2025
1 parent 5d75599 commit fbbe5ff
Show file tree
Hide file tree
Showing 16 changed files with 506 additions and 9 deletions.
23 changes: 23 additions & 0 deletions journal/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,26 @@ func retrieveJournalsEndpoint(svc journal.Service) endpoint.Endpoint {
}, nil
}
}

func retrieveClientTelemetryEndpoint(svc journal.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(retrieveClientTelemetryReq)
if err := req.validate(); err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}

session, ok := ctx.Value(api.SessionKey).(authn.Session)
if !ok {
return nil, svcerr.ErrAuthorization
}

telemetry, err := svc.RetrieveClientTelemetry(ctx, session, req.clientID)
if err != nil {
return nil, err
}

return clientTelemetryRes{
ClientTelemetry: telemetry,
}, nil
}
}
85 changes: 84 additions & 1 deletion journal/api/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func TestListEntityJournalsEndpoint(t *testing.T) {
desc: "with empty domain ID",
token: validToken,
url: "/group/",
status: http.StatusNotFound,
status: http.StatusBadRequest,
svcErr: nil,
},
}
Expand Down Expand Up @@ -402,3 +402,86 @@ func TestListEntityJournalsEndpoint(t *testing.T) {
})
}
}

func TestRetrieveClientTelemetryEndpoint(t *testing.T) {
es, svc, authn := newjournalServer()

clientID := testsutil.GenerateUUID(t)
userID := testsutil.GenerateUUID(t)
domanID := testsutil.GenerateUUID(t)

cases := []struct {
desc string
token string
session smqauthn.Session
clientID string
domainID string
url string
contentType string
status int
authnErr error
svcErr error
}{
{
desc: "successful",
token: validToken,
clientID: clientID,
domainID: domanID,
url: fmt.Sprintf("/client/%s/telemetry", clientID),
status: http.StatusOK,
svcErr: nil,
},
{
desc: "with service error",
token: validToken,
clientID: clientID,
domainID: domanID,
url: fmt.Sprintf("/client/%s/telemetry", clientID),
status: http.StatusForbidden,
svcErr: svcerr.ErrAuthorization,
},
{
desc: "with empty token",
clientID: clientID,
domainID: domanID,
url: fmt.Sprintf("/client/%s/telemetry", clientID),
status: http.StatusUnauthorized,
svcErr: nil,
},
{
desc: "with invalid client ID",
token: validToken,
domainID: domanID,
clientID: "invalid",
url: "/client/invalid/telemetry",
status: http.StatusNotFound,
svcErr: svcerr.ErrNotFound,
},
}

for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
if c.token == validToken {
c.session = smqauthn.Session{
UserID: userID,
DomainID: c.domainID,
DomainUserID: c.domainID + "_" + userID,
}
}
authCall := authn.On("Authenticate", mock.Anything, c.token).Return(c.session, c.authnErr)
svcCall := svc.On("RetrieveClientTelemetry", mock.Anything, c.session, c.clientID).Return(journal.ClientTelemetry{}, c.svcErr)
req := testRequest{
client: es.Client(),
method: http.MethodGet,
url: fmt.Sprintf("%s/%s/journal%s", es.URL, c.domainID, c.url),
token: c.token,
}
resp, err := req.make()
assert.Nil(t, err, c.desc)
defer resp.Body.Close()
assert.Equal(t, c.status, resp.StatusCode, c.desc)
svcCall.Unset()
authCall.Unset()
})
}
}
12 changes: 12 additions & 0 deletions journal/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,15 @@ func (req retrieveJournalsReq) validate() error {

return nil
}

type retrieveClientTelemetryReq struct {
clientID string
}

func (req retrieveClientTelemetryReq) validate() error {
if req.clientID == "" {
return apiutil.ErrMissingID
}

return nil
}
30 changes: 30 additions & 0 deletions journal/api/requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,33 @@ func TestRetrieveJournalsReqValidate(t *testing.T) {
})
}
}

func TestRetrieveClientTelemetryReqValidate(t *testing.T) {
cases := []struct {
desc string
req retrieveClientTelemetryReq
err error
}{
{
desc: "valid",
req: retrieveClientTelemetryReq{
clientID: "id",
},
err: nil,
},
{
desc: "missing client id",
req: retrieveClientTelemetryReq{
clientID: "",
},
err: apiutil.ErrMissingID,
},
}

for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
err := c.req.validate()
assert.Equal(t, c.err, err)
})
}
}
4 changes: 4 additions & 0 deletions journal/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ func (res pageRes) Code() int {
func (res pageRes) Empty() bool {
return false
}

type clientTelemetryRes struct {
journal.ClientTelemetry `json:",inline"`
}
31 changes: 25 additions & 6 deletions journal/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,23 @@ func MakeHandler(svc journal.Service, authn smqauthn.Authentication, logger *slo
opts...,
), "list_user_journals").ServeHTTP)

mux.With(api.AuthenticateMiddleware(authn, true)).Get("/{domainID}/journal/{entityType}/{entityID}", otelhttp.NewHandler(kithttp.NewServer(
retrieveJournalsEndpoint(svc),
decodeRetrieveEntityJournalReq,
api.EncodeResponse,
opts...,
), "list__entity_journals").ServeHTTP)
mux.Route("/{domainID}/journal", func(r chi.Router) {
r.Use(api.AuthenticateMiddleware(authn, true))

r.Get("/{entityType}/{entityID}", otelhttp.NewHandler(kithttp.NewServer(
retrieveJournalsEndpoint(svc),
decodeRetrieveEntityJournalReq,
api.EncodeResponse,
opts...,
), "list__entity_journals").ServeHTTP)

r.Get("/client/{clientID}/telemetry", otelhttp.NewHandler(kithttp.NewServer(
retrieveClientTelemetryEndpoint(svc),
decodeRetrieveClientTelemetryReq,
api.EncodeResponse,
opts...,
), "view_client_telemetry").ServeHTTP)
})

mux.Get("/health", supermq.Health(svcName, instanceID))
mux.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -160,3 +171,11 @@ func decodePageQuery(r *http.Request) (journal.Page, error) {
Direction: dir,
}, nil
}

func decodeRetrieveClientTelemetryReq(_ context.Context, r *http.Request) (interface{}, error) {
req := retrieveClientTelemetryReq{
clientID: chi.URLParam(r, "clientID"),
}

return req, nil
}
22 changes: 22 additions & 0 deletions journal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,16 @@ func (page JournalsPage) MarshalJSON() ([]byte, error) {
return json.Marshal(a)
}

type ClientTelemetry struct {
ClientID string `json:"client_id"`
DomainID string `json:"domain_id"`
Subscriptions []string `json:"subscriptions"`
InboundMessages uint64 `json:"inbound_messages"`
OutboundMessages uint64 `json:"outbound_messages"`
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
}

// Service provides access to the journal log service.
//
//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
Expand All @@ -146,6 +156,9 @@ type Service interface {

// RetrieveAll retrieves all journals from the database with the given page.
RetrieveAll(ctx context.Context, session smqauthn.Session, page Page) (JournalsPage, error)

// RetrieveClientTelemetry retrieves telemetry data for a client.
RetrieveClientTelemetry(ctx context.Context, session smqauthn.Session, clientID string) (ClientTelemetry, error)
}

// Repository provides access to the journal log database.
Expand All @@ -157,4 +170,13 @@ type Repository interface {

// RetrieveAll retrieves all journals from the database with the given page.
RetrieveAll(ctx context.Context, page Page) (JournalsPage, error)

// SaveClientTelemetry persists telemetry data for a client to the database.
SaveClientTelemetry(ctx context.Context, ct ClientTelemetry) error

// RetrieveClientTelemetry retrieves telemetry data for a client from the database.
RetrieveClientTelemetry(ctx context.Context, clientID, domainID string) (ClientTelemetry, error)

// DeleteClientTelemetry removes telemetry data for a client from the database.
DeleteClientTelemetry(ctx context.Context, clientID, domainID string) error
}
24 changes: 22 additions & 2 deletions journal/middleware/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"github.com/absmach/supermq/pkg/policies"
)

var _ journal.Service = (*authorizationMiddleware)(nil)
var (
_ journal.Service = (*authorizationMiddleware)(nil)

var readPermission = "read_permission"
readPermission = "read_permission"
)

type authorizationMiddleware struct {
svc journal.Service
Expand Down Expand Up @@ -62,3 +64,21 @@ func (am *authorizationMiddleware) RetrieveAll(ctx context.Context, session smqa

return am.svc.RetrieveAll(ctx, session, page)
}

func (am *authorizationMiddleware) RetrieveClientTelemetry(ctx context.Context, session smqauthn.Session, clientID string) (journal.ClientTelemetry, error) {
req := smqauthz.PolicyReq{
Domain: session.DomainID,
SubjectType: policies.UserType,
SubjectKind: policies.UsersKind,
Subject: session.UserID,
Permission: readPermission,
ObjectType: policies.ClientType,
Object: clientID,
}

if err := am.authz.Authorize(ctx, req); err != nil {
return journal.ClientTelemetry{}, err
}

return am.svc.RetrieveClientTelemetry(ctx, session, clientID)
}
18 changes: 18 additions & 0 deletions journal/middleware/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,21 @@ func (lm *loggingMiddleware) RetrieveAll(ctx context.Context, session smqauthn.S

return lm.service.RetrieveAll(ctx, session, page)
}

func (lm *loggingMiddleware) RetrieveClientTelemetry(ctx context.Context, session smqauthn.Session, clientID string) (ct journal.ClientTelemetry, err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("client_id", clientID),
slog.String("domain_id", session.DomainID),
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Retrieve client telemetry failed", args...)
return
}
lm.logger.Info("Retrieve client telemetry completed successfully", args...)
}(time.Now())

return lm.service.RetrieveClientTelemetry(ctx, session, clientID)
}
9 changes: 9 additions & 0 deletions journal/middleware/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ func (mm *metricsMiddleware) RetrieveAll(ctx context.Context, session smqauthn.S

return mm.service.RetrieveAll(ctx, session, page)
}

func (mm *metricsMiddleware) RetrieveClientTelemetry(ctx context.Context, session smqauthn.Session, clientID string) (journal.ClientTelemetry, error) {
defer func(begin time.Time) {
mm.counter.With("method", "retrieve_client_telemetry").Add(1)
mm.latency.With("method", "retrieve_client_telemetry").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.service.RetrieveClientTelemetry(ctx, session, clientID)
}
10 changes: 10 additions & 0 deletions journal/middleware/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ func (tm *tracing) RetrieveAll(ctx context.Context, session smqauthn.Session, pa

return tm.svc.RetrieveAll(ctx, session, page)
}

func (tm *tracing) RetrieveClientTelemetry(ctx context.Context, session smqauthn.Session, clientID string) (j journal.ClientTelemetry, err error) {
ctx, span := tm.tracer.Start(ctx, "retrieve", trace.WithAttributes(
attribute.String("client_id", clientID),
attribute.String("domain_id", session.DomainID),
))
defer span.End()

return tm.svc.RetrieveClientTelemetry(ctx, session, clientID)
}
Loading

0 comments on commit fbbe5ff

Please sign in to comment.