diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index a0185ae0f4..bb9b3a151e 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -7,15 +7,6 @@ import ( "errors" "os" - "github.com/grafana/alloy/internal/build" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/otelcol" - otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" - "github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer" - "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" - "github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer" - "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" - "github.com/grafana/alloy/internal/util/zapadapter" "github.com/prometheus/client_golang/prometheus" otelcomponent "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" @@ -26,6 +17,16 @@ import ( otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/sdk/metric" + + "github.com/grafana/alloy/internal/build" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/internal/fanoutconsumer" + "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" + "github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer" + "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" + "github.com/grafana/alloy/internal/util/zapadapter" ) const ( @@ -94,7 +95,7 @@ var ( func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Connector, error) { ctx, cancel := context.WithCancel(context.Background()) - consumer := lazyconsumer.New(ctx) + consumer := lazyconsumer.NewPaused(ctx) // Create a lazy collector where metrics from the upstream component will be // forwarded. @@ -116,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, } if err := p.Update(args); err != nil { diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 90f1826966..22a2b56d3a 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -7,15 +7,6 @@ import ( "errors" "os" - "github.com/grafana/alloy/internal/build" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/component/otelcol" - otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" - "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" - "github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer" - "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" - "github.com/grafana/alloy/internal/component/otelcol/internal/views" - "github.com/grafana/alloy/internal/util/zapadapter" "github.com/prometheus/client_golang/prometheus" otelcomponent "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" @@ -26,6 +17,16 @@ import ( otelmetric "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/sdk/metric" + + "github.com/grafana/alloy/internal/build" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/internal/lazycollector" + "github.com/grafana/alloy/internal/component/otelcol/internal/lazyconsumer" + "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" + "github.com/grafana/alloy/internal/component/otelcol/internal/views" + "github.com/grafana/alloy/internal/util/zapadapter" ) // Arguments is an extension of component.Arguments which contains necessary @@ -108,7 +109,7 @@ var ( func New(opts component.Options, f otelexporter.Factory, args Arguments, supportedSignals TypeSignal) (*Exporter, error) { ctx, cancel := context.WithCancel(context.Background()) - consumer := lazyconsumer.New(ctx) + consumer := lazyconsumer.NewPaused(ctx) // Create a lazy collector where metrics from the upstream component will be // forwarded. @@ -130,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, supportedSignals: supportedSignals, diff --git a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go index 3d2e653423..a5813e8469 100644 --- a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go +++ b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer.go @@ -17,6 +17,10 @@ import ( type Consumer struct { ctx context.Context + // pauseMut and pausedWg are used to implement Pause & Resume semantics. See Pause method for more info. + pauseMut sync.RWMutex + pausedWg *sync.WaitGroup + mut sync.RWMutex metricsConsumer otelconsumer.Metrics logsConsumer otelconsumer.Logs @@ -36,6 +40,13 @@ func New(ctx context.Context) *Consumer { return &Consumer{ctx: ctx} } +// NewPaused is like New, but returns a Consumer that is paused by calling Pause method. +func NewPaused(ctx context.Context) *Consumer { + c := New(ctx) + c.Pause() + return c +} + // Capabilities implements otelconsumer.baseConsumer. func (c *Consumer) Capabilities() otelconsumer.Capabilities { return otelconsumer.Capabilities{ @@ -52,6 +63,8 @@ func (c *Consumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { return c.ctx.Err() } + c.waitUntilResumed() + c.mut.RLock() defer c.mut.RUnlock() @@ -73,6 +86,8 @@ func (c *Consumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error return c.ctx.Err() } + c.waitUntilResumed() + c.mut.RLock() defer c.mut.RUnlock() @@ -94,6 +109,8 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return c.ctx.Err() } + c.waitUntilResumed() + c.mut.RLock() defer c.mut.RUnlock() @@ -109,6 +126,15 @@ func (c *Consumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return c.logsConsumer.ConsumeLogs(ctx, ld) } +func (c *Consumer) waitUntilResumed() { + c.pauseMut.RLock() + pausedWg := c.pausedWg + c.pauseMut.RUnlock() + if pausedWg != nil { + pausedWg.Wait() + } +} + // SetConsumers updates the internal consumers that Consumer will forward data // to. It is valid for any combination of m, l, and t to be nil. func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l otelconsumer.Logs) { @@ -119,3 +145,37 @@ func (c *Consumer) SetConsumers(t otelconsumer.Traces, m otelconsumer.Metrics, l c.logsConsumer = l c.tracesConsumer = t } + +// Pause will stop the consumer until Resume is called. While paused, the calls to Consume* methods will block. +// Pause can be called multiple times, but a single call to Resume will un-pause this consumer. Thread-safe. +func (c *Consumer) Pause() { + c.pauseMut.Lock() + defer c.pauseMut.Unlock() + + if c.pausedWg != nil { + return // already paused + } + + c.pausedWg = &sync.WaitGroup{} + c.pausedWg.Add(1) +} + +// Resume will revert the Pause call and the consumer will continue to work. See Pause for more details. +func (c *Consumer) Resume() { + c.pauseMut.Lock() + defer c.pauseMut.Unlock() + + if c.pausedWg == nil { + return // already resumed + } + + c.pausedWg.Done() // release all waiting + c.pausedWg = nil +} + +// IsPaused returns whether the consumer is currently paused. See Pause for details. +func (c *Consumer) IsPaused() bool { + c.pauseMut.RLock() + defer c.pauseMut.RUnlock() + return c.pausedWg != nil +} diff --git a/internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go new file mode 100644 index 0000000000..1980029d5e --- /dev/null +++ b/internal/component/otelcol/internal/lazyconsumer/lazyconsumer_test.go @@ -0,0 +1,161 @@ +package lazyconsumer + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/goleak" + + "github.com/grafana/alloy/internal/runtime/componenttest" +) + +func Test_PauseAndResume(t *testing.T) { + c := New(componenttest.TestContext(t)) + require.False(t, c.IsPaused()) + c.Pause() + require.True(t, c.IsPaused()) + c.Resume() + require.False(t, c.IsPaused()) +} + +func Test_NewPaused(t *testing.T) { + c := NewPaused(componenttest.TestContext(t)) + require.True(t, c.IsPaused()) + c.Resume() + require.False(t, c.IsPaused()) +} + +func Test_PauseResume_MultipleCalls(t *testing.T) { + c := New(componenttest.TestContext(t)) + require.False(t, c.IsPaused()) + c.Pause() + c.Pause() + c.Pause() + require.True(t, c.IsPaused()) + c.Resume() + c.Resume() + c.Resume() + require.False(t, c.IsPaused()) +} + +func Test_ConsumeWaitsForResume(t *testing.T) { + goleak.VerifyNone(t, goleak.IgnoreCurrent()) + c := NewPaused(componenttest.TestContext(t)) + require.True(t, c.IsPaused()) + + method := map[string]func(){ + "ConsumeTraces": func() { + _ = c.ConsumeTraces(nil, ptrace.NewTraces()) + }, + "ConsumeMetrics": func() { + _ = c.ConsumeMetrics(nil, pmetric.NewMetrics()) + }, + "ConsumeLogs": func() { + _ = c.ConsumeLogs(nil, plog.NewLogs()) + }, + } + + for name, fn := range method { + t.Run(name, func(t *testing.T) { + c.Pause() + require.True(t, c.IsPaused()) + + started := make(chan struct{}) + finished := make(chan struct{}) + + // Start goroutine that attempts to run Consume* method + go func() { + started <- struct{}{} + fn() + finished <- struct{}{} + }() + + // Wait to be started + select { + case <-started: + case <-time.After(5 * time.Second): + t.Fatal("consumer goroutine never started") + } + + // Wait for a bit to ensure the consumer is blocking on Consume* function + select { + case <-finished: + t.Fatal("consumer should not have finished yet - it's paused") + case <-time.After(100 * time.Millisecond): + } + + // Resume the consumer and verify the Consume* function unblocked + c.Resume() + select { + case <-finished: + case <-time.After(5 * time.Second): + t.Fatal("consumer should have finished after resuming") + } + + }) + } +} + +func Test_PauseResume_Multithreaded(t *testing.T) { + goleak.VerifyNone(t, goleak.IgnoreCurrent()) + ctx, cancel := context.WithCancel(componenttest.TestContext(t)) + runs := 500 + routines := 5 + allDone := sync.WaitGroup{} + + c := NewPaused(componenttest.TestContext(t)) + require.True(t, c.IsPaused()) + + // Run goroutines that constantly try to call Consume* methods + for i := 0; i < routines; i++ { + allDone.Add(1) + go func() { + for { + select { + case <-ctx.Done(): + allDone.Done() + return + default: + _ = c.ConsumeLogs(ctx, plog.NewLogs()) + _ = c.ConsumeMetrics(ctx, pmetric.NewMetrics()) + _ = c.ConsumeTraces(ctx, ptrace.NewTraces()) + } + } + }() + } + + // Run goroutines that Pause and then Resume in parallel. + // In particular, this verifies we can call .Pause() and .Resume() on an already paused or already resumed consumer. + workChan := make(chan struct{}, routines) + for i := 0; i < routines; i++ { + allDone.Add(1) + go func() { + for { + select { + case <-workChan: + c.Pause() + c.Resume() + case <-ctx.Done(): + allDone.Done() + return + } + } + }() + } + + for i := 0; i < runs; i++ { + workChan <- struct{}{} + } + cancel() + + allDone.Wait() + + // Should not be paused as last call will always be c.Resume() + require.False(t, c.IsPaused()) +} diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 022c09dadf..2c731616a5 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -9,10 +9,11 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component" - "github.com/grafana/alloy/internal/runtime/logging/level" otelcomponent "go.opentelemetry.io/collector/component" "go.uber.org/multierr" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/runtime/logging/level" ) // Scheduler implements manages a set of OpenTelemetry Collector components. @@ -39,6 +40,11 @@ type Scheduler struct { // newComponentsCh is written to when schedComponents gets updated. newComponentsCh chan struct{} + + // onPause is called when scheduler is making changes to running components. + onPause func() + // onResume is called when scheduler is done making changes to running components. + onResume func() } // New creates a new unstarted Scheduler. Call Run to start it, and call @@ -47,6 +53,21 @@ func New(l log.Logger) *Scheduler { return &Scheduler{ log: l, newComponentsCh: make(chan struct{}, 1), + onPause: func() {}, + onResume: func() {}, + } +} + +// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to +// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running +// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it +// will call onResume as a last step. +func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { + return &Scheduler{ + log: l, + newComponentsCh: make(chan struct{}, 1), + onPause: onPause, + onResume: onResume, } } @@ -75,11 +96,16 @@ func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Componen // Run starts the Scheduler. Run will watch for schedule components to appear // and run them, terminating previously running components if they exist. func (cs *Scheduler) Run(ctx context.Context) error { + firstRun := true var components []otelcomponent.Component // Make sure we terminate all of our running components on shutdown. defer func() { + if !firstRun { // always handle the callbacks correctly + cs.onPause() + } cs.stopComponents(context.Background(), components...) + cs.onResume() }() // Wait for a write to cs.newComponentsCh. The initial list of components is @@ -90,6 +116,11 @@ func (cs *Scheduler) Run(ctx context.Context) error { case <-ctx.Done(): return nil case <-cs.newComponentsCh: + if !firstRun { + cs.onPause() // do not pause on first run + } else { + firstRun = false + } // Stop the old components before running new scheduled ones. cs.stopComponents(ctx, components...) @@ -100,6 +131,7 @@ func (cs *Scheduler) Run(ctx context.Context) error { level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components)) components = cs.startComponents(ctx, host, components...) + cs.onResume() } } } diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 5f238b33de..469d679b7f 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,11 +5,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + otelcomponent "go.opentelemetry.io/collector/component" + "go.uber.org/atomic" + "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/runtime/componenttest" "github.com/grafana/alloy/internal/util" - "github.com/stretchr/testify/require" - otelcomponent "go.opentelemetry.io/collector/component" ) func TestScheduler(t *testing.T) { @@ -58,6 +61,59 @@ func TestScheduler(t *testing.T) { require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) + t.Run("Pause callbacks are called", func(t *testing.T) { + var ( + pauseCalls = &atomic.Int32{} + resumeCalls = &atomic.Int32{} + l = util.TestLogger(t) + cs = scheduler.NewWithPauseCallbacks( + l, + func() { pauseCalls.Inc() }, + func() { resumeCalls.Inc() }, + ) + h = scheduler.NewHost(l) + ) + ctx, cancel := context.WithCancel(context.Background()) + + // Run our scheduler in the background. + go func() { + err := cs.Run(ctx) + require.NoError(t, err) + }() + + // Schedule our component, which should notify the started and stopped + // trigger once it starts and stops respectively. + component, started, stopped := newTriggerComponent() + cs.Schedule(h, component) + + toInt := func(a *atomic.Int32) int { return int(a.Load()) } + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run") + assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + + // Wait for the component to start, and then unschedule all components, which + // should cause our running component to terminate. + require.NoError(t, started.Wait(5*time.Second), "component did not start") + cs.Schedule(h) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run") + assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + + require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") + + // Stop the scheduler + cancel() + + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + }) + t.Run("Running components get stopped on shutdown", func(t *testing.T) { var ( l = util.TestLogger(t) diff --git a/internal/component/otelcol/processor/batch/batch_test.go b/internal/component/otelcol/processor/batch/batch_test.go index 12604eff83..b69c4802cc 100644 --- a/internal/component/otelcol/processor/batch/batch_test.go +++ b/internal/component/otelcol/processor/batch/batch_test.go @@ -5,6 +5,11 @@ import ( "testing" "time" + "github.com/grafana/dskit/backoff" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/batchprocessor" + "github.com/grafana/alloy/internal/component/otelcol" "github.com/grafana/alloy/internal/component/otelcol/internal/fakeconsumer" "github.com/grafana/alloy/internal/component/otelcol/processor/batch" @@ -12,10 +17,6 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/grafana/alloy/internal/util" "github.com/grafana/alloy/syntax" - "github.com/grafana/dskit/backoff" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/ptrace" - "go.opentelemetry.io/collector/processor/batchprocessor" ) // Test performs a basic integration test which runs the @@ -53,10 +54,7 @@ func Test(t *testing.T) { go func() { exports := ctrl.Exports().(otelcol.ConsumerExports) - bo := backoff.New(ctx, backoff.Config{ - MinBackoff: 10 * time.Millisecond, - MaxBackoff: 100 * time.Millisecond, - }) + bo := backoff.New(ctx, testBackoffConfig()) for bo.Ongoing() { err := exports.Input.ConsumeTraces(ctx, createTestTraces()) if err != nil { @@ -78,6 +76,69 @@ func Test(t *testing.T) { } } +func Test_Update(t *testing.T) { + ctx := componenttest.TestContext(t) + + ctrl, err := componenttest.NewControllerFromID(util.TestLogger(t), "otelcol.processor.batch") + require.NoError(t, err) + + args := batch.Arguments{ + Timeout: 10 * time.Millisecond, + } + args.SetToDefault() + + // Override our arguments so traces get forwarded to traceCh. + traceCh := make(chan ptrace.Traces) + args.Output = makeTracesOutput(traceCh) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + // Verify running and exported + require.NoError(t, ctrl.WaitRunning(time.Second), "component never started") + require.NoError(t, ctrl.WaitExports(time.Second), "component never exported anything") + + // Update the args + args.Timeout = 20 * time.Millisecond + require.NoError(t, ctrl.Update(args)) + + // Verify running + require.NoError(t, ctrl.WaitRunning(time.Second), "component never started") + + // Send traces in the background to our processor. + go func() { + exports := ctrl.Exports().(otelcol.ConsumerExports) + + bo := backoff.New(ctx, testBackoffConfig()) + for bo.Ongoing() { + err := exports.Input.ConsumeTraces(ctx, createTestTraces()) + if err != nil { + level.Error(util.TestLogger(t)).Log("msg", "failed to send traces", "err", err) + bo.Wait() + continue + } + return + } + }() + + // Wait for our processor to finish and forward data to traceCh. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for traces") + case tr := <-traceCh: + require.Equal(t, 1, tr.SpanCount()) + } +} + +func testBackoffConfig() backoff.Config { + return backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + } +} + // makeTracesOutput returns ConsumerArguments which will forward traces to the // provided channel. func makeTracesOutput(ch chan ptrace.Traces) *otelcol.ConsumerArguments { diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5aaa4750b3..5072d65233 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -7,6 +7,17 @@ import ( "errors" "os" + "github.com/prometheus/client_golang/prometheus" + otelcomponent "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + otelextension "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pipeline" + otelprocessor "go.opentelemetry.io/collector/processor" + sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus" + otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/metric" + "github.com/grafana/alloy/internal/build" "github.com/grafana/alloy/internal/component" "github.com/grafana/alloy/internal/component/otelcol" @@ -18,16 +29,6 @@ import ( "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/service/livedebugging" "github.com/grafana/alloy/internal/util/zapadapter" - "github.com/prometheus/client_golang/prometheus" - otelcomponent "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" - otelextension "go.opentelemetry.io/collector/extension" - "go.opentelemetry.io/collector/pipeline" - otelprocessor "go.opentelemetry.io/collector/processor" - sdkprometheus "go.opentelemetry.io/otel/exporters/prometheus" - otelmetric "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/sdk/metric" ) // Arguments is an extension of component.Arguments which contains necessary @@ -94,7 +95,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc ctx, cancel := context.WithCancel(context.Background()) - consumer := lazyconsumer.New(ctx) + consumer := lazyconsumer.NewPaused(ctx) // Create a lazy collector where metrics from the upstream component will be // forwarded. @@ -116,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),