
feat(rabbitmq): adds a message batching mechanism for RabbitMQ handlers #781

Merged · 20 commits · Sep 24, 2024

Conversation

@ckfngod (Contributor) commented Sep 10, 2024:

Message Batching

This implements consumer-side batching (similar to Spring's). It works by accumulating messages until either the batch size limit is reached or the batch timer expires, at which point the handler is presented with the accumulated messages as a single array, which is acked or nacked together. A batch error handler may be provided for users whose error-handling logic needs to be aware of the failed batch as a whole.

This is implemented as a new optional message handler options property, `batchOptions`, which contains (a usage sketch follows this list):

  • `size`: the maximum number of messages to accumulate before the batch is returned
  • `timeout`: the maximum length of time allowed between messages before a (possibly partial) batch is returned, and
  • `errorHandler`: a custom error-handling implementation that receives message batches
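
For illustration, a minimal usage sketch (the exchange, queue, and handler names here are made up for the example, not taken from this PR):

```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class OrderConsumer {
  // With batchOptions present, the handler receives an array of messages
  // instead of a single message.
  @RabbitSubscribe({
    exchange: 'orders',          // illustrative exchange name
    routingKey: 'order.created', // illustrative routing key
    queue: 'order-batch-queue',  // illustrative queue name
    batchOptions: {
      size: 50,     // flush once 50 messages have accumulated...
      timeout: 500, // ...or 500 ms after the most recent message
    },
  })
  handleOrderBatch(orders: unknown[]) {
    console.log(`Processing a batch of ${orders.length} orders`);
  }
}
```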

At a high level, the batching mechanism works as follows (a code sketch follows the steps):

  1. Initial message is received:
    a. Batch timer is started with message handling logic as the callback
    b. Store the above callback in a separate variable, inflightBatchHandler
  2. Subsequent message is received, check if the batch size has been reached:
    a. If it has, clear the batch timer and immediately call inflightBatchHandler with the batch
    b. Otherwise, refresh the batch timer
  3. No message received for batch timeout duration and timer has not been cleared:
    a. inflightBatchHandler is called by the timer with a partial batch
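
A minimal sketch of this flow as a standalone accumulator (illustrative only; apart from inflightBatchHandler, the names below are not the PR's actual code):

```typescript
type BatchHandler<T> = (batch: T[]) => void;

class BatchAccumulator<T> {
  private batch: T[] = [];
  private timer?: NodeJS.Timeout;

  constructor(
    private readonly size: number,
    private readonly timeoutMs: number,
    private readonly inflightBatchHandler: BatchHandler<T>,
  ) {}

  add(message: T): void {
    this.batch.push(message);
    if (this.batch.length >= this.size) {
      // Step 2a: size limit reached, clear the timer and flush immediately.
      clearTimeout(this.timer);
      this.flush();
    } else {
      // Steps 1a / 2b: start or refresh the batch timer; if it fires,
      // the handler is called with a partial batch (step 3a).
      clearTimeout(this.timer);
      this.timer = setTimeout(() => this.flush(), this.timeoutMs);
    }
  }

  private flush(): void {
    const current = this.batch;
    this.batch = [];
    this.timer = undefined;
    this.inflightBatchHandler(current);
  }
}
```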

@underfisk (Contributor) left a comment:

I do like this idea of batching. I left some notes, but I'll also request test coverage, as we need to be certain that this works as expected and is backwards compatible.

@ckfngod (Contributor, Author) commented Sep 11, 2024:

@underfisk Thanks for taking a look, will do on the logging and tests. Are you able to address some of the other questions that I had?

@underfisk (Contributor) commented Sep 11, 2024:

> Thoughts on the approach?
> What is the preferred way to model this? As a different decorator (something like @BatchRabbitSubscribe) or as part of the existing decorator with some interface changes?
> Is there a preference on error handling behaviour for batches? Should batches be treated as a single unit or individual messages for that purpose?
> I'm not familiar with RabbitMQ RPC but is this something that could be useful there as well? Or is it best to keep this separate as subscriber-only behaviour?

I haven't been using this for a while, but I'm familiar with the concept. Spring has a really good implementation, and I do believe we'll benefit from it.
The API can be improved as usage grows, since I'm not sure how many people want or will use batching, but even so, the implementation behind it doesn't seem that hard and we can include it.

  1. If a batch fails, it will be treated as a single failure, but it should ideally contain in-depth information or some logical exception that makes it easier to debug and trace. I often see batching/transactions as all-or-nothing, unless there's a mode to ignore failures (in case we don't care) and keep going (see the sketch after this list).
  2. Leave RPC to the next phase; what you're bringing to the table initially seems just fine (I appreciate it a lot, since I don't have much time to be an active IC). If you want to collect wider feedback, you can open a discussion in the GH Discussions tab and present some follow-up ideas.
  3. Regarding the decorator, let's add a separate decorator to streamline this new feature. Having a special token for batched subscribers is better than checking whether batchOptions exists or not. Introducing another optional property is just going to make it more confusing IMO; I'd rather have them separate, as they serve different use cases, than mixed together.
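
To make point 1 concrete, a hedged sketch of an all-or-nothing batch error handler (the exact signature is an assumption; it mirrors the existing single-message error handler but receives the whole batch):

```typescript
import { Channel, ConsumeMessage } from 'amqplib';

// Assumed signature: like the single-message error handler, but with an
// array of messages for the failed batch.
export const nackBatchOnError = (
  channel: Channel,
  batch: ConsumeMessage[],
  error: unknown,
): void => {
  console.error(`Batch of ${batch.length} messages failed`, error);
  // All-or-nothing: reject every message in the batch, without requeueing.
  for (const msg of batch) {
    channel.nack(msg, false, false);
  }
};
```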

I tried to answer your questions @ckfngod, but the "real answer" may come from real use cases or anyone interested in this feature. In theory there is common ground for designing new APIs, and I don't think this proposal is far from a good implementation; it's a good starting point for sure.

@ckfngod (Contributor, Author) commented Sep 12, 2024:

@underfisk Really appreciate the in-depth answers, that's very helpful.

  1. Agreed, I think it makes a lot of sense conceptually to keep it as a single unit in general. It also makes it a little easier not to have to worry about individual message IDs.
  2. Sounds good, I'll leave RPC alone for now.
  3. Makes sense to me. I was leaning in this direction as well because of type complications with errorHandler in the external interface.

I'll go ahead with the proposed changes. Let me know if there's any other changes you'd like to see!

> I tried to answer your questions @ckfngod but the "real answer" may come from real use cases or anyone interested in this feature.

Totally agree with this sentiment. For what it's worth, I'm working on this because we have a use case for it and this solution will work fine for us 🙂

@ckfngod (Contributor, Author) commented Sep 18, 2024:

> Regarding the decorator, let's add a separate decorator to streamline this new feature. Having a special token for batched subscribers is better than checking whether batchOptions exists or not. Introducing another optional property is just going to make it more confusing IMO; I'd rather have them separate, as they serve different use cases, than mixed together.

@underfisk I spent some time thinking about this, and it might not work as cleanly as we thought. The types get pretty hairy and will impact external interfaces (we would need a discriminated union on RabbitHandlerConfig, for example), which may be a little risky. It also gets a little ambiguous when considering handler configs defined at the module level, because of how configs are merged (if I use @RabbitSubscribe but then later define batchOptions at the module level, is it batched or not? Do we impose some restriction at runtime based on the handler type?). The ability to enable batching at the module level would be a very nice-to-have for us.
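
For illustration, a rough sketch of the discriminated union this would force onto the config types (all names except RabbitHandlerConfig and batchOptions are hypothetical, and the real config has many more fields):

```typescript
interface SingleHandlerConfig {
  kind: 'single';
  queue?: string;
}

interface BatchHandlerConfig {
  kind: 'batch';
  queue?: string;
  batchOptions: { size?: number; timeout?: number };
}

type RabbitHandlerConfig = SingleHandlerConfig | BatchHandlerConfig;

// Every consumer of the config now has to narrow on `kind`, and
// module-level config merging has to decide which variant wins.
function isBatched(config: RabbitHandlerConfig): config is BatchHandlerConfig {
  return config.kind === 'batch';
}
```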

What I like about the current solution is that the types are simple (only one handler options interface, no complex discriminated unions), it works unambiguously for both decorator and module-level configs (if batchOptions is present, we batch), and it's built into the existing decorator, which makes it very easy for interested users to enable the behaviour.

Thoughts?

@underfisk (Contributor) replied:

> (quote of @ckfngod's previous comment, repeated in full)

Sounds fair to me. You're presenting the idea with a use case in mind; I think we can refine it later if people start adopting it.

@underfisk (Contributor) commented:

@ckfngod Before we get this in, we'll need to polish the PR description, since some open questions may already be resolved and we're presenting a new feature. If you're okay with improving the documentation (md files) later on with some examples, that would be awesome.

@ckfngod (Contributor, Author) commented Sep 20, 2024:

@underfisk Added tests and documentation, and updated the PR description with more detail and a summary of our discussion. Let me know if there are any other tasks or outstanding issues you'd like to see resolved!

@ckfngod ckfngod marked this pull request as ready for review September 20, 2024 02:05
@ckfngod ckfngod requested a review from underfisk September 20, 2024 23:17
@ckfngod ckfngod requested a review from underfisk September 23, 2024 18:45
```typescript
  error: any
) => {
  if (error.code == PRECONDITION_FAILED_CODE) {
    //406 == preconditions failed
```
A reviewer (Contributor) commented on this snippet:

You can now remove this comment since the code is extracted and easy to read

@underfisk (Contributor) commented:

@ckfngod Thank you for addressing the PR feedback 🙏
Once all tests pass I'll have it merged

@underfisk underfisk enabled auto-merge (squash) September 23, 2024 21:05
auto-merge was automatically disabled September 23, 2024 23:49

Head branch was pushed to by a user without write access

@ckfngod (Contributor, Author) commented Sep 23, 2024:

@underfisk Cool, appreciate it!

@ckfngod (Contributor, Author) commented Sep 24, 2024:

@underfisk Pushed a change that should fix my flaky tests. It was working locally, but it looks like there's a timing issue in the CI pipeline. See 503c827.

@underfisk underfisk enabled auto-merge (squash) September 24, 2024 18:39
@underfisk underfisk merged commit ce44d4d into golevelup:master Sep 24, 2024
3 checks passed
@david-pivonka commented:
It looks like the batching logic doesn't work with the default channel configuration that this module creates.

This is the RabbitMQ module we use:

```typescript
@Module({
  imports: [
    HttpModule,
    RabbitMQModule.forRoot({
      uri: ConfigurationService.getString('RABBITMQ_URI'),
      exchanges: [
        {
          name: 'amq.direct',
          type: 'direct',
        },
      ],
      connectionInitOptions: {
        wait: false,
      },
    }),
  ],
  providers: [PublisherService, QueueService],
  exports: [PublisherService, QueueService],
})
export class QueueModule {}
```

This is the batch consumer:

```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class TestConsumer {
  @RabbitSubscribe({
    exchange: 'amq.direct',
    routingKey: 'batch-route',
    queue: 'batch-queue',
    batchOptions: {
      size: 156,
    },
  })
  batchErrorSubscriber(messages) {
    console.log(`Received ${messages.length} messages`);
  }
}
```

Even though I specified the batch size to be 156, I still receive a maximum of 10 messages, even though I can see that the queue contains more than 10 messages. This number (10) is directly affected by the default prefetchCount in this file:
(screenshot: the default prefetchCount value in the module source)

Can any of you please help me resolve this issue? It seems the issue is related to the channel that is created, which limits the batch to 10.

@underfisk (Contributor) commented:

@david-pivonka If we were to remove the prefetchCount default value, would that help your use-case?

I think it was added as part of a feature implementation, but I'm totally fine with removing the default and letting the one or two users who requested/implemented it know, via a breaking change, that we no longer inject it by default.

@david-pivonka replied:

@underfisk Hey, thanks for the comment. I've just set the prefetchCount to undefined, and everything is working as expected. So yes, I would appreciate it if you could remove the default value, as it seems to break the batch logic.

Let me know if you want me to create a PR for it.

This is the working code:

```typescript
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { HttpModule } from '@nestjs/axios';
import { Module } from '@nestjs/common';
import ConfigurationService from '@rennie/configuration/service/configuration.service';
import { QueueService } from '@rennie/queue/service/queue.service';
import { PublisherService } from './service/publisher.service';

@Module({
  imports: [
    HttpModule,
    RabbitMQModule.forRoot({
      uri: ConfigurationService.getString('RABBITMQ_URI'),
      exchanges: [
        {
          name: 'amq.direct',
          type: 'direct',
        },
      ],
      prefetchCount: undefined,
      connectionInitOptions: {
        wait: false,
      },
    }),
  ],
  providers: [PublisherService, QueueService],
  exports: [PublisherService, QueueService],
})
export class QueueModule {}
```

@underfisk (Contributor) replied:

> (quote of @david-pivonka's comment and working code above, repeated in full)

Feel free to create the PR
