diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ab40f15..d2a73c1a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+### v1.17.0 - 2024/07/10
+##### Features
+* Add dead letter index for ES outputs [733](https://github.com/elastic/elastic-serverless-forwarder/pull/733).
+
 ### v1.16.0 - 2024/07/09
 ##### Features
 * Prevent duplicate _id events from reaching the replay queue [729](https://github.com/elastic/elastic-serverless-forwarder/pull/729).
diff --git a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
index f34e2364..f16e75b0 100644
--- a/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
+++ b/docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
@@ -75,6 +75,7 @@ inputs:
           username: "username"
           password: "password"
           es_datastream_name: "logs-generic-default"
+          es_dead_letter_index: "esf-dead-letter-index" # optional
           batch_max_actions: 500 # optional: default value is 500
           batch_max_bytes: 10485760 # optional: default value is 10485760
       - type: "logstash"
@@ -98,6 +99,7 @@ inputs:
           username: "username"
           password: "password"
           es_datastream_name: "logs-generic-default"
+          es_dead_letter_index: "esf-dead-letter-index" # optional
           batch_max_actions: 500 # optional: default value is 500
           batch_max_bytes: 10485760 # optional: default value is 10485760
       - type: "logstash"
@@ -121,6 +123,7 @@ inputs:
           username: "username"
           password: "password"
           es_datastream_name: "logs-generic-default"
+          es_dead_letter_index: "esf-dead-letter-index" # optional
           batch_max_actions: 500 # optional: default value is 500
           batch_max_bytes: 10485760 # optional: default value is 10485760
       - type: "logstash"
@@ -144,6 +147,7 @@ inputs:
           username: "username"
           password: "password"
           es_datastream_name: "logs-generic-default"
+          es_dead_letter_index: "esf-dead-letter-index" # optional
           batch_max_actions: 500 # optional: default value is 500
           batch_max_bytes: 10485760 # optional: default value is 10485760
       - type: "logstash"
@@ -167,6 +171,7 @@ inputs:
           username: "username"
           password: "password"
           es_datastream_name: "logs-generic-default"
+          es_dead_letter_index: "esf-dead-letter-index" # optional
           batch_max_actions: 500 # optional: default value is 500
           batch_max_bytes: 10485760 # optional: default value is 10485760
       - type: "logstash"
@@ -224,6 +229,7 @@ For `elasticsearch` the following arguments are supported:
 * `args.password`: Password of the elasticsearch instance to connect to. Mandatory when `args.api_key` is not provided. Will take precedence over `args.api_key` if both are defined.
 * `args.api_key`: API key of elasticsearch endpoint in the format `base64encode(api_key_id:api_key_secret)`. Mandatory when `args.username` and `args.password` are not provided. Will be ignored if `args.username`/`args.password` are defined.
 * `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If `es_datastream_name` is not specified and cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in future versions. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**, and the related backward compatibility code was removed in version **v1.0.0**.
+* `args.es_dead_letter_index`: (Optional) Name of the data stream or index to which logs are redirected when indexing to `args.es_datastream_name` fails.
 * `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500.
 * `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB).
 * `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate.
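For context on the new `args.es_dead_letter_index` option documented above, here is a rough sketch of the `_source` of a document that lands in the dead letter index. The field names follow `_encode_dead_letter` in `shippers/es.py` further down in this diff; the timestamp, paths, and error values are made-up placeholders, not output captured from the forwarder.

```python
# Illustration only: approximate _source of a dead letter document. The bulk
# metadata fields (_id, _index, _op_type) set by the shipper are consumed by
# the bulk helper, so only the fields below end up stored in the index.
example_dead_letter_source = {
    "@timestamp": "2024-07-10T09:00:00.000000Z",
    # The original, enriched event is preserved as a JSON string:
    "message": '{"log": {"offset": 0, "file": {"path": "..."}}, "tags": ["forwarded"]}',
    # The error returned by the bulk API for the failed action:
    "error": {"type": "fail_processor_exception", "reason": "example failure"},
}
```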
diff --git a/share/config.py b/share/config.py
index f9ce3fe2..dc38a398 100644
--- a/share/config.py
+++ b/share/config.py
@@ -50,6 +50,7 @@ def __init__(
         batch_max_actions: int = 500,
         batch_max_bytes: int = 10 * 1024 * 1024,
         ssl_assert_fingerprint: str = "",
+        es_dead_letter_index: str = "",
     ):
         super().__init__(output_type="elasticsearch")
         self.elasticsearch_url = elasticsearch_url
@@ -62,6 +63,7 @@ def __init__(
         self.batch_max_actions = batch_max_actions
         self.batch_max_bytes = batch_max_bytes
         self.ssl_assert_fingerprint = ssl_assert_fingerprint
+        self.es_dead_letter_index = es_dead_letter_index
 
         if self.cloud_id and self.elasticsearch_url:
             shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`")
@@ -182,6 +184,17 @@ def ssl_assert_fingerprint(self, value: str) -> None:
 
         self._ssl_assert_fingerprint = value
 
+    @property
+    def es_dead_letter_index(self) -> str:
+        return self._es_dead_letter_index
+
+    @es_dead_letter_index.setter
+    def es_dead_letter_index(self, value: str) -> None:
+        if not isinstance(value, str):
+            raise ValueError("`es_dead_letter_index` must be provided as string")
+
+        self._es_dead_letter_index = value
+
 
 class LogstashOutput(Output):
     def __init__(
diff --git a/share/version.py b/share/version.py
index aec73094..95f01911 100644
--- a/share/version.py
+++ b/share/version.py
@@ -2,4 +2,4 @@
 # or more contributor license agreements. Licensed under the Elastic License 2.0;
 # you may not use this file except in compliance with the Elastic License 2.0.
 
-version = "1.16.0"
+version = "1.17.0"
diff --git a/shippers/es.py b/shippers/es.py
index 71b07782..43d86e1b 100644
--- a/shippers/es.py
+++ b/shippers/es.py
@@ -2,6 +2,8 @@
 # or more contributor license agreements. Licensed under the Elastic License 2.0;
 # you may not use this file except in compliance with the Elastic License 2.0.
 
+import datetime
+import uuid
 from typing import Any, Dict, Optional, Union
 
 import elasticapm  # noqa: F401
@@ -58,6 +60,7 @@ def __init__(
         cloud_id: str = "",
         api_key: str = "",
         es_datastream_name: str = "",
+        es_dead_letter_index: str = "",
         tags: list[str] = [],
         batch_max_actions: int = 500,
         batch_max_bytes: int = 10 * 1024 * 1024,
@@ -108,6 +111,7 @@ def __init__(
         self._event_id_generator: Optional[EventIdGeneratorCallable] = None
 
         self._es_datastream_name = es_datastream_name
+        self._es_dead_letter_index = es_dead_letter_index
         self._tags = tags
 
         self._es_index = ""
@@ -153,13 +157,13 @@ def _enrich_event(self, event_payload: dict[str, Any]) -> None:
 
         event_payload["tags"] += self._tags
 
-    def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
+    def _handle_outcome(self, actions: list[dict[str, Any]], errors: tuple[int, Union[int, list[Any]]]) -> list[Any]:
         assert isinstance(errors[1], list)
 
         success = errors[0]
-        failed = len(errors[1])
+        failed: list[Any] = []
         for error in errors[1]:
-            action_failed = [action for action in self._bulk_actions if action["_id"] == error["create"]["_id"]]
+            action_failed = [action for action in actions if action["_id"] == error["create"]["_id"]]
             # an ingestion pipeline might override the _id, we can only skip in this case
             if len(action_failed) != 1:
                 continue
@@ -169,20 +173,17 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
             )
 
             if "status" in error["create"] and error["create"]["status"] == _VERSION_CONFLICT:
-                # Skip duplicate events on replay queue
+                # Skip duplicate events on dead letter index and replay queue
                 continue
 
-            shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
-            if self._replay_handler is not None:
-                self._replay_handler(self._output_destination, self._replay_args, action_failed[0])
+            failed.append({"error": error["create"]["error"], "action": action_failed[0]})
 
-        if failed > 0:
-            shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": failed})
-            return
-
-        shared_logger.info("elasticsearch shipper", extra={"success": success, "failed": failed})
+        if len(failed) > 0:
+            shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
+        else:
+            shared_logger.info("elasticsearch shipper", extra={"success": success, "failed": len(failed)})
 
-        return
+        return failed
 
     def set_event_id_generator(self, event_id_generator: EventIdGeneratorCallable) -> None:
         self._event_id_generator = event_id_generator
@@ -213,21 +214,88 @@ def send(self, event: dict[str, Any]) -> str:
         if len(self._bulk_actions) < self._bulk_batch_size:
             return _EVENT_BUFFERED
 
-        errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
-        self._handle_outcome(errors=errors)
-        self._bulk_actions = []
+        self.flush()
 
         return _EVENT_SENT
 
     def flush(self) -> None:
-        if len(self._bulk_actions) > 0:
-            errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
-            self._handle_outcome(errors=errors)
+        if len(self._bulk_actions) == 0:
+            return
+
+        errors = es_bulk(self._es_client, self._bulk_actions, **self._bulk_kwargs)
+        failed = self._handle_outcome(actions=self._bulk_actions, errors=errors)
+
+        # Send failed requests to dead letter index, if enabled
+        if len(failed) > 0 and self._es_dead_letter_index:
+            failed = self._send_dead_letter_index(failed)
+
+        # Send remaining failed requests to replay queue, if enabled
+        if isinstance(failed, list) and len(failed) > 0 and self._replay_handler is not None:
+            for outcome in failed:
+                if "action" not in outcome:
+                    shared_logger.error("action could not be extracted to be replayed", extra={"outcome": outcome})
+                    continue
+
+                self._replay_handler(self._output_destination, self._replay_args, outcome["action"])
 
         self._bulk_actions = []
 
         return
 
+    def _send_dead_letter_index(self, actions: list[Any]) -> list[Any]:
+        encoded_actions = []
+        dead_letter_errors: list[Any] = []
+        for action in actions:
+            # Reshape event to dead letter index
+            encoded = self._encode_dead_letter(action)
+            if not encoded:
+                shared_logger.error("cannot encode dead letter index event from payload", extra={"action": action})
+                dead_letter_errors.append(action)
+                continue
+
+            encoded_actions.append(encoded)
+
+        # If no action can be encoded, return original action list as failed
+        if len(encoded_actions) == 0:
+            return dead_letter_errors
+
+        errors = es_bulk(self._es_client, encoded_actions, **self._bulk_kwargs)
+        failed = self._handle_outcome(actions=encoded_actions, errors=errors)
+
+        if not isinstance(failed, list) or len(failed) == 0:
+            return dead_letter_errors
+
+        for action in failed:
+            event_payload = self._decode_dead_letter(action)
+
+            if not event_payload:
+                shared_logger.error("cannot decode dead letter index event from payload", extra={"action": action})
+                continue
+
+            dead_letter_errors.append(event_payload)
+
+        return dead_letter_errors
+
+    def _encode_dead_letter(self, outcome: dict[str, Any]) -> dict[str, Any]:
+        if "action" not in outcome or "error" not in outcome:
+            return {}
+
+        # Assign a random _id so that, if the dead letter bulk() call fails,
+        # the failure can be matched back to the original action
+        return {
+            "@timestamp": datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+            "_id": f"{uuid.uuid4()}",
+            "_index": self._es_dead_letter_index,
+            "_op_type": "create",
+            "message": json_dumper(outcome["action"]),
+            "error": outcome["error"],
+        }
+
+    def _decode_dead_letter(self, dead_letter_outcome: dict[str, Any]) -> dict[str, Any]:
+        if "action" not in dead_letter_outcome or "message" not in dead_letter_outcome["action"]:
+            return {}
+
+        return {"action": json_parser(dead_letter_outcome["action"]["message"])}
+
     def _discover_dataset(self, event_payload: Dict[str, Any]) -> None:
         if self._es_datastream_name != "":
             if self._es_datastream_name.startswith("logs-"):
diff --git a/shippers/factory.py b/shippers/factory.py
index ecd02919..0f444434 100644
--- a/shippers/factory.py
+++ b/shippers/factory.py
@@ -48,6 +48,7 @@ def create_from_output(output_type: str, output: Output) -> ProtocolShipper:
                 batch_max_actions=output.batch_max_actions,
                 batch_max_bytes=output.batch_max_bytes,
                 ssl_assert_fingerprint=output.ssl_assert_fingerprint,
+                es_dead_letter_index=output.es_dead_letter_index,
             )
 
         if output_type == "logstash":
diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py
index d5d0e9b1..f56beb43 100644
--- a/tests/handlers/aws/test_integrations.py
+++ b/tests/handlers/aws/test_integrations.py
@@ -4195,3 +4195,225 @@ def test_es_version_conflict_exception(self) -> None:
         # Test no duplicate events end in the replay queue
         events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
         assert len(events["Records"]) == 0
+
+    def test_es_dead_letter_index(self) -> None:
+        assert isinstance(self.elasticsearch, ElasticsearchContainer)
+        assert isinstance(self.localstack, LocalStackContainer)
+
+        sqs_queue_name = _time_based_id(suffix="source-sqs")
+        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
+
+        dead_letter_index_name = "logs-generic-default-dli"
+
+        sqs_queue_arn = sqs_queue["QueueArn"]
+        sqs_queue_url = sqs_queue["QueueUrl"]
+        sqs_queue_url_path = sqs_queue["QueueUrlPath"]
+
+        config_yaml: str = f"""
+            inputs:
+              - type: sqs
+                id: "{sqs_queue_arn}"
+                tags: {self.default_tags}
+                outputs:
+                  - type: "elasticsearch"
+                    args:
+                      elasticsearch_url: "{self.elasticsearch.get_url()}"
+                      es_dead_letter_index: "{dead_letter_index_name}"
+                      ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
+                      username: "{self.secret_arn}:username"
+                      password: "{self.secret_arn}:password"
+        """
+
+        config_file_path = "config.yaml"
+        config_bucket_name = _time_based_id(suffix="config-bucket")
+        _s3_upload_content_to_bucket(
+            client=self.s3_client,
+            content=config_yaml.encode("utf-8"),
+            content_type="text/plain",
+            bucket_name=config_bucket_name,
+            key=config_file_path,
+        )
+
+        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
+
+        fixtures = [
+            _load_file_fixture("cloudwatch-log-1.json"),
+        ]
+
+        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
+
+        event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
+        message_id = event["Records"][0]["messageId"]
+
+        # Create pipeline to reject documents
+        processors = {
+            "processors": [
+                {
+                    "fail": {
+                        "message": "test_es_non_indexable_dead_letter_index fail message",
+                    }
+                },
+            ]
+        }
+
+        self.elasticsearch.put_pipeline(id="test_es_non_indexable_dead_letter_index_fail_pipeline", body=processors)
+
+        self.elasticsearch.create_data_stream(name="logs-generic-default")
+        self.elasticsearch.put_settings(
+            index="logs-generic-default",
+            body={"index.default_pipeline": "test_es_non_indexable_dead_letter_index_fail_pipeline"},
+        )
+
+        self.elasticsearch.refresh(index="logs-generic-default")
+
+        self.elasticsearch.create_data_stream(name=dead_letter_index_name)
+
+        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
+        first_call = handler(event, ctx)  # type:ignore
+
+        assert first_call == "completed"
+
+        # Test document has been rejected from target index
+        self.elasticsearch.refresh(index="logs-generic-default")
+
+        assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0
+
+        # Test document has been redirected to the dead letter index
+        assert self.elasticsearch.exists(index=dead_letter_index_name) is True
+
+        self.elasticsearch.refresh(index=dead_letter_index_name)
+
+        assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 1
+
+        res = self.elasticsearch.search(index=dead_letter_index_name, sort="_seq_no")
+
+        assert res["hits"]["total"] == {"value": 1, "relation": "eq"}
+
+        assert (
+            res["hits"]["hits"][0]["_source"]["error"]["reason"]
+            == "test_es_non_indexable_dead_letter_index fail message"
+        )
+        assert res["hits"]["hits"][0]["_source"]["error"]["type"] == "fail_processor_exception"
+        dead_letter_message = json_parser(res["hits"]["hits"][0]["_source"]["message"])
+        assert dead_letter_message["log"]["offset"] == 0
+        assert dead_letter_message["log"]["file"]["path"] == sqs_queue_url_path
+        assert dead_letter_message["aws"]["sqs"]["name"] == sqs_queue_name
+        assert dead_letter_message["aws"]["sqs"]["message_id"] == message_id
+        assert dead_letter_message["cloud"]["provider"] == "aws"
+        assert dead_letter_message["cloud"]["region"] == "us-east-1"
+        assert dead_letter_message["cloud"]["account"]["id"] == "000000000000"
+        assert dead_letter_message["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]
+
+        # Test event does not go into the replay queue
+        events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
+
+        assert len(events["Records"]) == 0
+
+    def test_es_non_indexable_dead_letter_index(self) -> None:
+        assert isinstance(self.elasticsearch, ElasticsearchContainer)
+        assert isinstance(self.localstack, LocalStackContainer)
+
+        sqs_queue_name = _time_based_id(suffix="source-sqs")
+        sqs_queue = _sqs_create_queue(self.sqs_client, sqs_queue_name, self.localstack.get_url())
+
+        dead_letter_index_name = "logs-generic-default-dli"
+
+        sqs_queue_arn = sqs_queue["QueueArn"]
+        sqs_queue_url = sqs_queue["QueueUrl"]
+        sqs_queue_url_path = sqs_queue["QueueUrlPath"]
+
+        config_yaml: str = f"""
+            inputs:
+              - type: sqs
+                id: "{sqs_queue_arn}"
+                tags: {self.default_tags}
+                outputs:
+                  - type: "elasticsearch"
+                    args:
+                      elasticsearch_url: "{self.elasticsearch.get_url()}"
+                      es_dead_letter_index: "{dead_letter_index_name}"
+                      ssl_assert_fingerprint: {self.elasticsearch.ssl_assert_fingerprint}
+                      username: "{self.secret_arn}:username"
+                      password: "{self.secret_arn}:password"
+        """
+
+        config_file_path = "config.yaml"
+        config_bucket_name = _time_based_id(suffix="config-bucket")
+        _s3_upload_content_to_bucket(
+            client=self.s3_client,
+            content=config_yaml.encode("utf-8"),
+            content_type="text/plain",
+            bucket_name=config_bucket_name,
+            key=config_file_path,
+        )
+
+        os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
+
+        fixtures = [
+            _load_file_fixture("cloudwatch-log-1.json"),
+        ]
+
+        _sqs_send_messages(self.sqs_client, sqs_queue_url, "".join(fixtures))
+
+        event, _ = _sqs_get_messages(self.sqs_client, sqs_queue_url, sqs_queue_arn)
+        message_id = event["Records"][0]["messageId"]
+
+        # Create pipeline to reject documents
+        processors = {
+            "processors": [
+                {
+                    "fail": {
+                        "message": "test_es_non_indexable_dead_letter_index",
+                    }
+                },
+            ]
+        }
+
+        self.elasticsearch.put_pipeline(id="test_es_non_indexable_dead_letter_index_fail_pipeline", body=processors)
+
+        self.elasticsearch.create_data_stream(name="logs-generic-default")
+        self.elasticsearch.put_settings(
+            index="logs-generic-default",
+            body={"index.default_pipeline": "test_es_non_indexable_dead_letter_index_fail_pipeline"},
+        )
+
+        self.elasticsearch.refresh(index="logs-generic-default")
+
+        self.elasticsearch.create_data_stream(name=dead_letter_index_name)
+        self.elasticsearch.put_settings(
+            index=dead_letter_index_name,
+            body={"index.default_pipeline": "test_es_non_indexable_dead_letter_index_fail_pipeline"},
+        )
+
+        ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
+        first_call = handler(event, ctx)  # type:ignore
+
+        assert first_call == "completed"
+
+        # Test document has been rejected from target index
+        self.elasticsearch.refresh(index="logs-generic-default")
+
+        assert self.elasticsearch.count(index="logs-generic-default")["count"] == 0
+
+        # Test document could not be indexed into the dead letter index either
+        assert self.elasticsearch.exists(index=dead_letter_index_name) is True
+
+        self.elasticsearch.refresh(index=dead_letter_index_name)
+
+        assert self.elasticsearch.count(index=dead_letter_index_name)["count"] == 0
+
+        # Test event has been redirected into the replay queue
+        events, _ = _sqs_get_messages(self.sqs_client, os.environ["SQS_REPLAY_URL"], self.sqs_replay_queue_arn)
+        assert len(events["Records"]) == 1
+
+        first_body: dict[str, Any] = json_parser(events["Records"][0]["body"])
+
+        assert first_body["event_payload"]["message"] == fixtures[0].rstrip("\n")
+        assert first_body["event_payload"]["log"]["offset"] == 0
+        assert first_body["event_payload"]["log"]["file"]["path"] == sqs_queue_url_path
+        assert first_body["event_payload"]["aws"]["sqs"]["name"] == sqs_queue_name
+        assert first_body["event_payload"]["aws"]["sqs"]["message_id"] == message_id
+        assert first_body["event_payload"]["cloud"]["provider"] == "aws"
+        assert first_body["event_payload"]["cloud"]["region"] == "us-east-1"
+        assert first_body["event_payload"]["cloud"]["account"]["id"] == "000000000000"
+        assert first_body["event_payload"]["tags"] == ["forwarded", "generic", "tag1", "tag2", "tag3"]