From 6ebfe8fad11935b64a42ad55eb3bce689b9dd2e5 Mon Sep 17 00:00:00 2001 From: Stephen Cresswell <229672+cressie176@users.noreply.github.com> Date: Mon, 27 May 2024 18:44:54 +0100 Subject: [PATCH] Workaround https://github.com/rabbitmq/rabbitmq-server/issues/11331 --- .github/workflows/node-js-ci.yml | 4 ++-- lib/amqp/SubscriberError.js | 3 ++- lib/amqp/Subscription.js | 21 ++++++++++++++------- lib/amqp/XDeath.js | 5 +++++ package.json | 2 +- test/subscriptions.tests.js | 6 +++--- test/subscriptionsAsPromised.tests.js | 4 ++-- 7 files changed, 29 insertions(+), 16 deletions(-) create mode 100644 lib/amqp/XDeath.js diff --git a/.github/workflows/node-js-ci.yml b/.github/workflows/node-js-ci.yml index f5c1a53..f3fdb76 100644 --- a/.github/workflows/node-js-ci.yml +++ b/.github/workflows/node-js-ci.yml @@ -13,7 +13,7 @@ jobs: - 15672:15672 strategy: matrix: - node-version: [14.x] + node-version: [14.x, 16.x, 18.x, 20.x] steps: - uses: actions/checkout@v3 - uses: actions/setup-node@v3 @@ -30,7 +30,7 @@ jobs: runs-on: ubuntu-latest services: rabbitmq: - image: rabbitmq:3-management-alpine + image: rabbitmq:3.13.2-management-alpine ports: - 5672:5672 - 15672:15672 diff --git a/lib/amqp/SubscriberError.js b/lib/amqp/SubscriberError.js index 6fb7ba9..9538382 100644 --- a/lib/amqp/SubscriberError.js +++ b/lib/amqp/SubscriberError.js @@ -3,6 +3,7 @@ const format = require('util').format; const _ = require('lodash'); const async = require('async'); const setTimeoutUnref = require('../utils/setTimeoutUnref'); +const { EMPTY_X_DEATH } = require('./XDeath'); module.exports = function SubscriptionRecovery(broker, vhost) { this.handle = function (session, message, err, recoveryOptions, next) { @@ -86,7 +87,7 @@ module.exports = function SubscriptionRecovery(broker, vhost) { if (strategyConfig.immediateNack) { const xDeathRecords = message.properties.headers['x-death'] || []; - const xDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0, queue: originalQueue }; + const xDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || EMPTY_X_DEATH; _.set(publishOptions, ['headers', 'rascal', 'recovery', originalQueue], { immediateNack: true, xDeath }); } diff --git a/lib/amqp/Subscription.js b/lib/amqp/Subscription.js index fabc988..becf144 100644 --- a/lib/amqp/Subscription.js +++ b/lib/amqp/Subscription.js @@ -8,6 +8,7 @@ const SubscriberSession = require('./SubscriberSession'); const SubscriberError = require('./SubscriberError'); const backoff = require('../backoff'); const setTimeoutUnref = require('../utils/setTimeoutUnref'); +const { EMPTY_X_DEATH } = require('./XDeath'); module.exports = { create(broker, vhost, counter, config, next) { @@ -214,16 +215,22 @@ function Subscription(broker, vhost, subscriptionConfig, counter) { function immediateNack(message) { const originalQueue = message.properties.headers.rascal.originalQueue; const xDeathRecords = message.properties.headers['x-death'] || []; - const currentXDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || { count: 0, queue: originalQueue }; - const previousXDeath = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath'], { count: 0, queue: originalQueue }); + const currentXDeath = xDeathRecords.find(({ queue, reason }) => queue === originalQueue && reason === 'rejected') || EMPTY_X_DEATH; + const previousXDeath = _.get(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath'], EMPTY_X_DEATH); const hasImmediateNackHeader = _.has(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']); if (!hasImmediateNackHeader) return false; debug('Message %s has been marked for immediate nack. Previous xDeath is %o. Current xDeath is %o.', message.properties.messageId, previousXDeath, currentXDeath); - if (currentXDeath.count === previousXDeath.count) return true; - debug('Message %s has been replayed after being dead lettered. Removing immediate nack.', message.properties.messageId); - _.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']); - _.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath']); - return false; + // See https://github.com/rabbitmq/rabbitmq-server/issues/11331 + // RabbitMQ v3.13 stopped updating the xDeath record's count property. + // RabbitMQ v3.12 does not update the xDeath record's time property. + // Therefore having test them both + if (currentXDeath.count > previousXDeath.count || currentXDeath.time.value > previousXDeath.time.value) { + debug('Message %s has been replayed after being dead lettered. Removing immediate nack.', message.properties.messageId); + _.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'immediateNack']); + _.unset(message, ['properties', 'headers', 'rascal', 'recovery', originalQueue, 'xDeath']); + return false; + } + return true; } function getAckOrNack(session, message) { diff --git a/lib/amqp/XDeath.js b/lib/amqp/XDeath.js new file mode 100644 index 0000000..f28bdf5 --- /dev/null +++ b/lib/amqp/XDeath.js @@ -0,0 +1,5 @@ +const EMPTY_X_DEATH = { count: 0, time: { value: 0 } }; + +module.exports = { + EMPTY_X_DEATH, +}; diff --git a/package.json b/package.json index 8e39fc8..92f6fc9 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,7 @@ "lint:fix": "eslint --fix .", "lint-staged": "lint-staged", "coverage": "nyc --report html --reporter lcov --reporter text-summary zUnit", - "docker": "docker run -d --name rascal-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management-alpine", + "docker": "docker run -d --name rascal-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12.9-management-alpine", "prepare": "husky install" }, "lint-staged": { diff --git a/test/subscriptions.tests.js b/test/subscriptions.tests.js index ff46f08..0a3b782 100644 --- a/test/subscriptions.tests.js +++ b/test/subscriptions.tests.js @@ -1651,7 +1651,7 @@ describe('Subscriptions', () => { count++; if (count === 1) { assert.ok(message); - ackOrNack(new Error('immediate nack'), { + ackOrNack(new Error(`Test Error ${count}`), { strategy: 'republish', immediateNack: true, }); @@ -1739,7 +1739,7 @@ describe('Subscriptions', () => { count++; if (count <= 2) { assert.ok(message); - ackOrNack(new Error(`immediate nack: ${count}`), { + ackOrNack(new Error(`Test Error ${count}`), { strategy: 'republish', immediateNack: true, }); @@ -1762,7 +1762,7 @@ describe('Subscriptions', () => { }); }, ); - }, { exclusive: true, timeout: 10000 }); + }); it('should forward messages to publication when requested', (test, done) => { createBroker( diff --git a/test/subscriptionsAsPromised.tests.js b/test/subscriptionsAsPromised.tests.js index 5b5b024..85e01ec 100644 --- a/test/subscriptionsAsPromised.tests.js +++ b/test/subscriptionsAsPromised.tests.js @@ -1096,7 +1096,7 @@ describe('Subscriptions As Promised', () => { subscription.on('message', (message, content, ackOrNack) => { assert.strictEqual(++count, 1); assert.ok(message); - ackOrNack(new Error('immediate nack'), { strategy: 'republish', immediateNack: true }); + ackOrNack(new Error(`Test Error ${count}`), { strategy: 'republish', immediateNack: true }); }); }); @@ -1167,7 +1167,7 @@ describe('Subscriptions As Promised', () => { count++; if (count === 1) { assert.ok(message); - ackOrNack(new Error('immediate nack'), { + ackOrNack(new Error(`Test Error ${count}`), { strategy: 'republish', immediateNack: true, });