Skip to content

Commit

Permalink
fix: handle edge cases in retry queue (#2074)
Browse files Browse the repository at this point in the history
* fix: handle when max items is 1

* fix: handle queue item on processing error

* chore: add documentation and fix lint issues

* test: add more test cases for improving coverage
  • Loading branch information
saikumarrs authored Mar 3, 2025
1 parent bf72904 commit f9263b2
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 22 deletions.
5 changes: 4 additions & 1 deletion packages/analytics-js-common/__mocks__/StoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import type { IStoreConfig, IStoreManager } from '../src/types/Store';
import { defaultPluginsManager } from './PluginsManager';
import { defaultCookieStorage, defaultInMemoryStorage, defaultLocalStorage } from './Storage';
import { defaultStore, Store } from './Store';

import { defaultLogger } from './Logger';
import { defaultErrorHandler } from './ErrorHandler';
// Mock all the methods of the StoreManager class

class StoreManager implements IStoreManager {
Expand All @@ -26,6 +27,8 @@ class StoreManager implements IStoreManager {
};
getStore = jest.fn(() => defaultStore);
initializeStorageState = jest.fn();
logger = defaultLogger;
errorHandler = defaultErrorHandler;
}

const defaultStoreManager = new StoreManager();
Expand Down
2 changes: 1 addition & 1 deletion packages/analytics-js-plugins/.size-limit.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export default [
{
name: 'Plugins - Legacy - CDN',
path: 'dist/cdn/legacy/plugins/rsa-plugins-*.min.js',
limit: '14.1 KiB',
limit: '14.5 KiB',
},
{
name: 'Plugins - Modern - CDN',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { QueueStatuses } from '@rudderstack/analytics-js-common/constants/QueueS
import { defaultStoreManager } from '@rudderstack/analytics-js-common/__mocks__/StoreManager';
import { defaultLocalStorage } from '@rudderstack/analytics-js-common/__mocks__/Storage';
import { Store } from '@rudderstack/analytics-js-common/__mocks__/Store';
import { defaultLogger } from '@rudderstack/analytics-js-common/__mocks__/Logger';
import { Schedule } from '../../../src/utilities/retryQueue/Schedule';
import { RetryQueue } from '../../../src/utilities/retryQueue/RetryQueue';
import type { QueueItem, QueueItemData } from '../../../src/types/plugins';
Expand Down Expand Up @@ -33,6 +34,8 @@ describe('Queue', () => {
},
jest.fn(),
defaultStoreManager,
undefined,
defaultLogger,
);
queue.schedule = schedule;
});
Expand Down Expand Up @@ -247,7 +250,7 @@ describe('Queue', () => {
it('should retry a task if it fails', () => {
queue.start();

// Fail the first time, Succeed the second time
// Fail the first time, succeed the last time
const mockProcessItemCb = jest
.fn()
.mockImplementationOnce((_, cb) => cb(new Error('no')))
Expand Down Expand Up @@ -298,6 +301,86 @@ describe('Queue', () => {
});
});

it('should retry queue item if process function throws an error', () => {
queue.start();

const mockProcessItemCb = jest
.fn()
.mockImplementationOnce(() => {
throw new Error('error 1');
})
.mockImplementationOnce(() => {
throw new Error('error 2');
})
.mockImplementationOnce(() => {
throw new Error('error 3');
});
queue.processQueueCb = mockProcessItemCb;
queue.maxAttempts = 2;

queue.addItem('a');

expect(mockProcessItemCb).toHaveBeenCalledTimes(1);
expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), {
retryAttemptNumber: 0,
maxRetryAttempts: 2,
willBeRetried: true,
timeSinceFirstAttempt: expect.any(Number),
timeSinceLastAttempt: expect.any(Number),
reclaimed: false,
});
expect(defaultLogger.error).toHaveBeenCalledTimes(1);
expect(defaultLogger.error).toHaveBeenCalledWith(
'RetryQueue:: An unknown error occurred while processing the queue item. The item will be requeued.',
new Error('error 1'),
);

// Delay for the first retry
mockProcessItemCb.mockClear();
defaultLogger.error.mockClear();
jest.advanceTimersByTime(queue.getDelay(1));

expect(mockProcessItemCb).toHaveBeenCalledTimes(1);
expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), {
retryAttemptNumber: 1,
maxRetryAttempts: 2,
willBeRetried: true,
timeSinceFirstAttempt: expect.any(Number),
timeSinceLastAttempt: expect.any(Number),
reclaimed: false,
});
expect(defaultLogger.error).toHaveBeenCalledTimes(1);
expect(defaultLogger.error).toHaveBeenCalledWith(
'RetryQueue:: An unknown error occurred while processing the queue item. The item will be requeued. Retry attempt 1 of 2.',
new Error('error 2'),
);

// Delay for the second retry
mockProcessItemCb.mockClear();
defaultLogger.error.mockClear();
jest.advanceTimersByTime(queue.getDelay(2));

expect(mockProcessItemCb).toHaveBeenCalledTimes(1);
expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), {
retryAttemptNumber: 2,
maxRetryAttempts: 2,
willBeRetried: false, // because maxAttempts is 2
timeSinceFirstAttempt: expect.any(Number),
timeSinceLastAttempt: expect.any(Number),
reclaimed: false,
});
expect(defaultLogger.error).toHaveBeenCalledTimes(1);
expect(defaultLogger.error).toHaveBeenCalledWith(
'RetryQueue:: An unknown error occurred while processing the queue item. Retries exhausted (2). The item will be dropped.',
new Error('error 3'),
);

// No retries left as all attempts have been made
expect(queue.getStorageEntry('queue')).toEqual([]);
expect(queue.getStorageEntry('batchQueue')).toEqual([]);
expect(queue.getStorageEntry('inProgress')).toEqual({});
});

it('should delay retries', () => {
const mockProcessItemCb = jest.fn((_, cb) => cb());
queue.processQueueCb = mockProcessItemCb;
Expand Down Expand Up @@ -389,6 +472,19 @@ describe('Queue', () => {
expect(storedQueue[99].item).toEqual(104);
});

it('should respect maxItems configuration value 1', () => {
queue.maxItems = 1;

for (let i = 0; i < 105; i += 1) {
jest.advanceTimersByTime(1);
queue.addItem(i);
}

const storedQueue = queue.store.get(QueueStatuses.QUEUE);
expect(storedQueue.length).toEqual(1);
expect(storedQueue[0].item).toEqual(104);
});

it('should take over a queued task if a queue is abandoned', () => {
// a wild queue of interest appears
const foundQueue = new Store(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,6 @@ const sortByTime = (a: QueueItem, b: QueueItem) => a.time - b.time;

const RETRY_QUEUE = 'RetryQueue';

/**
* Constructs a RetryQueue backed by localStorage
*
* @constructor
* @param {String} name The name of the queue. Will be used to find abandoned queues and retry their items
* @param {Object} [opts] Optional argument to override `maxItems`, `maxAttempts`, `minRetryDelay, `maxRetryDelay`, `backoffFactor` and `backoffJitter`.
* @param {QueueProcessCallback} fn The function to call in order to process an item added to the queue
*/
class RetryQueue implements IQueue<QueueItemData> {
name: string;
id: string;
Expand All @@ -78,6 +70,17 @@ class RetryQueue implements IQueue<QueueItemData> {
reclaimEndVal?: Nullable<string>;
isPageAccessible: boolean;

/**
* Constructs a RetryQueue backed by localStorage
*
* @param {String} name The name of the queue. Will be used to find abandoned queues and retry their items
* @param {QueueOpts} [options] Optional argument to override `maxItems`, `maxAttempts`, `minRetryDelay, `maxRetryDelay`, `backoffFactor` and `backoffJitter`.
* @param {QueueProcessCallback} queueProcessCb The function to call in order to process an item added to the queue
* @param {IStoreManager} storeManager The store manager instance to use
* @param {StorageType} [storageType] The storage type to use. Defaults to LOCAL_STORAGE
* @param {ILogger} [logger] The logger to use
* @param {QueueBatchItemsSizeCalculatorCallback} [queueBatchItemsSizeCalculatorCb] The callback to use to calculate the size of items in the batch queue
*/
constructor(
name: string,
options: QueueOpts,
Expand Down Expand Up @@ -375,7 +378,12 @@ class RetryQueue implements IQueue<QueueItemData> {
let queue =
(this.getStorageEntry(QueueStatuses.QUEUE) as Nullable<QueueItem<QueueItemData>[]>) ?? [];

queue = queue.slice(-(this.maxItems - 1));
if (this.maxItems > 1) {
queue = queue.slice(-(this.maxItems - 1));
} else {
queue = [];
}

queue.push(curEntry);
queue = queue.sort(sortByTime);

Expand Down Expand Up @@ -419,9 +427,8 @@ class RetryQueue implements IQueue<QueueItemData> {
* Adds an item to the retry queue
*
* @param {Object} qItem The item to process
* @param {Error} [error] The error that occurred during processing
*/
requeue(qItem: QueueItem<QueueItemData>, error?: Error) {
requeue(qItem: QueueItem<QueueItemData>) {
const { attemptNumber, item, type, id, firstAttemptedAt, lastAttemptedAt, reclaimed } = qItem;
// Increment the attempt number as we're about to retry
const attemptNumberToUse = attemptNumber + 1;
Expand Down Expand Up @@ -504,7 +511,7 @@ class RetryQueue implements IQueue<QueueItemData> {
this.setStorageEntry(QueueStatuses.IN_PROGRESS, inProgress);

if (err) {
this.requeue({ ...el, firstAttemptedAt, lastAttemptedAt }, err);
this.requeue({ ...el, firstAttemptedAt, lastAttemptedAt });
}
};

Expand Down Expand Up @@ -596,7 +603,24 @@ class RetryQueue implements IQueue<QueueItemData> {
reclaimed,
});
} catch (err) {
this.logger?.error(RETRY_QUEUE_PROCESS_ERROR(RETRY_QUEUE), err);
let errMsg = '';
if (el.attemptNumber < this.maxAttempts) {
errMsg = 'The item will be requeued.';
if (el.attemptNumber > 0) {
errMsg = `${errMsg} Retry attempt ${el.attemptNumber} of ${this.maxAttempts}.`;
}

// requeue the item to be retried
el.done(err);
} else {
errMsg = `Retries exhausted (${this.maxAttempts}). The item will be dropped.`;

// drop the event as we're unable to process it
// after the max attempts are exhausted
el.done();
}

this.logger?.error(RETRY_QUEUE_PROCESS_ERROR(RETRY_QUEUE, errMsg), err);
}
});

Expand Down Expand Up @@ -717,7 +741,7 @@ class RetryQueue implements IQueue<QueueItemData> {
// if the queue is abandoned, all the in-progress are failed. retry them immediately and increment the attempt#
addConcatQueue(their.inProgress, 1);

our.queue = our.queue.sort(sortByTime);
our.queue.sort(sortByTime);

this.setStorageEntry(QueueStatuses.QUEUE, our.queue);

Expand All @@ -728,7 +752,6 @@ class RetryQueue implements IQueue<QueueItemData> {
this.processHead();
}

// eslint-disable-next-line class-methods-use-this
clearQueueEntries(other: IStore, localStorageBackoff: number) {
this.removeStorageEntry(other, 0, localStorageBackoff);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { LOG_CONTEXT_SEPARATOR } from '../../shared-chunks/common';

const RETRY_QUEUE_PROCESS_ERROR = (context: string): string =>
`${context}${LOG_CONTEXT_SEPARATOR}Process function threw an error.`;
const RETRY_QUEUE_PROCESS_ERROR = (context: string, errMsg: string): string =>
`${context}${LOG_CONTEXT_SEPARATOR}An unknown error occurred while processing the queue item. ${errMsg}`;

const RETRY_QUEUE_ENTRY_REMOVE_ERROR = (context: string, entry: string, attempt: number): string =>
`${context}${LOG_CONTEXT_SEPARATOR}Failed to remove local storage entry "${entry}" (attempt: ${attempt}.`;
Expand Down
4 changes: 2 additions & 2 deletions packages/analytics-js/.size-limit.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export default [
name: 'Core - Legacy - NPM (CJS)',
path: 'dist/npm/legacy/cjs/index.cjs',
import: '*',
limit: '48 KiB',
limit: '48.2 KiB',
},
{
name: 'Core - Legacy - NPM (UMD)',
Expand Down Expand Up @@ -59,7 +59,7 @@ export default [
name: 'Core (Bundled) - Legacy - NPM (CJS)',
path: 'dist/npm/legacy/bundled/cjs/index.cjs',
import: '*',
limit: '48 KiB',
limit: '48.2 KiB',
},
{
name: 'Core (Bundled) - Legacy - NPM (UMD)',
Expand Down

0 comments on commit f9263b2

Please sign in to comment.