page_type | languages | products | description | urlFragment | ||
---|---|---|---|---|---|---|
sample |
|
|
Azure Spring Cloud Stream Binder Sample project for Event Hub client library |
azure-spring-cloud-sample-eventhubs-binder |
This code sample demonstrates how to use the Spring Cloud Stream Binder for Azure Event Hub.The sample app has two operating modes. One way is to expose a Restful API to receive string message, another way is to automatically provide string messages. These messages are published to an event hub. The sample will also consume messages from the same event hub.
Running this sample will be charged by Azure. You can check the usage and bill at this link.
We have several ways to config the Spring Cloud Stream Binder for Azure Event Hub. You can choose anyone of them.
Important
When using the Restful API to send messages, the Active profiles must contain manual
.
-
Create Azure Event Hubs. Please note
Basic
tier is unsupported. After creating the Azure Event Hub, you can create your own Consumer Group or use the default "$Default" Consumer Group. -
Create Azure Storage for checkpoint use.
-
Update application.yaml.
spring: cloud: azure: eventhub: # Fill event hub namespace connection string copied from portal connection-string: [eventhub-namespace-connection-string] # Fill checkpoint storage account name, access key and container checkpoint-storage-account: [checkpoint-storage-account] checkpoint-access-key: [checkpoint-access-key] checkpoint-container: [checkpoint-container] stream: function: definition: consume;supply bindings: consume-in-0: destination: [eventhub-name] group: [consumer-group] supply-out-0: destination: [the-same-eventhub-name-as-above]
-
Create a service principal for use in by your app. Please follow create service principal from Azure CLI.
-
Create Azure Event Hubs. Please note
Basic
tier is unsupported. After creating the Azure Event Hub, you can create your own Consumer Group or use the default "$Default" Consumer Group. -
Create Azure Storage for checkpoint use.
-
Add Role Assignment for Event Hub, Storage Account and Resource group. See Service principal for Azure resources with Event Hubs to add role assignment for Event Hub, Storage Account, Resource group is similar.
- Resource group: assign
Contributor
role for service principal. - Event Hub: assign
Contributor
role for service principal. - Storage Account: assign
Storage Account Key Operator Service Role
role for service principal.
- Resource group: assign
-
Update application-sp.yaml.
spring: cloud: azure: client-id: [service-principal-id] client-secret: [service-principal-secret] tenant-id: [tenant-id] resource-group: [resource-group] eventhub: namespace: [eventhub-namespace] checkpoint-storage-account: [checkpoint-storage-account] checkpoint-container: [checkpoint-container] stream: function: definition: consume;supply bindings: consume-in-0: destination: [eventhub-name] group: [consumer-group] supply-out-0: destination: [the-same-eventhub-name-as-above]
We should specify
spring.profiles.active=sp
to run the Spring Boot application.
Please follow create managed identity to set up managed identity.
-
Create Azure Event Hubs. Please note
Basic
tier is unsupported. After creating the Azure Event Hub, you can create your own Consumer Group or use the default "$Default" Consumer Group. -
Create Azure Storage for checkpoint use.
-
Add Role Assignment for Event Hub and Storage Account. See Managed identities for Azure resources with Event Hubs to add role assignment for Event Hub, Storage Account is similar.
- Event Hub: assign
Contributor
role for managed identity. - Storage Account: assign
Storage Account Key Operator Service Role
role for managed identity.
- Event Hub: assign
- Update application-mi.yaml
spring: cloud: azure: msi-enabled: true client-id: [the-id-of-managed-identity] resource-group: [resource-group] # Fill subscription ID copied from portal subscription-id: [subscription-id] eventhub: namespace: [eventhub-namespace] checkpoint-storage-account: [checkpoint-storage-account] checkpoint-container: [checkpoint-container] stream: function: definition: consume;supply bindings: consume-in-0: destination: [eventhub-name] group: [consumer-group] supply-out-0: destination: [the-same-eventhub-name-as-above]
We should specify
spring.profiles.active=mi
to run the Spring Boot application. For App Service, please add a configuration entry for this.
If you update the spring.cloud.azure.managed-identity.client-id
property after deploying the app, or update the role assignment for
services, please try to redeploy the app again.
You can follow Deploy a Spring Boot JAR file to Azure App Service to deploy this application to App Service
If you want to auto create the Azure Event Hub and Azure Storage account instances, make sure you add such properties (only support the service principal and managed identity cases):
spring:
cloud:
azure:
subscription-id: [subscription-id]
auto-create-resources: true
environment: Azure
region: [region]
To enable message sending in a synchronized way with Spring Cloud Stream 3.x, azure-spring-cloud-stream-binder-eventhubs supports the sync producer mode to get responses for sent messages. By enabling following configuration, you could use StreamBridge for the synchronized message producing.
spring:
cloud:
stream:
eventhub:
bindings:
supply-out-0:
producer:
sync: true
To enable batch consuming feature, you should add below configuration in the batch
profile.
spring:
cloud:
stream:
bindings:
consume-in-0:
destination: [eventhub-name]
group: [consumer-group]
consumer:
batch-mode: true
eventhub:
bindings:
consume-in-0:
consumer:
checkpoint-mode: BATCH # or MANUAL as needed
max-batch-size: [max-batch-size] # The default valueis 10
max-wait-time: [max-wait-time] # Optional, the default value is null
For checkpointing mode as BATCH, you can use below code to send messages and consume in batches, see the BatchConsumerConfiguration.java
@Bean
public Consumer<List<String>> consume() {
return list -> list.forEach(event -> LOGGER.info("New event received: '{}'",event));
}
@Bean
public Supplier<Message<String>> supply() {
return () -> {
LOGGER.info("Sending message, sequence " + i);
return MessageBuilder.withPayload("\"Hello world"+ i++ +"\"").build();
};
}
For checkpointing mode as MANUAL, you can use below code to send messages and consume/checkpoint in batches.
@Bean
public Consumer<Message<List<String>>> consume() {
return message -> {
for (int i = 0; i < message.getPayload().size(); i++) {
LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
message.getPayload().get(i),
((List<Object>) message.getHeaders().get(EventHubHeaders.PARTITION_KEY)).get(i),
((List<Object>) message.getHeaders().get(EventHubHeaders.SEQUENCE_NUMBER)).get(i),
((List<Object>) message.getHeaders().get(EventHubHeaders.OFFSET)).get(i),
((List<Object>) message.getHeaders().get(EventHubHeaders.ENQUEUED_TIME)).get(i));
}
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
checkpointer.success()
.doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
.doOnError(error -> LOGGER.error("Exception found", error))
.subscribe();
};
}
@Bean
public Supplier<Message<String>> supply() {
return () -> {
LOGGER.info("Sending message, sequence " + i);
return MessageBuilder.withPayload("\"Hello world"+ i++ +"\"").build();
};
}
-
Run the
mvn spring-boot:run
in the root of the code sample to get the app running. -
Send a POST request
$ ### Send messages through imperative. $ curl -X POST http://localhost:8080/messages/imperative/staticalDestination?message=hello $ curl -X POST http://localhost:8080/messages/imperative/dynamicDestination?message=hello $ ### Send messages through reactive. $ curl -X POST http://localhost:8080/messages/reactive?message=hello
or when the app runs on App Service or VM
$ ### Send messages through imperative. $ curl -d -X POST https://[your-app-URL]/messages/imperative/staticalDestination?message=hello $ curl -d -X POST https://[your-app-URL]/messages/imperative/dynamicDestination?message=hello $ ### Send messages through reactive. $ curl -d -X POST https://[your-app-URL]/messages/reactive?message=hello
-
Verify in your app’s logs that a similar message was posted:
New message received: 'hello', partition key: 2002572479, sequence number: 4, offset: 768, enqueued time: 2021-06-03T01:47:36.859Z Message 'hello' successfully checkpointed
-
Delete the resources on Azure Portal to avoid unexpected charges.