This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Decouple concurrency and message batch size in SQS listener #379

Closed
ngaya-ll opened this issue Oct 4, 2018 · 6 comments · Fixed by awspring/spring-cloud-aws#40 · May be fixed by #380
Labels
component: sqs SQS integration related issue type: enhancement A general enhancement

Comments

@ngaya-ll

ngaya-ll commented Oct 4, 2018

Enhancement

The SimpleMessageListenerContainer has a basic concurrency model that works as follows. While the queue is running:

  • Request maxNumberOfMessages messages from SQS
  • Submit all messages in the batch to a thread pool to be handled. If the default thread pool is used, all messages are handled in parallel with no queuing.
  • Block until all messages in the batch have completed handling.
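
As a rough illustration of the loop described above, here is a minimal Java sketch. It is not the actual SimpleMessageListenerContainer code: CoupledPollingSketch, pollOnce, receiveBatch and handler are names made up for this example, and receiveBatch stands in for an SQS receive call that returns up to maxNumberOfMessages messages.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Illustrative only; all names here are assumptions, not library API. */
final class CoupledPollingSketch {

    /** One poll cycle: fetch a batch, hand every message to the pool, wait for all of them. */
    static <T> int pollOnce(Supplier<List<T>> receiveBatch,
                            Consumer<T> handler,
                            ExecutorService pool) {
        List<T> batch = receiveBatch.get();
        CountDownLatch done = new CountDownLatch(batch.size());
        for (T message : batch) {
            pool.execute(() -> {
                try {
                    handler.accept(message);
                } finally {
                    done.countDown(); // count down even if the handler throws
                }
            });
        }
        try {
            // The next receive cannot start until the slowest message in this batch is done.
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch.size();
    }
}
```

With an unbounded pool, the number of messages handled at once equals the batch size, which is the coupling this issue is about.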

This approach is simple but has the disadvantage of coupling message processing concurrency and message batch size. There is no easy way to request multiple messages at once but only process one at a time. (This can potentially be achieved by configuring a custom task executor, but that doesn't work well with more than one queue because all queues share the same executor.) Likewise, there is no way to handle more than 10 messages at a time (the maximum SQS batch size).

As an enhancement, I would like to request support for the following use cases on a per-queue basis:

  • maxConcurrency < maxNumberOfMessages: Request n messages at a time but limit concurrent processing to m < n.
  • maxConcurrency >= maxNumberOfMessages: Request n messages at a time and allow concurrent processing of up to m >= n total messages.
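
One possible way to get both behaviours is to gate handler submission with a semaphore instead of waiting for the whole batch. The sketch below is only an assumption about how this could look, not the code from the PR; BoundedConcurrencyPoller, pollOnce and awaitIdle are hypothetical names, and receiveBatch again stands in for the SQS receive call.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Illustrative only; all names here are assumptions, not library API. */
final class BoundedConcurrencyPoller<T> {
    private final Supplier<List<T>> receiveBatch; // returns up to maxNumberOfMessages messages
    private final Consumer<T> handler;
    private final ExecutorService pool;
    private final int maxConcurrency;
    private final Semaphore permits;

    BoundedConcurrencyPoller(Supplier<List<T>> receiveBatch, Consumer<T> handler,
                             ExecutorService pool, int maxConcurrency) {
        this.receiveBatch = receiveBatch;
        this.handler = handler;
        this.pool = pool;
        this.maxConcurrency = maxConcurrency;
        this.permits = new Semaphore(maxConcurrency);
    }

    /** Fetches one batch and submits it; blocks only while maxConcurrency handlers are busy. */
    int pollOnce() {
        List<T> batch = receiveBatch.get();
        for (T message : batch) {
            permits.acquireUninterruptibly();
            try {
                pool.execute(() -> {
                    try {
                        handler.accept(message);
                    } finally {
                        permits.release();
                    }
                });
            } catch (RejectedExecutionException e) {
                permits.release(); // do not leak the permit if the pool refuses the task
                throw e;
            }
        }
        return batch.size();
    }

    /** Waits until every in-flight handler has finished. */
    void awaitIdle() {
        permits.acquireUninterruptibly(maxConcurrency);
        permits.release(maxConcurrency);
    }
}
```

When maxConcurrency is smaller than the batch size, the loop stalls on the semaphore until a handler finishes. When it is larger, pollOnce returns as soon as the batch is submitted, so the next receive can start while earlier messages are still being handled.
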
@ngaya-ll
Author

ngaya-ll commented Oct 4, 2018

Somewhat related: #166

@ngaya-ll
Author

ngaya-ll commented Oct 8, 2018

I submitted a PR to support a per-queue maxConcurrency field. However, I'm wondering if this is still too inflexible. Perhaps a better approach would be to abstract the polling logic into a separate interface, with a default implementation that users of the framework could override.
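
To make the idea concrete, such an extension point might look roughly like the sketch below. This is purely hypothetical: QueuePoller and SequentialQueuePoller do not exist in Spring Cloud AWS, and the sequential default is just the simplest stand-in, not the container's current behaviour.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical extension point; not part of Spring Cloud AWS. */
interface QueuePoller<T> {
    /** Runs one receive-and-dispatch cycle and returns the number of messages dispatched. */
    int pollOnce(Supplier<List<T>> receiveBatch, Consumer<T> handler);
}

/** Simplest possible default: handle each message of the batch in turn on the calling thread. */
final class SequentialQueuePoller<T> implements QueuePoller<T> {
    @Override
    public int pollOnce(Supplier<List<T>> receiveBatch, Consumer<T> handler) {
        List<T> batch = receiveBatch.get();
        batch.forEach(handler);
        return batch.size();
    }
}
```

A user who needs different behaviour (bounded parallelism, per-group ordering, and so on) would supply their own QueuePoller for a given queue instead of the default.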

@ghost

ghost commented Nov 9, 2018

Hi! This PR is really good, thanks. I'm just waiting for a release so I can use it in my project 🔢.

I would like to offer some suggestions based on my experience working with spring-cloud-aws, especially with SQS. I do miss the maxConcurrency feature, and it would be great to have it ASAP.
Besides that, I think (and maybe I'm wrong) it would also be great to have the following features:

  1. Can we also have an additional attribute in SqsListener, such as maxNumberOfMessages?
    For example, suppose you have two queues: 'my-queue-1', a standard queue, and 'my-queue-2', a FIFO queue. You want to fetch as many messages as possible from 'my-queue-1' but preserve order during message processing on 'my-queue-2', which means no batching (one message at a time).
    Proposal: SqsListener#maxNumberOfMessages would be optional and, when specified, would override
    SimpleMessageListenerContainer#setMaxNumberOfMessages().
    This would make it possible to configure the batch size for each queue. In the example above, it could be 10 for 'my-queue-1' and 1 for 'my-queue-2'.

  2. It would also be great to be able to configure more than one consumer per queue. For example, for a FIFO queue with 50 message groups, we could have 5 consumers that each consume one message at a time. I know you could say "you can have one consumer that fetches messages from SQS, hands them to another thread, and then continues fetching", but it would still be nice to have such a feature because it would let the app scale. Currently there is only one consumer per queue, which fetches messages sequentially (when not batching).
    Proposal: SqsListener#numberOfConsumers would be optional (default 1); when set to N, N instances of AsynchronousMessageListener would be created. Alternatively, the attribute could take a value specifying the core and maximum number of consumers (as in the JMS listener container), something like "2-10".
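
To illustrate how the two proposed attributes might be interpreted, here is a small sketch. Everything in it is hypothetical: neither attribute exists in SqsListener, and ListenerSettingsSketch, parseConsumers and effectiveBatchSize are names invented for this example. The upper bound of 10 is the SQS maximum batch size mentioned earlier in this issue.

```java
/** Hypothetical per-listener settings; none of these names exist in Spring Cloud AWS. */
final class ListenerSettingsSketch {

    static final int SQS_MAX_BATCH_SIZE = 10;

    final int coreConsumers;
    final int maxConsumers;

    private ListenerSettingsSketch(int coreConsumers, int maxConsumers) {
        this.coreConsumers = coreConsumers;
        this.maxConsumers = maxConsumers;
    }

    /** Parses "N" or "core-max" (for example "2-10") into a consumer range. */
    static ListenerSettingsSketch parseConsumers(String value) {
        String trimmed = value.trim();
        int dash = trimmed.indexOf('-');
        int core = Integer.parseInt(dash < 0 ? trimmed : trimmed.substring(0, dash).trim());
        int max = dash < 0 ? core : Integer.parseInt(trimmed.substring(dash + 1).trim());
        if (core < 1 || max < core) {
            throw new IllegalArgumentException("Invalid consumer range: " + value);
        }
        return new ListenerSettingsSketch(core, max);
    }

    /** A per-listener batch size, when set, overrides the container-wide default. */
    static int effectiveBatchSize(Integer perListener, int containerDefault) {
        int size = perListener != null ? perListener : containerDefault;
        if (size < 1 || size > SQS_MAX_BATCH_SIZE) {
            throw new IllegalArgumentException("Batch size must be between 1 and 10: " + size);
        }
        return size;
    }
}
```

In the two-queue example, 'my-queue-1' would leave the attribute unset and inherit the container default, while 'my-queue-2' would set it to 1.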

Sorry if this message is in the wrong place.
Regards...

@spencergibb spencergibb added type: enhancement A general enhancement and removed status: waiting-for-triage An issue we've not yet triaged labels Jan 29, 2019
@maciejwalkowiak maciejwalkowiak added the component: sqs SQS integration related issue label May 29, 2020
@juanledesma84

Hi @arseniir. I would like to ask you about your suggestion, because I have a problem that I have not been able to resolve, and I don't know whether it is the same one. I want to implement concurrency in my SqsListeners.

I read an AWS blog post about this, but it's a bit old, so I don't know whether I need to change my implementation or whether there is a way to set the concurrency attributes in the config class.

Link: https://aws.amazon.com/blogs/developer/using-amazon-sqs-with-spring-boot-and-spring-jms/

The following is my current implementation, but I don't know how to set the concurrency attributes.

Config class:

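import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SqsConfig {

	// Template for sending messages; uses the injected asynchronous SQS client.
	@Bean
	public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
		return new QueueMessagingTemplate(amazonSQSAsync);
	}

	// Synchronous SQS client built from the default credentials and region.
	@Bean
	public AmazonSQS sqsClient() {
		return AmazonSQSClientBuilder.defaultClient();
	}
}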

Consumer class:

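import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer implements ISqsStandardConsumer {

	// Called once for each message received from the "SQS-concurrency-test" queue.
	@Override
	@SqsListener("SQS-concurrency-test")
	public void receiveStandardMessage(String message) {
		System.out.println(message);
	}
}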

Do you know if there is already a way to do this?
Regards.

@maciejwalkowiak
Contributor

@juanledesma84 the linked article doesn't use Spring Cloud AWS; it uses amazon-sqs-java-messaging-lib.

maciejwalkowiak pushed a commit to awspring/spring-cloud-aws that referenced this issue Feb 8, 2021
For FIFO queues the AsynchronousMessageListener groups messages with same messageGroupId into so called MessageGroups. The MessageExecutor (renamed to MessageGroupExecutor) handles the messages within those groups sequentially.

Messages from non-FIFO queues are handled as before with the only difference that they are also wrapped in a MessageGroup. Each separate message belongs to its own MessageGroup.

Fixes spring-attic/spring-cloud-aws#387
Fixes spring-attic/spring-cloud-aws#379
Fixes spring-attic/spring-cloud-aws#530
Fixes spring-attic/spring-cloud-aws#756
maciejwalkowiak added a commit to awspring/spring-cloud-aws that referenced this issue Feb 8, 2021
For FIFO queues the AsynchronousMessageListener groups messages with same messageGroupId into so called MessageGroups. The MessageExecutor (renamed to MessageGroupExecutor) handles the messages within those groups sequentially.

Messages from non-FIFO queues are handled as before with the only difference that they are also wrapped in a MessageGroup. Each separate message belongs to its own MessageGroup.

Fixes spring-attic/spring-cloud-aws#387
Fixes spring-attic/spring-cloud-aws#379
Fixes spring-attic/spring-cloud-aws#530
Fixes spring-attic/spring-cloud-aws#756
Closes spring-attic/spring-cloud-aws#746

Co-authored-by: Tristan Baumbusch
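
The grouping described in the commit message can be pictured with the following sketch. It is not the code from the referenced commit: MessageGroupingSketch, Msg and group are hypothetical names, and a null groupId is used here as an assumed marker for a message from a non-FIFO queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only; all names here are assumptions, not library API. */
final class MessageGroupingSketch {

    /** Minimal stand-in for an SQS message; groupId is null for non-FIFO queues. */
    static final class Msg {
        final String id;
        final String groupId;

        Msg(String id, String groupId) {
            this.id = id;
            this.groupId = groupId;
        }
    }

    /**
     * Splits a batch into groups. Messages sharing a groupId stay together in
     * arrival order; a message without a groupId becomes its own group.
     */
    static List<List<Msg>> group(List<Msg> batch) {
        Map<String, List<Msg>> byGroupId = new HashMap<>();
        List<List<Msg>> groups = new ArrayList<>();
        for (Msg message : batch) {
            if (message.groupId == null) {
                groups.add(Collections.singletonList(message));
                continue;
            }
            List<Msg> existing = byGroupId.get(message.groupId);
            if (existing == null) {
                existing = new ArrayList<>();
                byGroupId.put(message.groupId, existing);
                groups.add(existing); // keep groups in order of first appearance
            }
            existing.add(message);
        }
        return groups;
    }
}
```

Each inner list would then be handed to one worker and processed sequentially, while different groups can run in parallel.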
@nickcaballero

@maciejwalkowiak Doesn't really seem like awspring/spring-cloud-aws#40 addresses this. Seems like awspring/spring-cloud-aws#23 is probably the continuation of this?

juho9000 pushed a commit to juho9000/spring-cloud-aws that referenced this issue Apr 29, 2021
6 participants