Skip to content

Commit

Permalink
feat: add clients telemetry to journals service
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru committed Jan 16, 2025
1 parent fdea1a7 commit c52b865
Show file tree
Hide file tree
Showing 14 changed files with 526 additions and 0 deletions.
20 changes: 20 additions & 0 deletions journal/api/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,23 @@ func retrieveJournalsEndpoint(svc journal.Service) endpoint.Endpoint {
}, nil
}
}

func retrieveClientTelemetryEndpoint(svc journal.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(retrieveClientTelemetryReq)
if err := req.validate(); err != nil {
return nil, errors.Wrap(apiutil.ErrValidation, err)
}


telemetry, err := svc.RetrieveClientTelemetry(ctx, req.clientID, req.domainID)
if err != nil {
return nil, err
}

return clientTelemetryRes{
ClientsTelemetry: telemetry,
}, nil
}

}

Check failure on line 59 in journal/api/endpoint.go

View workflow job for this annotation

GitHub Actions / Lint and Build

unnecessary trailing newline (whitespace)
17 changes: 17 additions & 0 deletions journal/api/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,20 @@ func (req retrieveJournalsReq) validate() error {

return nil
}

type retrieveClientTelemetryReq struct {
token string
clientID string
domainID string
}

func (req retrieveClientTelemetryReq) validate () error {
if req.clientID == "" {
return apiutil.ErrMissingID
}
if req.domainID == "" {
return apiutil.ErrMissingID
}

return nil
}
4 changes: 4 additions & 0 deletions journal/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ func (res pageRes) Code() int {
func (res pageRes) Empty() bool {
return false
}

type clientTelemetryRes struct {
journal.ClientsTelemetry `json:",inline"`
}
17 changes: 17 additions & 0 deletions journal/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ func MakeHandler(svc journal.Service, authn smqauthn.Authentication, logger *slo
opts...,
), "list__entity_journals").ServeHTTP)

mux.With(api.AuthenticateMiddleware(authn, true)).Get("/{domainID}/journal/{clientID}/client-telemetry", otelhttp.NewHandler(kithttp.NewServer(
retrieveClientTelemetryEndpoint(svc),
decodeRetrieveClientTelemetryReq,
api.EncodeResponse,
opts...,
), "list_client_telemetry").ServeHTTP)

mux.Get("/health", supermq.Health(svcName, instanceID))
mux.Handle("/metrics", promhttp.Handler())

Expand Down Expand Up @@ -160,3 +167,13 @@ func decodePageQuery(r *http.Request) (journal.Page, error) {
Direction: dir,
}, nil
}

func decodeRetrieveClientTelemetryReq(_ context.Context, r *http.Request) (interface{}, error) {
req := retrieveClientTelemetryReq{
token: apiutil.ExtractBearerToken(r),
clientID: chi.URLParam(r, "clientID"),
domainID: chi.URLParam(r, "domainID"),
}

return req, nil
}
28 changes: 28 additions & 0 deletions journal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ func (page JournalsPage) MarshalJSON() ([]byte, error) {
return json.Marshal(a)
}

type Message struct {
ID string `json:"id"`
ChannelID string `json:"channel_id"`
Protocol string `json:"protocol"`
}

type ClientsTelemetry struct {
ClientID string `json:"client_id"`
DomainID string `json:"domain_id"`
Connections []string `json:"connections"`
Messages []Message `json:"messages"`
}

// Service provides access to the journal log service.
//
//go:generate mockery --name Service --output=./mocks --filename service.go --quiet --note "Copyright (c) Abstract Machines"
Expand All @@ -146,6 +159,8 @@ type Service interface {

// RetrieveAll retrieves all journals from the database with the given page.
RetrieveAll(ctx context.Context, session smqauthn.Session, page Page) (JournalsPage, error)

RetrieveClientTelemetry(ctx context.Context, clientID, domainID string) (ClientsTelemetry, error)
}

// Repository provides access to the journal log database.
Expand All @@ -157,4 +172,17 @@ type Repository interface {

// RetrieveAll retrieves all journals from the database with the given page.
RetrieveAll(ctx context.Context, page Page) (JournalsPage, error)

// SaveClientTelemetry adds telemetry data for a client.
SaveClientTelemetry(ctx context.Context, clientID, domainID string) error

// RetrieveClientTelemetry retrieves telemetry data for a client.
RetrieveClientTelemetry(ctx context.Context, clientID, domainID string) (ClientsTelemetry, error)

// DeleteClientTelemetry removes telemetry data for a client.
DeleteClientTelemetry(ctx context.Context, clientID, domainID string) error

AddClientConnection(ctx context.Context, clientID, domainID, connection string) error

RemoveClientConnection(ctx context.Context, clientID, domainID, connection string) error
}
4 changes: 4 additions & 0 deletions journal/middleware/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ func (am *authorizationMiddleware) RetrieveAll(ctx context.Context, session smqa

return am.svc.RetrieveAll(ctx, session, page)
}

func (am *authorizationMiddleware) RetrieveClientTelemetry(ctx context.Context, clientID, domainID string) (journal.ClientsTelemetry, error) {
return am.svc.RetrieveClientTelemetry(ctx, clientID, domainID)
}
18 changes: 18 additions & 0 deletions journal/middleware/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,21 @@ func (lm *loggingMiddleware) RetrieveAll(ctx context.Context, session smqauthn.S

return lm.service.RetrieveAll(ctx, session, page)
}

func (lm *loggingMiddleware) RetrieveClientTelemetry(ctx context.Context, clientID, domainID string) (ct journal.ClientsTelemetry, err error) {
defer func(begin time.Time) {
args := []any{
slog.String("duration", time.Since(begin).String()),
slog.String("client_id", clientID),
slog.String("domain_id", domainID),
}
if err != nil {
args = append(args, slog.Any("error", err))
lm.logger.Warn("Retrieve client telemetry failed", args...)
return
}
lm.logger.Info("Retrieve client telemetry completed successfully", args...)
}(time.Now())

return lm.service.RetrieveClientTelemetry(ctx, clientID, domainID)
}
9 changes: 9 additions & 0 deletions journal/middleware/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ func (mm *metricsMiddleware) RetrieveAll(ctx context.Context, session smqauthn.S

return mm.service.RetrieveAll(ctx, session, page)
}

func (mm *metricsMiddleware) RetrieveClientTelemetry(ctx context.Context, clientID, domainID string) (journal.ClientsTelemetry, error){
defer func(begin time.Time) {
mm.counter.With("method", "retrieve_client_telemetry").Add(1)
mm.latency.With("method", "retrieve_client_telemetry").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.service.RetrieveClientTelemetry(ctx, clientID, domainID)
}
10 changes: 10 additions & 0 deletions journal/middleware/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ func (tm *tracing) RetrieveAll(ctx context.Context, session smqauthn.Session, pa

return tm.svc.RetrieveAll(ctx, session, page)
}

func (tm *tracing) RetrieveClientTelemetry(ctx context.Context, clientID, domainID string) (j journal.ClientsTelemetry, err error) {
ctx, span := tm.tracer.Start(ctx, "retrieve", trace.WithAttributes(
attribute.String("client_id", clientID),
attribute.String("domain_id", domainID),
))
defer span.End()

return tm.svc.RetrieveClientTelemetry(ctx, clientID, domainID)
}
100 changes: 100 additions & 0 deletions journal/mocks/repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions journal/mocks/service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions journal/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ func Migration() *migrate.MemoryMigrationSource {
`CREATE INDEX idx_journal_default_group_filter ON journal(operation, (attributes->>'id'), (attributes->>'group_id'), occurred_at DESC);`,
`CREATE INDEX idx_journal_default_client_filter ON journal(operation, (attributes->>'id'), (attributes->>'client_id'), occurred_at DESC);`,
`CREATE INDEX idx_journal_default_channel_filter ON journal(operation, (attributes->>'id'), (attributes->>'channel_id'), occurred_at DESC);`,
`CREATE TABLE IF NOT EXISTS clients_telemetry (
client_id VARCHAR(36) NOT NULL,
domain_id VARCHAR(36) NOT NULL,
connections JSONB,
messages JSONB,
PRIMARY KEY (client_id, domain_id)
)`,
},
Down: []string{
`DROP TABLE IF EXISTS clients_telemetry`,
`DROP TABLE IF EXISTS journal`,
},
},
Expand Down
Loading

0 comments on commit c52b865

Please sign in to comment.