diff --git a/packages/analytics-js-common/__mocks__/StoreManager.ts b/packages/analytics-js-common/__mocks__/StoreManager.ts index 822f0954db..9fe8678245 100644 --- a/packages/analytics-js-common/__mocks__/StoreManager.ts +++ b/packages/analytics-js-common/__mocks__/StoreManager.ts @@ -2,7 +2,8 @@ import type { IStoreConfig, IStoreManager } from '../src/types/Store'; import { defaultPluginsManager } from './PluginsManager'; import { defaultCookieStorage, defaultInMemoryStorage, defaultLocalStorage } from './Storage'; import { defaultStore, Store } from './Store'; - +import { defaultLogger } from './Logger'; +import { defaultErrorHandler } from './ErrorHandler'; // Mock all the methods of the StoreManager class class StoreManager implements IStoreManager { @@ -26,6 +27,8 @@ class StoreManager implements IStoreManager { }; getStore = jest.fn(() => defaultStore); initializeStorageState = jest.fn(); + logger = defaultLogger; + errorHandler = defaultErrorHandler; } const defaultStoreManager = new StoreManager(); diff --git a/packages/analytics-js-plugins/.size-limit.mjs b/packages/analytics-js-plugins/.size-limit.mjs index 8849476fc5..3972f02251 100644 --- a/packages/analytics-js-plugins/.size-limit.mjs +++ b/packages/analytics-js-plugins/.size-limit.mjs @@ -16,7 +16,7 @@ export default [ { name: 'Plugins - Legacy - CDN', path: 'dist/cdn/legacy/plugins/rsa-plugins-*.min.js', - limit: '14.1 KiB', + limit: '14.5 KiB', }, { name: 'Plugins - Modern - CDN', diff --git a/packages/analytics-js-plugins/__tests__/utilities/retryQueue/RetryQueue.test.ts b/packages/analytics-js-plugins/__tests__/utilities/retryQueue/RetryQueue.test.ts index bbbe0e46d6..f9777056d4 100644 --- a/packages/analytics-js-plugins/__tests__/utilities/retryQueue/RetryQueue.test.ts +++ b/packages/analytics-js-plugins/__tests__/utilities/retryQueue/RetryQueue.test.ts @@ -3,6 +3,7 @@ import { QueueStatuses } from '@rudderstack/analytics-js-common/constants/QueueS import { defaultStoreManager } from '@rudderstack/analytics-js-common/__mocks__/StoreManager'; import { defaultLocalStorage } from '@rudderstack/analytics-js-common/__mocks__/Storage'; import { Store } from '@rudderstack/analytics-js-common/__mocks__/Store'; +import { defaultLogger } from '@rudderstack/analytics-js-common/__mocks__/Logger'; import { Schedule } from '../../../src/utilities/retryQueue/Schedule'; import { RetryQueue } from '../../../src/utilities/retryQueue/RetryQueue'; import type { QueueItem, QueueItemData } from '../../../src/types/plugins'; @@ -33,6 +34,8 @@ describe('Queue', () => { }, jest.fn(), defaultStoreManager, + undefined, + defaultLogger, ); queue.schedule = schedule; }); @@ -247,7 +250,7 @@ describe('Queue', () => { it('should retry a task if it fails', () => { queue.start(); - // Fail the first time, Succeed the second time + // Fail the first time, succeed the last time const mockProcessItemCb = jest .fn() .mockImplementationOnce((_, cb) => cb(new Error('no'))) @@ -298,6 +301,86 @@ describe('Queue', () => { }); }); + it('should retry queue item if process function throws an error', () => { + queue.start(); + + const mockProcessItemCb = jest + .fn() + .mockImplementationOnce(() => { + throw new Error('error 1'); + }) + .mockImplementationOnce(() => { + throw new Error('error 2'); + }) + .mockImplementationOnce(() => { + throw new Error('error 3'); + }); + queue.processQueueCb = mockProcessItemCb; + queue.maxAttempts = 2; + + queue.addItem('a'); + + expect(mockProcessItemCb).toHaveBeenCalledTimes(1); + expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), { + retryAttemptNumber: 0, + maxRetryAttempts: 2, + willBeRetried: true, + timeSinceFirstAttempt: expect.any(Number), + timeSinceLastAttempt: expect.any(Number), + reclaimed: false, + }); + expect(defaultLogger.error).toHaveBeenCalledTimes(1); + expect(defaultLogger.error).toHaveBeenCalledWith( + 'RetryQueue:: An unknown error occurred while processing the queue item. The item will be requeued.', + new Error('error 1'), + ); + + // Delay for the first retry + mockProcessItemCb.mockClear(); + defaultLogger.error.mockClear(); + jest.advanceTimersByTime(queue.getDelay(1)); + + expect(mockProcessItemCb).toHaveBeenCalledTimes(1); + expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), { + retryAttemptNumber: 1, + maxRetryAttempts: 2, + willBeRetried: true, + timeSinceFirstAttempt: expect.any(Number), + timeSinceLastAttempt: expect.any(Number), + reclaimed: false, + }); + expect(defaultLogger.error).toHaveBeenCalledTimes(1); + expect(defaultLogger.error).toHaveBeenCalledWith( + 'RetryQueue:: An unknown error occurred while processing the queue item. The item will be requeued. Retry attempt 1 of 2.', + new Error('error 2'), + ); + + // Delay for the second retry + mockProcessItemCb.mockClear(); + defaultLogger.error.mockClear(); + jest.advanceTimersByTime(queue.getDelay(2)); + + expect(mockProcessItemCb).toHaveBeenCalledTimes(1); + expect(mockProcessItemCb).toHaveBeenCalledWith('a', expect.any(Function), { + retryAttemptNumber: 2, + maxRetryAttempts: 2, + willBeRetried: false, // because maxAttempts is 2 + timeSinceFirstAttempt: expect.any(Number), + timeSinceLastAttempt: expect.any(Number), + reclaimed: false, + }); + expect(defaultLogger.error).toHaveBeenCalledTimes(1); + expect(defaultLogger.error).toHaveBeenCalledWith( + 'RetryQueue:: An unknown error occurred while processing the queue item. Retries exhausted (2). The item will be dropped.', + new Error('error 3'), + ); + + // No retries left as all attempts have been made + expect(queue.getStorageEntry('queue')).toEqual([]); + expect(queue.getStorageEntry('batchQueue')).toEqual([]); + expect(queue.getStorageEntry('inProgress')).toEqual({}); + }); + it('should delay retries', () => { const mockProcessItemCb = jest.fn((_, cb) => cb()); queue.processQueueCb = mockProcessItemCb; @@ -389,6 +472,19 @@ describe('Queue', () => { expect(storedQueue[99].item).toEqual(104); }); + it('should respect maxItems configuration value 1', () => { + queue.maxItems = 1; + + for (let i = 0; i < 105; i += 1) { + jest.advanceTimersByTime(1); + queue.addItem(i); + } + + const storedQueue = queue.store.get(QueueStatuses.QUEUE); + expect(storedQueue.length).toEqual(1); + expect(storedQueue[0].item).toEqual(104); + }); + it('should take over a queued task if a queue is abandoned', () => { // a wild queue of interest appears const foundQueue = new Store( diff --git a/packages/analytics-js-plugins/src/utilities/retryQueue/RetryQueue.ts b/packages/analytics-js-plugins/src/utilities/retryQueue/RetryQueue.ts index ac8651a11f..0f55b5b147 100644 --- a/packages/analytics-js-plugins/src/utilities/retryQueue/RetryQueue.ts +++ b/packages/analytics-js-plugins/src/utilities/retryQueue/RetryQueue.ts @@ -48,14 +48,6 @@ const sortByTime = (a: QueueItem, b: QueueItem) => a.time - b.time; const RETRY_QUEUE = 'RetryQueue'; -/** - * Constructs a RetryQueue backed by localStorage - * - * @constructor - * @param {String} name The name of the queue. Will be used to find abandoned queues and retry their items - * @param {Object} [opts] Optional argument to override `maxItems`, `maxAttempts`, `minRetryDelay, `maxRetryDelay`, `backoffFactor` and `backoffJitter`. - * @param {QueueProcessCallback} fn The function to call in order to process an item added to the queue - */ class RetryQueue implements IQueue { name: string; id: string; @@ -78,6 +70,17 @@ class RetryQueue implements IQueue { reclaimEndVal?: Nullable; isPageAccessible: boolean; + /** + * Constructs a RetryQueue backed by localStorage + * + * @param {String} name The name of the queue. Will be used to find abandoned queues and retry their items + * @param {QueueOpts} [options] Optional argument to override `maxItems`, `maxAttempts`, `minRetryDelay, `maxRetryDelay`, `backoffFactor` and `backoffJitter`. + * @param {QueueProcessCallback} queueProcessCb The function to call in order to process an item added to the queue + * @param {IStoreManager} storeManager The store manager instance to use + * @param {StorageType} [storageType] The storage type to use. Defaults to LOCAL_STORAGE + * @param {ILogger} [logger] The logger to use + * @param {QueueBatchItemsSizeCalculatorCallback} [queueBatchItemsSizeCalculatorCb] The callback to use to calculate the size of items in the batch queue + */ constructor( name: string, options: QueueOpts, @@ -375,7 +378,12 @@ class RetryQueue implements IQueue { let queue = (this.getStorageEntry(QueueStatuses.QUEUE) as Nullable[]>) ?? []; - queue = queue.slice(-(this.maxItems - 1)); + if (this.maxItems > 1) { + queue = queue.slice(-(this.maxItems - 1)); + } else { + queue = []; + } + queue.push(curEntry); queue = queue.sort(sortByTime); @@ -419,9 +427,8 @@ class RetryQueue implements IQueue { * Adds an item to the retry queue * * @param {Object} qItem The item to process - * @param {Error} [error] The error that occurred during processing */ - requeue(qItem: QueueItem, error?: Error) { + requeue(qItem: QueueItem) { const { attemptNumber, item, type, id, firstAttemptedAt, lastAttemptedAt, reclaimed } = qItem; // Increment the attempt number as we're about to retry const attemptNumberToUse = attemptNumber + 1; @@ -504,7 +511,7 @@ class RetryQueue implements IQueue { this.setStorageEntry(QueueStatuses.IN_PROGRESS, inProgress); if (err) { - this.requeue({ ...el, firstAttemptedAt, lastAttemptedAt }, err); + this.requeue({ ...el, firstAttemptedAt, lastAttemptedAt }); } }; @@ -596,7 +603,24 @@ class RetryQueue implements IQueue { reclaimed, }); } catch (err) { - this.logger?.error(RETRY_QUEUE_PROCESS_ERROR(RETRY_QUEUE), err); + let errMsg = ''; + if (el.attemptNumber < this.maxAttempts) { + errMsg = 'The item will be requeued.'; + if (el.attemptNumber > 0) { + errMsg = `${errMsg} Retry attempt ${el.attemptNumber} of ${this.maxAttempts}.`; + } + + // requeue the item to be retried + el.done(err); + } else { + errMsg = `Retries exhausted (${this.maxAttempts}). The item will be dropped.`; + + // drop the event as we're unable to process it + // after the max attempts are exhausted + el.done(); + } + + this.logger?.error(RETRY_QUEUE_PROCESS_ERROR(RETRY_QUEUE, errMsg), err); } }); @@ -717,7 +741,7 @@ class RetryQueue implements IQueue { // if the queue is abandoned, all the in-progress are failed. retry them immediately and increment the attempt# addConcatQueue(their.inProgress, 1); - our.queue = our.queue.sort(sortByTime); + our.queue.sort(sortByTime); this.setStorageEntry(QueueStatuses.QUEUE, our.queue); @@ -728,7 +752,6 @@ class RetryQueue implements IQueue { this.processHead(); } - // eslint-disable-next-line class-methods-use-this clearQueueEntries(other: IStore, localStorageBackoff: number) { this.removeStorageEntry(other, 0, localStorageBackoff); } diff --git a/packages/analytics-js-plugins/src/utilities/retryQueue/logMessages.ts b/packages/analytics-js-plugins/src/utilities/retryQueue/logMessages.ts index b9703cfd70..a239af98dd 100644 --- a/packages/analytics-js-plugins/src/utilities/retryQueue/logMessages.ts +++ b/packages/analytics-js-plugins/src/utilities/retryQueue/logMessages.ts @@ -1,7 +1,7 @@ import { LOG_CONTEXT_SEPARATOR } from '../../shared-chunks/common'; -const RETRY_QUEUE_PROCESS_ERROR = (context: string): string => - `${context}${LOG_CONTEXT_SEPARATOR}Process function threw an error.`; +const RETRY_QUEUE_PROCESS_ERROR = (context: string, errMsg: string): string => + `${context}${LOG_CONTEXT_SEPARATOR}An unknown error occurred while processing the queue item. ${errMsg}`; const RETRY_QUEUE_ENTRY_REMOVE_ERROR = (context: string, entry: string, attempt: number): string => `${context}${LOG_CONTEXT_SEPARATOR}Failed to remove local storage entry "${entry}" (attempt: ${attempt}.`; diff --git a/packages/analytics-js/.size-limit.mjs b/packages/analytics-js/.size-limit.mjs index bc20526b05..80d4225f06 100644 --- a/packages/analytics-js/.size-limit.mjs +++ b/packages/analytics-js/.size-limit.mjs @@ -13,7 +13,7 @@ export default [ name: 'Core - Legacy - NPM (CJS)', path: 'dist/npm/legacy/cjs/index.cjs', import: '*', - limit: '48 KiB', + limit: '48.2 KiB', }, { name: 'Core - Legacy - NPM (UMD)', @@ -59,7 +59,7 @@ export default [ name: 'Core (Bundled) - Legacy - NPM (CJS)', path: 'dist/npm/legacy/bundled/cjs/index.cjs', import: '*', - limit: '48 KiB', + limit: '48.2 KiB', }, { name: 'Core (Bundled) - Legacy - NPM (UMD)',