Merge branch 'main' into support_aws_gov
kaiyan-sheng authored Nov 1, 2023
2 parents 8248ffa + ff1dfc1 commit 1a85f73
Showing 11 changed files with 229 additions and 23 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,8 +1,11 @@
### v1.10.0 - 2023/10/27
##### Features
* Move `_id` field to `@metadata._id` in logstash output: [#507](https://github.com/elastic/elastic-serverless-forwarder/pull/507)

### v1.9.0 - 2023/08/24
##### Features
* Allow setting a prefix for the role and policy when deploying with the `publish_lambda.sh` script: [#399](https://github.com/elastic/elastic-serverless-forwarder/pull/399)


### v1.8.1 - 2023/05/04
##### Bug fixes
* Explicitly set `SqsManagedSseEnabled` in the CF template for the replay and continuing queues for stacks created before September/October 2022: [#353](https://github.com/elastic/elastic-serverless-forwarder/pull/353)
6 changes: 4 additions & 2 deletions handlers/aws/handler.py
@@ -9,7 +9,7 @@

from share import ExpandEventListFromField, json_parser, parse_config, shared_logger
from share.secretsmanager import aws_sm_expander
from shippers import EVENT_IS_FILTERED, EVENT_IS_SENT, CompositeShipper, ProtocolShipper
from shippers import EVENT_IS_FILTERED, EVENT_IS_SENT, CompositeShipper

from .cloudwatch_logs_trigger import (
_from_awslogs_data_to_event,
@@ -83,7 +83,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex

replay_queue_arn = lambda_event["Records"][0]["eventSourceARN"]
replay_handler = ReplayedEventReplayHandler(replay_queue_arn=replay_queue_arn)
shipper_cache: dict[str, ProtocolShipper] = {}
shipper_cache: dict[str, CompositeShipper] = {}
for replay_record in lambda_event["Records"]:
event = json_parser(replay_record["body"])
input_id = event["event_input_id"]
@@ -109,6 +109,8 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
else:
shipper = shipper_cache[shipper_id]

assert isinstance(shipper, CompositeShipper)

shipper.send(event["event_payload"])
event_uniq_id: str = event["event_payload"]["_id"] + output_type
replay_handler.add_event_with_receipt_handle(
19 changes: 13 additions & 6 deletions handlers/aws/replay_trigger.py
@@ -5,7 +5,7 @@
from typing import Any, Optional

from share import Config, ElasticsearchOutput, Input, LogstashOutput, Output, shared_logger
from shippers import ProtocolShipper, ShipperFactory
from shippers import CompositeShipper, ProtocolShipper, ShipperFactory

from .exceptions import InputConfigException, OutputConfigException, ReplayHandlerException
from .utils import delete_sqs_record
@@ -41,7 +41,7 @@ def get_shipper_for_replay_event(
output_args: dict[str, Any],
event_input_id: str,
replay_handler: ReplayedEventReplayHandler,
) -> Optional[ProtocolShipper]:
) -> Optional[CompositeShipper]:
event_input: Optional[Input] = config.get_input_by_id(event_input_id)
if event_input is None:
raise InputConfigException(f"Cannot load input for input id {event_input_id}")
@@ -50,21 +50,28 @@
if output is None:
raise OutputConfigException(f"Cannot load output of type {output_type}")

    # Let's wrap the specific output shipper in the composite one, since the composite deepcopies the events before they are mutated
shipper: CompositeShipper = CompositeShipper()

if output_type == "elasticsearch":
assert isinstance(output, ElasticsearchOutput)
output.es_datastream_name = output_args["es_datastream_name"]
shared_logger.debug("setting ElasticSearch shipper")
elasticsearch: ProtocolShipper = ShipperFactory.create_from_output(output_type=output_type, output=output)
elasticsearch.set_replay_handler(replay_handler=replay_handler.replay_handler)

return elasticsearch
shipper.add_shipper(elasticsearch)
shipper.set_replay_handler(replay_handler=replay_handler.replay_handler)

return shipper

if output_type == "logstash":
assert isinstance(output, LogstashOutput)
shared_logger.debug("setting Logstash shipper")
logstash: ProtocolShipper = ShipperFactory.create_from_output(output_type=output_type, output=output)
logstash.set_replay_handler(replay_handler=replay_handler.replay_handler)

return logstash
shipper.add_shipper(logstash)
shipper.set_replay_handler(replay_handler=replay_handler.replay_handler)

return shipper

return None
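
For readers following the change: `get_shipper_for_replay_event` used to return the concrete Elasticsearch or Logstash shipper directly, and now always returns a `CompositeShipper` that wraps it. Per the comment in the hunk above, the composite deep-copies events before the wrapped shippers mutate them. A rough sketch of that behaviour (illustrative only — `TinyCompositeShipper` is a hypothetical stand-in, not the project's implementation):

```python
from copy import deepcopy
from typing import Any


class TinyCompositeShipper:
    """Hypothetical, heavily simplified stand-in for the real CompositeShipper."""

    def __init__(self) -> None:
        self._shippers: list[Any] = []

    def add_shipper(self, shipper: Any) -> None:
        self._shippers.append(shipper)

    def send(self, event: dict[str, Any]) -> None:
        for shipper in self._shippers:
            # Every wrapped shipper gets its own copy, so a shipper that mutates
            # the event (e.g. the Logstash shipper moving _id under @metadata)
            # cannot leak that mutation into another shipper or the caller.
            shipper.send(deepcopy(event))
```

Returning a composite here also lets the replay handler in handlers/aws/handler.py (changed above) treat every cached shipper uniformly as a `CompositeShipper`.
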
4 changes: 2 additions & 2 deletions requirements-tests.txt
@@ -4,9 +4,9 @@ pytest-cov==4.1.0
pytest-benchmark==4.0.0
coverage==7.3.2
simplejson==3.19.2
orjson==3.9.9
orjson==3.9.10
pysimdjson==5.0.2
python-rapidjson==1.12
python-rapidjson==1.13
cysimdjson==23.8
responses==0.23.3
testcontainers==3.7.1
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,5 +1,5 @@
elastic-apm==6.19.0
boto3==1.28.68
boto3==1.28.74
ecs_logging==2.1.0
elasticsearch==7.16.3
PyYAML==6.0.1
10 changes: 10 additions & 0 deletions shippers/logstash.py
@@ -103,6 +103,11 @@ def send(self, event: dict[str, Any]) -> str:

event = normalise_event(event)

# Let's move _id to @metadata._id for logstash
if "_id" in event:
event["@metadata"] = {"_id": event["_id"]}
del event["_id"]

self._events_batch.append(event)
if len(self._events_batch) < self._max_batch_size:
return _EVENT_BUFFERED
@@ -145,4 +150,9 @@ def _send(self) -> None:

if self._replay_handler is not None:
for event in self._events_batch:
# let's put back the _id field from @metadata._id
if "@metadata" in event and "_id" in event["@metadata"]:
event["_id"] = event["@metadata"]["_id"]
del event["@metadata"]

self._replay_handler("logstash", self._replay_args, event)
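
Taken together, the two hunks above implement a round-trip: `_id` is tucked under `@metadata._id` on the way to Logstash (fields under `@metadata` are visible to Logstash pipelines but are not part of the emitted event by default) and restored before events are handed to the replay handler. A self-contained sketch of that round-trip, assuming flat dict events (the helper names are illustrative, not the shipper's actual methods):

```python
from typing import Any


def to_logstash_shape(event: dict[str, Any]) -> dict[str, Any]:
    # Mirror of the send() hunk above: tuck _id under @metadata._id so Logstash
    # pipelines can read it while it stays out of the shipped document body.
    if "_id" in event:
        event["@metadata"] = {"_id": event["_id"]}
        del event["_id"]
    return event


def to_replay_shape(event: dict[str, Any]) -> dict[str, Any]:
    # Mirror of the _send() hunk above: restore _id before the event goes to
    # the replay handler, which expects the original shape.
    if "@metadata" in event and "_id" in event["@metadata"]:
        event["_id"] = event["@metadata"]["_id"]
        del event["@metadata"]
    return event


event = {"_id": "abc123", "message": "hello"}
assert to_logstash_shape(dict(event)) == {"@metadata": {"_id": "abc123"}, "message": "hello"}
assert to_replay_shape(to_logstash_shape(dict(event))) == event
```

The tests/shippers/test_logstash.py hunk further down updates its expected payload to match the first half of this round-trip.
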
148 changes: 147 additions & 1 deletion tests/handlers/aws/test_integrations.py
@@ -69,7 +69,7 @@ def setUpClass(cls) -> None:
esc = ElasticsearchContainer()
cls.elasticsearch = esc.start()

lgc = LogstashContainer()
lgc = LogstashContainer(es_container=esc)
cls.logstash = lgc.start()

lsc = LocalStackContainer(image="localstack/localstack:1.4.0")
@@ -171,6 +171,152 @@ def tearDown(self) -> None:
os.environ["SQS_CONTINUE_URL"] = ""
os.environ["SQS_REPLAY_URL"] = ""

def test_ls_es_output(self) -> None:
assert isinstance(self.elasticsearch, ElasticsearchContainer)
assert isinstance(self.logstash, LogstashContainer)
assert isinstance(self.localstack, LocalStackContainer)

s3_sqs_queue_name = _time_based_id(suffix="source-s3-sqs")

s3_sqs_queue = _sqs_create_queue(self.sqs_client, s3_sqs_queue_name, self.localstack.get_url())

s3_sqs_queue_arn = s3_sqs_queue["QueueArn"]
s3_sqs_queue_url = s3_sqs_queue["QueueUrl"]

config_yaml: str = f"""
inputs:
- type: s3-sqs
id: "{s3_sqs_queue_arn}"
tags: {self.default_tags}
outputs: {self.default_outputs}
"""

config_file_path = "config.yaml"
config_bucket_name = _time_based_id(suffix="config-bucket")
_s3_upload_content_to_bucket(
client=self.s3_client,
content=config_yaml.encode("utf-8"),
content_type="text/plain",
bucket_name=config_bucket_name,
key=config_file_path,
)

os.environ["S3_CONFIG_FILE"] = f"s3://{config_bucket_name}/{config_file_path}"
fixtures = [
_load_file_fixture("cloudwatch-log-1.json"),
_load_file_fixture("cloudwatch-log-2.json"),
]

cloudtrail_filename_digest = (
"AWSLogs/aws-account-id/CloudTrail-Digest/region/yyyy/mm/dd/"
"aws-account-id_CloudTrail-Digest_region_end-time_random-string.log.gz"
)
cloudtrail_filename_non_digest = (
"AWSLogs/aws-account-id/CloudTrail/region/yyyy/mm/dd/"
"aws-account-id_CloudTrail_region_end-time_random-string.log.gz"
)

s3_bucket_name = _time_based_id(suffix="test-bucket")

_s3_upload_content_to_bucket(
client=self.s3_client,
content=gzip.compress(fixtures[0].encode("utf-8")),
content_type="application/x-gzip",
bucket_name=s3_bucket_name,
key=cloudtrail_filename_digest,
)

_s3_upload_content_to_bucket(
client=self.s3_client,
content=gzip.compress(fixtures[1].encode("utf-8")),
content_type="application/x-gzip",
bucket_name=s3_bucket_name,
key=cloudtrail_filename_non_digest,
)

_sqs_send_s3_notifications(
self.sqs_client,
s3_sqs_queue_url,
s3_bucket_name,
[cloudtrail_filename_digest, cloudtrail_filename_non_digest],
)

event, _ = _sqs_get_messages(self.sqs_client, s3_sqs_queue_url, s3_sqs_queue_arn)

ctx = ContextMock(remaining_time_in_millis=_OVER_COMPLETION_GRACE_PERIOD_2m)
first_call = handler(event, ctx) # type:ignore

assert first_call == "completed"

self.elasticsearch.refresh(index="logs-aws.cloudtrail-default")
assert self.elasticsearch.count(index="logs-aws.cloudtrail-default")["count"] == 2

res = self.elasticsearch.search(index="logs-aws.cloudtrail-default", sort="_seq_no")
assert res["hits"]["total"] == {"value": 2, "relation": "eq"}

assert res["hits"]["hits"][0]["_source"]["message"] == fixtures[0].rstrip("\n")
assert res["hits"]["hits"][0]["_source"]["log"]["offset"] == 0
assert (
res["hits"]["hits"][0]["_source"]["log"]["file"]["path"]
== f"https://{s3_bucket_name}.s3.eu-central-1.amazonaws.com/{cloudtrail_filename_digest}"
)
assert res["hits"]["hits"][0]["_source"]["aws"]["s3"]["bucket"]["name"] == s3_bucket_name
assert res["hits"]["hits"][0]["_source"]["aws"]["s3"]["bucket"]["arn"] == f"arn:aws:s3:::{s3_bucket_name}"
assert res["hits"]["hits"][0]["_source"]["aws"]["s3"]["object"]["key"] == cloudtrail_filename_digest
assert res["hits"]["hits"][0]["_source"]["cloud"]["provider"] == "aws"
assert res["hits"]["hits"][0]["_source"]["cloud"]["region"] == "eu-central-1"
assert res["hits"]["hits"][0]["_source"]["cloud"]["account"]["id"] == "000000000000"
assert res["hits"]["hits"][0]["_source"]["tags"] == ["forwarded", "aws-cloudtrail", "tag1", "tag2", "tag3"]

assert res["hits"]["hits"][1]["_source"]["message"] == fixtures[1].rstrip("\n")
assert res["hits"]["hits"][1]["_source"]["log"]["offset"] == 0
assert (
res["hits"]["hits"][1]["_source"]["log"]["file"]["path"]
== f"https://{s3_bucket_name}.s3.eu-central-1.amazonaws.com/{cloudtrail_filename_non_digest}"
)
assert res["hits"]["hits"][1]["_source"]["aws"]["s3"]["bucket"]["name"] == s3_bucket_name
assert res["hits"]["hits"][1]["_source"]["aws"]["s3"]["bucket"]["arn"] == f"arn:aws:s3:::{s3_bucket_name}"
assert res["hits"]["hits"][1]["_source"]["aws"]["s3"]["object"]["key"] == cloudtrail_filename_non_digest
assert res["hits"]["hits"][1]["_source"]["cloud"]["provider"] == "aws"
assert res["hits"]["hits"][1]["_source"]["cloud"]["region"] == "eu-central-1"
assert res["hits"]["hits"][1]["_source"]["cloud"]["account"]["id"] == "000000000000"
assert res["hits"]["hits"][1]["_source"]["tags"] == ["forwarded", "aws-cloudtrail", "tag1", "tag2", "tag3"]

logstash_message = self.logstash.get_messages(expected=2)
assert len(logstash_message) == 2
res["hits"]["hits"][0]["_source"]["tags"].remove("aws-cloudtrail")
res["hits"]["hits"][1]["_source"]["tags"].remove("aws-cloudtrail")

assert res["hits"]["hits"][0]["_source"]["aws"] == logstash_message[0]["aws"]
assert res["hits"]["hits"][0]["_source"]["cloud"] == logstash_message[0]["cloud"]
assert res["hits"]["hits"][0]["_source"]["log"] == logstash_message[0]["log"]
assert res["hits"]["hits"][0]["_source"]["message"] == logstash_message[0]["message"]
assert res["hits"]["hits"][0]["_source"]["tags"] == logstash_message[0]["tags"]

assert res["hits"]["hits"][1]["_source"]["aws"] == logstash_message[1]["aws"]
assert res["hits"]["hits"][1]["_source"]["cloud"] == logstash_message[1]["cloud"]
assert res["hits"]["hits"][1]["_source"]["log"] == logstash_message[1]["log"]
assert res["hits"]["hits"][1]["_source"]["message"] == logstash_message[1]["message"]
assert res["hits"]["hits"][1]["_source"]["tags"] == logstash_message[1]["tags"]

self.elasticsearch.refresh(index="logs-stash.elasticsearch-output")
assert self.elasticsearch.count(index="logs-stash.elasticsearch-output")["count"] == 2

res = self.elasticsearch.search(index="logs-stash.elasticsearch-output", sort="_seq_no")
assert res["hits"]["total"] == {"value": 2, "relation": "eq"}

assert res["hits"]["hits"][0]["_source"]["aws"] == logstash_message[0]["aws"]
assert res["hits"]["hits"][0]["_source"]["cloud"] == logstash_message[0]["cloud"]
assert res["hits"]["hits"][0]["_source"]["log"] == logstash_message[0]["log"]
assert res["hits"]["hits"][0]["_source"]["message"] == logstash_message[0]["message"]
assert res["hits"]["hits"][0]["_source"]["tags"] == logstash_message[0]["tags"]

assert res["hits"]["hits"][1]["_source"]["aws"] == logstash_message[1]["aws"]
assert res["hits"]["hits"][1]["_source"]["cloud"] == logstash_message[1]["cloud"]
assert res["hits"]["hits"][1]["_source"]["log"] == logstash_message[1]["log"]
assert res["hits"]["hits"][1]["_source"]["message"] == logstash_message[1]["message"]
assert res["hits"]["hits"][1]["_source"]["tags"] == logstash_message[1]["tags"]

def test_continuing(self) -> None:
assert isinstance(self.elasticsearch, ElasticsearchContainer)
assert isinstance(self.logstash, LogstashContainer)
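
The new `test_ls_es_output` drives one S3→SQS notification through both configured outputs and then checks that the documents Logstash forwarded into Elasticsearch (`logs-stash.elasticsearch-output`) match what was shipped directly. The test relies on the `self.default_tags` / `self.default_outputs` fixtures, which this hunk does not show; a plausible shape for a dual-output configuration of that kind might look like the sketch below (a guess built in the same f-string style as the test's `config_yaml` — URLs, credentials, and the queue ARN are placeholders, not the fixtures' actual contents):

```python
# Hypothetical dual-output ESF configuration for illustration only.
config_yaml: str = """
inputs:
  - type: s3-sqs
    id: "arn:aws:sqs:eu-central-1:000000000000:source-s3-sqs"
    tags:
      - "tag1"
      - "tag2"
      - "tag3"
    outputs:
      - type: elasticsearch
        args:
          elasticsearch_url: "https://elasticsearch:9200"
          username: "elastic"
          password: "password"
          es_datastream_name: "logs-aws.cloudtrail-default"
      - type: logstash
        args:
          logstash_url: "http://logstash:5044"
"""
```
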
14 changes: 9 additions & 5 deletions tests/handlers/aws/test_replay_trigger.py
@@ -2,14 +2,15 @@
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

from typing import Optional
from unittest import TestCase

import mock
import pytest

from handlers.aws.replay_trigger import ReplayedEventReplayHandler, get_shipper_for_replay_event
from share import parse_config
from shippers import ElasticsearchShipper, LogstashShipper
from shippers import CompositeShipper, ElasticsearchShipper, LogstashShipper


@pytest.mark.unit
@@ -28,14 +29,15 @@ def test_get_shipper_for_replay_event(self) -> None:
"""
config = parse_config(config_yaml_kinesis)
replay_handler = ReplayedEventReplayHandler("arn:aws:sqs:eu-central-1:123456789:queue/replayqueue")
logstash_shipper = get_shipper_for_replay_event(
logstash_shipper: Optional[CompositeShipper] = get_shipper_for_replay_event(
config,
"logstash",
{},
"arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
replay_handler,
)
assert isinstance(logstash_shipper, LogstashShipper)
assert isinstance(logstash_shipper, CompositeShipper)
assert isinstance(logstash_shipper._shippers[0], LogstashShipper)

with self.subTest("Elasticsearch shipper from replay event"):
config_yaml_kinesis = """
@@ -52,14 +54,16 @@
"""
config = parse_config(config_yaml_kinesis)
replay_handler = ReplayedEventReplayHandler("arn:aws:sqs:eu-central-1:123456789:queue/replayqueue")
elasticsearch_shipper = get_shipper_for_replay_event(
elasticsearch_shipper: Optional[CompositeShipper] = get_shipper_for_replay_event(
config,
"elasticsearch",
{"es_datastream_name": "es_datastream_name"},
"arn:aws:kinesis:eu-central-1:123456789:stream/test-esf-kinesis-stream",
replay_handler,
)
assert isinstance(elasticsearch_shipper, ElasticsearchShipper)

assert isinstance(elasticsearch_shipper, CompositeShipper)
assert isinstance(elasticsearch_shipper._shippers[0], ElasticsearchShipper)

with self.subTest("None shipper from replay event"):
config_yaml_kinesis = """
4 changes: 3 additions & 1 deletion tests/shippers/test_logstash.py
@@ -95,7 +95,9 @@ def request_callback(request: PreparedRequest) -> tuple[int, dict[Any, Any], str
_payload.append(ujson.loads(event))

expected_event = deepcopy(_dummy_expected_event)
expected_event["_id"] = "_id"
expected_event["@metadata"] = {"_id": "_id"}
del expected_event["_id"]

assert _payload == [expected_event, expected_event]

return 200, {}, "okay"
11 changes: 9 additions & 2 deletions tests/testcontainers/es.py
@@ -13,6 +13,9 @@
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_container_is_ready

DEFAULT_USERNAME = "elastic"
DEFAULT_PASSWORD = "password"


class ElasticsearchContainer(DockerContainer): # type: ignore
"""
@@ -30,8 +33,8 @@ class ElasticsearchContainer(DockerContainer): # type: ignore
_DEFAULT_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch"
_DEFAULT_VERSION = "7.16.3"
_DEFAULT_PORT = 9200
_DEFAULT_USERNAME = "elastic"
_DEFAULT_PASSWORD = "password"
_DEFAULT_USERNAME = DEFAULT_USERNAME
_DEFAULT_PASSWORD = DEFAULT_PASSWORD

def __init__(
self,
@@ -85,6 +88,7 @@ def _configure(self) -> None:
"xpack.security.enabled": "true",
"discovery.type": "single-node",
"network.bind_host": "0.0.0.0",
"network.publish_host": "0.0.0.0",
"logger.org.elasticsearch": "DEBUG",
"xpack.security.http.ssl.enabled": "true",
"xpack.security.http.ssl.keystore.path": "/usr/share/elasticsearch/config/certs/localhost/"
@@ -141,6 +145,9 @@ def reset(self) -> None:
for index in self._index_indices:
self.es_client.indices.delete_data_stream(name=index)

if self.es_client.indices.exists(index="logs-stash.elasticsearch-output"):
self.es_client.indices.delete_data_stream(name="logs-stash.elasticsearch-output")

self._index_indices = set()

for pipeline_id in self._pipelines_ids: