forked from aws-samples/serverless-test-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_api_gateway.py
103 lines (83 loc) · 4.1 KB
/
test_api_gateway.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import os
from unittest import TestCase
from uuid import uuid4
from boto3.dynamodb.conditions import Key
import boto3
import requests
"""
Set the environment variable AWS_SAM_STACK_NAME to the name of the
deployed stack you want to test, then run the integration tests. For example:

    AWS_SAM_STACK_NAME=<stack-name> python -m pytest -s tests/integration -v
"""
class TestApiGateway(TestCase):
    """Integration tests that call the deployed HelloPersonApi endpoint.

    The stack under test is named by the AWS_SAM_STACK_NAME environment
    variable. Each test seeds its own uniquely-suffixed DynamoDB item in
    ``setUp`` and removes it again in ``tearDown``.
    """

    api_endpoint: str
    dynamodb_table_name: str
    id_postfix: str

    # Region used for every AWS client/resource created by these tests.
    aws_region = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1"

    # Upper bound, in seconds, for each HTTP call to the API. Without a
    # timeout `requests` waits forever on a stalled endpoint and hangs the run.
    request_timeout_seconds = 30

    @classmethod
    def get_stack_name(cls) -> str:
        """Return the stack name taken from the AWS_SAM_STACK_NAME env var.

        Raises:
            Exception: if the variable is unset or empty.
        """
        stack_name = os.environ.get("AWS_SAM_STACK_NAME")
        if not stack_name:
            raise Exception(
                "Cannot find env var AWS_SAM_STACK_NAME. \n"
                "Please setup this environment variable with the stack name where we are running integration tests."
            )
        return stack_name

    def _require_stack_output(self, stack_outputs: list, output_key: str, stack_name: str) -> str:
        """Return the value of stack output *output_key*, failing the test if it is absent."""
        matches = [output for output in stack_outputs if output["OutputKey"] == output_key]
        self.assertTrue(matches, f"Cannot find output {output_key} in stack {stack_name}")
        return matches[0]["OutputValue"]

    def _dynamodb_table(self):
        """Return a boto3 Table resource for the stack's DynamoDB table."""
        dynamodb_resource = boto3.resource("dynamodb", region_name=self.aws_region)
        return dynamodb_resource.Table(name=self.dynamodb_table_name)

    def _call_api(self, item_id: str):
        """GET the API endpoint for *item_id* and return the HTTP response.

        The endpoint value is expected to contain a literal ``{id}``
        placeholder, which is substituted with *item_id*.
        """
        url = self.api_endpoint.replace("{id}", item_id)
        return requests.get(url, timeout=self.request_timeout_seconds)

    def setUp(self) -> None:
        """
        Based on the provided env variable AWS_SAM_STACK_NAME,
        we use the CloudFormation API to retrieve the HelloPersonApi URL and the DynamoDB table name.
        We also seed the DynamoDB table for the test.
        """
        stack_name = TestApiGateway.get_stack_name()

        # Use the same region as the DynamoDB resource so that both look at
        # the same deployment.
        client = boto3.client("cloudformation", region_name=self.aws_region)

        try:
            response = client.describe_stacks(StackName=stack_name)
        except Exception as e:
            raise Exception(
                f"Cannot find stack {stack_name}. \n" f'Please make sure stack with the name "{stack_name}" exists.'
            ) from e

        # A stack without any outputs may omit the "Outputs" key entirely;
        # default to an empty list so the assertions below report the problem.
        stack_outputs = response["Stacks"][0].get("Outputs", [])

        self.api_endpoint = self._require_stack_output(stack_outputs, "HelloPersonApi", stack_name)
        self.dynamodb_table_name = self._require_stack_output(stack_outputs, "DynamoDBTableName", stack_name)

        # Create a random postfix for the ids to prevent data collisions between tests.
        # Using unique ids per test isolates the test data.
        self.id_postfix = "_" + str(uuid4())

        # Seed the DynamoDB table with test data. Only TEST001 is created;
        # TEST002 is deliberately left absent for the 404 test.
        self._dynamodb_table().put_item(
            Item={
                "PK": "TEST001" + self.id_postfix,
                "SK": "NAME#",
                "data": "Unit Test Name Data",
            }
        )

    def tearDown(self) -> None:
        """
        Remove any data injected for the tests.
        Take particular care to ensure these values are unique and identifiable as TEST data.
        """
        table = self._dynamodb_table()
        for test_pk in ("TEST001" + self.id_postfix, "TEST002" + self.id_postfix):
            query_result = table.query(KeyConditionExpression=Key("PK").eq(test_pk))
            for item in query_result.get("Items", []):
                table.delete_item(Key={"PK": item["PK"], "SK": item["SK"]})

    def test_api_gateway_200(self):
        """
        Call the API Gateway endpoint and check the response for a 200
        """
        response = self._call_api("TEST001" + self.id_postfix)
        self.assertEqual(response.status_code, requests.codes.ok)

    def test_api_gateway_404(self):
        """
        Call the API Gateway endpoint and check the response for a 404 (id not found)
        """
        response = self._call_api("TEST002" + self.id_postfix)
        self.assertEqual(response.status_code, requests.codes.not_found)