Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dependency processor using Apache Beam #6560

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

yunmaoQu
Copy link

Which problem is this PR solving?

Resolves #5911

Description of the changes

  • add dependency processor using Apache Beam

How was this change tested?

  • e2e tests

Checklist

@yunmaoQu yunmaoQu requested a review from a team as a code owner January 17, 2025 17:37
@yunmaoQu yunmaoQu requested a review from joe-elliott January 17, 2025 17:37
@yunmaoQu yunmaoQu force-pushed the add-dependency-processor branch from af5f794 to 60fb334 Compare January 17, 2025 17:42
perTenant: make(map[string]*Tenant),
defaultConfig: cfg,
perTenant: make(map[string]*Tenant),
useNewDependencies: false, // 添加初始化
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please use English in comments

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Member

@yurishkuro yurishkuro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • where is it hooked up to anything?
  • what would be the e2e testing for this component?

@yunmaoQu
Copy link
Author

yunmaoQu commented Jan 18, 2025

  • where is it hooked up to anything?
  • what would be the e2e testing for this component?

@yurishkuro I have fixed it

Copy link
Member

@yurishkuro yurishkuro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mahadzaryab1 interesting direction here

type Config struct {
AggregationInterval time.Duration `yaml:"aggregation_interval"`
InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
Store *memory.Store `yaml:"-"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storage is not part of the config. The processor itself can retrieve storage from storage extension - see adaptive sampling processor for example.

require.NoError(t, err)

// Wait for the processor to process the trace
time.Sleep(cfg.AggregationInterval + 100*time.Millisecond)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use sleep in tests, use assert.Eventually


func DefaultConfig() Config {
return Config{
AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is way too frequent. We used to flush every 15min at Uber, since the topology does not change that often, but I can see a need for more frequent flushing if we want to capture movement of metrics.

Suggested change
AggregationInterval: 5 * time.Second, // Default dependency aggregation interval: 5 seconds
AggregationInterval: 10 * time.Minute,

The comment is redundant.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to flush every minute to support investigating metrics movement, but it will create too much data. We may have to consider integrating with Prometheus instead, and only focus on capturing topology here, which definitely does not need 1min frequency.

Signed-off-by: yunmaoQu <[email protected]>
@yunmaoQu
Copy link
Author

yunmaoQu commented Jan 20, 2025

@yurishkuro Except this ,I update all based on your review.

config *Config
aggregator *dependencyAggregator // Define the aggregator below.
telset component.TelemetrySettings
dependencyWriter *memory.Store
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as I mentioned, you cannot have concrete store dependency here. The processor needs to work with any storage supported by Jaeger, as long as they implement WriteDependencies.

Example:

f, err := jaegerstorage.GetStorageFactory(storageName, host)

func (tp *dependencyProcessor) Shutdown(ctx context.Context) error {
close(tp.closeChan)
if tp.aggregator != nil {
if err := tp.aggregator.Close(); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if aggregator has a Close() function why does it need to be passed closeChan?

type Config struct {
// AggregationInterval defines how often the processor aggregates dependencies.
// This controls the frequency of flushing dependency data to storage.
// Default dependency aggregation interval: 10 seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to mention default, it's defined elsewhere and we don't have to keep two places in sync.

Comment on lines +13 to +14
// AggregationInterval defines how often the processor aggregates dependencies.
// This controls the frequency of flushing dependency data to storage.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// AggregationInterval defines how often the processor aggregates dependencies.
// This controls the frequency of flushing dependency data to storage.
// AggregationInterval defines the length of aggregation window after
// which the accumulated dependencies are flushed into storage.

Comment on lines +27 to +28

cfg.AggregationInterval = 1 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cfg.AggregationInterval = 1 * time.Second
cfg.AggregationInterval = 1 * time.Second

// is considered complete and ready for dependency aggregation.
// Default trace completion timeout: 2 seconds of inactivity
InactivityTimeout time.Duration `yaml:"inactivity_timeout"`
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add Validate method and use valid: notations in the field tags.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

type dependencyAggregator struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a lot of code here. How can it be unit tested with beam?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Comment on lines +31 to +32
AggregationInterval: 10 * time.Second,
InactivityTimeout: 2 * time.Second,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
AggregationInterval: 10 * time.Second,
InactivityTimeout: 2 * time.Second,
AggregationInterval: 10 * time.Minute,
InactivityTimeout: 2 * time.Minute,

}

func (dp *dependencyProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
batches := v1adapter.ProtoFromTraces(td)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to fall back to the old Jaeger model? This is new code, let's keep it in the ptrace domain.

}

func newDependencyAggregator(cfg Config, telset component.TelemetrySettings, dependencyWriter *memory.Store) *dependencyAggregator {
beam.Init()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be called more than once?

"github.com/jaegertracing/jaeger/plugin/storage/memory"
)

func TestDependencyProcessorEndToEnd(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this is a unit test, not an e2e test. By e2e I mean running the new processor inside a real Jaeger binary and validating the correct processing through the API calls. See cmd/jaeger/internal/integration/.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks a lot. I will fix it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement in-memory Service Dependency Graph using Apache Beam
2 participants