
Add backoff and error payload sampling to error handling bundle #189

Open · wants to merge 9 commits into main
Conversation

@gregfurman (Collaborator) commented Dec 16, 2024

Changes

  • Adds an ErrorSampleBundle that randomly selects a proportion of failed events and logs them, allowing us to see the payloads of the events that failed processing.
  • Adds a RetryBundle that retries failed processing steps with exponential backoff, closing the input layer while retrying.

Examples

Errored Payload Sampling

The config below will log a proportion of all failed payloads:

error_handling:
  error_sample_rate: 0.5
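
As a rough sketch of the sampling idea (illustrative only, not the PR's actual implementation; shouldSample and the example payloads are made up):

package main

import (
	"fmt"
	"math/rand"
)

// shouldSample returns true for roughly `rate` of all calls, so a rate of
// 0.5 samples about half of the failed payloads.
func shouldSample(rate float64) bool {
	return rand.Float64() < rate
}

func main() {
	failedPayloads := []string{"payload-a", "payload-b", "payload-c", "payload-d"}
	for _, p := range failedPayloads {
		if shouldSample(0.5) {
			// In the bundle this would go to the logger along with error fields.
			fmt.Printf("sampled failed payload: %s\n", p)
		}
	}
}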

Retry with backoff

error_handling:
  strategy: backoff
  max_retries: 0 # default is 0 which means we will retry forever.
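
To make the max_retries semantics concrete, here is a minimal hand-rolled sketch (illustrative only; the real bundle uses an exponential backoff library, and retryWithBackoff is made up for the example):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff retries process until it succeeds. A maxRetries of 0
// means retry forever; otherwise it gives up after maxRetries failed attempts.
func retryWithBackoff(process func() error, maxRetries int) error {
	delay := 100 * time.Millisecond // documented starting delay
	for attempt := 1; ; attempt++ {
		err := process()
		if err == nil {
			return nil
		}
		if maxRetries > 0 && attempt >= maxRetries {
			return fmt.Errorf("giving up after %d attempts: %w", attempt, err)
		}
		time.Sleep(delay)
		// Double the delay, capped at the documented 60s maximum.
		delay *= 2
		if delay > time.Minute {
			delay = time.Minute
		}
	}
}

func main() {
	calls := 0
	err := retryWithBackoff(func() error {
		calls++
		if calls < 3 {
			return errors.New("transient processing failure")
		}
		return nil
	}, 5)
	fmt.Println("attempts:", calls, "err:", err)
}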

TODO

  • Add configuration tests for the new error_handling fields.
  • Add mocked tests for error_sample_rate.
  • Add validation for the max_retries field.

@gregfurman self-assigned this Dec 16, 2024
@jem-davies (Collaborator) left a comment

Will take another look at the processors, but seems OK to me.

(Resolved review threads on internal/bundle/strict/bundle.go and internal/cli/common/manager.go.)
@gregfurman marked this pull request as ready for review December 24, 2024 16:53
@gregfurman requested a review from ryanworl December 24, 2024 16:53
@jem-davies (Collaborator) left a comment

I think there is an issue with the description of backoff / max_retries regarding the dropping of messages: it doesn't actually drop messages the way the reject strategy does, right? It will continue along the pipeline after exhausting max_retries?

I also think it would be better if the content on /components/processors/about#error-handling were moved to /docs/configuration/about, because that is where the other top-level config options such as shutdown_delay are described.

I noticed that there is a relative link to error-handling here when it should probably be error-handling-1; it's a bit confusing because there are two headings that are both 'error-handling'.

I also think there isn't currently enough explanation in the docs about the backoff strategy: how it will block the input, and what the default options are for the constructor:

backoffCtor := func() backoff.BackOff {
	boff := backoff.NewExponentialBackOff()
	boff.InitialInterval = time.Millisecond * 100
	boff.MaxInterval = time.Second * 60
	boff.MaxElapsedTime = 0
	return boff
}
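
(For reference, a self-contained sketch of how these defaults behave when driven through backoff.Retry from github.com/cenkalti/backoff/v4; the failing operation is made up for illustration.)

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// Same defaults as the constructor above: 100ms initial delay, a 60s cap
	// between attempts, and MaxElapsedTime == 0 so it never gives up on its own.
	boff := backoff.NewExponentialBackOff()
	boff.InitialInterval = time.Millisecond * 100
	boff.MaxInterval = time.Second * 60
	boff.MaxElapsedTime = 0

	attempts := 0
	err := backoff.Retry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("processing failed") // retried after an exponential delay
		}
		return nil
	}, boff)
	fmt.Println("attempts:", attempts, "err:", err)
}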

EDIT:

(This is really something that came in on the other PR and I didn't raise it then.) Under the error-handling section it reads: "The handling of rejected messages then depends on the input component's nack behavior - by default, triggering the reprocessing failed messages from scratch."

But I think that the reject_error output explains this better with "For inputs that support propagating nacks upstream such as AMQP or NATS the message will be nacked. However, for inputs that are sequential such as files or Kafka the messages will simply be reprocessed from scratch."

EDIT 2: update the links to point at the actual website and not localhost 😆

Comment on lines +67 to +77
sobj, parseErr := p.AsStructured()
if parseErr != nil {
	fields["parse_payload_error"] = parseErr.Error()
} else {
	// Convert to string representation
	if jsonBytes, err := json.Marshal(sobj); err == nil {
		fields["structured_payload"] = string(jsonBytes)
	} else {
		fields["structured_payload_error"] = "failed to marshal structured payload"
	}
}
Collaborator comment:

I think we could just return the raw_payload field and not worry about marshalling to JSON and returning either a structured_payload or structured_payload_error field. It's an assumption that the message will be JSON; it might not be (it could be a CSV row), and marshalling every message part in a loop could be resource intensive.

It probably also makes sense to add a field to enable or disable these structured_payload / structured_payload_error fields, because whether they are useful sounds contextual to the specific application of Bento to me.
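
For illustration, the raw-payload-only variant might look something like this (a sketch only; the AsBytes accessor and the includeStructuredPayload flag are assumptions, not part of this PR):

// Sketch: always log the raw bytes, and only attempt JSON marshalling when a
// hypothetical opt-in flag is set. API names are assumed from the diff above.
fields["raw_payload"] = string(p.AsBytes())
if includeStructuredPayload { // hypothetical config flag
	if sobj, err := p.AsStructured(); err == nil {
		if jsonBytes, err := json.Marshal(sobj); err == nil {
			fields["structured_payload"] = string(jsonBytes)
		}
	}
}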

@gregfurman (Collaborator, author) replied:

I don't think we should worry about overhead tbh. If resource intensity is an issue, a user can just sample a smaller proportion of failed payloads.

If I read in CSV data, don't we convert it to a JSON-friendly format anyway when constructing messages? So the structured payload will still correctly show the CSV data, just in JSON/structured form.

(Resolved review thread on internal/errorhandling/docs.go.)
Comment on lines +322 to +325
| Strategy | Description |
|----------|-------------|
| `reject` | Immediately fails entire batch when any message has an error. Propagates negative acknowledgment (nack) to the input. |
| `backoff` | Retries batches containing failed messages using exponential backoff. Starting at `100ms`, the delay doubles after each retry up to a maximum of `60s` between attempts. When `max_retries` is `0` (default), retries continue indefinitely. Otherwise, messages are dropped from the batch after the maximum retries are exhausted. |
Collaborator comment:

Suggested change
| Strategy | Description |
|----------|-------------|
| `reject` | Immediately fails entire batch when any message has an error. Propagates negative acknowledgment (nack) to the input. |
| `backoff` | Retries batches containing failed messages using exponential backoff. Starting at `100ms`, the delay doubles after each retry up to a maximum of `60s` between attempts. When `max_retries` is `0` (default), retries continue indefinitely. Otherwise, messages are dropped from the batch after the maximum retries are exhausted. |
| Strategy | Version | Description |
|----------|---------|-------------|
| `reject` | v1.4.0 | Immediately fails entire batch when any message has an error. Propagates negative acknowledgment (nack) to the input. |
| `backoff` | v1.5.0 | Retries batches containing failed messages using exponential backoff. Starting at `100ms`, the delay doubles after each retry up to a maximum of `60s` between attempts. When `max_retries` is `0` (default), retries continue indefinitely. Otherwise, messages are dropped from the batch after the maximum retries are exhausted. |

I think it would be good to add a version column for the strategies.
