Add backoff and error payload sampling to error handling bundle #189

Open · wants to merge 9 commits into base: main
104 changes: 97 additions & 7 deletions internal/bundle/strict/bundle.go
@@ -1,28 +1,118 @@
package strict

import (
"context"
"fmt"

"github.com/warpstreamlabs/bento/internal/bundle"
"github.com/warpstreamlabs/bento/internal/component/input"
"github.com/warpstreamlabs/bento/internal/component/processor"
"github.com/warpstreamlabs/bento/internal/errorhandling"
"github.com/warpstreamlabs/bento/internal/manager"
)

// NewErrorHandlingBundleFromConfig modifies a provided bundle environment according
// to the error handling config: error payload sampling is applied first (when a
// sample rate is set), followed by either the "reject" (strict) or "backoff" (retry)
// strategy.
func NewErrorHandlingBundleFromConfig(ctx context.Context, cfg errorhandling.Config, b *bundle.Environment) *bundle.Environment {
env := b.Clone()

if cfg.ErrorSampleRate > 0 {
env = ErrorSampleBundle(env, cfg.ErrorSampleRate)
}

switch cfg.Strategy {
case "reject":
env = StrictBundle(env)
case "backoff":
env = RetryBundle(ctx, env, cfg.MaxRetries)
}

return env
}
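
// A minimal usage sketch (the exact shape of errorhandling.Config is assumed from
// the fields referenced above and is not part of this diff):
//
//	cfg := errorhandling.Config{Strategy: "backoff", MaxRetries: 3, ErrorSampleRate: 0.25}
//	env := NewErrorHandlingBundleFromConfig(ctx, cfg, bundle.GlobalEnvironment)
//	mgr, err := manager.New(manager.ResourceConfig{}, manager.OptSetEnvironment(env))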

// StrictBundle modifies a provided bundle environment so that all processors
// will fail an entire batch if any message-level error is encountered. These
// failed batches are nacked and/or reprocessed depending on your input.
func StrictBundle(b *bundle.Environment) *bundle.Environment {
env := b.Clone()

for _, spec := range b.ProcessorDocs() {
_ = env.ProcessorAdd(func(conf processor.Config, nm bundle.NewManagement) (processor.V1, error) {
proc, err := b.ProcessorInit(conf, nm)
if err != nil {
return nil, err
}

proc = wrapWithStrict(proc)
return proc, err
}, spec)
}

// TODO: Overwrite inputs for retry with backoff
return env
}
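
// ErrorSampleBundle returns a modified environment in which every processor is
// wrapped with error payload sampling (wrapWithErrorSampling) at the given rate.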
func ErrorSampleBundle(b *bundle.Environment, rate float64) *bundle.Environment {
errorSampleEnv := b.Clone()

for _, spec := range b.ProcessorDocs() {
_ = errorSampleEnv.ProcessorAdd(func(conf processor.Config, nm bundle.NewManagement) (processor.V1, error) {
proc, err := b.ProcessorInit(conf, nm)
if err != nil {
return nil, err
}
proc = wrapWithErrorSampling(proc, nm, rate)
return proc, err
}, spec)
}

return errorSampleEnv
}

// RetryBundle returns a modified environment to reprocess any batches containing any message-level errors
// through the entire pipeline. During retries, the input-layer will pause new message ingestion.
func RetryBundle(ctx context.Context, b *bundle.Environment, maxRetries int) *bundle.Environment {
env := b.Clone()

for _, spec := range b.InputDocs() {
_ = env.InputAdd(func(conf input.Config, nm bundle.NewManagement) (input.Streamed, error) {
i, err := b.InputInit(conf, nm)
if err != nil {
return nil, err
}

// Check if the type is an AsyncReader
if rdr, ok := i.(*input.AsyncReader); ok {
async := rdr.UnwrapAsyncReader()
// If not already an AsyncPreserver, wrap with one and create
// new AsyncReader.
if _, isRetryInput := async.(*input.AsyncPreserver); !isRetryInput {

wrapped := manager.WrapInput(i)
if err := wrapped.CloseExistingInput(ctx, true); err != nil {
return nil, fmt.Errorf("failed to close existing input %s: %w", spec.Name, err)
}
// NewAsyncReader will automatically start an input-ingestion goroutine.
// Hence, we need to close the previous existing input prior to starting a new wrapped one.
retryInput, err := input.NewAsyncReader(conf.Type, input.NewAsyncPreserver(async), nm)
if err != nil {
return nil, fmt.Errorf("failed to wrap input %s with retry: %w", spec.Name, err)
}

wrapped.SwapInput(retryInput)
return wrapped, nil
}
}
return i, err
}, spec)
}

for _, spec := range b.ProcessorDocs() {
_ = env.ProcessorAdd(func(conf processor.Config, nm bundle.NewManagement) (processor.V1, error) {
proc, err := b.ProcessorInit(conf, nm)
if err != nil {
return nil, err
}
proc = wrapWithRetry(proc, nm, maxRetries)
return proc, err
}, spec)
}

return env
}
32 changes: 32 additions & 0 deletions internal/bundle/strict/bundle_test.go
@@ -48,6 +48,38 @@ bloblang: root = this
assert.Equal(t, `{"hello":"world"}`, string(msgs[0].Get(0).AsBytes()))
}

func TestStrictBundleProcessorWithRetry(t *testing.T) {
senv := strict.RetryBundle(context.Background(), bundle.GlobalEnvironment, 3)
tCtx := context.Background()

pConf, err := testutil.ProcessorFromYAML(`
bloblang: root = this
`)
require.NoError(t, err)

mgr, err := manager.New(
manager.ResourceConfig{},
manager.OptSetEnvironment(senv),
)
require.NoError(t, err)

proc, err := mgr.NewProcessor(pConf)
require.NoError(t, err)

msg := message.QuickBatch([][]byte{[]byte("not a structured doc")})
msgs, res := proc.ProcessBatch(tCtx, msg)
require.Empty(t, msgs)
require.Error(t, res)
assert.ErrorContains(t, res, "invalid character 'o' in literal null (expecting 'u')")

msg = message.QuickBatch([][]byte{[]byte(`{"hello":"world"}`)})
msgs, res = proc.ProcessBatch(tCtx, msg)
require.NoError(t, res)
require.Len(t, msgs, 1)
assert.Equal(t, 1, msgs[0].Len())
assert.Equal(t, `{"hello":"world"}`, string(msgs[0].Get(0).AsBytes()))
}

func TestStrictBundleProcessorMultiMessage(t *testing.T) {
senv := strict.StrictBundle(bundle.GlobalEnvironment)
tCtx := context.Background()
160 changes: 160 additions & 0 deletions internal/bundle/strict/processor_retry.go
@@ -0,0 +1,160 @@
package strict

import (
"context"
"sync"
"time"

"github.com/cenkalti/backoff/v4"

"github.com/warpstreamlabs/bento/internal/bundle"
iprocessor "github.com/warpstreamlabs/bento/internal/component/processor"
"github.com/warpstreamlabs/bento/internal/log"
"github.com/warpstreamlabs/bento/internal/message"
)

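// wrapWithRetry wraps a processor so that batches containing message-level errors
// trigger an exponential backoff before the error is propagated for another attempt,
// up to maxRetries attempts (0 means no retry limit).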
func wrapWithRetry(p iprocessor.V1, mgr bundle.NewManagement, maxRetries int) iprocessor.V1 {
backoffCtor := func() backoff.BackOff {
boff := backoff.NewExponentialBackOff()
boff.InitialInterval = time.Millisecond * 100
boff.MaxInterval = time.Second * 60
boff.MaxElapsedTime = 0
return boff
}

t := &retryProcessor{
wrapped: p,
log: mgr.Logger(),
enabled: true,
maxRetries: maxRetries,
backoffCtor: backoffCtor,

currentRetries: 0,
backoffDuration: 0,
}
return t
}
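
// Note: with the constructor above, nominal waits grow exponentially from 100ms
// towards the 60s cap, and MaxElapsedTime of 0 means the backoff never gives up on
// elapsed time alone; retries are bounded only by maxRetries (when it is > 0).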

//------------------------------------------------------------------------------

// retryProcessor retries a batch processing step if any message contains an error.
type retryProcessor struct {
enabled bool
wrapped iprocessor.V1
log log.Modular

backoffCtor func() backoff.BackOff
backoff backoff.BackOff

maxRetries int

currentRetries int
backoffDuration time.Duration

m sync.RWMutex
}

func (r *retryProcessor) ProcessBatch(ctx context.Context, b message.Batch) ([]message.Batch, error) {
if !r.enabled {
return r.wrapped.ProcessBatch(ctx, b)
}

if r.backoff == nil {
r.backoff = r.backoffCtor()
r.reset()
}

// Clear all previous errors prior to checking
_ = b.Iter(func(i int, p *message.Part) error {
p.ErrorSet(nil)
return nil
})

resBatches, err := r.wrapped.ProcessBatch(ctx, b)
if err != nil {
return nil, err
}

hasFailed := false

errorChecks:
for _, b := range resBatches {
for _, m := range b {
if err = m.ErrorGet(); err != nil {
hasFailed = true
break errorChecks
}
}
}

if !hasFailed {
r.reset()
return resBatches, nil
}

r.m.Lock()
defer r.m.Unlock()
r.currentRetries++
if r.maxRetries > 0 && r.currentRetries > r.maxRetries {
r.reset()
r.log.With("error", err).Debug("Error occurred and maximum number of retries was reached.")

// drop messages with errors
filteredBatches := make([]message.Batch, 0, len(resBatches))
for _, batch := range resBatches {
validMessages := make([]*message.Part, 0, len(batch))

for _, msg := range batch {
if msg.ErrorGet() == nil {
validMessages = append(validMessages, msg)
}
}

// only add batch if non-empty
if len(validMessages) > 0 {
filteredBatches = append(filteredBatches, validMessages)
}
}

return filteredBatches, nil
}

nextSleep := r.backoff.NextBackOff()
// Check for the Stop sentinel before accumulating, so its negative value is not
// added to the running total.
if nextSleep == backoff.Stop {
r.reset()
r.log.With("error", err).Debug("Error occurred and maximum wait period was reached.")
return resBatches, nil
}
r.backoffDuration += nextSleep

r.log.With(
"error", err,
"retry", r.currentRetries,
"backoff", nextSleep,
"total_backoff", r.backoffDuration,
).Trace("Error occurred, sleeping for next backoff period.")

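// After the sleep we return the error so that the failed batch is nacked upstream;
// when the input has been wrapped in an AsyncPreserver (see RetryBundle), the batch
// is then replayed through the entire pipeline.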
select {
case <-time.After(nextSleep):
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}

func (r *retryProcessor) Close(ctx context.Context) error {
return r.wrapped.Close(ctx)
}

func (r *retryProcessor) UnwrapProc() iprocessor.V1 {
return r.wrapped
}

func (r *retryProcessor) reset() {
r.currentRetries = 0
r.backoffDuration = 0
if r.backoff != nil {
r.backoff.Reset()
}
}