Skip to content

Commit

Permalink
Place messages from misconfigured IDs in the replay queue (elastic#711)
Browse files Browse the repository at this point in the history
* Fix misconfigured id

Signed-off-by: constanca <[email protected]>

* Fix lint errors

Signed-off-by: constanca <[email protected]>

* Fix lint errors

Signed-off-by: constanca <[email protected]>

* Fix coverage tests

Signed-off-by: constanca <[email protected]>

* Update CHANGELOG.md

Signed-off-by: constanca <[email protected]>

* Fix mismatched region for queues in test_handler

Signed-off-by: constanca <[email protected]>

* Add architecture note

Signed-off-by: constanca <[email protected]>

* Add architecture note

Signed-off-by: constanca <[email protected]>

---------

Signed-off-by: constanca <[email protected]>
  • Loading branch information
constanca-m authored May 14, 2024
1 parent 30c41f4 commit 45e6c35
Show file tree
Hide file tree
Showing 13 changed files with 582 additions and 128 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### v1.14.0 - 2024/05/07
##### Bug fixes
* Report misconfigured input ids as an error instead of warning, and place those messages in the replaying queue [#711](https://github.com/elastic/elastic-serverless-forwarder/pull/711).

### v1.13.1 - 2024/03/07
##### Features
* Add documentation and optimise performance for `root_fields_to_add_to_expanded_event` [#642](https://github.com/elastic/elastic-serverless-forwarder/pull/642)
Expand Down
47 changes: 29 additions & 18 deletions handlers/aws/cloudwatch_logs_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ def _from_awslogs_data_to_event(awslogs_data: str) -> Any:
return json_parser(cloudwatch_logs_payload_plain)


def _handle_cloudwatch_logs_continuation(
def _handle_cloudwatch_logs_move(
sqs_client: BotoBaseClient,
sqs_continuing_queue: str,
last_ending_offset: Optional[int],
last_event_expanded_offset: Optional[int],
sqs_destination_queue: str,
cloudwatch_logs_event: dict[str, Any],
current_log_event: int,
event_input_id: str,
input_id: str,
config_yaml: str,
continuing_queue: bool = True,
current_log_event: int = 0,
last_ending_offset: Optional[int] = None,
last_event_expanded_offset: Optional[int] = None,
) -> None:
"""
Handler of the continuation queue for cloudwatch logs inputs
Expand All @@ -51,7 +52,7 @@ def _handle_cloudwatch_logs_continuation(
message_attributes = {
"config": {"StringValue": config_yaml, "DataType": "String"},
"originalEventId": {"StringValue": log_event["id"], "DataType": "String"},
"originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"},
"originalEventSourceARN": {"StringValue": input_id, "DataType": "String"},
"originalLogGroup": {"StringValue": log_group_name, "DataType": "String"},
"originalLogStream": {"StringValue": log_stream_name, "DataType": "String"},
"originalEventTimestamp": {"StringValue": str(log_event["timestamp"]), "DataType": "Number"},
Expand All @@ -70,21 +71,31 @@ def _handle_cloudwatch_logs_continuation(
}

sqs_client.send_message(
QueueUrl=sqs_continuing_queue,
QueueUrl=sqs_destination_queue,
MessageBody=log_event["message"],
MessageAttributes=message_attributes,
)

shared_logger.debug(
"continuing",
extra={
"sqs_continuing_queue": sqs_continuing_queue,
"last_ending_offset": last_ending_offset,
"last_event_expanded_offset": last_event_expanded_offset,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)
if continuing_queue:
shared_logger.debug(
"continuing",
extra={
"sqs_continuing_queue": sqs_destination_queue,
"last_ending_offset": last_ending_offset,
"last_event_expanded_offset": last_event_expanded_offset,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)
else:
shared_logger.debug(
"replaying",
extra={
"sqs_replaying_queue": sqs_destination_queue,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)


def _handle_cloudwatch_logs_event(
Expand Down
133 changes: 103 additions & 30 deletions handlers/aws/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

from .cloudwatch_logs_trigger import (
_from_awslogs_data_to_event,
_handle_cloudwatch_logs_continuation,
_handle_cloudwatch_logs_event,
_handle_cloudwatch_logs_move,
)
from .kinesis_trigger import _handle_kinesis_continuation, _handle_kinesis_record
from .kinesis_trigger import _handle_kinesis_move, _handle_kinesis_record
from .replay_trigger import ReplayedEventReplayHandler, get_shipper_for_replay_event
from .s3_sqs_trigger import _handle_s3_sqs_continuation, _handle_s3_sqs_event
from .sqs_trigger import _handle_sqs_continuation, _handle_sqs_event
from .s3_sqs_trigger import _handle_s3_sqs_event, _handle_s3_sqs_move
from .sqs_trigger import _handle_sqs_event, handle_sqs_move
from .utils import (
CONFIG_FROM_PAYLOAD,
INTEGRATION_SCOPE_GENERIC,
Expand Down Expand Up @@ -130,6 +130,10 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
sent_events: int = 0
empty_events: int = 0
skipped_events: int = 0
error_events: int = 0

sqs_replaying_queue = os.environ["SQS_REPLAY_URL"]
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

if trigger_type == "cloudwatch-logs":
cloudwatch_logs_event = _from_awslogs_data_to_event(lambda_event["awslogs"]["data"])
Expand All @@ -144,8 +148,25 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
)

if event_input is None:
shared_logger.warning("no input defined", extra={"input_type": trigger_type, "input_id": input_id})

shared_logger.error("no input defined", extra={"input_id": input_id})
error_events += 1
_handle_cloudwatch_logs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
cloudwatch_logs_event=cloudwatch_logs_event,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
shared_logger.info(
"lambda is going to shutdown",
extra={
"error_events": error_events,
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
},
)
return "completed"

aws_region = input_id.split(":")[3]
Expand Down Expand Up @@ -180,8 +201,6 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
empty_events += 1

if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
Expand All @@ -194,14 +213,14 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex

composite_shipper.flush()

_handle_cloudwatch_logs_continuation(
_handle_cloudwatch_logs_move(
sqs_client=sqs_client,
sqs_continuing_queue=sqs_continuing_queue,
sqs_destination_queue=sqs_continuing_queue,
last_ending_offset=last_ending_offset,
last_event_expanded_offset=last_event_expanded_offset,
cloudwatch_logs_event=cloudwatch_logs_event,
current_log_event=current_log_event_n,
event_input_id=input_id,
input_id=input_id,
config_yaml=config_yaml,
)

Expand All @@ -210,17 +229,43 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
composite_shipper.flush()
shared_logger.info(
"lambda processed all the events",
extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

if trigger_type == "kinesis-data-stream":
shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})

input_id = lambda_event["Records"][0]["eventSourceARN"]
event_input = config.get_input_by_id(input_id)

if event_input is None:
shared_logger.warning("no input defined", extra={"input_id": input_id})
shared_logger.error("no input defined", extra={"input_id": input_id})
error_events += len(lambda_event["Records"])

for kinesis_record in lambda_event["Records"]:
_handle_kinesis_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
kinesis_record=kinesis_record,
event_input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)

shared_logger.info(
"lambda is going to shutdown",
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)
return "completed"

composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
Expand Down Expand Up @@ -253,15 +298,14 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
empty_events += 1

if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
"sqs_queue": sqs_continuing_queue,
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

Expand All @@ -275,9 +319,9 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
continuing_last_ending_offset = None
continuing_last_event_expanded_offset = None

_handle_kinesis_continuation(
_handle_kinesis_move(
sqs_client=sqs_client,
sqs_continuing_queue=sqs_continuing_queue,
sqs_destination_queue=sqs_continuing_queue,
last_ending_offset=continuing_last_ending_offset,
last_event_expanded_offset=continuing_last_event_expanded_offset,
kinesis_record=kinesis_record,
Expand All @@ -290,7 +334,12 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
composite_shipper.flush()
shared_logger.info(
"lambda processed all the events",
extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

if trigger_type == "s3-sqs" or trigger_type == "sqs":
Expand Down Expand Up @@ -318,12 +367,10 @@ def handle_timeout(
timeout_config_yaml: str,
timeout_current_s3_record: int = 0,
) -> None:
timeout_sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
"sqs_queue": timeout_sqs_continuing_queue,
"sqs_queue": sqs_continuing_queue,
"sent_events": timeout_sent_events,
"empty_events": timeout_empty_events,
"skipped_events": timeout_skipped_events,
Expand All @@ -349,24 +396,24 @@ def handle_timeout(
continue

if timeout_input.type == "s3-sqs":
_handle_s3_sqs_continuation(
_handle_s3_sqs_move(
sqs_client=sqs_client,
sqs_continuing_queue=timeout_sqs_continuing_queue,
sqs_destination_queue=sqs_continuing_queue,
last_ending_offset=timeout_last_ending_offset,
last_event_expanded_offset=timeout_last_event_expanded_offset,
sqs_record=timeout_sqs_record,
current_s3_record=timeout_current_s3_record,
event_input_id=timeout_input_id,
input_id=timeout_input_id,
config_yaml=timeout_config_yaml,
)
else:
_handle_sqs_continuation(
handle_sqs_move(
sqs_client=sqs_client,
sqs_continuing_queue=timeout_sqs_continuing_queue,
sqs_destination_queue=sqs_continuing_queue,
last_ending_offset=timeout_last_ending_offset,
last_event_expanded_offset=timeout_last_event_expanded_offset,
sqs_record=timeout_sqs_record,
event_input_id=timeout_input_id,
input_id=timeout_input_id,
config_yaml=timeout_config_yaml,
)

Expand All @@ -382,15 +429,36 @@ def handle_timeout(
input_id = sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"]

event_input = config.get_input_by_id(input_id)

if event_input is None:
shared_logger.warning("no input defined", extra={"input_id": input_id})
# This could happen if aws_lambda_event_source_mapping is set correctly, but
# the id on the config.yaml was writen incorrectly.
shared_logger.error("no input defined", extra={"input_id": input_id})
if trigger_type == "s3-sqs":
_handle_s3_sqs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
sqs_record=sqs_record,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
elif trigger_type == "sqs":
handle_sqs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
sqs_record=sqs_record,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
error_events += 1
continue

if input_id in composite_shipper_cache:
composite_shipper = composite_shipper_cache[input_id]
else:
composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)

composite_shipper_cache[event_input.id] = composite_shipper

continuing_event_expanded_offset: Optional[int] = None
Expand Down Expand Up @@ -493,7 +561,12 @@ def handle_timeout(

shared_logger.info(
"lambda processed all the events",
extra={"sent_events": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

return "completed"
Loading

0 comments on commit 45e6c35

Please sign in to comment.