Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connection.ts): adds deserializer options to message handlers (c… #754

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 63 additions & 19 deletions integration/rabbitmq/e2e/serialization.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,56 @@ import {
AmqpConnection,
RabbitMQModule,
RabbitSubscribe,
MessageSerializer,
MessageDeserializer,
} from '@golevelup/nestjs-rabbitmq';
import { INestApplication, Injectable, LoggerService } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { createMock } from '@golevelup/ts-jest';
import { gzipSync, gunzipSync } from 'node:zlib';

const testHandler = jest.fn();
const moduleSerializeHandler = jest.fn();
const handlerSerializeHandler = jest.fn();

const moduleSerializer: MessageSerializer = jest.fn((value) =>
gzipSync(JSON.stringify(value)),
);
const moduleDeserializer: MessageDeserializer = jest.fn((message) =>
JSON.parse(gunzipSync(message).toString()),
);

const handlerDeserializer: MessageDeserializer = jest.fn((message) =>
JSON.parse(gunzipSync(message).toString()),
);

const exchange = 'testSerializationExchange';
const routingKey1 = 'testSerializationRoute1';
const routingKey2 = 'testSerializationRoute2';
const routingKey3 = 'testSerializationRoute3';
const routingKey4 = 'testSerializationRoute4';

@Injectable()
class SubscribeService {
class SubscribeModuleSerializationService {
@RabbitSubscribe({
exchange,
routingKey: [routingKey1, routingKey2],
queue: 'serializeQueue',
queue: 'moduleSerializeQueue',
})
handleSubscribe(message: object) {
moduleSerializeHandler(message);
}
}

@Injectable()
class SubscribeHandlerSerializationService {
@RabbitSubscribe({
exchange,
routingKey: [routingKey3, routingKey4],
queue: 'handlerSerializeQueue',
deserializer: handlerDeserializer,
})
handleSubscribe(message: object) {
testHandler(message);
handlerSerializeHandler(message);
}
}

Expand All @@ -39,9 +68,14 @@ describe('Rabbit Subscribe', () => {
process.env.NODE_ENV === 'ci' ? process.env.RABBITMQ_PORT : '5672';
const uri = `amqp://rabbitmq:rabbitmq@${rabbitHost}:${rabbitPort}`;

beforeAll(async () => {
beforeEach(async () => {
jest.clearAllMocks();

const moduleFixture = await Test.createTestingModule({
providers: [SubscribeService],
providers: [
SubscribeModuleSerializationService,
SubscribeHandlerSerializationService,
],
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
Expand All @@ -53,12 +87,8 @@ describe('Rabbit Subscribe', () => {
uri,
connectionInitOptions: { wait: true, reject: true, timeout: 3000 },
logger: customLogger,
serializer: (value) => {
return gzipSync(JSON.stringify(value));
},
deserializer: (message) => {
return JSON.parse(gunzipSync(message).toString());
},
serializer: moduleSerializer,
deserializer: moduleDeserializer,
}),
],
}).compile();
Expand All @@ -72,19 +102,33 @@ describe('Rabbit Subscribe', () => {
await app?.close();
});

beforeEach(() => {
jest.resetAllMocks();
it('should receive subscribe messages and serialize/deserialize them via module options', async () => {
[routingKey1, routingKey2].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

await new Promise((resolve) => setTimeout(resolve, 50));

expect(moduleSerializeHandler).toHaveBeenCalledTimes(2);
expect(moduleSerializeHandler).toHaveBeenCalledWith(`testMessage-0`);
expect(moduleSerializeHandler).toHaveBeenCalledWith(`testMessage-1`);
expect(moduleDeserializer).toHaveBeenCalledTimes(2);
expect(moduleSerializer).toHaveBeenCalledTimes(2);
expect(handlerDeserializer).not.toBeCalled();
});

it('should receive subscribe messages and handle them', async () => {
[routingKey1, routingKey2].forEach((x, i) =>
it('should receive subscribe messages and deserialize via handler options', async () => {
[routingKey3, routingKey4].forEach((x, i) =>
amqpConnection.publish(exchange, x, `testMessage-${i}`),
);

await new Promise((resolve) => setTimeout(resolve, 50));

expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledWith(`testMessage-0`);
expect(testHandler).toHaveBeenCalledWith(`testMessage-1`);
expect(handlerSerializeHandler).toHaveBeenCalledTimes(2);
expect(handlerSerializeHandler).toHaveBeenCalledWith(`testMessage-0`);
expect(handlerSerializeHandler).toHaveBeenCalledWith(`testMessage-1`);
expect(handlerDeserializer).toHaveBeenCalledTimes(2);
expect(moduleDeserializer).not.toBeCalled();
expect(moduleSerializer).toHaveBeenCalledTimes(2);
});
});
32 changes: 18 additions & 14 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import {
RequestOptions,
RabbitMQChannelConfig,
ConsumeOptions,
MessageDeserializer,
MessageSerializer,
} from '../rabbitmq.interfaces';
import {
getHandlerForLegacyBehavior,
Expand Down Expand Up @@ -471,11 +473,10 @@ export class AmqpConnection {
throw new Error('Received null message');
}

const response = await this.handleMessage(
handler,
msg,
msgOptions.allowNonJsonMessages
);
const response = await this.handleMessage(handler, msg, {
allowNonJsonMessages: msgOptions.allowNonJsonMessages,
deserializer: msgOptions.deserializer,
});

if (response instanceof Nack) {
channel.nack(msg, false, response.requeue);
Expand Down Expand Up @@ -578,11 +579,10 @@ export class AmqpConnection {
return;
}

const response = await this.handleMessage(
handler,
msg,
rpcOptions.allowNonJsonMessages
);
const response = await this.handleMessage(handler, msg, {
allowNonJsonMessages: rpcOptions.allowNonJsonMessages,
deserializer: rpcOptions.deserializer,
});

if (response instanceof Nack) {
channel.nack(msg, false, response.requeue);
Expand Down Expand Up @@ -657,21 +657,25 @@ export class AmqpConnection {
headers?: any
) => Promise<U>,
msg: ConsumeMessage,
allowNonJsonMessages?: boolean
options: {
allowNonJsonMessages?: boolean;
deserializer?: MessageDeserializer;
}
) {
let message: T | undefined = undefined;
let headers: any = undefined;
const deserializer = options.deserializer || this.config.deserializer;
if (msg.content) {
if (allowNonJsonMessages) {
if (options.allowNonJsonMessages) {
try {
message = this.config.deserializer(msg.content, msg) as T;
message = deserializer(msg.content, msg) as T;
} catch {
// Pass raw message since flag `allowNonJsonMessages` is set
// Casting to `any` first as T doesn't have a type
message = msg.content.toString() as any as T;
}
} else {
message = this.config.deserializer(msg.content, msg) as T;
message = deserializer(msg.content, msg) as T;
}
}

Expand Down
13 changes: 11 additions & 2 deletions packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ export interface QueueOptions {
consumerOptions?: ConsumeOptions;
}

export type MessageDeserializer = (message: Buffer, msg: ConsumeMessage) => any;
export type MessageSerializer = (value: any) => Buffer;

export interface MessageHandlerOptions {
/**
* You can use a handler config specificied in module level.
Expand Down Expand Up @@ -107,6 +110,12 @@ export interface MessageHandlerOptions {
* @default false - By default, responses are not persistent unless this is set to true.
*/
usePersistentReplyTo?: boolean;

/**
* This function is used to deserialize the received message.
* If set, will override the module's default deserializer.
*/
deserializer?: MessageDeserializer;
}

export interface ConnectionInitOptions {
Expand Down Expand Up @@ -160,12 +169,12 @@ export interface RabbitMQConfig {
/**
* This function is used to deserialize the received message.
*/
deserializer?: (message: Buffer, msg: ConsumeMessage) => any;
deserializer?: MessageDeserializer;

/**
* This function is used to serialize the message to be sent.
*/
serializer?: (value: any) => Buffer;
serializer?: MessageSerializer;
}

export type RabbitHandlerType = 'rpc' | 'subscribe';
Expand Down
Loading