
Add multiple outputs
Signed-off-by: constanca <[email protected]>
constanca-m committed May 29, 2024
1 parent 233e802 commit 2abba65
Showing 10 changed files with 28 additions and 44 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
+### v1.15.0 - 2024/05/29
+##### Features
+* Enable multiple outputs for each input []()
+
### v1.14.0 - 2024/05/07
##### Bug fixes
* Report misconfigured input ids as an error instead of warning, and place those messages in the replaying queue [#711](https://github.com/elastic/elastic-serverless-forwarder/pull/711).
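For context on the feature itself: after this change a single input can fan events out to several outputs, and each output is identified by its concrete destination (URL or cloud_id) rather than just its type. A minimal sketch of what such a config might look like, shown as the parsed Python structure — the key names here are assumptions based on the repository's documented config format, not part of this diff:

```python
# Hypothetical parsed config.yaml with one input fanning out to two outputs
# of the same type. All names and values are illustrative assumptions.
config = {
    "inputs": [
        {
            "type": "s3-sqs",
            "id": "arn:aws:sqs:eu-central-1:123456789:s3-sqs-queue",
            "outputs": [
                {
                    "type": "elasticsearch",
                    "args": {
                        "elasticsearch_url": "https://cluster-a.example.es.io:9243",
                        "es_datastream_name": "logs-generic-default",
                    },
                },
                {
                    "type": "elasticsearch",
                    "args": {
                        "elasticsearch_url": "https://cluster-b.example.es.io:9243",
                        "es_datastream_name": "logs-generic-default",
                    },
                },
            ],
        }
    ]
}
```

With two outputs of the same type on one input, the output type alone can no longer identify an output — which is why the diffs below key everything on the destination.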
22 changes: 6 additions & 16 deletions handlers/aws/handler.py
@@ -53,7 +53,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex

try:
trigger_type, config_source = get_trigger_type_and_config_source(lambda_event)
-        shared_logger.info(f"trigger TYPE IS {trigger_type}", extra={"type": trigger_type})
+        shared_logger.info("trigger", extra={"type": trigger_type})
except Exception as e:
raise TriggerTypeException(e)

@@ -78,31 +78,21 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
sqs_client = get_sqs_client()

if trigger_type == "replay-sqs":
-        shared_logger.info("TRIGGERED THE REPLAY SQS.")
-
shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})

replay_queue_arn = lambda_event["Records"][0]["eventSourceARN"]
replay_handler = ReplayedEventReplayHandler(replay_queue_arn=replay_queue_arn)
shipper_cache: dict[str, CompositeShipper] = {}
for replay_record in lambda_event["Records"]:
-            # TODO How to trigger this...
            event = json_parser(replay_record["body"])
-
-            shared_logger.info(f">> TRIGGERED THE REPLAY SQS: event is {event}")
-
input_id = event["event_input_id"]
-            output_type = event["output_type"]
-            shipper_id = input_id + output_type
+            output_destination = event["output_destination"]
+            shipper_id = input_id + output_destination

            if shipper_id not in shipper_cache:
-
-                # IN THE DLQ
-
shipper = get_shipper_for_replay_event(
config=config,
-                    #output_type=output_type,
-                    output_destination="TODO",
+                    output_destination=output_destination,
output_args=event["output_args"],
event_input_id=input_id,
replay_handler=replay_handler,
@@ -111,7 +101,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
if shipper is None:
shared_logger.warning(
"no shipper for output in replay queue",
-                    extra={"output_type": event["output_type"], "event_input_id": event["event_input_id"]},
+                    extra={"output_destination": event["output_destination"], "event_input_id": event["event_input_id"]},
)
continue

@@ -122,7 +112,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
assert isinstance(shipper, CompositeShipper)

shipper.send(event["event_payload"])
-            event_uniq_id: str = event["event_payload"]["_id"] + output_type
+            event_uniq_id: str = event["event_payload"]["_id"] + output_destination
replay_handler.add_event_with_receipt_handle(
event_uniq_id=event_uniq_id, receipt_handle=replay_record["receiptHandle"]
)
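The replay loop above now keys its shipper cache on input id plus output destination, so two outputs of the same type (for example, two Elasticsearch clusters) get separate shippers. A minimal sketch of that caching pattern, with a hypothetical make_shipper factory standing in for get_shipper_for_replay_event:

```python
from typing import Any, Callable

def cached_shipper(
    cache: dict[str, Any],
    input_id: str,
    output_destination: str,
    make_shipper: Callable[[str, str], Any],
) -> Any:
    # One shipper per (input, destination) pair: keying on the output *type*
    # would merge two outputs of the same type into a single shipper.
    shipper_id = input_id + output_destination
    if shipper_id not in cache:
        cache[shipper_id] = make_shipper(input_id, output_destination)
    return cache[shipper_id]
```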
4 changes: 2 additions & 2 deletions handlers/aws/replay_trigger.py
@@ -20,8 +20,8 @@ def __init__(self, replay_queue_arn: str):
def add_event_with_receipt_handle(self, event_uniq_id: str, receipt_handle: str) -> None:
self._events_with_receipt_handle[event_uniq_id] = receipt_handle

-    def replay_handler(self, output_type: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None:
-        event_uniq_id: str = event_payload["_id"] + output_type
+    def replay_handler(self, output_destination: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None:
+        event_uniq_id: str = event_payload["_id"] + output_destination
self._failed_event_ids.append(event_uniq_id)

def flush(self) -> None:
16 changes: 4 additions & 12 deletions handlers/aws/utils.py
@@ -294,21 +294,13 @@ def get_trigger_type_and_config_source(event: dict[str, Any]) -> tuple[str, str]

event_source = ""
    first_record = event["Records"][0]
-
-    shared_logger.info(f"FIRST RECORD IS {first_record}")
-
if "body" in first_record:
event_body = first_record["body"]
try:
            body = json_parser(event_body)
-
-            print(f">>> EVENT IN JSON {body}")
-
-            # When could this happen... Put wrong password for cloud - also no
-
if (
isinstance(body, dict)
-                and "output_type" in event_body
+                and "output_destination" in event_body
and "output_args" in event_body
and "event_payload" in event_body
):
@@ -356,13 +348,13 @@ class ReplayEventHandler:
def __init__(self, event_input: Input):
self._event_input_id: str = event_input.id

-    def replay_handler(self, output_type: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None:
+    def replay_handler(self, output_destination: str, output_args: dict[str, Any], event_payload: dict[str, Any]) -> None:
sqs_replay_queue = os.environ["SQS_REPLAY_URL"]

sqs_client = get_sqs_client()

        message_payload: dict[str, Any] = {
-            "output_type": output_type,
+            "output_destination": output_destination,
"output_args": output_args,
"event_payload": event_payload,
"event_input_id": self._event_input_id,
@@ -371,7 +363,7 @@ def replay_handler(self, output_type: str, output_args: dict[str, Any], event_pa
sqs_client.send_message(QueueUrl=sqs_replay_queue, MessageBody=json_dumper(message_payload))

        shared_logger.debug(
-            "sent to replay queue", extra={"output_type": output_type, "event_input_id": self._event_input_id}
+            "sent to replay queue", extra={"output_destination": output_destination, "event_input_id": self._event_input_id}
)


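Taken together, the two hunks above mean a replay-queue message is detected by its output_destination key and carries the concrete destination needed to rebuild the shipper. A sketch of such a message body, with illustrative values (the field names match the message_payload above; the values are made up):

```python
import json

# Illustrative replay-queue message after the rename: the shipper is rebuilt
# from the concrete destination (URL or cloud_id), not from an output type.
message_payload = {
    "output_destination": "https://cluster-a.example.es.io:9243",
    "output_args": {"es_datastream_name": "logs-generic-default"},
    "event_payload": {"_id": "example-id", "message": "hello"},
    "event_input_id": "arn:aws:sqs:eu-central-1:123456789:s3-sqs-queue",
}
body = json.dumps(message_payload)  # what sqs_client.send_message would carry
```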
4 changes: 0 additions & 4 deletions share/config.py
@@ -407,7 +407,6 @@ def get_output_destinations(self) -> list[str]:

return list(self._outputs.keys())

-    # TODO
def delete_output_by_destination(self, output_destination: str) -> None:
"""
Output deleter.
@@ -431,7 +430,6 @@ def add_output(self, output_type: str, **kwargs: Any) -> None:
raise ValueError("Either `elasticsearch_url` or `cloud_id` must be set")
# elasticsearch_url takes precedence over cloud_id
        if "elasticsearch_url" not in kwargs:
-
            output_dest = kwargs["cloud_id"]
else:
output_dest = kwargs["elasticsearch_url"]
Expand All @@ -440,8 +438,6 @@ def add_output(self, output_type: str, **kwargs: Any) -> None:
raise ValueError(f"Output type {output_type} requires logstash_url to be set")
output_dest = kwargs["logstash_url"]

-        print("Outpus is ", self._outputs)
-        print(f"Output dest is {output_dest}")
if output_dest in self._outputs:
# Since logstash destination can only be set as logstash_url, we do not have to account
# for the same url/cloud_id for both types logstash or elasticsearch
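For reference, the add_output logic above resolves each output to a single destination string. A standalone sketch of that resolution rule (the helper name is hypothetical; the precedence follows the code above, where elasticsearch_url wins over cloud_id):

```python
def derive_output_destination(output_type: str, **kwargs: str) -> str:
    # Hypothetical helper mirroring the precedence in Config.add_output.
    if output_type == "elasticsearch":
        if "elasticsearch_url" in kwargs:
            return kwargs["elasticsearch_url"]
        if "cloud_id" in kwargs:
            return kwargs["cloud_id"]
        raise ValueError("Either `elasticsearch_url` or `cloud_id` must be set")
    if output_type == "logstash":
        if "logstash_url" in kwargs:
            return kwargs["logstash_url"]
        raise ValueError(f"Output type {output_type} requires logstash_url to be set")
    raise ValueError(f"Unknown output type {output_type}")
```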
2 changes: 1 addition & 1 deletion share/version.py
@@ -2,4 +2,4 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

-version = "1.14.0"
+version = "1.15.0"
4 changes: 3 additions & 1 deletion shippers/es.py
@@ -80,8 +80,10 @@ def __init__(
es_client_kwargs: dict[str, Any] = {}
if elasticsearch_url:
es_client_kwargs["hosts"] = [elasticsearch_url]
+            self._output_destination = elasticsearch_url
elif cloud_id:
es_client_kwargs["cloud_id"] = cloud_id
+            self._output_destination = cloud_id
else:
raise ValueError("You must provide one between elasticsearch_url or cloud_id")

@@ -166,7 +168,7 @@ def _handle_outcome(self, errors: tuple[int, Union[int, list[Any]]]) -> None:
)
shared_logger.debug("elasticsearch shipper", extra={"action": action_failed[0]})
if self._replay_handler is not None:
-                self._replay_handler("elasticsearch", self._replay_args, action_failed[0])
+                self._replay_handler(self._output_destination, self._replay_args, action_failed[0])

if failed > 0:
shared_logger.warning("elasticsearch shipper", extra={"success": success, "failed": failed})
2 changes: 1 addition & 1 deletion shippers/logstash.py
@@ -155,4 +155,4 @@ def _send(self) -> None:
event["_id"] = event["@metadata"]["_id"]
del event["@metadata"]

-            self._replay_handler("logstash", self._replay_args, event)
+            self._replay_handler(self._logstash_url, self._replay_args, event)
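With both shippers now passing their concrete destination instead of a hard-coded type string, replayed events from two outputs of the same type no longer share a deduplication key. A toy illustration of why the old key collided:

```python
# Old key: payload id + output *type* — two elasticsearch outputs collide.
old_key_a = "_id" + "elasticsearch"
old_key_b = "_id" + "elasticsearch"
assert old_key_a == old_key_b  # indistinguishable

# New key: payload id + concrete destination — distinct per cluster.
new_key_a = "_id" + "https://cluster-a.example.es.io:9243"
new_key_b = "_id" + "https://cluster-b.example.es.io:9243"
assert new_key_a != new_key_b
```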
12 changes: 6 additions & 6 deletions tests/handlers/aws/test_handler.py
@@ -420,15 +420,15 @@ def test_lambda_handler_noop(self) -> None:
with self.subTest("output not elasticsearch from payload config"):
with mock.patch(
"handlers.aws.handler.get_shipper_for_replay_event",
-                lambda config, output_type, output_args, event_input_id, replay_handler: None,
+                lambda config, output_destination, output_args, event_input_id, replay_handler: None,
):
ctx = ContextMock()
event = {
"Records": [
                    {
                        "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue",
                        "receiptHandle": "receiptHandle",
-                        "body": '{"output_type": "output_type", "output_args": {},'
+                        "body": '{"output_destination": "output_destination", "output_args": {},'
'"event_input_id": "arn:aws:sqs:eu-central-1:123456789:s3-sqs-queue", '
'"event_payload": {"_id": "_id"}}',
}
@@ -540,13 +540,13 @@ def test_lambda_handler_failure(self) -> None:
                    {
                        "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue",
                        "receiptHandle": "receiptHandle",
-                        "body": '{"output_type": "output_type", "output_args": {},'
+                        "body": '{"output_destination": "output_destination", "output_args": {},'
'"event_input_id": "arn:aws:dummy:eu-central-1:123456789:input", '
'"event_payload": {"_id": "_id"}}',
}
]
}
-            with self.assertRaisesRegex(OutputConfigException, "Cannot load output of type output_type"):
+            with self.assertRaisesRegex(OutputConfigException, "Cannot load output with destination output_destination"):
ctx = ContextMock()

handler(event, ctx) # type:ignore
@@ -558,7 +558,7 @@ def test_lambda_handler_failure(self) -> None:
                    {
                        "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:replay-queue",
                        "receiptHandle": "receiptHandle",
-                        "body": '{"output_type": "output_type", "output_args": {},'
+                        "body": '{"output_destination": "output_destination", "output_args": {},'
'"event_input_id": "arn:aws:dummy:eu-central-1:123456789:not-existing-input", '
'"event_payload": {"_id": "_id"}}',
}
@@ -657,7 +657,7 @@ def test_lambda_handler_failure(self) -> None:
event = {
"Records": [
                    {
-                        "body": '{"output_type": "", "output_args": "", "event_payload": ""}',
+                        "body": '{"output_destination": "", "output_args": "", "event_payload": ""}',
}
]
}
2 changes: 1 addition & 1 deletion tests/handlers/aws/test_utils.py
@@ -92,7 +92,7 @@ def test_get_trigger_type_and_config_source(self) -> None:
event = {
"Records": [
                    {
-                        "body": '{"output_type": "output_type", '
+                        "body": '{"output_destination": "output_destination", '
'"output_args": "output_args", "event_payload": "event_payload"}'
}
]

