Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Place messages from misconfigured IDs in the replay queue #711

Merged
merged 8 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### v1.14.0 - 2024/05/07
##### Bug fixes
* Report misconfigured input ids as an error instead of warning, and place those messages in the replaying queue [#711](https://github.com/elastic/elastic-serverless-forwarder/pull/711).

### v1.13.1 - 2024/03/07
##### Features
* Add documentation and optimise performance for `root_fields_to_add_to_expanded_event` [#642](https://github.com/elastic/elastic-serverless-forwarder/pull/642)
Expand Down
47 changes: 29 additions & 18 deletions handlers/aws/cloudwatch_logs_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ def _from_awslogs_data_to_event(awslogs_data: str) -> Any:
return json_parser(cloudwatch_logs_payload_plain)


def _handle_cloudwatch_logs_continuation(
def _handle_cloudwatch_logs_move(
sqs_client: BotoBaseClient,
sqs_continuing_queue: str,
last_ending_offset: Optional[int],
last_event_expanded_offset: Optional[int],
sqs_destination_queue: str,
cloudwatch_logs_event: dict[str, Any],
current_log_event: int,
event_input_id: str,
input_id: str,
config_yaml: str,
continuing_queue: bool = True,
current_log_event: int = 0,
last_ending_offset: Optional[int] = None,
last_event_expanded_offset: Optional[int] = None,
) -> None:
"""
Handler of the continuation queue for cloudwatch logs inputs
Expand All @@ -51,7 +52,7 @@ def _handle_cloudwatch_logs_continuation(
message_attributes = {
"config": {"StringValue": config_yaml, "DataType": "String"},
"originalEventId": {"StringValue": log_event["id"], "DataType": "String"},
"originalEventSourceARN": {"StringValue": event_input_id, "DataType": "String"},
"originalEventSourceARN": {"StringValue": input_id, "DataType": "String"},
"originalLogGroup": {"StringValue": log_group_name, "DataType": "String"},
"originalLogStream": {"StringValue": log_stream_name, "DataType": "String"},
"originalEventTimestamp": {"StringValue": str(log_event["timestamp"]), "DataType": "Number"},
Expand All @@ -70,21 +71,31 @@ def _handle_cloudwatch_logs_continuation(
}

sqs_client.send_message(
QueueUrl=sqs_continuing_queue,
QueueUrl=sqs_destination_queue,
MessageBody=log_event["message"],
MessageAttributes=message_attributes,
)

shared_logger.debug(
"continuing",
extra={
"sqs_continuing_queue": sqs_continuing_queue,
"last_ending_offset": last_ending_offset,
"last_event_expanded_offset": last_event_expanded_offset,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)
if continuing_queue:
shared_logger.debug(
"continuing",
extra={
"sqs_continuing_queue": sqs_destination_queue,
"last_ending_offset": last_ending_offset,
"last_event_expanded_offset": last_event_expanded_offset,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)
else:
shared_logger.debug(
"replaying",
extra={
"sqs_replaying_queue": sqs_destination_queue,
"event_id": log_event["id"],
"event_timestamp": log_event["timestamp"],
},
)


def _handle_cloudwatch_logs_event(
Expand Down
133 changes: 103 additions & 30 deletions handlers/aws/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@

from .cloudwatch_logs_trigger import (
_from_awslogs_data_to_event,
_handle_cloudwatch_logs_continuation,
_handle_cloudwatch_logs_event,
_handle_cloudwatch_logs_move,
)
from .kinesis_trigger import _handle_kinesis_continuation, _handle_kinesis_record
from .kinesis_trigger import _handle_kinesis_move, _handle_kinesis_record
from .replay_trigger import ReplayedEventReplayHandler, get_shipper_for_replay_event
from .s3_sqs_trigger import _handle_s3_sqs_continuation, _handle_s3_sqs_event
from .sqs_trigger import _handle_sqs_continuation, _handle_sqs_event
from .s3_sqs_trigger import _handle_s3_sqs_event, _handle_s3_sqs_move
from .sqs_trigger import _handle_sqs_event, handle_sqs_move
from .utils import (
CONFIG_FROM_PAYLOAD,
INTEGRATION_SCOPE_GENERIC,
Expand Down Expand Up @@ -130,6 +130,10 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
sent_events: int = 0
empty_events: int = 0
skipped_events: int = 0
error_events: int = 0

sqs_replaying_queue = os.environ["SQS_REPLAY_URL"]
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

if trigger_type == "cloudwatch-logs":
cloudwatch_logs_event = _from_awslogs_data_to_event(lambda_event["awslogs"]["data"])
Expand All @@ -144,8 +148,25 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
)

if event_input is None:
shared_logger.warning("no input defined", extra={"input_type": trigger_type, "input_id": input_id})

shared_logger.error("no input defined", extra={"input_id": input_id})
error_events += 1
_handle_cloudwatch_logs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
cloudwatch_logs_event=cloudwatch_logs_event,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
shared_logger.info(
"lambda is going to shutdown",
extra={
"error_events": error_events,
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
},
)
return "completed"

aws_region = input_id.split(":")[3]
Expand Down Expand Up @@ -180,8 +201,6 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
empty_events += 1

if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
Expand All @@ -194,14 +213,14 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex

composite_shipper.flush()

_handle_cloudwatch_logs_continuation(
_handle_cloudwatch_logs_move(
sqs_client=sqs_client,
sqs_continuing_queue=sqs_continuing_queue,
sqs_destination_queue=sqs_continuing_queue,
last_ending_offset=last_ending_offset,
last_event_expanded_offset=last_event_expanded_offset,
cloudwatch_logs_event=cloudwatch_logs_event,
current_log_event=current_log_event_n,
event_input_id=input_id,
input_id=input_id,
config_yaml=config_yaml,
)

Expand All @@ -210,17 +229,43 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
composite_shipper.flush()
shared_logger.info(
"lambda processed all the events",
extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

if trigger_type == "kinesis-data-stream":
shared_logger.info("trigger", extra={"size": len(lambda_event["Records"])})

input_id = lambda_event["Records"][0]["eventSourceARN"]
event_input = config.get_input_by_id(input_id)

if event_input is None:
shared_logger.warning("no input defined", extra={"input_id": input_id})
shared_logger.error("no input defined", extra={"input_id": input_id})
error_events += len(lambda_event["Records"])

for kinesis_record in lambda_event["Records"]:
_handle_kinesis_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
kinesis_record=kinesis_record,
event_input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)

shared_logger.info(
"lambda is going to shutdown",
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)
return "completed"

composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)
Expand Down Expand Up @@ -253,15 +298,14 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
empty_events += 1

if lambda_context is not None and lambda_context.get_remaining_time_in_millis() < _completion_grace_period:
sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
"sqs_queue": sqs_continuing_queue,
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

Expand All @@ -275,9 +319,9 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
continuing_last_ending_offset = None
continuing_last_event_expanded_offset = None

_handle_kinesis_continuation(
_handle_kinesis_move(
sqs_client=sqs_client,
sqs_continuing_queue=sqs_continuing_queue,
sqs_destination_queue=sqs_continuing_queue,
last_ending_offset=continuing_last_ending_offset,
last_event_expanded_offset=continuing_last_event_expanded_offset,
kinesis_record=kinesis_record,
Expand All @@ -290,7 +334,12 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
composite_shipper.flush()
shared_logger.info(
"lambda processed all the events",
extra={"sent_event": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

if trigger_type == "s3-sqs" or trigger_type == "sqs":
Expand Down Expand Up @@ -318,12 +367,10 @@ def handle_timeout(
timeout_config_yaml: str,
timeout_current_s3_record: int = 0,
) -> None:
timeout_sqs_continuing_queue = os.environ["SQS_CONTINUE_URL"]

shared_logger.info(
"lambda is going to shutdown, continuing on dedicated sqs queue",
extra={
"sqs_queue": timeout_sqs_continuing_queue,
"sqs_queue": sqs_continuing_queue,
"sent_events": timeout_sent_events,
"empty_events": timeout_empty_events,
"skipped_events": timeout_skipped_events,
Expand All @@ -349,24 +396,24 @@ def handle_timeout(
continue

if timeout_input.type == "s3-sqs":
_handle_s3_sqs_continuation(
_handle_s3_sqs_move(
sqs_client=sqs_client,
sqs_continuing_queue=timeout_sqs_continuing_queue,
sqs_destination_queue=sqs_continuing_queue,
last_ending_offset=timeout_last_ending_offset,
last_event_expanded_offset=timeout_last_event_expanded_offset,
sqs_record=timeout_sqs_record,
current_s3_record=timeout_current_s3_record,
event_input_id=timeout_input_id,
input_id=timeout_input_id,
config_yaml=timeout_config_yaml,
)
else:
_handle_sqs_continuation(
handle_sqs_move(
sqs_client=sqs_client,
sqs_continuing_queue=timeout_sqs_continuing_queue,
sqs_destination_queue=sqs_continuing_queue,
last_ending_offset=timeout_last_ending_offset,
last_event_expanded_offset=timeout_last_event_expanded_offset,
sqs_record=timeout_sqs_record,
event_input_id=timeout_input_id,
input_id=timeout_input_id,
config_yaml=timeout_config_yaml,
)

Expand All @@ -382,15 +429,36 @@ def handle_timeout(
input_id = sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"]

event_input = config.get_input_by_id(input_id)

if event_input is None:
shared_logger.warning("no input defined", extra={"input_id": input_id})
# This could happen if aws_lambda_event_source_mapping is set correctly, but
# the id on the config.yaml was writen incorrectly.
shared_logger.error("no input defined", extra={"input_id": input_id})
if trigger_type == "s3-sqs":
_handle_s3_sqs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
sqs_record=sqs_record,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
elif trigger_type == "sqs":
handle_sqs_move(
sqs_client=sqs_client,
sqs_destination_queue=sqs_replaying_queue,
sqs_record=sqs_record,
input_id=input_id,
config_yaml=config_yaml,
continuing_queue=False,
)
error_events += 1
continue

if input_id in composite_shipper_cache:
composite_shipper = composite_shipper_cache[input_id]
else:
composite_shipper = get_shipper_from_input(event_input=event_input, config_yaml=config_yaml)

composite_shipper_cache[event_input.id] = composite_shipper

continuing_event_expanded_offset: Optional[int] = None
Expand Down Expand Up @@ -493,7 +561,12 @@ def handle_timeout(

shared_logger.info(
"lambda processed all the events",
extra={"sent_events": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
extra={
"sent_events": sent_events,
"empty_events": empty_events,
"skipped_events": skipped_events,
"error_events": error_events,
},
)

return "completed"
Loading
Loading