Skip to content

Commit

Permalink
feat(telemetry): add telemetry to worker (#161)
Browse files Browse the repository at this point in the history
* feat(telemetry): add telemetry to worker

* refactor(telemetry): additional span attributes arg

* refactor(telemetry): span key name

* feat(telemetry): add gauge message queue time
  • Loading branch information
mabdh authored Dec 5, 2022
1 parent a2bf367 commit ac9f5fc
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 40 deletions.
5 changes: 5 additions & 0 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/odpf/siren/core/notification"
"github.com/odpf/siren/pkg/pgc"
"github.com/odpf/siren/pkg/secret"
"github.com/odpf/siren/pkg/telemetry"
"github.com/odpf/siren/pkg/worker"
"github.com/odpf/siren/plugins/queues"
"github.com/odpf/siren/plugins/queues/postgresq"
Expand Down Expand Up @@ -133,6 +134,8 @@ func workerStartNotificationDLQHandlerCommand() *cobra.Command {
func StartNotificationHandlerWorker(ctx context.Context, cfg config.Config, cancelWorkerChan chan struct{}) error {
logger := initLogger(cfg.Log)

telemetry.Init(ctx, cfg.Telemetry, logger)

dbClient, err := db.New(cfg.DB)
if err != nil {
return err
Expand Down Expand Up @@ -182,6 +185,8 @@ func StartNotificationHandlerWorker(ctx context.Context, cfg config.Config, canc
func StartNotificationDLQHandlerWorker(ctx context.Context, cfg config.Config, cancelWorkerChan chan struct{}) error {
logger := initLogger(cfg.Log)

telemetry.Init(ctx, cfg.Telemetry, logger)

dbClient, err := db.New(cfg.DB)
if err != nil {
return err
Expand Down
11 changes: 10 additions & 1 deletion core/notification/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/odpf/siren/pkg/errors"
"github.com/odpf/siren/pkg/telemetry"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)

const (
Expand Down Expand Up @@ -85,11 +86,15 @@ func (h *Handler) Process(ctx context.Context, runAt time.Time) error {
if len(receiverTypes) == 0 {
return errors.New("no receiver type plugin registered, skipping dequeue")
} else {
ctx, span := h.messagingTracer.StartSpan(ctx, "batch_dequeue", nil)
ctx, span := h.messagingTracer.StartSpan(ctx, "batch_dequeue", trace.StringAttribute("messaging.handler_id", h.identifier))
defer span.End()

if err := h.q.Dequeue(ctx, receiverTypes, h.batchSize, h.MessageHandler); err != nil {
if !errors.Is(err, ErrNoMessage) {
span.SetStatus(trace.Status{
Code: trace.StatusCodeUnknown,
Message: err.Error(),
})
return fmt.Errorf("dequeue failed on handler with id %s: %w", h.identifier, err)
}
}
Expand All @@ -100,6 +105,10 @@ func (h *Handler) Process(ctx context.Context, runAt time.Time) error {
// MessageHandler is a function to handle dequeued messages
func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error {
for _, message := range messages {

telemetry.GaugeMillisecond(ctx, telemetry.MetricNotificationMessageQueueTime, time.Since(message.UpdatedAt).Milliseconds(),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

notifier, err := h.getNotifierPlugin(message.ReceiverType)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions core/notification/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (m *Message) Initialize(

m.ReceiverType = receiverType
m.Configs = notificationConfigs

details := make(map[string]interface{})
for k, v := range n.Labels {
details[k] = v
Expand Down
17 changes: 9 additions & 8 deletions core/notification/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/odpf/siren/pkg/errors"
"github.com/odpf/siren/pkg/telemetry"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -71,10 +72,10 @@ func (ns *NotificationService) DispatchToReceiver(ctx context.Context, n Notific
return err
}

ctx, span := ns.messagingTracer.StartSpan(ctx, "prepare_enqueue", map[string]string{
"messages.notification_id": n.ID,
"messages.routing_method": RoutingMethodReceiver.String(),
})
ctx, span := ns.messagingTracer.StartSpan(ctx, "prepare_enqueue",
trace.StringAttribute("messaging.notification_id", n.ID),
trace.StringAttribute("messaging.routing_method", RoutingMethodReceiver.String()),
)
defer span.End()

notifierPlugin, err := ns.getNotifierPlugin(rcv.Type)
Expand Down Expand Up @@ -125,10 +126,10 @@ func (ns *NotificationService) DispatchToSubscribers(ctx context.Context, n Noti
return errors.ErrInvalid.WithMsgf("not matching any subscription")
}

ctx, span := ns.messagingTracer.StartSpan(ctx, "prepare_enqueue", map[string]string{
"messages.notification_id": n.ID,
"messages.routing_method": RoutingMethodSubscribers.String(),
})
ctx, span := ns.messagingTracer.StartSpan(ctx, "prepare_enqueue",
trace.StringAttribute("messaging.notification_id", n.ID),
trace.StringAttribute("messaging.routing_method", RoutingMethodSubscribers.String()),
)
defer span.End()

var messages = make([]Message, 0)
Expand Down
3 changes: 0 additions & 3 deletions docs/docs/reference/server_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ telemetry:
# OpenCensus.
enable_newrelic: <bool> | default=false

# new relic app name, if left empty, app name will be service_name
newrelic_app_name: <string> | default=""

# newrelic_api_key must be a valid NewRelic License key.
newrelic_api_key: <string> | default="____LICENSE_STRING_OF_40_CHARACTERS_____"

Expand Down
30 changes: 15 additions & 15 deletions pkg/pgc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ func Migrate(cfg db.Config) error {
}

func (c *Client) QueryRowxContext(ctx context.Context, op string, tableName string, query string, args ...interface{}) *sqlx.Row {
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, map[string]string{
"db.statement": query,
})
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName,
trace.StringAttribute("db.statement", query),
)
defer span.End()
sqlxRow := c.GetDB(ctx).QueryRowxContext(ctx, query, args...)
if sqlxRow.Err() != nil {
Expand All @@ -103,9 +103,9 @@ func (c *Client) QueryRowxContext(ctx context.Context, op string, tableName stri
}

func (c *Client) QueryxContext(ctx context.Context, op string, tableName string, query string, args ...interface{}) (*sqlx.Rows, error) {
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, map[string]string{
"db.statement": query,
})
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName,
trace.StringAttribute("db.statement", query),
)
defer span.End()
sqlxRow, err := c.GetDB(ctx).QueryxContext(ctx, query, args...)
if err != nil {
Expand All @@ -118,9 +118,9 @@ func (c *Client) QueryxContext(ctx context.Context, op string, tableName string,
}

func (c *Client) GetContext(ctx context.Context, op string, tableName string, dest interface{}, query string, args ...interface{}) error {
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, map[string]string{
"db.statement": query,
})
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName,
trace.StringAttribute("db.statement", query),
)
defer span.End()

if err := c.GetDB(ctx).QueryRowxContext(ctx, query, args...).StructScan(dest); err != nil {
Expand All @@ -135,9 +135,9 @@ func (c *Client) GetContext(ctx context.Context, op string, tableName string, de
}

func (c *Client) ExecContext(ctx context.Context, op string, tableName string, query string, args ...interface{}) (sql.Result, error) {
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, map[string]string{
"db.statement": query,
})
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName,
trace.StringAttribute("db.statement", query),
)
defer span.End()

res, err := c.db.ExecContext(ctx, query, args...)
Expand All @@ -153,9 +153,9 @@ func (c *Client) ExecContext(ctx context.Context, op string, tableName string, q
}

func (c *Client) NamedExecContext(ctx context.Context, op string, tableName string, query string, arg interface{}) (sql.Result, error) {
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName, map[string]string{
"db.statement": query,
})
ctx, span := c.postgresTracer.StartSpan(ctx, op, tableName,
trace.StringAttribute("db.statement", query),
)
defer span.End()

res, err := c.db.NamedExecContext(ctx, query, arg)
Expand Down
9 changes: 9 additions & 0 deletions pkg/telemetry/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ var (
TagRoutingMethod = tag.MustNewKey("routing_method")
TagMessageStatus = tag.MustNewKey("status")

MetricNotificationMessageQueueTime = stats.Int64("notification.message.queue.time", "time of message from enqueued to be picked up", stats.UnitMilliseconds)

MetricNotificationMessageEnqueue = stats.Int64("notification.message.enqueue", "enqueued notification messages", stats.UnitDimensionless)
MetricNotificationMessagePending = stats.Int64("notification.message.pending", "processed notification messages", stats.UnitDimensionless)
MetricNotificationMessageFailed = stats.Int64("notification.message.failed", "failed to publish notification messages", stats.UnitDimensionless)
Expand All @@ -29,6 +31,13 @@ var (

func setupApplicationViews() error {
return view.Register(
&view.View{
Name: MetricNotificationMessageQueueTime.Name(),
Description: MetricNotificationMessageQueueTime.Description(),
TagKeys: []tag.Key{TagReceiverType},
Measure: MetricNotificationMessageQueueTime,
Aggregation: view.Distribution(),
},
&view.View{
Name: MetricNotificationMessageEnqueue.Name(),
Description: MetricNotificationMessageEnqueue.Description(),
Expand Down
6 changes: 2 additions & 4 deletions pkg/telemetry/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func NewMessagingTracer(queueSystem string) *MessagingTracer {
}
}

func (msg MessagingTracer) StartSpan(ctx context.Context, op string, spanAttributes map[string]string) (context.Context, *trace.Span) {
func (msg MessagingTracer) StartSpan(ctx context.Context, op string, spanAttributes ...trace.Attribute) (context.Context, *trace.Span) {
// Refer https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
ctx, span := trace.StartSpan(ctx, fmt.Sprintf("notification_queue %s", op), trace.WithSpanKind(trace.SpanKindClient))

Expand All @@ -28,9 +28,7 @@ func (msg MessagingTracer) StartSpan(ctx context.Context, op string, spanAttribu
trace.StringAttribute("messaging.operation", op),
}

for k, v := range spanAttributes {
traceAttributes = append(traceAttributes, trace.StringAttribute(k, v))
}
traceAttributes = append(traceAttributes, spanAttributes...)

span.AddAttributes(
traceAttributes...,
Expand Down
6 changes: 1 addition & 5 deletions pkg/telemetry/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ func setupOpenCensus(ctx context.Context, mux *http.ServeMux, cfg Config) error
}

if cfg.EnableNewrelic {
nrAppName := cfg.ServiceName
if cfg.NewRelicAppName != "" {
nrAppName = cfg.NewRelicAppName
}
exporter, err := nrcensus.NewExporter(nrAppName, cfg.NewRelicAPIKey)
exporter, err := nrcensus.NewExporter(cfg.ServiceName, cfg.NewRelicAPIKey)
if err != nil {
return err
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/telemetry/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewPostgresTracer(url string) (*PostgresTracer, error) {
}, err
}

func (d PostgresTracer) StartSpan(ctx context.Context, op string, tableName string, spanAttributes map[string]string) (context.Context, *trace.Span) {
func (d PostgresTracer) StartSpan(ctx context.Context, op string, tableName string, spanAttributes ...trace.Attribute) (context.Context, *trace.Span) {
// Refer https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/database.md
ctx, span := trace.StartSpan(ctx, fmt.Sprintf("%s %s.%s", op, d.dbName, tableName), trace.WithSpanKind(trace.SpanKindClient))

Expand All @@ -45,9 +45,7 @@ func (d PostgresTracer) StartSpan(ctx context.Context, op string, tableName stri
trace.StringAttribute("db.sql.table", tableName),
}

for k, v := range spanAttributes {
traceAttributes = append(traceAttributes, trace.StringAttribute(k, v))
}
traceAttributes = append(traceAttributes, spanAttributes...)

span.AddAttributes(
traceAttributes...,
Expand Down
10 changes: 10 additions & 0 deletions pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,13 @@ func IncrementInt64Counter(ctx context.Context, si64 *stats.Int64Measure, tagMut

stats.Record(counterCtx, si64.M(1))
}

// GaugeMillisecond records a single measurement of value against the si64
// measure. When tagMutator is supplied, the measurement is recorded in a
// context derived from ctx with those tag mutations applied; otherwise ctx is
// used as-is.
//
// NOTE(review): the name implies value is a duration in milliseconds, but
// nothing here enforces the unit — callers are responsible for converting.
func GaugeMillisecond(ctx context.Context, si64 *stats.Int64Measure, value int64, tagMutator ...tag.Mutator) {
	recordCtx := ctx

	if tagMutator != nil {
		// The error from tag.New is intentionally discarded: telemetry is
		// recorded on a best-effort basis and must not fail the caller.
		taggedCtx, _ := tag.New(ctx, tagMutator...)
		recordCtx = taggedCtx
	}

	measurement := si64.M(value)
	stats.Record(recordCtx, measurement)
}

0 comments on commit ac9f5fc

Please sign in to comment.