diff --git a/integration/rabbitmq/e2e/serialization.e2e-spec.ts b/integration/rabbitmq/e2e/serialization.e2e-spec.ts index 1af262320..e163373eb 100644 --- a/integration/rabbitmq/e2e/serialization.e2e-spec.ts +++ b/integration/rabbitmq/e2e/serialization.e2e-spec.ts @@ -2,27 +2,56 @@ import { AmqpConnection, RabbitMQModule, RabbitSubscribe, + MessageSerializer, + MessageDeserializer, } from '@golevelup/nestjs-rabbitmq'; import { INestApplication, Injectable, LoggerService } from '@nestjs/common'; import { Test } from '@nestjs/testing'; import { createMock } from '@golevelup/ts-jest'; import { gzipSync, gunzipSync } from 'node:zlib'; -const testHandler = jest.fn(); +const moduleSerializeHandler = jest.fn(); +const handlerSerializeHandler = jest.fn(); + +const moduleSerializer: MessageSerializer = jest.fn((value) => + gzipSync(JSON.stringify(value)), +); +const moduleDeserializer: MessageDeserializer = jest.fn((message) => + JSON.parse(gunzipSync(message).toString()), +); + +const handlerDeserializer: MessageDeserializer = jest.fn((message) => + JSON.parse(gunzipSync(message).toString()), +); const exchange = 'testSerializationExchange'; const routingKey1 = 'testSerializationRoute1'; const routingKey2 = 'testSerializationRoute2'; +const routingKey3 = 'testSerializationRoute3'; +const routingKey4 = 'testSerializationRoute4'; @Injectable() -class SubscribeService { +class SubscribeModuleSerializationService { @RabbitSubscribe({ exchange, routingKey: [routingKey1, routingKey2], - queue: 'serializeQueue', + queue: 'moduleSerializeQueue', + }) + handleSubscribe(message: object) { + moduleSerializeHandler(message); + } +} + +@Injectable() +class SubscribeHandlerSerializationService { + @RabbitSubscribe({ + exchange, + routingKey: [routingKey3, routingKey4], + queue: 'handlerSerializeQueue', + deserializer: handlerDeserializer, }) handleSubscribe(message: object) { - testHandler(message); + handlerSerializeHandler(message); } } @@ -39,9 +68,14 @@ describe('Rabbit Subscribe', () => { process.env.NODE_ENV === 'ci' ? process.env.RABBITMQ_PORT : '5672'; const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:${rabbitPort}`; - beforeAll(async () => { + beforeEach(async () => { + jest.clearAllMocks(); + const moduleFixture = await Test.createTestingModule({ - providers: [SubscribeService], + providers: [ + SubscribeModuleSerializationService, + SubscribeHandlerSerializationService, + ], imports: [ RabbitMQModule.forRoot(RabbitMQModule, { exchanges: [ @@ -53,12 +87,8 @@ describe('Rabbit Subscribe', () => { uri, connectionInitOptions: { wait: true, reject: true, timeout: 3000 }, logger: customLogger, - serializer: (value) => { - return gzipSync(JSON.stringify(value)); - }, - deserializer: (message) => { - return JSON.parse(gunzipSync(message).toString()); - }, + serializer: moduleSerializer, + deserializer: moduleDeserializer, }), ], }).compile(); @@ -72,19 +102,33 @@ describe('Rabbit Subscribe', () => { await app?.close(); }); - beforeEach(() => { - jest.resetAllMocks(); + it('should receive subscribe messages and serialize/deserialize them via module options', async () => { + [routingKey1, routingKey2].forEach((x, i) => + amqpConnection.publish(exchange, x, `testMessage-${i}`), + ); + + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(moduleSerializeHandler).toHaveBeenCalledTimes(2); + expect(moduleSerializeHandler).toHaveBeenCalledWith(`testMessage-0`); + expect(moduleSerializeHandler).toHaveBeenCalledWith(`testMessage-1`); + expect(moduleDeserializer).toHaveBeenCalledTimes(2); + expect(moduleSerializer).toHaveBeenCalledTimes(2); + expect(handlerDeserializer).not.toBeCalled(); }); - it('should receive subscribe messages and handle them', async () => { - [routingKey1, routingKey2].forEach((x, i) => + it('should receive subscribe messages and deserialize via handler options', async () => { + [routingKey3, routingKey4].forEach((x, i) => amqpConnection.publish(exchange, x, `testMessage-${i}`), ); await new Promise((resolve) => setTimeout(resolve, 50)); - expect(testHandler).toHaveBeenCalledTimes(2); - expect(testHandler).toHaveBeenCalledWith(`testMessage-0`); - expect(testHandler).toHaveBeenCalledWith(`testMessage-1`); + expect(handlerSerializeHandler).toHaveBeenCalledTimes(2); + expect(handlerSerializeHandler).toHaveBeenCalledWith(`testMessage-0`); + expect(handlerSerializeHandler).toHaveBeenCalledWith(`testMessage-1`); + expect(handlerDeserializer).toHaveBeenCalledTimes(2); + expect(moduleDeserializer).not.toBeCalled(); + expect(moduleSerializer).toHaveBeenCalledTimes(2); }); }); diff --git a/packages/rabbitmq/src/amqp/connection.ts b/packages/rabbitmq/src/amqp/connection.ts index bc8258ce5..374fbfb93 100644 --- a/packages/rabbitmq/src/amqp/connection.ts +++ b/packages/rabbitmq/src/amqp/connection.ts @@ -29,6 +29,8 @@ import { RequestOptions, RabbitMQChannelConfig, ConsumeOptions, + MessageDeserializer, + MessageSerializer, } from '../rabbitmq.interfaces'; import { getHandlerForLegacyBehavior, @@ -471,11 +473,10 @@ export class AmqpConnection { throw new Error('Received null message'); } - const response = await this.handleMessage( - handler, - msg, - msgOptions.allowNonJsonMessages - ); + const response = await this.handleMessage(handler, msg, { + allowNonJsonMessages: msgOptions.allowNonJsonMessages, + deserializer: msgOptions.deserializer, + }); if (response instanceof Nack) { channel.nack(msg, false, response.requeue); @@ -578,11 +579,10 @@ export class AmqpConnection { return; } - const response = await this.handleMessage( - handler, - msg, - rpcOptions.allowNonJsonMessages - ); + const response = await this.handleMessage(handler, msg, { + allowNonJsonMessages: rpcOptions.allowNonJsonMessages, + deserializer: rpcOptions.deserializer, + }); if (response instanceof Nack) { channel.nack(msg, false, response.requeue); @@ -657,21 +657,25 @@ export class AmqpConnection { headers?: any ) => Promise, msg: ConsumeMessage, - allowNonJsonMessages?: boolean + options: { + allowNonJsonMessages?: boolean; + deserializer?: MessageDeserializer; + } ) { let message: T | undefined = undefined; let headers: any = undefined; + const deserializer = options.deserializer || this.config.deserializer; if (msg.content) { - if (allowNonJsonMessages) { + if (options.allowNonJsonMessages) { try { - message = this.config.deserializer(msg.content, msg) as T; + message = deserializer(msg.content, msg) as T; } catch { // Pass raw message since flag `allowNonJsonMessages` is set // Casting to `any` first as T doesn't have a type message = msg.content.toString() as any as T; } } else { - message = this.config.deserializer(msg.content, msg) as T; + message = deserializer(msg.content, msg) as T; } } diff --git a/packages/rabbitmq/src/rabbitmq.interfaces.ts b/packages/rabbitmq/src/rabbitmq.interfaces.ts index 084d688cd..2352f7b7a 100644 --- a/packages/rabbitmq/src/rabbitmq.interfaces.ts +++ b/packages/rabbitmq/src/rabbitmq.interfaces.ts @@ -75,6 +75,9 @@ export interface QueueOptions { consumerOptions?: ConsumeOptions; } +export type MessageDeserializer = (message: Buffer, msg: ConsumeMessage) => any; +export type MessageSerializer = (value: any) => Buffer; + export interface MessageHandlerOptions { /** * You can use a handler config specificied in module level. @@ -107,6 +110,12 @@ export interface MessageHandlerOptions { * @default false - By default, responses are not persistent unless this is set to true. */ usePersistentReplyTo?: boolean; + + /** + * This function is used to deserialize the received message. + * If set, will override the module's default deserializer. + */ + deserializer?: MessageDeserializer; } export interface ConnectionInitOptions { @@ -160,12 +169,12 @@ export interface RabbitMQConfig { /** * This function is used to deserialize the received message. */ - deserializer?: (message: Buffer, msg: ConsumeMessage) => any; + deserializer?: MessageDeserializer; /** * This function is used to serialize the message to be sent. */ - serializer?: (value: any) => Buffer; + serializer?: MessageSerializer; } export type RabbitHandlerType = 'rpc' | 'subscribe';