Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle edge cases in retry queue #2074

Merged
merged 4 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/analytics-js-common/__mocks__/StoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import type { IStoreConfig, IStoreManager } from '../src/types/Store';
import { defaultPluginsManager } from './PluginsManager';
import { defaultCookieStorage, defaultInMemoryStorage, defaultLocalStorage } from './Storage';
import { defaultStore, Store } from './Store';

import { defaultLogger } from './Logger';
import { defaultErrorHandler } from './ErrorHandler';
// Mock all the methods of the StoreManager class

class StoreManager implements IStoreManager {
Expand All @@ -26,6 +27,8 @@ class StoreManager implements IStoreManager {
};
getStore = jest.fn(() => defaultStore);
initializeStorageState = jest.fn();
logger = defaultLogger;
errorHandler = defaultErrorHandler;
}

const defaultStoreManager = new StoreManager();
Expand Down
2 changes: 1 addition & 1 deletion packages/analytics-js-plugins/.size-limit.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export default [
{
name: 'Plugins - Legacy - CDN',
path: 'dist/cdn/legacy/plugins/rsa-plugins-*.min.js',
limit: '14.1 KiB',
limit: '14.5 KiB',
},
{
name: 'Plugins - Modern - CDN',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { QueueStatuses } from '@rudderstack/analytics-js-common/constants/QueueS
import { defaultStoreManager } from '@rudderstack/analytics-js-common/__mocks__/StoreManager';
import { defaultLocalStorage } from '@rudderstack/analytics-js-common/__mocks__/Storage';
import { Store } from '@rudderstack/analytics-js-common/__mocks__/Store';
import { defaultLogger } from '@rudderstack/analytics-js-common/__mocks__/Logger';
import { Schedule } from '../../../src/utilities/retryQueue/Schedule';
import { RetryQueue } from '../../../src/utilities/retryQueue/RetryQueue';
import type { QueueItem, QueueItemData } from '../../../src/types/plugins';
Expand Down Expand Up @@ -33,6 +34,8 @@ describe('Queue', () => {
},
jest.fn(),
defaultStoreManager,
undefined,
defaultLogger,
);
queue.schedule = schedule;
});
Expand Down Expand Up @@ -247,7 +250,7 @@ describe('Queue', () => {
it('should retry a task if it fails', () => {
queue.start();

// Fail the first time, Succeed the second time
// Fail the first time, succeed the last time
const mockProcessItemCb = jest
.fn()
.mockImplementationOnce((_, cb) => cb(new Error('no')))
Expand Down Expand Up @@ -298,6 +301,86 @@ describe('Queue', () => {
});
});

it('should retry queue item if process function throws an error', () => {
queue.start();

const mockProcessItemCb = jest
.fn()
.mockImplementationOnce(() => {
throw new Error('error 1');
})
.mockImplementationOnce(() => {
throw new Error('error 2');
})
.mockImplementationOnce(() => {
throw new Error('error 3');
});
queue.processQueueCb = mockProcessItemCb;
queue.maxAttempts = 2;

queue.addItem('a');

expect(mockProcessItemCb).toHaveBeenCalledTimes(1);
expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), {
retryAttemptNumber: 0,
maxRetryAttempts: 2,
willBeRetried: true,
timeSinceFirstAttempt: expect.any(Number),
timeSinceLastAttempt: expect.any(Number),
reclaimed: false,
});
expect(defaultLogger.error).toHaveBeenCalledTimes(1);
expect(defaultLogger.error).toHaveBeenCalledWith(
'RetryQueue:: An unknown error occurred while processing the queue item. The item will be requeued.',
new Error('error 1'),
);

// Delay for the first retry
mockProcessItemCb.mockClear();
defaultLogger.error.mockClear();
jest.advanceTimersByTime(queue.getDelay(1));

expect(mockProcessItemCb).toHaveBeenCalledTimes(1);
expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), {
retryAttemptNumber: 1,
maxRetryAttempts: 2,
willBeRetried: true,
timeSinceFirstAttempt: expect.any(Number),
timeSinceLastAttempt: expect.any(Number),
reclaimed: false,
});
expect(defaultLogger.error).toHaveBeenCalledTimes(1);
expect(defaultLogger.error).toHaveBeenCalledWith(
'RetryQueue:: An unknown error occurred while processing the queue item. The item will be requeued. Retry attempt 1 of 2.',
new Error('error 2'),
);

// Delay for the second retry
mockProcessItemCb.mockClear();
defaultLogger.error.mockClear();
jest.advanceTimersByTime(queue.getDelay(2));

expect(mockProcessItemCb).toHaveBeenCalledTimes(1);
expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), {
retryAttemptNumber: 2,
maxRetryAttempts: 2,
willBeRetried: false, // because maxAttempts is 2
timeSinceFirstAttempt: expect.any(Number),
timeSinceLastAttempt: expect.any(Number),
reclaimed: false,
});
expect(defaultLogger.error).toHaveBeenCalledTimes(1);
expect(defaultLogger.error).toHaveBeenCalledWith(
'RetryQueue:: An unknown error occurred while processing the queue item. Retries exhausted (2). The item will be dropped.',
new Error('error 3'),
);

// No retries left as all attempts have been made
expect(queue.getStorageEntry('queue')).toEqual([]);
expect(queue.getStorageEntry('batchQueue')).toEqual([]);
expect(queue.getStorageEntry('inProgress')).toEqual({});
});

it('should delay retries', () => {
const mockProcessItemCb = jest.fn((_, cb) => cb());
queue.processQueueCb = mockProcessItemCb;
Expand Down Expand Up @@ -389,6 +472,19 @@ describe('Queue', () => {
expect(storedQueue[99].item).toEqual(104);
});

it('should respect maxItems configuration value 1', () => {
queue.maxItems = 1;

for (let i = 0; i < 105; i += 1) {
jest.advanceTimersByTime(1);
queue.addItem(i);
}

const storedQueue = queue.store.get(QueueStatuses.QUEUE);
expect(storedQueue.length).toEqual(1);
expect(storedQueue[0].item).toEqual(104);
});

it('should take over a queued task if a queue is abandoned', () => {
// a wild queue of interest appears
const foundQueue = new Store(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,6 @@ const sortByTime = (a: QueueItem, b: QueueItem) => a.time - b.time;

const RETRY_QUEUE = 'RetryQueue';

/**
* Constructs a RetryQueue backed by localStorage
*
* @constructor
* @param {String} name The name of the queue. Will be used to find abandoned queues and retry their items
* @param {Object} [opts] Optional argument to override `maxItems`, `maxAttempts`, `minRetryDelay, `maxRetryDelay`, `backoffFactor` and `backoffJitter`.
* @param {QueueProcessCallback} fn The function to call in order to process an item added to the queue
*/
class RetryQueue implements IQueue<QueueItemData> {
name: string;
id: string;
Expand All @@ -78,6 +70,17 @@ class RetryQueue implements IQueue<QueueItemData> {
reclaimEndVal?: Nullable<string>;
isPageAccessible: boolean;

/**
* Constructs a RetryQueue backed by localStorage
*
* @param {String} name The name of the queue. Will be used to find abandoned queues and retry their items
* @param {QueueOpts} [options] Optional argument to override `maxItems`, `maxAttempts`, `minRetryDelay, `maxRetryDelay`, `backoffFactor` and `backoffJitter`.
* @param {QueueProcessCallback} queueProcessCb The function to call in order to process an item added to the queue
* @param {IStoreManager} storeManager The store manager instance to use
* @param {StorageType} [storageType] The storage type to use. Defaults to LOCAL_STORAGE
* @param {ILogger} [logger] The logger to use
* @param {QueueBatchItemsSizeCalculatorCallback} [queueBatchItemsSizeCalculatorCb] The callback to use to calculate the size of items in the batch queue
*/
constructor(
name: string,
options: QueueOpts,
Expand Down Expand Up @@ -375,7 +378,12 @@ class RetryQueue implements IQueue<QueueItemData> {
let queue =
(this.getStorageEntry(QueueStatuses.QUEUE) as Nullable<QueueItem<QueueItemData>[]>) ?? [];

queue = queue.slice(-(this.maxItems - 1));
if (this.maxItems > 1) {
queue = queue.slice(-(this.maxItems - 1));
} else {
queue = [];
}

queue.push(curEntry);
queue = queue.sort(sortByTime);

Expand Down Expand Up @@ -419,9 +427,8 @@ class RetryQueue implements IQueue<QueueItemData> {
* Adds an item to the retry queue
*
* @param {Object} qItem The item to process
* @param {Error} [error] The error that occurred during processing
*/
requeue(qItem: QueueItem<QueueItemData>, error?: Error) {
requeue(qItem: QueueItem<QueueItemData>) {
const { attemptNumber, item, type, id, firstAttemptedAt, lastAttemptedAt, reclaimed } = qItem;
// Increment the attempt number as we're about to retry
const attemptNumberToUse = attemptNumber + 1;
Expand Down Expand Up @@ -504,7 +511,7 @@ class RetryQueue implements IQueue<QueueItemData> {
this.setStorageEntry(QueueStatuses.IN_PROGRESS, inProgress);

if (err) {
this.requeue({ ...el, firstAttemptedAt, lastAttemptedAt }, err);
this.requeue({ ...el, firstAttemptedAt, lastAttemptedAt });
}
};

Expand Down Expand Up @@ -596,7 +603,24 @@ class RetryQueue implements IQueue<QueueItemData> {
reclaimed,
});
} catch (err) {
this.logger?.error(RETRY_QUEUE_PROCESS_ERROR(RETRY_QUEUE), err);
let errMsg = '';
if (el.attemptNumber < this.maxAttempts) {
errMsg = 'The item will be requeued.';
if (el.attemptNumber > 0) {
errMsg = `${errMsg} Retry attempt ${el.attemptNumber} of ${this.maxAttempts}.`;
}

// requeue the item to be retried
el.done(err);
} else {
errMsg = `Retries exhausted (${this.maxAttempts}). The item will be dropped.`;

// drop the event as we're unable to process it
// after the max attempts are exhausted
el.done();
}

this.logger?.error(RETRY_QUEUE_PROCESS_ERROR(RETRY_QUEUE, errMsg), err);
}
});

Expand Down Expand Up @@ -717,7 +741,7 @@ class RetryQueue implements IQueue<QueueItemData> {
// if the queue is abandoned, all the in-progress are failed. retry them immediately and increment the attempt#
addConcatQueue(their.inProgress, 1);

our.queue = our.queue.sort(sortByTime);
our.queue.sort(sortByTime);

this.setStorageEntry(QueueStatuses.QUEUE, our.queue);

Expand All @@ -728,7 +752,6 @@ class RetryQueue implements IQueue<QueueItemData> {
this.processHead();
}

// eslint-disable-next-line class-methods-use-this
clearQueueEntries(other: IStore, localStorageBackoff: number) {
this.removeStorageEntry(other, 0, localStorageBackoff);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { LOG_CONTEXT_SEPARATOR } from '../../shared-chunks/common';

const RETRY_QUEUE_PROCESS_ERROR = (context: string): string =>
`${context}${LOG_CONTEXT_SEPARATOR}Process function threw an error.`;
const RETRY_QUEUE_PROCESS_ERROR = (context: string, errMsg: string): string =>
`${context}${LOG_CONTEXT_SEPARATOR}An unknown error occurred while processing the queue item. ${errMsg}`;

const RETRY_QUEUE_ENTRY_REMOVE_ERROR = (context: string, entry: string, attempt: number): string =>
`${context}${LOG_CONTEXT_SEPARATOR}Failed to remove local storage entry "${entry}" (attempt: ${attempt}.`;
Expand Down
4 changes: 2 additions & 2 deletions packages/analytics-js/.size-limit.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export default [
name: 'Core - Legacy - NPM (CJS)',
path: 'dist/npm/legacy/cjs/index.cjs',
import: '*',
limit: '48 KiB',
limit: '48.2 KiB',
},
{
name: 'Core - Legacy - NPM (UMD)',
Expand Down Expand Up @@ -59,7 +59,7 @@ export default [
name: 'Core (Bundled) - Legacy - NPM (CJS)',
path: 'dist/npm/legacy/bundled/cjs/index.cjs',
import: '*',
limit: '48 KiB',
limit: '48.2 KiB',
},
{
name: 'Core (Bundled) - Legacy - NPM (UMD)',
Expand Down
Loading