Remove delete sqs message calls and optimise json collector decorator (#534)

* parse the content as JSON only when really necessary (not a single content type, or with an events list expander from field); a sketch of this idea follows the commit trailers below

* make lint

* fix bugs

* bump localstack container

* add ec2 service to localstack

* fix comment

* fix typo

* reduce more complexity

* fix test for localstack 3.0.0

* fix lint

* further simplification and better performance

* remove debug printing

* remove debug printing

* clean get_by_lines unit test

* fix lint

* even simpler

* fix lint

* increase coverage

* Add ESF specific User-Agent header in outgoing Elasticsearch requests (#537)

* Add ESF specific User-Agent header in outgoing Elasticsearch requests
* Bump minimum supported Elastic stack version to 7.17

* Bump boto3 from 1.28.80 to 1.33.1 (#541)

Bumps [boto3](https://github.com/boto/boto3) from 1.28.80 to 1.33.1.
- [Release notes](https://github.com/boto/boto3/releases)
- [Changelog](https://github.com/boto/boto3/blob/develop/CHANGELOG.rst)
- [Commits](boto/boto3@1.28.80...1.33.1)

---
updated-dependencies:
- dependency-name: boto3
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Bump boto3 from 1.33.1 to 1.33.2 (#542)

Bumps [boto3](https://github.com/boto/boto3) from 1.33.1 to 1.33.2.
- [Release notes](https://github.com/boto/boto3/releases)
- [Changelog](https://github.com/boto/boto3/blob/develop/CHANGELOG.rst)
- [Commits](boto/boto3@1.33.1...1.33.2)

---
updated-dependencies:
- dependency-name: boto3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* inline doc

* fix tests

* changelog

* add back empty newline in get_by_lines_parameters

* use type aliases

* add comments

* fix typo in comment

* add more comments

* add decorator unit tests

* fix decorator according to test expectations

---------

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Davide Girardi <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
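
As referenced in the first bullet above, the core optimisation is to defer JSON parsing until it is actually needed. The following toy sketch only illustrates that idea; it is not the decorator code from this commit (the storage decorator files are collapsed below), and the helper name, parameters, and exact condition are simplified assumptions.

```python
import json
from typing import Any, Optional


def maybe_parse_json(content: bytes, is_single_json_content_type: bool, field_to_expand: str) -> Optional[dict[str, Any]]:
    """Toy illustration: skip the (expensive) JSON parse when nothing downstream needs it."""
    # Mirrors the bullet's wording: parse only when the payload is not a single JSON
    # content type, or when an events-list expander field is configured.
    if is_single_json_content_type and len(field_to_expand) == 0:
        return None

    try:
        return json.loads(content)
    except ValueError:
        # Not valid JSON: the caller falls back to treating the content as plain lines.
        return None


# Example: with a single JSON content type and no expander field, the parse is skipped.
print(maybe_parse_json(b'{"a": 1}', is_single_json_content_type=True, field_to_expand=""))   # None
print(maybe_parse_json(b'{"a": 1}', is_single_json_content_type=False, field_to_expand=""))  # {'a': 1}
```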
3 people authored Dec 1, 2023
1 parent c8b29b3 commit 4c76f27
Showing 31 changed files with 645 additions and 453 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
### v1.11.0 - 2023/11/27
##### Features
* Add user agent with information about ESF version and host environment: [#537](https://github.com/elastic/elastic-serverless-forwarder/pull/537)
* Remove calls to `sqs.DeleteMessage` and refactor storage decorators: [#534](https://github.com/elastic/elastic-serverless-forwarder/pull/534)

### v1.10.0 - 2023/10/27
##### Features
* Move `_id` field to `@metadata._id` in logstash output: [#507](https://github.com/elastic/elastic-serverless-forwarder/pull/507)
2 changes: 1 addition & 1 deletion docs/en/aws-elastic-serverless-forwarder.asciidoc
@@ -11,7 +11,7 @@

The Elastic Serverless Forwarder is an Amazon Web Services ({aws}) Lambda function that ships logs from your {aws} environment to Elastic.

The Elastic Serverless Forwarder works with {stack} 7.16 and later.
The Elastic Serverless Forwarder works with {stack} 7.17 and later.

IMPORTANT: Using Elastic Serverless Forwarder may result in additional charges. To learn
how to minimize additional charges, refer to <<preventing-unexpected-costs>>.
3 changes: 0 additions & 3 deletions handlers/aws/handler.py
@@ -371,11 +371,8 @@ def handle_timeout(
)

previous_sqs_record: int = 0
last_sqs_record: Optional[dict[str, Any]] = None
for current_sqs_record, sqs_record in enumerate(lambda_event["Records"]):
last_sqs_record = sqs_record
if current_sqs_record > previous_sqs_record:

previous_sqs_record = current_sqs_record

continuing_original_input_type = get_continuing_original_input_type(sqs_record)
2 changes: 1 addition & 1 deletion handlers/aws/utils.py
@@ -389,7 +389,7 @@ def get_input_from_log_group_subscription_data(
We avoid to call the describe_log_streams on the logs' client, since we have no way to apply the proper
throttling because we'd need to know the number of concurrent lambda running at the time of the call.
In order to not hardcode the list of regions we rely on ec2 DescribeRegions - as much weird as it is - that I found
no information about any kind of throttling. Weadd IAM permissions for it in deployment.
no information about having any kind of throttling. We add IAM permissions for it in deployment.
"""
all_regions = get_ec2_client().describe_regions(AllRegions=True)
assert "Regions" in all_regions
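
A minimal sketch of the DescribeRegions approach described in the docstring above, assuming the standard boto3 EC2 client API (this is an illustration, not code from this commit):

```python
import boto3

# Enumerate all regions (including opt-in ones) instead of hardcoding a region list.
# This relies on the ec2:DescribeRegions permission, which the deployment adds.
ec2_client = boto3.client("ec2")
response = ec2_client.describe_regions(AllRegions=True)

region_names = [region["RegionName"] for region in response["Regions"]]
print(region_names)
```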
2 changes: 1 addition & 1 deletion requirements-tests.txt
@@ -8,6 +8,6 @@ orjson==3.9.10
pysimdjson==5.0.2
python-rapidjson==1.13
cysimdjson==23.8
responses==0.23.3
responses==0.24.1
testcontainers==3.7.1
pyOpenSSL==23.3.0
6 changes: 3 additions & 3 deletions requirements.txt
@@ -1,9 +1,9 @@
elastic-apm==6.19.0
boto3==1.28.80
boto3==1.33.2
ecs_logging==2.1.0
elasticsearch==7.16.3
elasticsearch==7.17.9
PyYAML==6.0.1
aws_lambda_typing==2.18.0
ujson==5.8.0
requests==2.31.0
urllib3==1.26.15
urllib3==1.26.18
2 changes: 1 addition & 1 deletion share/__init__.py
@@ -9,6 +9,6 @@
from .include_exlude import IncludeExcludeFilter, IncludeExcludeRule
from .json import json_dumper, json_parser
from .logger import logger as shared_logger
from .multiline import CollectBuffer, CountMultiline, PatternMultiline, ProtocolMultiline, WhileMultiline
from .multiline import CollectBuffer, CountMultiline, FeedIterator, PatternMultiline, ProtocolMultiline, WhileMultiline
from .secretsmanager import aws_sm_expander
from .utils import get_hex_prefix
2 changes: 0 additions & 2 deletions share/config.py
@@ -14,8 +14,6 @@
_available_input_types: list[str] = ["cloudwatch-logs", "s3-sqs", "sqs", "kinesis-data-stream"]
_available_output_types: list[str] = ["elasticsearch", "logstash"]

IntegrationScopeDiscovererCallable = Callable[[dict[str, Any], int], str]


class Output:
"""
17 changes: 17 additions & 0 deletions share/environment.py
@@ -0,0 +1,17 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

import os
import platform


def is_aws() -> bool:
return os.getenv("AWS_EXECUTION_ENV") is not None


def get_environment() -> str:
if is_aws():
return os.environ["AWS_EXECUTION_ENV"]
else:
return f"Python/{platform.python_version()} {platform.system()}/{platform.machine()}"
13 changes: 7 additions & 6 deletions share/expand_event_list_from_field.py
@@ -8,6 +8,8 @@
from .json import json_dumper
from .logger import logger as shared_logger

# ExpandEventListFromFieldResolverCallable accepts an integration_scope and the field to expand events list from as
# arguments. It returns the resolved name of the field to expand the events list from.
ExpandEventListFromFieldResolverCallable = Callable[[str, str], str]


@@ -26,9 +28,9 @@ def __init__(

def _expand_event_list_from_field(
self, json_object: dict[str, Any], starting_offset: int, ending_offset: int
) -> Iterator[tuple[Any, int, Optional[int], bool, bool]]:
) -> Iterator[tuple[Any, int, int, Optional[int], bool, bool]]:
if len(self._field_to_expand_event_list_from) == 0 or self._field_to_expand_event_list_from not in json_object:
yield None, starting_offset, 0, True, False
yield None, starting_offset, ending_offset, 0, True, False
else:
events_list: list[Any] = json_object[self._field_to_expand_event_list_from]
# let's set to 1 if empty list to avoid division by zero in the line below,
@@ -68,8 +70,8 @@ def _expand_event_list_from_field(
shared_logger.debug("root fields to be added on a non json object event")

event_n += offset_skew
yield event, int(
starting_offset + (event_n * avg_event_length)
yield event, int(starting_offset + (event_n * avg_event_length)), int(
starting_offset + ((event_n + 1) * avg_event_length)
), event_n, event_n == events_list_length - 1, True

def expand(
@@ -78,11 +80,10 @@ def expand(
if json_object is None:
yield log_event, starting_offset, ending_offset, None
else:
expanded_ending_offset: int = starting_offset

for (
expanded_event,
expanded_starting_offset,
expanded_ending_offset,
expanded_event_n,
is_last_expanded_event,
event_was_expanded,
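
A small worked example of the offset estimation visible in this hunk: when a JSON payload spanning [starting_offset, ending_offset) is expanded into N events, each event gets an estimated byte range of average length. The numbers below are made up for illustration.

```python
# Worked example of the estimated starting/ending offsets yielded per expanded event.
starting_offset, ending_offset = 100, 400
events_list = ["a", "b", "c"]  # stand-in for json_object[field_to_expand_event_list_from]

avg_event_length = (ending_offset - starting_offset) / len(events_list)  # 100.0
for event_n, event in enumerate(events_list):
    estimated_start = int(starting_offset + event_n * avg_event_length)
    estimated_end = int(starting_offset + (event_n + 1) * avg_event_length)
    print(event, estimated_start, estimated_end)
# a 100 200
# b 200 300
# c 300 400
```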
120 changes: 59 additions & 61 deletions share/multiline.py
@@ -2,36 +2,52 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.


# This file is a porting of the multiline processor on beats.

from __future__ import annotations

import datetime
import re
from abc import ABCMeta
from typing import Callable, Iterator, Optional, Protocol

from typing_extensions import TypeAlias

default_max_bytes: int = 10485760 # Default maximum number of bytes to return in one multi-line event
default_max_lines: int = 500 # Default maximum number of lines to return in one multi-line event
default_multiline_timeout: int = 5 # Default timeout in secs to finish a multi-line event.

timedelta_circuit_breaker: datetime.timedelta = datetime.timedelta(seconds=5)

# CollectTuple is a tuple representing the multiline bytes content, the length of the content and the newline found.
# This data is instrumental to the `StorageDecoratorIterator`, which needs the content to yield, the starting and ending
# offsets and the newline.
CollectTuple: TypeAlias = tuple[bytes, int, bytes]

# CollectIterator yields a `CollectTuple`
CollectIterator: TypeAlias = Iterator[CollectTuple]

# FeedIterator yields a tuple representing the content and its newline to feed to the `ProtocolMultiline` implementation
FeedIterator: TypeAlias = Iterator[tuple[bytes, bytes]]


class CommonMultiline(metaclass=ABCMeta):
"""
Common class for Multiline components
"""

_feed: Iterator[tuple[bytes, bytes, int]]
_feed: FeedIterator
_buffer: CollectBuffer

_pre_collect_buffer: bool

@property
def feed(self) -> Iterator[tuple[bytes, bytes, int]]:
def feed(self) -> FeedIterator:
return self._feed

@feed.setter
def feed(self, value: Iterator[tuple[bytes, bytes, int]]) -> None:
def feed(self, value: FeedIterator) -> None:
self._feed = value


@@ -40,18 +56,18 @@ class ProtocolMultiline(Protocol):
Protocol class for Multiline components
"""

_feed: Iterator[tuple[bytes, bytes, int]]
_feed: FeedIterator
_buffer: CollectBuffer

@property
def feed(self) -> Iterator[tuple[bytes, bytes, int]]:
def feed(self) -> FeedIterator:
pass # pragma: no cover

@feed.setter
def feed(self, value: Iterator[tuple[bytes, bytes, int]]) -> None:
def feed(self, value: FeedIterator) -> None:
pass # pragma: no cover

def collect(self) -> Iterator[tuple[bytes, int, int]]:
def collect(self) -> CollectIterator:
pass # pragma: no cover


@@ -74,7 +90,7 @@ def __init__(self, max_bytes: int, max_lines: int, skip_newline: bool):
self._processed_lines: int = 0
self._current_length: int = 0

def collect_and_reset(self) -> tuple[bytes, int]:
def collect_and_reset(self) -> CollectTuple:
data = self._buffer
current_length = self._current_length

@@ -85,7 +101,14 @@ def collect_and_reset(self) -> tuple[bytes, int]:

self.previous = b""

return data, current_length
if data.find(b"\r\n") > -1:
newline = b"\r\n"
elif data.find(b"\n") > -1:
newline = b"\n"
else:
newline = b""

return data, current_length, newline

def is_empty(self) -> bool:
return self._buffer_lines == 0
@@ -169,31 +192,24 @@ def __eq__(self, other: object) -> bool:
and self._skip_newline == self._skip_newline
)

def collect(self) -> Iterator[tuple[bytes, int, int]]:
def collect(self) -> CollectIterator:
last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow()
for data, newline, newline_length in self.feed:
for data, newline in self.feed:
self._buffer.grow(data, newline)
self._current_count += 1
if (
self._count_lines == self._current_count
or (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker
):
self._current_count = 0
content, current_length = self._buffer.collect_and_reset()
yield content, current_length, newline_length
yield self._buffer.collect_and_reset()

if not self._buffer.is_empty():
content, current_length = self._buffer.collect_and_reset()

newline_length = 0
if content.find(b"\r\n") > -1:
newline_length = 2
elif content.find(b"\n") > -1:
newline_length = 1

yield content, current_length, newline_length
yield self._buffer.collect_and_reset()


# WhileMatcherCallable accepts a pattern in bytes to be compiled as regex.
# It returns a boolean indicating if the content matches a "while" multiline pattern or not.
WhileMatcherCallable = Callable[[bytes], bool]


@@ -259,44 +275,38 @@ def negate(line: bytes) -> bool:

return negate

def collect(self) -> Iterator[tuple[bytes, int, int]]:
def collect(self) -> CollectIterator:
last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow()
for data, newline, newline_length in self.feed:
for data, newline in self.feed:
if not self._matcher(data):
if self._buffer.is_empty():
self._buffer.grow(data, newline)
content, current_length = self._buffer.collect_and_reset()
yield content, current_length, newline_length
yield self._buffer.collect_and_reset()
else:
content, current_length = self._buffer.collect_and_reset()
content, current_length, _ = self._buffer.collect_and_reset()
self._buffer.grow(data, newline)

yield content, current_length, newline_length
yield content, current_length, newline

content, current_length = self._buffer.collect_and_reset()
content, current_length, _ = self._buffer.collect_and_reset()

yield content, current_length, newline_length
yield content, current_length, newline
else:
self._buffer.grow(data, newline)
# no pre collect buffer in while multiline, let's check the circuit breaker after at least one grow
if (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker:
content, current_length = self._buffer.collect_and_reset()

yield content, current_length, newline_length
yield self._buffer.collect_and_reset()

if not self._buffer.is_empty():
content, current_length = self._buffer.collect_and_reset()

newline_length = 0
if content.find(b"\r\n") > -1:
newline_length = 2
elif content.find(b"\n") > -1:
newline_length = 1

yield content, current_length, newline_length
yield self._buffer.collect_and_reset()


# PatternMatcherCallable accepts the previous and the current content as arguments.
# It returns a boolean indicating if the content matches a "pattern" multiline pattern or not.
PatternMatcherCallable = Callable[[bytes, bytes], bool]

# SelectCallable accepts the previous and the current content as arguments.
# It returns either the previous or current content according to the matching type ("before" or "after").
SelectCallable = Callable[[bytes, bytes], bytes]


@@ -390,8 +400,8 @@ def negate(previous: bytes, current: bytes) -> bool:
def _check_matcher(self) -> bool:
return (self._match == "after" and len(self._buffer.previous) > 0) or self._match == "before"

def collect(self) -> Iterator[tuple[bytes, int, int]]:
for data, newline, newline_length in self.feed:
def collect(self) -> CollectIterator:
for data, newline in self.feed:
last_iteration_datetime: datetime.datetime = datetime.datetime.utcnow()
if self._pre_collect_buffer:
self._buffer.collect_and_reset()
@@ -401,31 +411,19 @@ def collect(self) -> Iterator[tuple[bytes, int, int]]:
self._buffer.grow(data, newline)
self._pre_collect_buffer = True

content, current_length = self._buffer.collect_and_reset()
yield content, current_length, newline_length
yield self._buffer.collect_and_reset()
elif (
not self._buffer.is_empty() and self._check_matcher() and not self._matcher(self._buffer.previous, data)
):
content, current_length = self._buffer.collect_and_reset()
content, current_length, _ = self._buffer.collect_and_reset()

self._buffer.grow(data, newline)

yield content, current_length, newline_length
yield content, current_length, newline
else:
if (datetime.datetime.utcnow() - last_iteration_datetime) > timedelta_circuit_breaker:
content, current_length = self._buffer.collect_and_reset()

yield content, current_length, newline_length

yield self._buffer.collect_and_reset()
self._buffer.grow(data, newline)

if not self._buffer.is_empty():
content, current_length = self._buffer.collect_and_reset()

newline_length = 0
if content.find(b"\r\n") > -1:
newline_length = 2
elif content.find(b"\n") > -1:
newline_length = 1

yield content, current_length, newline_length
yield self._buffer.collect_and_reset()
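
To make the new CollectTuple/CollectIterator/FeedIterator aliases concrete, here is a self-contained toy sketch of the data flow after this change: the feed yields (content, newline) pairs and collect() yields (content, length, newline) tuples. It deliberately does not use the real classes (their constructor arguments are out of scope here); the real implementations add matching logic, size limits and the timeout circuit breaker on top of this shape.

```python
from typing import Iterator, Tuple

FeedIterator = Iterator[Tuple[bytes, bytes]]  # (content, newline), as in share/multiline.py
CollectTuple = Tuple[bytes, int, bytes]       # (content, content length, newline)


def toy_collect(feed: FeedIterator, group_size: int = 2) -> Iterator[CollectTuple]:
    # Roughly what CountMultiline does with its line count, without limits or timeouts.
    buffered: list[bytes] = []
    newline = b"\n"
    for content, newline in feed:
        buffered.append(content)
        if len(buffered) == group_size:
            joined = newline.join(buffered)
            yield joined, len(joined), newline
            buffered = []
    if buffered:
        joined = newline.join(buffered)
        yield joined, len(joined), newline


feed: FeedIterator = iter([(b"first", b"\n"), (b"  continuation", b"\n"), (b"second", b"\n")])
for content, length, newline in toy_collect(feed):
    print(content, length, newline)
```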