diff --git a/CHANGELOG.md b/CHANGELOG.md
index 02c81b06..9843495b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+### v1.15.0 - 2024/05/29
+##### Features
+* Enable multiple outputs for each input []()
+
 ### v1.14.0 - 2024/05/07
 ##### Bug fixes
 * Report misconfigured input ids as an error instead of warning, and place those messages in the replaying queue [#711](https://github.com/elastic/elastic-serverless-forwarder/pull/711).
diff --git a/handlers/aws/handler.py b/handlers/aws/handler.py
index d208b083..2477fa37 100644
--- a/handlers/aws/handler.py
+++ b/handlers/aws/handler.py
@@ -53,7 +53,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex

     try:
         trigger_type, config_source = get_trigger_type_and_config_source(lambda_event)
-        shared_logger.info(f"trigger TYPE IS {trigger_type}", extra={"type": trigger_type})
+        shared_logger.info("trigger", extra={"type": trigger_type})
     except Exception as e:
         raise TriggerTypeException(e)

@@ -78,31 +78,21 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
     sqs_client = get_sqs_client()

     if trigger_type == "replay-sqs":
-        shared_logger.info("TRIGGERED THE REPLAY SQS.")
-
         shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})
         replay_queue_arn = lambda_event["Records"][0]["eventSourceARN"]
         replay_handler = ReplayedEventReplayHandler(replay_queue_arn=replay_queue_arn)
         shipper_cache: dict[str, CompositeShipper] = {}
         for replay_record in lambda_event["Records"]:
-            # TODO How to trigger this...
             event = json_parser(replay_record["body"])
-
-            shared_logger.info(f">> TRIGGERED THE REPLAY SQS: event is {event}")
-
             input_id = event["event_input_id"]
-            output_type = event["output_type"]
-            shipper_id = input_id + output_type
+            output_destination = event["output_destination"]
+            shipper_id = input_id + output_destination

             if shipper_id not in shipper_cache:
-
-                # IN THE DLQ
-
                 shipper = get_shipper_for_replay_event(
                     config=config,
-                    #output_type=output_type,
-                    output_destination="TODO",
+                    output_destination=output_destination,
                     output_args=event["output_args"],
                     event_input_id=input_id,
                     replay_handler=replay_handler,
@@ -111,7 +101,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
                 if shipper is None:
                     shared_logger.warning(
                         "no shipper for output in replay queue",
-                        extra={"output_type": event["output_type"], "event_input_id": event["event_input_id"]},
+                        extra={"output_destination": event["output_destination"], "event_input_id": event["event_input_id"]},
                     )
                     continue

@@ -122,7 +112,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
             assert isinstance(shipper, CompositeShipper)

             shipper.send(event["event_payload"])
-            event_uniq_id: str = event["event_payload"]["_id"] + output_type
+            event_uniq_id: str = event["event_payload"]["_id"] + output_destination
             replay_handler.add_event_with_receipt_handle(
                 event_uniq_id=event_uniq_id, receipt_handle=replay_record["receiptHandle"]
             )
diff --git a/handlers/aws/replay_trigger.py b/handlers/aws/replay_trigger.py
index 2a32b245..7fde1d97 100644
--- a/handlers/aws/replay_trigger.py
+++ b/handlers/aws/replay_trigger.py
@@ -20,8 +20,8 @@ def __init__(self, replay_queue_arn: str):
     def add_event_with_receipt_handle(self, event_uniq_id: str, receipt_handle: str) -> None:
         self._events_with_receipt_handle[event_uniq_id] = receipt_handle

-    def replay_handler(self, output_type: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None:
-        event_uniq_id: str = event_payload["_id"] + output_type
+    def replay_handler(self, output_destination: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None:
+        event_uniq_id: str = event_payload["_id"] + output_destination
         self._failed_event_ids.append(event_uniq_id)

     def flush(self) -> None:
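The hunks above are the heart of the change: replayed events are cached and deduplicated per concrete destination instead of per output type. A minimal sketch of the new cache key, assuming an illustrative record body (the Elasticsearch URL is made up; the field names match the handler and the test fixtures):

```python
import json

# Illustrative replay-queue record body after this change: the former
# "output_type" field is replaced by a concrete "output_destination".
record_body = json.dumps(
    {
        "output_destination": "https://elasticsearch.example.com:9243",
        "output_args": {},
        "event_input_id": "arn:aws:sqs:eu-central-1:123456789:s3-sqs-queue",
        "event_payload": {"_id": "_id"},
    }
)

event = json.loads(record_body)
# One shipper per (input, destination) pair: two outputs configured on the
# same input no longer collide on a shared "elasticsearch" type key.
shipper_id = event["event_input_id"] + event["output_destination"]
print(shipper_id)
```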
event_payload["_id"] + output_type + def replay_handler(self, output_destination: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None: + event_uniq_id: str = event_payload["_id"] + output_destination self._failed_event_ids.append(event_uniq_id) def flush(self) -> None: diff --git a/handlers/aws/utils.py b/handlers/aws/utils.py index 0e7d93a6..c15a23dc 100644 --- a/handlers/aws/utils.py +++ b/handlers/aws/utils.py @@ -294,21 +294,13 @@ def get_trigger_type_and_config_source(event: dict[str, Any]) -> tuple[str, str] event_source = "" first_record = event["Records"][0] - - shared_logger.info(f"FIRST RECORD IS {first_record}") - if "body" in first_record: event_body = first_record["body"] try: body = json_parser(event_body) - - print(f">>> EVENT IN JSON {body}") - - # When could this happen... Put wrong password for cloud - also no - if ( isinstance(body, dict) - and "output_type" in event_body + and "output_destination" in event_body and "output_args" in event_body and "event_payload" in event_body ): @@ -356,13 +348,13 @@ class ReplayEventHandler: def __init__(self, event_input: Input): self._event_input_id: str = event_input.id - def replay_handler(self, output_type: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None: + def replay_handler(self, output_destination: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None: sqs_replay_queue = os.environ["SQS_REPLAY_URL"] sqs_client = get_sqs_client() message_payload: dict[str, Any] = { - "output_type": output_type, + "output_destination": output_destination, "output_args": output_args, "event_payload": event_payload, "event_input_id": self._event_input_id, @@ -371,7 +363,7 @@ def replay_handler(self, output_type: str, output_args: dict[str, Any], event_pa sqs_client.send_message(QueueUrl=sqs_replay_queue, MessageBody=json_dumper(message_payload)) shared_logger.debug( - "sent to replay queue", extra={"output_type": output_type, "event_input_id": self._event_input_id} + "sent to replay queue", extra={"output_destination": output_destination, "event_input_id": self._event_input_id} ) diff --git a/share/config.py b/share/config.py index 1c87ca06..2a305ee6 100644 --- a/share/config.py +++ b/share/config.py @@ -407,7 +407,6 @@ def get_output_destinations(self) -> list[str]: return list(self._outputs.keys()) - # TODO def delete_output_by_destination(self, output_destination: str) -> None: """ Output deleter. @@ -431,7 +430,6 @@ def add_output(self, output_type: str, **kwargs: Any) -> None: raise ValueError("Either `elasticsearch_url` or `cloud_id` must be set") # elasticsearch_url takes precedence over cloud_id if "elasticsearch_url" not in kwargs: - output_dest = kwargs["cloud_id"] else: output_dest = kwargs["elasticsearch_url"] @@ -440,8 +438,6 @@ def add_output(self, output_type: str, **kwargs: Any) -> None: raise ValueError(f"Output type {output_type} requires logstash_url to be set") output_dest = kwargs["logstash_url"] - print("Outpus is ", self._outputs) - print(f"Output dest is {output_dest}") if output_dest in self._outputs: # Since logstash destination can only be set as logstash_url, we do not have to account # for the same url/cloud_id for both types logstash or elasticsearch diff --git a/share/version.py b/share/version.py index dcd8436d..8f38540c 100644 --- a/share/version.py +++ b/share/version.py @@ -2,4 +2,4 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. 
-version = "1.14.0" +version = "1.15.0" diff --git a/shippers/es.py b/shippers/es.py index 7b7997d6..123f6f99 100644 --- a/shippers/es.py +++ b/shippers/es.py @@ -80,8 +80,10 @@ def __init__( es_client_kwargs: dict[str, Any] = {} if elasticsearch_url: es_client_kwargs["hosts"] = [elasticsearch_url] + self._output_destination = elasticsearch_url elif cloud_id: es_client_kwargs["cloud_id"] = cloud_id + self._output_destination = cloud_id else: raise ValueError("You must provide one between elasticsearch_url or cloud_id") @@ -166,7 +168,7 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None: ) shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]}) if self._replay_handler is not None: - self._replay_handler("elasticsearch", self._replay_args, action_failed[0]) + self._replay_handler(self._output_destination, self._replay_args, action_failed[0]) if failed > 0: shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": failed}) diff --git a/shippers/logstash.py b/shippers/logstash.py index 2a633129..e4fe5244 100644 --- a/shippers/logstash.py +++ b/shippers/logstash.py @@ -155,4 +155,4 @@ def _send(self) -> None: event["_id"] = event["@metadata"]["_id"] del event["@metadata"] - self._replay_handler("logstash", self._replay_args, event) + self._replay_handler(self._logstash_url, self._replay_args, event) diff --git a/tests/handlers/aws/test_handler.py b/tests/handlers/aws/test_handler.py index de9c83b7..68c901e9 100644 --- a/tests/handlers/aws/test_handler.py +++ b/tests/handlers/aws/test_handler.py @@ -420,7 +420,7 @@ def test_lambda_handler_noop(self) -> None: with self.subTest("output not elasticsearch from payload config"): with mock.patch( "handlers.aws.handler.get_shipper_for_replay_event", - lambda config, output_type, output_args, event_input_id, replay_handler: None, + lambda config, output_destination, output_args, event_input_id, replay_handler: None, ): ctx = ContextMock() event = { @@ -428,7 +428,7 @@ def test_lambda_handler_noop(self) -> None: { "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue", "receiptHandle": "receiptHandle", - "body": '{"output_type": "output_type", "output_args": {},' + "body": '{"output_destination": "output_destination", "output_args": {},' '"event_input_id": "arn:aws:sqs:eu-central-1:123456789:s3-sqs-queue", ' '"event_payload": {"_id": "_id"}}', } @@ -540,13 +540,13 @@ def test_lambda_handler_failure(self) -> None: { "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue", "receiptHandle": "receiptHandle", - "body": '{"output_type": "output_type", "output_args": {},' + "body": '{"output_destination": "output_destination", "output_args": {},' '"event_input_id": "arn:aws:dummy:eu-central-1:123456789:input", ' '"event_payload": {"_id": "_id"}}', } ] } - with self.assertRaisesRegex(OutputConfigException, "Cannot load output of type output_type"): + with self.assertRaisesRegex(OutputConfigException, "Cannot load output with destination output_destination"): ctx = ContextMock() handler(event, ctx) # type:ignore @@ -558,7 +558,7 @@ def test_lambda_handler_failure(self) -> None: { "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue", "receiptHandle": "receiptHandle", - "body": '{"output_type": "output_type", "output_args": {},' + "body": '{"output_destination": "output_destination", "output_args": {},' '"event_input_id": "arn:aws:dummy:eu-central-1:123456789:not-existing-input", ' '"event_payload": {"_id": "_id"}}', } @@ -657,7 +657,7 @@ def 
diff --git a/tests/handlers/aws/test_handler.py b/tests/handlers/aws/test_handler.py
index de9c83b7..68c901e9 100644
--- a/tests/handlers/aws/test_handler.py
+++ b/tests/handlers/aws/test_handler.py
@@ -420,7 +420,7 @@ def test_lambda_handler_noop(self) -> None:
         with self.subTest("output not elasticsearch from payload config"):
             with mock.patch(
                 "handlers.aws.handler.get_shipper_for_replay_event",
-                lambda config, output_type, output_args, event_input_id, replay_handler: None,
+                lambda config, output_destination, output_args, event_input_id, replay_handler: None,
             ):
                 ctx = ContextMock()
                 event = {
@@ -428,7 +428,7 @@ def test_lambda_handler_noop(self) -> None:
                     "Records": [
                         {
                             "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue",
                             "receiptHandle": "receiptHandle",
-                            "body": '{"output_type": "output_type", "output_args": {},'
+                            "body": '{"output_destination": "output_destination", "output_args": {},'
                             '"event_input_id": "arn:aws:sqs:eu-central-1:123456789:s3-sqs-queue", '
                             '"event_payload": {"_id": "_id"}}',
                         }
@@ -540,13 +540,13 @@ def test_lambda_handler_failure(self) -> None:
                     {
                         "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue",
                         "receiptHandle": "receiptHandle",
-                        "body": '{"output_type": "output_type", "output_args": {},'
+                        "body": '{"output_destination": "output_destination", "output_args": {},'
                         '"event_input_id": "arn:aws:dummy:eu-central-1:123456789:input", '
                         '"event_payload": {"_id": "_id"}}',
                     }
                 ]
             }

-            with self.assertRaisesRegex(OutputConfigException, "Cannot load output of type output_type"):
+            with self.assertRaisesRegex(OutputConfigException, "Cannot load output with destination output_destination"):
                 ctx = ContextMock()
                 handler(event, ctx)  # type:ignore
@@ -558,7 +558,7 @@ def test_lambda_handler_failure(self) -> None:
                     {
                         "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue",
                         "receiptHandle": "receiptHandle",
-                        "body": '{"output_type": "output_type", "output_args": {},'
+                        "body": '{"output_destination": "output_destination", "output_args": {},'
                         '"event_input_id": "arn:aws:dummy:eu-central-1:123456789:not-existing-input", '
                         '"event_payload": {"_id": "_id"}}',
                     }
@@ -657,7 +657,7 @@ def test_lambda_handler_failure(self) -> None:
            event = {
                "Records": [
                    {
-                        "body": '{"output_type": "", "output_args": "", "event_payload": ""}',
+                        "body": '{"output_destination": "", "output_args": "", "event_payload": ""}',
                    }
                ]
            }
diff --git a/tests/handlers/aws/test_utils.py b/tests/handlers/aws/test_utils.py
index 20285b82..7208fa5f 100644
--- a/tests/handlers/aws/test_utils.py
+++ b/tests/handlers/aws/test_utils.py
@@ -92,7 +92,7 @@ def test_get_trigger_type_and_config_source(self) -> None:
        event = {
            "Records": [
                {
-                    "body": '{"output_type": "output_type", '
+                    "body": '{"output_destination": "output_destination", '
                    '"output_args": "output_args", "event_payload": "event_payload"}'
                }
            ]
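The test fixtures above all hand-write the renamed record body. If more fixtures need updating, a small builder keeps the new field name in one place; this helper is hypothetical and not part of the test suite:

```python
import json
from typing import Any


def make_replay_record(output_destination: str, event_input_id: str, payload_id: str) -> dict[str, Any]:
    # Hypothetical fixture helper mirroring the replay-queue bodies used in
    # tests/handlers/aws/test_handler.py after the output_destination rename.
    return {
        "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue",
        "receiptHandle": "receiptHandle",
        "body": json.dumps(
            {
                "output_destination": output_destination,
                "output_args": {},
                "event_input_id": event_input_id,
                "event_payload": {"_id": payload_id},
            }
        ),
    }


event = {
    "Records": [
        make_replay_record(
            "output_destination", "arn:aws:sqs:eu-central-1:123456789:s3-sqs-queue", "_id"
        )
    ]
}
```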