Add dead letter index for ES outputs (#733)
emilioalvap authored Jul 11, 2024
1 parent cce7939 commit 1962405
Showing 7 changed files with 334 additions and 20 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
### v1.17.0 - 2024/07/10
##### Features
* Add dead letter index for ES outputs [733](https://github.com/elastic/elastic-serverless-forwarder/pull/733).

### v1.16.0 - 2024/07/09
##### Features
* Prevent duplicate _id events from reaching the replay queue [729](https://github.com/elastic/elastic-serverless-forwarder/pull/729).
6 changes: 6 additions & 0 deletions docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
@@ -75,6 +75,7 @@ inputs:
username: "username"
password: "password"
es_datastream_name: "logs-generic-default"
es_dead_letter_index: "esf-dead-letter-index" # optional
batch_max_actions: 500 # optional: default value is 500
batch_max_bytes: 10485760 # optional: default value is 10485760
- type: "logstash"
@@ -98,6 +99,7 @@ inputs:
username: "username"
password: "password"
es_datastream_name: "logs-generic-default"
es_dead_letter_index: "esf-dead-letter-index" # optional
batch_max_actions: 500 # optional: default value is 500
batch_max_bytes: 10485760 # optional: default value is 10485760
- type: "logstash"
@@ -121,6 +123,7 @@ inputs:
username: "username"
password: "password"
es_datastream_name: "logs-generic-default"
es_dead_letter_index: "esf-dead-letter-index" # optional
batch_max_actions: 500 # optional: default value is 500
batch_max_bytes: 10485760 # optional: default value is 10485760
- type: "logstash"
@@ -144,6 +147,7 @@ inputs:
username: "username"
password: "password"
es_datastream_name: "logs-generic-default"
es_dead_letter_index: "esf-dead-letter-index" # optional
batch_max_actions: 500 # optional: default value is 500
batch_max_bytes: 10485760 # optional: default value is 10485760
- type: "logstash"
@@ -167,6 +171,7 @@ inputs:
username: "username"
password: "password"
es_datastream_name: "logs-generic-default"
es_dead_letter_index: "esf-dead-letter-index" # optional
batch_max_actions: 500 # optional: default value is 500
batch_max_bytes: 10485760 # optional: default value is 10485760
- type: "logstash"
@@ -224,6 +229,7 @@ For `elasticsearch` the following arguments are supported:
* `args.password`: Password of the elasticsearch instance to connect to. Mandatory when `args.api_key` is not provided. Will take precedence over `args.api_key` if both are defined.
* `args.api_key`: API key of the elasticsearch endpoint, in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined.
* `args.es_datastream_name`: Name of the data stream or index to which logs are forwarded. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if you use data streams, you can optionally set the value in the configuration file according to the naming convention for data streams and available integrations. If `es_datastream_name` is not specified and cannot be matched with any of the above {aws} services, the value is set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename it to `es_datastream_name` in your `config.yaml` file on the S3 bucket to keep using it in future versions. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**, and the related backward compatibility code was removed in version **v1.0.0**.
* `args.es_dead_letter_index`: (Optional) Name of the data stream or index where logs are redirected when indexing into `args.es_datastream_name` returns an error, as illustrated in the example document below.
* `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500.
* `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB).
* `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate.
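
Based on `_encode_dead_letter` in `shippers/es.py` (part of this commit), a document that ends up in the dead letter index wraps the failed event rather than storing it as-is. A minimal sketch of that shape, with illustrative values:

```python
# Hypothetical example of a document stored in the dead letter index.
# Field names follow _encode_dead_letter in shippers/es.py; all values are illustrative.
dead_letter_doc = {
    # Time at which the event was redirected to the dead letter index.
    "@timestamp": "2024-07-11T09:30:00.123456Z",
    # The original bulk action, JSON-encoded, so it can be decoded and replayed later.
    "message": '{"@timestamp": "...", "_index": "logs-generic-default", "message": "raw log line"}',
    # The bulk error returned by Elasticsearch for the original indexing attempt.
    "error": {"type": "mapper_parsing_exception", "reason": "failed to parse field [message]"},
}
```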
13 changes: 13 additions & 0 deletions share/config.py
@@ -50,6 +50,7 @@ def __init__(
batch_max_actions: int = 500,
batch_max_bytes: int = 10 * 1024 * 1024,
ssl_assert_fingerprint: str = "",
es_dead_letter_index: str = "",
):
super().__init__(output_type="elasticsearch")
self.elasticsearch_url = elasticsearch_url
Expand All @@ -62,6 +63,7 @@ def __init__(
self.batch_max_actions = batch_max_actions
self.batch_max_bytes = batch_max_bytes
self.ssl_assert_fingerprint = ssl_assert_fingerprint
self.es_dead_letter_index = es_dead_letter_index

if self.cloud_id and self.elasticsearch_url:
shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`")
@@ -182,6 +184,17 @@ def ssl_assert_fingerprint(self, value: str) -> None:

self._ssl_assert_fingerprint = value

@property
def es_dead_letter_index(self) -> str:
return self._es_dead_letter_index

@es_dead_letter_index.setter
def es_dead_letter_index(self, value: str) -> None:
if not isinstance(value, str):
raise ValueError("`es_dead_letter_index` must be provided as string")

self._es_dead_letter_index = value


class LogstashOutput(Output):
def __init__(
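For illustration, a minimal sketch of how the new option could be set and validated, assuming `ElasticsearchOutput` is importable from `share.config` and that the constructor accepts the connection arguments shown in the YAML examples above (a hypothetical call, not taken from this commit):

```python
from share.config import ElasticsearchOutput

# Hypothetical construction; the real constructor may require additional arguments.
output = ElasticsearchOutput(
    elasticsearch_url="https://elasticsearch:9200",
    username="username",
    password="password",
    es_dead_letter_index="esf-dead-letter-index",
)
print(output.es_dead_letter_index)  # esf-dead-letter-index

# The setter only accepts strings; anything else raises ValueError.
try:
    output.es_dead_letter_index = 123  # type: ignore[assignment]
except ValueError as error:
    print(error)  # `es_dead_letter_index` must be provided as string
```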
2 changes: 1 addition & 1 deletion share/version.py
@@ -2,4 +2,4 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

version = "1.16.0"
version = "1.17.0"
106 changes: 87 additions & 19 deletions shippers/es.py
@@ -2,6 +2,8 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

import datetime
import uuid
from typing import Any, Dict, Optional, Union

import elasticapm # noqa: F401
@@ -58,6 +60,7 @@ def __init__(
cloud_id: str = "",
api_key: str = "",
es_datastream_name: str = "",
es_dead_letter_index: str = "",
tags: list[str] = [],
batch_max_actions: int = 500,
batch_max_bytes: int = 10 * 1024 * 1024,
@@ -108,6 +111,7 @@ def __init__(
self._event_id_generator: Optional[EventIdGeneratorCallable] = None

self._es_datastream_name = es_datastream_name
self._es_dead_letter_index = es_dead_letter_index
self._tags = tags

self._es_index = ""
@@ -153,13 +157,13 @@ def _enrich_event(self, event_payload: dict[str, Any]) -> None:

event_payload["tags"] += self._tags

-    def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
+    def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Union[int, list[Any]]]) -> list[Any]:
assert isinstance(errors[1], list)

success = errors[0]
-        failed = len(errors[1])
+        failed: list[Any] = []
for error in errors[1]:
-            action_failed = [action for action in self._bulk_actions if action["_id"] == error["create"]["_id"]]
+            action_failed = [action for action in actions if action["_id"] == error["create"]["_id"]]
# an ingestion pipeline might override the _id, we can only skip in this case
if len(action_failed) != 1:
continue
@@ -169,20 +173,17 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
)

if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
-                # Skip duplicate events on replay queue
+                # Skip duplicate events on dead letter index and replay queue
continue

shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
if self._replay_handler is not None:
self._replay_handler(self._output_destination, self._replay_args, action_failed[0])
failed.append({"error": error["create"]["error"], "action": action_failed[0]})

-        if failed > 0:
-            shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": failed})
-            return
-
-        shared_logger.info("elasticsearch shipper", extra={"success": success, "failed": failed})
+        if len(failed) > 0:
+            shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
+        else:
+            shared_logger.info("elasticsearch shipper", extra={"success": success, "failed": len(failed)})

-        return
+        return failed

def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
self._event_id_generator = event_id_generator
@@ -213,21 +214,88 @@ def send(self, event: dict[str, Any]) -> str:
if len(self._bulk_actions) < self._bulk_batch_size:
return _EVENT_BUFFERED

-        errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
-        self._handle_outcome(errors=errors)
-        self._bulk_actions = []
+        self.flush()

return _EVENT_SENT

def flush(self) -> None:
-        if len(self._bulk_actions) > 0:
-            errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
-            self._handle_outcome(errors=errors)
+        if len(self._bulk_actions) == 0:
+            return

errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
failed = self._handle_outcome(actions=self._bulk_actions, errors=errors)

# Send failed requests to dead letter index, if enabled
if len(failed) > 0 and self._es_dead_letter_index:
failed = self._send_dead_letter_index(failed)

# Send remaining failed requests to replay queue, if enabled
if isinstance(failed, list) and len(failed) > 0 and self._replay_handler is not None:
for outcome in failed:
if "action" not in outcome:
shared_logger.error("action could not be extracted to be replayed", extra={"outcome": outcome})
continue

self._replay_handler(self._output_destination, self._replay_args, outcome["action"])

self._bulk_actions = []

return

def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]:
encoded_actions = []
dead_letter_errors: list[Any] = []
for action in actions:
# Reshape event to dead letter index
encoded = self._encode_dead_letter(action)
if not encoded:
shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action})
dead_letter_errors.append(action)

encoded_actions.append(encoded)

# If no action can be encoded, return original action list as failed
if len(encoded_actions) == 0:
return dead_letter_errors

errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs)
failed = self._handle_outcome(actions=encoded_actions, errors=errors)

if not isinstance(failed, list) or len(failed) == 0:
return dead_letter_errors

for action in failed:
event_payload = self._decode_dead_letter(action)

if not event_payload:
shared_logger.error("cannot decode dead letter index event from payload", extra={"action": action})
continue

dead_letter_errors.append(event_payload)

return dead_letter_errors

def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]:
if "action" not in outcome or "error" not in outcome:
return {}

# Assign random id in case bulk() results in error, it can be matched to the original
# action
return {
"@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
"_id": f"{uuid.uuid4()}",
"_index": self._es_dead_letter_index,
"_op_type": "create",
"message": json_dumper(outcome["action"]),
"error": outcome["error"],
}

def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]:
if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]:
return {}

return {"action": json_parser(dead_letter_outcome["action"]["message"])}

def _discover_dataset(self, event_payload: Dict[str, Any]) -> None:
if self._es_datastream_name != "":
if self._es_datastream_name.startswith("logs-"):
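To make the new flow concrete, here is a standalone sketch of the encode/decode round trip performed by `_encode_dead_letter` and `_decode_dead_letter`, using the standard library `json` module as a stand-in for the project's `json_dumper`/`json_parser` helpers (an assumption made to keep the example self-contained):

```python
import datetime
import json
import uuid

# A failed outcome as collected by _handle_outcome: the original bulk action plus the ES error.
outcome = {
    "action": {"@timestamp": "2024-07-11T09:30:00Z", "_index": "logs-generic-default", "message": "raw log line"},
    "error": {"type": "mapper_parsing_exception", "reason": "failed to parse field [message]"},
}

# Encode: wrap the failed action into a dead letter index document (mirrors _encode_dead_letter).
encoded = {
    "@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
    "_id": f"{uuid.uuid4()}",
    "_index": "esf-dead-letter-index",
    "_op_type": "create",
    "message": json.dumps(outcome["action"]),
    "error": outcome["error"],
}

# If indexing into the dead letter index fails as well, the encoded document comes back as a
# failed outcome itself; decoding recovers the original action so it can still be sent to the
# replay queue (mirrors _decode_dead_letter).
dead_letter_outcome = {"action": encoded, "error": {"type": "es_rejected_execution_exception"}}
recovered = {"action": json.loads(dead_letter_outcome["action"]["message"])}
assert recovered["action"] == outcome["action"]
```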
1 change: 1 addition & 0 deletions shippers/factory.py
@@ -48,6 +48,7 @@ def create_from_output(output_type: str, output: Output) -> ProtocolShipper:
batch_max_actions=output.batch_max_actions,
batch_max_bytes=output.batch_max_bytes,
ssl_assert_fingerprint=output.ssl_assert_fingerprint,
es_dead_letter_index=output.es_dead_letter_index,
)

if output_type == "logstash":