From 1a224438861d28e18b8d69404c273bd9e71e52a2 Mon Sep 17 00:00:00 2001 From: Anton Date: Sun, 15 Dec 2024 14:20:24 +0300 Subject: [PATCH 1/4] fix: updated moleculer-timeout-middleware with streams --- src/middlewares/timeout.js | 6 ++++++ src/transit.js | 16 ++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/middlewares/timeout.js b/src/middlewares/timeout.js index fefa439fe..9f2873313 100644 --- a/src/middlewares/timeout.js +++ b/src/middlewares/timeout.js @@ -6,6 +6,7 @@ "use strict"; +const { Stream } = require("stream"); const { RequestTimeoutError } = require("../errors"); const { METRIC } = require("../metrics"); @@ -38,6 +39,11 @@ module.exports = function (broker) { nodeID, timeout: ctx.options.timeout }); + + if (ctx.params instanceof Stream) { + ctx.params.emit('moleculer-timeout-middleware', ctx.options.timeout) + } + err = new RequestTimeoutError({ action: actionName, nodeID }); broker.metrics.increment(METRIC.MOLECULER_REQUEST_TIMEOUT_TOTAL, { diff --git a/src/transit.js b/src/transit.js index 3c980b3bf..f72f714b7 100644 --- a/src/transit.js +++ b/src/transit.js @@ -550,6 +550,13 @@ class Transit { pass.$pool = new Map(); this.pendingReqStreams.set(payload.id, { sender: payload.sender, stream: pass }); + + pass.on('moleculer-timeout-middleware', (timeout) => { + setTimeout(() => { + this.pendingReqStreams.delete(payload.id); + this._destroyStreamIfPossible(pass, `Pending request stream ${payload.id} have been closed by timeout ${timeout} ms`) + }, 1000); + }) } if (payload.seq > pass.$prevSeq + 1) { @@ -864,6 +871,14 @@ class Transit { // Add to pendings this.pendingRequests.set(ctx.id, request); + if (request.stream) { + const pass = request.ctx.params + + pass.on('moleculer-timeout-middleware', (timeout) => { + this._destroyStreamIfPossible(pass, `Request stream ${ctx.id} have been closed by timeout ${timeout} ms`) + }) + } + // Publish request return this.publish(packet) .then(() => { @@ -1103,6 +1118,7 @@ class Transit { */ _destroyStreamIfPossible(stream, errorMessage) { if (!stream.destroyed && stream.destroy) { + stream.on('error', (err) => this.logger.error(err.message)) stream.destroy(new Error(errorMessage)); } } From dac65180c585ab636a95e20994904c35a83e87aa Mon Sep 17 00:00:00 2001 From: Anton Date: Tue, 17 Dec 2024 20:57:34 +0300 Subject: [PATCH 2/4] fix: separate local requires --- src/middlewares/timeout.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/middlewares/timeout.js b/src/middlewares/timeout.js index 9f2873313..59de2899b 100644 --- a/src/middlewares/timeout.js +++ b/src/middlewares/timeout.js @@ -7,6 +7,7 @@ "use strict"; const { Stream } = require("stream"); + const { RequestTimeoutError } = require("../errors"); const { METRIC } = require("../metrics"); From cd743bc8a7b015efff462cc0f34096e5f1cda501 Mon Sep 17 00:00:00 2001 From: Anton Date: Tue, 14 Jan 2025 18:45:46 +0300 Subject: [PATCH 3/4] fix: updated eslint errors /src/middlewares/timeout.js /src/transit.js --- src/middlewares/timeout.js | 2 +- src/transit.js | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/middlewares/timeout.js b/src/middlewares/timeout.js index 59de2899b..95fdba0a0 100644 --- a/src/middlewares/timeout.js +++ b/src/middlewares/timeout.js @@ -42,7 +42,7 @@ module.exports = function (broker) { }); if (ctx.params instanceof Stream) { - ctx.params.emit('moleculer-timeout-middleware', ctx.options.timeout) + ctx.params.emit("moleculer-timeout-middleware", ctx.options.timeout); } err = new RequestTimeoutError({ action: actionName, nodeID }); diff --git a/src/transit.js b/src/transit.js index f72f714b7..e3e2adacf 100644 --- a/src/transit.js +++ b/src/transit.js @@ -551,12 +551,12 @@ class Transit { this.pendingReqStreams.set(payload.id, { sender: payload.sender, stream: pass }); - pass.on('moleculer-timeout-middleware', (timeout) => { + pass.on("moleculer-timeout-middleware", (timeout) => { setTimeout(() => { this.pendingReqStreams.delete(payload.id); - this._destroyStreamIfPossible(pass, `Pending request stream ${payload.id} have been closed by timeout ${timeout} ms`) + this._destroyStreamIfPossible(pass, `Pending request stream ${payload.id} have been closed by timeout ${timeout} ms`); }, 1000); - }) + }); } if (payload.seq > pass.$prevSeq + 1) { @@ -872,11 +872,11 @@ class Transit { this.pendingRequests.set(ctx.id, request); if (request.stream) { - const pass = request.ctx.params + const pass = request.ctx.params; - pass.on('moleculer-timeout-middleware', (timeout) => { - this._destroyStreamIfPossible(pass, `Request stream ${ctx.id} have been closed by timeout ${timeout} ms`) - }) + pass.on("moleculer-timeout-middleware", (timeout) => { + this._destroyStreamIfPossible(pass, `Request stream ${ctx.id} have been closed by timeout ${timeout} ms`); + }); } // Publish request @@ -1118,7 +1118,7 @@ class Transit { */ _destroyStreamIfPossible(stream, errorMessage) { if (!stream.destroyed && stream.destroy) { - stream.on('error', (err) => this.logger.error(err.message)) + stream.on("error", (err) => this.logger.error(err.message)); stream.destroy(new Error(errorMessage)); } } From 58b8548fa9e79464ceabf03116c634432b328f60 Mon Sep 17 00:00:00 2001 From: Anton Date: Tue, 14 Jan 2025 19:07:03 +0300 Subject: [PATCH 4/4] fix: updated eslint errors /src/transit.js --- src/transit.js | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/transit.js b/src/transit.js index e3e2adacf..e89541986 100644 --- a/src/transit.js +++ b/src/transit.js @@ -551,10 +551,13 @@ class Transit { this.pendingReqStreams.set(payload.id, { sender: payload.sender, stream: pass }); - pass.on("moleculer-timeout-middleware", (timeout) => { + pass.on("moleculer-timeout-middleware", timeout => { setTimeout(() => { this.pendingReqStreams.delete(payload.id); - this._destroyStreamIfPossible(pass, `Pending request stream ${payload.id} have been closed by timeout ${timeout} ms`); + this._destroyStreamIfPossible( + pass, + `Pending request stream ${payload.id} have been closed by timeout ${timeout} ms` + ); }, 1000); }); } @@ -874,8 +877,11 @@ class Transit { if (request.stream) { const pass = request.ctx.params; - pass.on("moleculer-timeout-middleware", (timeout) => { - this._destroyStreamIfPossible(pass, `Request stream ${ctx.id} have been closed by timeout ${timeout} ms`); + pass.on("moleculer-timeout-middleware", timeout => { + this._destroyStreamIfPossible( + pass, + `Request stream ${ctx.id} have been closed by timeout ${timeout} ms` + ); }); } @@ -1118,7 +1124,7 @@ class Transit { */ _destroyStreamIfPossible(stream, errorMessage) { if (!stream.destroyed && stream.destroy) { - stream.on("error", (err) => this.logger.error(err.message)); + stream.on("error", err => this.logger.error(err.message)); stream.destroy(new Error(errorMessage)); } }