Fix json collector decorator and switch json parsing library to orjson (#553)



* Bump boto3 from 1.33.7 to 1.33.9 (#551)

Bumps [boto3](https://github.com/boto/boto3) from 1.33.7 to 1.33.9.
- [Release notes](https://github.com/boto/boto3/releases)
- [Changelog](https://github.com/boto/boto3/blob/develop/CHANGELOG.rst)
- [Commits](boto/boto3@1.33.7...1.33.9)

---
updated-dependencies:
- dependency-name: boto3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Davide Girardi <[email protected]>
3 people authored Dec 11, 2023
1 parent ebf6261 commit bc661c2
Showing 11 changed files with 182 additions and 61 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,8 @@
##### Features
* Add user agent with information about ESF version and host environment: [#537](https://github.com/elastic/elastic-serverless-forwarder/pull/537)
* Remove calls to `sqs.DeleteMessage` and refactor storage decorators: [#544](https://github.com/elastic/elastic-serverless-forwarder/pull/544)
##### Bug fixes
* Fix regression when both `json_content_type: single` and `expand_event_list_from_field` are set: [#553](https://github.com/elastic/elastic-serverless-forwarder/pull/553)

### v1.10.0 - 2023/10/27
##### Features
5 changes: 4 additions & 1 deletion handlers/aws/utils.py
@@ -313,8 +313,11 @@ def get_trigger_type_and_config_source(event: dict[str, Any]) -> tuple[str, str]
and "eventSource" in body["Records"][0]
):
event_source = body["Records"][0]["eventSource"]
if event_source not in _available_triggers:
raise Exception("except in the function")
else:
raise Exception
raise Exception("except in the function")

except Exception:
if "eventSource" not in first_record:
raise Exception("Not supported trigger")
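For context, the hunk above tightens how the trigger type is resolved when an SQS record wraps another event: the `eventSource` nested in the record body only wins if it is one of the known triggers, otherwise detection falls back to the outer record. Below is a minimal sketch of that behaviour — `AVAILABLE_TRIGGERS` and `detect_trigger` are hypothetical, simplified names, not the real handler API — consistent with the new tests added in tests/handlers/aws/test_utils.py further down.

```python
import json

# Hypothetical, simplified trigger mapping; the real module derives this from _available_triggers.
AVAILABLE_TRIGGERS = {"aws:s3": "s3-sqs", "aws:kinesis": "kinesis-data-stream"}


def detect_trigger(record: dict) -> str:
    """Prefer the eventSource nested in the record body, but only if it is a known trigger."""
    try:
        body = json.loads(record.get("body", ""))
        nested = body["Records"][0]["eventSource"]
        if nested not in AVAILABLE_TRIGGERS:
            raise ValueError("unknown nested event source")
        return AVAILABLE_TRIGGERS[nested]
    except (ValueError, KeyError, TypeError):
        # No body, unparsable body, or unknown nested source: use the outer record.
        return AVAILABLE_TRIGGERS[record["eventSource"]]


# A known nested source overrides the outer one; an unknown one falls back to it.
print(detect_trigger({"body": '{"Records": [{"eventSource": "aws:s3"}]}', "eventSource": "aws:kinesis"}))
print(detect_trigger({"body": '{"Records": [{"eventSource": "unknown:source"}]}', "eventSource": "aws:kinesis"}))
```

Running the sketch prints `s3-sqs` for the known nested source and `kinesis-data-stream` for the unknown one.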
2 changes: 1 addition & 1 deletion requirements-tests.txt
@@ -4,7 +4,7 @@ pytest-cov==4.1.0
pytest-benchmark==4.0.0
coverage==7.3.2
simplejson==3.19.2
orjson==3.9.10
ujson==5.8.0
pysimdjson==5.0.2
python-rapidjson==1.13
cysimdjson==23.8
2 changes: 1 addition & 1 deletion requirements.txt
@@ -4,7 +4,7 @@ ecs_logging==2.1.0
elasticsearch==7.17.9
PyYAML==6.0.1
aws_lambda_typing==2.18.0
ujson==5.8.0
orjson==3.9.10
requests==2.31.0
urllib3==1.26.18
typing-extensions==4.8.0
9 changes: 6 additions & 3 deletions share/json.py
@@ -4,12 +4,15 @@

from typing import Any, AnyStr

import ujson
import orjson


def json_dumper(json_object: Any) -> str:
return ujson.dumps(json_object, ensure_ascii=False, reject_bytes=False)
if isinstance(json_object, bytes):
json_object = json_object.decode("utf-8")

return orjson.dumps(json_object).decode("utf-8")


def json_parser(payload: AnyStr) -> Any:
return ujson.loads(payload)
return orjson.loads(payload)
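A quick illustration (not part of the diff) of the orjson behaviours the rewritten helpers have to absorb: `orjson.dumps()` returns `bytes` and keeps non-ASCII characters as UTF-8 rather than escaping them, it refuses `bytes` input (hence the decode in `json_dumper`), and `orjson.loads()` accepts either `str` or `bytes`.

```python
import orjson

doc = {"message": "ünïcode stays as UTF-8"}

# orjson.dumps returns bytes, so json_dumper decodes to keep returning str.
as_bytes = orjson.dumps(doc)
as_str = as_bytes.decode("utf-8")

# orjson.loads accepts str or bytes, so json_parser works for both payload types.
assert orjson.loads(as_bytes) == orjson.loads(as_str) == doc

# Unlike ujson with reject_bytes=False, orjson does not serialize bytes directly,
# which is why json_dumper decodes bytes input before dumping.
try:
    orjson.dumps(b"raw bytes")
except TypeError as err:
    print(err)  # e.g. "Type is not JSON serializable: bytes"
```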
6 changes: 2 additions & 4 deletions share/secretsmanager.py
@@ -7,7 +7,7 @@

import boto3
from botocore.client import BaseClient as BotoBaseClient
from ujson import JSONDecodeError
from orjson import JSONDecodeError

from .json import json_parser
from .logger import logger as shared_logger
@@ -27,7 +27,7 @@ def aws_sm_expander(config_yaml: str) -> str:
Secrets Manager expander for config file
It scans the file for the secrets manager arn pattern, checks for correct configuration,
retrieves the values from the secret manager and replaces them in the config file.
Exceptions will be risen for the following scenarios:
Exceptions will be raised for the following scenarios:
- Not respecting the arn pattern
- Input is for both plain text and json keys for the same secret manager name
- The fetched value is empty
@@ -153,8 +153,6 @@ def parse_secrets_str(secrets: str, secret_arn: str) -> Union[str, dict[str, Any
except JSONDecodeError:
shared_logger.debug("parsed secrets as plaintext")
return secrets
except Exception as e:
raise Exception(f"{e} while parsing {secret_arn}")
else:
shared_logger.debug("parsed secrets as json")
return parsed_secrets
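With the import swapped, the plaintext-vs-JSON detection in `parse_secrets_str` keys off `orjson.JSONDecodeError`, and the blanket `except Exception` re-wrap shown as removed above is gone. A simplified sketch of that try/except pattern — `parse_secret` is an illustrative name, not the module's real signature:

```python
import orjson


def parse_secret(value: str):
    """Simplified sketch: return a dict if the secret value is JSON, otherwise the raw string."""
    try:
        parsed = orjson.loads(value)
    except orjson.JSONDecodeError:
        # Not valid JSON: treat the secret as plain text.
        return value
    else:
        # Valid JSON: hand back the parsed structure.
        return parsed


print(parse_secret('{"username": "elastic", "password": "changeme"}'))  # parsed as a dict
print(parse_secret("just-a-plain-token"))  # returned unchanged as a string
```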
106 changes: 60 additions & 46 deletions storage/decorator.py
@@ -236,6 +236,22 @@ def wrapper(

yield line, _, _, newline, None

def _collect_single(iterator: StorageDecoratorIterator) -> StorageDecoratorIterator:
# we get the original iterator, we collect everything in a list that we merge later and extract values from
single: list[tuple[Union[StorageReader, bytes], int, int, bytes]] = list(
[
(data, starting_offset, ending_offset, newline)
for data, starting_offset, ending_offset, newline, _ in iterator
]
)

newline = single[0][-1]
starting_offset = single[0][1]
ending_offset = single[-1][2]

data_to_yield: bytes = newline.join([x[0] for x in single])
yield data_to_yield, starting_offset, ending_offset, newline, None

def wrapper(
storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
) -> StorageDecoratorIterator:
@@ -255,47 +271,44 @@ def wrapper(
json_collector_state.ending_offset = range_start

iterator = func(storage, range_start, body, is_gzipped)
# if we know it's a single json we replace body with the whole content,
# and mark the object as started
if storage.json_content_type == "single" and event_list_from_field_expander is None:
single: list[tuple[Union[StorageReader, bytes], int, int, bytes]] = list(
[
(data, starting_offset, ending_offset, newline)
for data, starting_offset, ending_offset, newline, _ in iterator
]
)

newline = single[0][-1]
starting_offset = single[0][1]
ending_offset = single[-1][2]

data_to_yield: bytes = newline.join([x[0] for x in single])
yield data_to_yield, starting_offset, ending_offset, newline, None
else:
for data, starting_offset, ending_offset, newline, _ in iterator:
assert isinstance(data, bytes)
# if we know it's a single json we wrap the iterator with _collect_single
# and mark the object as json and started
if storage.json_content_type == "single":
iterator = _collect_single(iterator=iterator)
json_collector_state.is_a_json_object = True
json_collector_state.has_an_object_start = True

for data, starting_offset, ending_offset, newline, _ in iterator:
assert isinstance(data, bytes)

# if it's not a json object we can just forward the content by lines
# let's wait for the start of a json object
if not json_collector_state.has_an_object_start:
# if range_start is greater than zero, or we have leading space, data can be empty
stripped_data = data.decode("utf-8").lstrip()
if len(stripped_data) > 0 and stripped_data[0] == "{":
# we mark the potentiality of a json object start
# CAVEAT: if the log entry starts with `{` but the
# content is not json, we buffer the first 10k lines
# before the circuit breaker kicks in
json_collector_state.has_an_object_start = True

# if it has not a json object start we can just forward the content by lines
if not json_collector_state.has_an_object_start:
# if range_start is greater than zero, or we have leading space, data can be empty
stripped_data = data.decode("utf-8").lstrip()
if len(stripped_data) > 0 and stripped_data[0] == "{":
# we mark the potentiality of a json object start
# CAVEAT: if the log entry starts with `{` but the
# content is not json, we buffer the first 10k lines
# before the circuit breaker kicks in
json_collector_state.has_an_object_start = True

if not json_collector_state.has_an_object_start:
_handle_offset(len(data) + len(newline), json_collector_state)
yield data, starting_offset, ending_offset, newline, None

if json_collector_state.has_an_object_start:
# let's parse the data only if it is not single, or we have a field expander
_handle_offset(len(data) + len(newline), json_collector_state)
yield data, starting_offset, ending_offset, newline, None

# it has a json object start, let's apply our logic
if json_collector_state.has_an_object_start:
# it is a single json and we have not a field expander, let's yield the content
if event_list_from_field_expander is None and storage.json_content_type == "single":
yield data, starting_offset, ending_offset, newline, None
else:
# it is not single, or we have a field expander. let's try to collect the data as json
for data_to_yield, json_object in _collector(data, newline, json_collector_state):
shared_logger.debug(
"json_collector objects", extra={"offset": json_collector_state.ending_offset}
)
# we have a field expander, let's yield the expansion
if event_list_from_field_expander is not None:
for (
expanded_log_event,
@@ -316,6 +329,7 @@ def wrapper(
expanded_event_n,
)
else:
# we do not have a field expander, let's yield the collected content
yield (
data_to_yield,
json_collector_state.starting_offset,
@@ -326,17 +340,17 @@ def wrapper(

del json_object

# check if we hit the circuit broken
if json_collector_state.is_a_json_object_circuit_broken:
# let's yield what we have so far
for line, _, _, original_newline, _ in _by_lines_fallback(json_collector_state):
yield (
line,
json_collector_state.starting_offset,
json_collector_state.ending_offset,
original_newline,
None,
)
# check if we hit the circuit broken
if json_collector_state.is_a_json_object_circuit_broken:
# let's yield what we have so far
for line, _, _, original_newline, _ in _by_lines_fallback(json_collector_state):
yield (
line,
json_collector_state.starting_offset,
json_collector_state.ending_offset,
original_newline,
None,
)

# in this case we could have a trailing new line in what's left in the buffer
# or the content had a leading `{` but was not a json object before the circuit breaker intercepted it,
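In short, the fix in this file: the whole-object collection for `json_content_type: single` used to run only when no `expand_event_list_from_field` expander was configured, so the combination of the two options fell through to the chunk-by-chunk path. The new `_collect_single` helper now merges the chunks first in every `single` case, and the expander (when present) then operates on the one collected object. A toy illustration of the merge step — `collect_single` and `Chunk` are simplified, hypothetical names, not the decorator's real signature:

```python
from typing import Iterator, Tuple

Chunk = Tuple[bytes, int, int, bytes]  # (data, starting_offset, ending_offset, newline)


def collect_single(chunks: Iterator[Chunk]) -> Chunk:
    """Toy version of _collect_single: merge every chunk of a single JSON object,
    keeping the first chunk's starting offset and the last chunk's ending offset."""
    collected = list(chunks)
    newline = collected[0][3]
    starting_offset = collected[0][1]
    ending_offset = collected[-1][2]
    data = newline.join(chunk[0] for chunk in collected)
    return data, starting_offset, ending_offset, newline


chunks = iter(
    [
        (b'{"Records": [', 0, 13, b"\n"),
        (b'{"a line": "0"},', 14, 30, b"\n"),
        (b'{"a line": "1"}', 31, 46, b"\n"),
        (b"]}", 47, 49, b"\n"),
    ]
)
print(collect_single(chunks))
# (b'{"Records": [\n{"a line": "0"},\n{"a line": "1"}\n]}', 0, 49, b'\n')
```

The new test in tests/storage/test_decorator.py further down exercises exactly this combination end to end.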
7 changes: 4 additions & 3 deletions tests/handlers/aws/test_handler.py
@@ -959,12 +959,13 @@ def test_lambda_handler_failure(self) -> None:

handler(event, ctx) # type:ignore

with self.subTest("invalid secretsmanager: json TypeError risen"):
with self.subTest("invalid secretsmanager: json TypeError raised"):
os.environ["S3_CONFIG_FILE"] = "s3://s3_config_file_bucket/s3_config_file_object_key"
with self.assertRaisesRegex(
ConfigFileException,
"Expected string or C-contiguous bytes-like object while parsing "
"arn:aws:secretsmanager:eu-central-1:123456789:secret:plain_secret_not_str_int",
"Error for secret "
"arn:aws:secretsmanager:eu-central-1:123456789:secret:plain_secret_not_str_int: "
"expected to be a string",
):
ctx = ContextMock()
_s3_client_mock.config_content = b"""
64 changes: 64 additions & 0 deletions tests/handlers/aws/test_utils.py
@@ -66,6 +66,70 @@ def _get_random_digit_string_of_size(size: int) -> str:
return "".join(random.choices(string.digits, k=size))


@pytest.mark.unit
class TestGetTriggerTypeAndConfigSource(TestCase):
def test_get_trigger_type_and_config_source(self) -> None:
from handlers.aws.utils import CONFIG_FROM_PAYLOAD, CONFIG_FROM_S3FILE, get_trigger_type_and_config_source

with self.subTest("cloudwatch-logs and CONFIG_FROM_S3FILE"):
event: dict[str, Any] = {"awslogs": {"data": ""}}

assert get_trigger_type_and_config_source(event=event) == ("cloudwatch-logs", CONFIG_FROM_S3FILE)

with self.subTest("no Records"):
with self.assertRaisesRegexp(Exception, "Not supported trigger"):
event = {}

get_trigger_type_and_config_source(event=event)

with self.subTest("len(Records) < 1"):
with self.assertRaisesRegexp(Exception, "Not supported trigger"):
event = {"Records": []}

get_trigger_type_and_config_source(event=event)

with self.subTest("body in first record: replay-sqs CONFIG_FROM_S3FILE"):
event = {
"Records": [
{
"body": '{"output_type": "output_type", '
'"output_args": "output_args", "event_payload": "event_payload"}'
}
]
}

assert get_trigger_type_and_config_source(event=event) == ("replay-sqs", CONFIG_FROM_S3FILE)

with self.subTest("body in first record: eventSource override"):
event = {"Records": [{"body": '{"Records": [{"eventSource":"aws:s3"}]}', "eventSource": "aws:kinesis"}]}

assert get_trigger_type_and_config_source(event=event) == ("s3-sqs", CONFIG_FROM_S3FILE)

with self.subTest("body in first record: eventSource not override"):
event = {
"Records": [
{"body": '{"Records": [{"eventSource":"not-available-trigger"}]}', "eventSource": "aws:kinesis"}
]
}

assert get_trigger_type_and_config_source(event=event) == ("kinesis-data-stream", CONFIG_FROM_S3FILE)

with self.subTest("body not in first record: eventSource not override"):
event = {"Records": [{"eventSource": "aws:kinesis"}]}

assert get_trigger_type_and_config_source(event=event) == ("kinesis-data-stream", CONFIG_FROM_S3FILE)

with self.subTest("messageAttributes without originalEventSourceARN in first record, CONFIG_FROM_S3FILE"):
event = {"Records": [{"messageAttributes": {}, "eventSource": "aws:kinesis"}]}

assert get_trigger_type_and_config_source(event=event) == ("kinesis-data-stream", CONFIG_FROM_S3FILE)

with self.subTest("messageAttributes with originalEventSourceARN in first record, CONFIG_FROM_PAYLOAD"):
event = {"Records": [{"messageAttributes": {"originalEventSourceARN": ""}, "eventSource": "aws:kinesis"}]}

assert get_trigger_type_and_config_source(event=event) == ("kinesis-data-stream", CONFIG_FROM_PAYLOAD)


@pytest.mark.unit
class TestDiscoverIntegrationScope(TestCase):
def test_discover_integration_scope(self) -> None:
5 changes: 3 additions & 2 deletions tests/share/test_secretsmanager.py
@@ -323,8 +323,9 @@ def test_parse_secrets_manager(self) -> None:

with self.assertRaisesRegex(
Exception,
"Expected string or C-contiguous bytes-like object while parsing "
"arn:aws:secretsmanager:eu-central-1:123456789:secret:plain_secret_not_str_in",
"Error for secret "
"arn:aws:secretsmanager:eu-central-1:123456789:secret:plain_secret_not_str_int: "
"expected to be a string",
):
aws_sm_expander(config_yaml)

35 changes: 35 additions & 0 deletions tests/storage/test_decorator.py
@@ -190,6 +190,41 @@ def resolver(_: str, field_to_expand_event_list_from: str) -> str:

assert expected == decorated

def test_expand_event_list_from_field_json_content_type_single_no_circuit_breaker(self) -> None:
def resolver(_: str, field_to_expand_event_list_from: str) -> str:
return field_to_expand_event_list_from

event_list_from_field_expander = ExpandEventListFromField("Records", "", resolver, None, None)

storage = DummyStorage(
event_list_from_field_expander=event_list_from_field_expander, json_content_type="single"
)
data: bytes = (
b'{"Records": ['
+ b",\n".join([b'{"a line":"' + str(i).encode("utf-8") + b'"}' for i in range(0, 2000)])
+ b"]}\n"
)
fixtures = BytesIO(data)
expected: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
[
(b'{"a line":"' + str(i).encode("utf-8") + b'"}', int(i * (len(data) / 2000)), 0, b"\n", i)
for i in range(0, 2000)
]
)
expected.pop()
expected.append((b'{"a line":"1999"}', int(len(data) - (len(data) / 2000)), len(data), b"\n", None))

decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
[
(data, starting_offset, ending_offset, newline, event_expanded_offset)
for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate(
0, fixtures, False
)
]
)

assert expected == decorated

def test_multiline_processor(self) -> None:
multiline_processor = MultilineFactory.create(multiline_type="count", count_lines=3)

