This is an Amazon Kinesis proxy project using API Gateway for CDK development with Python.
The cdk.json file tells the CDK Toolkit how to execute your app.
This project is set up like a standard Python project. The initialization
process also creates a virtualenv within this project, stored under the .venv
directory. To create the virtualenv it assumes that there is a python3
(or python for Windows) executable in your path with access to the venv
package. If for any reason the automatic creation of the virtualenv fails,
you can create the virtualenv manually.
To manually create a virtualenv on macOS and Linux:
$ python3 -m venv .venv
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.
$ source .venv/bin/activate
If you are on a Windows platform, activate the virtualenv like this:
% .venv\Scripts\activate.bat
Once the virtualenv is activated, you can install the required dependencies.
(.venv) $ pip install -r requirements.txt
At this point you can now synthesize the CloudFormation template for this code.
(.venv) $ cdk synth \
    -c vpc_name=default \
    --parameters SourceKinesisStreams='your-kinesis-stream-name'
Use the cdk deploy command to create the stack shown above.
(.venv) $ cdk deploy --require-approval never \
    -c vpc_name=default \
    --parameters SourceKinesisStreams='your-kinesis-stream-name'
To add additional dependencies, for example other CDK libraries, just add
them to your setup.py file and rerun the pip install -r requirements.txt
command.
Let's test if our lambda function is protected by the authorizer. In order to test the flow we have to:
- Register a Cognito user, using the AWS CLI:
  aws cognito-idp sign-up \
      --client-id your-user-pool-client-id \
      --username "[email protected]" \
      --password "user-password"
  Note: You can find UserPoolClientId with the following command:
  aws cloudformation describe-stacks --stack-name your-cloudformation-stack-name | jq -r '.Stacks[0].Outputs | map(select(.OutputKey == "UserPoolClientId")) | .[0].OutputValue'
- Confirm the user, so they can log in:
  aws cognito-idp admin-confirm-sign-up \
      --user-pool-id your-user-pool-id \
      --username "[email protected]"
  At this point, if you look at your Cognito user pool, you will see that the user is confirmed and ready to log in.
  Note: You can find UserPoolId with the following command:
  aws cloudformation describe-stacks --stack-name your-cloudformation-stack-name | jq -r '.Stacks[0].Outputs | map(select(.OutputKey == "UserPoolId")) | .[0].OutputValue'
- Log the user in to get an identity JWT token:
  aws cognito-idp initiate-auth \
      --auth-flow USER_PASSWORD_AUTH \
      --auth-parameters USERNAME="[email protected]",PASSWORD="user-password" \
      --client-id your-user-pool-client-id
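The jq filters used in the steps above can also be sketched in Python, which may be handy when scripting the test flow. This is a minimal sketch, not part of the project: the function names stack_output, id_token, and jwt_claims are hypothetical, and the inputs are assumed to be the raw JSON text printed by aws cloudformation describe-stacks and aws cognito-idp initiate-auth. Unlike the jq filter, stack_output raises an error instead of printing null when the output key is missing.

```python
import base64
import json


def stack_output(describe_stacks_json: str, output_key: str) -> str:
    """Return the OutputValue for output_key from `describe-stacks` JSON text."""
    stack = json.loads(describe_stacks_json)["Stacks"][0]
    for output in stack.get("Outputs", []):
        if output["OutputKey"] == output_key:
            return output["OutputValue"]
    raise KeyError(f"stack has no output named {output_key!r}")


def id_token(initiate_auth_json: str) -> str:
    """Return the ID token from `initiate-auth` JSON text."""
    return json.loads(initiate_auth_json)["AuthenticationResult"]["IdToken"]


def jwt_claims(token: str) -> dict:
    """Decode the payload of a JWT for inspection.

    The signature is NOT verified here; this is only for looking at
    claims while debugging, never for making authorization decisions.
    """
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # restore stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(payload))
```

For example, stack_output(text, "UserPoolClientId") returns the value to pass as --client-id, and jwt_claims(id_token(text)) shows the claims carried by the token, such as its expiry.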
Hit our API to test the authorizer; use the token to invoke our API endpoint, which will call the function (if the token is valid):
- GET /streams method to invoke ListStreams in Kinesis
  $ MY_ID_TOKEN=$(aws cognito-idp initiate-auth --auth-flow USER_PASSWORD_AUTH --auth-parameters USERNAME="[email protected]",PASSWORD="user-password" --client-id your-user-pool-client-id | jq -r '.AuthenticationResult.IdToken')
  $ curl -X GET https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1/streams \
      --header "Authorization: ${MY_ID_TOKEN}"
The response is:
{ "HasMoreStreams": false, "StreamNames": [ "PUT-Firehose-aEhWz" ], "StreamSummaries": [ { "StreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/PUT-Firehose-aEhWz", "StreamCreationTimestamp": 1661612556, "StreamModeDetails": { "StreamMode": "ON_DEMAND" }, "StreamName": "PUT-Firehose-aEhWz", "StreamStatus": "ACTIVE" } ] }
- GET /streams/{stream-name} method to invoke DescribeStream in Kinesis
  $ MY_ID_TOKEN=$(aws cognito-idp initiate-auth --auth-flow USER_PASSWORD_AUTH --auth-parameters USERNAME="[email protected]",PASSWORD="user-password" --client-id your-user-pool-client-id | jq -r '.AuthenticationResult.IdToken')
  $ curl -X GET https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1/streams/PUT-Firehose-aEhWz \
      --header "Authorization: ${MY_ID_TOKEN}"
The response is:
{ "StreamDescription": { "EncryptionType": "KMS", "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "HasMoreShards": false, "KeyId": "alias/aws/kinesis", "RetentionPeriodHours": 24, "Shards": [ { "HashKeyRange": { "EndingHashKey": "340282366920938463463374607431768211455", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49632740478318570836537313591685157894516301790768529410" }, "ShardId": "shardId-000000000000" } ], "StreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/PUT-Firehose-aEhWz", "StreamCreationTimestamp": 1661612556, "StreamModeDetails": { "StreamMode": "ON_DEMAND" }, "StreamName": "PUT-Firehose-aEhWz", "StreamStatus": "ACTIVE" } }
- PUT /streams/{stream-name}/record method to invoke PutRecord in Kinesis
  $ MY_ID_TOKEN=$(aws cognito-idp initiate-auth --auth-flow USER_PASSWORD_AUTH --auth-parameters USERNAME="[email protected]",PASSWORD="user-password" --client-id your-user-pool-client-id | jq -r '.AuthenticationResult.IdToken')
  $ curl -X PUT https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1/streams/PUT-Firehose-aEhWz/record \
      --header "Authorization: ${MY_ID_TOKEN}" \
      -H 'Content-Type: application/json' \
      -d '{ "Data": "some data", "PartitionKey": "some key" }'
The response is:
{ "EncryptionType": "KMS", "SequenceNumber": "49632757272385358984391127998703515973414866647712268290", "ShardId": "shardId-000000000000" }
- PUT /streams/{stream-name}/records method to invoke PutRecords in Kinesis
  $ MY_ID_TOKEN=$(aws cognito-idp initiate-auth --auth-flow USER_PASSWORD_AUTH --auth-parameters USERNAME="[email protected]",PASSWORD="user-password" --client-id your-user-pool-client-id | jq -r '.AuthenticationResult.IdToken')
  $ curl -X PUT https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1/streams/PUT-Firehose-aEhWz/records \
      --header "Authorization: ${MY_ID_TOKEN}" \
      -H 'Content-Type: application/json' \
      -d '{"records":[{"data":"some data","partition-key":"some key"},{"data":"some other data","partition-key":"some key"}]}'
The response is:
{ "EncryptionType": "KMS", "FailedRecordCount": 0, "Records": [ { "SequenceNumber": "49632757272385358984391128069035193381135165118304223234", "ShardId": "shardId-000000000000" }, { "SequenceNumber": "49632757272385358984391128069037611232774394376653635586", "ShardId": "shardId-000000000000" } ] }
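The PutRecords call above can also be made from a script. The following is a rough sketch using only the Python standard library, assuming the request body shape shown in the curl example (a records list whose items carry data and partition-key). The names put_records_body, failed_record_count, and put_records are hypothetical helpers, and api_url stands for your own stage URL, for example the https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1 placeholder used above.

```python
import json
import urllib.request


def put_records_body(items):
    """Build the JSON body for PUT /streams/{stream-name}/records.

    items is an iterable of (data, partition_key) pairs.
    """
    records = [{"data": data, "partition-key": key} for data, key in items]
    return json.dumps({"records": records})


def failed_record_count(response_text):
    """Return FailedRecordCount from a PutRecords response body."""
    return json.loads(response_text)["FailedRecordCount"]


def put_records(api_url, stream_name, token, items):
    """Send the records through the API and return the raw response text.

    api_url and token are assumptions supplied by the caller: the stage
    URL of the deployed API and the Cognito ID token obtained earlier.
    """
    request = urllib.request.Request(
        url=f"{api_url}/streams/{stream_name}/records",
        data=put_records_body(items).encode("utf-8"),
        method="PUT",
        headers={"Authorization": token, "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")
```

A caller would typically pass the response text to failed_record_count and retry or log when the count is not zero, since PutRecords can partially succeed.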
Delete the CloudFormation stack by running the below command.
(.venv) $ cdk destroy --force --all
- cdk ls: list all stacks in the app
- cdk synth: emits the synthesized CloudFormation template
- cdk deploy: deploy this stack to your default AWS account/region
- cdk diff: compare deployed stack with current state
- cdk docs: open CDK documentation
Enjoy!
- Tutorial: Create a REST API as an Amazon Kinesis proxy in API Gateway
- Streaming Data Solution for Amazon Kinesis
- Serverless Patterns Collection
- aws-samples/serverless-patterns
- Building fine-grained authorization using Amazon Cognito, API Gateway, and IAM
- Curl Cookbook
{ "message": "Missing Authentication Token" }