Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
Acknowledge reply messages after reply has been sent to corresponding…
Browse files Browse the repository at this point in the history
… queue.
  • Loading branch information
nikostoulas committed Jun 23, 2016
1 parent c6fabe3 commit a3fdc5f
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,21 @@ Queue.prototype.onMessage = function(msg) {
var body = msg.content.toString();
var obj = JSON.parse(body);
var hasReply = msg.properties.replyTo;
var ack = function () {
if (!this.noAck) {
this.channel.ack(msg);
}
}.bind(this);

if (hasReply && !this.noAck) this.channel.ack(msg);
this.handler(obj, function(reply) {
if (hasReply) {
var replyBuffer = new Buffer(JSON.stringify(reply || ''));
this.channel.sendToQueue(msg.properties.replyTo, replyBuffer, {
correlationId: msg.properties.correlationId
});
}, ack);
}
else if (!this.noAck) {
this.channel.ack(msg);
else {
ack();
}
}.bind(this), msg);
} catch (error){
Expand Down

0 comments on commit a3fdc5f

Please sign in to comment.