From 10e85ef08f9193c692631fd8a708fdacae6c7f20 Mon Sep 17 00:00:00 2001 From: Vitalii Kovalchuk Date: Thu, 6 Jun 2019 16:39:04 +0300 Subject: [PATCH 1/4] add support of object storage on error --- lib/amqp.js | 5 +++-- spec/amqp.spec.js | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index fa78f69b..4b6c40fe 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -215,8 +215,9 @@ class Amqp { const payload = { error: encryptedError }; - - if (originalMessageContent && originalMessageContent !== '') { + if (headers.objectId) { + payload.objectId = headers.objectId; + } else if (originalMessageContent && originalMessageContent !== '') { payload.errorInput = originalMessageContent.toString(); } const errorPayload = JSON.stringify(payload); diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index 634681c6..4f4f41a1 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -497,6 +497,50 @@ describe('AMQP', () => { expect(publishParameters[3]).toEqual(props); }); + it('Should send objectId instead of message when process error', () => { + + const amqp = new Amqp(settings); + const objectId = 'aef703e0-7ebf-456f-a7b9-20b647ab43cd'; + amqp.publishChannel = jasmine.createSpyObj('publishChannel', ['publish']); + + + const props = { + contentType: 'application/json', + contentEncoding: 'utf8', + mandatory: true, + headers: { + objectId, + taskId: 'task1234567890', + stepId: 'step_456' + } + }; + + amqp.sendError(new Error('Test error'), props, message.content); + + expect(amqp.publishChannel.publish).toHaveBeenCalled(); + expect(amqp.publishChannel.publish.callCount).toEqual(1); + + const publishParameters = amqp.publishChannel.publish.calls[0].args; + expect(publishParameters).toEqual([ + settings.PUBLISH_MESSAGES_TO, + settings.ERROR_ROUTING_KEY, + jasmine.any(Object), + props + ]); + + const payload = JSON.parse(publishParameters[2].toString()); + payload.error = encryptor.decryptMessageContent(payload.error); + + expect(payload).toEqual({ + error: { + name: 'Error', + message: 'Test error', + stack: jasmine.any(String) + }, + objectId + }); + }); + it('Should send message to rebounds when rebound happened', () => { const amqp = new Amqp(settings); From 24c0de59bd9d1e24b92d59f1cdf789ad4568d871 Mon Sep 17 00:00:00 2001 From: Vitalii Kovalchuk Date: Fri, 7 Jun 2019 18:37:54 +0300 Subject: [PATCH 2/4] add support of object storage on error --- lib/amqp.js | 5 +-- lib/sailor.js | 4 +++ .../actions/error_action.js | 18 +++++++++++ .../integration_component/component.json | 4 +++ mocha_spec/run.spec.js | 31 +++++++++++++++++++ spec/amqp.spec.js | 2 +- 6 files changed, 61 insertions(+), 3 deletions(-) create mode 100644 mocha_spec/integration_component/actions/error_action.js diff --git a/lib/amqp.js b/lib/amqp.js index 4b6c40fe..fe86a58b 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -215,8 +215,9 @@ class Amqp { const payload = { error: encryptedError }; - if (headers.objectId) { - payload.objectId = headers.objectId; + if (properties[Symbol.for('objectId')]) { + payload.objectId = properties[Symbol.for('objectId')]; + delete properties.objectId; } else if (originalMessageContent && originalMessageContent !== '') { payload.errorInput = originalMessageContent.toString(); } diff --git a/lib/sailor.js b/lib/sailor.js index 359966cb..d89b894f 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -205,6 +205,7 @@ class Sailor { const traceId = message.properties.headers[AMQP_HEADER_TRACE_ID] || 'unknown'; const messageId = message.properties.headers.messageId || 'unknown'; const parentMessageId = message.properties.headers.parentMessageId || 'unknown'; + const messageObjectId = message.properties.headers.objectId; const deliveryTag = message.fields.deliveryTag; log.info({ deliveryTag, @@ -337,6 +338,7 @@ class Sailor { }, 'processMessage emit error'); headers.end = new Date().getTime(); const props = createAmqpProperties(headers); + props[Symbol.for('objectId')] = messageObjectId; return self.amqpConnection.sendError(err, props, message.content, self.throttles.error); } @@ -355,6 +357,7 @@ class Sailor { headers.end = new Date().getTime(); headers.reboundReason = err.message; const props = createAmqpProperties(headers); + props[Symbol.for('objectId')] = messageObjectId; return self.amqpConnection.sendRebound(err, message, props); } @@ -452,6 +455,7 @@ class Sailor { function onModuleNotFound(err) { console.error(err.stack); outgoingMessageHeaders.end = new Date().getTime(); + outgoingMessageHeaders[Symbol.for('objectId')] = messageObjectId; self.amqpConnection.sendError(err, outgoingMessageHeaders, message.content); self.amqpConnection.reject(message); } diff --git a/mocha_spec/integration_component/actions/error_action.js b/mocha_spec/integration_component/actions/error_action.js new file mode 100644 index 00000000..e7bcbef9 --- /dev/null +++ b/mocha_spec/integration_component/actions/error_action.js @@ -0,0 +1,18 @@ +'use strict'; + +exports.process = processAction; + +function processAction(msg, cfg) { + + //eslint-disable-next-line no-invalid-this + this.emit('error', { + statusCode: 200, + body: 'Ok', + headers: { + 'content-type': 'text/plain' + } + }); + //eslint-disable-next-line no-invalid-this + this.emit('end'); + +} diff --git a/mocha_spec/integration_component/component.json b/mocha_spec/integration_component/component.json index c36d216d..83413f49 100644 --- a/mocha_spec/integration_component/component.json +++ b/mocha_spec/integration_component/component.json @@ -33,6 +33,10 @@ "http_reply_action": { "main": "./actions/http_reply_action.js", "title": "Http Reply Action" + }, + "error_action": { + "main": "./actions/error_action.js", + "title": "Error Action" } } } diff --git a/mocha_spec/run.spec.js b/mocha_spec/run.spec.js index 6fb6e131..3ec3deed 100644 --- a/mocha_spec/run.spec.js +++ b/mocha_spec/run.spec.js @@ -201,6 +201,37 @@ describe('Integration Test', () => { return new Promise((resolve) => { done = resolve; }); }); + it('should send objectId on error instead of original message', async () => { + + process.env.ELASTICIO_OBJECT_STORAGE_ENABLED = true; + env.ELASTICIO_FUNCTION = 'error_action'; + + helpers.mockApiTaskStepResponse(); + + const objectStorageGet = nock(process.env.ELASTICIO_OBJECT_STORAGE_URI) + .get(`/objects/${objectId}`) + .matchHeader('authorization', /Bearer/) + .reply(200, await helpers.encryptForObjectStorage(inputMessage), { + 'content-type': 'application/octet-stream' + }); + + let done; + amqpHelper.on('data', ({ emittedMessage }) => { + expect(emittedMessage.objectId).to.equal(objectId); + expect(objectStorageGet.isDone()).to.be.true; + done(); + }); + + run = requireRun(); + + amqpHelper.publishMessage('', { + parentMessageId, + traceId + }, { objectId }); + + return new Promise((resolve) => { done = resolve; }); + }); + it('should get object storage message if error repeat succeed', async () => { process.env.ELASTICIO_OBJECT_STORAGE_ENABLED = true; diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index 4f4f41a1..fd6fcfdd 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -508,8 +508,8 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, + [Symbol.for('objectId')]: objectId, headers: { - objectId, taskId: 'task1234567890', stepId: 'step_456' } From 3c24bd45e882071e385f358ca51ee617e8a432cd Mon Sep 17 00:00:00 2001 From: Vitalii Kovalchuk Date: Mon, 10 Jun 2019 12:23:07 +0300 Subject: [PATCH 3/4] change error handler for better code structure --- lib/amqp.js | 15 ++++++++------- lib/sailor.js | 11 ++++------- spec/amqp.spec.js | 10 ++++++---- spec/sailor.spec.js | 4 ++-- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/lib/amqp.js b/lib/amqp.js index fe86a58b..61b9ba86 100644 --- a/lib/amqp.js +++ b/lib/amqp.js @@ -202,9 +202,11 @@ class Amqp { return this.prepareMessageAndSendToExchange(data, properties, routingKey); } - async sendError(err, properties, originalMessageContent, throttle) { + async sendError(err, properties, originalMessage, throttle) { const settings = this.settings; const headers = properties.headers; + const messageContent = _.get(originalMessage , 'content'); + const messageObjectId = _.get(originalMessage, 'properties.headers.objectId'); const encryptedError = encryptor.encryptMessageContent({ name: err.name, @@ -215,11 +217,10 @@ class Amqp { const payload = { error: encryptedError }; - if (properties[Symbol.for('objectId')]) { - payload.objectId = properties[Symbol.for('objectId')]; - delete properties.objectId; - } else if (originalMessageContent && originalMessageContent !== '') { - payload.errorInput = originalMessageContent.toString(); + if (messageObjectId) { + payload.objectId = messageObjectId; + } else if (messageContent && messageContent !== '') { + payload.errorInput = messageContent.toString(); } const errorPayload = JSON.stringify(payload); @@ -247,7 +248,7 @@ class Amqp { return this.sendError( new Error('Rebound limit exceeded'), properties, - originalMessage.content + originalMessage ); } else { properties.expiration = getExpiration(reboundIteration); diff --git a/lib/sailor.js b/lib/sailor.js index d89b894f..829e2036 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -205,7 +205,6 @@ class Sailor { const traceId = message.properties.headers[AMQP_HEADER_TRACE_ID] || 'unknown'; const messageId = message.properties.headers.messageId || 'unknown'; const parentMessageId = message.properties.headers.parentMessageId || 'unknown'; - const messageObjectId = message.properties.headers.objectId; const deliveryTag = message.fields.deliveryTag; log.info({ deliveryTag, @@ -338,8 +337,7 @@ class Sailor { }, 'processMessage emit error'); headers.end = new Date().getTime(); const props = createAmqpProperties(headers); - props[Symbol.for('objectId')] = messageObjectId; - return self.amqpConnection.sendError(err, props, message.content, self.throttles.error); + return self.amqpConnection.sendError(err, props, message, self.throttles.error); } async function onRebound(err) { @@ -357,7 +355,6 @@ class Sailor { headers.end = new Date().getTime(); headers.reboundReason = err.message; const props = createAmqpProperties(headers); - props[Symbol.for('objectId')] = messageObjectId; return self.amqpConnection.sendRebound(err, message, props); } @@ -453,10 +450,10 @@ class Sailor { } function onModuleNotFound(err) { + const headers = _.clone(outgoingMessageHeaders); console.error(err.stack); - outgoingMessageHeaders.end = new Date().getTime(); - outgoingMessageHeaders[Symbol.for('objectId')] = messageObjectId; - self.amqpConnection.sendError(err, outgoingMessageHeaders, message.content); + headers.end = new Date().getTime(); + self.amqpConnection.sendError(err, headers, message); self.amqpConnection.reject(message); } diff --git a/spec/amqp.spec.js b/spec/amqp.spec.js index fd6fcfdd..d74aac66 100644 --- a/spec/amqp.spec.js +++ b/spec/amqp.spec.js @@ -312,7 +312,7 @@ describe('AMQP', () => { } }; - amqp.sendError(new Error('Test error'), props, message.content); + amqp.sendError(new Error('Test error'), props, message); expect(amqp.publishChannel.publish).toHaveBeenCalled(); expect(amqp.publishChannel.publish.callCount).toEqual(1); @@ -371,7 +371,7 @@ describe('AMQP', () => { }; async function test() { - await amqp.sendError(new Error('Test error'), props, message.content); + await amqp.sendError(new Error('Test error'), props, message); } test().then(() => { @@ -508,14 +508,16 @@ describe('AMQP', () => { contentType: 'application/json', contentEncoding: 'utf8', mandatory: true, - [Symbol.for('objectId')]: objectId, headers: { taskId: 'task1234567890', stepId: 'step_456' } }; - amqp.sendError(new Error('Test error'), props, message.content); + const msgWithObjectId = _.cloneDeep(message); + msgWithObjectId.properties.headers.objectId = objectId; + + amqp.sendError(new Error('Test error'), props, msgWithObjectId); expect(amqp.publishChannel.publish).toHaveBeenCalled(); expect(amqp.publishChannel.publish.callCount).toEqual(1); diff --git a/spec/sailor.spec.js b/spec/sailor.spec.js index 76f89833..a69ea000 100644 --- a/spec/sailor.spec.js +++ b/spec/sailor.spec.js @@ -590,7 +590,7 @@ describe('Sailor', () => { expect(fakeAMQPConnection.sendError).toHaveBeenCalled(); expect(fakeAMQPConnection.sendError.calls[0].args[0].message).toEqual('Some component error'); expect(fakeAMQPConnection.sendError.calls[0].args[0].stack).not.toBeUndefined(); - expect(fakeAMQPConnection.sendError.calls[0].args[2]).toEqual(message.content); + expect(fakeAMQPConnection.sendError.calls[0].args[2]).toEqual(message); expect(fakeAMQPConnection.reject).toHaveBeenCalled(); expect(fakeAMQPConnection.reject.callCount).toEqual(1); @@ -654,7 +654,7 @@ describe('Sailor', () => { /* eslint-enable */ ); expect(fakeAMQPConnection.sendError.calls[0].args[0].stack).not.toBeUndefined(); - expect(fakeAMQPConnection.sendError.calls[0].args[2]).toEqual(message.content); + expect(fakeAMQPConnection.sendError.calls[0].args[2]).toEqual(message); expect(fakeAMQPConnection.reject).toHaveBeenCalled(); expect(fakeAMQPConnection.reject.callCount).toEqual(1); From 42c8078c6d2062eb3fea0a2a94701d52bc60acb5 Mon Sep 17 00:00:00 2001 From: Vitalii Kovalchuk Date: Mon, 10 Jun 2019 12:27:12 +0300 Subject: [PATCH 4/4] fix bug in headers on ModuleNotFound handler --- lib/sailor.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/sailor.js b/lib/sailor.js index 829e2036..3504b67d 100644 --- a/lib/sailor.js +++ b/lib/sailor.js @@ -453,7 +453,8 @@ class Sailor { const headers = _.clone(outgoingMessageHeaders); console.error(err.stack); headers.end = new Date().getTime(); - self.amqpConnection.sendError(err, headers, message); + const props = createAmqpProperties(headers); + self.amqpConnection.sendError(err, props, message); self.amqpConnection.reject(message); }