Skip to content

Commit

Permalink
feat: append more context to errors from the consumer (#559)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn authored Jan 28, 2025
1 parent 047e426 commit 7bc55fc
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 2 deletions.
12 changes: 12 additions & 0 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ export class Consumer extends TypedEventEmitter {
err,
`SQS receive message failed: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
);
}
}
Expand Down Expand Up @@ -479,6 +480,8 @@ export class Consumer extends TypedEventEmitter {
err,
`Error changing visibility timeout: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
message,
),
message,
);
Expand Down Expand Up @@ -514,6 +517,8 @@ export class Consumer extends TypedEventEmitter {
err,
`Error changing visibility timeout: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
messages,
),
messages,
);
Expand Down Expand Up @@ -549,12 +554,14 @@ export class Consumer extends TypedEventEmitter {
throw toTimeoutError(
err,
`Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`,
message,
);
}
if (err instanceof Error) {
throw toStandardError(
err,
`Unexpected message handler failure: ${err.message}`,
message,
);
}
throw err;
Expand All @@ -581,6 +588,7 @@ export class Consumer extends TypedEventEmitter {
throw toStandardError(
err,
`Unexpected message handler failure: ${err.message}`,
messages,
);
}
throw err;
Expand Down Expand Up @@ -616,6 +624,8 @@ export class Consumer extends TypedEventEmitter {
err,
`SQS delete message failed: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
message,
);
}
}
Expand Down Expand Up @@ -654,6 +664,8 @@ export class Consumer extends TypedEventEmitter {
err,
`SQS delete message failed: ${err.message}`,
this.extendedAWSErrors,
this.queueUrl,
messages,
);
}
}
Expand Down
45 changes: 43 additions & 2 deletions src/errors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { Message } from "@aws-sdk/client-sqs";

import { AWSError } from "./types.js";

class SQSError extends Error {
Expand All @@ -9,6 +11,8 @@ class SQSError extends Error {
fault: AWSError["$fault"];
response?: AWSError["$response"];
metadata?: AWSError["$metadata"];
queueUrl?: string;
messageIds?: string[];

constructor(message: string) {
super(message);
Expand All @@ -17,24 +21,28 @@ class SQSError extends Error {
}

class TimeoutError extends Error {
messageIds: string[];
cause: Error;
time: Date;

constructor(message = "Operation timed out.") {
super(message);
this.message = message;
this.name = "TimeoutError";
this.messageIds = [];
}
}

class StandardError extends Error {
messageIds: string[];
cause: Error;
time: Date;

constructor(message = "An unexpected error occurred:") {
super(message);
this.message = message;
this.name = "StandardError";
this.messageIds = [];
}
}

Expand Down Expand Up @@ -64,6 +72,17 @@ function isConnectionError(err: Error): boolean {
return false;
}

/**
* Gets the message IDs from the message.
* @param message The message that was received from SQS.
*/
function getMessageIds(message: Message | Message[]): string[] {
if (Array.isArray(message)) {
return message.map((m) => m.MessageId);
}
return [message.MessageId];
}

/**
* Formats an AWSError the the SQSError type.
* @param err The error object that was received.
Expand All @@ -73,6 +92,8 @@ function toSQSError(
err: AWSError,
message: string,
extendedAWSErrors: boolean,
queueUrl?: string,
sqsMessage?: Message | Message[],
): SQSError {
const sqsError = new SQSError(message);
sqsError.code = err.name;
Expand All @@ -87,18 +108,32 @@ function toSQSError(
sqsError.metadata = err.$metadata;
}

if (queueUrl) {
sqsError.queueUrl = queueUrl;
}

if (sqsMessage) {
sqsError.messageIds = getMessageIds(sqsMessage);
}

return sqsError;
}

/**
* Formats an Error to the StandardError type.
* @param err The error object that was received.
* @param message The message to send with the error.
* @param sqsMessage The message that was received from SQS.
*/
function toStandardError(err: Error, message: string): StandardError {
function toStandardError(
err: Error,
message: string,
sqsMessage: Message | Message[],
): StandardError {
const error = new StandardError(message);
error.cause = err;
error.time = new Date();
error.messageIds = getMessageIds(sqsMessage);

return error;
}
Expand All @@ -107,11 +142,17 @@ function toStandardError(err: Error, message: string): StandardError {
* Formats an Error to the TimeoutError type.
* @param err The error object that was received.
* @param message The message to send with the error.
* @param sqsMessage The message that was received from SQS.
*/
function toTimeoutError(err: TimeoutError, message: string): TimeoutError {
function toTimeoutError(
err: TimeoutError,
message: string,
sqsMessage: Message | Message[],
): TimeoutError {
const error = new TimeoutError(message);
error.cause = err;
error.time = new Date();
error.messageIds = getMessageIds(sqsMessage);

return error;
}
Expand Down
153 changes: 153 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ describe("Consumer", () => {
"Unexpected message handler failure: Processing error",
);
assert.equal(message.MessageId, "123");
assert.deepEqual((err as any).messageIds, ["123"]);
});

it("fires an `error` event when an `SQSError` occurs processing a message", async () => {
Expand Down Expand Up @@ -1674,6 +1675,8 @@ describe("Consumer", () => {

assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["1"]);
});

it("emit error when changing visibility timeout fails for batch handler functions", async () => {
Expand Down Expand Up @@ -1706,6 +1709,156 @@ describe("Consumer", () => {

assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["1", "2"]);
});

it("includes messageIds in timeout errors", async () => {
const handleMessageTimeout = 500;
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>
new Promise((resolve) => setTimeout(resolve, 1000)),
handleMessageTimeout,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
});

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "timeout_error"),
clock.tickAsync(handleMessageTimeout),
]);
consumer.stop();

assert.ok(err);
assert.equal(
err.message,
`Message handler timed out after ${handleMessageTimeout}ms: Operation timed out.`,
);
assert.deepEqual(err.messageIds, ["123"]);
});

it("includes messageIds in batch processing errors", async () => {
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
{ MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" },
],
});

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: () => {
throw new Error("Batch processing error");
},
batchSize: 2,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
});

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(100),
]);
consumer.stop();

assert.ok(err);
assert.equal(
err.message,
"Unexpected message handler failure: Batch processing error",
);
assert.deepEqual(err.messageIds, ["1", "2"]);
});

it("includes queueUrl and messageIds in SQS errors when deleting message", async () => {
const deleteErr = new Error("Delete error");
deleteErr.name = "SQSError";

handleMessage.resolves(null);
sqs.send.withArgs(mockDeleteMessage).rejects(deleteErr);

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(100),
]);
consumer.stop();

assert.ok(err);
assert.equal(err.message, "SQS delete message failed: Delete error");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["123"]);
});

it("includes queueUrl and messageIds in SQS errors when changing visibility timeout", async () => {
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
],
});
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>
new Promise((resolve) => setTimeout(resolve, 75000)),
sqs,
visibilityTimeout: 40,
heartbeatInterval: 30,
});

const receiveErr = new MockSQSError("failed");
sqs.send.withArgs(mockChangeMessageVisibility).rejects(receiveErr);

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(75000),
]);
consumer.stop();

assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["1"]);
});

it("includes queueUrl and messageIds in batch SQS errors", async () => {
sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
{ MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" },
],
});

consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessageBatch: () =>
new Promise((resolve) => setTimeout(resolve, 75000)),
sqs,
batchSize: 2,
visibilityTimeout: 40,
heartbeatInterval: 30,
});

const receiveErr = new MockSQSError("failed");
sqs.send.withArgs(mockChangeMessageVisibilityBatch).rejects(receiveErr);

consumer.start();
const [err]: any = await Promise.all([
pEvent(consumer, "error"),
clock.tickAsync(75000),
]);
consumer.stop();

assert.ok(err);
assert.equal(err.message, "Error changing visibility timeout: failed");
assert.equal(err.queueUrl, QUEUE_URL);
assert.deepEqual(err.messageIds, ["1", "2"]);
});
});

Expand Down

0 comments on commit 7bc55fc

Please sign in to comment.