Commit

even simpler

Andrea Spacca committed Nov 24, 2023
1 parent 5fde4a9 commit 1f602b9
Showing 6 changed files with 116 additions and 158 deletions.
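
The change converts the class-based decorator JsonCollector into a plain function decorator json_collector. All mutable state that previously lived on the decorator instance (the line buffer, offsets, and circuit-breaker counters) moves into an explicit JsonCollectorState object constructed inside the wrapper, so every call starts from fresh state instead of reusing fields on the single decorator object created at class-definition time. Below is a minimal sketch of that pattern, not the project's code; the names State, counting, and numbers are hypothetical.

# Minimal sketch of the refactor pattern, assuming only standard Python semantics:
# a class-based decorator keeps its state on one shared instance, while a function
# decorator can build fresh state inside its wrapper on every invocation.
from typing import Callable, Iterator


class State:
    """Per-call mutable state, created anew on every invocation."""

    def __init__(self) -> None:
        self.count: int = 0


def counting(func: Callable[[int], Iterator[int]]) -> Callable[[int], Iterator[int]]:
    def wrapper(n: int) -> Iterator[int]:
        state = State()  # fresh state per call, nothing shared between calls
        for value in func(n):
            state.count += 1
            yield value + state.count

    return wrapper


@counting
def numbers(n: int) -> Iterator[int]:
    yield from range(n)


assert list(numbers(3)) == [1, 3, 5]  # 0+1, 1+2, 2+3

The same trade-off shows up in the diff: the function form no longer needs the self._storage escape hatch and its assert, because the state object carries the storage reference with it.
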
255 changes: 110 additions & 145 deletions storage/decorator.py
@@ -111,56 +111,63 @@ def iterator_to_multiline_feed() -> Iterator[tuple[bytes, bytes]]:
     return wrapper


-class JsonCollector:
-    """
-    ProtocolStorage decorator for returning content by collected json object (if any) spanning multiple lines
-    """
-
-    def __init__(self, function: GetByLinesCallable[ProtocolStorageType]):
-        self._function: GetByLinesCallable[ProtocolStorageType] = function
-
-        self._starting_offset: int = 0
-        self._ending_offset: int = 0
-
-        self._single_payload: bytes = b""
-        self._unfinished_line: bytes = b""
-
-        self._is_a_json_object: bool = False
-        self._is_a_json_object_circuit_broken: bool = True
-        self._is_a_json_object_circuit_breaker: int = 0
-
-        self._storage: Optional[ProtocolStorageType] = None
-
-    def _collector(self, data: bytes, newline: bytes) -> Iterator[tuple[bytes, Optional[dict[str, Any]]]]:
+class JsonCollectorState:
+    def __init__(self, storage: StorageReader):
+        self.storage: StorageReader = storage
+
+        self.starting_offset: int = 0
+        self.ending_offset: int = 0
+
+        self.unfinished_line: bytes = b""
+
+        self.has_an_object_start: bool = False
+
+        self.is_a_json_object: bool = False
+        self.is_a_json_object_circuit_broken: bool = False
+        self.is_a_json_object_circuit_breaker: int = 0
+
+
+def json_collector(func: GetByLinesCallable[ProtocolStorageType]) -> GetByLinesCallable[ProtocolStorageType]:
+    """
+    ProtocolStorage decorator for returning content by collected json object (if any) spanning multiple lines
+    """
+
+    def _handle_offset(offset_skew: int, json_collector_state: JsonCollectorState) -> None:
+        json_collector_state.starting_offset = json_collector_state.ending_offset
+        json_collector_state.ending_offset += offset_skew
+
+    def _collector(
+        data: bytes, newline: bytes, json_collector_state: JsonCollectorState
+    ) -> Iterator[tuple[bytes, Optional[dict[str, Any]]]]:
         try:
             # let's buffer the content
             # we receive data without newline
             # let's append it as well
-            self._unfinished_line += data + newline
+            json_collector_state.unfinished_line += data + newline

             # let's try to decode
-            json_object = json_parser(self._unfinished_line)
+            json_object = json_parser(json_collector_state.unfinished_line)

             # it didn't raise: we collected a json object
-            data_to_yield = self._unfinished_line
+            data_to_yield = json_collector_state.unfinished_line

             # let's reset the buffer
-            self._unfinished_line = b""
+            json_collector_state.unfinished_line = b""

             # let's increase the offset for yielding
-            self._handle_offset(len(data_to_yield))
+            _handle_offset(len(data_to_yield), json_collector_state)

             # let's decrease the circuit breaker by the number of lines in the data to yield
             if newline != b"":
-                self._is_a_json_object_circuit_breaker -= data_to_yield.count(newline) - 1
+                json_collector_state.is_a_json_object_circuit_breaker -= data_to_yield.count(newline) - 1
             else:
-                self._is_a_json_object_circuit_breaker -= 1
+                json_collector_state.is_a_json_object_circuit_breaker -= 1

             # let's trim surrounding newline
             data_to_yield = data_to_yield.strip(b"\r\n").strip(b"\n")

             # let's set the flag for json object
-            self._is_a_json_object = True
+            json_collector_state.is_a_json_object = True

             # finally yield
             yield data_to_yield, json_object
@@ -169,76 +176,74 @@ def _collector(self, data: bytes, newline: bytes) -> Iterator[tuple[bytes, Optional[dict[str, Any]]]]:
             # let's keep iterating
         except ValueError:
             # it's an empty line, let's yield it
-            if self._is_a_json_object and len(self._unfinished_line.strip(b"\r\n").strip(b"\n")) == 0:
+            if (
+                json_collector_state.is_a_json_object
+                and len(json_collector_state.unfinished_line.strip(b"\r\n").strip(b"\n")) == 0
+            ):
                 # let's reset the buffer
-                self._unfinished_line = b""
+                json_collector_state.unfinished_line = b""

                 # let's increase the offset for yielding
-                self._handle_offset(len(newline))
+                _handle_offset(len(newline), json_collector_state)

                 # finally yield
                 yield b"", None
             else:
                 # buffer was not a complete json object
                 # let's increase the circuit breaker
-                self._is_a_json_object_circuit_breaker += 1
+                json_collector_state.is_a_json_object_circuit_breaker += 1

                 # if the first 1k lines are not a json object let's give up
-                if self._is_a_json_object_circuit_breaker > 1000:
-                    self._is_a_json_object_circuit_broken = True
+                if json_collector_state.is_a_json_object_circuit_breaker > 1000:
+                    json_collector_state.is_a_json_object_circuit_broken = True

     def _by_lines_fallback(
-        self, buffer: bytes
+        json_collector_state: JsonCollectorState,
     ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]]:
-        assert self._storage is not None
-
         @by_lines
         def wrapper(
             storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
         ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]]:
             data_to_yield: bytes = body.read()
             yield data_to_yield, 0, range_start, b"", None

-        for line, starting_offset, ending_offset, newline, _ in wrapper(
-            self._storage, self._ending_offset, BytesIO(buffer), False
+        for line, _, _, newline, _ in wrapper(
+            json_collector_state.storage,
+            json_collector_state.ending_offset,
+            BytesIO(json_collector_state.unfinished_line),
+            False,
         ):
             assert isinstance(line, bytes)
-            yield line, starting_offset, ending_offset, newline, None
-
-    def _handle_offset(self, offset_skew: int) -> None:
-        self._starting_offset = self._ending_offset
-        self._ending_offset += offset_skew
-
-    def __call__(
-        self, storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
+            _handle_offset(len(line) + len(newline), json_collector_state)
+
+            # let's reset the buffer
+            json_collector_state.unfinished_line = b""
+
+            # let's set the flag for direct yield from now on
+            json_collector_state.has_an_object_start = False
+
+            yield line, _, _, newline, None
+
+    def wrapper(
+        storage: ProtocolStorageType, range_start: int, body: BytesIO, is_gzipped: bool
     ) -> Iterator[tuple[Union[StorageReader, bytes], int, int, bytes, Optional[int]]]:
-        self._storage = storage
+        json_collector_state = JsonCollectorState(storage=storage)

         multiline_processor: Optional[ProtocolMultiline] = storage.multiline_processor
         if storage.json_content_type == "disabled" or multiline_processor:
-            iterator = self._function(storage, range_start, body, is_gzipped)
+            iterator = func(storage, range_start, body, is_gzipped)
             for data, starting_offset, ending_offset, newline, _ in iterator:
                 assert isinstance(data, bytes)
-                shared_logger.debug("JsonCollector skipped", extra={"offset": ending_offset})
+                shared_logger.debug("json_collector skipped", extra={"offset": ending_offset})

                 yield data, starting_offset, ending_offset, newline, None
         else:
             event_list_from_field_expander: Optional[ExpandEventListFromField] = storage.event_list_from_field_expander

-            has_an_object_start: bool = False
-            wait_for_object_start: bool = True
-            wait_for_object_start_buffer: bytes = b""
-
-            self._ending_offset = range_start
-            self._starting_offset = 0
-
-            self._single_payload = b""
-            self._unfinished_line = b""
-
-            self._is_a_json_object = False
-            self._is_a_json_object_circuit_broken = False
-            self._is_a_json_object_circuit_breaker = 0
-
-            iterator = self._function(storage, range_start, body, is_gzipped)
+            json_collector_state.ending_offset = range_start
+
+            iterator = func(storage, range_start, body, is_gzipped)
             # if we know it's a single json we replace body with the whole content,
             # and mark the object as started
             if storage.json_content_type == "single" and event_list_from_field_expander is None:
@@ -259,106 +264,66 @@ def __call__(
             for data, starting_offset, ending_offset, newline, _ in iterator:
                 assert isinstance(data, bytes)

-                # we still wait for the object to start
-                if wait_for_object_start:
-                    # we check for a potential json object on
-                    # we strip leading space to be safe on padding
+                # if it's not a json object we can just forward the content by lines
+                if not json_collector_state.has_an_object_start:
+                    # if range_start is greater than zero, or we have leading space, data can be empty
                     stripped_data = data.decode("utf-8").lstrip()
-
-                    # if range_start is greater than zero, data can be empty
-                    if len(stripped_data) > 0:
-                        wait_for_object_start = False
-                        if stripped_data[0] == "{":
-                            # we mark the potentiality of a json object start
-                            # CAVEAT: if the log entry starts with `{` but the
-                            # content is not json, we buffer the first 10k lines
-                            # before the circuit breaker kicks in
-                            has_an_object_start = True
-                    else:
-                        # let's buffer discarded data including newline for eventual by_lines() fallback
-                        wait_for_object_start_buffer += data + newline
-
-                if not wait_for_object_start:
-                    # if it's not a json object we can just forward the content by lines
-                    if not has_an_object_start:
-                        if len(wait_for_object_start_buffer) > 0:
-                            # let's yield the buffer we set for waiting for object_start before the circuit breaker
-                            for line, _, _, original_newline, _ in self._by_lines_fallback(
-                                wait_for_object_start_buffer
-                            ):
-                                self._handle_offset(len(line) + len(original_newline))
-                                yield line, self._starting_offset, self._ending_offset, original_newline, None
-
-                            # let's reset the buffer
-                            wait_for_object_start_buffer = b""
-
-                        yield data, starting_offset, ending_offset, newline, None
-                    else:
-                        # let's yield wait_for_object_start_buffer: it is newline only content
-                        for line, _, _, original_newline, _ in self._by_lines_fallback(
-                            wait_for_object_start_buffer
-                        ):
-                            self._handle_offset(len(line) + len(original_newline))
-                            yield line, self._starting_offset, self._ending_offset, original_newline, None
-
-                        # let's reset the buffer
-                        wait_for_object_start_buffer = b""
-
-                        # let's parse the data only if it is not single, or we have a field expander
-                        if storage.json_content_type != "single" or event_list_from_field_expander is not None:
-                            for data_to_yield, json_object in self._collector(data, newline):
-                                shared_logger.debug("JsonCollector objects", extra={"offset": self._ending_offset})
-                                if event_list_from_field_expander is not None:
-                                    for (
-                                        expanded_log_event,
-                                        expanded_starting_offset,
-                                        expanded_ending_offset,
-                                        expanded_event_n,
-                                    ) in event_list_from_field_expander.expand(
-                                        data_to_yield, json_object, self._starting_offset, self._ending_offset
-                                    ):
-                                        yield (
-                                            expanded_log_event,
-                                            expanded_starting_offset,
-                                            expanded_ending_offset,
-                                            newline,
-                                            expanded_event_n,
-                                        )
-                                else:
-                                    yield data_to_yield, self._starting_offset, self._ending_offset, newline, None
-
-                                del json_object
-                        else:
-                            self._handle_offset(len(data) + len(newline))
-                            yield data, self._starting_offset, self._ending_offset, newline, None
-
-            if self._is_a_json_object_circuit_broken:
+                    if len(stripped_data) > 0 and stripped_data[0] == "{":
+                        # we mark the potentiality of a json object start
+                        # CAVEAT: if the log entry starts with `{` but the
+                        # content is not json, we buffer the first 10k lines
+                        # before the circuit breaker kicks in
+                        json_collector_state.has_an_object_start = True
+
+                if not json_collector_state.has_an_object_start:
+                    _handle_offset(len(data) + len(newline), json_collector_state)
+                    yield data, starting_offset, ending_offset, newline, None
+
+                if json_collector_state.has_an_object_start:
+                    # let's parse the data only if it is not single, or we have a field expander
+                    for data_to_yield, json_object in _collector(data, newline, json_collector_state):
+                        shared_logger.debug(
+                            "json_collector objects", extra={"offset": json_collector_state.ending_offset}
+                        )
+                        if event_list_from_field_expander is not None:
+                            for (
+                                expanded_log_event,
+                                expanded_starting_offset,
+                                expanded_ending_offset,
+                                expanded_event_n,
+                            ) in event_list_from_field_expander.expand(
+                                data_to_yield,
+                                json_object,
+                                json_collector_state.starting_offset,
+                                json_collector_state.ending_offset,
+                            ):
+                                yield (
+                                    expanded_log_event,
+                                    expanded_starting_offset,
+                                    expanded_ending_offset,
+                                    newline,
+                                    expanded_event_n,
+                                )
+                        else:
+                            yield data_to_yield, json_collector_state.starting_offset, json_collector_state.ending_offset, newline, None
+
+                        del json_object
+
+            # check if we hit the circuit broken
+            if json_collector_state.is_a_json_object_circuit_broken:
                 # let's yield what we have so far
-                for line, _, _, original_newline, _ in self._by_lines_fallback(self._unfinished_line):
-                    self._handle_offset(len(line) + len(original_newline))
-
-                    yield line, self._starting_offset, self._ending_offset, original_newline, None
-
-                    # let's set the flag for direct yield from now on
-                    has_an_object_start = False
-
-                    # let's reset the buffer
-                    self._unfinished_line = b""
+                for line, _, _, original_newline, _ in _by_lines_fallback(json_collector_state):
+                    yield line, json_collector_state.starting_offset, json_collector_state.ending_offset, original_newline, None

             # in this case we could have a trailing new line in what's left in the buffer
             # or the content had a leading `{` but was not a json object before the circuit breaker intercepted it,
             # or we waited for the object start and never reached:
             # let's fallback to by_lines()
-            if not self._is_a_json_object:
-                buffer: bytes = self._unfinished_line
-                if wait_for_object_start:
-                    buffer = wait_for_object_start_buffer
-
-                for line, _, _, newline, _ in self._by_lines_fallback(buffer):
-                    line = line.strip(newline)
-                    self._handle_offset(len(line) + len(newline))
-
-                    yield line, self._starting_offset, self._ending_offset, newline, None
+            if not json_collector_state.is_a_json_object:
+                for line, _, _, newline, _ in _by_lines_fallback(json_collector_state):
+                    yield line, json_collector_state.starting_offset, json_collector_state.ending_offset, newline, None
+
+    return wrapper


 def inflate(func: GetByLinesCallable[ProtocolStorageType]) -> GetByLinesCallable[ProtocolStorageType]:
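
Throughout the refactor the collection algorithm itself is unchanged: lines are appended to a buffer until json_parser accepts the buffer as one complete JSON object, and a circuit breaker gives up once more than 1000 buffered lines have failed to parse, after which content is handed to the by_lines() fallback. Below is a rough, simplified sketch of that loop, using json.loads as a stand-in for the project's json_parser (which, judging from the except clause above, raises ValueError on incomplete input) and omitting the offset and newline bookkeeping the real decorator performs.

import json
from typing import Any, Iterator, Optional


def collect_json(lines: Iterator[bytes]) -> Iterator[tuple[bytes, Optional[dict[str, Any]]]]:
    buffer = b""
    failed_lines = 0
    for line in lines:
        buffer += line + b"\n"
        try:
            # json.JSONDecodeError is a subclass of ValueError, so incomplete
            # buffers fall through to the except branch and we keep collecting
            json_object = json.loads(buffer)
        except ValueError:
            failed_lines += 1
            if failed_lines > 1000:
                # circuit breaker: stop collecting, flush the buffer line by line
                for raw_line in buffer.splitlines():
                    yield raw_line, None
                buffer = b""
            continue
        # the buffer parsed as a whole: emit it as one collected object
        # (assumes the input stream carries JSON objects, not bare arrays)
        yield buffer.strip(b"\n"), json_object
        buffer = b""
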
4 changes: 2 additions & 2 deletions storage/payload.py
@@ -9,7 +9,7 @@

 from share import ExpandEventListFromField, ProtocolMultiline, shared_logger

-from .decorator import JsonCollector, by_lines, inflate, multi_line
+from .decorator import json_collector, by_lines, inflate, multi_line
 from .storage import CHUNK_SIZE, CommonStorage, StorageReader, is_gzip_content


@@ -33,7 +33,7 @@ def __init__(
         self.event_list_from_field_expander = event_list_from_field_expander

     @multi_line
-    @JsonCollector
+    @json_collector
     @by_lines
     @inflate
     def _generate(
4 changes: 2 additions & 2 deletions storage/s3.py
@@ -12,7 +12,7 @@

 from share import ExpandEventListFromField, ProtocolMultiline, shared_logger

-from .decorator import JsonCollector, by_lines, inflate, multi_line
+from .decorator import json_collector, by_lines, inflate, multi_line
 from .storage import CHUNK_SIZE, CommonStorage, StorageReader, is_gzip_content


@@ -41,7 +41,7 @@ def __init__(
         self.event_list_from_field_expander = event_list_from_field_expander

     @multi_line
-    @JsonCollector
+    @json_collector
     @by_lines
     @inflate
     def _generate(
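
In payload.py and s3.py the only call-site change is the decorator's name, since a class and a function are applied the same way. The stack order is worth noting: decorators apply bottom-up, so _generate is wrapped by inflate first and multi_line last, which suggests the yielded stream passes through decompression, line splitting, JSON collection, and multiline processing in that order. A toy illustration of the ordering follows; the tag decorator is hypothetical.

from typing import Callable


def tag(name: str) -> Callable[[Callable[[], str]], Callable[[], str]]:
    def deco(func: Callable[[], str]) -> Callable[[], str]:
        def wrapper() -> str:
            # record this layer's name around whatever the inner layer produced
            return f"{name}({func()})"

        return wrapper

    return deco


@tag("multi_line")
@tag("json_collector")
@tag("by_lines")
@tag("inflate")
def _generate() -> str:
    return "chunk"


print(_generate())  # multi_line(json_collector(by_lines(inflate(chunk))))
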