Remove delete sqs message calls and optimise json collector decorator (#544)

* Remove sqs delete record calls

* Remove delete sqs message calls and optimise json collector decorator (#534)

* parse the content as json only when really necessary (not single content type, or with an events list expander from field)

* make lint

* fix bugs

* bump localstack container

* add ec2 service to localstack

* fix comment

* fix typo

* reduce more complexity

* fix test for localstack 3.0.0

* fix lint

* further simplification and better performance

* remove debug printing

* remove debug printing

* clean get_by_lines unit test

* fix lint

* even simpler

* fix lint

* increase coverage

* Add ESF specific User-Agent header in outgoing Elasticsearch requests (#537)

* Add ESF specific User-Agent header in outgoing Elasticsearch requests
* Bump minimum supported Elastic stack version to 7.17

* Bump boto3 from 1.28.80 to 1.33.1 (#541)

Bumps [boto3](https://github.com/boto/boto3) from 1.28.80 to 1.33.1.
- [Release notes](https://github.com/boto/boto3/releases)
- [Changelog](https://github.com/boto/boto3/blob/develop/CHANGELOG.rst)
- [Commits](boto/boto3@1.28.80...1.33.1)

---
updated-dependencies:
- dependency-name: boto3
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump boto3 from 1.33.1 to 1.33.2 (#542)

Bumps [boto3](https://github.com/boto/boto3) from 1.33.1 to 1.33.2.
- [Release notes](https://github.com/boto/boto3/releases)
- [Changelog](https://github.com/boto/boto3/blob/develop/CHANGELOG.rst)
- [Commits](boto/boto3@1.33.1...1.33.2)

---
updated-dependencies:
- dependency-name: boto3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* inline doc

* fix tests

* changelog

* add back empty newline in get_by_lines_parameters

* use type aliases

* add comments

* fix typo in comment

* add more comments

* add decorator unit tests

* fix decorator according to test expectations

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Davide Girardi <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Update CHANGELOG.md

* revert not updating ending offset in ExpandEventListFromField

* add typing-extensions==4.8.0 to requirements

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: girodav <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
3 people authored Dec 6, 2023
1 parent e10ee73 commit abfb93d
Showing 21 changed files with 571 additions and 445 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
### v1.11.0 - 2023/11/27
##### Features
* Add user agent with information about ESF version and host environment: [#537](https://github.com/elastic/elastic-serverless-forwarder/pull/537)
* Remove calls to `sqs.DeleteMessage` and refactor storage decorators: [#544](https://github.com/elastic/elastic-serverless-forwarder/pull/544)

### v1.10.0 - 2023/10/27
##### Features
11 changes: 0 additions & 11 deletions handlers/aws/handler.py
@@ -28,7 +28,6 @@
capture_serverless,
config_yaml_from_payload,
config_yaml_from_s3,
delete_sqs_record,
expand_event_list_from_field_resolver,
get_continuing_original_input_type,
get_input_from_log_group_subscription_data,
@@ -371,16 +370,9 @@ def handle_timeout(
config_yaml=timeout_config_yaml,
)

delete_sqs_record(sqs_record["eventSourceARN"], sqs_record["receiptHandle"])

previous_sqs_record: int = 0
last_sqs_record: Optional[dict[str, Any]] = None
for current_sqs_record, sqs_record in enumerate(lambda_event["Records"]):
last_sqs_record = sqs_record
if current_sqs_record > previous_sqs_record:
deleting_sqs_record = lambda_event["Records"][previous_sqs_record]
delete_sqs_record(deleting_sqs_record["eventSourceARN"], deleting_sqs_record["receiptHandle"])

previous_sqs_record = current_sqs_record

continuing_original_input_type = get_continuing_original_input_type(sqs_record)
@@ -504,7 +496,4 @@ def handle_timeout(
extra={"sent_events": sent_events, "empty_events": empty_events, "skipped_events": skipped_events},
)

assert last_sqs_record is not None
delete_sqs_record(last_sqs_record["eventSourceARN"], last_sqs_record["receiptHandle"])

return "completed"
2 changes: 1 addition & 1 deletion handlers/aws/utils.py
@@ -389,7 +389,7 @@ def get_input_from_log_group_subscription_data(
We avoid to call the describe_log_streams on the logs' client, since we have no way to apply the proper
throttling because we'd need to know the number of concurrent lambda running at the time of the call.
In order to not hardcode the list of regions we rely on ec2 DescribeRegions - as much weird as it is - that I found
no information about any kind of throttling. Weadd IAM permissions for it in deployment.
no information about having any kind of throttling. We add IAM permissions for it in deployment.
"""
all_regions = get_ec2_client().describe_regions(AllRegions=True)
assert "Regions" in all_regions
1 change: 1 addition & 0 deletions requirements.txt
@@ -7,3 +7,4 @@ aws_lambda_typing==2.18.0
ujson==5.8.0
requests==2.31.0
urllib3==1.26.18
typing-extensions==4.8.0
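The new typing-extensions pin is what provides `TypeAlias` (PEP 613, only in the standard library from Python 3.10) on older runtimes. A minimal illustration, where the alias name is an arbitrary example and not part of the project:

```python
# TypeAlias comes from typing_extensions so the code also runs on Python < 3.10.
from typing_extensions import TypeAlias

RegionNames: TypeAlias = list[str]  # example alias for illustration only
```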
2 changes: 1 addition & 1 deletion share/__init__.py
@@ -9,6 +9,6 @@
from .include_exlude import IncludeExcludeFilter, IncludeExcludeRule
from .json import json_dumper, json_parser
from .logger import logger as shared_logger
from .multiline import CollectBuffer, CountMultiline, PatternMultiline, ProtocolMultiline, WhileMultiline
from .multiline import CollectBuffer, CountMultiline, FeedIterator, PatternMultiline, ProtocolMultiline, WhileMultiline
from .secretsmanager import aws_sm_expander
from .utils import get_hex_prefix
2 changes: 0 additions & 2 deletions share/config.py
@@ -14,8 +14,6 @@
_available_input_types: list[str] = ["cloudwatch-logs", "s3-sqs", "sqs", "kinesis-data-stream"]
_available_output_types: list[str] = ["elasticsearch", "logstash"]

IntegrationScopeDiscovererCallable = Callable[[dict[str, Any], int], str]


class Output:
"""
6 changes: 6 additions & 0 deletions share/expand_event_list_from_field.py
@@ -8,6 +8,8 @@
from .json import json_dumper
from .logger import logger as shared_logger

# ExpandEventListFromFieldResolverCallable accepts an integration_scope and the name of the field to expand the
# events list from as arguments. It returns the resolved name of the field to expand the events list from.
ExpandEventListFromFieldResolverCallable = Callable[[str, str], str]


@@ -78,6 +80,9 @@ def expand(
if json_object is None:
yield log_event, starting_offset, ending_offset, None
else:
# expanded_ending_offset is set to starting_offset because we want to keep it at the beginning of the
# json object in case of a message from the continuation queue: if we updated it and the payload is
# continued, we would fetch the content of the payload from the middle of the json object, failing to parse it
expanded_ending_offset: int = starting_offset

for (
@@ -97,6 +102,7 @@

if is_last_expanded_event:
expanded_event_n = None
# only when we reach the last expanded event can we move the ending offset
expanded_ending_offset = ending_offset
else:
expanded_event_n = None
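For reference, a resolver matching the `ExpandEventListFromFieldResolverCallable` alias (`Callable[[str, str], str]`) can be sketched as below; the `aws.cloudtrail`/`Records` special case is a hypothetical example used only to show the callable's shape, not the forwarder's actual resolver:

```python
# Hypothetical resolver: return the name of the field to expand the events list from,
# optionally overriding it for a specific integration scope.
def sample_resolver(integration_scope: str, field_to_expand_event_list_from: str) -> str:
    if integration_scope == "aws.cloudtrail":
        # e.g. CloudTrail-style payloads keep their events under the "Records" key
        return "Records"
    return field_to_expand_event_list_from
```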
120 changes: 59 additions & 61 deletions share/multiline.py
@@ -2,36 +2,52 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.


# This file is a porting of the multiline processor on beats.

from __future__ import annotations

import datetime
import re
from abc import ABCMeta
from typing import Callable, Iterator, Optional, Protocol

from typing_extensions import TypeAlias

default_max_bytes: int = 10485760 # Default maximum number of bytes to return in one multi-line event
default_max_lines: int = 500 # Default maximum number of lines to return in one multi-line event
default_multiline_timeout: int = 5 # Default timeout in secs to finish a multi-line event.

timedelta_circuit_breaker: datetime.timedelta = datetime.timedelta(seconds=5)

# CollectTuple is a tuple representing the multiline bytes content, the length of the content and the newline found.
# This data is instrumental to the `StorageDecoratorIterator`, which needs the content to yield, the starting and
# ending offsets and the newline
CollectTuple: TypeAlias = tuple[bytes, int, bytes]

# CollectIterator yields a `CollectTuple`
CollectIterator: TypeAlias = Iterator[CollectTuple]

# FeedIterator yields a tuple representing the content and its newline to feed the `ProtocolMultiline` implementation
FeedIterator: TypeAlias = Iterator[tuple[bytes, bytes]]


class CommonMultiline(metaclass=ABCMeta):
"""
Common class for Multiline components
"""

_feed: Iterator[tuple[bytes, bytes, int]]
_feed: FeedIterator
_buffer: CollectBuffer

_pre_collect_buffer: bool

@property
def feed(self) -> Iterator[tuple[bytes, bytes, int]]:
def feed(self) -> FeedIterator:
return self._feed

@feed.setter
def feed(self, value: Iterator[tuple[bytes, bytes, int]]) -> None:
def feed(self, value: FeedIterator) -> None:
self._feed = value


@@ -40,18 +56,18 @@ class ProtocolMultiline(Protocol):
Protocol class for Multiline components
"""

_feed: Iterator[tuple[bytes, bytes, int]]
_feed: FeedIterator
_buffer: CollectBuffer

@property
def feed(self) -> Iterator[tuple[bytes, bytes, int]]:
def feed(self) -> FeedIterator:
pass # pragma: no cover

@feed.setter
def feed(self, value: Iterator[tuple[bytes, bytes, int]]) -> None:
def feed(self, value: FeedIterator) -> None:
pass # pragma: no cover

def collect(self) -> Iterator[tuple[bytes, int, int]]:
def collect(self) -> CollectIterator:
pass # pragma: no cover


@@ -74,7 +90,7 @@ def __init__(self, max_bytes: int, max_lines: int, skip_newline: bool):
self._processed_lines: int = 0
self._current_length: int = 0

def collect_and_reset(self) -> tuple[bytes, int]:
def collect_and_reset(self) -> CollectTuple:
data = self._buffer
current_length = self._current_length

@@ -85,7 +101,14 @@ def collect_and_reset(self) -> tuple[bytes, int]:

self.previous = b""

return data, current_length
if data.find(b"\r\n") > -1:
newline = b"\r\n"
elif data.find(b"\n") > -1:
newline = b"\n"
else:
newline = b""

return data, current_length, newline

def is_empty(self) -> bool:
return self._buffer_lines == 0
Expand Down Expand Up @@ -169,31 +192,24 @@ def __eq__(self, other: object) -> bool:
and self._skip_newline == self._skip_newline
)

def collect(self) -> Iterator[tuple[bytes, int, int]]:
def collect(self) -> CollectIterator:
last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow()
for data, newline, newline_length in self.feed:
for data, newline in self.feed:
self._buffer.grow(data, newline)
self._current_count += 1
if (
self._count_lines == self._current_count
or (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker
):
self._current_count = 0
content, current_length = self._buffer.collect_and_reset()
yield content, current_length, newline_length
yield self._buffer.collect_and_reset()

if not self._buffer.is_empty():
content, current_length = self._buffer.collect_and_reset()

newline_length = 0
if content.find(b"\r\n") > -1:
newline_length = 2
elif content.find(b"\n") > -1:
newline_length = 1

yield content, current_length, newline_length
yield self._buffer.collect_and_reset()


# WhileMatcherCallable accepts the content of a line in bytes as argument.
# It returns a boolean indicating whether the content matches the "while" multiline pattern or not.
WhileMatcherCallable = Callable[[bytes], bool]


@@ -259,44 +275,38 @@ def negate(line: bytes) -> bool:

return negate

def collect(self) -> Iterator[tuple[bytes, int, int]]:
def collect(self) -> CollectIterator:
last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow()
for data, newline, newline_length in self.feed:
for data, newline in self.feed:
if not self._matcher(data):
if self._buffer.is_empty():
self._buffer.grow(data, newline)
content, current_length = self._buffer.collect_and_reset()
yield content, current_length, newline_length
yield self._buffer.collect_and_reset()
else:
content, current_length = self._buffer.collect_and_reset()
content, current_length, _ = self._buffer.collect_and_reset()
self._buffer.grow(data, newline)

yield content, current_length, newline_length
yield content, current_length, newline

content, current_length = self._buffer.collect_and_reset()
content, current_length, _ = self._buffer.collect_and_reset()

yield content, current_length, newline_length
yield content, current_length, newline
else:
self._buffer.grow(data, newline)
# no pre collect buffer in while multiline, let's check the circuit breaker after at least one grow
if (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker:
content, current_length = self._buffer.collect_and_reset()

yield content, current_length, newline_length
yield self._buffer.collect_and_reset()

if not self._buffer.is_empty():
content, current_length = self._buffer.collect_and_reset()

newline_length = 0
if content.find(b"\r\n") > -1:
newline_length = 2
elif content.find(b"\n") > -1:
newline_length = 1

yield content, current_length, newline_length
yield self._buffer.collect_and_reset()


# PatternMatcherCallable accepts the previous and the current content as arguments.
# It returns a boolean indicating whether the content matches the "pattern" multiline pattern or not.
PatternMatcherCallable = Callable[[bytes, bytes], bool]

# SelectCallable accepts the previous and the current content as arguments.
# It returns either the previous or current content according to the matching type ("before" or "after").
SelectCallable = Callable[[bytes, bytes], bytes]


@@ -390,8 +400,8 @@ def negate(previous: bytes, current: bytes) -> bool:
def _check_matcher(self) -> bool:
return (self._match == "after" and len(self._buffer.previous) > 0) or self._match == "before"

def collect(self) -> Iterator[tuple[bytes, int, int]]:
for data, newline, newline_length in self.feed:
def collect(self) -> CollectIterator:
for data, newline in self.feed:
last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow()
if self._pre_collect_buffer:
self._buffer.collect_and_reset()
@@ -401,31 +411,19 @@ def collect(self) -> Iterator[tuple[bytes, int, int]]:
self._buffer.grow(data, newline)
self._pre_collect_buffer = True

content, current_length = self._buffer.collect_and_reset()
yield content, current_length, newline_length
yield self._buffer.collect_and_reset()
elif (
not self._buffer.is_empty() and self._check_matcher() and not self._matcher(self._buffer.previous, data)
):
content, current_length = self._buffer.collect_and_reset()
content, current_length, _ = self._buffer.collect_and_reset()

self._buffer.grow(data, newline)

yield content, current_length, newline_length
yield content, current_length, newline
else:
if (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker:
content, current_length = self._buffer.collect_and_reset()

yield content, current_length, newline_length

yield self._buffer.collect_and_reset()
self._buffer.grow(data, newline)

if not self._buffer.is_empty():
content, current_length = self._buffer.collect_and_reset()

newline_length = 0
if content.find(b"\r\n") > -1:
newline_length = 2
elif content.find(b"\n") > -1:
newline_length = 1

yield content, current_length, newline_length
yield self._buffer.collect_and_reset()
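Taken together, the new aliases fix the shape of the data flowing through these collectors: `feed` consumes `(content, newline)` pairs and `collect()` yields `(content, current_length, newline)` tuples. A minimal sketch of how a feed can be built and consumed follows; the `feed_from_lines` helper and the `consume` wrapper are assumptions for illustration, not part of the library:

```python
# Sketch of the shapes behind the FeedIterator and CollectIterator aliases.
from __future__ import annotations

from typing import Iterator

from typing_extensions import TypeAlias

CollectTuple: TypeAlias = tuple[bytes, int, bytes]  # (content, length, newline)
CollectIterator: TypeAlias = Iterator[CollectTuple]
FeedIterator: TypeAlias = Iterator[tuple[bytes, bytes]]  # (line content, newline bytes)


def feed_from_lines(lines: list[bytes], newline: bytes = b"\n") -> FeedIterator:
    # Turn raw lines into the (content, newline) pairs a multiline collector expects
    for line in lines:
        yield line, newline


def consume(collector: "ProtocolMultiline", lines: list[bytes]) -> list[CollectTuple]:
    # Assign the feed, then drain collect(); each item is (content, current_length, newline)
    collector.feed = feed_from_lines(lines)
    return list(collector.collect())
```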
4 changes: 4 additions & 0 deletions shippers/shipper.py
@@ -4,7 +4,11 @@

from typing import Any, Callable, Protocol

# ReplayHandlerCallable accepts the output type, a dict of arguments for the output and the event to be replayed.
# It does not return anything.
ReplayHandlerCallable = Callable[[str, dict[str, Any], dict[str, Any]], None]

# EventIdGeneratorCallable accepts an event dict as argument. It returns the _id of that event.
EventIdGeneratorCallable = Callable[[dict[str, Any]], str]

EVENT_IS_EMPTY = "EVENT_IS_EMPTY"
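A hypothetical implementation matching the `EventIdGeneratorCallable` shape, shown only to illustrate the alias; hashing the serialized event is an assumption, not the forwarder's actual id-generation logic:

```python
# Derive a deterministic _id for an event by hashing its JSON serialization.
import hashlib
import json
from typing import Any


def event_id_generator(event: dict[str, Any]) -> str:
    serialized = json.dumps(event, sort_keys=True, default=str)
    return hashlib.sha256(serialized.encode("utf-8")).hexdigest()
```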
3 changes: 2 additions & 1 deletion storage/__init__.py
@@ -2,7 +2,8 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

from .decorator import by_lines, inflate, json_collector, multi_line
from .factory import StorageFactory
from .payload import PayloadStorage
from .s3 import S3Storage
from .storage import ProtocolStorage, StorageReader
from .storage import CommonStorage, GetByLinesIterator, ProtocolStorage, StorageDecoratorIterator, StorageReader