diff --git a/CHANGELOG.md b/CHANGELOG.md index d9a6007f..2d6caa11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ### v1.11.0 - 2023/11/27 ##### Features * Add user agent with information about ESF version and host environment: [#537](https://github.com/elastic/elastic-serverless-forwarder/pull/537) +* Remove calls to `sqs.DeleteMessage` and refactor storage decorators: [#544](https://github.com/elastic/elastic-serverless-forwarder/pull/544) ### v1.10.0 - 2023/10/27 ##### Features diff --git a/handlers/aws/handler.py b/handlers/aws/handler.py index b967160f..626fc8ad 100644 --- a/handlers/aws/handler.py +++ b/handlers/aws/handler.py @@ -28,7 +28,6 @@ capture_serverless, config_yaml_from_payload, config_yaml_from_s3, - delete_sqs_record, expand_event_list_from_field_resolver, get_continuing_original_input_type, get_input_from_log_group_subscription_data, @@ -371,16 +370,9 @@ def handle_timeout( config_yaml=timeout_config_yaml, ) - delete_sqs_record(sqs_record["eventSourceARN"], sqs_record["receiptHandle"]) - previous_sqs_record: int = 0 - last_sqs_record: Optional[dict[str, Any]] = None for current_sqs_record, sqs_record in enumerate(lambda_event["Records"]): - last_sqs_record = sqs_record if current_sqs_record > previous_sqs_record: - deleting_sqs_record = lambda_event["Records"][previous_sqs_record] - delete_sqs_record(deleting_sqs_record["eventSourceARN"], deleting_sqs_record["receiptHandle"]) - previous_sqs_record = current_sqs_record continuing_original_input_type = get_continuing_original_input_type(sqs_record) @@ -504,7 +496,4 @@ def handle_timeout( extra={"sent_events": sent_events, "empty_events": empty_events, "skipped_events": skipped_events}, ) - assert last_sqs_record is not None - delete_sqs_record(last_sqs_record["eventSourceARN"], last_sqs_record["receiptHandle"]) - return "completed" diff --git a/handlers/aws/utils.py b/handlers/aws/utils.py index 7ef76f13..ccef0792 100644 --- a/handlers/aws/utils.py +++ b/handlers/aws/utils.py @@ -389,7 +389,7 @@ def get_input_from_log_group_subscription_data( We avoid to call the describe_log_streams on the logs' client, since we have no way to apply the proper throttling because we'd need to know the number of concurrent lambda running at the time of the call. In order to not hardcode the list of regions we rely on ec2 DescribeRegions - as much weird as it is - that I found - no information about any kind of throttling. Weadd IAM permissions for it in deployment. + no information about having any kind of throttling. We add IAM permissions for it in deployment. 
""" all_regions = get_ec2_client().describe_regions(AllRegions=True) assert "Regions" in all_regions diff --git a/requirements.txt b/requirements.txt index 8d87c4d4..e594c086 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ aws_lambda_typing==2.18.0 ujson==5.8.0 requests==2.31.0 urllib3==1.26.18 +typing-extensions==4.8.0 diff --git a/share/__init__.py b/share/__init__.py index 282522c4..515f31d2 100644 --- a/share/__init__.py +++ b/share/__init__.py @@ -9,6 +9,6 @@ from .include_exlude import IncludeExcludeFilter, IncludeExcludeRule from .json import json_dumper, json_parser from .logger import logger as shared_logger -from .multiline import CollectBuffer, CountMultiline, PatternMultiline, ProtocolMultiline, WhileMultiline +from .multiline import CollectBuffer, CountMultiline, FeedIterator, PatternMultiline, ProtocolMultiline, WhileMultiline from .secretsmanager import aws_sm_expander from .utils import get_hex_prefix diff --git a/share/config.py b/share/config.py index 1ff12ed7..336c1516 100644 --- a/share/config.py +++ b/share/config.py @@ -14,8 +14,6 @@ _available_input_types: list[str] = ["cloudwatch-logs", "s3-sqs", "sqs", "kinesis-data-stream"] _available_output_types: list[str] = ["elasticsearch", "logstash"] -IntegrationScopeDiscovererCallable = Callable[[dict[str, Any], int], str] - class Output: """ diff --git a/share/expand_event_list_from_field.py b/share/expand_event_list_from_field.py index b52a68d2..ef880a0f 100644 --- a/share/expand_event_list_from_field.py +++ b/share/expand_event_list_from_field.py @@ -8,6 +8,8 @@ from .json import json_dumper from .logger import logger as shared_logger +# ExpandEventListFromFieldResolverCallable accepts an integration_scope and the field to expand events list from as +# arguments. It returns the resolved name of the field to expand the events list from. ExpandEventListFromFieldResolverCallable = Callable[[str, str], str] @@ -78,6 +80,9 @@ def expand( if json_object is None: yield log_event, starting_offset, ending_offset, None else: + # expanded_ending_offset is set to the starting_offset because if we want to set it to the beginning of the + # json object in case of a message from the continuation queue. if we update it, if the payload is continued + # we will fetch the content of the payload from the middle of the json object, failing to parse it expanded_ending_offset: int = starting_offset for ( @@ -97,6 +102,7 @@ def expand( if is_last_expanded_event: expanded_event_n = None + # only when we reach the last expanded event we can move the ending offset expanded_ending_offset = ending_offset else: expanded_event_n = None diff --git a/share/multiline.py b/share/multiline.py index feba85de..0823b9ae 100644 --- a/share/multiline.py +++ b/share/multiline.py @@ -2,6 +2,9 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. + +# This file is a porting of the multiline processor on beats. + from __future__ import annotations import datetime @@ -9,29 +12,42 @@ from abc import ABCMeta from typing import Callable, Iterator, Optional, Protocol +from typing_extensions import TypeAlias + default_max_bytes: int = 10485760 # Default maximum number of bytes to return in one multi-line event default_max_lines: int = 500 # Default maximum number of lines to return in one multi-line event default_multiline_timeout: int = 5 # Default timeout in secs to finish a multi-line event. 
timedelta_circuit_breaker: datetime.timedelta = datetime.timedelta(seconds=5) +# CollectTuple is a tuple representing the multilines bytes content, the length of the content and the newline found +# These data is instrumental to the `StorageDecoratorIterator` that needs the content to yield, the starting and ending +# offsets and the newline +CollectTuple: TypeAlias = tuple[bytes, int, bytes] + +# CollectIterator yields a `CollectTuple` +CollectIterator: TypeAlias = Iterator[CollectTuple] + +# FeedIterator yields a tuple representing the content and its newline to feed of the `ProtocolMultiline` implementation +FeedIterator: TypeAlias = Iterator[tuple[bytes, bytes]] + class CommonMultiline(metaclass=ABCMeta): """ Common class for Multiline components """ - _feed: Iterator[tuple[bytes, bytes, int]] + _feed: FeedIterator _buffer: CollectBuffer _pre_collect_buffer: bool @property - def feed(self) -> Iterator[tuple[bytes, bytes, int]]: + def feed(self) -> FeedIterator: return self._feed @feed.setter - def feed(self, value: Iterator[tuple[bytes, bytes, int]]) -> None: + def feed(self, value: FeedIterator) -> None: self._feed = value @@ -40,18 +56,18 @@ class ProtocolMultiline(Protocol): Protocol class for Multiline components """ - _feed: Iterator[tuple[bytes, bytes, int]] + _feed: FeedIterator _buffer: CollectBuffer @property - def feed(self) -> Iterator[tuple[bytes, bytes, int]]: + def feed(self) -> FeedIterator: pass # pragma: no cover @feed.setter - def feed(self, value: Iterator[tuple[bytes, bytes, int]]) -> None: + def feed(self, value: FeedIterator) -> None: pass # pragma: no cover - def collect(self) -> Iterator[tuple[bytes, int, int]]: + def collect(self) -> CollectIterator: pass # pragma: no cover @@ -74,7 +90,7 @@ def __init__(self, max_bytes: int, max_lines: int, skip_newline: bool): self._processed_lines: int = 0 self._current_length: int = 0 - def collect_and_reset(self) -> tuple[bytes, int]: + def collect_and_reset(self) -> CollectTuple: data = self._buffer current_length = self._current_length @@ -85,7 +101,14 @@ def collect_and_reset(self) -> tuple[bytes, int]: self.previous = b"" - return data, current_length + if data.find(b"\r\n") > -1: + newline = b"\r\n" + elif data.find(b"\n") > -1: + newline = b"\n" + else: + newline = b"" + + return data, current_length, newline def is_empty(self) -> bool: return self._buffer_lines == 0 @@ -169,9 +192,9 @@ def __eq__(self, other: object) -> bool: and self._skip_newline == self._skip_newline ) - def collect(self) -> Iterator[tuple[bytes, int, int]]: + def collect(self) -> CollectIterator: last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow() - for data, newline, newline_length in self.feed: + for data, newline in self.feed: self._buffer.grow(data, newline) self._current_count += 1 if ( @@ -179,21 +202,14 @@ def collect(self) -> Iterator[tuple[bytes, int, int]]: or (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker ): self._current_count = 0 - content, current_length = self._buffer.collect_and_reset() - yield content, current_length, newline_length + yield self._buffer.collect_and_reset() if not self._buffer.is_empty(): - content, current_length = self._buffer.collect_and_reset() - - newline_length = 0 - if content.find(b"\r\n") > -1: - newline_length = 2 - elif content.find(b"\n") > -1: - newline_length = 1 - - yield content, current_length, newline_length + yield self._buffer.collect_and_reset() +# WhileMatcherCallable accepts a pattern in bytes to be compiled as regex. 
+# It returns a boolean indicating if the content matches a "while" multiline pattern or not. WhileMatcherCallable = Callable[[bytes], bool] @@ -259,44 +275,38 @@ def negate(line: bytes) -> bool: return negate - def collect(self) -> Iterator[tuple[bytes, int, int]]: + def collect(self) -> CollectIterator: last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow() - for data, newline, newline_length in self.feed: + for data, newline in self.feed: if not self._matcher(data): if self._buffer.is_empty(): self._buffer.grow(data, newline) - content, current_length = self._buffer.collect_and_reset() - yield content, current_length, newline_length + yield self._buffer.collect_and_reset() else: - content, current_length = self._buffer.collect_and_reset() + content, current_length, _ = self._buffer.collect_and_reset() self._buffer.grow(data, newline) - yield content, current_length, newline_length + yield content, current_length, newline - content, current_length = self._buffer.collect_and_reset() + content, current_length, _ = self._buffer.collect_and_reset() - yield content, current_length, newline_length + yield content, current_length, newline else: self._buffer.grow(data, newline) # no pre collect buffer in while multiline, let's check the circuit breaker after at least one grow if (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker: - content, current_length = self._buffer.collect_and_reset() - - yield content, current_length, newline_length + yield self._buffer.collect_and_reset() if not self._buffer.is_empty(): - content, current_length = self._buffer.collect_and_reset() - - newline_length = 0 - if content.find(b"\r\n") > -1: - newline_length = 2 - elif content.find(b"\n") > -1: - newline_length = 1 - - yield content, current_length, newline_length + yield self._buffer.collect_and_reset() +# WhileMatcherCallable accepts the previous and the current content as arguments. +# It returns a boolean indicating if the content matches a "pattern" multiline pattern or not. PatternMatcherCallable = Callable[[bytes, bytes], bool] + +# SelectCallable accepts the previous and the current content as arguments. +# It returns either the previous or current content according to the matching type ("before" or "after"). 
SelectCallable = Callable[[bytes, bytes], bytes] @@ -390,8 +400,8 @@ def negate(previous: bytes, current: bytes) -> bool: def _check_matcher(self) -> bool: return (self._match == "after" and len(self._buffer.previous) > 0) or self._match == "before" - def collect(self) -> Iterator[tuple[bytes, int, int]]: - for data, newline, newline_length in self.feed: + def collect(self) -> CollectIterator: + for data, newline in self.feed: last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow() if self._pre_collect_buffer: self._buffer.collect_and_reset() @@ -401,31 +411,19 @@ def collect(self) -> Iterator[tuple[bytes, int, int]]: self._buffer.grow(data, newline) self._pre_collect_buffer = True - content, current_length = self._buffer.collect_and_reset() - yield content, current_length, newline_length + yield self._buffer.collect_and_reset() elif ( not self._buffer.is_empty() and self._check_matcher() and not self._matcher(self._buffer.previous, data) ): - content, current_length = self._buffer.collect_and_reset() + content, current_length, _ = self._buffer.collect_and_reset() self._buffer.grow(data, newline) - yield content, current_length, newline_length + yield content, current_length, newline else: if (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker: - content, current_length = self._buffer.collect_and_reset() - - yield content, current_length, newline_length - + yield self._buffer.collect_and_reset() self._buffer.grow(data, newline) if not self._buffer.is_empty(): - content, current_length = self._buffer.collect_and_reset() - - newline_length = 0 - if content.find(b"\r\n") > -1: - newline_length = 2 - elif content.find(b"\n") > -1: - newline_length = 1 - - yield content, current_length, newline_length + yield self._buffer.collect_and_reset() diff --git a/shippers/shipper.py b/shippers/shipper.py index af53db8e..78f02959 100644 --- a/shippers/shipper.py +++ b/shippers/shipper.py @@ -4,7 +4,11 @@ from typing import Any, Callable, Protocol +# ReplayHandlerCallable accepts the output type, a dict of arguments for the output and the event to be replayed. +# It does not return anything. ReplayHandlerCallable = Callable[[str, dict[str, Any], dict[str, Any]], None] + +# EventIdGeneratorCallable accepts a dict of the events as argument. It returns the _id of that event. EventIdGeneratorCallable = Callable[[dict[str, Any]], str] EVENT_IS_EMPTY = "EVENT_IS_EMPTY" diff --git a/storage/__init__.py b/storage/__init__.py index 28092de5..3bf1ab80 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -2,7 +2,8 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. 
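
As an illustration of the PatternMatcherCallable and SelectCallable aliases documented in share/multiline.py above, here is a hypothetical matcher/select pair; these helpers only mirror the documented signatures and are not the ones built inside PatternMultiline.

import re

_continuation = re.compile(rb"^[ \t]+")

def pattern_matcher(previous: bytes, current: bytes) -> bool:
    # PatternMatcherCallable: decide whether the two lines belong to the same multiline event
    return _continuation.search(current) is not None

def select_current(previous: bytes, current: bytes) -> bytes:
    # SelectCallable: choose which of the two lines the pattern is applied to
    return current
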
+from .decorator import by_lines, inflate, json_collector, multi_line from .factory import StorageFactory from .payload import PayloadStorage from .s3 import S3Storage -from .storage import ProtocolStorage, StorageReader +from .storage import CommonStorage, GetByLinesIterator, ProtocolStorage, StorageDecoratorIterator, StorageReader diff --git a/storage/decorator.py b/storage/decorator.py index a16d038a..831aa174 100644 --- a/storage/decorator.py +++ b/storage/decorator.py @@ -6,212 +6,181 @@ from io import BytesIO from typing import Any, Iterator, Optional, Union -from share import ExpandEventListFromField, ProtocolMultiline, json_parser, shared_logger +from share import ExpandEventListFromField, FeedIterator, ProtocolMultiline, json_parser, shared_logger -from .storage import CHUNK_SIZE, GetByLinesCallable, ProtocolStorageType, StorageReader +from .storage import CHUNK_SIZE, ProtocolStorageType, StorageDecoratorCallable, StorageDecoratorIterator, StorageReader -def by_lines(func: GetByLinesCallable[ProtocolStorageType]) -> GetByLinesCallable[ProtocolStorageType]: +def by_lines(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]: """ ProtocolStorage decorator for returning content split by lines """ def wrapper( storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool - ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]]: + ) -> StorageDecoratorIterator: ending_offset: int = range_start unfinished_line: bytes = b"" - newline: bytes = b"" - newline_length: int = 0 - - json_content_type: Optional[str] = storage.json_content_type - iterator = func(storage, range_start, body, is_gzipped) - if json_content_type == "single" and storage.multiline_processor is None: - try: - while True: - data, _, _, _, _ = next(iterator) - assert isinstance(data, bytes) - - unfinished_line += data - except StopIteration: - starting_offset = ending_offset - ending_offset += len(unfinished_line) - - if unfinished_line.endswith(b"\r\n"): - newline = b"\r\n" - newline_length = 2 - elif unfinished_line.endswith(b"\n"): - newline = b"\n" - newline_length = 1 - - unfinished_line = unfinished_line.rstrip(newline) - shared_logger.debug("by_line json_content_type single", extra={"offset": ending_offset}) - - yield unfinished_line, starting_offset, ending_offset, newline_length, None - else: - for data, _, _, _, _ in iterator: - assert isinstance(data, bytes) - - unfinished_line += data - lines = unfinished_line.decode("utf-8").splitlines() - - if len(lines) == 0: - continue + for data, _, _, _, _ in iterator: + assert isinstance(data, bytes) - if newline_length == 0: - if unfinished_line.find(b"\r\n") > -1: - newline = b"\r\n" - newline_length = 2 - elif unfinished_line.find(b"\n") > -1: - newline = b"\n" - newline_length = 1 + unfinished_line += data + lines = unfinished_line.decode("utf-8").splitlines() - if unfinished_line.endswith(newline): - unfinished_line = lines.pop().encode() + newline - else: - unfinished_line = lines.pop().encode() + if len(lines) == 0: + continue - for line in lines: - line_encoded = line.encode("utf-8") - starting_offset = ending_offset - ending_offset += len(line_encoded) + newline_length - shared_logger.debug("by_line lines", extra={"offset": ending_offset}) + if unfinished_line.find(b"\r\n") > -1: + newline = b"\r\n" + elif unfinished_line.find(b"\n") > -1: + newline = b"\n" + else: + newline = b"" - yield line_encoded, starting_offset, ending_offset, newline_length, None + # replace 
unfinished_line with the last element removed from lines, trailing with newline + if unfinished_line.endswith(newline): + unfinished_line = lines.pop().encode() + newline + else: + unfinished_line = lines.pop().encode() - if len(unfinished_line) > 0: + for line in lines: + line_encoded = line.encode("utf-8") starting_offset = ending_offset - ending_offset += len(unfinished_line) + ending_offset += len(line_encoded) + len(newline) + shared_logger.debug("by_line lines", extra={"offset": ending_offset}) + + yield line_encoded, starting_offset, ending_offset, newline, None - if newline_length == 2: - newline = b"\r\n" - elif newline_length == 1: - newline = b"\n" - else: - newline = b"" + if len(unfinished_line) > 0: + if unfinished_line.endswith(b"\r\n"): + newline = b"\r\n" + elif unfinished_line.endswith(b"\n"): + newline = b"\n" + else: + newline = b"" - if newline_length > 0 and not unfinished_line.endswith(newline): - newline_length = 0 + unfinished_line = unfinished_line.rstrip(newline) - unfinished_line = unfinished_line.rstrip(newline) + starting_offset = ending_offset + ending_offset += len(unfinished_line) + len(newline) - shared_logger.debug("by_line unfinished_line", extra={"offset": ending_offset}) + shared_logger.debug("by_line unfinished_line", extra={"offset": ending_offset}) - yield unfinished_line, starting_offset, ending_offset, newline_length, None + yield unfinished_line, starting_offset, ending_offset, newline, None return wrapper -def multi_line(func: GetByLinesCallable[ProtocolStorageType]) -> GetByLinesCallable[ProtocolStorageType]: +def multi_line(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]: """ ProtocolStorage decorator for returning content collected by multiline """ def wrapper( storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool - ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]]: - ending_offset: int = range_start - + ) -> StorageDecoratorIterator: multiline_processor: Optional[ProtocolMultiline] = storage.multiline_processor if not multiline_processor: iterator = func(storage, range_start, body, is_gzipped) - for ( - data, - original_starting_offset, - original_ending_offset, - newline_length, - event_expanded_offset, - ) in iterator: + for data, starting_offset, ending_offset, newline, event_expanded_offset in iterator: assert isinstance(data, bytes) - shared_logger.debug("multi_line skipped", extra={"offset": original_ending_offset}) + shared_logger.debug("multi_line skipped", extra={"offset": ending_offset}) - yield data, original_starting_offset, original_ending_offset, newline_length, event_expanded_offset + yield data, starting_offset, ending_offset, newline, event_expanded_offset else: + ending_offset = range_start - def iterator_to_multiline_feed() -> Iterator[tuple[bytes, bytes, int]]: - newline: bytes = b"" - previous_newline_length: int = 0 - for original_data, _, _, original_newline_length, _ in func(storage, range_start, body, is_gzipped): - assert isinstance(original_data, bytes) - - if original_newline_length != previous_newline_length: - previous_newline_length = original_newline_length - if original_newline_length == 2: - newline = b"\r\n" - elif original_newline_length == 1: - newline = b"\n" - else: - newline = b"" - - yield original_data, newline, original_newline_length + def iterator_to_multiline_feed() -> FeedIterator: + for data, _, _, newline, _ in func(storage, range_start, body, is_gzipped): + assert isinstance(data, 
bytes) + yield data, newline multiline_processor.feed = iterator_to_multiline_feed() - for multiline_data, multiline_ending_offset, newline_length in multiline_processor.collect(): + for multiline_data, multiline_ending_offset, newline in multiline_processor.collect(): starting_offset = ending_offset ending_offset += multiline_ending_offset shared_logger.debug("multi_line lines", extra={"offset": ending_offset}) - yield multiline_data, starting_offset, ending_offset, newline_length, None + yield multiline_data, starting_offset, ending_offset, newline, None return wrapper -class JsonCollector: - """ - ProtocolStorage decorator for returning content by collected json object (if any) spanning multiple lines - """ +class JsonCollectorState: + def __init__(self, storage: ProtocolStorageType): + self.storage: ProtocolStorageType = storage + + self.starting_offset: int = 0 + self.ending_offset: int = 0 + + self.unfinished_line: bytes = b"" + + self.has_an_object_start: bool = False + + self.is_a_json_object: bool = False + self.is_a_json_object_circuit_broken: bool = False + self.is_a_json_object_circuit_breaker: int = 0 - def __init__(self, function: GetByLinesCallable[ProtocolStorageType]): - self._function: GetByLinesCallable[ProtocolStorageType] = function - self._starting_offset: int = 0 - self._ending_offset: int = 0 - self._newline_length = -1 - self._unfinished_line: bytes = b"" +def json_collector( + func: StorageDecoratorCallable[ProtocolStorageType], +) -> StorageDecoratorCallable[ProtocolStorageType]: + """ + ProtocolStorage decorator for returning content by collected json object (if any) spanning multiple lines - self._is_a_json_object: bool = False - self._is_a_json_object_circuit_broken: bool = True - self._is_a_json_object_circuit_breaker: int = 0 + If `json_content_type` is `single` and we don't have any `expand_event_list_from_field` set, we just collect all the + content, and yield. If `json_content_type` is `disabled` or we have a multiline processor set, we yield what we + receive from the previous decorator. If `json_content_type` is `None` or `ndjson` we try to parse che content as + json if we find the beginning of a potential json object (ie: the `{` char). This is done appending one line after + one line and passing the content to the json parser until it will be able to parse a full json object. If the + content is ndjson every json parsing attempt will be successful, in case it isn't the parsing will succeed only + after we collect a full json object spanning multiple lines. A circuit breaker is present in order to stop trying to + parse the json if we reached 1000 lines: in this case we yield the content as it is, line by line. + Once a json object is parsed, if we have an `expand_event_list_from_field` set we pass the json the "events list + from field expander" and yield the expanded events list instead. 
+ """ - self._storage: Optional[ProtocolStorageType] = None + def _handle_offset(offset_skew: int, json_collector_state: JsonCollectorState) -> None: + json_collector_state.starting_offset = json_collector_state.ending_offset + json_collector_state.ending_offset += offset_skew def _collector( - self, data: bytes, newline: bytes, newline_length: int + data: bytes, newline: bytes, json_collector_state: JsonCollectorState ) -> Iterator[tuple[bytes, Optional[dict[str, Any]]]]: try: # let's buffer the content # we receive data without newline # let's append it as well - self._unfinished_line += data + newline + json_collector_state.unfinished_line += data + newline # let's try to decode - json_object = json_parser(self._unfinished_line) + json_object = json_parser(json_collector_state.unfinished_line) # it didn't raise: we collected a json object - data_to_yield = self._unfinished_line + data_to_yield = json_collector_state.unfinished_line # let's reset the buffer - self._unfinished_line = b"" + json_collector_state.unfinished_line = b"" # let's increase the offset for yielding - self._handle_offset(len(data_to_yield)) + _handle_offset(len(data_to_yield), json_collector_state) - # let's decrease the circuit breaker + # let's decrease the circuit breaker by the number of lines in the data to yield if newline != b"": - self._is_a_json_object_circuit_breaker -= data_to_yield.count(newline) - 1 + json_collector_state.is_a_json_object_circuit_breaker -= data_to_yield.count(newline) - 1 else: - self._is_a_json_object_circuit_breaker -= 1 + json_collector_state.is_a_json_object_circuit_breaker -= 1 # let's trim surrounding newline data_to_yield = data_to_yield.strip(b"\r\n").strip(b"\n") # let's set the flag for json object - self._is_a_json_object = True + json_collector_state.is_a_json_object = True # finally yield yield data_to_yield, json_object @@ -220,154 +189,113 @@ def _collector( # let's keep iterating except ValueError: # it's an empty line, let's yield it - if self._is_a_json_object and len(self._unfinished_line.strip(b"\r\n").strip(b"\n")) == 0: + if ( + json_collector_state.is_a_json_object + and len(json_collector_state.unfinished_line.strip(b"\r\n").strip(b"\n")) == 0 + ): # let's reset the buffer - self._unfinished_line = b"" + json_collector_state.unfinished_line = b"" # let's increase the offset for yielding - self._handle_offset(newline_length) + _handle_offset(len(newline), json_collector_state) # finally yield yield b"", None else: # buffer was not a complete json object # let's increase the circuit breaker - self._is_a_json_object_circuit_breaker += 1 + json_collector_state.is_a_json_object_circuit_breaker += 1 # if the first 1k lines are not a json object let's give up - if self._is_a_json_object_circuit_breaker > 1000: - self._is_a_json_object_circuit_broken = True - - def _by_lines_fallback( - self, buffer: bytes - ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]]: - assert self._storage is not None - self._storage.json_content_type = None + if json_collector_state.is_a_json_object_circuit_breaker > 1000: + json_collector_state.is_a_json_object_circuit_broken = True + def _by_lines_fallback(json_collector_state: JsonCollectorState) -> StorageDecoratorIterator: @by_lines def wrapper( storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool - ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]]: + ) -> StorageDecoratorIterator: data_to_yield: bytes = body.read() - yield data_to_yield, 0, range_start, 
0, None + yield data_to_yield, 0, range_start, b"", None - for line, starting_offset, ending_offset, original_newline_length, _ in wrapper( - self._storage, self._ending_offset, BytesIO(buffer), False + for line, _, _, newline, _ in wrapper( + json_collector_state.storage, + json_collector_state.ending_offset, + BytesIO(json_collector_state.unfinished_line), + False, ): assert isinstance(line, bytes) - yield line, starting_offset, ending_offset, original_newline_length, None - def _handle_offset(self, offset_skew: int) -> None: - self._starting_offset = self._ending_offset - self._ending_offset += offset_skew + _handle_offset(len(line) + len(newline), json_collector_state) + + # let's reset the buffer + json_collector_state.unfinished_line = b"" + + # let's set the flag for direct yield from now on + json_collector_state.has_an_object_start = False + + yield line, _, _, newline, None + + def wrapper( + storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool + ) -> StorageDecoratorIterator: + json_collector_state = JsonCollectorState(storage=storage) - def __call__( - self, storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool - ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]]: - self._storage = storage multiline_processor: Optional[ProtocolMultiline] = storage.multiline_processor if storage.json_content_type == "disabled" or multiline_processor: - iterator = self._function(storage, range_start, body, is_gzipped) - for data, original_starting_offset, original_ending_offset, newline_length, _ in iterator: + iterator = func(storage, range_start, body, is_gzipped) + for data, starting_offset, ending_offset, newline, _ in iterator: assert isinstance(data, bytes) - shared_logger.debug("JsonCollector skipped", extra={"offset": original_ending_offset}) + shared_logger.debug("json_collector skipped", extra={"offset": ending_offset}) - yield data, original_starting_offset, original_ending_offset, newline_length, None + yield data, starting_offset, ending_offset, newline, None else: - newline: bytes = b"" + event_list_from_field_expander: Optional[ExpandEventListFromField] = storage.event_list_from_field_expander - has_an_object_start: bool = False - wait_for_object_start: bool = True - wait_for_object_start_buffer: bytes = b"" + json_collector_state.ending_offset = range_start - self._ending_offset = range_start - self._starting_offset = 0 - self._newline_length = -1 - self._unfinished_line = b"" - - self._is_a_json_object = False - self._is_a_json_object_circuit_broken = False - self._is_a_json_object_circuit_breaker = 0 - - iterator = self._function(storage, range_start, body, is_gzipped) - for data, original_starting_offset, original_ending_offset, newline_length, _ in iterator: - assert isinstance(data, bytes) + iterator = func(storage, range_start, body, is_gzipped) + # if we know it's a single json we replace body with the whole content, + # and mark the object as started + if storage.json_content_type == "single" and event_list_from_field_expander is None: + single: list[tuple[Union[StorageReader, bytes], int, int, bytes]] = list( + [ + (data, starting_offset, ending_offset, newline) + for data, starting_offset, ending_offset, newline, _ in iterator + ] + ) + + newline = single[0][-1] + starting_offset = single[0][1] + ending_offset = single[-1][2] + + data_to_yield: bytes = newline.join([x[0] for x in single]) + yield data_to_yield, starting_offset, ending_offset, newline, None + else: + for data, 
starting_offset, ending_offset, newline, _ in iterator: + assert isinstance(data, bytes) - if newline_length != self._newline_length: - self._newline_length = newline_length - if newline_length == 2: - newline = b"\r\n" - elif newline_length == 1: - newline = b"\n" - else: - newline = b"" - - # let's check until we got some content from split by lines - if wait_for_object_start: - # we check for a potential json object on - # we strip leading space to be safe on padding - stripped_data = data.decode("utf-8").lstrip() - - # if range_start is greater than zero, data can be empty - if len(stripped_data) > 0: - wait_for_object_start = False - if stripped_data[0] == "{": + # if it's not a json object we can just forward the content by lines + if not json_collector_state.has_an_object_start: + # if range_start is greater than zero, or we have leading space, data can be empty + stripped_data = data.decode("utf-8").lstrip() + if len(stripped_data) > 0 and stripped_data[0] == "{": # we mark the potentiality of a json object start # CAVEAT: if the log entry starts with `{` but the # content is not json, we buffer the first 10k lines # before the circuit breaker kicks in - has_an_object_start = True - else: - # let's buffer discarded data including newline for eventual by_lines() fallback - wait_for_object_start_buffer += data + newline - - if not wait_for_object_start: - # if it's not a json object we can just forward the content by lines - if not has_an_object_start: - if len(wait_for_object_start_buffer) > 0: - # let's consume the buffer we set for waiting for object_start before the circuit breaker - for ( - line, - starting_offset, - ending_offset, - original_newline_length, - _, - ) in self._by_lines_fallback(wait_for_object_start_buffer): - self._handle_offset(len(line) + original_newline_length) - yield line, self._starting_offset, self._ending_offset, original_newline_length, None - - # let's reset the buffer - wait_for_object_start_buffer = b"" - - # json_content_type as json collected the whole content: let's pass through by_lines() - if storage.json_content_type == "single": - for ( - line, - starting_offset, - ending_offset, - original_newline_length, - _, - ) in self._by_lines_fallback(data + newline): - self._handle_offset(len(line) + original_newline_length) - yield line, self._starting_offset, self._ending_offset, original_newline_length, None - else: - yield data, original_starting_offset, original_ending_offset, newline_length, None - else: - # let's yield wait_for_object_start_buffer: it is newline only content - for line, starting_offset, ending_offset, original_newline_length, _ in self._by_lines_fallback( - wait_for_object_start_buffer - ): - self._handle_offset(len(line) + original_newline_length) - yield line, self._starting_offset, self._ending_offset, newline_length, None - - # let's reset the buffer - wait_for_object_start_buffer = b"" - for data_to_yield, json_object in self._collector(data, newline, newline_length): - shared_logger.debug("JsonCollector objects", extra={"offset": self._ending_offset}) - event_list_from_field_expander: Optional[ - ExpandEventListFromField - ] = storage.event_list_from_field_expander - + json_collector_state.has_an_object_start = True + + if not json_collector_state.has_an_object_start: + _handle_offset(len(data) + len(newline), json_collector_state) + yield data, starting_offset, ending_offset, newline, None + + if json_collector_state.has_an_object_start: + # let's parse the data only if it is not single, or we have a field expander + for 
data_to_yield, json_object in _collector(data, newline, json_collector_state): + shared_logger.debug( + "json_collector objects", extra={"offset": json_collector_state.ending_offset} + ) if event_list_from_field_expander is not None: for ( expanded_log_event, @@ -375,65 +303,60 @@ def __call__( expanded_ending_offset, expanded_event_n, ) in event_list_from_field_expander.expand( - data_to_yield, json_object, self._starting_offset, self._ending_offset + data_to_yield, + json_object, + json_collector_state.starting_offset, + json_collector_state.ending_offset, ): - to_be_yield = ( + yield ( expanded_log_event, expanded_starting_offset, expanded_ending_offset, - newline_length, + newline, expanded_event_n, ) - - yield to_be_yield - - del json_object else: - del json_object - yield data_to_yield, self._starting_offset, self._ending_offset, newline_length, None - - if self._is_a_json_object_circuit_broken: + yield ( + data_to_yield, + json_collector_state.starting_offset, + json_collector_state.ending_offset, + newline, + None, + ) + + del json_object + + # check if we hit the circuit broken + if json_collector_state.is_a_json_object_circuit_broken: # let's yield what we have so far - for ( - line, - starting_offset, - ending_offset, - original_newline_length, - _, - ) in self._by_lines_fallback(self._unfinished_line): - self._handle_offset(len(line) + original_newline_length) - - yield line, self._starting_offset, self._ending_offset, original_newline_length, None - - # let's set the flag for direct yield from now on - has_an_object_start = False - - # let's reset the buffer - self._unfinished_line = b"" + for line, _, _, original_newline, _ in _by_lines_fallback(json_collector_state): + yield ( + line, + json_collector_state.starting_offset, + json_collector_state.ending_offset, + original_newline, + None, + ) # in this case we could have a trailing new line in what's left in the buffer - # or the content had a leading `{` but was not a json object before the circuit breaker intercepted it + # or the content had a leading `{` but was not a json object before the circuit breaker intercepted it, # or we waited for the object start and never reached: # let's fallback to by_lines() - if not self._is_a_json_object: - buffer: bytes = self._unfinished_line - if wait_for_object_start: - buffer = wait_for_object_start_buffer - - for line, starting_offset, ending_offset, original_newline_length, _ in self._by_lines_fallback(buffer): - self._handle_offset(len(line) + original_newline_length) + if not json_collector_state.is_a_json_object: + for line, _, _, newline, _ in _by_lines_fallback(json_collector_state): + yield line, json_collector_state.starting_offset, json_collector_state.ending_offset, newline, None - yield line, self._starting_offset, self._ending_offset, original_newline_length, None + return wrapper -def inflate(func: GetByLinesCallable[ProtocolStorageType]) -> GetByLinesCallable[ProtocolStorageType]: +def inflate(func: StorageDecoratorCallable[ProtocolStorageType]) -> StorageDecoratorCallable[ProtocolStorageType]: """ ProtocolStorage decorator for returning inflated content in case the original is gzipped """ def wrapper( storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool - ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]]: + ) -> StorageDecoratorIterator: iterator = func(storage, range_start, body, is_gzipped) for data, _, _, _, _ in iterator: if is_gzipped: @@ -448,9 +371,9 @@ def wrapper( buffer.write(inflated_chunk) 
shared_logger.debug("inflate inflate") - yield buffer.getvalue(), 0, 0, 0, None + yield buffer.getvalue(), 0, 0, b"", None else: shared_logger.debug("inflate plain") - yield data, 0, 0, 0, None + yield data, 0, 0, b"", None return wrapper diff --git a/storage/payload.py b/storage/payload.py index 8f3ad878..5723c088 100644 --- a/storage/payload.py +++ b/storage/payload.py @@ -5,12 +5,19 @@ import binascii import gzip from io import SEEK_SET, BytesIO -from typing import Any, Iterator, Optional, Union +from typing import Any, Optional from share import ExpandEventListFromField, ProtocolMultiline, shared_logger -from .decorator import JsonCollector, by_lines, inflate, multi_line -from .storage import CHUNK_SIZE, CommonStorage, StorageReader, is_gzip_content +from .decorator import by_lines, inflate, json_collector, multi_line +from .storage import ( + CHUNK_SIZE, + CommonStorage, + GetByLinesIterator, + StorageDecoratorIterator, + StorageReader, + is_gzip_content, +) class PayloadStorage(CommonStorage): @@ -33,12 +40,10 @@ def __init__( self.event_list_from_field_expander = event_list_from_field_expander @multi_line - @JsonCollector + @json_collector @by_lines @inflate - def _generate( - self, range_start: int, body: BytesIO, is_gzipped: bool - ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]]: + def _generate(self, range_start: int, body: BytesIO, is_gzipped: bool) -> StorageDecoratorIterator: """ Concrete implementation of the iterator for get_by_lines """ @@ -50,16 +55,16 @@ def chunk_lambda() -> Any: if is_gzipped: reader: StorageReader = StorageReader(raw=body) - yield reader, 0, 0, 0, None + yield reader, 0, 0, b"", None else: for chunk in iter(chunk_lambda, b""): file_starting_offset = file_ending_offset file_ending_offset += len(chunk) shared_logger.debug("_generate flat", extra={"offset": file_ending_offset}) - yield chunk, file_starting_offset, file_ending_offset, 0, None + yield chunk, file_starting_offset, file_ending_offset, b"", None - def get_by_lines(self, range_start: int) -> Iterator[tuple[bytes, int, int, Optional[int]]]: + def get_by_lines(self, range_start: int) -> GetByLinesIterator: original_range_start: int = range_start is_gzipped: bool = False diff --git a/storage/s3.py b/storage/s3.py index 21126ab4..e05dee18 100644 --- a/storage/s3.py +++ b/storage/s3.py @@ -3,7 +3,7 @@ # you may not use this file except in compliance with the Elastic License 2.0. 
from io import SEEK_SET, BytesIO -from typing import Any, Iterator, Optional, Union +from typing import Any, Optional import boto3 import botocore.client @@ -12,8 +12,15 @@ from share import ExpandEventListFromField, ProtocolMultiline, shared_logger -from .decorator import JsonCollector, by_lines, inflate, multi_line -from .storage import CHUNK_SIZE, CommonStorage, StorageReader, is_gzip_content +from .decorator import by_lines, inflate, json_collector, multi_line +from .storage import ( + CHUNK_SIZE, + CommonStorage, + GetByLinesIterator, + StorageDecoratorIterator, + StorageReader, + is_gzip_content, +) class S3Storage(CommonStorage): @@ -41,12 +48,10 @@ def __init__( self.event_list_from_field_expander = event_list_from_field_expander @multi_line - @JsonCollector + @json_collector @by_lines @inflate - def _generate( - self, range_start: int, body: BytesIO, is_gzipped: bool - ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]]: + def _generate(self, range_start: int, body: BytesIO, is_gzipped: bool) -> StorageDecoratorIterator: """ Concrete implementation of the iterator for get_by_lines """ @@ -58,16 +63,16 @@ def chunk_lambda() -> Any: if is_gzipped: reader: StorageReader = StorageReader(raw=body) - yield reader, 0, 0, 0, None + yield reader, 0, 0, b"", None else: for chunk in iter(chunk_lambda, b""): file_starting_offset = file_ending_offset file_ending_offset += len(chunk) shared_logger.debug("_generate flat", extra={"offset": file_ending_offset}) - yield chunk, file_ending_offset, file_starting_offset, 0, None + yield chunk, file_ending_offset, file_starting_offset, b"", None - def get_by_lines(self, range_start: int) -> Iterator[tuple[bytes, int, int, Optional[int]]]: + def get_by_lines(self, range_start: int) -> GetByLinesIterator: original_range_start: int = range_start s3_object_head = self._s3_client.head_object(Bucket=self._bucket_name, Key=self._object_key) diff --git a/storage/storage.py b/storage/storage.py index 3e6c32a0..d969234e 100644 --- a/storage/storage.py +++ b/storage/storage.py @@ -6,6 +6,8 @@ from io import BytesIO from typing import Any, Callable, Iterator, Optional, Protocol, TypeVar, Union +from typing_extensions import TypeAlias + from share import ExpandEventListFromField, ProtocolMultiline # CHUNK_SIZE is how much we read from the gzip stream at every iteration in the inflate decorator @@ -33,6 +35,11 @@ def __getattr__(self, item: str) -> Any: return getattr(self._raw, item) +# GetByLinesIterator yields a tuple of content, starting offset, ending offset +# and optional offset of a list of expanded events +GetByLinesIterator: TypeAlias = Iterator[tuple[bytes, int, int, Optional[int]]] + + class ProtocolStorage(Protocol): """ Protocol for Storage components @@ -42,7 +49,7 @@ class ProtocolStorage(Protocol): multiline_processor: Optional[ProtocolMultiline] event_list_from_field_expander: Optional[ExpandEventListFromField] - def get_by_lines(self, range_start: int) -> Iterator[tuple[bytes, int, int, Optional[int]]]: + def get_by_lines(self, range_start: int) -> GetByLinesIterator: pass # pragma: no cover def get_as_string(self) -> str: @@ -60,7 +67,11 @@ class CommonStorage(metaclass=ABCMeta): ProtocolStorageType = TypeVar("ProtocolStorageType", bound=ProtocolStorage) -GetByLinesCallable = Callable[ - [ProtocolStorageType, int, BytesIO, bool], - Iterator[tuple[Union[StorageReader, bytes], int, int, int, Optional[int]]], -] + +# StorageDecoratorIterator yields a tuple of content (expressed as `StorageReader` or bytes), starting 
offset, +# ending offset, newline and optional offset of a list of expanded events +StorageDecoratorIterator: TypeAlias = Iterator[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] + +# StorageDecoratorCallable accepts a `ProtocolStorageType`, the range start offset, the content as BytesIO and a boolean +# flag indicating if the content is gzipped as arguments. It returns a `StorageDecoratorIterator` +StorageDecoratorCallable = Callable[[ProtocolStorageType, int, BytesIO, bool], StorageDecoratorIterator] diff --git a/tests/handlers/aws/test_integrations.py b/tests/handlers/aws/test_integrations.py index eb185091..bf2a632c 100644 --- a/tests/handlers/aws/test_integrations.py +++ b/tests/handlers/aws/test_integrations.py @@ -72,9 +72,11 @@ def setUpClass(cls) -> None: lgc = LogstashContainer(es_container=esc) cls.logstash = lgc.start() - lsc = LocalStackContainer(image="localstack/localstack:1.4.0") + lsc = LocalStackContainer(image="localstack/localstack:3.0.1") lsc.with_env("EAGER_SERVICE_LOADING", "1") - lsc.with_services("kinesis", "logs", "s3", "sqs", "secretsmanager", "ec2") + lsc.with_env("SQS_DISABLE_CLOUDWATCH_METRICS", "1") + lsc.with_services("ec2", "kinesis", "logs", "s3", "sqs", "secretsmanager") + cls.localstack = lsc.start() session = boto3.Session(region_name=_AWS_REGION) @@ -386,7 +388,7 @@ def test_continuing(self) -> None: tags: {self.default_tags} outputs: {self.default_outputs} - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}:*" + id: "{cloudwatch_group_arn}" tags: {self.default_tags} outputs: {self.default_outputs} - type: sqs @@ -859,7 +861,7 @@ def test_replay(self) -> None: username: "wrong_username" password: "wrong_username" - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}:*" + id: "{cloudwatch_group_arn}" tags: {self.default_tags} outputs: - type: "elasticsearch" @@ -1200,7 +1202,7 @@ def test_replay(self) -> None: tags: {self.default_tags} outputs: {self.default_outputs} - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}:*" + id: "{cloudwatch_group_arn}" tags: {self.default_tags} outputs: {self.default_outputs} - type: sqs @@ -1454,7 +1456,7 @@ def test_empty(self) -> None: id: "{kinesis_stream_arn}" outputs: {self.default_outputs} - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}:*" + id: "{cloudwatch_group_arn}" outputs: {self.default_outputs} - type: sqs id: "{sqs_queue_arn}" @@ -1595,7 +1597,7 @@ def test_filtered(self) -> None: - "excluded" outputs: {self.default_outputs} - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}:*" + id: "{cloudwatch_group_arn}" exclude: - "excluded" outputs: {self.default_outputs} @@ -2527,7 +2529,7 @@ def test_cloudwatch_logs_stream_as_input_instead_of_group(self) -> None: messages_body=[fixtures[1]], ) - cloudwatch_group_arn = cloudwatch_group["arn"] + cloudwatch_group_arn = cloudwatch_group["arn"][0:-2] cloudwatch_group_name = cloudwatch_group_name cloudwatch_stream_name = cloudwatch_stream_name @@ -2633,7 +2635,7 @@ def test_cloudwatch_logs_last_ending_offset_reset(self) -> None: config_yaml: str = f""" inputs: - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}:*" + id: "{cloudwatch_group_arn}" tags: {self.default_tags} outputs: - type: "logstash" @@ -2746,7 +2748,7 @@ def test_cloudwatch_logs_last_event_expanded_offset_continue(self) -> None: config_yaml: str = f""" inputs: - type: "cloudwatch-logs" - id: "{cloudwatch_group_arn}:*" + id: "{cloudwatch_group_arn}" expand_event_list_from_field: aField tags: {self.default_tags} outputs: diff --git a/tests/handlers/aws/utils.py 
b/tests/handlers/aws/utils.py index 468b7d2d..a63608e5 100644 --- a/tests/handlers/aws/utils.py +++ b/tests/handlers/aws/utils.py @@ -21,6 +21,7 @@ from botocore.client import BaseClient as BotoBaseClient +from handlers.aws.utils import get_queue_url_from_sqs_arn from share import json_dumper _AWS_REGION = "us-east-1" @@ -226,11 +227,7 @@ def _kinesis_retrieve_event_from_kinesis_stream( def _sqs_create_queue(client: BotoBaseClient, queue_name: str, endpoint_url: str = "") -> dict[str, Any]: queue_url = client.create_queue(QueueName=queue_name)["QueueUrl"] queue_arn = client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["QueueArn"])["Attributes"]["QueueArn"] - return { - "QueueArn": queue_arn, - "QueueUrl": queue_url, - "QueueUrlPath": queue_url.replace(endpoint_url, f"https://sqs.{_AWS_REGION}.amazonaws.com"), - } + return {"QueueArn": queue_arn, "QueueUrl": queue_url, "QueueUrlPath": get_queue_url_from_sqs_arn(queue_arn)} def _sqs_send_messages(client: BotoBaseClient, queue_url: str, message_body: str) -> None: diff --git a/tests/share/test_multiline.py b/tests/share/test_multiline.py index c0efd9bc..3da331ed 100644 --- a/tests/share/test_multiline.py +++ b/tests/share/test_multiline.py @@ -2,13 +2,13 @@ # or more contributor license agreements. Licensed under the Elastic License 2.0; # you may not use this file except in compliance with the Elastic License 2.0. import datetime -from typing import Iterator, Optional +from typing import Optional from unittest import TestCase import mock import pytest -from share import CollectBuffer, CountMultiline, PatternMultiline, WhileMultiline +from share import CollectBuffer, CountMultiline, FeedIterator, PatternMultiline, WhileMultiline collect_buffer_grow = [ pytest.param( @@ -46,7 +46,7 @@ def test_collect_buffer_grow( for i, line in enumerate(lines): collect_buffer.grow(data=line, newline=newline) - content, content_length = collect_buffer.collect_and_reset() + content, content_length, _ = collect_buffer.collect_and_reset() assert content == expected_content assert content_length == expected_content_length @@ -100,7 +100,7 @@ def test_collect_buffer_collect( for i, line in enumerate(lines): collect_buffer.grow(data=line, newline=newline) - content, content_length = collect_buffer.collect_and_reset() + content, content_length, _ = collect_buffer.collect_and_reset() assert content == expected_content assert content_length == expected_content_length @@ -406,11 +406,9 @@ def test_pattern_multiline( pattern=pattern, match=match, flush_pattern=flush_pattern, negate=negate, max_lines=max_lines ) - newline_length: int = len(newline) - - def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]: + def feed_iterator(content: bytes) -> FeedIterator: for line in content.splitlines(): - yield line, newline, newline_length + yield line, newline pattern_multiline.feed = feed_iterator(feed) @@ -432,14 +430,14 @@ def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]: b"\r\n", b"line1\r\n line1.1\r\n line1.2\r\nline2\r\n line2.1\r\n line2.2\r\n", [(b"line1", 7), (b" line1.1", 11), (b" line1.2", 11), (b"line2", 7), (b" line2.1", 11), (b" line2.2", 11)], - id="circuit breaker: \n", + id="circuit breaker: \r\n", ), ] @pytest.mark.unit @pytest.mark.parametrize("newline,feed,expected_events", pattern_multiline_collect_circuitbreaker) -@mock.patch("share.multiline.timedelta_circuit_breaker", new=datetime.timedelta(seconds=0)) +@mock.patch("share.multiline.timedelta_circuit_breaker", new=datetime.timedelta(seconds=-1)) def 
test_pattern_multiline_circuitbreaker(
     newline: bytes,
     feed: bytes,
@@ -447,11 +445,9 @@ def test_pattern_multiline_circuitbreaker(
 ) -> None:
     pattern_multiline: PatternMultiline = PatternMultiline(pattern="^[ \t] +", match="after")
 
-    newline_length: int = len(newline)
-
-    def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]:
+    def feed_iterator(content: bytes) -> FeedIterator:
         for line in content.splitlines():
-            yield line, newline, newline_length
+            yield line, newline
 
     pattern_multiline.feed = feed_iterator(feed)
@@ -574,11 +570,9 @@ def test_count_multiline(
 
     count_multiline: CountMultiline = CountMultiline(count_lines=count_lines, max_lines=max_lines)
 
-    newline_length: int = len(newline)
-
-    def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]:
+    def feed_iterator(content: bytes) -> FeedIterator:
         for line in content.splitlines():
-            yield line, newline, newline_length
+            yield line, newline
 
     count_multiline.feed = feed_iterator(feed)
@@ -611,11 +605,9 @@ def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]:
 def test_count_multiline_circuitbreaker(newline: bytes, feed: bytes, expected_events: list[tuple[bytes, int]]) -> None:
     count_multiline: CountMultiline = CountMultiline(count_lines=2)
 
-    newline_length: int = len(newline)
-
-    def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]:
+    def feed_iterator(content: bytes) -> FeedIterator:
         for line in content.splitlines():
-            yield line, newline, newline_length
+            yield line, newline
 
     count_multiline.feed = feed_iterator(feed)
@@ -735,11 +727,9 @@ def test_while_multiline(
 
     while_multiline: WhileMultiline = WhileMultiline(pattern=pattern, negate=negate, max_lines=max_lines)
 
-    newline_length: int = len(newline)
-
-    def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]:
+    def feed_iterator(content: bytes) -> FeedIterator:
         for line in content.splitlines():
-            yield line, newline, newline_length
+            yield line, newline
 
     while_multiline.feed = feed_iterator(feed)
@@ -772,11 +762,9 @@ def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]:
 def test_while_multiline_circuitbreaker(newline: bytes, feed: bytes, expected_events: list[tuple[bytes, int]]) -> None:
     while_multiline: WhileMultiline = WhileMultiline(pattern="^{")
 
-    newline_length: int = len(newline)
-
-    def feed_iterator(content: bytes) -> Iterator[tuple[bytes, bytes, int]]:
+    def feed_iterator(content: bytes) -> FeedIterator:
         for line in content.splitlines():
-            yield line, newline, newline_length
+            yield line, newline
 
     while_multiline.feed = feed_iterator(feed)
diff --git a/tests/storage/test_benchmark.py b/tests/storage/test_benchmark.py
index 00857bd5..40da50bb 100644
--- a/tests/storage/test_benchmark.py
+++ b/tests/storage/test_benchmark.py
@@ -170,6 +170,7 @@ def init_content(
                     for _ in range(1, int(length_multiplier / 2))
                 ]
             )
+            mock_content = mock_content.rstrip(newline)
         elif content_type.startswith("_IS_MULTILINE"):
             # every line is from 0 to 20 chars, repeated for length_multiplier
             mock_content = newline.join(
@@ -194,6 +195,7 @@ def init_content(
                     for _ in range(1, length_multiplier)
                 ]
            )
+            mock_content = mock_content.rstrip(newline)
 
         if content_type == _IS_JSON_LIKE:
             mock_content = b"{" + mock_content
diff --git a/tests/storage/test_decorator.py b/tests/storage/test_decorator.py
new file mode 100644
index 00000000..2ef065b3
--- /dev/null
+++ b/tests/storage/test_decorator.py
@@ -0,0 +1,211 @@
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License 2.0;
+# you may not use this file except in compliance with the Elastic License 2.0.
+
+import gzip
+from io import BytesIO
+from typing import Optional, Union
+from unittest import TestCase
+
+import pytest
+
+from share import ExpandEventListFromField, MultilineFactory, ProtocolMultiline
+from storage import (
+    CommonStorage,
+    GetByLinesIterator,
+    StorageDecoratorIterator,
+    StorageReader,
+    by_lines,
+    inflate,
+    json_collector,
+    multi_line,
+)
+
+
+class DummyStorage(CommonStorage):
+    """
+    Dummy Storage.
+    """
+
+    def __init__(
+        self,
+        json_content_type: Optional[str] = None,
+        multiline_processor: Optional[ProtocolMultiline] = None,
+        event_list_from_field_expander: Optional[ExpandEventListFromField] = None,
+    ):
+        self.json_content_type = json_content_type
+        self.multiline_processor = multiline_processor
+        self.event_list_from_field_expander = event_list_from_field_expander
+
+    @staticmethod
+    def get_by_lines(range_start: int) -> GetByLinesIterator:
+        yield b"", 0, 0, None
+
+    @staticmethod
+    def get_as_string() -> str:
+        return ""
+
+    @multi_line
+    @json_collector
+    @by_lines
+    @inflate
+    def generate(self, range_start: int, body: BytesIO, is_gzipped: bool) -> StorageDecoratorIterator:
+        if is_gzipped:
+            reader: StorageReader = StorageReader(raw=body)
+            yield reader, 0, 0, b"", None
+        else:
+            yield body.read(), 0, 0, b"", None
+
+
+@pytest.mark.unit
+class TestDecorator(TestCase):
+    def test_plain(self) -> None:
+        storage = DummyStorage()
+        fixtures = BytesIO(b"line1\nline2\nline3\n")
+        expected = [
+            (b"line1", 0, 6, b"\n", None),
+            (b"line2", 6, 12, b"\n", None),
+            (b"line3", 12, 18, b"\n", None),
+        ]
+
+        decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
+            [
+                (data, starting_offset, ending_offset, newline, event_expanded_offset)
+                for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate(
+                    0, fixtures, False
+                )
+            ]
+        )
+
+        assert expected == decorated
+
+    def test_ndjson(self) -> None:
+        storage = DummyStorage(json_content_type="ndjson")
+        fixtures = BytesIO(b'{"line": 1}\n{"line": 2}\n{"line": 3}\n')
+        expected = [
+            (b'{"line": 1}', 0, 12, b"\n", None),
+            (b'{"line": 2}', 12, 24, b"\n", None),
+            (b'{"line": 3}', 24, 36, b"\n", None),
+        ]
+
+        decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
+            [
+                (data, starting_offset, ending_offset, newline, event_expanded_offset)
+                for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate(
+                    0, fixtures, False
+                )
+            ]
+        )
+
+        assert expected == decorated
+
+    def test_single(self) -> None:
+        storage = DummyStorage(json_content_type="single")
+        fixtures = BytesIO(b'{"line1": 1,\n"line2": 2,\n"line3": 3}\n')
+        expected = [
+            (b'{"line1": 1,\n"line2": 2,\n"line3": 3}', 0, 37, b"\n", None),
+        ]
+
+        decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
+            [
+                (data, starting_offset, ending_offset, newline, event_expanded_offset)
+                for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate(
+                    0, fixtures, False
+                )
+            ]
+        )
+
+        assert expected == decorated
+
+    def test_gzip(self) -> None:
+        storage = DummyStorage()
+        fixtures = BytesIO(gzip.compress(b"line1\nline2\nline3\n"))
+        expected = [
+            (b"line1", 0, 6, b"\n", None),
+            (b"line2", 6, 12, b"\n", None),
+            (b"line3", 12, 18, b"\n", None),
+        ]
+
+        decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
+            [
+                (data, starting_offset, ending_offset, newline, event_expanded_offset)
+                for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate(
+                    0, fixtures, True
+                )
+            ]
+        )
+
+        assert expected == decorated
+
+    def test_expand_event_list_from_field(self) -> None:
+        def resolver(_: str, field_to_expand_event_list_from: str) -> str:
+            return field_to_expand_event_list_from
+
+        event_list_from_field_expander = ExpandEventListFromField("Records", "", resolver, None, None)
+
+        storage = DummyStorage(
+            event_list_from_field_expander=event_list_from_field_expander, json_content_type="single"
+        )
+        fixtures = BytesIO(b'{"Records": [{"line": 1},\n{"line": 2},\n{"line": 3}\n]}\n')
+        expected = [
+            (b'{"line":1}', 0, 0, b"\n", 0),  # ending_offset is not set with event list from field expander
+            (b'{"line":2}', 18, 0, b"\n", 1),  # ending_offset is not set with event list from field expander
+            (b'{"line":3}', 36, 54, b"\n", None),  # ending_offset is set only for the last expanded event
+        ]
+
+        decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
+            [
+                (data, starting_offset, ending_offset, newline, event_expanded_offset)
+                for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate(
+                    0, fixtures, False
+                )
+            ]
+        )
+
+        assert expected == decorated
+
+    def test_expand_event_list_from_field_with_offset(self) -> None:
+        def resolver(_: str, field_to_expand_event_list_from: str) -> str:
+            return field_to_expand_event_list_from
+
+        event_list_from_field_expander = ExpandEventListFromField("Records", "", resolver, None, 0)
+
+        storage = DummyStorage(
+            event_list_from_field_expander=event_list_from_field_expander, json_content_type="single"
+        )
+        fixtures = BytesIO(b'{"Records": [{"line": 1},\n{"line": 2},\n{"line": 3}\n]}\n')
+        expected = [
+            (b'{"line":2}', 18, 0, b"\n", 1),
+            (b'{"line":3}', 36, 54, b"\n", None),
+        ]
+
+        decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
+            [
+                (data, starting_offset, ending_offset, newline, event_expanded_offset)
+                for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate(
+                    0, fixtures, False
+                )
+            ]
+        )
+
+        assert expected == decorated
+
+    def test_multiline_processor(self) -> None:
+        multiline_processor = MultilineFactory.create(multiline_type="count", count_lines=3)
+
+        storage = DummyStorage(multiline_processor=multiline_processor, json_content_type="single")
+        fixtures = BytesIO(b"line1\nline2\nline3\n")
+        expected = [
+            (b"line1\nline2\nline3", 0, 18, b"\n", None),
+        ]
+
+        decorated: list[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]] = list(
+            [
+                (data, starting_offset, ending_offset, newline, event_expanded_offset)
+                for data, starting_offset, ending_offset, newline, event_expanded_offset in storage.generate(
+                    0, fixtures, False
+                )
+            ]
+        )
+
+        assert expected == decorated
diff --git a/tests/storage/test_payload.py b/tests/storage/test_payload.py
index a377a108..42c31320 100644
--- a/tests/storage/test_payload.py
+++ b/tests/storage/test_payload.py
@@ -14,7 +14,6 @@
 from storage import PayloadStorage
 
 from .test_benchmark import (
-    _IS_JSON,
     _IS_PLAIN,
     _LENGTH_ABOVE_THRESHOLD,
     MockContentBase,
@@ -117,12 +116,9 @@ def test_get_by_lines(
     assert plain_full[-1][2] == original_length
 
     joined = joiner_token.join([x[0] for x in plain_full])
-    if original.endswith(newline):
-        joined += newline
-
     assert joined == original
 
-    if len(newline) == 0 or (content_type == _IS_JSON and json_content_type == "single"):
+    if len(newline) == 0 or (json_content_type == "single"):
         return
 
     gzip_full_01 = gzip_full[: int(len(gzip_full) / 2)]
@@ -165,8 +161,6 @@ def test_get_by_lines(
         + joiner_token
         + joiner_token.join([x[0] for x in plain_full_02])
     )
-    if original.endswith(newline):
-        joined += newline
 
     assert joined == original
@@ -212,8 +206,6 @@ def test_get_by_lines(
         + joiner_token
         + joiner_token.join([x[0] for x in plain_full_03])
     )
-    if original.endswith(newline):
-        joined += newline
 
     assert joined == original
diff --git a/tests/storage/test_s3.py b/tests/storage/test_s3.py
index 4d071b85..8556853f 100644
--- a/tests/storage/test_s3.py
+++ b/tests/storage/test_s3.py
@@ -15,7 +15,6 @@
 from storage import S3Storage
 
 from .test_benchmark import (
-    _IS_JSON,
     _IS_PLAIN,
     _LENGTH_ABOVE_THRESHOLD,
     MockContentBase,
@@ -147,12 +146,9 @@ def test_get_by_lines(
     assert plain_full[-1][2] == original_length
 
     joined = joiner_token.join([x[0] for x in plain_full])
-    if MockContent.f_content_plain.endswith(newline):
-        joined += newline
-
     assert joined == MockContent.f_content_plain
 
-    if len(newline) == 0 or (content_type == _IS_JSON and json_content_type == "single"):
+    if len(newline) == 0 or (json_content_type == "single"):
         return
 
     gzip_full_01 = gzip_full[: int(len(gzip_full) / 2)]
@@ -195,8 +191,6 @@ def test_get_by_lines(
         + joiner_token
         + joiner_token.join([x[0] for x in plain_full_02])
     )
-    if MockContent.f_content_plain.endswith(newline):
-        joined += newline
 
     assert joined == MockContent.f_content_plain
@@ -242,8 +236,6 @@ def test_get_by_lines(
         + joiner_token
         + joiner_token.join([x[0] for x in plain_full_03])
     )
-    if MockContent.f_content_plain.endswith(newline):
-        joined += newline
 
     assert joined == MockContent.f_content_plain