diff --git a/cmd/jaeger/internal/components.go b/cmd/jaeger/internal/components.go index cca095d2e99..67195743f67 100644 --- a/cmd/jaeger/internal/components.go +++ b/cmd/jaeger/internal/components.go @@ -110,6 +110,7 @@ func (b builders) build() (otelcol.Factories, error) { attributesprocessor.NewFactory(), // add-ons adaptivesampling.NewFactory(), + dependencyprocessor.NewFactory(), ) if err != nil { return otelcol.Factories{}, err diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go new file mode 100644 index 00000000000..7dae6f7b366 --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go @@ -0,0 +1,201 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "sync" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +var beamInitOnce sync.Once + +// dependencyAggregator processes spans and aggregates dependencies using Apache Beam +type dependencyAggregator struct { + config *Config + telset component.TelemetrySettings + dependencyWriter spanstore.Writer + inputChan chan spanEvent + closeChan chan struct{} +} + +// spanEvent represents a span with its service name and timestamp +type spanEvent struct { + span ptrace.Span + serviceName string + eventTime time.Time +} + +// newDependencyAggregator creates a new dependency aggregator +func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, 
dependencyWriter spanstore.Writer) *dependencyAggregator { + beamInitOnce.Do(func() { + beam.Init() + }) + return &dependencyAggregator{ + config: &cfg, + telset: telset, + dependencyWriter: dependencyWriter, + inputChan: make(chan spanEvent, 1000), + closeChan: make(chan struct{}), + } +} + +// Start begins the aggregation process +func (agg *dependencyAggregator) Start(closeChan chan struct{}) { + agg.closeChan = closeChan + go agg.runPipeline() +} + +// HandleSpan processes a single span +func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) { + event := spanEvent{ + span: span, + serviceName: serviceName, + eventTime: time.Now(), + } + select { + case agg.inputChan <- event: + default: + agg.telset.Logger.Warn("Input channel full, dropping span") + } +} + +// runPipeline runs the main processing pipeline +func (agg *dependencyAggregator) runPipeline() { + for { + var events []spanEvent + timer := time.NewTimer(agg.config.AggregationInterval) + + collectLoop: + for { + select { + case event := <-agg.inputChan: + events = append(events, event) + case <-timer.C: + break collectLoop + case <-agg.closeChan: + if !timer.Stop() { + <-timer.C + } + if len(events) > 0 { + agg.processEvents(context.Background(), events) + } + return + } + } + + if len(events) > 0 { + agg.processEvents(context.Background(), events) + } + } +} + +// processEvents processes a batch of spans using Beam pipeline +func (agg *dependencyAggregator) processEvents(ctx context.Context, events []spanEvent) { + // Create new pipeline and scope + p, s := beam.NewPipelineWithRoot() + + // Create initial PCollection with timestamps + col := beam.CreateList(s, events) + + // Transform into timestamped KV pairs + timestamped := beam.ParDo(s, func(event spanEvent) beam.WindowValue { + return beam.WindowValue{ + Timestamp: event.eventTime, + Windows: window.IntervalWindow{Start: event.eventTime, End: event.eventTime.Add(agg.config.InactivityTimeout)}, + 
Value: beam.KV{ + Key: event.span.TraceID(), + Value: event, + }, + } + }, col) + + // Apply session windows + windowed := beam.WindowInto(s, + window.NewSessions(agg.config.InactivityTimeout), + timestamped, + ) + + // Group by TraceID and aggregate dependencies + grouped := stats.GroupByKey(s, windowed) + + // Calculate dependencies for each trace + dependencies := beam.ParDo(s, func(key pcommon.TraceID, iter func(*spanEvent) bool) []*model.DependencyLink { + spanMap := make(map[pcommon.SpanID]spanEvent) + var event spanEvent + + // Build span map + for iter(&event) { + spanMap[event.span.SpanID()] = event + } + + // Calculate dependencies + deps := make(map[string]*model.DependencyLink) + for _, event := range spanMap { + parentSpanID := event.span.ParentSpanID() + if parentEvent, hasParent := spanMap[parentSpanID]; hasParent { + parentService := parentEvent.serviceName + childService := event.serviceName + + // Create dependency link if services are different + if parentService != "" && childService != "" && parentService != childService { + depKey := parentService + "&&&" + childService + if dep, exists := deps[depKey]; exists { + dep.CallCount++ + } else { + deps[depKey] = &model.DependencyLink{ + Parent: parentService, + Child: childService, + CallCount: 1, + } + } + } + } + } + + return depMapToSlice(deps) + }, grouped) + + // Merge results from all windows + merged := beam.Flatten(s, dependencies) + + // Write to storage + beam.ParDo0(s, func(deps []*model.DependencyLink) { + if err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps); err != nil { + agg.telset.Logger.Error("Failed to write dependencies", zap.Error(err)) + } + }, merged) + + // Execute pipeline + if err := beamx.Run(ctx, p); err != nil { + agg.telset.Logger.Error("Failed to run beam pipeline", zap.Error(err)) + } +} + +// Close shuts down the aggregator +func (agg *dependencyAggregator) Close() error { + close(agg.closeChan) + return nil +} + +// depMapToSlice converts 
dependency map to slice +func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink { + result := make([]*model.DependencyLink, 0, len(deps)) + for _, dep := range deps { + result = append(result, dep) + } + return result +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go new file mode 100644 index 00000000000..95b4876aa12 --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go @@ -0,0 +1,172 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/model" +) + +// MockDependencyWriter is a mock implementation of spanstore.Writer +type MockDependencyWriter struct { + mock.Mock +} + +func (m *MockDependencyWriter) WriteSpan(ctx context.Context, span *model.Span) error { + args := m.Called(ctx, span) + return args.Error(0) +} + +func (m *MockDependencyWriter) WriteDependencies(ctx context.Context, ts time.Time, deps []model.DependencyLink) error { + args := m.Called(ctx, ts, deps) + return args.Error(0) +} + +func TestAggregator(t *testing.T) { + // Create mock writer + mockWriter := new(MockDependencyWriter) + + // Create config + cfg := Config{ + AggregationInterval: 100 * time.Millisecond, + InactivityTimeout: 50 * time.Millisecond, + } + + // Create logger + logger := zap.NewNop() + telemetrySettings := component.TelemetrySettings{ + Logger: logger, + } + + // Create aggregator + agg := newDependencyAggregator(cfg, telemetrySettings, mockWriter) + + // Start aggregator + closeChan := 
make(chan struct{}) + agg.Start(closeChan) + defer close(closeChan) + + // Create test spans + traceID := createTraceID(1) + parentSpanID := createSpanID(2) + childSpanID := createSpanID(3) + + // Create parent span + parentSpan := createSpan(traceID, parentSpanID, pcommon.SpanID{}, "service1") + + // Create child span + childSpan := createSpan(traceID, childSpanID, parentSpanID, "service2") + + // Setup mock expectations + mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.MatchedBy(func(deps []model.DependencyLink) bool { + if len(deps) != 1 { + return false + } + dep := deps[0] + return dep.Parent == "service1" && dep.Child == "service2" && dep.CallCount == 1 + })).Return(nil) + + // Handle spans + ctx := context.Background() + agg.HandleSpan(ctx, parentSpan, "service1") + agg.HandleSpan(ctx, childSpan, "service2") + + // Wait for processing and verify + assert.Eventually(t, func() bool { + return mockWriter.AssertExpectations(t) + }, time.Second, 10*time.Millisecond, "Dependencies were not written as expected") +} + +func TestAggregatorInactivityTimeout(t *testing.T) { + mockWriter := new(MockDependencyWriter) + cfg := Config{ + AggregationInterval: 1 * time.Second, + InactivityTimeout: 50 * time.Millisecond, + } + + agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter) + closeChan := make(chan struct{}) + agg.Start(closeChan) + defer close(closeChan) + + traceID := createTraceID(1) + spanID := createSpanID(1) + span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1") + + mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + ctx := context.Background() + agg.HandleSpan(ctx, span, "service1") + + assert.Eventually(t, func() bool { + agg.tracesLock.RLock() + defer agg.tracesLock.RUnlock() + return len(agg.traces) == 0 + }, time.Second, 10*time.Millisecond, "Trace was not cleared after inactivity timeout") +} + +func TestAggregatorClose(t *testing.T) 
{ + mockWriter := new(MockDependencyWriter) + cfg := Config{ + AggregationInterval: 1 * time.Second, + InactivityTimeout: 1 * time.Second, + } + + agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter) + closeChan := make(chan struct{}) + agg.Start(closeChan) + + traceID := createTraceID(1) + spanID := createSpanID(1) + span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1") + + ctx := context.Background() + agg.HandleSpan(ctx, span, "service1") + + mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + close(closeChan) + err := agg.Close() + require.NoError(t, err) + + assert.Eventually(t, func() bool { + agg.tracesLock.RLock() + defer agg.tracesLock.RUnlock() + return len(agg.traces) == 0 + }, time.Second, 10*time.Millisecond, "Traces were not cleared after close") +} + +// Helper functions + +func createTraceID(id byte) pcommon.TraceID { + var traceID [16]byte + traceID[15] = id + return pcommon.TraceID(traceID) +} + +func createSpanID(id byte) pcommon.SpanID { + var spanID [8]byte + spanID[7] = id + return pcommon.SpanID(spanID) +} + +func createSpan(traceID pcommon.TraceID, spanID pcommon.SpanID, parentSpanID pcommon.SpanID, serviceName string) ptrace.Span { + span := ptrace.NewSpan() + span.SetTraceID(traceID) + span.SetSpanID(spanID) + span.SetParentSpanID(parentSpanID) + // Additional span attributes could be set here if needed + return span +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/config.go b/cmd/jaeger/internal/processors/dependencyprocessor/config.go new file mode 100644 index 00000000000..22bf357c3af --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/config.go @@ -0,0 +1,34 @@ +// Copyright (c) 2025 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "errors" + "time" +) + +type Config struct { + // AggregationInterval defines the length of aggregation window after + // which the accumulated dependencies are flushed into storage. + AggregationInterval time.Duration `yaml:"aggregation_interval" valid:"gt=0"` + // InactivityTimeout specifies the duration of inactivity after which a trace + // is considered complete and ready for dependency aggregation. + InactivityTimeout time.Duration `yaml:"inactivity_timeout" valid:"gt=0"` + // StorageName specifies the storage backend to use for dependency data. + StorageName string `yaml:"storage_name" valid:"required"` +} + +// Validate checks the configuration fields for validity. +func (c *Config) Validate() error { + if c.AggregationInterval <= 0 { + return errors.New("aggregation_interval must be greater than 0") + } + if c.InactivityTimeout <= 0 { + return errors.New("inactivity_timeout must be greater than 0") + } + if c.StorageName == "" { + return errors.New("storage_name must be provided and cannot be empty") + } + return nil +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go new file mode 100644 index 00000000000..96b24abe516 --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go @@ -0,0 +1,46 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" +) + +// componentType is the name of this processor in configuration. +var componentType = component.MustNewType("dependencyprocessor") + +// NewFactory creates a factory for the dependency processor. 
+func NewFactory() processor.Factory { + return processor.NewFactory( + componentType, + createDefaultConfig, + processor.WithTraces(createTracesProcessor, component.StabilityLevelAlpha), + ) +} + +func createDefaultConfig() component.Config { + return &Config{ + AggregationInterval: 10 * time.Minute, + InactivityTimeout: 2 * time.Minute, + } +} + +// createTracesProcessor creates a new instance of the dependency processor. +func createTracesProcessor( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (processor.Traces, error) { + oCfg := cfg.(*Config) + + dp := newDependencyProcessor(*oCfg, set.TelemetrySettings, nextConsumer) + + return dp, nil +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/package_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/package_test.go new file mode 100644 index 00000000000..a5f7fbc73ba --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go new file mode 100644 index 00000000000..56fd8f24454 --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go @@ -0,0 +1,98 @@ +// Copyright (c) 2025 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +type dependencyProcessor struct { + config *Config + aggregator *dependencyAggregator // Define the aggregator below. + telset component.TelemetrySettings + dependencyWriter spanstore.Writer + closeChan chan struct{} + nextConsumer consumer.Traces +} + +func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, nextConsumer consumer.Traces) *dependencyProcessor { + return &dependencyProcessor{ + config: &cfg, + telset: telset, + closeChan: make(chan struct{}), + nextConsumer: nextConsumer, + } +} + +func (tp *dependencyProcessor) Start(_ context.Context, host component.Host) error { + storageName := tp.config.StorageName + if storageName == "" { + storageName = "memory" + } + + f, err := jaegerstorage.GetStorageFactory(storageName, host) + if err != nil { + return fmt.Errorf("failed to get storage factory: %w", err) + } + + writer, err := f.CreateSpanWriter() + if err != nil { + return fmt.Errorf("failed to create dependency writer: %w", err) + } + + tp.dependencyWriter = writer + + tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter) + tp.aggregator.Start(tp.closeChan) + return nil +} + +// Shutdown implements processor.Traces +func (tp *dependencyProcessor) Shutdown(ctx context.Context) error { + if tp.aggregator != nil { + if err := tp.aggregator.Close(); err != nil { + return fmt.Errorf("failed to stop the dependency aggregator : %w", err) + } + } + return nil +} + +// Capabilities implements processor.Traces +func (p *dependencyProcessor) Capabilities() consumer.Capabilities { + return 
consumer.Capabilities{MutatesData: false} +} + +func (dp *dependencyProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rs := td.ResourceSpans().At(i) + serviceName := getServiceName(rs.Resource()) + + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + span := ss.Spans().At(k) + dp.aggregator.HandleSpan(ctx, span, serviceName) + } + } + } + + return dp.nextConsumer.ConsumeTraces(ctx, td) +} + +func getServiceName(resource pcommon.Resource) string { + serviceNameAttr, found := resource.Attributes().Get("service.name") + if !found { + return "" + } + return serviceNameAttr.Str() +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/processor_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor_test.go new file mode 100644 index 00000000000..b5398d123cd --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/processor_test.go @@ -0,0 +1,107 @@ +// Copyright (c) 2025 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/memory" +) + +func TestDependencyProcessorEndToEnd(t *testing.T) { + // Create a mock receiver, processor, and exporter + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + + cfg.AggregationInterval = 1 * time.Second + // Create a mock next consumer (exporter) + sink := new(consumertest.TracesSink) + + // Create a memory store to store dependency links + store := memory.NewStore() + + // Create the processor + processor, err := factory.CreateTraces( + context.Background(), + processortest.NewNopSettings(), + cfg, + sink, + ) + require.NoError(t, err) + + // Start the processor + err = processor.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + assert.NoError(t, processor.Shutdown(context.Background())) + }() + + // Create a test trace + trace := createTestTrace() + + // Send the trace to the processor + err = processor.ConsumeTraces(context.Background(), trace) + require.NoError(t, err) + + // Wait for the processor to process the trace + assert.Eventually(t, func() bool { + deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval) + return err == nil && len(deps) > 0 + }, cfg.AggregationInterval+time.Second, 100*time.Millisecond) + + // Verify dependency links + deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval) + require.NoError(t, err) + + // Expected dependency 
links + expectedDeps := []model.DependencyLink{ + { + Parent: "service1", + Child: "service2", + CallCount: 1, + }, + } + assert.Equal(t, expectedDeps, deps, "dependency links do not match expected output") +} + +// createTestTrace creates a test trace with two spans from different services. +func createTestTrace() ptrace.Traces { + traces := ptrace.NewTraces() + + // Create a resource span for the parent span (service1) + rs1 := traces.ResourceSpans().AppendEmpty() + rs1.Resource().Attributes().PutStr("service.name", "service1") + ils1 := rs1.ScopeSpans().AppendEmpty() + parentSpan := ils1.Spans().AppendEmpty() + parentSpan.SetTraceID([16]byte{1, 2, 3, 4}) + parentSpan.SetSpanID([8]byte{5, 6, 7, 8}) + parentSpan.SetName("span2") + parentSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + parentSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Second))) + + // Create a resource span for the child span (service2) + rs2 := traces.ResourceSpans().AppendEmpty() + rs2.Resource().Attributes().PutStr("service.name", "service2") + ils2 := rs2.ScopeSpans().AppendEmpty() + span := ils2.Spans().AppendEmpty() + span.SetTraceID([16]byte{1, 2, 3, 4}) + span.SetSpanID([8]byte{1, 2, 3, 4}) + span.SetName("span1") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Second))) + span.SetParentSpanID(parentSpan.SpanID()) // Set parent-child relationship + + return traces +}