Why does the StreamListener consume only one record? #3078

Closed
Danden1 opened this issue Dec 17, 2024 · 2 comments
Labels
for: stackoverflow A question that's better suited to stackoverflow.com

Comments

Danden1 commented Dec 17, 2024

I discovered that StreamMessageListenerContainerOptions has a batchSize option:

StreamMessageListenerContainerOptions<String, ObjectRecord<String, Long>> containerOptions =
        StreamMessageListenerContainerOptions.builder()
                .batchSize(3)
                .pollTimeout(Duration.ofMillis(100))
                .targetType(Long.class)
                .build();

I expected this option to make the container hand all records from one poll to the listener at once.

However, the onMessage method takes a single record and cannot accept a list as its parameter.

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

    // Receives exactly one record per call; there is no variant that takes a List.
    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        System.out.println("MessageId: " + message.getId());
        System.out.println("Stream: " + message.getStream());
        System.out.println("Body: " + message.getValue());
    }
}

The container reads up to batchSize records in a single poll, but StreamPollTask then uses a for loop to deliver them to the listener one by one.

class StreamPollTask<K, V extends Record<K, ?>> implements Task {
....
    private void deserializeAndEmitRecords(List<ByteRecord> records) {

        for (ByteRecord raw : records) {

            try {

                pollState.updateReadOffset(raw.getId().getValue());
                V record = convertRecord(raw);
                listener.onMessage(record);
            } catch (RuntimeException ex) {

                if (cancelSubscriptionOnError.test(ex)) {

                    cancel();
                    errorHandler.handleError(ex);

                    return;
                }

                errorHandler.handleError(ex);
            }
        }
    }

}

I'm curious why the records are delivered one at a time like this instead of being passed to the listener as a single list.

(KafkaListener supports batch listeners that receive a list: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners)
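
As a possible workaround on the application side, records that arrive one at a time can be regrouped into lists before processing. The following is only a minimal sketch in plain Java: BufferingListener is a hypothetical helper, not part of Spring Data Redis, and it assumes that losing the buffered records on a crash is acceptable or handled elsewhere (for example by acknowledging only after a flush).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not a Spring Data Redis class): collects records that
 * arrive one at a time and hands them to a batch consumer in groups.
 */
class BufferingListener<T> {

    private final int batchSize;
    private final Consumer<List<T>> batchConsumer;
    private final List<T> buffer = new ArrayList<>();

    BufferingListener(int batchSize, Consumer<List<T>> batchConsumer) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
        this.batchConsumer = batchConsumer;
    }

    /** Intended to be called once per record, e.g. from StreamListener.onMessage. */
    synchronized void onMessage(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Emits whatever is currently buffered, e.g. from a timer or at shutdown. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Copy before clearing so the consumer owns an independent list.
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        batchConsumer.accept(batch);
    }
}
```

An onMessage implementation could delegate to such a helper. A partial batch stays in the buffer until flush() is called, so a periodic flush is likely needed when the stream is quiet.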

spring-projects-issues added the "status: waiting-for-triage" label on Dec 17, 2024
mp911de (Member) commented Jan 7, 2025

This is because we do not provide a batch listener abstraction and in the current approach, we sequentially handle each record regardless of whether batch reading is used. Feel free to submit a pull request that introduces a batch listener concept. StreamListener would need to be retrofitted in terms of its generics.
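
To illustrate the difference between the two dispatch styles, here is a rough sketch in plain Java. All names in it (RecordListener, BatchRecordListener, Dispatch) are hypothetical stand-ins chosen for illustration; none of them exist in Spring Data Redis, and an actual batch listener contribution might look quite different.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-record callback, standing in for today's StreamListener. */
interface RecordListener<T> {
    void onMessage(T record);
}

/** Hypothetical batch callback; no such interface is provided today. */
interface BatchRecordListener<T> {
    void onMessages(List<T> records);
}

final class Dispatch {

    private Dispatch() {
    }

    /** Sequential handling: one callback per record in the polled list. */
    static <T> void perRecord(List<T> polled, RecordListener<T> listener) {
        for (T record : polled) {
            listener.onMessage(record);
        }
    }

    /** Batch handling: one callback per poll, skipped when nothing was read. */
    static <T> void perBatch(List<T> polled, BatchRecordListener<T> listener) {
        if (!polled.isEmpty()) {
            // Hand over a copy so the listener cannot modify the poll result.
            listener.onMessages(new ArrayList<>(polled));
        }
    }
}
```

With three polled records, perRecord invokes its listener three times, while perBatch invokes its listener once with a list of three. Error handling and read-offset tracking per record, which the existing loop performs, are left out of this sketch.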

mp911de closed this as not planned on Jan 7, 2025
mp911de added the "for: stackoverflow" label and removed the "status: waiting-for-triage" label on Jan 7, 2025
Danden1 (Author) commented Jan 18, 2025

Thank you for your response. I'll give it a try.
