From 233e802b2bb7e36522be7345413736fe319495f4 Mon Sep 17 00:00:00 2001
From: constanca
Date: Mon, 27 May 2024 13:40:26 +0200
Subject: [PATCH] Store outputs by destination instead of by type

Key each input's outputs on their destination (`elasticsearch_url` or
`cloud_id` for elasticsearch, `logstash_url` for logstash) rather than on
the output type, so that a single input can define multiple outputs of
the same type.

Signed-off-by: constanca
---
 handlers/aws/handler.py                   |  15 ++--
 handlers/aws/replay_trigger.py            |  14 +--
 handlers/aws/utils.py                     |  20 ++--
 share/config.py                           |  49 ++++++---
 tests/handlers/aws/test_replay_trigger.py |  23 ++--
 tests/handlers/aws/test_utils.py          |  79 +++++++++++-
 tests/share/test_config.py                | 129 ++++++++-------------
 7 files changed, 193 insertions(+), 136 deletions(-)

diff --git a/handlers/aws/handler.py b/handlers/aws/handler.py
index 913a6469..d208b083 100644
--- a/handlers/aws/handler.py
+++ b/handlers/aws/handler.py
@@ -78,20 +78,23 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
     sqs_client = get_sqs_client()
 
     if trigger_type == "replay-sqs":
        shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})
        replay_queue_arn = lambda_event["Records"][0]["eventSourceARN"]
        replay_handler = ReplayedEventReplayHandler(replay_queue_arn=replay_queue_arn)
        shipper_cache: dict[str, CompositeShipper] = {}
        for replay_record in lambda_event["Records"]:
            event = json_parser(replay_record["body"])
            input_id = event["event_input_id"]
-            output_type = event["output_type"]
-            shipper_id = input_id + output_type
+            # Cache shippers by destination instead of by type so outputs of the
+            # same type do not collide. NOTE: assumes the replay event payload
+            # carries `output_destination` alongside `output_args`.
+            output_destination = event["output_destination"]
+            shipper_id = input_id + output_destination
            if shipper_id not in shipper_cache:
                shipper = get_shipper_for_replay_event(
                    config=config,
-                    output_type=output_type,
+                    output_destination=output_destination,
                    output_args=event["output_args"],
                    event_input_id=input_id,
                    replay_handler=replay_handler,
@@ -170,7 +173,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
            return "completed"
 
        aws_region = input_id.split(":")[3]
-        composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
+        composite_shipper = get_shipper_from_input(event_input=event_input)
 
        event_list_from_field_expander = ExpandEventListFromField(
            event_input.expand_event_list_from_field,
@@ -268,7 +271,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
            )
            return "completed"
 
-        composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
+        composite_shipper = get_shipper_from_input(event_input=event_input)
 
        event_list_from_field_expander = ExpandEventListFromField(
            event_input.expand_event_list_from_field,
@@ -458,7 +461,7 @@ def handle_timeout(
        if input_id in composite_shipper_cache:
            composite_shipper = composite_shipper_cache[input_id]
        else:
-            composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
+            composite_shipper = get_shipper_from_input(event_input=event_input)
            composite_shipper_cache[event_input.id] = composite_shipper
 
        continuing_event_expanded_offset: Optional[int] = None
diff --git a/handlers/aws/replay_trigger.py b/handlers/aws/replay_trigger.py
index 483606eb..2a32b245 100644
--- a/handlers/aws/replay_trigger.py
+++ 
b/handlers/aws/replay_trigger.py @@ -37,7 +37,7 @@ def flush(self) -> None: def get_shipper_for_replay_event( config: Config, - output_type: str, + output_destination: str, output_args: dict[str, Any], event_input_id: str, replay_handler: ReplayedEventReplayHandler, @@ -46,28 +46,28 @@ def get_shipper_for_replay_event( if event_input is None: raise InputConfigException(f"Cannot load input for input id {event_input_id}") - output: Optional[Output] = event_input.get_output_by_type(output_type) + output: Optional[Output] = event_input.get_output_by_destination(output_destination) if output is None: - raise OutputConfigException(f"Cannot load output of type {output_type}") + raise OutputConfigException(f"Cannot load output with destination {output_destination}") # Let's wrap the specific output shipper in the composite one, since the composite deepcopy the mutating events shipper: CompositeShipper = CompositeShipper() - if output_type == "elasticsearch": + if output.type == "elasticsearch": assert isinstance(output, ElasticsearchOutput) output.es_datastream_name = output_args["es_datastream_name"] shared_logger.debug("setting ElasticSearch shipper") - elasticsearch: ProtocolShipper = ShipperFactory.create_from_output(output_type=output_type, output=output) + elasticsearch: ProtocolShipper = ShipperFactory.create_from_output(output_type=output.type, output=output) shipper.add_shipper(elasticsearch) shipper.set_replay_handler(replay_handler=replay_handler.replay_handler) return shipper - if output_type == "logstash": + if output.type == "logstash": assert isinstance(output, LogstashOutput) shared_logger.debug("setting Logstash shipper") - logstash: ProtocolShipper = ShipperFactory.create_from_output(output_type=output_type, output=output) + logstash: ProtocolShipper = ShipperFactory.create_from_output(output_type=output.type, output=output) shipper.add_shipper(logstash) shipper.set_replay_handler(replay_handler=replay_handler.replay_handler) diff --git a/handlers/aws/utils.py b/handlers/aws/utils.py index f952ef53..0e7d93a6 100644 --- a/handlers/aws/utils.py +++ b/handlers/aws/utils.py @@ -134,27 +134,27 @@ def discover_integration_scope(s3_object_key: str) -> str: return INTEGRATION_SCOPE_GENERIC -def get_shipper_from_input(event_input: Input, config_yaml: str) -> CompositeShipper: +def get_shipper_from_input(event_input: Input) -> CompositeShipper: composite_shipper: CompositeShipper = CompositeShipper() - for output_type in event_input.get_output_types(): - if output_type == "elasticsearch": + for output_destination in event_input.get_output_destinations(): + output: Optional[Output] = event_input.get_output_by_destination(output_destination) + assert output is not None + + if output.type == "elasticsearch": shared_logger.debug("setting ElasticSearch shipper") - elasticsearch_output: Optional[Output] = event_input.get_output_by_type("elasticsearch") - assert elasticsearch_output is not None elasticsearch_shipper: ProtocolShipper = ShipperFactory.create_from_output( - output_type="elasticsearch", output=elasticsearch_output + output_type="elasticsearch", output=output ) + composite_shipper.add_shipper(shipper=elasticsearch_shipper) - if output_type == "logstash": + if output.type == "logstash": shared_logger.debug("setting Logstash shipper") - logstash_output: Optional[Output] = event_input.get_output_by_type("logstash") - assert logstash_output is not None logstash_shipper: ProtocolShipper = ShipperFactory.create_from_output( - output_type="logstash", output=logstash_output + 
output_type="logstash", output=output ) composite_shipper.add_shipper(shipper=logstash_shipper) @@ -294,10 +294,18 @@ def get_trigger_type_and_config_source(event: dict[str, Any]) -> tuple[str, str] event_source = "" first_record = event["Records"][0] + + shared_logger.info(f"FIRST RECORD IS {first_record}") + if "body" in first_record: event_body = first_record["body"] try: body = json_parser(event_body) + + print(f">>> EVENT IN JSON {body}") + + # When could this happen... Put wrong password for cloud - also no + if ( isinstance(body, dict) and "output_type" in event_body diff --git a/share/config.py b/share/config.py index 336c1516..1c87ca06 100644 --- a/share/config.py +++ b/share/config.py @@ -63,9 +63,6 @@ def __init__( self.batch_max_bytes = batch_max_bytes self.ssl_assert_fingerprint = ssl_assert_fingerprint - if not self.cloud_id and not self.elasticsearch_url: - raise ValueError("One between `elasticsearch_url` or `cloud_id` must be set") - if self.cloud_id and self.elasticsearch_url: shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`") self.cloud_id = "" @@ -394,42 +391,62 @@ def include_exclude_filter(self, value: IncludeExcludeFilter) -> None: self._include_exclude_filter = value - def get_output_by_type(self, output_type: str) -> Optional[Output]: + def get_output_by_destination(self, output_destination: str) -> Optional[Output]: """ Output getter. - Returns a specific output given its type + Returns a specific output given its destination """ - return self._outputs[output_type] if output_type in self._outputs else None + return self._outputs[output_destination] if output_destination in self._outputs else None - def get_output_types(self) -> list[str]: + def get_output_destinations(self) -> list[str]: """ - Output types getter. - Returns all the defined output types + Output destinations getter. + Returns all the defined output destinations """ return list(self._outputs.keys()) - def delete_output_by_type(self, output_type: str) -> None: + # TODO + def delete_output_by_destination(self, output_destination: str) -> None: """ Output deleter. Delete a defined output by its type """ - del self._outputs[output_type] + del self._outputs[output_destination] def add_output(self, output_type: str, **kwargs: Any) -> None: """ Output setter. 
Set an output given its type and init kwargs
        """
-        if not isinstance(output_type, str):
-            raise ValueError("`type` must be provided as string")
-
-        if output_type in self._outputs:
-            raise ValueError(f"Duplicated `type` {output_type}")
+        if output_type not in _available_output_types:
+            raise ValueError(f"Type {output_type} is not included in the supported types: {_available_output_types}")
+
+        output_dest = ""
+        if output_type == "elasticsearch":
+            if "cloud_id" not in kwargs and "elasticsearch_url" not in kwargs:
+                raise ValueError("Either `elasticsearch_url` or `cloud_id` must be set")
+            # elasticsearch_url takes precedence over cloud_id
+            if "elasticsearch_url" not in kwargs:
+                output_dest = kwargs["cloud_id"]
+            else:
+                output_dest = kwargs["elasticsearch_url"]
+        elif output_type == "logstash":
+            if "logstash_url" not in kwargs:
+                raise ValueError(f"Output type {output_type} requires logstash_url to be set")
+            output_dest = kwargs["logstash_url"]
+
+        if output_dest in self._outputs:
+            # A logstash destination can only come from `logstash_url`, so the same
+            # destination can never belong to both an elasticsearch and a logstash output.
+            raise ValueError(f"Duplicated output destination {output_dest} for type {output_type}")
 
-        output: Optional[Output] = None
        if output_type == "elasticsearch":
            output = ElasticsearchOutput(**kwargs)
        elif output_type == "logstash":
@@ -437,7 +450,7 @@ def add_output(self, output_type: str, **kwargs: Any) -> None:
        else:
            output = Output(output_type=output_type)
 
-        self._outputs[output.type] = output
+        self._outputs[output_dest] = output
 
    def get_multiline_processor(self) -> Optional[ProtocolMultiline]:
        return self._multiline_processor
diff --git a/tests/handlers/aws/test_replay_trigger.py b/tests/handlers/aws/test_replay_trigger.py
index c67efd52..afd9e3a2 100644
--- a/tests/handlers/aws/test_replay_trigger.py
+++ b/tests/handlers/aws/test_replay_trigger.py
@@ -8,6 +8,7 @@
 import mock
 import pytest
 
+from handlers.aws import OutputConfigException
 from handlers.aws.replay_trigger import ReplayedEventReplayHandler, get_shipper_for_replay_event
 from share import parse_config
 from shippers import CompositeShipper, ElasticsearchShipper, LogstashShipper
@@ -31,7 +32,7 @@ def test_get_shipper_for_replay_event(self) -> None:
        replay_handler = ReplayedEventReplayHandler("arn:aws:sqs:eu-central-1:123456789:queue/replayqueue")
        logstash_shipper: Optional[CompositeShipper] = get_shipper_for_replay_event(
            config,
-            "logstash",
+            "logstash_url",
            {},
            "arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
            replay_handler,
@@ -56,7 +57,7 @@ def test_get_shipper_for_replay_event(self) -> None:
        replay_handler = ReplayedEventReplayHandler("arn:aws:sqs:eu-central-1:123456789:queue/replayqueue")
        elasticsearch_shipper: Optional[CompositeShipper] = get_shipper_for_replay_event(
            config,
-            "elasticsearch",
+            "elasticsearch_url",
            {"es_datastream_name": "es_datastream_name"},
            "arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
            replay_handler,
@@ -65,7 +66,7 @@ def test_get_shipper_for_replay_event(self) -> None:
        assert isinstance(elasticsearch_shipper, CompositeShipper)
        assert isinstance(elasticsearch_shipper._shippers[0], ElasticsearchShipper)
 
-        with self.subTest("None shipper from replay event"):
+        with self.subTest("Exception from output destination"):
            config_yaml_kinesis = """
            inputs:
              - type: kinesis-data-stream
@@ -77,11 +78,11 @@ def
test_get_shipper_for_replay_event(self) -> None: """ config = parse_config(config_yaml_kinesis) replay_handler = ReplayedEventReplayHandler("arn:aws:sqs:eu-central-1:123456789:queue/replayqueue") - none_shipper = get_shipper_for_replay_event( - config, - "output_type", - {}, - "arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream", - replay_handler, - ) - assert none_shipper is None + with self.assertRaisesRegex(OutputConfigException, "test"): + get_shipper_for_replay_event( + config, + "test", + {}, + "arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream", + replay_handler, + ) diff --git a/tests/handlers/aws/test_utils.py b/tests/handlers/aws/test_utils.py index 16e02ea5..20285b82 100644 --- a/tests/handlers/aws/test_utils.py +++ b/tests/handlers/aws/test_utils.py @@ -93,7 +93,7 @@ def test_get_trigger_type_and_config_source(self) -> None: "Records": [ { "body": '{"output_type": "output_type", ' - '"output_args": "output_args", "event_payload": "event_payload"}' + '"output_args": "output_args", "event_payload": "event_payload"}' } ] } @@ -216,9 +216,10 @@ def test_get_shipper_from_input(self) -> None: "arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream" ) assert event_input is not None - shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml_kinesis) + shipper = get_shipper_from_input(event_input=event_input) assert len(shipper._shippers) == 1 assert isinstance(shipper._shippers[0], LogstashShipper) + event_input.delete_output_by_destination("logstash_url") with self.subTest("Logstash shipper from Cloudwatch logs input"): config_yaml_cw: str = """ @@ -233,10 +234,82 @@ def test_get_shipper_from_input(self) -> None: config = parse_config(config_yaml_cw) event_input = config.get_input_by_id("arn:aws:logs:eu-central-1:123456789:stream/test-cw-logs") assert event_input is not None - shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml_cw) + shipper = get_shipper_from_input(event_input=event_input) assert len(shipper._shippers) == 1 assert isinstance(shipper._shippers[0], LogstashShipper) + with self.subTest("Logstash shipper from each input"): + config_yaml_cw: str = """ + inputs: + - type: cloudwatch-logs + id: arn:aws:logs:eu-central-1:123456789:stream/test-cw-logs + outputs: + - type: logstash + args: + logstash_url: logstash_url + - type: kinesis-data-stream + id: arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream + outputs: + - type: logstash + args: + logstash_url: logstash_url + """ + config = parse_config(config_yaml_cw) + event_input_cw = config.get_input_by_id("arn:aws:logs:eu-central-1:123456789:stream/test-cw-logs") + assert event_input_cw is not None + shipper = get_shipper_from_input(event_input=event_input_cw) + assert len(shipper._shippers) == 1 + assert isinstance(shipper._shippers[0], LogstashShipper) + + event_input_kinesis = config.get_input_by_id("arn:aws:kinesis:eu-central-1:123456789:stream/test-esf" + "-kinesis-stream") + assert event_input_kinesis is not None + shipper = get_shipper_from_input(event_input=event_input_kinesis) + assert len(shipper._shippers) == 1 + assert isinstance(shipper._shippers[0], LogstashShipper) + + event_input_cw.delete_output_by_destination("logstash_url") + event_input_kinesis.delete_output_by_destination("logstash_url") + + with self.subTest("Two Logstash shippers from Cloudwatch logs input"): + config_yaml_cw: str = """ + inputs: + - type: cloudwatch-logs + id: 
arn:aws:logs:eu-central-1:123456789:stream/test-cw-logs + outputs: + - type: logstash + args: + logstash_url: logstash_url-1 + - type: logstash + args: + logstash_url: logstash_url-2 + """ + config = parse_config(config_yaml_cw) + event_input = config.get_input_by_id("arn:aws:logs:eu-central-1:123456789:stream/test-cw-logs") + assert event_input is not None + shipper = get_shipper_from_input(event_input=event_input) + assert len(shipper._shippers) == 2 + assert isinstance(shipper._shippers[0], LogstashShipper) + assert isinstance(shipper._shippers[1], LogstashShipper) + event_input.delete_output_by_destination("logstash_url-1") + event_input.delete_output_by_destination("logstash_url-2") + + with self.subTest("Two outputs with the same logstash_url"): + config_yaml_cw: str = """ + inputs: + - type: cloudwatch-logs + id: arn:aws:logs:eu-central-1:123456789:stream/test-cw-logs + outputs: + - type: logstash + args: + logstash_url: logstash_url + - type: logstash + args: + logstash_url: logstash_url + """ + with self.assertRaisesRegex(ValueError, "logstash_url"): + parse_config(config_yaml_cw) + @pytest.mark.unit class TestRecordId(TestCase): diff --git a/tests/share/test_config.py b/tests/share/test_config.py index 93d30766..b9d34319 100644 --- a/tests/share/test_config.py +++ b/tests/share/test_config.py @@ -134,10 +134,6 @@ def test_init(self) -> None: assert elasticsearch.batch_max_bytes == 1 assert elasticsearch.ssl_assert_fingerprint == "" - with self.subTest("neither elasticsearch_url or cloud_id"): - with self.assertRaisesRegex(ValueError, "`elasticsearch_url` or `cloud_id` must be set"): - ElasticsearchOutput(elasticsearch_url="", cloud_id="") - with self.subTest("both elasticsearch_url and cloud_id"): elasticsearch = ElasticsearchOutput( elasticsearch_url="elasticsearch_url", @@ -581,7 +577,7 @@ def test_input_include_exclude_filter(self) -> None: def test_get_output_by_type(self) -> None: with self.subTest("none output"): input_sqs = Input(input_type="s3-sqs", input_id="id") - assert input_sqs.get_output_by_type(output_type="test") is None + assert input_sqs.get_output_by_destination(output_destination="test") is None with self.subTest("elasticsearch output"): input_sqs = Input(input_type="s3-sqs", input_id="id") @@ -595,7 +591,8 @@ def test_get_output_by_type(self) -> None: batch_max_bytes=1, ) - assert isinstance(input_sqs.get_output_by_type(output_type="elasticsearch"), ElasticsearchOutput) + assert isinstance(input_sqs.get_output_by_destination(output_destination="elasticsearch_url"), + ElasticsearchOutput) with self.subTest("logstash output"): input_sqs = Input(input_type="s3-sqs", input_id="id") @@ -604,7 +601,7 @@ def test_get_output_by_type(self) -> None: logstash_url="logstash_url", ) - assert isinstance(input_sqs.get_output_by_type(output_type="logstash"), LogstashOutput) + assert isinstance(input_sqs.get_output_by_destination(output_destination="logstash_url"), LogstashOutput) def test_add_output(self) -> None: with self.subTest("elasticsearch output"): @@ -619,7 +616,8 @@ def test_add_output(self) -> None: batch_max_bytes=1, ) - assert isinstance(input_sqs.get_output_by_type(output_type="elasticsearch"), ElasticsearchOutput) + assert isinstance(input_sqs.get_output_by_destination(output_destination="elasticsearch_url"), + ElasticsearchOutput) with self.subTest("logstash output"): input_sqs = Input(input_type="s3-sqs", input_id="id") @@ -631,21 +629,19 @@ def test_add_output(self) -> None: ssl_assert_fingerprint="fingerprint", ) - assert 
isinstance(input_sqs.get_output_by_type(output_type="logstash"), LogstashOutput) + assert isinstance(input_sqs.get_output_by_destination(output_destination="logstash_url"), LogstashOutput) with self.subTest("not elasticsearch or logstash output"): input_sqs = Input(input_type="s3-sqs", input_id="id") - with self.assertRaisesRegex( - ValueError, "^`type` must be one of elasticsearch,logstash: another-type given$" - ): + with self.assertRaisesRegex(ValueError, "another-type"): input_sqs.add_output(output_type="another-type") - with self.subTest("type is not str"): + def test_get_output_destinations(self) -> None: + with self.subTest("none output"): input_sqs = Input(input_type="s3-sqs", input_id="id") - with self.assertRaisesRegex(ValueError, "`type` must be provided as string"): - input_sqs.add_output(output_type=0) # type:ignore + assert input_sqs.get_output_destinations() == [] - with self.subTest("type is duplicated"): + with self.subTest("elasticsearch output with only elasticsearch_url set"): input_sqs = Input(input_type="s3-sqs", input_id="id") input_sqs.add_output( output_type="elasticsearch", @@ -657,27 +653,28 @@ def test_add_output(self) -> None: batch_max_bytes=1, ) - with self.assertRaisesRegex(ValueError, "Duplicated `type` elasticsearch"): - input_sqs.add_output( - output_type="elasticsearch", - elasticsearch_url="elasticsearch_url", - username="username", - password="password", - es_datastream_name="es_datastream_name", - batch_max_actions=1, - batch_max_bytes=1, - ) + assert input_sqs.get_output_destinations() == ["elasticsearch_url"] - def test_get_output_types(self) -> None: - with self.subTest("none output"): + with self.subTest("elasticsearch output with only cloud_id set"): input_sqs = Input(input_type="s3-sqs", input_id="id") - assert input_sqs.get_output_types() == [] + input_sqs.add_output( + output_type="elasticsearch", + cloud_id="cloud_id", + username="username", + password="password", + es_datastream_name="es_datastream_name", + batch_max_actions=1, + batch_max_bytes=1, + ) - with self.subTest("elasticsearch output"): + assert input_sqs.get_output_destinations() == ["cloud_id"] + + with self.subTest("elasticsearch output with elasticsearch_url and cloud_id set"): input_sqs = Input(input_type="s3-sqs", input_id="id") input_sqs.add_output( output_type="elasticsearch", elasticsearch_url="elasticsearch_url", + cloud_id="cloud_id", username="username", password="password", es_datastream_name="es_datastream_name", @@ -685,7 +682,7 @@ def test_get_output_types(self) -> None: batch_max_bytes=1, ) - assert input_sqs.get_output_types() == ["elasticsearch"] + assert input_sqs.get_output_destinations() == ["elasticsearch_url"] def test_delete_output_by_type(self) -> None: with self.subTest("delete elasticsearch output"): @@ -700,13 +697,13 @@ def test_delete_output_by_type(self) -> None: batch_max_bytes=1, ) - input_sqs.delete_output_by_type("elasticsearch") - assert input_sqs.get_output_types() == [] + input_sqs.delete_output_by_destination("elasticsearch_url") + assert input_sqs.get_output_destinations() == [] with self.subTest("delete not existing output"): input_sqs = Input(input_type="s3-sqs", input_id="id") - with self.assertRaisesRegex(KeyError, "'type"): - input_sqs.delete_output_by_type("type") + with self.assertRaisesRegex(KeyError, "destination"): + input_sqs.delete_output_by_destination("destination") @pytest.mark.unit @@ -899,36 +896,6 @@ def test_parse_config(self) -> None: """ ) - with self.subTest("not valid input output"): - with self.assertRaisesRegex( - 
ValueError, - "^An error occurred while applying output configuration at position 1 for input id: " - "`type` must be one of elasticsearch,logstash: another-type given$", - ): - parse_config( - config_yaml=""" - inputs: - - type: s3-sqs - id: id - outputs: - - type: another-type - args: - key: value - """ - ) - - with self.assertRaisesRegex(ValueError, "One between `elasticsearch_url` or `cloud_id` must be set"): - parse_config( - config_yaml=""" - inputs: - - type: s3-sqs - id: id - outputs: - - type: elasticsearch - args: {} - """ - ) - with self.subTest("batch_max_actions not int"): with self.assertRaisesRegex( ValueError, @@ -1070,7 +1037,7 @@ def test_parse_config(self) -> None: assert input_sqs.type == "s3-sqs" assert input_sqs.id == "id" assert input_sqs.expand_event_list_from_field == "aField" - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1123,7 +1090,7 @@ def test_parse_config(self) -> None: assert input_sqs.type == "s3-sqs" assert input_sqs.id == "id" assert input_sqs.root_fields_to_add_to_expanded_event == "all" - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1157,7 +1124,7 @@ def test_parse_config(self) -> None: assert input_sqs.type == "s3-sqs" assert input_sqs.id == "id" assert input_sqs.root_fields_to_add_to_expanded_event == ["one", "two"] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1399,7 +1366,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == ["tag1", "tag2", "tag3"] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="elasticsearch_url") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1438,7 +1405,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == ["tag1", "tag2", "tag3"] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="elasticsearch_url") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1477,7 +1444,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == ["tag1", "tag2", "tag3"] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1516,7 +1483,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == ["tag1", "tag2", "tag3"] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1554,7 +1521,7 @@ def test_parse_config(self) -> None: 
assert input_sqs.id == "id" assert input_sqs.tags == [] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1595,7 +1562,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == ["input_tag1", "input_tag2"] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1633,7 +1600,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == ["tag1", "tag2", "tag3"] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1682,7 +1649,7 @@ def test_parse_config(self) -> None: ], ) - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1697,7 +1664,7 @@ def test_parse_config(self) -> None: with self.subTest("no list for include"): with self.assertRaisesRegex(ValueError, "`include` must be provided as list for input id"): - config = parse_config( + parse_config( config_yaml=""" inputs: - type: s3-sqs @@ -1718,7 +1685,7 @@ def test_parse_config(self) -> None: with self.subTest("no list for exclude"): with self.assertRaisesRegex(ValueError, "`exclude` must be provided as list for input id"): - config = parse_config( + parse_config( config_yaml=""" inputs: - type: s3-sqs @@ -1936,7 +1903,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == [] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -1971,7 +1938,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == [] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput) @@ -2006,7 +1973,7 @@ def test_parse_config(self) -> None: assert input_sqs.id == "id" assert input_sqs.tags == [] - elasticsearch = input_sqs.get_output_by_type(output_type="elasticsearch") + elasticsearch = input_sqs.get_output_by_destination(output_destination="cloud_id") assert elasticsearch is not None assert isinstance(elasticsearch, ElasticsearchOutput)
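
Reviewer note: a minimal sketch of the behaviour this patch enables, mirroring
the new tests above (the input id and logstash_url values are placeholders
taken from the test fixtures):

    from share import parse_config

    # Two outputs of the same type on a single input, distinguished by
    # their destination.
    config = parse_config(
        """
    inputs:
      - type: cloudwatch-logs
        id: arn:aws:logs:eu-central-1:123456789:stream/test-cw-logs
        outputs:
          - type: logstash
            args:
              logstash_url: logstash_url-1
          - type: logstash
            args:
              logstash_url: logstash_url-2
    """
    )

    event_input = config.get_input_by_id("arn:aws:logs:eu-central-1:123456789:stream/test-cw-logs")
    assert event_input is not None

    # Outputs are now keyed by destination rather than by type.
    assert event_input.get_output_destinations() == ["logstash_url-1", "logstash_url-2"]
    assert event_input.get_output_by_destination("logstash_url-1") is not None

    # Reusing a destination within the same input raises
    # ValueError("Duplicated output destination ...").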