Commit
first attempt
Signed-off-by: constanca <[email protected]>
constanca-m committed May 27, 2024
1 parent acbe702 commit 233e802
Showing 7 changed files with 211 additions and 134 deletions.
21 changes: 16 additions & 5 deletions handlers/aws/handler.py
@@ -53,7 +53,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex

try:
trigger_type, config_source = get_trigger_type_and_config_source(lambda_event)
shared_logger.info("trigger", extra={"type": trigger_type})
shared_logger.info(f"trigger TYPE IS {trigger_type}", extra={"type": trigger_type})
except Exception as e:
raise TriggerTypeException(e)

@@ -78,20 +78,31 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
sqs_client = get_sqs_client()

if trigger_type == "replay-sqs":
shared_logger.info("TRIGGERED THE REPLAY SQS.")

shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})

replay_queue_arn = lambda_event["Records"][0]["eventSourceARN"]
replay_handler = ReplayedEventReplayHandler(replay_queue_arn=replay_queue_arn)
shipper_cache: dict[str, CompositeShipper] = {}
for replay_record in lambda_event["Records"]:
# TODO How to trigger this...
event = json_parser(replay_record["body"])

shared_logger.info(f">> TRIGGERED THE REPLAY SQS: event is {event}")

input_id = event["event_input_id"]
output_type = event["output_type"]
shipper_id = input_id + output_type

if shipper_id not in shipper_cache:

# IN THE DLQ

shipper = get_shipper_for_replay_event(
config=config,
output_type=output_type,
#output_type=output_type,
output_destination="TODO",
output_args=event["output_args"],
event_input_id=input_id,
replay_handler=replay_handler,
@@ -170,7 +181,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
return "completed"

aws_region = input_id.split(":")[3]
composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
composite_shipper = get_shipper_from_input(event_input=event_input)

event_list_from_field_expander = ExpandEventListFromField(
event_input.expand_event_list_from_field,
@@ -268,7 +279,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
)
return "completed"

composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
composite_shipper = get_shipper_from_input(event_input=event_input)

event_list_from_field_expander = ExpandEventListFromField(
event_input.expand_event_list_from_field,
@@ -458,7 +469,7 @@ def handle_timeout(
if input_id in composite_shipper_cache:
composite_shipper = composite_shipper_cache[input_id]
else:
composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
composite_shipper = get_shipper_from_input(event_input=event_input)
composite_shipper_cache[event_input.id] = composite_shipper

continuing_event_expanded_offset: Optional[int] = None
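As context for the replay-sqs branch above: each SQS record body is parsed back into a dict, and a shipper cache key is built from `event_input_id` plus `output_type` before a shipper is looked up. A minimal sketch of a record in that shape — the values are placeholders borrowed from the test fixtures, not taken from a real queue:

```python
import json

# Hypothetical replay record, assuming only the keys read by the handler above
# (event_input_id, output_type, output_args); all values are placeholders.
replay_body = {
    "event_input_id": "arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
    "output_type": "elasticsearch",
    "output_args": {"es_datastream_name": "logs-generic-default"},
}

lambda_event = {
    "Records": [
        {
            "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789:queue/replayqueue",
            "body": json.dumps(replay_body),
        }
    ]
}

# Mirrors `event = json_parser(replay_record["body"])` and the shipper_id
# built from input id + output type in the loop above.
event = json.loads(lambda_event["Records"][0]["body"])
shipper_id = event["event_input_id"] + event["output_type"]
print(shipper_id)
```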
14 changes: 7 additions & 7 deletions handlers/aws/replay_trigger.py
@@ -37,7 +37,7 @@ def flush(self) -> None:

def get_shipper_for_replay_event(
config: Config,
output_type: str,
output_destination: str,
output_args: dict[str, Any],
event_input_id: str,
replay_handler: ReplayedEventReplayHandler,
@@ -46,28 +46,28 @@ def get_shipper_for_replay_event(
if event_input is None:
raise InputConfigException(f"Cannot load input for input id {event_input_id}")

output: Optional[Output] = event_input.get_output_by_type(output_type)
output: Optional[Output] = event_input.get_output_by_destination(output_destination)
if output is None:
raise OutputConfigException(f"Cannot load output of type {output_type}")
raise OutputConfigException(f"Cannot load output with destination {output_destination}")

# Let's wrap the specific output shipper in the composite one, since the composite deepcopies the mutating events
shipper: CompositeShipper = CompositeShipper()

if output_type == "elasticsearch":
if output.type == "elasticsearch":
assert isinstance(output, ElasticsearchOutput)
output.es_datastream_name = output_args["es_datastream_name"]
shared_logger.debug("setting ElasticSearch shipper")
elasticsearch: ProtocolShipper = ShipperFactory.create_from_output(output_type=output_type, output=output)
elasticsearch: ProtocolShipper = ShipperFactory.create_from_output(output_type=output.type, output=output)

shipper.add_shipper(elasticsearch)
shipper.set_replay_handler(replay_handler=replay_handler.replay_handler)

return shipper

if output_type == "logstash":
if output.type == "logstash":
assert isinstance(output, LogstashOutput)
shared_logger.debug("setting Logstash shipper")
logstash: ProtocolShipper = ShipperFactory.create_from_output(output_type=output_type, output=output)
logstash: ProtocolShipper = ShipperFactory.create_from_output(output_type=output.type, output=output)

shipper.add_shipper(logstash)
shipper.set_replay_handler(replay_handler=replay_handler.replay_handler)
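The shape of the change above: the caller now passes a destination (URL or cloud_id) instead of an output type, the output is fetched with `get_output_by_destination`, and the shipper branch is chosen from `output.type`. A small sketch of that lookup-then-dispatch pattern, using stand-in classes rather than the real ones from share/config.py:

```python
from typing import Optional


class Output:
    """Stand-in for the real Output; only the `type` attribute is modelled."""

    def __init__(self, output_type: str) -> None:
        self.type = output_type


class Input:
    """Stand-in input whose outputs are keyed by destination, not by type."""

    def __init__(self, outputs: dict[str, Output]) -> None:
        self._outputs = outputs

    def get_output_by_destination(self, output_destination: str) -> Optional[Output]:
        return self._outputs.get(output_destination)


event_input = Input({"https://es.example.com:9243": Output("elasticsearch")})

output = event_input.get_output_by_destination("https://es.example.com:9243")
if output is None:
    raise ValueError("Cannot load output with destination https://es.example.com:9243")

# The shipper branch is picked from the output object's own type,
# not from the string the caller passed in.
if output.type == "elasticsearch":
    print("building an Elasticsearch shipper")
elif output.type == "logstash":
    print("building a Logstash shipper")
```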
28 changes: 18 additions & 10 deletions handlers/aws/utils.py
@@ -134,27 +134,27 @@ def discover_integration_scope(s3_object_key: str) -> str:
return INTEGRATION_SCOPE_GENERIC


def get_shipper_from_input(event_input: Input, config_yaml: str) -> CompositeShipper:
def get_shipper_from_input(event_input: Input) -> CompositeShipper:
composite_shipper: CompositeShipper = CompositeShipper()

for output_type in event_input.get_output_types():
if output_type == "elasticsearch":
for output_destination in event_input.get_output_destinations():
output: Optional[Output] = event_input.get_output_by_destination(output_destination)
assert output is not None

if output.type == "elasticsearch":
shared_logger.debug("setting ElasticSearch shipper")
elasticsearch_output: Optional[Output] = event_input.get_output_by_type("elasticsearch")
assert elasticsearch_output is not None

elasticsearch_shipper: ProtocolShipper = ShipperFactory.create_from_output(
output_type="elasticsearch", output=elasticsearch_output
output_type="elasticsearch", output=output
)

composite_shipper.add_shipper(shipper=elasticsearch_shipper)

if output_type == "logstash":
if output.type == "logstash":
shared_logger.debug("setting Logstash shipper")
logstash_output: Optional[Output] = event_input.get_output_by_type("logstash")
assert logstash_output is not None

logstash_shipper: ProtocolShipper = ShipperFactory.create_from_output(
output_type="logstash", output=logstash_output
output_type="logstash", output=output
)

composite_shipper.add_shipper(shipper=logstash_shipper)
@@ -294,10 +294,18 @@ def get_trigger_type_and_config_source(event: dict[str, Any]) -> tuple[str, str]

event_source = ""
first_record = event["Records"][0]

shared_logger.info(f"FIRST RECORD IS {first_record}")

if "body" in first_record:
event_body = first_record["body"]
try:
body = json_parser(event_body)

print(f">>> EVENT IN JSON {body}")

# When could this happen... Put wrong password for cloud - also no

if (
isinstance(body, dict)
and "output_type" in event_body
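Looping over `get_output_destinations()` instead of output types means an input can, in principle, carry several outputs of the same type (for example two Elasticsearch clusters) and contribute one shipper per destination to the composite. A toy illustration of that loop — the destinations and shipper strings are made up:

```python
# Outputs keyed by destination, as in get_shipper_from_input above; two
# same-type outputs no longer collide on the dictionary key.
outputs_by_destination = {
    "https://es-primary.example.com:9243": "elasticsearch",
    "https://es-backup.example.com:9243": "elasticsearch",
    "https://logstash.example.com:5044": "logstash",
}

composite_shippers: list[str] = []
for destination in outputs_by_destination:
    output_type = outputs_by_destination[destination]
    # One shipper per destination is added to the composite.
    composite_shippers.append(f"{output_type} shipper -> {destination}")

print(composite_shippers)
```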
51 changes: 34 additions & 17 deletions share/config.py
@@ -63,9 +63,6 @@ def __init__(
self.batch_max_bytes = batch_max_bytes
self.ssl_assert_fingerprint = ssl_assert_fingerprint

if not self.cloud_id and not self.elasticsearch_url:
raise ValueError("One between `elasticsearch_url` or `cloud_id` must be set")

if self.cloud_id and self.elasticsearch_url:
shared_logger.warning("both `elasticsearch_url` and `cloud_id` set in config: using `elasticsearch_url`")
self.cloud_id = ""
@@ -394,50 +391,70 @@ def include_exclude_filter(self, value: IncludeExcludeFilter) -> None:

self._include_exclude_filter = value

def get_output_by_type(self, output_type: str) -> Optional[Output]:
def get_output_by_destination(self, output_destination: str) -> Optional[Output]:
"""
Output getter.
Returns a specific output given its type
Returns a specific output given its destination
"""

return self._outputs[output_type] if output_type in self._outputs else None
return self._outputs[output_destination] if output_destination in self._outputs else None

def get_output_types(self) -> list[str]:
def get_output_destinations(self) -> list[str]:
"""
Output types getter.
Returns all the defined output types
Output destinations getter.
Returns all the defined output destinations
"""

return list(self._outputs.keys())

def delete_output_by_type(self, output_type: str) -> None:
# TODO
def delete_output_by_destination(self, output_destination: str) -> None:
"""
Output deleter.
Delete a defined output by its type
"""

del self._outputs[output_type]
del self._outputs[output_destination]

def add_output(self, output_type: str, **kwargs: Any) -> None:
"""
Output setter.
Set an output given its type and init kwargs
"""
if not isinstance(output_type, str):
raise ValueError("`type` must be provided as string")

if output_type in self._outputs:
raise ValueError(f"Duplicated `type` {output_type}")
if output_type not in _available_output_types:
raise ValueError(f"Type {output_type} is not included in the supported types: {_available_output_types}")

output_dest = ""
if output_type == "elasticsearch":
if "cloud_id" not in kwargs and "elasticsearch_url" not in kwargs:
raise ValueError("Either `elasticsearch_url` or `cloud_id` must be set")
# elasticsearch_url takes precedence over cloud_id
if "elasticsearch_url" not in kwargs:

output_dest = kwargs["cloud_id"]
else:
output_dest = kwargs["elasticsearch_url"]
elif output_type == "logstash":
if "logstash_url" not in kwargs:
raise ValueError(f"Output type {output_type} requires logstash_url to be set")
output_dest = kwargs["logstash_url"]

print("Outputs is ", self._outputs)
print(f"Output dest is {output_dest}")
if output_dest in self._outputs:
# Since logstash destination can only be set as logstash_url, we do not have to account
# for the same url/cloud_id for both types logstash or elasticsearch
raise ValueError(f"Duplicated output destination {output_dest} for type {output_type}")

output: Optional[Output] = None
if output_type == "elasticsearch":
output = ElasticsearchOutput(**kwargs)
elif output_type == "logstash":
output = LogstashOutput(**kwargs)
else:
output = Output(output_type=output_type)

self._outputs[output.type] = output
self._outputs[output_dest] = output

def get_multiline_processor(self) -> Optional[ProtocolMultiline]:
return self._multiline_processor
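The `add_output` rewrite above derives the dictionary key from the connection settings — `elasticsearch_url` (with `cloud_id` as the fallback) for Elasticsearch, `logstash_url` for Logstash — and rejects a duplicate destination instead of a duplicate type. A condensed sketch of that keying logic, assuming only the kwargs shown in the hunk; the example values are placeholders:

```python
from typing import Any

_available_output_types = ["elasticsearch", "logstash"]


def derive_output_destination(output_type: str, **kwargs: Any) -> str:
    """Condensed version of the destination derivation in add_output above."""
    if output_type not in _available_output_types:
        raise ValueError(f"Type {output_type} is not included in the supported types: {_available_output_types}")

    if output_type == "elasticsearch":
        if "cloud_id" not in kwargs and "elasticsearch_url" not in kwargs:
            raise ValueError("Either `elasticsearch_url` or `cloud_id` must be set")
        # elasticsearch_url takes precedence over cloud_id
        if "elasticsearch_url" not in kwargs:
            return kwargs["cloud_id"]
        return kwargs["elasticsearch_url"]

    if "logstash_url" not in kwargs:
        raise ValueError("Output type logstash requires logstash_url to be set")
    return kwargs["logstash_url"]


# Two elasticsearch outputs with different destinations are accepted;
# a repeated destination would be rejected.
outputs: dict[str, str] = {}
for output_type, kwargs in [
    ("elasticsearch", {"elasticsearch_url": "https://es.example.com:9243"}),
    ("elasticsearch", {"cloud_id": "my-deployment:abcdef"}),
]:
    destination = derive_output_destination(output_type, **kwargs)
    if destination in outputs:
        raise ValueError(f"Duplicated output destination {destination} for type {output_type}")
    outputs[destination] = output_type

print(outputs)
```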
23 changes: 12 additions & 11 deletions tests/handlers/aws/test_replay_trigger.py
@@ -8,6 +8,7 @@
import mock
import pytest

from handlers.aws import OutputConfigException
from handlers.aws.replay_trigger import ReplayedEventReplayHandler, get_shipper_for_replay_event
from share import parse_config
from shippers import CompositeShipper, ElasticsearchShipper, LogstashShipper
@@ -31,7 +32,7 @@ def test_get_shipper_for_replay_event(self) -> None:
replay_handler = ReplayedEventReplayHandler("arn:aws:sqs:eu-central-1:123456789:queue/replayqueue")
logstash_shipper: Optional[CompositeShipper] = get_shipper_for_replay_event(
config,
"logstash",
"logstash_url",
{},
"arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
replay_handler,
@@ -56,7 +57,7 @@ def test_get_shipper_for_replay_event(self) -> None:
replay_handler = ReplayedEventReplayHandler("arn:aws:sqs:eu-central-1:123456789:queue/replayqueue")
elasticsearch_shipper: Optional[CompositeShipper] = get_shipper_for_replay_event(
config,
"elasticsearch",
"elasticsearch_url",
{"es_datastream_name": "es_datastream_name"},
"arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
replay_handler,
@@ -65,7 +66,7 @@ def test_get_shipper_for_replay_event(self) -> None:
assert isinstance(elasticsearch_shipper, CompositeShipper)
assert isinstance(elasticsearch_shipper._shippers[0], ElasticsearchShipper)

with self.subTest("None shipper from replay event"):
with self.subTest("Exception from output destination"):
config_yaml_kinesis = """
inputs:
- type: kinesis-data-stream
Expand All @@ -77,11 +78,11 @@ def test_get_shipper_for_replay_event(self) -> None:
"""
config = parse_config(config_yaml_kinesis)
replay_handler = ReplayedEventReplayHandler("arn:aws:sqs:eu-central-1:123456789:queue/replayqueue")
none_shipper = get_shipper_for_replay_event(
config,
"output_type",
{},
"arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
replay_handler,
)
assert none_shipper is None
with self.assertRaisesRegex(OutputConfigException, "test"):
get_shipper_for_replay_event(
config,
"test",
{},
"arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
replay_handler,
)
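In the updated test above, the lookup failure is asserted as an exception rather than a `None` return; the regex "test" matches the destination echoed back in the message (per replay_trigger.py, "Cannot load output with destination {output_destination}"). A self-contained sketch of the same assertion style, with a stand-in exception:

```python
import unittest


class OutputConfigException(Exception):
    """Stand-in for handlers.aws.OutputConfigException."""


class ReplayOutputLookupTest(unittest.TestCase):
    def test_unknown_destination_raises(self) -> None:
        def lookup(destination: str) -> None:
            # Mirrors the message format used in get_shipper_for_replay_event.
            raise OutputConfigException(f"Cannot load output with destination {destination}")

        # assertRaisesRegex matches the destination string inside the message.
        with self.assertRaisesRegex(OutputConfigException, "test"):
            lookup("test")


if __name__ == "__main__":
    unittest.main()
```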