Skip to content

Commit

Permalink
feat(connection.ts): adds deserializer options to message handlers (c…
Browse files Browse the repository at this point in the history
…loses golevelup#704) (golevelup#754)

Allows passing a 'deserializer' function to message handler options that override the module's
deserialization function
  • Loading branch information
dafow authored and Christian Jeschke committed Aug 11, 2024
1 parent 61de8b4 commit 04660ba
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 35 deletions.
82 changes: 63 additions & 19 deletions integration/rabbitmq/e2e/serialization.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,56 @@ import {
AmqpConnection,
RabbitMQModule,
RabbitSubscribe,
MessageSerializer,
MessageDeserializer,
} from '@golevelup/nestjs-rabbitmq';
import { INestApplication, Injectable, LoggerService } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { createMock } from '@golevelup/ts-jest';
import { gzipSync, gunzipSync } from 'node:zlib';

const testHandler = jest.fn();
const moduleSerializeHandler = jest.fn();
const handlerSerializeHandler = jest.fn();

const moduleSerializer: MessageSerializer = jest.fn((value) =>
gzipSync(JSON.stringify(value)),
);
const moduleDeserializer: MessageDeserializer = jest.fn((message) =>
JSON.parse(gunzipSync(message).toString()),
);

const handlerDeserializer: MessageDeserializer = jest.fn((message) =>
JSON.parse(gunzipSync(message).toString()),
);

const exchange = 'testSerializationExchange';
const routingKey1 = 'testSerializationRoute1';
const routingKey2 = 'testSerializationRoute2';
const routingKey3 = 'testSerializationRoute3';
const routingKey4 = 'testSerializationRoute4';

@Injectable()
class SubscribeService {
class SubscribeModuleSerializationService {
@RabbitSubscribe({
exchange,
routingKey: [routingKey1, routingKey2],
queue: 'serializeQueue',
queue: 'moduleSerializeQueue',
})
handleSubscribe(message: object) {
moduleSerializeHandler(message);
}
}

@Injectable()
class SubscribeHandlerSerializationService {
@RabbitSubscribe({
exchange,
routingKey: [routingKey3, routingKey4],
queue: 'handlerSerializeQueue',
deserializer: handlerDeserializer,
})
handleSubscribe(message: object) {
testHandler(message);
handlerSerializeHandler(message);
}
}

Expand All @@ -39,9 +68,14 @@ describe('Rabbit Subscribe', () => {
process.env.NODE_ENV === 'ci' ? process.env.RABBITMQ_PORT : '5672';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:${rabbitPort}`;

beforeAll(async () => {
beforeEach(async () => {
jest.clearAllMocks();

const moduleFixture = await Test.createTestingModule({
providers: [SubscribeService],
providers: [
SubscribeModuleSerializationService,
SubscribeHandlerSerializationService,
],
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
Expand All @@ -53,12 +87,8 @@ describe('Rabbit Subscribe', () => {
uri,
connectionInitOptions: { wait: true, reject: true, timeout: 3000 },
logger: customLogger,
serializer: (value) => {
return gzipSync(JSON.stringify(value));
},
deserializer: (message) => {
return JSON.parse(gunzipSync(message).toString());
},
serializer: moduleSerializer,
deserializer: moduleDeserializer,
}),
],
}).compile();
Expand All @@ -72,19 +102,33 @@ describe('Rabbit Subscribe', () => {
await app?.close();
});

beforeEach(() => {
jest.resetAllMocks();
it('should receive subscribe messages and serialize/deserialize them via module options', async () => {
[routingKey1, routingKey2].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

await new Promise((resolve) => setTimeout(resolve, 50));

expect(moduleSerializeHandler).toHaveBeenCalledTimes(2);
expect(moduleSerializeHandler).toHaveBeenCalledWith(`testMessage-0`);
expect(moduleSerializeHandler).toHaveBeenCalledWith(`testMessage-1`);
expect(moduleDeserializer).toHaveBeenCalledTimes(2);
expect(moduleSerializer).toHaveBeenCalledTimes(2);
expect(handlerDeserializer).not.toBeCalled();
});

it('should receive subscribe messages and handle them', async () => {
[routingKey1, routingKey2].forEach((x, i) =>
it('should receive subscribe messages and deserialize via handler options', async () => {
[routingKey3, routingKey4].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

await new Promise((resolve) => setTimeout(resolve, 50));

expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledWith(`testMessage-0`);
expect(testHandler).toHaveBeenCalledWith(`testMessage-1`);
expect(handlerSerializeHandler).toHaveBeenCalledTimes(2);
expect(handlerSerializeHandler).toHaveBeenCalledWith(`testMessage-0`);
expect(handlerSerializeHandler).toHaveBeenCalledWith(`testMessage-1`);
expect(handlerDeserializer).toHaveBeenCalledTimes(2);
expect(moduleDeserializer).not.toBeCalled();
expect(moduleSerializer).toHaveBeenCalledTimes(2);
});
});
32 changes: 18 additions & 14 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import {
RequestOptions,
RabbitMQChannelConfig,
ConsumeOptions,
MessageDeserializer,
MessageSerializer,
} from '../rabbitmq.interfaces';
import {
getHandlerForLegacyBehavior,
Expand Down Expand Up @@ -471,11 +473,10 @@ export class AmqpConnection {
throw new Error('Received null message');
}

const response = await this.handleMessage(
handler,
msg,
msgOptions.allowNonJsonMessages
);
const response = await this.handleMessage(handler, msg, {
allowNonJsonMessages: msgOptions.allowNonJsonMessages,
deserializer: msgOptions.deserializer,
});

if (response instanceof Nack) {
channel.nack(msg, false, response.requeue);
Expand Down Expand Up @@ -578,11 +579,10 @@ export class AmqpConnection {
return;
}

const response = await this.handleMessage(
handler,
msg,
rpcOptions.allowNonJsonMessages
);
const response = await this.handleMessage(handler, msg, {
allowNonJsonMessages: rpcOptions.allowNonJsonMessages,
deserializer: rpcOptions.deserializer,
});

if (response instanceof Nack) {
channel.nack(msg, false, response.requeue);
Expand Down Expand Up @@ -657,21 +657,25 @@ export class AmqpConnection {
headers?: any
) => Promise<U>,
msg: ConsumeMessage,
allowNonJsonMessages?: boolean
options: {
allowNonJsonMessages?: boolean;
deserializer?: MessageDeserializer;
}
) {
let message: T | undefined = undefined;
let headers: any = undefined;
const deserializer = options.deserializer || this.config.deserializer;
if (msg.content) {
if (allowNonJsonMessages) {
if (options.allowNonJsonMessages) {
try {
message = this.config.deserializer(msg.content, msg) as T;
message = deserializer(msg.content, msg) as T;
} catch {
// Pass raw message since flag `allowNonJsonMessages` is set
// Casting to `any` first as T doesn't have a type
message = msg.content.toString() as any as T;
}
} else {
message = this.config.deserializer(msg.content, msg) as T;
message = deserializer(msg.content, msg) as T;
}
}

Expand Down
13 changes: 11 additions & 2 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ export interface QueueOptions {
consumerOptions?: ConsumeOptions;
}

export type MessageDeserializer = (message: Buffer, msg: ConsumeMessage) => any;
export type MessageSerializer = (value: any) => Buffer;

export interface MessageHandlerOptions {
/**
* You can use a handler config specificied in module level.
Expand Down Expand Up @@ -107,6 +110,12 @@ export interface MessageHandlerOptions {
* @default false - By default, responses are not persistent unless this is set to true.
*/
usePersistentReplyTo?: boolean;

/**
* This function is used to deserialize the received message.
* If set, will override the module's default deserializer.
*/
deserializer?: MessageDeserializer;
}

export interface ConnectionInitOptions {
Expand Down Expand Up @@ -160,12 +169,12 @@ export interface RabbitMQConfig {
/**
* This function is used to deserialize the received message.
*/
deserializer?: (message: Buffer, msg: ConsumeMessage) => any;
deserializer?: MessageDeserializer;

/**
* This function is used to serialize the message to be sent.
*/
serializer?: (value: any) => Buffer;
serializer?: MessageSerializer;
}

export type RabbitHandlerType = 'rpc' | 'subscribe';
Expand Down

0 comments on commit 04660ba

Please sign in to comment.