Skip to content

Commit

Permalink
Add support/signer for manual signing
Browse files Browse the repository at this point in the history
exodus-lambda had a support/signer script which reused the exodus-lambda
code to make signed URLs, but that'll go away since the signing logic
will be cleaned up from exodus-lambda.

Reimplement a similar script here reusing the exodus-gw code. Can be
used to manually check the signing mechanism in exodus-gw.
  • Loading branch information
rohanpm committed Oct 29, 2023
1 parent ed55190 commit e27901a
Showing 1 changed file with 122 additions and 0 deletions.
122 changes: 122 additions & 0 deletions support/signer
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python3
#
# Helper script for manual generation of signed URLs.
# Primarily for testing of the signing mechanism.
#
# Requires exodus-gw to be installed in the current python environment,
# as it reuses exodus-gw code.
#
# Example usage:
#
# support/signer \
# --key-id AABBCC \
# --key ~/src/ansible-playbooks/exodus-gw-playbooks/vaults/qa/exodus_private_key \
# https://d350xxxyyy.cloudfront.net/content/signing-test
#
import argparse
import logging
import os
import sys
import traceback
from urllib.parse import urlparse

import requests

from exodus_gw.routers.cdn import sign_url
from exodus_gw.settings import Environment, Settings

LOG = logging.getLogger("signer")


def test_request(url: str):
"""Verify that request to 'url' successfully reaches an
instance of exodus-lambda.
If it does not, the script exits with a non-zero exit code.
"""

headers = {"X-Exodus-Query": "1"}

try:
LOG.debug(
"Testing request to %s",
url,
)
s = requests.Session()

response = s.head(url, headers=headers, allow_redirects=True)
if "X-Exodus-Version" in response.headers:
LOG.debug("Test succeeded")
return

raise RuntimeError("no X-Exodus-Version found in response")
except Exception:
traceback.print_exc()
print(
(
"Signature check failed. Please confirm the correct key, "
"key ID and URL were provided, or use `--skip-test' to ignore "
"this error."
),
file=sys.stderr,
)
sys.exit(20)


def main():
p = argparse.ArgumentParser()
p.add_argument(
"--key",
help="Path to private key used to generate signature",
required=True,
)
p.add_argument(
"--key-id",
help="Key ID for corresponding CloudFront public key",
required=True,
)
p.add_argument(
"--skip-test",
help="Do not test the generated signature",
action="store_true",
)
p.add_argument("--debug", help="Verbose logging", action="store_true")
p.add_argument("url", help="URL to be signed")

args = p.parse_args()

logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)

# This script always hardcodes <hostname>/_/cookie as the base CDN url.
url_bits = urlparse(str(args.url))
cdn_url = f"{url_bits.scheme}://{url_bits.netloc}/_/cookie"

# This is what the signing code actually expects to receive: no leading
# "/" since the FastAPI router will have already stripped that.
relative_path = url_bits.path.removeprefix("/")

# Make up settings/environment objects needed by the signing code.
settings = Settings()
env = Environment(
# Most of these fields aren't going to be accessed by the signing code.
name="signer",
aws_profile="",
bucket="",
table="",
config_table="",
cdn_url=cdn_url,
cdn_key_id=args.key_id,
)
os.environ["EXODUS_GW_CDN_PRIVATE_KEY_SIGNER"] = open(args.key).read()

LOG.info("Using base CDN url: %s, signing: %s", cdn_url, relative_path)

signed = sign_url(relative_path, settings, env, "support/signer")

test_request(signed)

print(signed)


if __name__ == "__main__":
main()

0 comments on commit e27901a

Please sign in to comment.