diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d6caa11..d0681721 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,8 @@
 ##### Features
 * Add user agent with information about ESF version and host environment: [#537](https://github.com/elastic/elastic-serverless-forwarder/pull/537)
 * Remove calls to `sqs.DeleteMessage` and refactor storage decorators: [#544](https://github.com/elastic/elastic-serverless-forwarder/pull/544)
+##### Bug fixes
+* Fix regression when both `json_content_type: single` and `expand_event_list_from_field` are set: [#553](https://github.com/elastic/elastic-serverless-forwarder/pull/553)
 
 ### v1.10.0 - 2023/10/27
 ##### Features
diff --git a/handlers/aws/utils.py b/handlers/aws/utils.py
index ccef0792..4f0cf9b7 100644
--- a/handlers/aws/utils.py
+++ b/handlers/aws/utils.py
@@ -313,8 +313,11 @@ def get_trigger_type_and_config_source(event: dict[str, Any]) -> tuple[str, str]
                 and "eventSource" in body["Records"][0]
             ):
                 event_source = body["Records"][0]["eventSource"]
+                if event_source not in _available_triggers:
+                    raise Exception("except in the function")
             else:
-                raise Exception
+                raise Exception("except in the function")
+
         except Exception:
             if "eventSource" not in first_record:
                 raise Exception("Not supported trigger")
diff --git a/requirements-tests.txt b/requirements-tests.txt
index 49170d6c..7ac0e3de 100644
--- a/requirements-tests.txt
+++ b/requirements-tests.txt
@@ -4,7 +4,7 @@ pytest-cov==4.1.0
 pytest-benchmark==4.0.0
 coverage==7.3.2
 simplejson==3.19.2
-orjson==3.9.10
+ujson==5.8.0
 pysimdjson==5.0.2
 python-rapidjson==1.13
 cysimdjson==23.8
diff --git a/requirements.txt b/requirements.txt
index 80ff79ff..62c33677 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,7 +4,7 @@ ecs_logging==2.1.0
 elasticsearch==7.17.9
 PyYAML==6.0.1
 aws_lambda_typing==2.18.0
-ujson==5.8.0
+orjson==3.9.10
 requests==2.31.0
 urllib3==1.26.18
 typing-extensions==4.8.0
diff --git a/share/json.py b/share/json.py
index 73f90140..49eb06db 100644
--- a/share/json.py
+++ b/share/json.py
@@ -4,12 +4,15 @@
 
 from typing import Any, AnyStr
 
-import ujson
+import orjson
 
 
 def json_dumper(json_object: Any) -> str:
-    return ujson.dumps(json_object, ensure_ascii=False, reject_bytes=False)
+    if isinstance(json_object, bytes):
+        json_object = json_object.decode("utf-8")
+
+    return orjson.dumps(json_object).decode("utf-8")
 
 
 def json_parser(payload: AnyStr) -> Any:
-    return ujson.loads(payload)
+    return orjson.loads(payload)
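Side note on the `ujson` → `orjson` swap above (not part of the patch): `orjson.dumps()` returns `bytes` and does not accept `bytes` input, whereas `ujson.dumps(..., reject_bytes=False)` returned `str` and tolerated bytes, which is why `json_dumper` now decodes before and after serializing. A minimal usage sketch, with type hints simplified:

```python
# Illustrative only: behaviour of the orjson-based helper in share/json.py.
import orjson


def json_dumper(json_object) -> str:
    if isinstance(json_object, bytes):
        # orjson refuses raw bytes input, so decode first
        json_object = json_object.decode("utf-8")
    # orjson.dumps() returns bytes; decode to keep the str return type
    return orjson.dumps(json_object).decode("utf-8")


assert json_dumper({"a": 1}) == '{"a":1}'      # orjson output is compact, no spaces
assert json_dumper(b"payload") == '"payload"'  # bytes are decoded before serialization
assert orjson.loads('{"a": 1}') == {"a": 1}
```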
diff --git a/share/secretsmanager.py b/share/secretsmanager.py
index feb9a971..b8fcedf2 100644
--- a/share/secretsmanager.py
+++ b/share/secretsmanager.py
@@ -7,7 +7,7 @@
 
 import boto3
 from botocore.client import BaseClient as BotoBaseClient
-from ujson import JSONDecodeError
+from orjson import JSONDecodeError
 
 from .json import json_parser
 from .logger import logger as shared_logger
@@ -27,7 +27,7 @@ def aws_sm_expander(config_yaml: str) -> str:
     Secrets Manager expander for config file
     It scans the file for the secrets manager arn pattern, checks for correct configuration,
     retrieves the values from the secret manager and replaces them in the config file.
-    Exceptions will be risen for the following scenarios:
+    Exceptions will be raised for the following scenarios:
         - Not respecting the arn pattern
         - Input is for both plain text and json keys for the same secret manager name
         - The fetched value is empty
@@ -153,8 +153,6 @@ def parse_secrets_str(secrets: str, secret_arn: str) -> Union[str, dict[str, Any]
     except JSONDecodeError:
         shared_logger.debug("parsed secrets as plaintext")
         return secrets
-    except Exception as e:
-        raise Exception(f"{e} while parsing {secret_arn}")
     else:
         shared_logger.debug("parsed secrets as json")
         return parsed_secrets
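The removal above leaves `parse_secrets_str` with a plain try/except/else: JSON secrets are parsed, anything that fails to decode falls back to plaintext, and the `else` branch only runs when parsing succeeded. A standalone sketch of that pattern (`parse_or_plaintext` is a hypothetical name, not the project's function):

```python
# Sketch (assumptions: orjson installed; simplified return type) of the
# parse-or-fallback pattern kept in parse_secrets_str after this change.
import orjson
from orjson import JSONDecodeError


def parse_or_plaintext(secrets: str):
    try:
        parsed_secrets = orjson.loads(secrets)
    except JSONDecodeError:
        # not valid JSON: treat the whole value as a plaintext secret
        return secrets
    else:
        # valid JSON: return the parsed structure
        return parsed_secrets


assert parse_or_plaintext('{"username": "admin"}') == {"username": "admin"}
assert parse_or_plaintext("plain-password") == "plain-password"
```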
diff --git a/storage/decorator.py b/storage/decorator.py
index 831aa174..4ee845af 100644
--- a/storage/decorator.py
+++ b/storage/decorator.py
@@ -236,6 +236,22 @@ def wrapper(
 
             yield line, _, _, newline, None
 
+    def _collect_single(iterator: StorageDecoratorIterator) -> StorageDecoratorIterator:
+        # we get the original iterator, we collect everything in a list that we merge later and extract values from
+        single: list[tuple[Union[StorageReader, bytes], int, int, bytes]] = list(
+            [
+                (data, starting_offset, ending_offset, newline)
+                for data, starting_offset, ending_offset, newline, _ in iterator
+            ]
+        )
+
+        newline = single[0][-1]
+        starting_offset = single[0][1]
+        ending_offset = single[-1][2]
+
+        data_to_yield: bytes = newline.join([x[0] for x in single])
+        yield data_to_yield, starting_offset, ending_offset, newline, None
+
     def wrapper(
         storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
     ) -> StorageDecoratorIterator:
@@ -255,47 +271,44 @@ def wrapper(
         json_collector_state.ending_offset = range_start
 
         iterator = func(storage, range_start, body, is_gzipped)
 
-        # if we know it's a single json we replace body with the whole content,
-        # and mark the object as started
-        if storage.json_content_type == "single" and event_list_from_field_expander is None:
-            single: list[tuple[Union[StorageReader, bytes], int, int, bytes]] = list(
-                [
-                    (data, starting_offset, ending_offset, newline)
-                    for data, starting_offset, ending_offset, newline, _ in iterator
-                ]
-            )
-
-            newline = single[0][-1]
-            starting_offset = single[0][1]
-            ending_offset = single[-1][2]
-
-            data_to_yield: bytes = newline.join([x[0] for x in single])
-            yield data_to_yield, starting_offset, ending_offset, newline, None
-        else:
-            for data, starting_offset, ending_offset, newline, _ in iterator:
-                assert isinstance(data, bytes)
+        # if we know it's a single json we wrap the iterator with _collect_single
+        # and mark the object as json and started
+        if storage.json_content_type == "single":
+            iterator = _collect_single(iterator=iterator)
+            json_collector_state.is_a_json_object = True
+            json_collector_state.has_an_object_start = True
+
+        for data, starting_offset, ending_offset, newline, _ in iterator:
+            assert isinstance(data, bytes)
 
-                # if it's not a json object we can just forward the content by lines
+            # let's wait for the start of a json object
+            if not json_collector_state.has_an_object_start:
+                # if range_start is greater than zero, or we have leading space, data can be empty
+                stripped_data = data.decode("utf-8").lstrip()
+                if len(stripped_data) > 0 and stripped_data[0] == "{":
+                    # we mark the potentiality of a json object start
+                    # CAVEAT: if the log entry starts with `{` but the
+                    # content is not json, we buffer the first 10k lines
+                    # before the circuit breaker kicks in
+                    json_collector_state.has_an_object_start = True
+
+            # if it does not have a json object start we can just forward the content by lines
+            if not json_collector_state.has_an_object_start:
-                    # if range_start is greater than zero, or we have leading space, data can be empty
-                    stripped_data = data.decode("utf-8").lstrip()
-                    if len(stripped_data) > 0 and stripped_data[0] == "{":
-                        # we mark the potentiality of a json object start
-                        # CAVEAT: if the log entry starts with `{` but the
-                        # content is not json, we buffer the first 10k lines
-                        # before the circuit breaker kicks in
-                        json_collector_state.has_an_object_start = True
-
-                    if not json_collector_state.has_an_object_start:
-                        _handle_offset(len(data) + len(newline), json_collector_state)
-                        yield data, starting_offset, ending_offset, newline, None
-
-                if json_collector_state.has_an_object_start:
-                    # let's parse the data only if it is not single, or we have a field expander
+                _handle_offset(len(data) + len(newline), json_collector_state)
+                yield data, starting_offset, ending_offset, newline, None
+
+            # it has a json object start, let's apply our logic
+            if json_collector_state.has_an_object_start:
+                # it is a single json and we do not have a field expander, let's yield the content
+                if event_list_from_field_expander is None and storage.json_content_type == "single":
+                    yield data, starting_offset, ending_offset, newline, None
+                else:
+                    # it is not single, or we have a field expander. let's try to collect the data as json
                     for data_to_yield, json_object in _collector(data, newline, json_collector_state):
                         shared_logger.debug(
                             "json_collector objects", extra={"offset": json_collector_state.ending_offset}
                         )
+                        # we have a field expander, let's yield the expansion
                         if event_list_from_field_expander is not None:
                             for (
                                 expanded_log_event,
@@ -316,6 +329,7 @@
                                     expanded_event_n,
                                 )
                         else:
+                            # we do not have a field expander, let's yield the collected json object
                             yield (
                                 data_to_yield,
                                 json_collector_state.starting_offset,
@@ -326,17 +340,17 @@
 
                         del json_object
 
-                # check if we hit the circuit broken
-                if json_collector_state.is_a_json_object_circuit_broken:
-                    # let's yield what we have so far
-                    for line, _, _, original_newline, _ in _by_lines_fallback(json_collector_state):
-                        yield (
-                            line,
-                            json_collector_state.starting_offset,
-                            json_collector_state.ending_offset,
-                            original_newline,
-                            None,
-                        )
+            # check if we hit the circuit broken
+            if json_collector_state.is_a_json_object_circuit_broken:
+                # let's yield what we have so far
+                for line, _, _, original_newline, _ in _by_lines_fallback(json_collector_state):
+                    yield (
+                        line,
+                        json_collector_state.starting_offset,
+                        json_collector_state.ending_offset,
+                        original_newline,
+                        None,
+                    )
 
         # in this case we could have a trailing new line in what's left in the buffer
         # or the content had a leading `{` but was not a json object before the circuit breaker intercepted it,
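The core of the decorator change is that the `json_content_type: single` case no longer bypasses the main loop: `_collect_single` is a generator that drains the upstream iterator and re-emits one merged chunk, so the single-object case and the `expand_event_list_from_field` case now flow through the same code path. A simplified, standalone sketch of that iterator-wrapping idea (`Chunk`, `collect_single` and `fake_lines` are illustrative names, and the tuple shape is reduced compared to `storage/decorator.py`):

```python
# Sketch of the _collect_single idea: collapse every chunk yielded by an upstream
# generator into a single payload, keeping the first chunk's starting offset and
# the last chunk's ending offset.
from typing import Iterator, Tuple

Chunk = Tuple[bytes, int, int, bytes]  # (data, starting_offset, ending_offset, newline)


def collect_single(iterator: Iterator[Chunk]) -> Iterator[Chunk]:
    # drain the upstream generator, then emit one merged chunk covering the whole input
    chunks = list(iterator)
    newline = chunks[0][3]
    starting_offset = chunks[0][1]
    ending_offset = chunks[-1][2]
    yield newline.join(chunk[0] for chunk in chunks), starting_offset, ending_offset, newline


def fake_lines() -> Iterator[Chunk]:
    yield b'{"Records": [', 0, 14, b"\n"
    yield b'{"a": 1}]}', 14, 25, b"\n"


# The whole body comes back as one chunk, so a downstream field expander can
# parse it as a single JSON object instead of receiving it line by line.
print(list(collect_single(fake_lines())))
```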
"arn:aws:secretsmanager:eu-central-1:123456789:secret:plain_secret_not_str_int: " + "expected to be a string", ): ctx = ContextMock() _s3_client_mock.config_content = b""" diff --git a/tests/handlers/aws/test_utils.py b/tests/handlers/aws/test_utils.py index 322aae97..16e02ea5 100644 --- a/tests/handlers/aws/test_utils.py +++ b/tests/handlers/aws/test_utils.py @@ -66,6 +66,70 @@ def _get_random_digit_string_of_size(size: int) -> str: return "".join(random.choices(string.digits, k=size)) +@pytest.mark.unit +class TestGetTriggerTypeAndConfigSource(TestCase): + def test_get_trigger_type_and_config_source(self) -> None: + from handlers.aws.utils import CONFIG_FROM_PAYLOAD, CONFIG_FROM_S3FILE, get_trigger_type_and_config_source + + with self.subTest("cloudwatch-logs and CONFIG_FROM_S3FILE"): + event: dict[str, Any] = {"awslogs": {"data": ""}} + + assert get_trigger_type_and_config_source(event=event) == ("cloudwatch-logs", CONFIG_FROM_S3FILE) + + with self.subTest("no Records"): + with self.assertRaisesRegexp(Exception, "Not supported trigger"): + event = {} + + get_trigger_type_and_config_source(event=event) + + with self.subTest("len(Records) < 1"): + with self.assertRaisesRegexp(Exception, "Not supported trigger"): + event = {"Records": []} + + get_trigger_type_and_config_source(event=event) + + with self.subTest("body in first record: replay-sqs CONFIG_FROM_S3FILE"): + event = { + "Records": [ + { + "body": '{"output_type": "output_type", ' + '"output_args": "output_args", "event_payload": "event_payload"}' + } + ] + } + + assert get_trigger_type_and_config_source(event=event) == ("replay-sqs", CONFIG_FROM_S3FILE) + + with self.subTest("body in first record: eventSource override"): + event = {"Records": [{"body": '{"Records": [{"eventSource":"aws:s3"}]}', "eventSource": "aws:kinesis"}]} + + assert get_trigger_type_and_config_source(event=event) == ("s3-sqs", CONFIG_FROM_S3FILE) + + with self.subTest("body in first record: eventSource not override"): + event = { + "Records": [ + {"body": '{"Records": [{"eventSource":"not-available-trigger"}]}', "eventSource": "aws:kinesis"} + ] + } + + assert get_trigger_type_and_config_source(event=event) == ("kinesis-data-stream", CONFIG_FROM_S3FILE) + + with self.subTest("body not in first record: eventSource not override"): + event = {"Records": [{"eventSource": "aws:kinesis"}]} + + assert get_trigger_type_and_config_source(event=event) == ("kinesis-data-stream", CONFIG_FROM_S3FILE) + + with self.subTest("messageAttributes without originalEventSourceARN in first record, CONFIG_FROM_S3FILE"): + event = {"Records": [{"messageAttributes": {}, "eventSource": "aws:kinesis"}]} + + assert get_trigger_type_and_config_source(event=event) == ("kinesis-data-stream", CONFIG_FROM_S3FILE) + + with self.subTest("messageAttributes with originalEventSourceARN in first record, CONFIG_FROM_PAYLOAD"): + event = {"Records": [{"messageAttributes": {"originalEventSourceARN": ""}, "eventSource": "aws:kinesis"}]} + + assert get_trigger_type_and_config_source(event=event) == ("kinesis-data-stream", CONFIG_FROM_PAYLOAD) + + @pytest.mark.unit class TestDiscoverIntegrationScope(TestCase): def test_discover_integration_scope(self) -> None: diff --git a/tests/share/test_secretsmanager.py b/tests/share/test_secretsmanager.py index 9614c0f2..195ecc77 100644 --- a/tests/share/test_secretsmanager.py +++ b/tests/share/test_secretsmanager.py @@ -323,8 +323,9 @@ def test_parse_secrets_manager(self) -> None: with self.assertRaisesRegex( Exception, - "Expected string or C-contiguous bytes-like 
object while parsing " - "arn:aws:secretsmanager:eu-central-1:123456789:secret:plain_secret_not_str_in", + "Error for secret " + "arn:aws:secretsmanager:eu-central-1:123456789:secret:plain_secret_not_str_int: " + "expected to be a string", ): aws_sm_expander(config_yaml) diff --git a/tests/storage/test_decorator.py b/tests/storage/test_decorator.py index 2ef065b3..ff7f65ba 100644 --- a/tests/storage/test_decorator.py +++ b/tests/storage/test_decorator.py @@ -190,6 +190,41 @@ def resolver(_: str, field_to_expand_event_list_from: str) -> str: assert expected == decorated + def test_expand_event_list_from_field_json_content_type_single_no_circuit_breaker(self) -> None: + def resolver(_: str, field_to_expand_event_list_from: str) -> str: + return field_to_expand_event_list_from + + event_list_from_field_expander = ExpandEventListFromField("Records", "", resolver, None, None) + + storage = DummyStorage( + event_list_from_field_expander=event_list_from_field_expander, json_content_type="single" + ) + data: bytes = ( + b'{"Records": [' + + b",\n".join([b'{"a line":"' + str(i).encode("utf-8") + b'"}' for i in range(0, 2000)]) + + b"]}\n" + ) + fixtures = BytesIO(data) + expected: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list( + [ + (b'{"a line":"' + str(i).encode("utf-8") + b'"}', int(i * (len(data) / 2000)), 0, b"\n", i) + for i in range(0, 2000) + ] + ) + expected.pop() + expected.append((b'{"a line":"1999"}', int(len(data) - (len(data) / 2000)), len(data), b"\n", None)) + + decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list( + [ + (data, starting_offset, ending_offset, newline, event_expanded_offset) + for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate( + 0, fixtures, False + ) + ] + ) + + assert expected == decorated + def test_multiline_processor(self) -> None: multiline_processor = MultilineFactory.create(multiline_type="count", count_lines=3)