This is an MSK Serverless project for CDK development with Python.

The `cdk.json` file tells the CDK Toolkit how to execute your app.
This project is set up like a standard Python project. The initialization process also creates a virtualenv within this project, stored under the `.venv` directory. To create the virtualenv it assumes that there is a `python3` (or `python` for Windows) executable in your path with access to the `venv` package. If for any reason the automatic creation of the virtualenv fails, you can create the virtualenv manually.
To manually create a virtualenv on MacOS and Linux:

```
$ python3 -m venv .venv
```
After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.

```
$ source .venv/bin/activate
```
If you are on a Windows platform, you would activate the virtualenv like this:

```
% .venv\Scripts\activate.bat
```
Once the virtualenv is activated, you can install the required dependencies.

```
(.venv) $ pip install -r requirements.txt
```
At this point you can now synthesize the CloudFormation template for this code.

```
(.venv) $ cdk synth --all \
              -c msk_cluster_name=your-msk-cluster-name
```
Use the `cdk deploy` command to create the stack shown above.

```
(.venv) $ cdk deploy --all \
              -c msk_cluster_name=your-msk-cluster-name
```
To add additional dependencies, for example other CDK libraries, just add them to your `setup.py` file and rerun the `pip install -r requirements.txt` command.
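For example, adding a dependency to the `install_requires` list in `setup.py` might look like the fragment below (the version pins are illustrative only, not a recommendation):

```python
# setup.py (fragment) -- version pins are illustrative only
install_requires=[
    "aws-cdk-lib>=2.0.0",
    "constructs>=10.0.0,<11.0.0",
],
```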
After the MSK cluster is successfully created, you can create a topic, and produce and consume data on that topic, as in the following example.
- Get cluster information

  ```
  $ export MSK_SERVERLESS_CLUSTER_ARN=$(aws kafka list-clusters-v2 | \
      jq -r '.ClusterInfoList[] | select(.ClusterName == "your-msk-cluster-name") | .ClusterArn')
  $ aws kafka describe-cluster-v2 --cluster-arn $MSK_SERVERLESS_CLUSTER_ARN
  {
    "ClusterInfo": {
      "ClusterType": "SERVERLESS",
      "ClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/your-msk-cluster-name/813876e5-2023-4882-88c4-58ad8599da5a-s2",
      "ClusterName": "your-msk-cluster-name",
      "CreationTime": "2022-12-16T02:26:31.369000+00:00",
      "CurrentVersion": "K2EUQ1WTGCTBG2",
      "State": "ACTIVE",
      "Tags": {},
      "Serverless": {
        "VpcConfigs": [
          {
            "SubnetIds": [
              "subnet-0113628395a293b98",
              "subnet-090240f6a94a4b5aa",
              "subnet-036e818e577297ddc"
            ],
            "SecurityGroupIds": [
              "sg-0bd8f5ce976b51769",
              "sg-0869c9987c033aaf1"
            ]
          }
        ],
        "ClientAuthentication": {
          "Sasl": {
            "Iam": {
              "Enabled": true
            }
          }
        }
      }
    }
  }
  ```
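If you prefer not to depend on `jq`, the same ARN lookup can be sketched in a few lines of Python. The payload below is a placeholder mirroring the shape of the `aws kafka list-clusters-v2` output shown above; the second cluster entry is invented for illustration.

```python
import json

# Placeholder payload shaped like `aws kafka list-clusters-v2` output.
raw = '''
{
  "ClusterInfoList": [
    {"ClusterName": "other-cluster",
     "ClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/other-cluster/abc"},
    {"ClusterName": "your-msk-cluster-name",
     "ClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/your-msk-cluster-name/813876e5"}
  ]
}
'''

def cluster_arn(payload: str, name: str) -> str:
    """Return the ARN of the first cluster whose ClusterName matches `name`."""
    clusters = json.loads(payload)["ClusterInfoList"]
    return next(c["ClusterArn"] for c in clusters if c["ClusterName"] == name)

print(cluster_arn(raw, "your-msk-cluster-name"))
```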
- Get bootstrap brokers

  ```
  $ aws kafka get-bootstrap-brokers --cluster-arn $MSK_SERVERLESS_CLUSTER_ARN
  {
    "BootstrapBrokerStringSaslIam": "boot-deligu0c.c1.kafka-serverless.{region}.amazonaws.com:9098"
  }
  ```
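Client libraries often want the broker host and port separately. Splitting the `BootstrapBrokerStringSaslIam` value is a one-liner per endpoint; the sketch below uses an invented endpoint with a concrete region in place of the `{region}` placeholder above (MSK Serverless listens on port 9098 for SASL/IAM, as the sample output shows).

```python
# Split a bootstrap broker string into (host, port) pairs.
# A bootstrap string may contain several comma-separated endpoints.
bootstrap = "boot-deligu0c.c1.kafka-serverless.us-east-1.amazonaws.com:9098"

def parse_brokers(bootstrap: str) -> list[tuple[str, int]]:
    pairs = []
    for endpoint in bootstrap.split(","):
        host, _, port = endpoint.rpartition(":")  # split on the LAST ':'
        pairs.append((host, int(port)))
    return pairs

print(parse_brokers(bootstrap))
```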
- Connect to the MSK client EC2 host.

  You can connect to an EC2 instance using the EC2 Instance Connect CLI. Install the `ec2instanceconnectcli` Python package and use the `mssh` command with the instance ID as follows.

  ```
  $ sudo pip install ec2instanceconnectcli
  $ mssh ec2-user@i-001234a4bf70dec41EXAMPLE
  ```
- Create an Apache Kafka topic

  After connecting to your EC2 host, use the client machine to create a topic on the serverless cluster. Run the following command to create a topic called `msk-serverless-tutorial`.

  ```
  [ec2-user@ip-172-31-0-180 ~]$ export PATH=$HOME/opt/kafka/bin:$PATH
  [ec2-user@ip-172-31-0-180 ~]$ export BS=your-msk-broker-endpoint
  [ec2-user@ip-172-31-0-180 ~]$ kafka-topics.sh --bootstrap-server $BS \
      --command-config client.properties \
      --create \
      --topic msk-serverless-tutorial \
      --partitions 6 \
      --replication-factor 2
  ```
- Produce and consume data

  (1) To produce messages

  Run the following command to start a console producer.

  ```
  [ec2-user@ip-172-31-0-180 ~]$ kafka-console-producer.sh --broker-list $BS \
      --producer.config client.properties \
      --topic msk-serverless-tutorial
  ```
Enter any message that you want, and press Enter. Repeat this step two or three times. Every time you enter a line and press Enter, that line is sent to your cluster as a separate message.
  (2) To consume messages

  Keep the connection to the client machine open, and then open a second, separate connection to that machine in a new window.

  ```
  [ec2-user@ip-172-31-0-180 ~]$ kafka-console-consumer.sh --bootstrap-server $BS \
      --consumer.config client.properties \
      --topic msk-serverless-tutorial \
      --from-beginning
  ```
  You should start seeing the messages you entered earlier with the console producer command. Enter more messages in the producer window, and watch them appear in the consumer window.
Delete the CloudFormation stack by running the command below.

```
(.venv) $ cdk destroy --force --all
```
Useful commands

- `cdk ls` list all stacks in the app
- `cdk synth` emits the synthesized CloudFormation template
- `cdk deploy` deploy this stack to your default AWS account/region
- `cdk diff` compare deployed stack with current state
- `cdk docs` open CDK documentation
Enjoy!
- Getting started using MSK Serverless clusters
- Configuration for MSK Serverless clusters
- Actions, resources, and condition keys for Apache Kafka APIs for Amazon MSK clusters
- Analyze real-time streaming data in Amazon MSK with Amazon Athena (2022-12-15)
- Connect using the EC2 Instance Connect CLI
  ```
  $ sudo pip install ec2instanceconnectcli
  $ mssh ec2-user@i-001234a4bf70dec41EXAMPLE # ec2-instance-id
  ```
- `ec2instanceconnectcli`: This Python CLI package handles publishing keys through EC2 Instance Connect and using them to connect to EC2 instances.
- Set up `client.properties`

  ```
  $ cat client.properties
  security.protocol=SASL_SSL
  sasl.mechanism=AWS_MSK_IAM
  sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  ```
  ℹ️ `client.properties` is a property file containing configs to be passed to the Admin Client. This is used only with the `--bootstrap-server` option for describing and altering broker configs.
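The `key=value` format of `client.properties` is easy to inspect programmatically; the sketch below inlines the file contents from the example above and parses them into a dict as a sanity check.

```python
# Parse a Kafka-style properties file (one key=value per line) into a dict.
props_text = """\
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
"""

def parse_properties(text: str) -> dict[str, str]:
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")  # split on the FIRST '=' only
        props[key.strip()] = value.strip()
    return props

print(parse_properties(props_text)["sasl.mechanism"])
```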
  For more information, see Getting started using MSK Serverless clusters - Step 3: Create a client machine

- Get bootstrap server information

  ```
  $ aws kafka get-bootstrap-brokers --cluster-arn msk_cluster_arn
  $ export BS={BootstrapBrokerStringSaslIam}
  ```
- List Kafka topics

  ```
  $ kafka-topics.sh --bootstrap-server $BS \
      --command-config client.properties \
      --list
  ```
- Create a Kafka topic

  ```
  $ kafka-topics.sh --bootstrap-server $BS \
      --command-config client.properties \
      --create \
      --topic topic_name \
      --partitions 3 \
      --replication-factor 2
  ```
- Consume records from a Kafka topic

  ```
  $ kafka-console-consumer.sh --bootstrap-server $BS \
      --consumer.config client.properties \
      --topic topic_name \
      --from-beginning
  ```
- Produce records into a Kafka topic

  ```
  $ kafka-console-producer.sh --bootstrap-server $BS \
      --producer.config client.properties \
      --topic topic_name
  ```