Skip to content

Commit

Permalink
Merge branch 'main' into remove-sqs-delete-message-calls
Browse files Browse the repository at this point in the history
  • Loading branch information
girodav committed Nov 16, 2023
2 parents b7c5941 + f5ffa15 commit c8b29b3
Show file tree
Hide file tree
Showing 18 changed files with 356 additions and 45 deletions.
4 changes: 2 additions & 2 deletions .internal/aws/cloudformation/macro.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ Resources:
parameters = event["templateParameterValues"]
if "ElasticServerlessForwarderS3ConfigFile" in parameters:
bucket_name_and_object_key = parameters["ElasticServerlessForwarderS3ConfigFile"].replace("s3://", "")
resource = f"arn:aws:s3:::{bucket_name_and_object_key}"
resource = f"arn:%awsOrGov%:s3:::{bucket_name_and_object_key}"
if len(resource) > 0:
policy_fragment["Properties"]["PolicyDocument"]["Statement"].append(
{
Expand Down Expand Up @@ -246,7 +246,7 @@ Resources:
MacroElasticServerlessForwarder:
Type: AWS::CloudFormation::Macro
Properties:
Description: Expand parameters to Events and Policy for %sarAppName%
Description: Expand parameters to Events and Policy for %sarAppName% in %awsOrGov%
FunctionName: !GetAtt MacroElasticServerlessForwarderFunction.Arn
Name: %sarAppName%-macro
Metadata:
Expand Down
4 changes: 2 additions & 2 deletions .internal/aws/cloudformation/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ Resources:
Type: AWS::Serverless::Application
Properties:
Location:
ApplicationId: arn:aws:serverlessrepo:%awsRegion%:%accountID%:applications/helper-macro-%sarAppName%
ApplicationId: arn:%awsOrGov%:serverlessrepo:%awsRegion%:%accountID%:applications/helper-macro-%sarAppName%
SemanticVersion: %semanticVersion%
ElasticServerlessForwarderApplication:
Type: AWS::Serverless::Application
Properties:
Location:
ApplicationId: arn:aws:serverlessrepo:%awsRegion%:%accountID%:applications/helper-application-%sarAppName%
ApplicationId: arn:%awsOrGov%:serverlessrepo:%awsRegion%:%accountID%:applications/helper-application-%sarAppName%
SemanticVersion: %semanticVersion%
Parameters:
ElasticServerlessForwarderS3ConfigFile: !Ref ElasticServerlessForwarderS3ConfigFile
Expand Down
21 changes: 16 additions & 5 deletions .internal/aws/scripts/dist.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,18 @@ CODE_URI="${TMPDIR}/sources"

trap "rm -rf ${TMPDIR}" EXIT

aws s3api get-bucket-location --bucket "${BUCKET}" || aws s3api create-bucket --acl private --bucket "${BUCKET}" --region "${REGION}" --create-bucket-configuration LocationConstraint="${REGION}"
aws s3api get-bucket-location --bucket "${BUCKET}" --region "${REGION}" || aws s3api create-bucket --acl private --bucket "${BUCKET}" --region "${REGION}" --create-bucket-configuration LocationConstraint="${REGION}"

# Determine whether the target region belongs to the AWS GovCloud (US)
# partition and derive the matching S3 bucket ARN prefix.
# GovCloud region names contain the substring "gov" (us-gov-west-1,
# us-gov-east-1), which maps to the "aws-us-gov" ARN partition.
# NOTE: the pattern must be the literal string "gov". The previous form
# *"$gov"* expanded the undefined variable $gov to an empty string, so the
# glob matched EVERY region and all deployments were treated as GovCloud.
if [[ ${REGION} == *"gov"* ]]; then
    BUCKET_ARN="arn:aws-us-gov:s3:::${BUCKET}"
    AWS_OR_AWS_GOV="aws-us-gov"
else
    BUCKET_ARN="arn:aws:s3:::${BUCKET}"
    AWS_OR_AWS_GOV="aws"
fi

# Policy resource covering every object in the distribution bucket.
BUCKET_RESOURCE="${BUCKET_ARN}/*"

cat <<EOF > "${TMPDIR}/policy.json"
{
Expand All @@ -47,7 +58,7 @@ cat <<EOF > "${TMPDIR}/policy.json"
"Service": "serverlessrepo.amazonaws.com"
},
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::${BUCKET}/*",
"Resource": "${BUCKET_RESOURCE}",
"Condition" : {
"StringEquals": {
"aws:SourceAccount": "${ACCOUNT_ID}"
Expand All @@ -58,7 +69,7 @@ cat <<EOF > "${TMPDIR}/policy.json"
}
EOF

aws s3api put-bucket-policy --bucket "${BUCKET}" --policy "file://${TMPDIR}/policy.json"
aws s3api put-bucket-policy --bucket "${BUCKET}" --region "${REGION}" --policy "file://${TMPDIR}/policy.json"
mkdir -v -p "${CODE_URI}"
cp -v requirements.txt "${CODE_URI}/"
cp -v main_aws.py "${CODE_URI}/"
Expand All @@ -67,8 +78,8 @@ find {handlers,share,shippers,storage} -not -name "*__pycache__*" -name "*.py" -
cp -v LICENSE.txt "${CODE_URI}/LICENSE.txt"
cp -v docs/README-AWS.md "${CODE_URI}/README.md"

sed -e "s|%codeUri%|${CODE_URI}|g" -e "s/%sarAppName%/${SAR_APP_NAME}/g" -e "s/%sarAuthorName%/${SAR_AUTHOR_NAME}/g" -e "s/%semanticVersion%/${SEMANTIC_VERSION}/g" -e "s/%awsRegion%/${REGION}/g" .internal/aws/cloudformation/macro.yaml > "${TMPDIR}/macro.yaml"
sed -e "s|%codeUri%|${CODE_URI}|g" -e "s/%sarAppName%/${SAR_APP_NAME}/g" -e "s/%sarAuthorName%/${SAR_AUTHOR_NAME}/g" -e "s/%semanticVersion%/${SEMANTIC_VERSION}/g" -e "s/%awsRegion%/${REGION}/g" -e "s/%accountID%/${ACCOUNT_ID}/g" .internal/aws/cloudformation/template.yaml > "${TMPDIR}/template.yaml"
sed -e "s|%codeUri%|${CODE_URI}|g" -e "s/%sarAppName%/${SAR_APP_NAME}/g" -e "s/%sarAuthorName%/${SAR_AUTHOR_NAME}/g" -e "s/%semanticVersion%/${SEMANTIC_VERSION}/g" -e "s/%awsRegion%/${REGION}/g" -e "s/%awsOrGov%/${AWS_OR_AWS_GOV}/g" .internal/aws/cloudformation/macro.yaml > "${TMPDIR}/macro.yaml"
sed -e "s|%codeUri%|${CODE_URI}|g" -e "s/%sarAppName%/${SAR_APP_NAME}/g" -e "s/%sarAuthorName%/${SAR_AUTHOR_NAME}/g" -e "s/%semanticVersion%/${SEMANTIC_VERSION}/g" -e "s/%awsRegion%/${REGION}/g" -e "s/%accountID%/${ACCOUNT_ID}/g" -e "s/%awsOrGov%/${AWS_OR_AWS_GOV}/g" .internal/aws/cloudformation/template.yaml > "${TMPDIR}/template.yaml"
sed -e "s|%codeUri%|${CODE_URI}|g" -e "s/%sarAppName%/${SAR_APP_NAME}/g" -e "s/%sarAuthorName%/${SAR_AUTHOR_NAME}/g" -e "s/%semanticVersion%/${SEMANTIC_VERSION}/g" -e "s/%awsRegion%/${REGION}/g" -e "s/%codeURIBucket%/${BUCKET}/g" .internal/aws/cloudformation/application.yaml > "${TMPDIR}/application.yaml"

sam build --debug --use-container --build-dir "${TMPDIR}/.aws-sam/build/macro" --template-file "${TMPDIR}/macro.yaml" --region "${REGION}"
Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
### v1.10.0 - 2023/10/27
##### Features
* Move `_id` field to `@metadata._id` in logstash output: [#507](https://github.com/elastic/elastic-serverless-forwarder/pull/507)

### v1.9.0 - 2023/08/24
##### Features
* Allow the possibility to set a prefix for role and policy when deploying with the `publish_lambda.sh` script: [#399](https://github.com/elastic/elastic-serverless-forwarder/pull/399)


### v1.8.1 - 2023/05/04
##### Bug fixes
* Explicitly set `SqsManagedSseEnabled` in CF template for replay and continuing queues for stack created before September/October 2022: [#353](https://github.com/elastic/elastic-serverless-forwarder/pull/353)
Expand Down
4 changes: 2 additions & 2 deletions docs/en/aws-deploy-elastic-serverless-forwarder.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ For `elasticsearch` the following arguments are supported:
* `args.es_datastream_name`: Name of data stream or index where logs should be forwarded to. Lambda supports automatic routing of various {aws} service logs to the corresponding data streams for further processing and storage in the {es} cluster. It supports automatic routing of `aws.cloudtrail`, `aws.cloudwatch_logs`, `aws.elb_logs`, `aws.firewall_logs`, `aws.vpcflow`, and `aws.waf` logs. For other log types, if using data streams, you can optionally set its value in the configuration file according to the naming convention for data streams and available integrations. If the `es_datastream_name` is not specified and it cannot be matched with any of the above {aws} services, then the value will be set to `logs-generic-default`. In versions **v0.29.1** and below, this configuration parameter was named `es_index_or_datastream_name`. Rename the configuration parameter to `es_datastream_name` in your `config.yaml` file on the S3 bucket to continue using it in the future version. The older name `es_index_or_datastream_name` is deprecated as of version **v0.30.0**. The related backward compatibility code is removed from version **v1.0.0**.
* `args.batch_max_actions`: (Optional) Maximum number of actions to send in a single bulk request. Default value: 500.
* `args.batch_max_bytes`: (Optional) Maximum size in bytes to send in a single bulk request. Default value: 10485760 (10MB).
* `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport.
* `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate.

For `logstash` the following arguments are supported:

Expand All @@ -234,7 +234,7 @@ For `logstash` the following arguments are supported:
* `args.password`: (Optional) Password of the {ls} instance to connect to. Mandatory if HTTP Basic authentication is enabled in {ls}.
* `args.max_batch_size`: (Optional) Maximum number of events to send in a single HTTP(s) request. Default value: 500
* `args.compression_level`: (Optional) The GZIP compression level for HTTP(s) requests towards {ls}. It can be any integer value between 1 (minimum compression, best performance, highest amount of bytes sent) and 9 (maximum compression, worst performance, lowest amount of bytes sent). Default value: 1
* `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport.
* `args.ssl_assert_fingerprint`: (Optional) SSL fingerprint for self-signed SSL certificate on HTTPS transport. The default value is an empty string, meaning the HTTP client requires a valid certificate.

[discrete]
[[aws-serverless-forwarder-define-deploy-parameters]]
Expand Down
73 changes: 73 additions & 0 deletions docs/en/aws-elastic-serverless-forwarder-configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,76 @@ image:images/multiline-regexp-test-repl-main.png[Add your test message to Multil

[role="screenshot"]
image:images/multiline-regexp-test-repl-run.png[View the test results]

[discrete]
[[aws-serverless-manage-self-signed-certificates]]
== Manage self-signed certificates

From v1.5.0, ESF introduced the SSL fingerprint option to access Elasticsearch clusters using self-signed certificates.

[discrete]
[[aws-serverless-manage-self-signed-certificates-config]]
=== Configuration options

To set the `ssl_assert_fingerprint` option, you must edit the config file stored in the S3 bucket.

Suppose you have a `config.yml` file stored in the bucket with the following content:

[source, yaml]
----
inputs:
- type: "s3-sqs"
id: "arn:aws:sqs:eu-west-1:123456789:dev-access-logs"
outputs:
- type: "elasticsearch"
args:
api_key: "<REDACTED>"
es_datastream_name: "logs-aws.s3access-default"
batch_max_actions: 500
batch_max_bytes: 10485760
ssl_assert_fingerprint: ""
----

If the configuration omits the `ssl_assert_fingerprint` option or, as in this example, leaves it empty (the default), the HTTP client validates the certificate presented by the Elasticsearch cluster.

[discrete]
[[aws-serverless-manage-self-signed-certificates-get-ssl-fingerprint]]
=== Get the SSL fingerprint

The next step is to get the fingerprint of the HTTPS certificate your Elasticsearch cluster is using now.

You can use OpenSSL to get the fingerprint for your certificate. Here's an example using an Elasticsearch cluster hosted on Elastic Cloud:

[source, shell]
----
$ openssl s_client \
-connect my-deployment.es.eastus2.azure.elastic-cloud.com:443 \
-showcerts </dev/null 2>/dev/null | openssl x509 -noout -fingerprint
SHA1 Fingerprint=1C:46:32:75:AA:D6:F1:E2:8E:10:A3:64:44:B1:36:C9:7D:44:35:B4
----

Replace `my-deployment.es.eastus2.azure.elastic-cloud.com:443` in the above example with your own DNS name or IP address and port number.

Copy your fingerprint value for the next step.

[discrete]
[[aws-serverless-manage-self-signed-certificates-set-ssl-fingerprint]]
=== Set the SSL fingerprint

As a final step, edit your `config.yml` file to use the SSL fingerprint:

[source, yaml]
----
inputs:
- type: "s3-sqs"
id: "arn:aws:sqs:eu-west-1:123456789:dev-access-logs"
outputs:
- type: "elasticsearch"
args:
api_key: "<REDACTED>"
es_datastream_name: "logs-aws.s3access-default"
batch_max_actions: 500
batch_max_bytes: 10485760
ssl_assert_fingerprint: "1C:46:32:75:AA:D6:F1:E2:8E:10:A3:64:44:B1:36:C9:7D:44:35:B4"
----
6 changes: 4 additions & 2 deletions handlers/aws/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from share import ExpandEventListFromField, json_parser, parse_config, shared_logger
from share.secretsmanager import aws_sm_expander
from shippers import EVENT_IS_FILTERED, EVENT_IS_SENT, CompositeShipper, ProtocolShipper
from shippers import EVENT_IS_FILTERED, EVENT_IS_SENT, CompositeShipper

from .cloudwatch_logs_trigger import (
_from_awslogs_data_to_event,
Expand Down Expand Up @@ -82,7 +82,7 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex

replay_queue_arn = lambda_event["Records"][0]["eventSourceARN"]
replay_handler = ReplayedEventReplayHandler(replay_queue_arn=replay_queue_arn)
shipper_cache: dict[str, ProtocolShipper] = {}
shipper_cache: dict[str, CompositeShipper] = {}
for replay_record in lambda_event["Records"]:
event = json_parser(replay_record["body"])
input_id = event["event_input_id"]
Expand All @@ -108,6 +108,8 @@ def lambda_handler(lambda_event: dict[str, Any], lambda_context: context_.Contex
else:
shipper = shipper_cache[shipper_id]

assert isinstance(shipper, CompositeShipper)

shipper.send(event["event_payload"])
event_uniq_id: str = event["event_payload"]["_id"] + output_type
replay_handler.add_event_with_receipt_handle(
Expand Down
19 changes: 13 additions & 6 deletions handlers/aws/replay_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any, Optional

from share import Config, ElasticsearchOutput, Input, LogstashOutput, Output, shared_logger
from shippers import ProtocolShipper, ShipperFactory
from shippers import CompositeShipper, ProtocolShipper, ShipperFactory

from .exceptions import InputConfigException, OutputConfigException, ReplayHandlerException
from .utils import delete_sqs_record
Expand Down Expand Up @@ -41,7 +41,7 @@ def get_shipper_for_replay_event(
output_args: dict[str, Any],
event_input_id: str,
replay_handler: ReplayedEventReplayHandler,
) -> Optional[ProtocolShipper]:
) -> Optional[CompositeShipper]:
event_input: Optional[Input] = config.get_input_by_id(event_input_id)
if event_input is None:
raise InputConfigException(f"Cannot load input for input id {event_input_id}")
Expand All @@ -50,21 +50,28 @@ def get_shipper_for_replay_event(
if output is None:
raise OutputConfigException(f"Cannot load output of type {output_type}")

# Let's wrap the specific output shipper in the composite one, since the composite deepcopy the mutating events
shipper: CompositeShipper = CompositeShipper()

if output_type == "elasticsearch":
assert isinstance(output, ElasticsearchOutput)
output.es_datastream_name = output_args["es_datastream_name"]
shared_logger.debug("setting ElasticSearch shipper")
elasticsearch: ProtocolShipper = ShipperFactory.create_from_output(output_type=output_type, output=output)
elasticsearch.set_replay_handler(replay_handler=replay_handler.replay_handler)

return elasticsearch
shipper.add_shipper(elasticsearch)
shipper.set_replay_handler(replay_handler=replay_handler.replay_handler)

return shipper

if output_type == "logstash":
assert isinstance(output, LogstashOutput)
shared_logger.debug("setting Logstash shipper")
logstash: ProtocolShipper = ShipperFactory.create_from_output(output_type=output_type, output=output)
logstash.set_replay_handler(replay_handler=replay_handler.replay_handler)

return logstash
shipper.add_shipper(logstash)
shipper.set_replay_handler(replay_handler=replay_handler.replay_handler)

return shipper

return None
16 changes: 13 additions & 3 deletions handlers/aws/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,13 @@ def get_continuing_original_input_type(sqs_record: dict[str, Any]) -> Optional[s

original_event_source: str = sqs_record["messageAttributes"]["originalEventSourceARN"]["stringValue"]

if original_event_source.startswith("arn:aws:logs"):
if original_event_source.startswith("arn:aws:logs") or original_event_source.startswith("arn:aws-us-gov:logs"):
return "cloudwatch-logs"

if original_event_source.startswith("arn:aws:kinesis") and original_event_source.find(":stream/") > -1:
if (
original_event_source.startswith("arn:aws:kinesis")
or original_event_source.startswith("arn:aws-us-gov:kinesis")
) and original_event_source.find(":stream/") > -1:
return "kinesis-data-stream"

return None
Expand Down Expand Up @@ -392,7 +395,14 @@ def get_input_from_log_group_subscription_data(
assert "Regions" in all_regions
for region_data in all_regions["Regions"]:
region = region_data["RegionName"]
log_stream_arn = f"arn:aws:logs:{region}:{account_id}:log-group:{log_group_name}:log-stream:{log_stream_name}"

aws_or_gov = "aws"
if "gov" in region:
aws_or_gov = "aws-us-gov"

log_stream_arn = (
f"arn:{aws_or_gov}:logs:{region}:{account_id}:log-group:{log_group_name}:log-stream:{log_stream_name}"
)
event_input = config.get_input_by_id(log_stream_arn)

if event_input is not None:
Expand Down
23 changes: 17 additions & 6 deletions publish_lambda.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,18 @@ GIT_REPO="https://github.com/elastic/elastic-serverless-forwarder.git"

trap 'rm -rf ${TMPDIR}' EXIT

aws s3api get-bucket-location --bucket "${BUCKET}" || aws s3api create-bucket --acl private --bucket "${BUCKET}" --region "${REGION}" --create-bucket-configuration LocationConstraint="${REGION}"
aws s3api get-bucket-location --bucket "${BUCKET}" --region "${REGION}" || aws s3api create-bucket --acl private --bucket "${BUCKET}" --region "${REGION}" --create-bucket-configuration LocationConstraint="${REGION}"

# Determine whether the target region belongs to the AWS GovCloud (US)
# partition and derive the matching S3 bucket ARN prefix.
# GovCloud region names contain the substring "gov" (us-gov-west-1,
# us-gov-east-1), which maps to the "aws-us-gov" ARN partition.
# NOTE: the pattern must be the literal string "gov". The previous form
# *"$gov"* expanded the undefined variable $gov to an empty string, so the
# glob matched EVERY region and all deployments were treated as GovCloud.
if [[ ${REGION} == *"gov"* ]]; then
    BUCKET_ARN="arn:aws-us-gov:s3:::${BUCKET}"
    AWS_OR_AWS_GOV="aws-us-gov"
else
    BUCKET_ARN="arn:aws:s3:::${BUCKET}"
    AWS_OR_AWS_GOV="aws"
fi

# Policy resource covering every object in the publishing bucket.
BUCKET_RESOURCE="${BUCKET_ARN}/*"

mkdir -v -p "${CLONED_FOLDER}"
git clone --depth 1 --branch "${TAG_NAME}" "${GIT_REPO}" "${CLONED_FOLDER}"
Expand Down Expand Up @@ -322,7 +333,7 @@ def create_policy(publish_config: dict[str, Any]):
assert isinstance(publish_config["s3-config-file"], str)
bucket_name_and_object_key = publish_config["s3-config-file"].replace("s3://", "")
resource = f"arn:aws:s3:::{bucket_name_and_object_key}"
resource = f"arn:${AWS_OR_AWS_GOV}:s3:::{bucket_name_and_object_key}"
if len(resource) > 0:
policy_fragment["Properties"]["PolicyDocument"]["Statement"].append(
{"Effect": "Allow", "Action": "s3:GetObject", "Resource": resource}
Expand Down Expand Up @@ -471,18 +482,18 @@ if __name__ == "__main__":
},
"RoleName": "${CUSTOM_ROLE_PREFIX}ApplicationElasticServerlessForwarderRole",
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:aws:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
"arn:${AWS_OR_AWS_GOV}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
"arn:${AWS_OR_AWS_GOV}:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
],
},
}
if vpc_config:
customRole["Properties"]["ManagedPolicyArns"].append("arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole")
customRole["Properties"]["ManagedPolicyArns"].append("arn:${AWS_OR_AWS_GOV}:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole")
has_kinesis_events: bool = len([created_event for created_event in created_events if created_events[created_event]["Type"] == "Kinesis"]) > 0
if has_kinesis_events:
customRole["Properties"]["ManagedPolicyArns"].append("arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole")
customRole["Properties"]["ManagedPolicyArns"].append("arn:${AWS_OR_AWS_GOV}:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole")
cloudformation_yaml["Resources"]["ApplicationElasticServerlessForwarderCustomRole"] = customRole
cloudformation_yaml["Resources"]["ApplicationElasticServerlessForwarder"]["Properties"]["Role"] = {"Fn::GetAtt": ["ApplicationElasticServerlessForwarderCustomRole", "Arn"] }
Expand Down
8 changes: 4 additions & 4 deletions requirements-tests.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
mock==5.1.0
pytest==7.4.2
pytest==7.4.3
pytest-cov==4.1.0
pytest-benchmark==4.0.0
coverage==7.3.2
simplejson==3.19.2
orjson==3.9.9
orjson==3.9.10
pysimdjson==5.0.2
python-rapidjson==1.12
python-rapidjson==1.13
cysimdjson==23.8
responses==0.23.3
testcontainers==3.7.1
pyOpenSSL==23.2.0
pyOpenSSL==23.3.0
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
elastic-apm==6.19.0
boto3==1.28.68
boto3==1.28.80
ecs_logging==2.1.0
elasticsearch==7.16.3
PyYAML==6.0.1
Expand Down
Loading

0 comments on commit c8b29b3

Please sign in to comment.