dead letter index: align error field to ECS and do not forward retryable errors #793

Merged · 27 commits · Oct 8, 2024

Commits
30512d3
Move error field to error.message and error.type
kaiyan-sheng Sep 12, 2024
f08b49a
Move error parsing into a helper
zmoog Sep 17, 2024
089b19e
merge action and error into one dict
zmoog Sep 17, 2024
1080814
Cleanup
zmoog Sep 26, 2024
2ac4cdf
Fix http.response.status_code and clean up
zmoog Sep 26, 2024
feae60c
Set error.type with the exception type name as str
zmoog Sep 26, 2024
f37a3c6
Fix error.type
zmoog Sep 26, 2024
8c937b7
Test http.response.status_code
zmoog Sep 26, 2024
b3a85b2
Do not index connection errors and filter by type
zmoog Sep 30, 2024
cf2e156
Add es_dead_letter_forward_errors to config
zmoog Sep 30, 2024
e2f1abc
Fix private var name
zmoog Sep 30, 2024
0375d2f
Add logs to DLI
zmoog Sep 30, 2024
2ff886d
Logging
zmoog Sep 30, 2024
227d8a5
More logs
zmoog Sep 30, 2024
3109e90
Fix tests
zmoog Sep 30, 2024
afd9e40
Add test_es_dead_letter_index_with_included_action_error test
zmoog Sep 30, 2024
1fd2c03
Fix linter objections
zmoog Sep 30, 2024
6fdce15
Add _parse_error() tests
zmoog Sep 30, 2024
49d7b01
Fix test_es_dead_letter_index_with_included_action_error
zmoog Sep 30, 2024
82e663f
Clean up debug loggers
zmoog Oct 1, 2024
572e764
Add http.response.status_code check on parse_error
zmoog Oct 1, 2024
0e01d22
Update docs
zmoog Oct 2, 2024
6b83e5c
Clarify es_dead_letter_forward_errors docs
zmoog Oct 4, 2024
b1d4642
Exclude retryable errors from DLI
zmoog Oct 7, 2024
e485725
Add a non-retryable error test and fix linter
zmoog Oct 7, 2024
05f3290
Update retryable status codes list
zmoog Oct 7, 2024
f41a5f9
Update DLI docs and docstring
zmoog Oct 7, 2024
21 changes: 20 additions & 1 deletion docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
@@ -229,11 +229,30 @@ For `elasticsearch` the following arguments are supported:
* `args.password`: Password of the elasticsearch instance to connect to. Mandatory when `args.api_key` is not provided. Takes precedence over `args.api_key` if both are defined.
* `args.api_key`: API key of elasticsearch endpoint in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined.
* `args.es_datastream_name`: Name of the data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster, specifically for `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If `es_datastream_name` is not specified and cannot be matched with any of the above {aws} services, the value is set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename it to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in future versions. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**, and the related backward-compatibility code was removed in version **v1.0.0**.
* `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error.
* `args.es_dead_letter_index`: Name of data stream or index where logs should be redirected to, in case indexing to `args.es_datastream_name` returned an error. The elasticsearch output will NOT forward retryable errors (connection failures, HTTP status code 429) to the dead letter index.
* `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500.
* `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB).
* `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate.

. Here is a sample error indexed in the dead letter index:
+
[source, json]
----
{
  "@timestamp": "2024-10-07T05:57:59.448925Z",
  "message": "{\"hey\":{\"message\":\"hey there\"},\"_id\":\"e6542822-4583-438d-9b4d-1a3023b5eeb9\",\"_op_type\":\"create\",\"_index\":\"logs-succeed.pr793-default\"}",
  "error": {
    "message": "[1:30] failed to parse field [hey] of type [keyword] in document with id 'e6542822-4583-438d-9b4d-1a3023b5eeb9'. Preview of field's value: '{message=hey there}'",
    "type": "document_parsing_exception"
  },
  "http": {
    "response": {
      "status_code": 400
    }
  }
}
----
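+
The retryable check can be sketched as follows (a minimal, illustrative version of the behaviour described above, not the exact implementation; the real check lives in `shippers/es.py`, and `is_retryable` is a hypothetical helper name):
+
[source, python]
----
import http
from typing import Any

# Status codes the forwarder treats as retryable (currently only HTTP 429).
_retryable_http_status_codes = [http.HTTPStatus.TOO_MANY_REQUESTS]


def is_retryable(failed_action: dict[str, Any]) -> bool:
    # A failed action without an http field means the connection itself
    # failed, which is always retryable.
    if "http" not in failed_action:
        return True
    # Otherwise, retry only on the listed status codes (e.g. 429).
    return failed_action["http"]["response"]["status_code"] in _retryable_http_status_codes
----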

For `logstash` the following arguments are supported:

* `args.logstash_url`: URL of {ls} endpoint in the format `http(s)://host:port`
114 changes: 103 additions & 11 deletions shippers/es.py
@@ -3,6 +3,7 @@
# you may not use this file except in compliance with the Elastic License 2.0.

import datetime
import http
import uuid
from typing import Any, Dict, Optional, Union

@@ -21,7 +22,10 @@

_EVENT_BUFFERED = "_EVENT_BUFFERED"
_EVENT_SENT = "_EVENT_SENT"
_VERSION_CONFLICT = 409
# List of HTTP status codes that are considered retryable
_retryable_http_status_codes = [
http.HTTPStatus.TOO_MANY_REQUESTS,
]
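
# Editor's note (illustrative, not part of this diff): http.HTTPStatus is an
# IntEnum, so its members compare equal to plain integers, which is why the
# membership check against bulk-response status codes works:
#
#   429 in _retryable_http_status_codes            # True
#   http.HTTPStatus.TOO_MANY_REQUESTS == 429       # True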


class JSONSerializer(Serializer):
@@ -172,11 +176,13 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio
"elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
)

if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
if "status" in error["create"] and error["create"]["status"] == http.HTTPStatus.CONFLICT:
# Skip duplicate events on dead letter index and replay queue
continue

failed.append({"error": error["create"]["error"], "action": action_failed[0]})
failed_error = {"action": action_failed[0]} | self._parse_error(error["create"])

failed.append(failed_error)

if len(failed) > 0:
shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
@@ -185,6 +191,52 @@ def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Unio

        return failed
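
        # Editor's note (not part of this diff): `failed_error` above is built
        # with `|`, the dict union operator (Python 3.9+), merging
        # {"action": ...} with the {"error": ..., "http": ...} mapping
        # returned by _parse_error().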

    def _parse_error(self, error: dict[str, Any]) -> dict[str, Any]:
        """
        Parses the error response from Elasticsearch and returns a
        standardised error field.

        The error field is a dictionary with the following keys:

        - `message`: The error message
        - `type`: The error type

        If the error is not recognised, `message` falls back to
        "Unknown error" and `type` to "unknown".

        It also sets `http.response.status_code` when the response
        carries a numeric status code.
        """
        field: dict[str, Any] = {"error": {"message": "Unknown error", "type": "unknown"}}

        if "status" in error and isinstance(error["status"], int):
            # Collect the HTTP response status code in the
            # `http` field, but only when it is an integer.
            #
            # Sometimes the status code is a string, for example,
            # when the connection to the server fails.
            field["http"] = {"response": {"status_code": error["status"]}}

        if "error" not in error:
            return field

        if isinstance(error["error"], str):
            # Can happen with connection errors.
            field["error"]["message"] = error["error"]
            if "exception" in error:
                # The exception field is usually an Exception object,
                # so we convert it to a string.
                field["error"]["type"] = str(type(error["exception"]))
        elif isinstance(error["error"], dict):
            # Can happen with status 5xx errors.
            # In this case, we look for the "reason" and "type" fields.
            if "reason" in error["error"]:
                field["error"]["message"] = error["error"]["reason"]
            if "type" in error["error"]:
                field["error"]["type"] = error["error"]["type"]

        return field
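
    # Editor's note (illustrative, not part of this diff): roughly what
    # _parse_error() returns for each input shape, assuming a shipper
    # instance `shipper`. The input values below are hypothetical.
    #
    #   shipper._parse_error({"status": "N/A", "error": "connection timeout", "exception": ConnectionError()})
    #   # -> {"error": {"message": "connection timeout", "type": "<class 'ConnectionError'>"}}
    #   #    (no "http" key: the status is a string, not an int)
    #
    #   shipper._parse_error({"status": 400, "error": {"reason": "failed to parse field [hey]", "type": "document_parsing_exception"}})
    #   # -> {"error": {"message": "failed to parse field [hey]", "type": "document_parsing_exception"},
    #   #     "http": {"response": {"status_code": 400}}}
    #
    #   shipper._parse_error({})
    #   # -> {"error": {"message": "Unknown error", "type": "unknown"}}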

    def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
        self._event_id_generator = event_id_generator

@@ -243,26 +295,59 @@ def flush(self) -> None:
        return

    def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]:
        """
        Index the failed actions in the dead letter index (DLI).

        This function attempts to index failed actions to the DLI, but may not do so
        for one of the following reasons:

        1. The failed action could not be encoded for indexing in the DLI.
        2. ES returned an error on the attempt to index the failed action in the DLI.
        3. The failed action error is retryable (connection error or status code 429).

        Retryable errors are not indexed in the DLI, as they are expected to be
        sent again to the data stream at `es_datastream_name` by the replay handler.

        Args:
            actions (list[Any]): A list of actions to index in the DLI.

        Returns:
            list[Any]: A list of actions that were not indexed in the DLI due to one of
            the reasons mentioned above.
        """
        non_indexed_actions: list[Any] = []
        encoded_actions = []
        dead_letter_errors: list[Any] = []

        for action in actions:
            if (
                "http" not in action  # no http status: connection error
                or action["http"]["response"]["status_code"] in _retryable_http_status_codes
            ):
                # We don't want to forward this action to
                # the dead letter index.
                #
                # Add the action to the list of non-indexed
                # actions and continue with the next one.
                non_indexed_actions.append(action)
                continue

            # Reshape event to dead letter index
            encoded = self._encode_dead_letter(action)
            if not encoded:
                shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action})
                dead_letter_errors.append(action)
                non_indexed_actions.append(action)

            encoded_actions.append(encoded)

        # If no action can be encoded, return original action list as failed
        if len(encoded_actions) == 0:
            return dead_letter_errors
            return non_indexed_actions

        errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs)
        failed = self._handle_outcome(actions=encoded_actions, errors=errors)

        if not isinstance(failed, list) or len(failed) == 0:
            return dead_letter_errors
            return non_indexed_actions

        for action in failed:
            event_payload = self._decode_dead_letter(action)

@@ -271,25 +356,32 @@ def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]:
                shared_logger.error("cannot decode dead letter index event from payload", extra={"action": action})
                continue

            dead_letter_errors.append(event_payload)
            non_indexed_actions.append(event_payload)

        return dead_letter_errors
        return non_indexed_actions
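
    # Editor's note (illustrative, not part of this diff): given one retryable
    # and one non-retryable failed action, only the non-retryable one is sent
    # to the DLI; the retryable one is handed back for the replay handler.
    # The values below are hypothetical.
    #
    #   retryable = {"action": {...}}  # no "http" key -> connection error
    #   rejected = {"action": {...}, "error": {"message": "...", "type": "..."},
    #               "http": {"response": {"status_code": 400}}}
    #
    #   shipper._send_dead_letter_index([retryable, rejected])
    #   # -> [retryable], assuming the DLI accepts `rejected`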

    def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]:
        if "action" not in outcome or "error" not in outcome:
            return {}

        # Assign a random id so that, if bulk() results in an error, the entry
        # can be matched to the original action
        return {
        encoded = {
            "@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            "_id": f"{uuid.uuid4()}",
            "_id": str(uuid.uuid4()),
            "_index": self._es_dead_letter_index,
            "_op_type": "create",
            "message": json_dumper(outcome["action"]),
            "error": outcome["error"],
        }

        if "http" in outcome:
            # the `http.response.status_code` is not
            # always present in the error field.
            encoded["http"] = outcome["http"]

        return encoded
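
    # Editor's note (illustrative, not part of this diff): sketch of the
    # reshaping performed above, with hypothetical values.
    #
    #   outcome = {
    #       "action": {"message": "hey there", "_index": "logs-generic-default"},
    #       "error": {"message": "...", "type": "document_parsing_exception"},
    #       "http": {"response": {"status_code": 400}},
    #   }
    #   shipper._encode_dead_letter(outcome)
    #   # -> {"@timestamp": "<utc now>", "_id": "<random uuid>",
    #   #     "_index": self._es_dead_letter_index, "_op_type": "create",
    #   #     "message": '<outcome["action"] serialized as JSON>',
    #   #     "error": outcome["error"], "http": outcome["http"]}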

    def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]:
        if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]:
            return {}
116 changes: 115 additions & 1 deletion tests/handlers/aws/test_integrations.py
@@ -4292,10 +4292,11 @@ def test_es_dead_letter_index(self) -> None:
assert res["hits"]["total"] == {"value": 1, "relation": "eq"}

assert (
res["hits"]["hits"][0]["_source"]["error"]["reason"]
res["hits"]["hits"][0]["_source"]["error"]["message"]
== "test_es_non_indexable_dead_letter_index fail message"
)
assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception"
assert res["hits"]["hits"][0]["_source"]["http"]["response"]["status_code"] == 500
dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"])
assert dead_letter_message["log"]["offset"] == 0
assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path
@@ -4419,3 +4420,116 @@ def test_es_non_indexable_dead_letter_index(self) -> None:
assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]

    def test_es_dead_letter_index_with_retryable_errors(self) -> None:
        """
        Test that retryable errors are not redirected to the dead letter index (DLI).
        """
        assert isinstance(self.elasticsearch, ElasticsearchContainer)
        assert isinstance(self.localstack, LocalStackContainer)

        sqs_queue_name = _time_based_id(suffix="source-sqs")
        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())

        dead_letter_index_name = "logs-generic-default-dli"

        sqs_queue_arn = sqs_queue["QueueArn"]
        sqs_queue_url = sqs_queue["QueueUrl"]
        sqs_queue_url_path = sqs_queue["QueueUrlPath"]

        config_yaml: str = f"""
            inputs:
              - type: sqs
                id: "{sqs_queue_arn}"
                tags: {self.default_tags}
                outputs:
                  - type: "elasticsearch"
                    args:
                      # This IP address is non-routable and
                      # will always result in a connection failure.
                      elasticsearch_url: "0.0.0.0:9200"
                      es_dead_letter_index: "{dead_letter_index_name}"
                      ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
                      username: "{self.secret_arn}:username"
                      password: "{self.secret_arn}:password"
        """

        config_file_path = "config.yaml"
        config_bucket_name = _time_based_id(suffix="config-bucket")
        _s3_upload_content_to_bucket(
            client=self.s3_client,
            content=config_yaml.encode("utf-8"),
            content_type="text/plain",
            bucket_name=config_bucket_name,
            key=config_file_path,
        )

        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"

        fixtures = [
            _load_file_fixture("cloudwatch-log-1.json"),
        ]

        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))

        event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
        message_id = event["Records"][0]["messageId"]

        # Create pipeline to reject documents
        processors = {
            "processors": [
                {
                    "fail": {
                        "message": "test_es_dead_letter_index_with_retryable_errors fail message",
                    }
                },
            ]
        }

        self.elasticsearch.put_pipeline(
            id="test_es_dead_letter_index_with_retryable_errors_fail_pipeline",
            body=processors,
        )

        self.elasticsearch.create_data_stream(name="logs-generic-default")
        self.elasticsearch.put_settings(
            index="logs-generic-default",
            body={"index.default_pipeline": "test_es_dead_letter_index_with_retryable_errors_fail_pipeline"},
        )

        self.elasticsearch.refresh(index="logs-generic-default")

        self.elasticsearch.create_data_stream(name=dead_letter_index_name)

        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
        first_call = handler(event, ctx)  # type:ignore

        assert first_call == "completed"

        # Test document has been rejected from target index
        self.elasticsearch.refresh(index="logs-generic-default")

        assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0

        # Test event does not go into the dead letter queue
        assert self.elasticsearch.exists(index=dead_letter_index_name) is True

        self.elasticsearch.refresh(index=dead_letter_index_name)

        assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0

        # Test event has been redirected into the replay queue
        events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
        assert len(events["Records"]) == 1

        first_body: dict[str, Any] = json_parser(events["Records"][0]["body"])

        assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n")
        assert first_body["event_payload"]["log"]["offset"] == 0
        assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path
        assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name
        assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id
        assert first_body["event_payload"]["cloud"]["provider"] == "aws"
        assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
        assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
        assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]