Skip to content

Commit

Permalink
[chore] Cleanup batch and queue senders test to not use the whole exp…
Browse files Browse the repository at this point in the history
…orter

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 9, 2025
1 parent 83d93cd commit fd88469
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 484 deletions.
74 changes: 72 additions & 2 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
Expand All @@ -19,6 +20,7 @@ import (
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/exporter/internal/requesttest"
"go.opentelemetry.io/collector/pipeline"
)

Expand All @@ -41,7 +43,12 @@ type noopSender struct {

func newNoopExportSender() Sender[internal.Request] {
return &noopSender{SendFunc: func(ctx context.Context, req internal.Request) error {
return req.Export(ctx)
select {
case <-ctx.Done():
return ctx.Err() // Returns the cancellation error
default:
return req.Export(ctx)
}
}}
}

Expand Down Expand Up @@ -97,7 +104,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
require.Error(t, err)

_, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender,
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
WithRetry(configretry.NewDefaultBackOffConfig()),
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]()))
require.Error(t, err)
Expand All @@ -122,8 +129,71 @@ func TestBaseExporterLogging(t *testing.T) {
require.Error(t, sendErr)

require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1)
assert.Contains(t, observed.All()[0].Message, "Exporting failed. Rejecting data.")
assert.Equal(t, "my error", observed.All()[0].ContextMap()["error"])
})
}
runTest("enable_queue_batcher", true)
runTest("disable_queue_batcher", false)
}

func TestQueueRetryWithDisabledQueue(t *testing.T) {
tests := []struct {
name string
queueOptions []Option
}{
{
name: "WithQueue",
queueOptions: []Option{
WithMarshaler(mockRequestMarshaler),
WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
func() Option {
qs := NewDefaultQueueConfig()
qs.Enabled = false
return WithQueue(qs)
}(),
},
},
{
name: "WithRequestQueue",
queueOptions: []Option{
func() Option {
qs := exporterqueue.NewDefaultConfig()
qs.Enabled = false
return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[internal.Request]())
}(),
},
},
}

runTest := func(testName string, enableQueueBatcher bool, tt struct {
name string
queueOptions []Option
},
) {
t.Run(testName, func(t *testing.T) {
defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
set := exportertest.NewNopSettings()
logger, observed := observer.New(zap.ErrorLevel)
set.Logger = zap.New(logger)
be, err := NewBaseExporter(set, pipeline.SignalLogs, newObservabilityConsumerSender, tt.queueOptions...)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
ocs := be.ObsrepSender.(*observabilityConsumerSender)
mockR := &requesttest.FakeRequest{Items: 2, ExportErr: errors.New("some error")}
ocs.run(func() {
require.Error(t, be.Send(context.Background(), mockR))
})
assert.Len(t, observed.All(), 1)
assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message)
ocs.awaitAsyncProcessing()
ocs.checkSendItemsCount(t, 0)
ocs.checkDroppedItemsCount(t, 2)
require.NoError(t, be.Shutdown(context.Background()))
})
}
for _, tt := range tests {
runTest(tt.name+"_enable_queue_batcher", true, tt)
runTest(tt.name+"_disable_queue_batcher", false, tt)
}
}
Loading

0 comments on commit fd88469

Please sign in to comment.