diff --git a/src/config/feature-flags.ts b/src/config/feature-flags.ts
index aa7d0a574..73070ef43 100644
--- a/src/config/feature-flags.ts
+++ b/src/config/feature-flags.ts
@@ -24,7 +24,8 @@ export enum BooleanFlags {
 	ENABLE_5KU_BACKFILL_PAGE = "enable-5ku-experience-backfill-page",
 	USE_DYNAMODB_TO_PERSIST_AUDIT_LOG = "use-dynamodb-to-persist-audit-log",
 	USE_CUSTOM_ROOT_CA_BUNDLE = "use-custom-root-ca-bundle",
-	GENERATE_CORE_HEAP_DUMPS_ON_LOW_MEM = "generate-core-heap-dumps-on-low-mem"
+	GENERATE_CORE_HEAP_DUMPS_ON_LOW_MEM = "generate-core-heap-dumps-on-low-mem",
+	USE_RATELIMIT_ON_JIRA_CLIENT = "use-ratelimit-on-jira-client"
 }
 
 export enum StringFlags {
diff --git a/src/config/metric-names.ts b/src/config/metric-names.ts
index 40b8b3382..8177fa527 100644
--- a/src/config/metric-names.ts
+++ b/src/config/metric-names.ts
@@ -16,6 +16,7 @@ export const sqsQueueMetrics = {
 	received: `${server}.sqs.queue.received`,
 	completed: `${server}.sqs.queue.success`,
 	failed: `${server}.sqs.queue.failed`,
+	exception: `${server}.sqs.queue.exception`,
 	sent: `${server}.sqs.queue.sent`,
 	deleted: `${server}.sqs.queue.deleted`,
 	duration: `${server}.sqs.queue.duration`
diff --git a/src/jira/client/axios.test.ts b/src/jira/client/axios.test.ts
index 42fbd1ae4..689155185 100644
--- a/src/jira/client/axios.test.ts
+++ b/src/jira/client/axios.test.ts
@@ -120,4 +120,21 @@ describe("Jira axios instance", () => {
 		expect(error?.message).toEqual("Error executing Axios Request HTTP 404 - Bad REST path, or Jira instance not found, renamed or temporarily suspended.");
 	});
 
+	describe("when rate limited", () => {
+		it("should extract the Retry-After header if present", async () => {
+			const requestPayload = "TestRequestPayload";
+			jiraNock.post("/foo/bar", requestPayload)
+				.reply(429, "", {
+					"Retry-After": "100"
+				});
+
+			await expect(getAxiosInstance(jiraHost, "secret", getLogger("test")).post("/foo/bar", requestPayload))
+				.rejects.toMatchObject({
+					status: 429,
+					retryAfterInSeconds: 100
+				});
+		});
+	});
 });
diff --git a/src/jira/client/axios.ts b/src/jira/client/axios.ts
index e56455cd1..9dfd8a0c0 100644
--- a/src/jira/client/axios.ts
+++ b/src/jira/client/axios.ts
@@ -18,13 +18,30 @@ export class JiraClientError extends Error {
 	status?: number;
 	cause: AxiosError;
 
-	constructor(message: string, cause: AxiosError, status?: number) {
+	constructor(
+		message: string,
+		cause: AxiosError,
+		status: number | undefined
+	) {
 		super(message);
 		this.status = status;
 		this.cause = cause;
 	}
 }
 
+export class JiraClientRateLimitingError extends JiraClientError {
+	retryAfterInSeconds: number | undefined;
+	constructor(
+		message: string,
+		cause: AxiosError,
+		status: number | undefined,
+		retryAfterInSeconds: number | undefined
+	) {
+		super(message, cause, status);
+		this.retryAfterInSeconds = retryAfterInSeconds;
+	}
+}
+
 export const getJiraErrorMessages = (status: number) => {
 	switch (status) {
 		case 400:
@@ -86,9 +103,23 @@ const getErrorMiddleware = (logger: Logger) =>
 			logger.error({ err: error, res: error?.response }, errorMessage);
 		}
 
+		if (error.response?.status === 429) {
+			return Promise.reject(new JiraClientRateLimitingError(errorMessage, error, status, getRetryAfterInSec(error)));
+		}
+
 		return Promise.reject(new JiraClientError(errorMessage, error, status));
 	};
 
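+// Jira signals rate limiting with HTTP 429 plus an optional Retry-After header
+// holding a delay in whole seconds; unparsable or missing values are treated as absent.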
+const getRetryAfterInSec = (error: AxiosError): number | undefined => {
+	const retryAfterInSecondsStr = error.response?.headers["retry-after"];
+	const retryAfterInSeconds = parseInt(retryAfterInSecondsStr || "unknown");
+	if (isNaN(retryAfterInSeconds)) return undefined;
+	return retryAfterInSeconds;
+};
+
 /**
  * Middleware to enhance successful requests in Jira.
  *
diff --git a/src/sqs/backfill-error-handler.test.ts b/src/sqs/backfill-error-handler.test.ts
index ea5049003..9e85302c7 100644
--- a/src/sqs/backfill-error-handler.test.ts
+++ b/src/sqs/backfill-error-handler.test.ts
@@ -125,7 +125,7 @@ describe("backfillErrorHandler", () => {
 			status: 403
 		};
 		const result = await backfillErrorHandler(jest.fn())(abuseDetectionError, createContext(2, false));
-		expect(result).toEqual({
+		expect(result).toMatchObject({
 			isFailure: true,
 			retryDelaySec: 540,
 			retryable: true
@@ -150,7 +150,7 @@
 		}
 
 		const result = await backfillErrorHandler(jest.fn())(new TaskError(task, sequelizeConnectionError), createContext(2, false));
-		expect(result).toEqual({
+		expect(result).toMatchObject({
 			isFailure: true,
 			retryDelaySec: 540,
 			retryable: true
@@ -162,7 +162,7 @@
 		const result = await backfillErrorHandler(jest.fn())(
 			new TaskError(task, (await create500FromGitHub())!),
 			createContext(2, false)
 		);
-		expect(result).toEqual({
+		expect(result).toMatchObject({
 			isFailure: true,
 			retryDelaySec: 540,
 			retryable: true
@@ -174,7 +174,7 @@
 		const result = await backfillErrorHandler(jest.fn())(
 			new TaskError(task, (await create500FromJira())!),
 			createContext(2, false)
 		);
-		expect(result).toEqual({
+		expect(result).toMatchObject({
 			isFailure: true,
 			retryDelaySec: 540,
 			retryable: true
@@ -187,7 +187,7 @@
 			createContext(5, true)
 		);
 
-		expect(result).toEqual({
+		expect(result).toMatchObject({
 			isFailure: false
 		});
 		const mockMessage = sendMessageMock.mock.calls[0] as any[];
@@ -204,7 +204,7 @@
 			createContext(5, true)
 		);
 
-		expect(result).toEqual({
+		expect(result).toMatchObject({
 			isFailure: false
 		});
 		const mockMessage = sendMessageMock.mock.calls[0] as any[];
@@ -254,7 +254,7 @@
 			createContext(3, false)
 		);
 
-		expect(result).toEqual({
+		expect(result).toMatchObject({
 			isFailure: false
 		});
 		const mockMessage = sendMessageMock.mock.calls[0] as any[];
diff --git a/src/sqs/backfill-error-handler.ts b/src/sqs/backfill-error-handler.ts
index 69f3bae94..7ee01e0f5 100644
--- a/src/sqs/backfill-error-handler.ts
+++ b/src/sqs/backfill-error-handler.ts
@@ -14,7 +14,7 @@ import {
 import { booleanFlag, BooleanFlags } from "config/feature-flags";
 
 const handleTaskError = async (sendSQSBackfillMessage: (message, delaySec, logger) => Promise<void>, task: Task, cause: Error, context: SQSMessageContext<BackfillMessagePayload>, rootLogger: Logger
-) => {
+): Promise<ErrorHandlingResult> => {
 	const log = rootLogger.child({
 		task,
 		receiveCount: context.receiveCount,
@@ -32,7 +32,10 @@
 			: log.warn("InvalidPermissionError: marking the task as failed and continue with the next one");
 		await markCurrentTaskAsFailedAndContinue(context.payload, task, true, sendSQSBackfillMessage, log, cause);
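+		// statusCode/source/errorName below feed the new sqs.queue.exception metric
+		// (see ErrorHandlingResult in sqs.types.ts); they drive no handling logic.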
deleted, marking the task as completed"); await updateTaskStatusAndContinue(context.payload, { edges: [] }, task, log, sendSQSBackfillMessage); return { - isFailure: false + isFailure: false, + statusCode: cause.status, + source: "github", + errorName: cause.constructor.name }; } @@ -69,7 +78,10 @@ const handleTaskError = async (sendSQSBackfillMessage: (message, delaySec, logge : log.warn("That was the last attempt: marking the task as failed and continue with the next one"); await markCurrentTaskAsFailedAndContinue(context.payload, task, false, sendSQSBackfillMessage, log, cause); return { - isFailure: false + isFailure: false, + statusCode: parseInt(String(cause["status"])), + source: "github", + errorName: cause.constructor.name }; } diff --git a/src/sqs/error-handlers.test.ts b/src/sqs/error-handlers.test.ts index 0957e24dd..d2916e17f 100644 --- a/src/sqs/error-handlers.test.ts +++ b/src/sqs/error-handlers.test.ts @@ -1,11 +1,15 @@ import { statsd } from "config/statsd"; import { jiraAndGitHubErrorsHandler, webhookMetricWrapper } from "./error-handlers"; import { getLogger } from "config/logger"; -import { JiraClientError } from "../jira/client/axios"; +import { JiraClientError, JiraClientRateLimitingError } from "../jira/client/axios"; import { Octokit } from "@octokit/rest"; import { GithubClientRateLimitingError } from "../github/client/github-client-errors"; import { AxiosError, AxiosResponse, AxiosResponseHeaders } from "axios"; import { ErrorHandlingResult, SQSMessageContext, BaseMessagePayload } from "~/src/sqs/sqs.types"; +import { booleanFlag, BooleanFlags } from "config/feature-flags"; +import { when } from "jest-when"; + +jest.mock("config/feature-flags"); describe("error-handlers", () => { @@ -117,6 +121,66 @@ describe("error-handlers", () => { expect(result.isFailure).toBe(true); }); + it("Retryable with proper delay on Rate Limiting - when ff is on", async () => { + + when(booleanFlag).calledWith(BooleanFlags.USE_RATELIMIT_ON_JIRA_CLIENT, expect.anything()) + .mockResolvedValue(true); + + const headers: AxiosResponseHeaders = { "retry-after": "1000" }; + const mockedResponse = { status: 403, headers: headers } as AxiosResponse; + + const result = await jiraAndGitHubErrorsHandler( + new JiraClientRateLimitingError("test rate limit error", { + response: mockedResponse + } as AxiosError, 429, 1000), + createContext(1, false) + ); + + expect(result.retryable).toBe(true); + expect(result.retryDelaySec).toBe(1000); + expect(result.isFailure).toBe(true); + }); + + it("Retryable with origin delay on Rate Limiting but have undefined retry after header - when ff is on", async () => { + + when(booleanFlag).calledWith(BooleanFlags.USE_RATELIMIT_ON_JIRA_CLIENT, expect.anything()) + .mockResolvedValue(true); + + const headers: AxiosResponseHeaders = { }; + const mockedResponse = { status: 403, headers: headers } as AxiosResponse; + + const result = await jiraAndGitHubErrorsHandler( + new JiraClientRateLimitingError("test rate limit error", { + response: mockedResponse + } as AxiosError, 429, undefined), + createContext(1, false) + ); + + expect(result.retryable).toBe(true); + expect(result.retryDelaySec).toBe(180); + expect(result.isFailure).toBe(true); + }); + + it("Retryable with origin delay on Rate Limiting - when ff is off", async () => { + + when(booleanFlag).calledWith(BooleanFlags.USE_RATELIMIT_ON_JIRA_CLIENT, expect.anything()) + .mockResolvedValue(false); + + const headers: AxiosResponseHeaders = { "retry-after": "1000" }; + const mockedResponse = { status: 403, headers: 
+		const result = await jiraAndGitHubErrorsHandler(
+			new JiraClientRateLimitingError("test rate limit error", {
+				response: mockedResponse
+			} as AxiosError, 429, 1000),
+			createContext(1, false)
+		);
+
+		expect(result.retryable).toBe(true);
+		expect(result.retryDelaySec).toBe(180);
+		expect(result.isFailure).toBe(true);
+	});
+
 	it("Retryable with proper delay on Rate Limiting - receiveCount 3", async () => {
 		const headers: AxiosResponseHeaders = { "x-ratelimit-reset": `${Math.floor(new Date("2020-01-01").getTime() / 1000) + 100}` };
 		const mockedResponse = { status: 403, headers: headers } as AxiosResponse;
diff --git a/src/sqs/error-handlers.ts b/src/sqs/error-handlers.ts
index 5a1c71c10..6e001911d 100644
--- a/src/sqs/error-handlers.ts
+++ b/src/sqs/error-handlers.ts
@@ -1,7 +1,8 @@
-import { JiraClientError } from "../jira/client/axios";
+import { JiraClientError, JiraClientRateLimitingError } from "../jira/client/axios";
 import { emitWebhookFailedMetrics } from "utils/webhook-utils";
 import { ErrorHandler, ErrorHandlingResult, SQSMessageContext, BaseMessagePayload } from "./sqs.types";
-import { GithubClientRateLimitingError } from "../github/client/github-client-errors";
+import { GithubClientError, GithubClientRateLimitingError } from "../github/client/github-client-errors";
+import { booleanFlag, BooleanFlags } from "config/feature-flags";
 
 /**
  * Sometimes we can get errors from Jira and GitHub which do not indicate a failed webhook. For example:
@@ -25,7 +26,14 @@ export const handleUnknownError: ErrorHandler<BaseMessagePayload> = (err: Error, context: SQSMessageContext<BaseMessagePayload>) => {
 	const delaySec = EXPONENTIAL_BACKOFF_BASE_SEC * Math.pow(EXPONENTIAL_BACKOFF_MULTIPLIER, context.receiveCount);
 	context.log.warn({ err, delaySec }, "Unknown error: retrying with exponential backoff");
-	return Promise.resolve({ retryable: true, retryDelaySec: delaySec, isFailure: true });
+	return Promise.resolve({
+		retryable: true,
+		retryDelaySec: delaySec,
+		isFailure: true,
+		statusCode: parseInt(String(err["status"])),
+		source: err instanceof GithubClientError ? "github" : (err instanceof JiraClientError ? "jira" : "other"),
+		errorName: err.constructor.name
+	});
 };
 
 export const jiraAndGitHubErrorsHandler: ErrorHandler<BaseMessagePayload> = async (error: Error,
@@ -34,7 +42,7 @@
 	context.log.warn({ err: error }, "Handling Jira or GitHub error");
 
 	const maybeResult = maybeHandleNonFailureCase(error, context)
-		|| maybeHandleRateLimitingError(error, context)
+		|| await maybeHandleRateLimitingError(error, context)
 		|| maybeHandleNonRetryableResponseCode(error, context);
 
 	if (maybeResult) {
@@ -67,7 +75,7 @@ const maybeHandleNonFailureCase = (error: Error, context: SQSMessageContext<BaseMessagePayload>): ErrorHandlingResult | undefined => {
 		error.status && UNRETRYABLE_STATUS_CODES.includes(error.status)) {
 		context.log.warn(`Received ${error.status} from Jira. Unretryable. Discarding the message`);
-		return { retryable: false, isFailure: false };
+		return { retryable: false, isFailure: false, statusCode: error.status, source: "jira", errorName: error.constructor.name };
 	}
 
 	return undefined;
@@ -81,14 +89,19 @@ const maybeHandleNonRetryableResponseCode = (error: Error, context: SQSMessageContext<BaseMessagePayload>): ErrorHandlingResult | undefined => {
-const maybeHandleRateLimitingError = (error: Error, context: SQSMessageContext<BaseMessagePayload>): ErrorHandlingResult | undefined => {
+const maybeHandleRateLimitingError = async (error: Error, context: SQSMessageContext<BaseMessagePayload>): Promise<ErrorHandlingResult | undefined> => {
 	if (error instanceof GithubClientRateLimitingError) {
-		context.log.warn({ error }, `Rate limiting error, retrying`);
 		// A stepped buffer to prioritize messages with a higher received count to get replayed slightly sooner than newer messages
 		// e.g. a receiveCount 5 message will be visible 50 seconds sooner than a receiveCount 1 message
 		const buffer = Math.max(RATE_LIMITING_BUFFER_STEP, BASE_RATE_LIMITING_DELAY_BUFFER_SEC - context.receiveCount * RATE_LIMITING_BUFFER_STEP);
@@ -97,7 +110,15 @@
 		// GitHub Rate limit resets hourly, in the scenario of burst traffic it continues to overwhelm the rate limit
 		// this attempts to ease the load across multiple refreshes
 		const retryDelaySec = rateLimitReset + ONE_HOUR_IN_SECONDS * (context.receiveCount - 1);
-		return { retryable: true, retryDelaySec, isFailure: true };
+		context.log.warn({ error, source: "github", retryDelaySec }, `Rate limiting error, retrying`);
+		return { retryable: true, retryDelaySec, isFailure: true, source: "github", statusCode: error.status, errorName: error.constructor.name };
+	}
+
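+	// Jira 429s are only honoured behind the feature flag and when a usable
+	// Retry-After value was parsed; otherwise this returns undefined and the
+	// caller falls through to the generic exponential-backoff handling.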
+	if (await booleanFlag(BooleanFlags.USE_RATELIMIT_ON_JIRA_CLIENT, context.payload.jiraHost)) {
+		if (error instanceof JiraClientRateLimitingError && error.retryAfterInSeconds !== undefined) {
+			context.log.warn({ error, source: "jira", retryDelaySec: error.retryAfterInSeconds }, `Rate limiting error, retrying`);
+			return { retryable: true, retryDelaySec: error.retryAfterInSeconds, isFailure: true, source: "jira", statusCode: error.status, errorName: error.constructor.name };
+		}
 	}
 
 	return undefined;
diff --git a/src/sqs/sqs.ts b/src/sqs/sqs.ts
index 779727817..be15a483b 100644
--- a/src/sqs/sqs.ts
+++ b/src/sqs/sqs.ts
@@ -359,6 +359,7 @@ export class SqsQueue<MessagePayload extends BaseMessagePayload> {
 			context.log.warn({ err }, "Failed message");
 			// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
 			const errorHandlingResult = await this.errorHandler(err as Error, context);
+			let visibilityChangedSec: number | undefined;
 
 			this.log.info({ errorHandlingResult }, "Error handling result");
 			if (errorHandlingResult.isFailure) {
@@ -379,19 +380,37 @@
 				await this.deleteMessage(context);
 			} else {
 				context.log.warn({ err }, "SQS message visibility timeout changed");
-				await this.changeVisibilityTimeoutIfNeeded(errorHandlingResult, context.message, context.log);
+				visibilityChangedSec = await this.changeVisibilityTimeoutIfNeeded(errorHandlingResult, context.message, context.log);
 			}
+
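+			// One exception metric per handled error; every tag value is stringified,
+			// so absent optional fields surface as the literal tag value "undefined".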
+			statsd.increment(sqsQueueMetrics.exception, {
+				...this.metricsTags,
+				statusCode: String(errorHandlingResult.statusCode),
+				source: String(errorHandlingResult.source),
+				errorName: String(errorHandlingResult.errorName),
+				isFailure: String(errorHandlingResult.isFailure),
+				retryable: String(errorHandlingResult.retryable),
+				retryDelayInMin: String(errorHandlingResult.retryDelaySec && Math.floor(errorHandlingResult.retryDelaySec / 60)),
+				msgVisibilityChangedInMin: String(visibilityChangedSec && Math.floor(visibilityChangedSec / 60)),
+				skipDlq: String(errorHandlingResult.skipDlq),
+				receiveCount: String(context.receiveCount),
+				maxAttempts: String(this.maxAttempts),
+				remainingBeforeDlq: String(this.maxAttempts - context.receiveCount),
+				reachedRetryLimit: String(this.isMessageReachedRetryLimit(context))
+			}, { jiraHost: context.payload.jiraHost });
+
 		} catch (errorHandlingException: unknown) {
 			context.log.error({ err: errorHandlingException, originalError: err }, "Error while performing error handling on SQS message");
 		}
 	}
 
-	private async changeVisibilityTimeoutIfNeeded(errorHandlingResult: ErrorHandlingResult, message: Message, log: Logger) {
+	private async changeVisibilityTimeoutIfNeeded(errorHandlingResult: ErrorHandlingResult, message: Message, log: Logger): Promise<number | undefined> {
 		const retryDelaySec = errorHandlingResult.retryDelaySec;
 		if (retryDelaySec !== undefined /*zero seconds delay is also supported*/) {
 			log.info(`Delaying the retry for ${retryDelaySec} seconds`);
-			await this.changeVisibilityTimeout(message, retryDelaySec, log);
+			return await this.changeVisibilityTimeout(message, retryDelaySec, log);
 		}
+		return undefined;
 	}
 
 	private isMessageReachedRetryLimit(context: SQSMessageContext<MessagePayload>) {
@@ -399,15 +418,15 @@
 		return context.receiveCount >= this.maxAttempts;
 	}
 
-	public async changeVisibilityTimeout(message: Message, timeoutSec: number, logger: Logger): Promise<void> {
+	public async changeVisibilityTimeout(message: Message, timeoutSec: number, logger: Logger): Promise<number | undefined> {
 		if (!message.ReceiptHandle) {
 			logger.error(`No ReceiptHandle in message with ID = ${message.MessageId ?? ""}`);
-			return;
+			return undefined;
 		}
 
 		if (timeoutSec < 0) {
 			logger.error(`Timeout needs to be a positive number.`);
-			return;
+			return undefined;
 		}
 
 		if (timeoutSec >= MAX_MESSAGE_VISIBILITY_TIMEOUT_SEC) {
@@ -415,15 +434,18 @@
 			timeoutSec = MAX_MESSAGE_VISIBILITY_TIMEOUT_SEC;
 		}
 
+		const finalRoundedSec = Math.round(timeoutSec);
 		const params: ChangeMessageVisibilityRequest = {
 			QueueUrl: this.queueUrl,
 			ReceiptHandle: message.ReceiptHandle,
-			VisibilityTimeout: Math.round(timeoutSec)
+			VisibilityTimeout: finalRoundedSec
 		};
 
 		try {
 			await this.sqs.changeMessageVisibility(params).promise();
+			return finalRoundedSec;
 		} catch (err: unknown) {
 			logger.error("Message visibility timeout change failed");
+			return undefined;
 		}
 	}
diff --git a/src/sqs/sqs.types.ts b/src/sqs/sqs.types.ts
index 46d9b9f23..f90781e56 100644
--- a/src/sqs/sqs.types.ts
+++ b/src/sqs/sqs.types.ts
@@ -95,6 +95,24 @@ export interface ErrorHandlingResult {
 	 */
 	skipDlq?: boolean;
 
+	/**
+	 * The HTTP status of the error.
+	 * For metrics only, NOT used for logic handling.
+	 */
+	statusCode?: number;
+
+	/**
+	 * The source of the error.
+	 * For metrics only, NOT used for logic handling.
+	 */
+	source?: "github" | "jira" | "other";
+
+	/**
+	 * The name of the error.
+	 * For metrics only, NOT used for logic handling.
+	 */
+	errorName?: string;
+
 }
 
 export interface SQSContext {