From 1f602b97f3e291b194b762dd1147f222158339f6 Mon Sep 17 00:00:00 2001
From: Andrea Spacca
Date: Fri, 24 Nov 2023 19:53:18 +0900
Subject: [PATCH] even simpler

---
 storage/decorator.py            | 255 ++++++++++++++------------------
 storage/payload.py              |   4 +-
 storage/s3.py                   |   4 +-
 tests/storage/test_benchmark.py |   3 +-
 tests/storage/test_payload.py   |   4 -
 tests/storage/test_s3.py        |   4 -
 6 files changed, 116 insertions(+), 158 deletions(-)

diff --git a/storage/decorator.py b/storage/decorator.py
index 68dcf04a..6cf044e1 100644
--- a/storage/decorator.py
+++ b/storage/decorator.py
@@ -111,56 +111,63 @@ def iterator_to_multiline_feed() -> Iterator[tuple[bytes, bytes]]:
     return wrapper
 
 
-class JsonCollector:
-    """
-    ProtocolStorage decorator for returning content by collected json object (if any) spanning multiple lines
-    """
+class JsonCollectorState:
+    def __init__(self, storage: StorageReader):
+        self.storage: StorageReader = storage
+
+        self.starting_offset: int = 0
+        self.ending_offset: int = 0
 
-    def __init__(self, function: GetByLinesCallable[ProtocolStorageType]):
-        self._function: GetByLinesCallable[ProtocolStorageType] = function
+        self.unfinished_line: bytes = b""
 
-        self._starting_offset: int = 0
-        self._ending_offset: int = 0
+        self.has_an_object_start: bool = False
 
-        self._single_payload: bytes = b""
-        self._unfinished_line: bytes = b""
+        self.is_a_json_object: bool = False
+        self.is_a_json_object_circuit_broken: bool = False
+        self.is_a_json_object_circuit_breaker: int = 0
 
-        self._is_a_json_object: bool = False
-        self._is_a_json_object_circuit_broken: bool = True
-        self._is_a_json_object_circuit_breaker: int = 0
 
-        self._storage: Optional[ProtocolStorageType] = None
+def json_collector(func: GetByLinesCallable[ProtocolStorageType]) -> GetByLinesCallable[ProtocolStorageType]:
+    """
+    ProtocolStorage decorator for returning content by collected json object (if any) spanning multiple lines
+    """
+
+    def _handle_offset(offset_skew: int, json_collector_state: JsonCollectorState) -> None:
+        json_collector_state.starting_offset = json_collector_state.ending_offset
+        json_collector_state.ending_offset += offset_skew
 
-    def _collector(self, data: bytes, newline: bytes) -> Iterator[tuple[bytes, Optional[dict[str, Any]]]]:
+    def _collector(
+        data: bytes, newline: bytes, json_collector_state: JsonCollectorState
+    ) -> Iterator[tuple[bytes, Optional[dict[str, Any]]]]:
         try:
             # let's buffer the content
             # we receive data without newline
             # let's append it as well
-            self._unfinished_line += data + newline
+            json_collector_state.unfinished_line += data + newline
 
             # let's try to decode
-            json_object = json_parser(self._unfinished_line)
+            json_object = json_parser(json_collector_state.unfinished_line)
 
             # it didn't raise: we collected a json object
-            data_to_yield = self._unfinished_line
+            data_to_yield = json_collector_state.unfinished_line
 
             # let's reset the buffer
-            self._unfinished_line = b""
+            json_collector_state.unfinished_line = b""
 
             # let's increase the offset for yielding
-            self._handle_offset(len(data_to_yield))
+            _handle_offset(len(data_to_yield), json_collector_state)
 
             # let's decrease the circuit breaker by the number of lines in the data to yield
             if newline != b"":
-                self._is_a_json_object_circuit_breaker -= data_to_yield.count(newline) - 1
+                json_collector_state.is_a_json_object_circuit_breaker -= data_to_yield.count(newline) - 1
             else:
-                self._is_a_json_object_circuit_breaker -= 1
+                json_collector_state.is_a_json_object_circuit_breaker -= 1
 
             # let's trim surrounding newline
             data_to_yield = data_to_yield.strip(b"\r\n").strip(b"\n")
 
             # let's set the flag for json object
-            self._is_a_json_object = True
+            json_collector_state.is_a_json_object = True
 
             # finally yield
             yield data_to_yield, json_object
@@ -169,29 +176,30 @@ def _collector(self, data: bytes, newline: bytes) -> Iterator[tuple[bytes, Optio
             # let's keep iterating
         except ValueError:
             # it's an empty line, let's yield it
-            if self._is_a_json_object and len(self._unfinished_line.strip(b"\r\n").strip(b"\n")) == 0:
+            if (
+                json_collector_state.is_a_json_object
+                and len(json_collector_state.unfinished_line.strip(b"\r\n").strip(b"\n")) == 0
+            ):
                 # let's reset the buffer
-                self._unfinished_line = b""
+                json_collector_state.unfinished_line = b""
 
                 # let's increase the offset for yielding
-                self._handle_offset(len(newline))
+                _handle_offset(len(newline), json_collector_state)
 
                 # finally yield
                 yield b"", None
             else:
                 # buffer was not a complete json object
                 # let's increase the circuit breaker
-                self._is_a_json_object_circuit_breaker += 1
+                json_collector_state.is_a_json_object_circuit_breaker += 1
 
                 # if the first 1k lines are not a json object let's give up
-                if self._is_a_json_object_circuit_breaker > 1000:
-                    self._is_a_json_object_circuit_broken = True
+                if json_collector_state.is_a_json_object_circuit_breaker > 1000:
+                    json_collector_state.is_a_json_object_circuit_broken = True
 
     def _by_lines_fallback(
-        self, buffer: bytes
+        json_collector_state: JsonCollectorState,
     ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]]:
-        assert self._storage is not None
-
         @by_lines
         def wrapper(
             storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
@@ -199,46 +207,43 @@ def wrapper(
         ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]]:
             data_to_yield: bytes = body.read()
             yield data_to_yield, 0, range_start, b"", None
 
-        for line, starting_offset, ending_offset, newline, _ in wrapper(
-            self._storage, self._ending_offset, BytesIO(buffer), False
+        for line, _, _, newline, _ in wrapper(
+            json_collector_state.storage,
+            json_collector_state.ending_offset,
+            BytesIO(json_collector_state.unfinished_line),
+            False,
         ):
             assert isinstance(line, bytes)
-            yield line, starting_offset, ending_offset, newline, None
 
-    def _handle_offset(self, offset_skew: int) -> None:
-        self._starting_offset = self._ending_offset
-        self._ending_offset += offset_skew
+            _handle_offset(len(line) + len(newline), json_collector_state)
+
+            # let's reset the buffer
+            json_collector_state.unfinished_line = b""
+
+            # let's set the flag for direct yield from now on
+            json_collector_state.has_an_object_start = False
 
-    def __call__(
-        self, storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
+            yield line, _, _, newline, None
+
+    def wrapper(
+        storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
     ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]]:
-        self._storage = storage
+        json_collector_state = JsonCollectorState(storage=storage)
+
         multiline_processor: Optional[ProtocolMultiline] = storage.multiline_processor
         if storage.json_content_type == "disabled" or multiline_processor:
-            iterator = self._function(storage, range_start, body, is_gzipped)
+            iterator = func(storage, range_start, body, is_gzipped)
             for data, starting_offset, ending_offset, newline, _ in iterator:
                 assert isinstance(data, bytes)
 
-                shared_logger.debug("JsonCollector skipped", extra={"offset": ending_offset})
+                shared_logger.debug("json_collector skipped", extra={"offset": ending_offset})
 
                 yield data, starting_offset, ending_offset, newline, None
         else:
             event_list_from_field_expander: Optional[ExpandEventListFromField] = storage.event_list_from_field_expander
 
-            has_an_object_start: bool = False
-            wait_for_object_start: bool = True
-            wait_for_object_start_buffer: bytes = b""
-
-            self._ending_offset = range_start
-            self._starting_offset = 0
-
-            self._single_payload = b""
-            self._unfinished_line = b""
+            json_collector_state.ending_offset = range_start
 
-            self._is_a_json_object = False
-            self._is_a_json_object_circuit_broken = False
-            self._is_a_json_object_circuit_breaker = 0
-
-            iterator = self._function(storage, range_start, body, is_gzipped)
+            iterator = func(storage, range_start, body, is_gzipped)
             # if we know it's a single json we replace body with the whole content,
             # and mark the object as started
             if storage.json_content_type == "single" and event_list_from_field_expander is None:
@@ -259,106 +264,66 @@ def __call__(
             for data, starting_offset, ending_offset, newline, _ in iterator:
                 assert isinstance(data, bytes)
 
-                # we still wait for the object to start
-                if wait_for_object_start:
-                    # we check for a potential json object on
-                    # we strip leading space to be safe on padding
+                # if it's not a json object we can just forward the content by lines
+                if not json_collector_state.has_an_object_start:
+                    # if range_start is greater than zero, or we have leading space, data can be empty
                     stripped_data = data.decode("utf-8").lstrip()
+                    if len(stripped_data) > 0 and stripped_data[0] == "{":
+                        # we mark the potentiality of a json object start
+                        # CAVEAT: if the log entry starts with `{` but the
+                        # content is not json, we buffer the first 1k lines
+                        # before the circuit breaker kicks in
+                        json_collector_state.has_an_object_start = True
+
+                if not json_collector_state.has_an_object_start:
+                    _handle_offset(len(data) + len(newline), json_collector_state)
+                    yield data, starting_offset, ending_offset, newline, None
 
-                    # if range_start is greater than zero, data can be empty
-                    if len(stripped_data) > 0:
-                        wait_for_object_start = False
-                        if stripped_data[0] == "{":
-                            # we mark the potentiality of a json object start
-                            # CAVEAT: if the log entry starts with `{` but the
-                            # content is not json, we buffer the first 10k lines
-                            # before the circuit breaker kicks in
-                            has_an_object_start = True
-                    else:
-                        # let's buffer discarded data including newline for eventual by_lines() fallback
-                        wait_for_object_start_buffer += data + newline
-
-                if not wait_for_object_start:
-                    # if it's not a json object we can just forward the content by lines
-                    if not has_an_object_start:
-                        if len(wait_for_object_start_buffer) > 0:
-                            # let's yield the buffer we set for waiting for object_start before the circuit breaker
-                            for line, _, _, original_newline, _ in self._by_lines_fallback(
-                                wait_for_object_start_buffer
+                if json_collector_state.has_an_object_start:
+                    # let's parse the data only if it is not single, or we have a field expander
+                    for data_to_yield, json_object in _collector(data, newline, json_collector_state):
+                        shared_logger.debug(
+                            "json_collector objects", extra={"offset": json_collector_state.ending_offset}
+                        )
+                        if event_list_from_field_expander is not None:
+                            for (
+                                expanded_log_event,
+                                expanded_starting_offset,
+                                expanded_ending_offset,
+                                expanded_event_n,
+                            ) in event_list_from_field_expander.expand(
+                                data_to_yield,
+                                json_object,
+                                json_collector_state.starting_offset,
+                                json_collector_state.ending_offset,
                             ):
-                                self._handle_offset(len(line) + len(original_newline))
-                                yield line, self._starting_offset, self._ending_offset, original_newline, None
-
-                            # let's reset the buffer
-                            wait_for_object_start_buffer = b""
-
-                        yield data, starting_offset, ending_offset, newline, None
-                    else:
-                        # let's yield wait_for_object_start_buffer: it is newline only content
-                        for line, _, _, original_newline, _ in self._by_lines_fallback(
-                            wait_for_object_start_buffer
-                        ):
-                            self._handle_offset(len(line) + len(original_newline))
-                            yield line, self._starting_offset, self._ending_offset, original_newline, None
-
-                        # let's reset the buffer
-                        wait_for_object_start_buffer = b""
-
-                        # let's parse the data only if it is not single, or we have a field expander
-                        if storage.json_content_type != "single" or event_list_from_field_expander is not None:
-                            for data_to_yield, json_object in self._collector(data, newline):
-                                shared_logger.debug("JsonCollector objects", extra={"offset": self._ending_offset})
-                                if event_list_from_field_expander is not None:
-                                    for (
-                                        expanded_log_event,
-                                        expanded_starting_offset,
-                                        expanded_ending_offset,
-                                        expanded_event_n,
-                                    ) in event_list_from_field_expander.expand(
-                                        data_to_yield, json_object, self._starting_offset, self._ending_offset
-                                    ):
-                                        yield (
-                                            expanded_log_event,
-                                            expanded_starting_offset,
-                                            expanded_ending_offset,
-                                            newline,
-                                            expanded_event_n,
-                                        )
-                                else:
-                                    yield data_to_yield, self._starting_offset, self._ending_offset, newline, None
-
-                                del json_object
+                                yield (
+                                    expanded_log_event,
+                                    expanded_starting_offset,
+                                    expanded_ending_offset,
+                                    newline,
+                                    expanded_event_n,
+                                )
                         else:
-                            self._handle_offset(len(data) + len(newline))
-                            yield data, self._starting_offset, self._ending_offset, newline, None
-
-            if self._is_a_json_object_circuit_broken:
-                # let's yield what we have so far
-                for line, _, _, original_newline, _ in self._by_lines_fallback(self._unfinished_line):
-                    self._handle_offset(len(line) + len(original_newline))
+                            yield data_to_yield, json_collector_state.starting_offset, json_collector_state.ending_offset, newline, None
 
-                    yield line, self._starting_offset, self._ending_offset, original_newline, None
+                        del json_object
 
-                    # let's set the flag for direct yield from now on
-                    has_an_object_start = False
-
-                    # let's reset the buffer
-                    self._unfinished_line = b""
+                    # check if the circuit breaker has tripped
+                    if json_collector_state.is_a_json_object_circuit_broken:
+                        # let's yield what we have so far
+                        for line, _, _, original_newline, _ in _by_lines_fallback(json_collector_state):
+                            yield line, json_collector_state.starting_offset, json_collector_state.ending_offset, original_newline, None
 
             # in this case we could have a trailing new line in what's left in the buffer
             # or the content had a leading `{` but was not a json object before the circuit breaker intercepted it,
             # or we waited for the object start and never reached:
             # let's fallback to by_lines()
-            if not self._is_a_json_object:
-                buffer: bytes = self._unfinished_line
-                if wait_for_object_start:
-                    buffer = wait_for_object_start_buffer
-
-                for line, _, _, newline, _ in self._by_lines_fallback(buffer):
-                    line = line.strip(newline)
-                    self._handle_offset(len(line) + len(newline))
+            if not json_collector_state.is_a_json_object:
+                for line, _, _, newline, _ in _by_lines_fallback(json_collector_state):
+                    yield line, json_collector_state.starting_offset, json_collector_state.ending_offset, newline, None
 
-                    yield line, self._starting_offset, self._ending_offset, newline, None
+    return wrapper
 
 
 def inflate(func: GetByLinesCallable[ProtocolStorageType]) -> GetByLinesCallable[ProtocolStorageType]:
diff --git a/storage/payload.py b/storage/payload.py
index 3ef4333a..aa1bcb63 100644
--- a/storage/payload.py
+++ b/storage/payload.py
@@ -9,7 +9,7 @@
 
 from share import ExpandEventListFromField, ProtocolMultiline, shared_logger
 
-from .decorator import JsonCollector, by_lines, inflate, multi_line
+from .decorator import json_collector, by_lines, inflate, multi_line
 from .storage import CHUNK_SIZE, CommonStorage, StorageReader, is_gzip_content
 
 
@@ -33,7 +33,7 @@ def __init__(
         self.event_list_from_field_expander = event_list_from_field_expander
 
     @multi_line
-    @JsonCollector
+    @json_collector
     @by_lines
     @inflate
     def _generate(
diff --git a/storage/s3.py b/storage/s3.py
index 6428aee2..12e02c73 100644
--- a/storage/s3.py
+++ b/storage/s3.py
@@ -12,7 +12,7 @@
 
 from share import ExpandEventListFromField, ProtocolMultiline, shared_logger
 
-from .decorator import JsonCollector, by_lines, inflate, multi_line
+from .decorator import json_collector, by_lines, inflate, multi_line
 from .storage import CHUNK_SIZE, CommonStorage, StorageReader, is_gzip_content
 
 
@@ -41,7 +41,7 @@ def __init__(
         self.event_list_from_field_expander = event_list_from_field_expander
 
     @multi_line
-    @JsonCollector
+    @json_collector
     @by_lines
     @inflate
     def _generate(
diff --git a/tests/storage/test_benchmark.py b/tests/storage/test_benchmark.py
index e8fffd51..55a89aaf 100644
--- a/tests/storage/test_benchmark.py
+++ b/tests/storage/test_benchmark.py
@@ -68,7 +68,7 @@ def get_by_lines_parameters() -> list[tuple[int, str, bytes]]:
         _IS_MULTILINE_PATTERN,
         _IS_MULTILINE_WHILE,
     ]:
-        for newline in [b"", b"\n", b"\r\n"]:
+        for newline in [b"\n", b"\r\n"]:
             for json_content_type in [None, "single", "ndjson", "disabled"]:
                 parameters.append(
                     pytest.param(
@@ -195,6 +195,7 @@ def init_content(
             for _ in range(1, length_multiplier)
         ]
     )
+    mock_content = mock_content.rstrip(newline)
 
     if content_type == _IS_JSON_LIKE:
         mock_content = b"{" + mock_content
diff --git a/tests/storage/test_payload.py b/tests/storage/test_payload.py
index bbb63a59..42c31320 100644
--- a/tests/storage/test_payload.py
+++ b/tests/storage/test_payload.py
@@ -161,8 +161,6 @@ def test_get_by_lines(
             + joiner_token
             + joiner_token.join([x[0] for x in plain_full_02])
         )
-        if original.endswith(newline):
-            joined += newline
 
         assert joined == original
 
@@ -208,8 +206,6 @@ def test_get_by_lines(
             + joiner_token
             + joiner_token.join([x[0] for x in plain_full_03])
         )
-        if original.endswith(newline):
-            joined += newline
 
         assert joined == original
 
diff --git a/tests/storage/test_s3.py b/tests/storage/test_s3.py
index cd5bce40..8556853f 100644
--- a/tests/storage/test_s3.py
+++ b/tests/storage/test_s3.py
@@ -191,8 +191,6 @@ def test_get_by_lines(
             + joiner_token
             + joiner_token.join([x[0] for x in plain_full_02])
         )
-        if MockContent.f_content_plain.endswith(newline):
-            joined += newline
 
         assert joined == MockContent.f_content_plain
 
@@ -238,8 +236,6 @@ def test_get_by_lines(
             + joiner_token
             + joiner_token.join([x[0] for x in plain_full_03])
        )
-        if MockContent.f_content_plain.endswith(newline):
-            joined += newline
 
         assert joined == MockContent.f_content_plain
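
Note on the pattern applied by this patch: the old JsonCollector class was instantiated once, at decoration time, so its instance attributes (offsets, buffers, circuit-breaker counters) survived across every call to the decorated _generate and had to be re-initialized by hand inside __call__. The new json_collector function instead allocates a fresh JsonCollectorState on every invocation and threads it through the nested helpers, so no state can leak between calls. Below is a minimal, self-contained sketch of that pattern, including a simplified multi-line JSON collection loop with a 1k-line circuit breaker; the names and the string-based API are illustrative stand-ins, not the forwarder's actual implementation.

import json
from typing import Callable, Iterator


class CollectorState:
    """Hypothetical stand-in for JsonCollectorState: all per-call parser state."""

    def __init__(self) -> None:
        self.buffer: str = ""
        self.circuit_breaker: int = 0
        self.circuit_broken: bool = False


def json_collector_sketch(func: Callable[[str], Iterator[str]]) -> Callable[[str], Iterator[str]]:
    """Collect multi-line JSON objects from a by-lines iterator.

    A fresh CollectorState is allocated on every call to the wrapped
    function, so nothing leaks between invocations -- the point of
    replacing the stateful JsonCollector class with a plain decorator.
    """

    def wrapper(content: str) -> Iterator[str]:
        state = CollectorState()  # new state per call, never shared
        for line in func(content):
            state.buffer += line + "\n"
            try:
                # it didn't raise: the buffer holds a complete json object
                json.loads(state.buffer)
                yield state.buffer.strip()
                state.buffer = ""
            except ValueError:
                # incomplete object: keep buffering, bump the circuit breaker
                state.circuit_breaker += 1
                if state.circuit_breaker > 1000:
                    state.circuit_broken = True
                    break
        if state.circuit_broken or state.buffer:
            # give up on json collection and yield plain lines instead,
            # analogous to the _by_lines_fallback helper in the patch
            yield from state.buffer.strip().splitlines()

    return wrapper


@json_collector_sketch
def by_lines(content: str) -> Iterator[str]:
    yield from content.splitlines()


print(list(by_lines('{"a":\n1}')))  # ['{"a":\n1}'] -- one object spanning two lines
print(list(by_lines("not json")))   # ['not json'] -- fresh state, clean fallback

The same trade-off shows up in the decorator stack in payload.py and s3.py: because @json_collector now returns a closure rather than a shared instance, each call to _generate gets its own collection state for free, which is what lets the patch delete all the manual re-initialization the class version needed.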