feat: add dependency processor using Apache Beam #6560

Open · wants to merge 5 commits into main
1 change: 1 addition & 0 deletions cmd/jaeger/internal/components.go
@@ -110,6 +110,7 @@
attributesprocessor.NewFactory(),
// add-ons
adaptivesampling.NewFactory(),
dependencyprocessor.NewFactory(),

Check failure on line 113 in cmd/jaeger/internal/components.go, flagged by multiple GitHub Actions jobs (binary-size-check; spm (v2, jaeger); hotrod (docker, v2); hotrod (k8s, v2); crossdock; docker-images; all-in-one (v2); build-binaries-linux-amd64/arm64/ppc64le/s390x; build-binaries-windows-amd64; build-binaries-darwin-amd64/arm64; lint): undefined: dependencyprocessor (typecheck)
)
if err != nil {
return otelcol.Factories{}, err
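
The check failures above stem from the dependencyprocessor package not resolving in this diff. For orientation, here is a hedged sketch of what a minimal factory could look like (assuming recent OpenTelemetry Collector processor APIs; componentType, createDefaultConfig, and createTracesProcessor are hypothetical names, and Config is the processor's config struct from this PR):

package dependencyprocessor

import (
    "context"
    "errors"
    "time"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/consumer"
    "go.opentelemetry.io/collector/processor"
)

var componentType = component.MustNewType("dependency")

// NewFactory registers the dependency processor type with the collector.
func NewFactory() processor.Factory {
    return processor.NewFactory(
        componentType,
        createDefaultConfig,
        processor.WithTraces(createTracesProcessor, component.StabilityLevelDevelopment),
    )
}

func createDefaultConfig() component.Config {
    return &Config{
        AggregationInterval: 10 * time.Second, // hypothetical defaults
        InactivityTimeout:   2 * time.Second,
    }
}

func createTracesProcessor(_ context.Context, _ processor.Settings, _ component.Config, _ consumer.Traces) (processor.Traces, error) {
    // Placeholder: a real implementation would wrap the dependencyAggregator
    // from aggregator.go below.
    return nil, errors.New("not implemented in this sketch")
}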
201 changes: 201 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
@@ -0,0 +1,201 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"context"
"sync"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Check failures on lines 11–14 in cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go (GitHub Actions / unit-tests): no required module provides the github.com/apache/beam/sdks/v2/go/pkg/beam packages (beam, core/graph/window, transforms/stats, x/beamx); they must be added to go.mod (the suggested go get commands are truncated in the annotations).

var beamInitOnce sync.Once

// dependencyAggregator processes spans and aggregates dependencies using Apache Beam
type dependencyAggregator struct {
config *Config
telset component.TelemetrySettings
dependencyWriter spanstore.Writer
inputChan chan spanEvent
closeChan chan struct{}
}

// spanEvent represents a span with its service name and timestamp
type spanEvent struct {
span ptrace.Span
serviceName string
eventTime time.Time
}

// newDependencyAggregator creates a new dependency aggregator
func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter spanstore.Writer) *dependencyAggregator {
beamInitOnce.Do(func() {
beam.Init()
})
return &dependencyAggregator{
config: &cfg,
telset: telset,
dependencyWriter: dependencyWriter,
inputChan: make(chan spanEvent, 1000),
Member: What's the motivation for making this a bounded queue?

Author: The motivation for making the inputChan a bounded queue (a buffered channel with a fixed size, e.g., 1000) is primarily to manage backpressure, control resource usage, and ensure system stability in high-throughput scenarios.

Author: Yes, it could also be implemented differently. Current code:

func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter spanstore.Writer) *dependencyAggregator {
    beamInitOnce.Do(func() {
        beam.Init()
    })
    return &dependencyAggregator{
        config:           &cfg,
        telset:           telset,
        dependencyWriter: dependencyWriter,
        inputChan:        make(chan spanEvent, 1000), // Arbitrary bounded queue
        closeChan:        make(chan struct{}),
    }
}

The bounded queue (capacity 1000) has several issues:

  1. The size is arbitrary, with no clear justification.
  2. It can lead to dropped spans under high load.
  3. There is no backpressure mechanism - spans are silently dropped when the queue is full.
  4. The system provides no visibility into queue capacity or dropped spans beyond a warning log.

Better approaches may be:

  1. Use an unbuffered channel (Go has no truly unbounded channels; an unbuffered one blocks the sender until a receiver is ready):
inputChan: make(chan spanEvent) // Unbuffered channel
  2. Make the size configurable if bounds are needed:
type Config struct {
    // ... other fields ...

    // QueueSize controls the span processing queue capacity.
    // Zero means unbuffered, non-zero means bounded with backpressure once full.
    QueueSize int `yaml:"queue_size" valid:">=0"`
}
  3. Add proper backpressure instead of dropping spans:
func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) error {
    event := spanEvent{
        span:        span,
        serviceName: serviceName,
        eventTime:   time.Now(),
    }

    select {
    case agg.inputChan <- event:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

This would:

  • Allow configuration based on system resources
  • Provide backpressure rather than dropping data
  • Make the behavior more predictable and observable
  • Better handle high-load scenarios

The current arbitrary bound of 1000 should either be removed or justified with proper configuration and backpressure mechanisms.

Member (quoting the author's first reply):

> The motivation for making the inputChan a bounded queue (a buffered channel with a fixed size, e.g., 1000) is primarily to manage backpressure, control resource usage, and ensure system stability in high-throughput scenarios

When a channel is unbuffered, it cannot be written to unless a reader is waiting to consume from it, so it provides natural backpressure: the calling goroutine blocks, which in turn holds back the remote caller. It also does not let a queue grow and accumulate unprocessed data while making it look like processing was immediately successful.

Author: OK, I will fix it. Is this part of the code ready to be merged?

closeChan: make(chan struct{}),
}
}
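
A minimal sketch of the direction agreed in the review thread (assumptions: the QueueSize field and this constructor variant are hypothetical, not part of this PR):

func newDependencyAggregatorWithQueue(cfg Config, telset component.TelemetrySettings, writer spanstore.Writer) *dependencyAggregator {
    beamInitOnce.Do(beam.Init)
    return &dependencyAggregator{
        config:           &cfg,
        telset:           telset,
        dependencyWriter: writer,
        // cfg.QueueSize == 0 yields an unbuffered channel: each send blocks
        // until the pipeline goroutine receives, giving natural backpressure
        // instead of silent drops.
        inputChan: make(chan spanEvent, cfg.QueueSize),
        closeChan: make(chan struct{}),
    }
}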

// Start begins the aggregation process
func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
agg.closeChan = closeChan
Member suggested change (on lines +57 to +58):

-func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
-agg.closeChan = closeChan
+func (agg *dependencyAggregator) Start() {

go agg.runPipeline()
}

// HandleSpan processes a single span
func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) {
event := spanEvent{
span: span,
serviceName: serviceName,
eventTime: time.Now(),
}
select {
case agg.inputChan <- event:
Member: What is the motivation for having this done in the background instead of in the caller goroutine? Are the operations on the Beam pipeline thread-safe, or is that the reason for the separation?

Author: The motivation for processing spans in the background (via a separate goroutine) rather than in the caller goroutine is primarily performance optimization, decoupling of concerns, and ensuring thread safety when interacting with the Apache Beam pipeline.

default:
agg.telset.Logger.Warn("Input channel full, dropping span")
}
}

// runPipeline runs the main processing pipeline
func (agg *dependencyAggregator) runPipeline() {
for {
var events []spanEvent
timer := time.NewTimer(agg.config.AggregationInterval)

collectLoop:
for {
select {
case event := <-agg.inputChan:
events = append(events, event)
case <-timer.C:
break collectLoop
case <-agg.closeChan:
if !timer.Stop() {
<-timer.C
}
if len(events) > 0 {
agg.processEvents(context.Background(), events)
}
return
}
}

if len(events) > 0 {
agg.processEvents(context.Background(), events)
}
}
}

// processEvents processes a batch of spans using Beam pipeline
func (agg *dependencyAggregator) processEvents(ctx context.Context, events []spanEvent) {
// Create new pipeline and scope
p, s := beam.NewPipelineWithRoot()

// Create initial PCollection with timestamps
col := beam.CreateList(s, events)

// Transform into timestamped KV pairs
timestamped := beam.ParDo(s, func(event spanEvent) beam.WindowValue {
return beam.WindowValue{
Timestamp: event.eventTime,
Windows: window.IntervalWindow{Start: event.eventTime, End: event.eventTime.Add(agg.config.InactivityTimeout)},
Value: beam.KV{
Key: event.span.TraceID(),
Value: event,
},
}
}, col)

// Apply session windows
windowed := beam.WindowInto(s,
window.NewSessions(agg.config.InactivityTimeout),
timestamped,
)
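// Session windows keep a trace's window open while spans keep arriving and
// close it once no span has been seen for InactivityTimeout, so each burst
// of activity for a trace is aggregated as a unit.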

// Group by TraceID and aggregate dependencies
grouped := stats.GroupByKey(s, windowed)

// Calculate dependencies for each trace
dependencies := beam.ParDo(s, func(key pcommon.TraceID, iter func(*spanEvent) bool) []*model.DependencyLink {
spanMap := make(map[pcommon.SpanID]spanEvent)
var event spanEvent

// Build span map; the GBK iterator fills event via the pointer and
// returns false once the group's values are exhausted.
for iter(&event) {
spanMap[event.span.SpanID()] = event
}

// Calculate dependencies
deps := make(map[string]*model.DependencyLink)
for _, event := range spanMap {
parentSpanID := event.span.ParentSpanID()
if parentEvent, hasParent := spanMap[parentSpanID]; hasParent {
parentService := parentEvent.serviceName
childService := event.serviceName

// Create dependency link if services are different
if parentService != "" && childService != "" && parentService != childService {
depKey := parentService + "&&&" + childService
if dep, exists := deps[depKey]; exists {
dep.CallCount++
} else {
deps[depKey] = &model.DependencyLink{
Parent: parentService,
Child: childService,
CallCount: 1,
}
}
}
}
}

return depMapToSlice(deps)
}, grouped)

// Merge results from all windows
merged := beam.Flatten(s, dependencies)

// Write to storage
beam.ParDo0(s, func(deps []*model.DependencyLink) {
if err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps); err != nil {

Check failure on line 177 in cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go (GitHub Actions / lint): agg.dependencyWriter.WriteDependencies undefined (type "github.com/jaegertracing/jaeger/storage/spanstore".Writer has no field or method WriteDependencies) (typecheck)
agg.telset.Logger.Error("Failed to write dependencies", zap.Error(err))
}
}, merged)

// Execute pipeline
if err := beamx.Run(ctx, p); err != nil {
agg.telset.Logger.Error("Failed to run beam pipeline", zap.Error(err))
}
}

// Close shuts down the aggregator
func (agg *dependencyAggregator) Close() error {
close(agg.closeChan)
return nil
}

// depMapToSlice converts dependency map to slice
func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink {
result := make([]*model.DependencyLink, 0, len(deps))
for _, dep := range deps {
result = append(result, dep)
}
return result
}
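
A hypothetical usage sketch (consumeTraces and its wiring are illustrative, not part of this PR) showing how a processor's trace-consumer hook could feed spans into the aggregator, reading the service name from resource attributes:

func (agg *dependencyAggregator) consumeTraces(ctx context.Context, td ptrace.Traces) {
    for i := 0; i < td.ResourceSpans().Len(); i++ {
        rs := td.ResourceSpans().At(i)
        serviceName := ""
        if v, ok := rs.Resource().Attributes().Get("service.name"); ok {
            serviceName = v.Str()
        }
        for j := 0; j < rs.ScopeSpans().Len(); j++ {
            spans := rs.ScopeSpans().At(j).Spans()
            for k := 0; k < spans.Len(); k++ {
                agg.HandleSpan(ctx, spans.At(k), serviceName)
            }
        }
    }
}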
172 changes: 172 additions & 0 deletions cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go
@@ -0,0 +1,172 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package dependencyprocessor

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
)

// MockDependencyWriter is a mock implementation of spanstore.Writer
type MockDependencyWriter struct {
mock.Mock
}

func (m *MockDependencyWriter) WriteSpan(ctx context.Context, span *model.Span) error {
args := m.Called(ctx, span)
return args.Error(0)
}

func (m *MockDependencyWriter) WriteDependencies(ctx context.Context, ts time.Time, deps []model.DependencyLink) error {
args := m.Called(ctx, ts, deps)
return args.Error(0)
}

func TestAggregator(t *testing.T) {
// Create mock writer
mockWriter := new(MockDependencyWriter)

// Create config
cfg := Config{
AggregationInterval: 100 * time.Millisecond,
InactivityTimeout: 50 * time.Millisecond,
}

// Create logger
logger := zap.NewNop()
telemetrySettings := component.TelemetrySettings{
Logger: logger,
}

// Create aggregator
agg := newDependencyAggregator(cfg, telemetrySettings, mockWriter)

// Start aggregator
closeChan := make(chan struct{})
agg.Start(closeChan)
defer close(closeChan)

// Create test spans
traceID := createTraceID(1)
parentSpanID := createSpanID(2)
childSpanID := createSpanID(3)

// Create parent span
parentSpan := createSpan(traceID, parentSpanID, pcommon.SpanID{}, "service1")

// Create child span
childSpan := createSpan(traceID, childSpanID, parentSpanID, "service2")

// Setup mock expectations
mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.MatchedBy(func(deps []model.DependencyLink) bool {
if len(deps) != 1 {
return false
}
dep := deps[0]
return dep.Parent == "service1" && dep.Child == "service2" && dep.CallCount == 1
})).Return(nil)

// Handle spans
ctx := context.Background()
agg.HandleSpan(ctx, parentSpan, "service1")
agg.HandleSpan(ctx, childSpan, "service2")

// Wait for processing and verify
assert.Eventually(t, func() bool {
return mockWriter.AssertExpectations(t)
}, time.Second, 10*time.Millisecond, "Dependencies were not written as expected")
}

func TestAggregatorInactivityTimeout(t *testing.T) {
mockWriter := new(MockDependencyWriter)
cfg := Config{
AggregationInterval: 1 * time.Second,
InactivityTimeout: 50 * time.Millisecond,
}

agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter)
closeChan := make(chan struct{})
agg.Start(closeChan)
defer close(closeChan)

traceID := createTraceID(1)
spanID := createSpanID(1)
span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1")

mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil)

ctx := context.Background()
agg.HandleSpan(ctx, span, "service1")

assert.Eventually(t, func() bool {
agg.tracesLock.RLock()
defer agg.tracesLock.RUnlock()
return len(agg.traces) == 0
}, time.Second, 10*time.Millisecond, "Trace was not cleared after inactivity timeout")
}

Check failures on lines 114–116 in cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go (GitHub Actions / lint): agg.tracesLock and agg.traces undefined (type *dependencyAggregator has no field or method tracesLock or traces) (typecheck)

func TestAggregatorClose(t *testing.T) {
mockWriter := new(MockDependencyWriter)
cfg := Config{
AggregationInterval: 1 * time.Second,
InactivityTimeout: 1 * time.Second,
}

agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter)
closeChan := make(chan struct{})
agg.Start(closeChan)

traceID := createTraceID(1)
spanID := createSpanID(1)
span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1")

ctx := context.Background()
agg.HandleSpan(ctx, span, "service1")

mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil)

// Rely on Close() to close the channel; closing closeChan directly here as
// well would make Close() panic on a double close, since Start stored the
// same channel.
err := agg.Close()
require.NoError(t, err)

assert.Eventually(t, func() bool {
agg.tracesLock.RLock()
defer agg.tracesLock.RUnlock()
return len(agg.traces) == 0
}, time.Second, 10*time.Millisecond, "Traces were not cleared after close")
}

Check failures on lines 145–147 in cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go (GitHub Actions / lint): agg.tracesLock and agg.traces undefined (type *dependencyAggregator has no field or method tracesLock or traces) (typecheck)

// Helper functions

func createTraceID(id byte) pcommon.TraceID {
var traceID [16]byte
traceID[15] = id
return pcommon.TraceID(traceID)
}

func createSpanID(id byte) pcommon.SpanID {
var spanID [8]byte
spanID[7] = id
return pcommon.SpanID(spanID)
}

func createSpan(traceID pcommon.TraceID, spanID pcommon.SpanID, parentSpanID pcommon.SpanID, serviceName string) ptrace.Span {
span := ptrace.NewSpan()
span.SetTraceID(traceID)
span.SetSpanID(spanID)
span.SetParentSpanID(parentSpanID)
// Additional span attributes could be set here if needed
return span
}