From 60fb3344e3037bf804f97b2a1b50c43478e89cab Mon Sep 17 00:00:00 2001 From: yunmaoQu <2643354262@qq.com> Date: Fri, 17 Jan 2025 17:32:45 +0000 Subject: [PATCH 1/5] feat: add dependency processor using Apache Beam Signed-off-by: yunmaoQu <2643354262@qq.com> --- .../dependencyprocessor/aggregator.go | 205 ++++++++++++++++++ .../processors/dependencyprocessor/config.go | 15 ++ .../dependencyprocessor/processor.go | 61 ++++++ plugin/storage/memory/memory.go | 72 ++++-- 4 files changed, 336 insertions(+), 17 deletions(-) create mode 100644 cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go create mode 100644 cmd/jaeger/internal/processors/dependencyprocessor/config.go create mode 100644 cmd/jaeger/internal/processors/dependencyprocessor/processor.go diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go new file mode 100644 index 00000000000..02ed6d149be --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go @@ -0,0 +1,205 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "sync" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/memory" +) + +type dependencyAggregator struct { + config *Config + telset component.TelemetrySettings + dependencyWriter *memory.Store + traces map[model.TraceID]*TraceState + tracesLock sync.RWMutex + closeChan chan struct{} + beamPipeline *beam.Pipeline + beamScope beam.Scope +} + +type TraceState struct { + spans []*model.Span + spanMap map[model.SpanID]*model.Span + lastUpdateTime time.Time + timer *time.Timer +} + +func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator { + beam.Init() + p, s := beam.NewPipelineWithRoot() + return &dependencyAggregator{ + config: &cfg, + telset: telset, + dependencyWriter: dependencyWriter, + traces: make(map[model.TraceID]*TraceState), + beamPipeline: p, + beamScope: s, + } +} + +func (agg *dependencyAggregator) Start(closeChan chan struct{}) { + agg.closeChan = closeChan + go func() { + ticker := time.NewTicker(agg.config.AggregationInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + agg.processTraces(context.Background()) // Pass context + case <-agg.closeChan: + agg.processTraces(context.Background()) // Pass context + return + } + } + }() +} + +func (agg *dependencyAggregator) Close() error { + agg.tracesLock.Lock() + defer agg.tracesLock.Unlock() + for _, traceState := range agg.traces { + if traceState.timer != nil { + traceState.timer.Stop() + } + } + return nil +} + +func (agg *dependencyAggregator) HandleSpan(span *model.Span) { + agg.tracesLock.Lock() + defer agg.tracesLock.Unlock() + + traceState, ok := agg.traces[span.TraceID] + if !ok { + traceState = &TraceState{ + spans: []*model.Span{}, + spanMap: make(map[model.SpanID]*model.Span), + lastUpdateTime: time.Now(), + } + agg.traces[span.TraceID] = traceState + } + + traceState.spans = append(traceState.spans, span) + traceState.spanMap[span.SpanID] = span + traceState.lastUpdateTime = time.Now() + + if traceState.timer != nil { + traceState.timer.Stop() + } + traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() { + 
agg.processTraces(context.Background()) // Pass context + }) +} + +func (agg *dependencyAggregator) processTraces(ctx context.Context) { + agg.tracesLock.Lock() + if len(agg.traces) == 0 { + agg.tracesLock.Unlock() + return + } + traces := agg.traces + agg.traces = make(map[model.TraceID]*TraceState) + agg.tracesLock.Unlock() + for _, traceState := range traces { + if traceState.timer != nil { + traceState.timer.Stop() + } + } + + beamInput := agg.createBeamInput(traces) + if beamInput.IsValid() { + agg.calculateDependencies(ctx, beamInput) + } +} + +func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection { + var allSpans []*model.Span + for _, traceState := range traces { + allSpans = append(allSpans, traceState.spans...) + } + if len(allSpans) == 0 { + return beam.PCollection{} + } + return beam.CreateList(agg.beamScope, allSpans) +} + +func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) { + keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) { + return s.TraceID, s + }, input) + + groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans) + depLinks := beam.ParDo(agg.beamScope, func(_ model.TraceID, spansIter func(*model.Span) bool) []*model.DependencyLink { + deps := map[string]*model.DependencyLink{} + var span *model.Span + for spansIter(span) { + processSpan(deps, span, agg.traces) + } + return depMapToSlice(deps) + }, groupedSpans) + flattened := beam.Flatten(agg.beamScope, depLinks) + + beam.ParDo0(agg.beamScope, func(deps []*model.DependencyLink) { + err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps) + if err != nil { + agg.telset.Logger.Error("Error writing dependencies", zap.Error(err)) + } + }, flattened) + + go func() { + err := beamx.Run(ctx, agg.beamPipeline) + if err != nil { + agg.telset.Logger.Error("Error running beam pipeline", zap.Error(err)) + } + agg.beamPipeline = beam.NewPipeline() + agg.beamScope = beam.Scope{Parent: beam.PipelineScope{ID: "dependency"}, Name: "dependency"} + }() +} + +// processSpan is a copy from the memory storage plugin +func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) { + parentSpan := seekToSpan(s, traces) + if parentSpan != nil { + if parentSpan.Process.ServiceName == s.Process.ServiceName { + return + } + depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName + if _, ok := deps[depKey]; !ok { + deps[depKey] = &model.DependencyLink{ + Parent: parentSpan.Process.ServiceName, + Child: s.Process.ServiceName, + CallCount: 1, + } + } else { + deps[depKey].CallCount++ + } + } +} + +func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) *model.Span { + traceState, ok := traces[span.TraceID] + if !ok { + return nil + } + return traceState.spanMap[span.ParentSpanID()] +} + +// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin +func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink { + retMe := make([]*model.DependencyLink, 0, len(deps)) + for _, dep := range deps { + retMe = append(retMe, dep) + } + return retMe +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/config.go b/cmd/jaeger/internal/processors/dependencyprocessor/config.go new file mode 100644 index 00000000000..929439c1c45 --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/config.go @@ -0,0 +1,15 @@ +package dependencyprocessor + +import 
"time" + +type Config struct { + AggregationInterval time.Duration `yaml:"aggregation_interval"` + InactivityTimeout time.Duration `yaml:"inactivity_timeout"` +} + +func DefaultConfig() Config { + return Config{ + AggregationInterval: 5 * time.Second, // 默认每5秒聚合一次依赖 + InactivityTimeout: 2 * time.Second, // 默认trace不活跃2秒后视为完成 + } +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go new file mode 100644 index 00000000000..01d2185792b --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go @@ -0,0 +1,61 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" +) + +type dependencyProcessor struct { + config *Config + aggregator *dependencyAggregator // Define the aggregator below. + telset component.TelemetrySettings + dependencyWriter *memory.Store + closeChan chan struct{} +} + +func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyProcessor { + return &dependencyProcessor{ + config: &cfg, + telset: telset, + dependencyWriter: dependencyWriter, + closeChan: make(chan struct{}), + } +} + +func (tp *dependencyProcessor) start(_ context.Context, host component.Host) error { + tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter) + tp.aggregator.Start(tp.closeChan) + return nil +} + +func (tp *dependencyProcessor) close(ctx context.Context) error { + close(tp.closeChan) + if tp.aggregator != nil { + if err := tp.aggregator.Close(); err != nil { + return fmt.Errorf("failed to stop the dependency aggregator : %w", err) + } + } + return nil +} + +func (tp *dependencyProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { + batches := v1adapter.ProtoFromTraces(td) + for _, batch := range batches { + for _, span := range batch.Spans { + if span.Process == nil { + span.Process = batch.Process + } + tp.aggregator.HandleSpan(span) + } + } + return td, nil +} diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index 02dff26a584..0ae0b70f979 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -24,20 +24,22 @@ type Store struct { sync.RWMutex // Each tenant gets a copy of default config. // In the future this can be extended to contain per-tenant configuration. 
-	defaultConfig Configuration
-	perTenant     map[string]*Tenant
+	defaultConfig      Configuration
+	perTenant          map[string]*Tenant
+	useNewDependencies bool
 }
 
 // Tenant is an in-memory store of traces for a single tenant
 type Tenant struct {
 	sync.RWMutex
-	ids        []*model.TraceID
-	traces     map[model.TraceID]*model.Trace
-	services   map[string]struct{}
-	operations map[string]map[spanstore.Operation]struct{}
-	deduper    adjuster.Adjuster
-	config     Configuration
-	index      int
+	ids             []*model.TraceID
+	traces          map[model.TraceID]*model.Trace
+	services        map[string]struct{}
+	operations      map[string]map[spanstore.Operation]struct{}
+	deduper         adjuster.Adjuster
+	config          Configuration
+	index           int
+	dependencyLinks map[string]*model.DependencyLink
 }
 
 // NewStore creates an unbounded in-memory store
@@ -48,19 +50,22 @@ func NewStore() *Store {
 // WithConfiguration creates a new in memory storage based on the given configuration
 func WithConfiguration(cfg Configuration) *Store {
 	return &Store{
-		defaultConfig: cfg,
-		perTenant:     make(map[string]*Tenant),
+		defaultConfig:      cfg,
+		perTenant:          make(map[string]*Tenant),
+		useNewDependencies: false, // initialize the feature flag
+
 	}
 }
 
 func newTenant(cfg Configuration) *Tenant {
 	return &Tenant{
-		ids:        make([]*model.TraceID, cfg.MaxTraces),
-		traces:     map[model.TraceID]*model.Trace{},
-		services:   map[string]struct{}{},
-		operations: map[string]map[spanstore.Operation]struct{}{},
-		deduper:    adjuster.ZipkinSpanIDUniquifier(),
-		config:     cfg,
+		ids:             make([]*model.TraceID, cfg.MaxTraces),
+		traces:          map[model.TraceID]*model.Trace{},
+		services:        map[string]struct{}{},
+		operations:      map[string]map[spanstore.Operation]struct{}{},
+		deduper:         adjuster.ZipkinSpanIDUniquifier(),
+		config:          cfg,
+		dependencyLinks: make(map[string]*model.DependencyLink),
 	}
 }
 
@@ -83,6 +88,9 @@ func (st *Store) getTenant(tenantID string) *Tenant {
 
 // GetDependencies returns dependencies between services
 func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
+	if st.useNewDependencies { // feature-flag check for the new path
+		return st.getDependenciesNew(ctx)
+	}
 	m := st.getTenant(tenancy.GetTenant(ctx))
 	// deduper used below can modify the spans, so we take an exclusive lock
 	m.Lock()
@@ -119,6 +127,36 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback
 	return retMe, nil
 }
 
+func (st *Store) getDependenciesNew(ctx context.Context) ([]model.DependencyLink, error) {
+	m := st.getTenant(tenancy.GetTenant(ctx))
+	m.RLock()
+	defer m.RUnlock()
+	retMe := make([]model.DependencyLink, 0, len(m.dependencyLinks))
+	for _, dep := range m.dependencyLinks {
+		retMe = append(retMe, *dep)
+	}
+	return retMe, nil
+}
+
+func (st *Store) WriteDependencies(ctx context.Context, ts time.Time, dependencies []*model.DependencyLink) error {
+	m := st.getTenant(tenancy.GetTenant(ctx))
+	m.Lock()
+	defer m.Unlock()
+	for _, dep := range dependencies {
+		key := dep.Parent + "&&&" + dep.Child
+		if _, ok := m.dependencyLinks[key]; !ok {
+			m.dependencyLinks[key] = &model.DependencyLink{
+				Parent:    dep.Parent,
+				Child:     dep.Child,
+				CallCount: dep.CallCount,
+			}
+		} else {
+			m.dependencyLinks[key].CallCount += dep.CallCount
+		}
+	}
+	return nil
+}
+
 func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span {
 	for _, s := range trace.Spans {
 		if s.SpanID == spanID {

From 5ccf68dfd8dca60869d073d38dfe3376d7a90a60 Mon Sep 17 00:00:00 2001
From: yunmaoQu <2643354262@qq.com>
Date: Sun, 19 Jan 2025 13:32:38 +0000
Subject: [PATCH 2/5] fix: address review comments

Signed-off-by: yunmaoQu 
<2643354262@qq.com>
---
 cmd/jaeger/internal/components.go             |   1 +
 .../processors/dependencyprocessor/config.go  |  15 ++-
 .../dependencyprocessor/e2e_test.go           | 103 ++++++++++++++++++
 .../processors/dependencyprocessor/factory.go |  50 +++++++++
 .../dependencyprocessor/package_test.go       |  14 +++
 .../dependencyprocessor/processor.go          |  21 +++-
 6 files changed, 195 insertions(+), 9 deletions(-)
 create mode 100644 cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go
 create mode 100644 cmd/jaeger/internal/processors/dependencyprocessor/factory.go
 create mode 100644 cmd/jaeger/internal/processors/dependencyprocessor/package_test.go

diff --git a/cmd/jaeger/internal/components.go b/cmd/jaeger/internal/components.go
index cca095d2e99..67195743f67 100644
--- a/cmd/jaeger/internal/components.go
+++ b/cmd/jaeger/internal/components.go
@@ -110,6 +110,7 @@ func (b builders) build() (otelcol.Factories, error) {
 		attributesprocessor.NewFactory(),
 		// add-ons
 		adaptivesampling.NewFactory(),
+		dependencyprocessor.NewFactory(),
 	)
 	if err != nil {
 		return otelcol.Factories{}, err
diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/config.go b/cmd/jaeger/internal/processors/dependencyprocessor/config.go
index 929439c1c45..8f828f87073 100644
--- a/cmd/jaeger/internal/processors/dependencyprocessor/config.go
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/config.go
@@ -1,15 +1,24 @@
+// Copyright (c) 2025 The Jaeger Authors.
+// SPDX-License-Identifier: Apache-2.0
+
 package dependencyprocessor
 
-import "time"
+import (
+	"time"
+
+	"github.com/jaegertracing/jaeger/plugin/storage/memory"
+)
 
 type Config struct {
 	AggregationInterval time.Duration `yaml:"aggregation_interval"`
 	InactivityTimeout   time.Duration `yaml:"inactivity_timeout"`
+	Store               *memory.Store `yaml:"-"`
 }
 
 func DefaultConfig() Config {
 	return Config{
-		AggregationInterval: 5 * time.Second, // default: aggregate dependencies every 5 seconds
-		InactivityTimeout:   2 * time.Second, // default: a trace is complete after 2 seconds of inactivity
+		AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds
+		InactivityTimeout:   2 * time.Second, // Default trace completion timeout: 2 seconds of inactivity
+		Store:               memory.NewStore(),
 	}
 }
diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go
new file mode 100644
index 00000000000..57f6bb0ec3f
--- /dev/null
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go
@@ -0,0 +1,103 @@
+// Copyright (c) 2025 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processortest" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/plugin/storage/memory" +) + +func TestDependencyProcessorEndToEnd(t *testing.T) { + // Create a mock receiver, processor, and exporter + factory := NewFactory() + cfg := factory.CreateDefaultConfig().(*Config) + + // Create a mock next consumer (exporter) + sink := new(consumertest.TracesSink) + + // Create a memory store to store dependency links + store := memory.NewStore() + + // Create the processor + processor, err := factory.CreateTraces( + context.Background(), + processortest.NewNopSettings(), + cfg, + sink, + ) + require.NoError(t, err) + + // Start the processor + err = processor.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + defer func() { + assert.NoError(t, processor.Shutdown(context.Background())) + }() + + // Create a test trace + trace := createTestTrace() + + // Send the trace to the processor + err = processor.ConsumeTraces(context.Background(), trace) + require.NoError(t, err) + + // Wait for the processor to process the trace + time.Sleep(cfg.AggregationInterval + 100*time.Millisecond) + + // Verify dependency links + deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval) + require.NoError(t, err) + + // Expected dependency links + expectedDeps := []model.DependencyLink{ + { + Parent: "service1", + Child: "service2", + CallCount: 1, + }, + } + assert.Equal(t, expectedDeps, deps, "dependency links do not match expected output") +} + +// createTestTrace creates a test trace with two spans from different services. 
+func createTestTrace() ptrace.Traces { + traces := ptrace.NewTraces() + + // Create a resource span for the parent span (service1) + rs1 := traces.ResourceSpans().AppendEmpty() + rs1.Resource().Attributes().PutStr("service.name", "service1") + ils1 := rs1.ScopeSpans().AppendEmpty() + parentSpan := ils1.Spans().AppendEmpty() + parentSpan.SetTraceID([16]byte{1, 2, 3, 4}) + parentSpan.SetSpanID([8]byte{5, 6, 7, 8}) + parentSpan.SetName("span2") + parentSpan.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + parentSpan.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Second))) + + // Create a resource span for the child span (service2) + rs2 := traces.ResourceSpans().AppendEmpty() + rs2.Resource().Attributes().PutStr("service.name", "service2") + ils2 := rs2.ScopeSpans().AppendEmpty() + span := ils2.Spans().AppendEmpty() + span.SetTraceID([16]byte{1, 2, 3, 4}) + span.SetSpanID([8]byte{1, 2, 3, 4}) + span.SetName("span1") + span.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now())) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Second))) + span.SetParentSpanID(parentSpan.SpanID()) // Set parent-child relationship + + return traces +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go new file mode 100644 index 00000000000..083be248d6b --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go @@ -0,0 +1,50 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor" + + "github.com/jaegertracing/jaeger/plugin/storage/memory" +) + +// componentType is the name of this processor in configuration. +var componentType = component.MustNewType("dependencyprocessor") + +// NewFactory creates a factory for the dependency processor. +func NewFactory() processor.Factory { + return processor.NewFactory( + componentType, + createDefaultConfig, + processor.WithTraces(createTracesProcessor, component.StabilityLevelAlpha), + ) +} + +// createDefaultConfig returns the default configuration for the dependency processor. +func createDefaultConfig() component.Config { + return &Config{ + AggregationInterval: 5 * time.Second, + InactivityTimeout: 2 * time.Second, + Store: memory.NewStore(), + } +} + +// createTracesProcessor creates a new instance of the dependency processor. +func createTracesProcessor( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer consumer.Traces, +) (processor.Traces, error) { + oCfg := cfg.(*Config) + + dp := newDependencyProcessor(*oCfg, set.TelemetrySettings, oCfg.Store, nextConsumer) + + return dp, nil +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/package_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/package_test.go new file mode 100644 index 00000000000..a5f7fbc73ba --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2025 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go index 01d2185792b..62a59cf4fc5 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go @@ -8,6 +8,7 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/plugin/storage/memory" @@ -20,24 +21,27 @@ type dependencyProcessor struct { telset component.TelemetrySettings dependencyWriter *memory.Store closeChan chan struct{} + nextConsumer consumer.Traces } -func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyProcessor { +func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store, nextConsumer consumer.Traces) *dependencyProcessor { return &dependencyProcessor{ config: &cfg, telset: telset, dependencyWriter: dependencyWriter, closeChan: make(chan struct{}), + nextConsumer: nextConsumer, } } -func (tp *dependencyProcessor) start(_ context.Context, host component.Host) error { +func (tp *dependencyProcessor) Start(_ context.Context, host component.Host) error { tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter) tp.aggregator.Start(tp.closeChan) return nil } -func (tp *dependencyProcessor) close(ctx context.Context) error { +// Shutdown implements processor.Traces +func (tp *dependencyProcessor) Shutdown(ctx context.Context) error { close(tp.closeChan) if tp.aggregator != nil { if err := tp.aggregator.Close(); err != nil { @@ -47,15 +51,20 @@ func (tp *dependencyProcessor) close(ctx context.Context) error { return nil } -func (tp *dependencyProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { +// Capabilities implements processor.Traces +func (p *dependencyProcessor) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (dp *dependencyProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { batches := v1adapter.ProtoFromTraces(td) for _, batch := range batches { for _, span := range batch.Spans { if span.Process == nil { span.Process = batch.Process } - tp.aggregator.HandleSpan(span) + dp.aggregator.HandleSpan(span) } } - return td, nil + return dp.nextConsumer.ConsumeTraces(ctx, td) } From 05dc78fba88df0bf8cd2abcae8fc4242d001157e Mon Sep 17 00:00:00 2001 From: yunmaoQu <2643354262@qq.com> Date: Mon, 20 Jan 2025 07:45:02 +0000 Subject: [PATCH 3/5] update Signed-off-by: yunmaoQu <2643354262@qq.com> --- .../processors/dependencyprocessor/config.go | 17 +++++++---------- .../processors/dependencyprocessor/e2e_test.go | 6 +++++- .../processors/dependencyprocessor/factory.go | 4 +--- plugin/storage/memory/memory.go | 5 ++--- 4 files changed, 15 insertions(+), 17 deletions(-) diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/config.go b/cmd/jaeger/internal/processors/dependencyprocessor/config.go index 8f828f87073..ed6750cded1 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/config.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/config.go @@ -10,15 +10,12 @@ 
import ( ) type Config struct { + // AggregationInterval defines how often the processor aggregates dependencies. + // This controls the frequency of flushing dependency data to storage. + // Default dependency aggregation interval: 10 seconds AggregationInterval time.Duration `yaml:"aggregation_interval"` - InactivityTimeout time.Duration `yaml:"inactivity_timeout"` - Store *memory.Store `yaml:"-"` -} - -func DefaultConfig() Config { - return Config{ - AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds - InactivityTimeout: 2 * time.Second, // Default trace completion timeout: 2 seconds of inactivity - Store: memory.NewStore(), - } + // InactivityTimeout specifies the duration of inactivity after which a trace + // is considered complete and ready for dependency aggregation. + // Default trace completion timeout: 2 seconds of inactivity + InactivityTimeout time.Duration `yaml:"inactivity_timeout"` } diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go index 57f6bb0ec3f..b5398d123cd 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go @@ -25,6 +25,7 @@ func TestDependencyProcessorEndToEnd(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) + cfg.AggregationInterval = 1 * time.Second // Create a mock next consumer (exporter) sink := new(consumertest.TracesSink) @@ -55,7 +56,10 @@ func TestDependencyProcessorEndToEnd(t *testing.T) { require.NoError(t, err) // Wait for the processor to process the trace - time.Sleep(cfg.AggregationInterval + 100*time.Millisecond) + assert.Eventually(t, func() bool { + deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval) + return err == nil && len(deps) > 0 + }, cfg.AggregationInterval+time.Second, 100*time.Millisecond) // Verify dependency links deps, err := store.GetDependencies(context.Background(), time.Now(), cfg.AggregationInterval) diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go index 083be248d6b..ee063431f83 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go @@ -26,12 +26,10 @@ func NewFactory() processor.Factory { ) } -// createDefaultConfig returns the default configuration for the dependency processor. 
 func createDefaultConfig() component.Config {
 	return &Config{
-		AggregationInterval: 5 * time.Second,
+		AggregationInterval: 10 * time.Second,
 		InactivityTimeout:   2 * time.Second,
-		Store:               memory.NewStore(),
 	}
 }
diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go
index 0ae0b70f979..43c0478c60b 100644
--- a/plugin/storage/memory/memory.go
+++ b/plugin/storage/memory/memory.go
@@ -52,8 +52,7 @@ func WithConfiguration(cfg Configuration) *Store {
 	return &Store{
 		defaultConfig:      cfg,
 		perTenant:          make(map[string]*Tenant),
-		useNewDependencies: false, // initialize the feature flag
-
+		useNewDependencies: false,
 	}
 }
 
@@ -88,7 +87,7 @@ func (st *Store) getTenant(tenantID string) *Tenant {
 
 // GetDependencies returns dependencies between services
 func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
-	if st.useNewDependencies { // feature-flag check for the new path
+	if st.useNewDependencies {
 		return st.getDependenciesNew(ctx)
 	}
 	m := st.getTenant(tenancy.GetTenant(ctx))

From 3c7ef3569b67775f10279bfe953ee6b8dc029fde Mon Sep 17 00:00:00 2001
From: yunmaoQu <2643354262@qq.com>
Date: Thu, 30 Jan 2025 06:46:09 +0000
Subject: [PATCH 4/5] fix

Signed-off-by: yunmaoQu <2643354262@qq.com>
---
 .../dependencyprocessor/aggregator.go         | 123 ++++++++-----
 .../dependencyprocessor/aggregator_test.go    | 172 ++++++++++++++++++
 .../processors/dependencyprocessor/config.go  |  29 ++-
 .../processors/dependencyprocessor/factory.go |   8 +-
 .../dependencyprocessor/processor.go          |  60 ++++--
 .../{e2e_test.go => processor_test.go}        |   0
 plugin/storage/memory/memory.go               |  71 ++------
 7 files changed, 330 insertions(+), 133 deletions(-)
 create mode 100644 cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go
 rename cmd/jaeger/internal/processors/dependencyprocessor/{e2e_test.go => processor_test.go} (100%)

diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
index 02ed6d149be..0f03d2afc41 100644
--- a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
@@ -11,17 +11,21 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.uber.org/zap"
 
 	"github.com/jaegertracing/jaeger/model"
-	"github.com/jaegertracing/jaeger/plugin/storage/memory"
+	"github.com/jaegertracing/jaeger/storage/spanstore"
 )
 
+var beamInitOnce sync.Once
+
 type dependencyAggregator struct {
 	config           *Config
 	telset           component.TelemetrySettings
-	dependencyWriter *memory.Store
-	traces           map[model.TraceID]*TraceState
+	dependencyWriter spanstore.Writer
+	traces           map[pcommon.TraceID]*TraceState
 	tracesLock       sync.RWMutex
 	closeChan        chan struct{}
 	beamPipeline     *beam.Pipeline
@@ -29,20 +33,23 @@ type dependencyAggregator struct {
 }
 
 type TraceState struct {
-	spans          []*model.Span
-	spanMap        map[model.SpanID]*model.Span
+	spans          []*ptrace.Span
+	spanMap        map[pcommon.SpanID]*ptrace.Span
 	lastUpdateTime time.Time
 	timer          *time.Timer
+	serviceName    string
 }
 
-func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator {
-	beam.Init()
+func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter spanstore.Writer) *dependencyAggregator {
+	beamInitOnce.Do(func() {
+		beam.Init()
+	
}) p, s := beam.NewPipelineWithRoot() return &dependencyAggregator{ config: &cfg, telset: telset, dependencyWriter: dependencyWriter, - traces: make(map[model.TraceID]*TraceState), + traces: make(map[pcommon.TraceID]*TraceState), beamPipeline: p, beamScope: s, } @@ -56,9 +63,9 @@ func (agg *dependencyAggregator) Start(closeChan chan struct{}) { for { select { case <-ticker.C: - agg.processTraces(context.Background()) // Pass context + agg.processTraces(context.Background()) case <-agg.closeChan: - agg.processTraces(context.Background()) // Pass context + agg.processTraces(context.Background()) return } } @@ -76,29 +83,34 @@ func (agg *dependencyAggregator) Close() error { return nil } -func (agg *dependencyAggregator) HandleSpan(span *model.Span) { +func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) { agg.tracesLock.Lock() defer agg.tracesLock.Unlock() - traceState, ok := agg.traces[span.TraceID] + traceID := span.TraceID() + spanID := span.SpanID() + + traceState, ok := agg.traces[traceID] if !ok { traceState = &TraceState{ - spans: []*model.Span{}, - spanMap: make(map[model.SpanID]*model.Span), + spans: []*ptrace.Span{}, + spanMap: make(map[pcommon.SpanID]*ptrace.Span), lastUpdateTime: time.Now(), + serviceName: serviceName, } - agg.traces[span.TraceID] = traceState + agg.traces[traceID] = traceState } - traceState.spans = append(traceState.spans, span) - traceState.spanMap[span.SpanID] = span + spanCopy := span + traceState.spans = append(traceState.spans, &spanCopy) + traceState.spanMap[spanID] = &spanCopy traceState.lastUpdateTime = time.Now() if traceState.timer != nil { traceState.timer.Stop() } traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() { - agg.processTraces(context.Background()) // Pass context + agg.processTraces(ctx) }) } @@ -109,8 +121,9 @@ func (agg *dependencyAggregator) processTraces(ctx context.Context) { return } traces := agg.traces - agg.traces = make(map[model.TraceID]*TraceState) + agg.traces = make(map[pcommon.TraceID]*TraceState) agg.tracesLock.Unlock() + for _, traceState := range traces { if traceState.timer != nil { traceState.timer.Stop() @@ -123,8 +136,8 @@ func (agg *dependencyAggregator) processTraces(ctx context.Context) { } } -func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*TraceState) beam.PCollection { - var allSpans []*model.Span +func (agg *dependencyAggregator) createBeamInput(traces map[pcommon.TraceID]*TraceState) beam.PCollection { + var allSpans []*ptrace.Span for _, traceState := range traces { allSpans = append(allSpans, traceState.spans...) 
} @@ -135,16 +148,16 @@ func (agg *dependencyAggregator) createBeamInput(traces map[model.TraceID]*Trace } func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) { - keyedSpans := beam.ParDo(agg.beamScope, func(s *model.Span) (model.TraceID, *model.Span) { - return s.TraceID, s + keyedSpans := beam.ParDo(agg.beamScope, func(s *ptrace.Span) (pcommon.TraceID, *ptrace.Span) { + return s.TraceID(), s }, input) groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans) - depLinks := beam.ParDo(agg.beamScope, func(_ model.TraceID, spansIter func(*model.Span) bool) []*model.DependencyLink { + depLinks := beam.ParDo(agg.beamScope, func(_ pcommon.TraceID, spansIter func(*ptrace.Span) bool) []*model.DependencyLink { deps := map[string]*model.DependencyLink{} - var span *model.Span + var span *ptrace.Span for spansIter(span) { - processSpan(deps, span, agg.traces) + processSpanOtel(deps, span, agg.traces) } return depMapToSlice(deps) }, groupedSpans) @@ -167,35 +180,45 @@ func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, inpu }() } -// processSpan is a copy from the memory storage plugin -func processSpan(deps map[string]*model.DependencyLink, s *model.Span, traces map[model.TraceID]*TraceState) { - parentSpan := seekToSpan(s, traces) - if parentSpan != nil { - if parentSpan.Process.ServiceName == s.Process.ServiceName { - return - } - depKey := parentSpan.Process.ServiceName + "&&&" + s.Process.ServiceName - if _, ok := deps[depKey]; !ok { - deps[depKey] = &model.DependencyLink{ - Parent: parentSpan.Process.ServiceName, - Child: s.Process.ServiceName, - CallCount: 1, - } - } else { - deps[depKey].CallCount++ - } - } -} +func processSpanOtel(deps map[string]*model.DependencyLink, span *ptrace.Span, traces map[pcommon.TraceID]*TraceState) { + traceID := span.TraceID() + parentSpanID := span.ParentSpanID() -func seekToSpan(span *model.Span, traces map[model.TraceID]*TraceState) *model.Span { - traceState, ok := traces[span.TraceID] + traceState, ok := traces[traceID] if !ok { - return nil + return + } + + parentSpan := traceState.spanMap[parentSpanID] + if parentSpan == nil { + return + } + + parentTraceState := traces[traceID] + if parentTraceState == nil { + return + } + + parentService := parentTraceState.serviceName + currentService := traceState.serviceName + + if parentService == currentService || parentService == "" || currentService == "" { + return + } + + depKey := parentService + "&&&" + currentService + if _, ok := deps[depKey]; !ok { + deps[depKey] = &model.DependencyLink{ + Parent: parentService, + Child: currentService, + CallCount: 1, + } + } else { + deps[depKey].CallCount++ } - return traceState.spanMap[span.ParentSpanID()] } -// depMapToSlice modifies the spans to DependencyLink in the same way as the memory storage plugin +// depMapToSlice converts dependency map to slice func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink { retMe := make([]*model.DependencyLink, 0, len(deps)) for _, dep := range deps { diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go new file mode 100644 index 00000000000..95b4876aa12 --- /dev/null +++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator_test.go @@ -0,0 +1,172 @@ +// Copyright (c) 2025 The Jaeger Authors. 
+// SPDX-License-Identifier: Apache-2.0 + +package dependencyprocessor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/model" +) + +// MockDependencyWriter is a mock implementation of spanstore.Writer +type MockDependencyWriter struct { + mock.Mock +} + +func (m *MockDependencyWriter) WriteSpan(ctx context.Context, span *model.Span) error { + args := m.Called(ctx, span) + return args.Error(0) +} + +func (m *MockDependencyWriter) WriteDependencies(ctx context.Context, ts time.Time, deps []model.DependencyLink) error { + args := m.Called(ctx, ts, deps) + return args.Error(0) +} + +func TestAggregator(t *testing.T) { + // Create mock writer + mockWriter := new(MockDependencyWriter) + + // Create config + cfg := Config{ + AggregationInterval: 100 * time.Millisecond, + InactivityTimeout: 50 * time.Millisecond, + } + + // Create logger + logger := zap.NewNop() + telemetrySettings := component.TelemetrySettings{ + Logger: logger, + } + + // Create aggregator + agg := newDependencyAggregator(cfg, telemetrySettings, mockWriter) + + // Start aggregator + closeChan := make(chan struct{}) + agg.Start(closeChan) + defer close(closeChan) + + // Create test spans + traceID := createTraceID(1) + parentSpanID := createSpanID(2) + childSpanID := createSpanID(3) + + // Create parent span + parentSpan := createSpan(traceID, parentSpanID, pcommon.SpanID{}, "service1") + + // Create child span + childSpan := createSpan(traceID, childSpanID, parentSpanID, "service2") + + // Setup mock expectations + mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.MatchedBy(func(deps []model.DependencyLink) bool { + if len(deps) != 1 { + return false + } + dep := deps[0] + return dep.Parent == "service1" && dep.Child == "service2" && dep.CallCount == 1 + })).Return(nil) + + // Handle spans + ctx := context.Background() + agg.HandleSpan(ctx, parentSpan, "service1") + agg.HandleSpan(ctx, childSpan, "service2") + + // Wait for processing and verify + assert.Eventually(t, func() bool { + return mockWriter.AssertExpectations(t) + }, time.Second, 10*time.Millisecond, "Dependencies were not written as expected") +} + +func TestAggregatorInactivityTimeout(t *testing.T) { + mockWriter := new(MockDependencyWriter) + cfg := Config{ + AggregationInterval: 1 * time.Second, + InactivityTimeout: 50 * time.Millisecond, + } + + agg := newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter) + closeChan := make(chan struct{}) + agg.Start(closeChan) + defer close(closeChan) + + traceID := createTraceID(1) + spanID := createSpanID(1) + span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1") + + mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + ctx := context.Background() + agg.HandleSpan(ctx, span, "service1") + + assert.Eventually(t, func() bool { + agg.tracesLock.RLock() + defer agg.tracesLock.RUnlock() + return len(agg.traces) == 0 + }, time.Second, 10*time.Millisecond, "Trace was not cleared after inactivity timeout") +} + +func TestAggregatorClose(t *testing.T) { + mockWriter := new(MockDependencyWriter) + cfg := Config{ + AggregationInterval: 1 * time.Second, + InactivityTimeout: 1 * time.Second, + } + + agg := 
newDependencyAggregator(cfg, component.TelemetrySettings{Logger: zap.NewNop()}, mockWriter) + closeChan := make(chan struct{}) + agg.Start(closeChan) + + traceID := createTraceID(1) + spanID := createSpanID(1) + span := createSpan(traceID, spanID, pcommon.SpanID{}, "service1") + + ctx := context.Background() + agg.HandleSpan(ctx, span, "service1") + + mockWriter.On("WriteDependencies", mock.Anything, mock.Anything, mock.Anything).Return(nil) + + close(closeChan) + err := agg.Close() + require.NoError(t, err) + + assert.Eventually(t, func() bool { + agg.tracesLock.RLock() + defer agg.tracesLock.RUnlock() + return len(agg.traces) == 0 + }, time.Second, 10*time.Millisecond, "Traces were not cleared after close") +} + +// Helper functions + +func createTraceID(id byte) pcommon.TraceID { + var traceID [16]byte + traceID[15] = id + return pcommon.TraceID(traceID) +} + +func createSpanID(id byte) pcommon.SpanID { + var spanID [8]byte + spanID[7] = id + return pcommon.SpanID(spanID) +} + +func createSpan(traceID pcommon.TraceID, spanID pcommon.SpanID, parentSpanID pcommon.SpanID, serviceName string) ptrace.Span { + span := ptrace.NewSpan() + span.SetTraceID(traceID) + span.SetSpanID(spanID) + span.SetParentSpanID(parentSpanID) + // Additional span attributes could be set here if needed + return span +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/config.go b/cmd/jaeger/internal/processors/dependencyprocessor/config.go index ed6750cded1..22bf357c3af 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/config.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/config.go @@ -4,18 +4,31 @@ package dependencyprocessor import ( + "errors" "time" - - "github.com/jaegertracing/jaeger/plugin/storage/memory" ) type Config struct { - // AggregationInterval defines how often the processor aggregates dependencies. - // This controls the frequency of flushing dependency data to storage. - // Default dependency aggregation interval: 10 seconds - AggregationInterval time.Duration `yaml:"aggregation_interval"` + // AggregationInterval defines the length of aggregation window after + // which the accumulated dependencies are flushed into storage. + AggregationInterval time.Duration `yaml:"aggregation_interval" valid:"gt=0"` // InactivityTimeout specifies the duration of inactivity after which a trace // is considered complete and ready for dependency aggregation. - // Default trace completion timeout: 2 seconds of inactivity - InactivityTimeout time.Duration `yaml:"inactivity_timeout"` + InactivityTimeout time.Duration `yaml:"inactivity_timeout" valid:"gt=0"` + // StorageName specifies the storage backend to use for dependency data. + StorageName string `yaml:"storage_name" valid:"required"` +} + +// Validate checks the configuration fields for validity. 
+func (c *Config) Validate() error { + if c.AggregationInterval <= 0 { + return errors.New("aggregation_interval must be greater than 0") + } + if c.InactivityTimeout <= 0 { + return errors.New("inactivity_timeout must be greater than 0") + } + if c.StorageName == "" { + return errors.New("storage_name must be provided and cannot be empty") + } + return nil } diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go index ee063431f83..96b24abe516 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/factory.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/factory.go @@ -10,8 +10,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" - - "github.com/jaegertracing/jaeger/plugin/storage/memory" ) // componentType is the name of this processor in configuration. @@ -28,8 +26,8 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - AggregationInterval: 10 * time.Second, - InactivityTimeout: 2 * time.Second, + AggregationInterval: 10 * time.Minute, + InactivityTimeout: 2 * time.Minute, } } @@ -42,7 +40,7 @@ func createTracesProcessor( ) (processor.Traces, error) { oCfg := cfg.(*Config) - dp := newDependencyProcessor(*oCfg, set.TelemetrySettings, oCfg.Store, nextConsumer) + dp := newDependencyProcessor(*oCfg, set.TelemetrySettings, nextConsumer) return dp, nil } diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go index 62a59cf4fc5..56fd8f24454 100644 --- a/cmd/jaeger/internal/processors/dependencyprocessor/processor.go +++ b/cmd/jaeger/internal/processors/dependencyprocessor/processor.go @@ -9,32 +9,49 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" - "github.com/jaegertracing/jaeger/plugin/storage/memory" - "github.com/jaegertracing/jaeger/storage_v2/v1adapter" + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" + "github.com/jaegertracing/jaeger/storage/spanstore" ) type dependencyProcessor struct { config *Config aggregator *dependencyAggregator // Define the aggregator below. 
telset component.TelemetrySettings - dependencyWriter *memory.Store + dependencyWriter spanstore.Writer closeChan chan struct{} nextConsumer consumer.Traces } -func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store, nextConsumer consumer.Traces) *dependencyProcessor { +func newDependencyProcessor(cfg Config, telset component.TelemetrySettings, nextConsumer consumer.Traces) *dependencyProcessor { return &dependencyProcessor{ - config: &cfg, - telset: telset, - dependencyWriter: dependencyWriter, - closeChan: make(chan struct{}), - nextConsumer: nextConsumer, + config: &cfg, + telset: telset, + closeChan: make(chan struct{}), + nextConsumer: nextConsumer, } } func (tp *dependencyProcessor) Start(_ context.Context, host component.Host) error { + storageName := tp.config.StorageName + if storageName == "" { + storageName = "memory" + } + + f, err := jaegerstorage.GetStorageFactory(storageName, host) + if err != nil { + return fmt.Errorf("failed to get storage factory: %w", err) + } + + writer, err := f.CreateSpanWriter() + if err != nil { + return fmt.Errorf("failed to create dependency writer: %w", err) + } + + tp.dependencyWriter = writer + tp.aggregator = newDependencyAggregator(*tp.config, tp.telset, tp.dependencyWriter) tp.aggregator.Start(tp.closeChan) return nil @@ -42,7 +59,6 @@ func (tp *dependencyProcessor) Start(_ context.Context, host component.Host) err // Shutdown implements processor.Traces func (tp *dependencyProcessor) Shutdown(ctx context.Context) error { - close(tp.closeChan) if tp.aggregator != nil { if err := tp.aggregator.Close(); err != nil { return fmt.Errorf("failed to stop the dependency aggregator : %w", err) @@ -57,14 +73,26 @@ func (p *dependencyProcessor) Capabilities() consumer.Capabilities { } func (dp *dependencyProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { - batches := v1adapter.ProtoFromTraces(td) - for _, batch := range batches { - for _, span := range batch.Spans { - if span.Process == nil { - span.Process = batch.Process + for i := 0; i < td.ResourceSpans().Len(); i++ { + rs := td.ResourceSpans().At(i) + serviceName := getServiceName(rs.Resource()) + + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + span := ss.Spans().At(k) + dp.aggregator.HandleSpan(ctx, span, serviceName) } - dp.aggregator.HandleSpan(span) } } + return dp.nextConsumer.ConsumeTraces(ctx, td) } + +func getServiceName(resource pcommon.Resource) string { + serviceNameAttr, found := resource.Attributes().Get("service.name") + if !found { + return "" + } + return serviceNameAttr.Str() +} diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go b/cmd/jaeger/internal/processors/dependencyprocessor/processor_test.go similarity index 100% rename from cmd/jaeger/internal/processors/dependencyprocessor/e2e_test.go rename to cmd/jaeger/internal/processors/dependencyprocessor/processor_test.go diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index 43c0478c60b..02dff26a584 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -24,22 +24,20 @@ type Store struct { sync.RWMutex // Each tenant gets a copy of default config. // In the future this can be extended to contain per-tenant configuration. 
- defaultConfig Configuration - perTenant map[string]*Tenant - useNewDependencies bool + defaultConfig Configuration + perTenant map[string]*Tenant } // Tenant is an in-memory store of traces for a single tenant type Tenant struct { sync.RWMutex - ids []*model.TraceID - traces map[model.TraceID]*model.Trace - services map[string]struct{} - operations map[string]map[spanstore.Operation]struct{} - deduper adjuster.Adjuster - config Configuration - index int - dependencyLinks map[string]*model.DependencyLink + ids []*model.TraceID + traces map[model.TraceID]*model.Trace + services map[string]struct{} + operations map[string]map[spanstore.Operation]struct{} + deduper adjuster.Adjuster + config Configuration + index int } // NewStore creates an unbounded in-memory store @@ -50,21 +48,19 @@ func NewStore() *Store { // WithConfiguration creates a new in memory storage based on the given configuration func WithConfiguration(cfg Configuration) *Store { return &Store{ - defaultConfig: cfg, - perTenant: make(map[string]*Tenant), - useNewDependencies: false, + defaultConfig: cfg, + perTenant: make(map[string]*Tenant), } } func newTenant(cfg Configuration) *Tenant { return &Tenant{ - ids: make([]*model.TraceID, cfg.MaxTraces), - traces: map[model.TraceID]*model.Trace{}, - services: map[string]struct{}{}, - operations: map[string]map[spanstore.Operation]struct{}{}, - deduper: adjuster.ZipkinSpanIDUniquifier(), - config: cfg, - dependencyLinks: make(map[string]*model.DependencyLink), + ids: make([]*model.TraceID, cfg.MaxTraces), + traces: map[model.TraceID]*model.Trace{}, + services: map[string]struct{}{}, + operations: map[string]map[spanstore.Operation]struct{}{}, + deduper: adjuster.ZipkinSpanIDUniquifier(), + config: cfg, } } @@ -87,9 +83,6 @@ func (st *Store) getTenant(tenantID string) *Tenant { // GetDependencies returns dependencies between services func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { - if st.useNewDependencies { - return st.getDependenciesNew(ctx) - } m := st.getTenant(tenancy.GetTenant(ctx)) // deduper used below can modify the spans, so we take an exclusive lock m.Lock() @@ -126,36 +119,6 @@ func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback return retMe, nil } -func (st *Store) getDependenciesNew(ctx context.Context) ([]model.DependencyLink, error) { - m := st.getTenant(tenancy.GetTenant(ctx)) - m.RLock() - defer m.RUnlock() - retMe := make([]model.DependencyLink, 0, len(m.dependencyLinks)) - for _, dep := range m.dependencyLinks { - retMe = append(retMe, *dep) - } - return retMe, nil -} - -func (st *Store) WriteDependencies(ctx context.Context, ts time.Time, dependencies []*model.DependencyLink) error { - m := st.getTenant(tenancy.GetTenant(ctx)) - m.Lock() - defer m.Unlock() - for _, dep := range dependencies { - key := dep.Parent + "&&&" + dep.Child - if _, ok := m.dependencyLinks[key]; !ok { - m.dependencyLinks[key] = &model.DependencyLink{ - Parent: dep.Parent, - Child: dep.Child, - CallCount: dep.CallCount, - } - } else { - m.dependencyLinks[key].CallCount += dep.CallCount - } - } - return nil -} - func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span { for _, s := range trace.Spans { if s.SpanID == spanID { From de25c36c8500c7bd903879ae59896e2d5387a262 Mon Sep 17 00:00:00 2001 From: yunmaoQu <2643354262@qq.com> Date: Fri, 31 Jan 2025 15:27:25 +0000 Subject: [PATCH 5/5] fix Signed-off-by: yunmaoQu <2643354262@qq.com> --- 
 .../dependencyprocessor/aggregator.go         | 268 ++++++++----------
 1 file changed, 118 insertions(+), 150 deletions(-)

diff --git a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
index 0f03d2afc41..7dae6f7b366 100644
--- a/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
+++ b/cmd/jaeger/internal/processors/dependencyprocessor/aggregator.go
@@ -9,6 +9,8 @@ import (
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/pdata/pcommon"
@@ -21,208 +23,174 @@ import (
 
 var beamInitOnce sync.Once
 
+// dependencyAggregator processes spans and aggregates dependencies using Apache Beam
 type dependencyAggregator struct {
 	config           *Config
 	telset           component.TelemetrySettings
 	dependencyWriter spanstore.Writer
-	traces           map[pcommon.TraceID]*TraceState
-	tracesLock       sync.RWMutex
+	inputChan        chan spanEvent
 	closeChan        chan struct{}
-	beamPipeline     *beam.Pipeline
-	beamScope        beam.Scope
 }
 
-type TraceState struct {
-	spans          []*ptrace.Span
-	spanMap        map[pcommon.SpanID]*ptrace.Span
-	lastUpdateTime time.Time
-	timer          *time.Timer
-	serviceName    string
+// spanEvent represents a span with its service name and timestamp
+type spanEvent struct {
+	span        ptrace.Span
+	serviceName string
+	eventTime   time.Time
 }
 
+// newDependencyAggregator creates a new dependency aggregator
 func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter spanstore.Writer) *dependencyAggregator {
 	beamInitOnce.Do(func() {
 		beam.Init()
 	})
-	p, s := beam.NewPipelineWithRoot()
 	return &dependencyAggregator{
 		config:           &cfg,
 		telset:           telset,
 		dependencyWriter: dependencyWriter,
-		traces:           make(map[pcommon.TraceID]*TraceState),
-		beamPipeline:     p,
-		beamScope:        s,
+		inputChan:        make(chan spanEvent, 1000),
+		closeChan:        make(chan struct{}),
 	}
 }
 
+// Start begins the aggregation process
 func (agg *dependencyAggregator) Start(closeChan chan struct{}) {
 	agg.closeChan = closeChan
-	go func() {
-		ticker := time.NewTicker(agg.config.AggregationInterval)
-		defer ticker.Stop()
+	go agg.runPipeline()
+}
+
+// HandleSpan processes a single span
+func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) {
+	event := spanEvent{
+		span:        span,
+		serviceName: serviceName,
+		eventTime:   time.Now(),
+	}
+	select {
+	case agg.inputChan <- event:
+	default:
+		agg.telset.Logger.Warn("Input channel full, dropping span")
+	}
+}
+
+// runPipeline runs the main processing pipeline
+func (agg *dependencyAggregator) runPipeline() {
+	for {
+		var events []spanEvent
+		timer := time.NewTimer(agg.config.AggregationInterval)
+
+	collectLoop:
 		for {
 			select {
-			case <-ticker.C:
-				agg.processTraces(context.Background())
+			case event := <-agg.inputChan:
+				events = append(events, event)
+			case <-timer.C:
+				break collectLoop
 			case <-agg.closeChan:
-				agg.processTraces(context.Background())
+				if !timer.Stop() {
+					<-timer.C
+				}
+				if len(events) > 0 {
+					agg.processEvents(context.Background(), events)
+				}
 				return
 			}
 		}
-	}()
-}
 
-func (agg *dependencyAggregator) Close() error {
-	agg.tracesLock.Lock()
-	defer agg.tracesLock.Unlock()
-	for _, traceState := range agg.traces {
-		if traceState.timer != nil {
-			traceState.timer.Stop()
+		if len(events) > 0 {
+			agg.processEvents(context.Background(), 
events)
 		}
 	}
-	return nil
 }
 
-func (agg *dependencyAggregator) HandleSpan(ctx context.Context, span ptrace.Span, serviceName string) {
-	agg.tracesLock.Lock()
-	defer agg.tracesLock.Unlock()
-
-	traceID := span.TraceID()
-	spanID := span.SpanID()
-
-	traceState, ok := agg.traces[traceID]
-	if !ok {
-		traceState = &TraceState{
-			spans:          []*ptrace.Span{},
-			spanMap:        make(map[pcommon.SpanID]*ptrace.Span),
-			lastUpdateTime: time.Now(),
-			serviceName:    serviceName,
-		}
-		agg.traces[traceID] = traceState
-	}
-
-	spanCopy := span
-	traceState.spans = append(traceState.spans, &spanCopy)
-	traceState.spanMap[spanID] = &spanCopy
-	traceState.lastUpdateTime = time.Now()
-
-	if traceState.timer != nil {
-		traceState.timer.Stop()
-	}
-	traceState.timer = time.AfterFunc(agg.config.InactivityTimeout, func() {
-		agg.processTraces(ctx)
-	})
-}
-
-func (agg *dependencyAggregator) processTraces(ctx context.Context) {
-	agg.tracesLock.Lock()
-	if len(agg.traces) == 0 {
-		agg.tracesLock.Unlock()
-		return
-	}
-	traces := agg.traces
-	agg.traces = make(map[pcommon.TraceID]*TraceState)
-	agg.tracesLock.Unlock()
+	}
 
-	for _, traceState := range traces {
-		if traceState.timer != nil {
-			traceState.timer.Stop()
+// processEvents processes a batch of spans using Beam pipeline
+func (agg *dependencyAggregator) processEvents(ctx context.Context, events []spanEvent) {
+	// Create new pipeline and scope
+	p, s := beam.NewPipelineWithRoot()
+
+	// Create the initial PCollection from the batched events
+	col := beam.CreateList(s, events)
+
+	// Key each span by TraceID and assign its event time: emitting a
+	// beam.EventTime as the first return value sets the element timestamp
+	// that the session windows below operate on.
+	timestamped := beam.ParDo(s, func(event spanEvent) (beam.EventTime, pcommon.TraceID, spanEvent) {
+		return mtime.FromTime(event.eventTime), event.span.TraceID(), event
+	}, col)
 
-	beamInput := agg.createBeamInput(traces)
-	if beamInput.IsValid() {
-		agg.calculateDependencies(ctx, beamInput)
-	}
-}
+	// Apply session windows
+	windowed := beam.WindowInto(s,
+		window.NewSessions(agg.config.InactivityTimeout),
+		timestamped,
+	)
 
-func (agg *dependencyAggregator) createBeamInput(traces map[pcommon.TraceID]*TraceState) beam.PCollection {
-	var allSpans []*ptrace.Span
-	for _, traceState := range traces {
-		allSpans = append(allSpans, traceState.spans...) 
-	}
-	if len(allSpans) == 0 {
-		return beam.PCollection{}
-	}
-	return beam.CreateList(agg.beamScope, allSpans)
-}
+	// Group by TraceID and aggregate dependencies
+	grouped := beam.GroupByKey(s, windowed)
 
-func (agg *dependencyAggregator) calculateDependencies(ctx context.Context, input beam.PCollection) {
-	keyedSpans := beam.ParDo(agg.beamScope, func(s *ptrace.Span) (pcommon.TraceID, *ptrace.Span) {
-		return s.TraceID(), s
-	}, input)
-
-	groupedSpans := beam.GroupByKey(agg.beamScope, keyedSpans)
-	depLinks := beam.ParDo(agg.beamScope, func(_ pcommon.TraceID, spansIter func(*ptrace.Span) bool) []*model.DependencyLink {
-		deps := map[string]*model.DependencyLink{}
-		var span *ptrace.Span
-		for spansIter(span) {
-			processSpanOtel(deps, span, agg.traces)
-		}
-		return depMapToSlice(deps)
-	}, groupedSpans)
-	flattened := beam.Flatten(agg.beamScope, depLinks)
+	// Calculate dependencies for each trace
+	dependencies := beam.ParDo(s, func(key pcommon.TraceID, iter func(*spanEvent) bool) []*model.DependencyLink {
+		spanMap := make(map[pcommon.SpanID]spanEvent)
+		var event spanEvent
 
-	beam.ParDo0(agg.beamScope, func(deps []*model.DependencyLink) {
-		err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps)
-		if err != nil {
-			agg.telset.Logger.Error("Error writing dependencies", zap.Error(err))
+		// Build span map (iter must be called with a pointer: iter(&event))
+		for iter(&event) {
+			spanMap[event.span.SpanID()] = event
 		}
-	}, flattened)
 
-	go func() {
-		err := beamx.Run(ctx, agg.beamPipeline)
-		if err != nil {
-			agg.telset.Logger.Error("Error running beam pipeline", zap.Error(err))
+		// Calculate dependencies
+		deps := make(map[string]*model.DependencyLink)
+		for _, event := range spanMap {
+			parentSpanID := event.span.ParentSpanID()
+			if parentEvent, hasParent := spanMap[parentSpanID]; hasParent {
+				parentService := parentEvent.serviceName
+				childService := event.serviceName
+
+				// Create dependency link if services are different
+				if parentService != "" && childService != "" && parentService != childService {
+					depKey := parentService + "&&&" + childService
+					if dep, exists := deps[depKey]; exists {
+						dep.CallCount++
+					} else {
+						deps[depKey] = &model.DependencyLink{
+							Parent:    parentService,
+							Child:     childService,
+							CallCount: 1,
+						}
+					}
+				}
+			}
 		}
-		agg.beamPipeline = beam.NewPipeline()
-		agg.beamScope = beam.Scope{Parent: beam.PipelineScope{ID: "dependency"}, Name: "dependency"}
-	}()
-}
-
-func processSpanOtel(deps map[string]*model.DependencyLink, span *ptrace.Span, traces map[pcommon.TraceID]*TraceState) {
-	traceID := span.TraceID()
-	parentSpanID := span.ParentSpanID()
 
-	traceState, ok := traces[traceID]
-	if !ok {
-		return
-	}
-
-	parentSpan := traceState.spanMap[parentSpanID]
-	if parentSpan == nil {
-		return
-	}
-
-	parentTraceState := traces[traceID]
-	if parentTraceState == nil {
-		return
-	}
+		return depMapToSlice(deps)
+	}, grouped)
 
-	parentService := parentTraceState.serviceName
-	currentService := traceState.serviceName
+	// Merge results from all windows
+	merged := beam.Flatten(s, dependencies)
 
-	if parentService == currentService || parentService == "" || currentService == "" {
-		return
+	// Write to storage
+	beam.ParDo0(s, func(deps []*model.DependencyLink) {
+		if err := agg.dependencyWriter.WriteDependencies(ctx, time.Now(), deps); err != nil {
+			agg.telset.Logger.Error("Failed to write dependencies", zap.Error(err))
+		}
+	}, merged)
 	}
 
+	// Execute pipeline
+	if err := beamx.Run(ctx, p); err != nil {
+		agg.telset.Logger.Error("Failed to run beam pipeline", zap.Error(err))
 	}
+}
 
-	depKey := parentService + "&&&" 
+ currentService - if _, ok := deps[depKey]; !ok { - deps[depKey] = &model.DependencyLink{ - Parent: parentService, - Child: currentService, - CallCount: 1, - } - } else { - deps[depKey].CallCount++ - } +// Close shuts down the aggregator +func (agg *dependencyAggregator) Close() error { + close(agg.closeChan) + return nil } // depMapToSlice converts dependency map to slice func depMapToSlice(deps map[string]*model.DependencyLink) []*model.DependencyLink { - retMe := make([]*model.DependencyLink, 0, len(deps)) + result := make([]*model.DependencyLink, 0, len(deps)) for _, dep := range deps { - retMe = append(retMe, dep) + result = append(result, dep) } - return retMe + return result }
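
The final aggregator leans on three Beam Go SDK behaviors that are easy to get wrong, and that earlier revisions of this series did get wrong: a DoFn assigns an element's timestamp by emitting beam.EventTime as its first return value; window.NewSessions(gap) closes a key's window once the gap elapses with no new elements; and a GroupByKey iterator only yields values when called with a pointer to a value variable, iter(&v) — the nil-pointer call "for spansIter(span)" in [PATCH 1/5] iterates zero times. The standalone sketch below demonstrates all three under the default direct runner; the spanRecord type, the trace IDs, and the 2-second gap are illustrative stand-ins, not code from the patches.

// A minimal, self-contained sketch of the Beam Go primitives used above.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// spanRecord stands in for the processor's spanEvent; exported basic fields
// keep it encodable by Beam's schema coder.
type spanRecord struct {
	TraceID string
	Service string
	AtMs    int64 // event time, Unix milliseconds
}

func main() {
	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	now := time.Now()
	records := []spanRecord{
		{TraceID: "t1", Service: "service1", AtMs: now.UnixMilli()},
		{TraceID: "t1", Service: "service2", AtMs: now.Add(500 * time.Millisecond).UnixMilli()},
		{TraceID: "t2", Service: "service3", AtMs: now.Add(10 * time.Second).UnixMilli()},
	}

	// Emitting beam.EventTime first assigns each element's timestamp.
	keyed := beam.ParDo(s, func(r spanRecord) (beam.EventTime, string, spanRecord) {
		return mtime.FromTime(time.UnixMilli(r.AtMs)), r.TraceID, r
	}, beam.CreateList(s, records))

	// A trace's window closes after 2s of event-time inactivity.
	windowed := beam.WindowInto(s, window.NewSessions(2*time.Second), keyed)
	grouped := beam.GroupByKey(s, windowed)

	// Iterator contract: pass a pointer; each call fills it with the next value.
	beam.ParDo0(s, func(traceID string, iter func(*spanRecord) bool) {
		var r spanRecord
		count := 0
		for iter(&r) {
			count++
		}
		fmt.Printf("trace %s: %d spans in session window\n", traceID, count)
	}, grouped)

	if err := beamx.Run(context.Background(), p); err != nil {
		panic(err)
	}
}

With these inputs, the two t1 records share one session window while t2 opens its own; this mirrors what processEvents does with agg.config.InactivityTimeout as the session gap.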