diff --git a/src/constants.js b/src/constants.js index 660e62cf6..f1f383c27 100644 --- a/src/constants.js +++ b/src/constants.js @@ -34,6 +34,8 @@ module.exports = { FAILED_SEND_PONG_PACKET: "failedSendPongPacket", /** @type {String} Emitted when transit fails to send a HEARTBEAT packet*/ FAILED_SEND_HEARTBEAT_PACKET: "failedSendHeartbeatPacket", + /** @type {String} Emitted when broker fails to handler broadcast event*/ + FAILED_HANDLER_BROADCAST_EVENT: "failedHandlerBroadcastEvent", /** @type {String} Emitted when broker fails to stop all services*/ FAILED_STOPPING_SERVICES: "failedServicesStop", /** @type {String} Emitted when broker fails to stop all services*/ diff --git a/src/registry/event-catalog.js b/src/registry/event-catalog.js index db9e6317e..f792cbe6f 100644 --- a/src/registry/event-catalog.js +++ b/src/registry/event-catalog.js @@ -11,13 +11,15 @@ const utils = require("../utils"); const Strategies = require("../strategies"); const EndpointList = require("./endpoint-list"); const EventEndpoint = require("./endpoint-event"); +const EventEmitter = require("events"); +const { FAILED_HANDLER_BROADCAST_EVENT } = require("../constants"); /** * Catalog for events * * @class EventCatalog */ -class EventCatalog { +class EventCatalog extends EventEmitter { /** * Creates an instance of EventCatalog. * @@ -27,6 +29,7 @@ class EventCatalog { * @memberof EventCatalog */ constructor(registry, broker, StrategyFactory) { + super(); this.registry = registry; this.broker = broker; this.logger = registry.logger; @@ -35,6 +38,20 @@ class EventCatalog { this.events = []; this.EndpointFactory = EventEndpoint; + + this.on("broker.event", ctx => { + ctx.endpoint.event + .handler(ctx) + .catch(error => + this.broker.broadcastLocal("$broker.error", { + error, + module: "broker", + type: FAILED_HANDLER_BROADCAST_EVENT + }) + ) + // catch unresolved error + .catch(err => this.logger.error(err)); + }); } /** @@ -153,11 +170,7 @@ class EventCatalog { /** * Call local service handlers * - * @param {String} eventName - * @param {any} payload - * @param {Array?} groupNames - * @param {String} nodeID - * @param {boolean} broadcast + * @param {Context} ctx * @returns {Promise} * * @memberof EventCatalog @@ -171,8 +184,9 @@ class EventCatalog { this.events.forEach(list => { if (!utils.match(ctx.eventName, list.name)) return; if ( + // null or undefined ctx.eventGroups == null || - ctx.eventGroups.length == 0 || + ctx.eventGroups.length === 0 || ctx.eventGroups.indexOf(list.group) !== -1 ) { if (isBroadcast) { @@ -205,7 +219,7 @@ class EventCatalog { * @memberof EventCatalog */ callEventHandler(ctx) { - return ctx.endpoint.event.handler(ctx); + return this.emit("broker.event", ctx); } /** @@ -229,7 +243,7 @@ class EventCatalog { */ remove(eventName, nodeID) { this.events.forEach(list => { - if (list.name == eventName) list.removeByNodeID(nodeID); + if (list.name === eventName) list.removeByNodeID(nodeID); }); } diff --git a/test/integration/tracing.spec.js b/test/integration/tracing.spec.js index 7ea3b6f6b..2f2d13230 100644 --- a/test/integration/tracing.spec.js +++ b/test/integration/tracing.spec.js @@ -81,8 +81,7 @@ describe("Test Tracing feature with actions", () => { await Promise.all( posts.map(async post => { - const author = await ctx.call("users.get", { id: post.author }); - post.author = author; //eslint-disable-line + post.author = await ctx.call("users.get", { id: post.author }); return post; }) ); @@ -124,7 +123,7 @@ describe("Test Tracing feature with actions", () => { }, async handler(ctx) { - let user = USERS.find(user => user.id == ctx.params.id); + let user = USERS.find(user => user.id === ctx.params.id); if (user) { const span = ctx.startSpan("cloning", { tags: { @@ -223,7 +222,8 @@ describe("Test Tracing feature with actions", () => { } }); - await Promise.delay(500); + // event loop lag <10ms + await Promise.delay(510); STORE.sort((a, b) => a.startTicks - b.startTicks); diff --git a/test/unit/registry/event-catalog.spec.js b/test/unit/registry/event-catalog.spec.js index 8c3ab2296..e96824427 100644 --- a/test/unit/registry/event-catalog.spec.js +++ b/test/unit/registry/event-catalog.spec.js @@ -7,7 +7,6 @@ let EventCatalog = require("../../../src/registry/event-catalog"); let EndpointList = require("../../../src/registry/endpoint-list"); let EventEndpoint = require("../../../src/registry/endpoint-event"); let ServiceBroker = require("../../../src/service-broker"); -const { protectReject } = require("../utils"); describe("Test EventCatalog constructor", () => { let broker = new ServiceBroker({ logger: false }); @@ -521,11 +520,13 @@ describe("Test EventCatalog.callEventHandler", () => { ctx.eventGroups = ["mail", "payment"]; ctx.eventType = "emit"; - it("should add catch handler to result", () => { + it("should add catch handler to result", async () => { let resolver; ctx.endpoint.event.handler = jest.fn(() => new Promise(res => (resolver = res))); - const p = catalog.callEventHandler(ctx); + catalog.callEventHandler(ctx); + + await broker.Promise.delay(10); expect(ctx.endpoint.event.handler).toHaveBeenCalledTimes(1); expect(ctx.endpoint.event.handler).toHaveBeenCalledWith(ctx); @@ -533,16 +534,16 @@ describe("Test EventCatalog.callEventHandler", () => { expect(errorHandler).toHaveBeenCalledTimes(0); resolver(); - - return p; }); - it("should catch error", () => { + it("should catch error", async () => { let rejecter; + + const spy = jest.spyOn(broker.localBus, "emit"); ctx.endpoint.event.handler = jest.fn(() => new Promise((res, rej) => (rejecter = rej))); broker.logger.error = jest.fn(); - const p = catalog.callEventHandler(ctx); + catalog.callEventHandler(ctx); expect(ctx.endpoint.event.handler).toHaveBeenCalledTimes(1); expect(ctx.endpoint.event.handler).toHaveBeenCalledWith(ctx); @@ -550,9 +551,10 @@ describe("Test EventCatalog.callEventHandler", () => { const err = new Error("Something went wrong"); rejecter(err); - return p.then(protectReject).catch(e => { - expect(e).toBe(err); - }); + await broker.Promise.delay(10); + expect(spy.mock.calls[0][1].error).toBe(err); + + spy.mockRestore(); }); });