Add dead letter index for ES outputs #733

Merged · 18 commits · Jul 11, 2024
Changes from 9 commits
13 changes: 13 additions & 0 deletions share/config.py
@@ -50,6 +50,7 @@ def __init__(
batch_max_actions: int = 500,
batch_max_bytes: int = 10 * 1024 * 1024,
ssl_assert_fingerprint: str = "",
es_dead_letter_index: str = "",
):
super().__init__(output_type="elasticsearch")
self.elasticsearch_url = elasticsearch_url
@@ -62,6 +63,7 @@ def __init__(
self.batch_max_actions = batch_max_actions
self.batch_max_bytes = batch_max_bytes
self.ssl_assert_fingerprint = ssl_assert_fingerprint
self.es_dead_letter_index = es_dead_letter_index

if self.cloud_id and self.elasticsearch_url:
shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`")
@@ -182,6 +184,17 @@ def ssl_assert_fingerprint(self, value: str) -> None:

self._ssl_assert_fingerprint = value

@property
def es_dead_letter_index(self) -> str:
return self._es_dead_letter_index

@es_dead_letter_index.setter
def es_dead_letter_index(self, value: str) -> None:
if not isinstance(value, str):
raise ValueError("`es_dead_letter_index` must be provided as string")

self._es_dead_letter_index = value


class LogstashOutput(Output):
def __init__(
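As an illustration of the option added above: a minimal usage sketch of the new setter and its validation. The class name `ElasticsearchOutput`, its import path, and the constructor arguments shown here are assumptions rather than part of this diff; the option defaults to an empty string, which leaves the dead letter path disabled.

from share import ElasticsearchOutput  # assumed import path for the class shown above

# Hypothetical usage; constructor arguments are assumed.
output = ElasticsearchOutput(elasticsearch_url="https://localhost:9200")

output.es_dead_letter_index = "esf-dead-letter"  # plain strings are accepted
assert output.es_dead_letter_index == "esf-dead-letter"

try:
    output.es_dead_letter_index = 123  # type: ignore[assignment]
except ValueError as exc:
    print(exc)  # `es_dead_letter_index` must be provided as string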
107 changes: 90 additions & 17 deletions shippers/es.py
@@ -2,7 +2,9 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

import datetime
from typing import Any, Dict, Optional, Union
import uuid

import elasticapm # noqa: F401
from elasticsearch import Elasticsearch
@@ -19,6 +21,7 @@

_EVENT_BUFFERED = "_EVENT_BUFFERED"
_EVENT_SENT = "_EVENT_SENT"
_VERSION_CONFLICT = 409


class JSONSerializer(Serializer):
@@ -57,6 +60,7 @@ def __init__(
cloud_id: str = "",
api_key: str = "",
es_datastream_name: str = "",
es_dead_letter_index: str = "",
tags: list[str] = [],
batch_max_actions: int = 500,
batch_max_bytes: int = 10 * 1024 * 1024,
@@ -107,6 +111,7 @@ def __init__(
self._event_id_generator: Optional[EventIdGeneratorCallable] = None

self._es_datastream_name = es_datastream_name
self._es_dead_letter_index = es_dead_letter_index
self._tags = tags

self._es_index = ""
@@ -152,31 +157,33 @@ def _enrich_event(self, event_payload: dict[str, Any]) -> None:
 
         event_payload["tags"] += self._tags
 
-    def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
+    def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Union[int, list[Any]]]) -> list[Any]:
         assert isinstance(errors[1], list)
 
         success = errors[0]
-        failed = len(errors[1])
+        failed: list[Any] = []
         for error in errors[1]:
-            action_failed = [action for action in self._bulk_actions if action["_id"] == error["create"]["_id"]]
+            action_failed = [action for action in actions if action["_id"] == error["create"]["_id"]]
             # an ingestion pipeline might override the _id, we can only skip in this case
             if len(action_failed) != 1:
                 continue
 
             shared_logger.warning(
                 "elasticsearch shipper", extra={"error": error["create"]["error"], "_id": error["create"]["_id"]}
             )
             shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
-            if self._replay_handler is not None:
-                self._replay_handler(self._output_destination, self._replay_args, action_failed[0])
 
-        if failed > 0:
-            shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": failed})
-            return
+            if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
+                # Skip duplicate events on dead letter index and replay queue
+                continue
 
-        shared_logger.info("elasticsearch shipper", extra={"success": success, "failed": failed})
+            failed.append({"error": error["create"]["error"], "action": action_failed[0]})
 
-        return
+        if len(failed) > 0:
+            shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
+        else:
+            shared_logger.info("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
+
+        return failed
 
     def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
         self._event_id_generator = event_id_generator
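For reference, a sketch of the kind of `errors` tuple the bulk helper hands to `_handle_outcome` when item errors are collected rather than raised; every field value below is invented for illustration.

# Invented example of the (success_count, error_items) tuple from the bulk helper.
errors = (
    2,  # two actions were indexed successfully
    [
        {
            "create": {
                "_id": "a1b2c3",
                "status": 409,  # _VERSION_CONFLICT: treated as a duplicate and skipped
                "error": {"type": "version_conflict_engine_exception"},
            }
        },
        {
            "create": {
                "_id": "d4e5f6",
                "status": 400,
                "error": {"type": "mapper_parsing_exception"},
            }
        },
    ],
)

# With the change above, _handle_outcome(actions, errors) would return a single-element
# list, [{"error": {...}, "action": <the action whose _id is "d4e5f6">}], because the
# 409 entry is filtered out instead of being retried.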
@@ -207,21 +214,87 @@ def send(self, event: dict[str, Any]) -> str:
         if len(self._bulk_actions) < self._bulk_batch_size:
             return _EVENT_BUFFERED
 
-        errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
-        self._handle_outcome(errors=errors)
-        self._bulk_actions = []
+        self.flush()
 
         return _EVENT_SENT
 
     def flush(self) -> None:
-        if len(self._bulk_actions) > 0:
-            errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
-            self._handle_outcome(errors=errors)
+        if len(self._bulk_actions) == 0:
+            return
+
+        errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
+        failed = self._handle_outcome(actions=self._bulk_actions, errors=errors)
+
+        # Send failed requests to dead letter index, if enabled
+        if len(failed) > 0 and self._es_dead_letter_index:
+            failed = self._send_dead_letter_index(failed)
+
+        # Send remaining failed requests to replay queue, if enabled
+        if isinstance(failed, list) and len(failed) > 0 and self._replay_handler is not None:
+            for outcome in failed:
+                if "action" not in outcome:
+                    shared_logger.error("action could not be extracted to be replayed", extra={"outcome": outcome})
+                    continue
+
+                self._replay_handler(self._output_destination, self._replay_args, outcome["action"])
 
         self._bulk_actions = []
 
         return
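Put differently, `flush` now degrades through two fallbacks: failed actions are first written to the dead letter index when one is configured, and anything that still fails (or cannot be encoded) goes to the replay queue. A condensed, hypothetical restatement of that flow, not the actual implementation:

# Simplified sketch of the failure cascade in flush().
def cascade(failed, dead_letter_index, send_to_dead_letter, replay):
    if failed and dead_letter_index:
        # Stage 1: park failures in the dead letter index; keep only what still failed.
        failed = send_to_dead_letter(failed)

    for outcome in failed:
        # Stage 2: hand the remaining failures to the replay queue.
        if "action" in outcome:
            replay(outcome["action"])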

+    def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]:
+        encoded_actions = []
+        dead_letter_errors = []
+        for action in actions:
+            # Reshape event to dead letter index
+            encoded = self._encode_dead_letter(action)
+            if encoded is None:
+                shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action})
+                dead_letter_errors.append(action)
+                continue
+
+            encoded_actions.append(encoded)
+
+        # If no action can be encoded, return original action list as failed
+        if len(encoded_actions) == 0:
+            return dead_letter_errors
+
+        errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs)
+        failed = self._handle_outcome(actions=encoded_actions, errors=errors)
+
+        if not isinstance(failed, list) or len(failed) == 0:
+            return []

+        for action in failed:
+            event_payload = self._decode_dead_letter(action)
+
+            if event_payload is None:
+                continue
+
+            dead_letter_errors.append(event_payload)
+
+        return dead_letter_errors

+    def _encode_dead_letter(self, outcome: dict[str, Any]) -> Optional[dict[str, Any]]:
+        if "action" not in outcome or "error" not in outcome:
+            return None
+
+        # Assign a random _id so that, if bulk() reports an error for this document,
+        # it can be matched back to the original action
+        return {
+            "@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+            "_id": f"{uuid.uuid4()}",
+            "_index": self._es_dead_letter_index,
+            "_op_type": "create",
+            "message": json_dumper(outcome["action"]),
+            "error": outcome["error"],
+        }
+
+    def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> Optional[dict[str, Any]]:
+        if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]:
+            return None
+
+        return {"action": json_parser(dead_letter_outcome["action"]["message"])}
 
     def _discover_dataset(self, event_payload: Dict[str, Any]) -> None:
         if self._es_datastream_name != "":
             if self._es_datastream_name.startswith("logs-"):
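To make the reshaping concrete, a hypothetical example of the document `_encode_dead_letter` builds for one failed action, and of the round trip back through `_decode_dead_letter`; every field value here is invented.

# Invented failed outcome, in the shape produced by _handle_outcome.
outcome = {
    "action": {"_index": "logs-generic-default", "_op_type": "create", "message": "hello"},
    "error": {"type": "mapper_parsing_exception", "reason": "failed to parse"},
}

# _encode_dead_letter would turn it into roughly this dead letter document:
encoded = {
    "@timestamp": "2024-07-11T10:00:00.000000Z",    # utcnow at encoding time
    "_id": "0b5a8c3e-51ce-4a11-9c2a-111111111111",  # fresh uuid4
    "_index": "esf-dead-letter",                    # the configured es_dead_letter_index
    "_op_type": "create",
    "message": '{"_index": "logs-generic-default", "_op_type": "create", "message": "hello"}',
    "error": {"type": "mapper_parsing_exception", "reason": "failed to parse"},
}

# If writing to the dead letter index also fails, _decode_dead_letter parses the JSON
# stored in "message" back into the original action so the replay queue can retry it.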
1 change: 1 addition & 0 deletions shippers/factory.py
@@ -48,6 +48,7 @@ def create_from_output(output_type: str, output: Output) -> ProtocolShipper:
batch_max_actions=output.batch_max_actions,
batch_max_bytes=output.batch_max_bytes,
ssl_assert_fingerprint=output.ssl_assert_fingerprint,
es_dead_letter_index=output.es_dead_letter_index,
)

if output_type == "logstash":
Expand Down