Skip to content

Commit

Permalink
Merge pull request #12974 from toxol/rmq-nack-when-no-message-handler
Browse files Browse the repository at this point in the history
fix(microservices): send rmq nack without matching message handler
  • Loading branch information
kamilmysliwiec committed Feb 7, 2024
2 parents f477d5b + e43df5a commit a58f6a9
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
7 changes: 6 additions & 1 deletion packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ export const RQM_NO_EVENT_HANDLER = (
text: TemplateStringsArray,
pattern: string,
) =>
`An unsupported event was received. It has been acknowledged, so it will not be re-delivered. Pattern: ${pattern}`;
`An unsupported event was received. It has been negative acknowledged, so it will not be re-delivered. Pattern: ${pattern}`;
export const RQM_NO_MESSAGE_HANDLER = (
text: TemplateStringsArray,
pattern: string,
) =>
`An unsupported message was received. It has been negative acknowledged, so it will not be re-delivered. Pattern: ${pattern}`;
export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';

export const NO_EVENT_HANDLER = (text: TemplateStringsArray, pattern: string) =>
Expand Down
5 changes: 5 additions & 0 deletions packages/microservices/server/server-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_NO_EVENT_HANDLER,
RQM_NO_MESSAGE_HANDLER,
} from '../constants';
import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
Expand Down Expand Up @@ -184,6 +185,10 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
const handler = this.getHandlerByPattern(pattern);

if (!handler) {
if (!this.noAck) {
this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`);
this.channel.nack(rmqContext.getMessage(), false, false);
}
const status = 'error';
const noHandlerPacket = {
id: (packet as IncomingRequest).id,
Expand Down
26 changes: 26 additions & 0 deletions packages/microservices/test/server/server-rmq.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ describe('ServerRMQ', () => {
sendMessageStub = sinon.stub(server, 'sendMessage').callsFake(() => ({}));
(server as any).channel = channel;
});
afterEach(() => {
channel.nack.resetHistory();
});
it('should call "handleEvent" if identifier is not present', async () => {
const handleEventSpy = sinon.spy(server, 'handleEvent');
await server.handleMessage(createMessage({ pattern: '', data: '' }), '');
Expand Down Expand Up @@ -149,6 +152,29 @@ describe('ServerRMQ', () => {
assert.fail('Was not supposed to throw an error');
});
});
it('should negative acknowledge if message does not exists in handlers object and noAck option is false', async () => {
(server as any).noAck = false;
await server.handleMessage(msg, '');
expect(channel.nack.calledWith(msg, false, false)).to.be.true;
expect(
sendMessageStub.calledWith({
id: '3',
status: 'error',
err: NO_MESSAGE_HANDLER,
}),
).to.be.true;
});
it('should not negative acknowledge if key does not exists in handlers object and noAck option is true', async () => {
await server.handleMessage(msg, '');
expect(channel.nack.notCalled).to.be.true;
expect(
sendMessageStub.calledWith({
id: '3',
status: 'error',
err: NO_MESSAGE_HANDLER,
}),
).to.be.true;
});
});
describe('setupChannel', () => {
const queue = 'test';
Expand Down

0 comments on commit a58f6a9

Please sign in to comment.