diff --git a/apps/worker/src/app/workflow/services/cold-start.service.ts b/apps/worker/src/app/workflow/services/cold-start.service.ts index 72102fef7d0..bd6445bb1d1 100644 --- a/apps/worker/src/app/workflow/services/cold-start.service.ts +++ b/apps/worker/src/app/workflow/services/cold-start.service.ts @@ -2,28 +2,15 @@ import { INestApplication } from '@nestjs/common'; import { INovuWorker, ReadinessService } from '@novu/application-generic'; import { StandardWorker } from './standard.worker'; -import { WorkflowWorker } from './workflow.worker'; -import { OldInstanceStandardWorker } from './old-instance-standard.worker'; -import { OldInstanceWorkflowWorker } from './old-instance-workflow.worker'; import { SubscriberProcessWorker } from './subscriber-process.worker'; +import { WorkflowWorker } from './workflow.worker'; -/** - * TODO: Temporary engage OldInstanceWorkflowWorker while migrating to MemoryDB - */ const getWorkers = (app: INestApplication): INovuWorker[] => { const standardWorker = app.get(StandardWorker, { strict: false }); const workflowWorker = app.get(WorkflowWorker, { strict: false }); - const oldInstanceStandardWorker = app.get(OldInstanceStandardWorker, { strict: false }); - const oldInstanceWorkflowWorker = app.get(OldInstanceWorkflowWorker, { strict: false }); const subscriberProcessWorker = app.get(SubscriberProcessWorker, { strict: false }); - const workers: INovuWorker[] = [ - standardWorker, - workflowWorker, - oldInstanceStandardWorker, - oldInstanceWorkflowWorker, - subscriberProcessWorker, - ]; + const workers: INovuWorker[] = [standardWorker, workflowWorker, subscriberProcessWorker]; return workers; }; diff --git a/apps/worker/src/app/workflow/services/index.ts b/apps/worker/src/app/workflow/services/index.ts index ee0d3b62794..2183e3b1a4f 100644 --- a/apps/worker/src/app/workflow/services/index.ts +++ b/apps/worker/src/app/workflow/services/index.ts @@ -2,5 +2,3 @@ export * from './active-jobs-metric.service'; export * from './completed-jobs-metric.service'; export * from './standard.worker'; export * from './workflow.worker'; -export * from './old-instance-standard.worker'; -export * from './old-instance-workflow.worker'; diff --git a/apps/worker/src/app/workflow/services/old-instance-standard.worker.ts b/apps/worker/src/app/workflow/services/old-instance-standard.worker.ts deleted file mode 100644 index e8fdde4afd8..00000000000 --- a/apps/worker/src/app/workflow/services/old-instance-standard.worker.ts +++ /dev/null @@ -1,199 +0,0 @@ -const nr = require('newrelic'); -import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; -import { IJobData, ObservabilityBackgroundTransactionEnum } from '@novu/shared'; -import { - INovuWorker, - Job, - OldInstanceBullMqService, - PinoLogger, - storage, - Store, - OldInstanceStandardWorkerService, - WorkerOptions, -} from '@novu/application-generic'; - -import { - RunJob, - RunJobCommand, - SetJobAsCommand, - SetJobAsCompleted, - SetJobAsFailed, - SetJobAsFailedCommand, - WebhookFilterBackoffStrategy, - HandleLastFailedJobCommand, - HandleLastFailedJob, -} from '../usecases'; - -const LOG_CONTEXT = 'OldInstanceStandardWorker'; - -/** - * TODO: Temporary for migration to MemoryDB - */ -@Injectable() -export class OldInstanceStandardWorker extends OldInstanceStandardWorkerService implements INovuWorker { - constructor( - private handleLastFailedJob: HandleLastFailedJob, - private runJob: RunJob, - @Inject(forwardRef(() => SetJobAsCompleted)) private setJobAsCompleted: SetJobAsCompleted, - @Inject(forwardRef(() => SetJobAsFailed)) private setJobAsFailed: SetJobAsFailed, - @Inject(forwardRef(() => WebhookFilterBackoffStrategy)) - private webhookFilterBackoffStrategy: WebhookFilterBackoffStrategy - ) { - super(); - - this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); - - if (this.bullMqService.enabled) { - this.worker.on('completed', async (job: Job): Promise => { - await this.jobHasCompleted(job); - }); - - this.worker.on('failed', async (job: Job, error: Error): Promise => { - await this.jobHasFailed(job, error); - }); - } - } - - private getWorkerOptions(): WorkerOptions { - return { - lockDuration: 90000, - concurrency: 200, - settings: { - backoffStrategy: this.getBackoffStrategies(), - }, - }; - } - - private extractMinimalJobData(job: any): { - environmentId: string; - organizationId: string; - jobId: string; - userId: string; - } { - const { _environmentId: environmentId, _id: jobId, _organizationId: organizationId, _userId: userId } = job; - - if (!environmentId || !jobId || !organizationId || !userId) { - const message = job.payload.message; - - return { - environmentId: message._environmentId, - jobId: message._jobId, - organizationId: message._organizationId, - userId: job.userId, - }; - } - - return { - environmentId, - jobId, - organizationId, - userId, - }; - } - - private getWorkerProcessor() { - return async ({ data }: { data: IJobData | any }) => { - const minimalJobData = this.extractMinimalJobData(data); - - Logger.verbose(`Job ${minimalJobData.jobId} is being processed in the old instance standard worker`, LOG_CONTEXT); - - return await new Promise(async (resolve, reject) => { - // eslint-disable-next-line @typescript-eslint/no-this-alias - const _this = this; - - nr.startBackgroundTransaction( - ObservabilityBackgroundTransactionEnum.JOB_PROCESSING_QUEUE, - 'Trigger Engine', - function () { - const transaction = nr.getTransaction(); - - storage.run(new Store(PinoLogger.root), () => { - _this.runJob - .execute(RunJobCommand.create(minimalJobData)) - .then(resolve) - .catch((error) => { - Logger.error( - error, - `Failed to run the job ${minimalJobData.jobId} during worker processing`, - LOG_CONTEXT - ); - - return reject(error); - }) - .finally(() => { - transaction.end(); - }); - }); - } - ); - }); - }; - } - - private async jobHasCompleted(job: Job): Promise { - let jobId; - - try { - const minimalData = this.extractMinimalJobData(job.data); - jobId = minimalData.jobId; - const environmentId = minimalData.environmentId; - const userId = minimalData.userId; - - await this.setJobAsCompleted.execute( - SetJobAsCommand.create({ - environmentId, - jobId, - userId, - }) - ); - Logger.verbose({ job }, `Job ${jobId} set as completed`, LOG_CONTEXT); - } catch (error) { - Logger.error(error, `Failed to set job ${jobId} as completed`, LOG_CONTEXT); - } - } - - private async jobHasFailed(job: Job, error: Error): Promise { - let jobId; - - try { - const minimalData = this.extractMinimalJobData(job.data); - jobId = minimalData.jobId; - - const hasToBackoff = this.runJob.shouldBackoff(error); - const hasReachedMaxAttempts = job.attemptsMade >= this.DEFAULT_ATTEMPTS; - const shouldHandleLastFailedJob = hasToBackoff && hasReachedMaxAttempts; - - const shouldBeSetAsFailed = !hasToBackoff || shouldHandleLastFailedJob; - if (shouldBeSetAsFailed) { - await this.setJobAsFailed.execute(SetJobAsFailedCommand.create(minimalData), error); - } - - if (shouldHandleLastFailedJob) { - const handleLastFailedJobCommand = HandleLastFailedJobCommand.create({ - ...minimalData, - error, - }); - - await this.handleLastFailedJob.execute(handleLastFailedJobCommand); - } - Logger.verbose({ job }, `Job ${jobId} set as failed`, LOG_CONTEXT); - } catch (anotherError) { - Logger.error(anotherError, `Failed to set job ${jobId} as failed`, LOG_CONTEXT); - } - } - - private getBackoffStrategies = () => { - return async (attemptsMade: number, type: string, eventError: Error, eventJob: Job): Promise => { - const command = { - attemptsMade, - environmentId: eventJob?.data?._environmentId, - eventError, - eventJob, - organizationId: eventJob?.data?._organizationId, - userId: eventJob?.data?._userId, - }; - - return await this.webhookFilterBackoffStrategy.execute(command); - }; - }; -} diff --git a/apps/worker/src/app/workflow/services/old-instance-workflow.worker.ts b/apps/worker/src/app/workflow/services/old-instance-workflow.worker.ts deleted file mode 100644 index 60d6535c07f..00000000000 --- a/apps/worker/src/app/workflow/services/old-instance-workflow.worker.ts +++ /dev/null @@ -1,64 +0,0 @@ -const nr = require('newrelic'); -import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; -import { IJobData, ObservabilityBackgroundTransactionEnum } from '@novu/shared'; -import { - INovuWorker, - Job, - OldInstanceBullMqService, - PinoLogger, - storage, - Store, - OldInstanceWorkflowWorkerService, - TriggerEvent, - WorkerOptions, -} from '@novu/application-generic'; - -const LOG_CONTEXT = 'OldInstanceWorkflowWorker'; - -/** - * TODO: Temporary for migration to MemoryDB - */ -@Injectable() -export class OldInstanceWorkflowWorker extends OldInstanceWorkflowWorkerService implements INovuWorker { - constructor(private triggerEventUsecase: TriggerEvent) { - super(); - - this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); - } - - private getWorkerOptions(): WorkerOptions { - return { - lockDuration: 90000, - concurrency: 200, - }; - } - - private getWorkerProcessor() { - return async ({ data }: { data: IJobData | any }) => { - return await new Promise(async (resolve, reject) => { - // eslint-disable-next-line @typescript-eslint/no-this-alias - const _this = this; - - Logger.verbose(`Job ${data._id} is being processed in the old instance workflow worker`, LOG_CONTEXT); - - nr.startBackgroundTransaction( - ObservabilityBackgroundTransactionEnum.TRIGGER_HANDLER_QUEUE, - 'Trigger Engine', - function () { - const transaction = nr.getTransaction(); - - storage.run(new Store(PinoLogger.root), () => { - _this.triggerEventUsecase - .execute(data) - .then(resolve) - .catch(reject) - .finally(() => { - transaction.end(); - }); - }); - } - ); - }); - }; - } -} diff --git a/apps/worker/src/app/workflow/workflow.module.ts b/apps/worker/src/app/workflow/workflow.module.ts index 080eaa1e6dc..46552356b68 100644 --- a/apps/worker/src/app/workflow/workflow.module.ts +++ b/apps/worker/src/app/workflow/workflow.module.ts @@ -18,7 +18,6 @@ import { GetSubscriberGlobalPreference, GetSubscriberTemplatePreference, ProcessTenant, - OldInstanceBullMqService, QueuesModule, SelectIntegration, SendTestEmail, @@ -33,14 +32,7 @@ import { } from '@novu/application-generic'; import { JobRepository } from '@novu/dal'; -import { - ActiveJobsMetricService, - CompletedJobsMetricService, - StandardWorker, - WorkflowWorker, - OldInstanceWorkflowWorker, - OldInstanceStandardWorker, -} from './services'; +import { ActiveJobsMetricService, CompletedJobsMetricService, StandardWorker, WorkflowWorker } from './services'; import { MessageMatcher, @@ -123,9 +115,6 @@ const PROVIDERS: Provider[] = [ StandardWorker, WorkflowWorker, SubscriberProcessWorker, - OldInstanceBullMqService, - OldInstanceStandardWorker, - OldInstanceWorkflowWorker, ]; @Module({ diff --git a/apps/ws/src/socket/services/index.ts b/apps/ws/src/socket/services/index.ts index 735d2d9ed04..b03b9f0b02d 100644 --- a/apps/ws/src/socket/services/index.ts +++ b/apps/ws/src/socket/services/index.ts @@ -1,3 +1 @@ -export { OldInstanceWebSocketsWorker } from './old-instance-web-sockets.worker'; -export { OldInstanceWebSocketsWorkerService } from './old-instance-web-sockets-worker.service'; export { WebSocketWorker } from './web-socket.worker'; diff --git a/apps/ws/src/socket/services/old-instance-web-sockets-worker.service.ts b/apps/ws/src/socket/services/old-instance-web-sockets-worker.service.ts deleted file mode 100644 index 91ba7c714c6..00000000000 --- a/apps/ws/src/socket/services/old-instance-web-sockets-worker.service.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { IJobData, JobTopicNameEnum } from '@novu/shared'; -import { Inject, Injectable, Logger } from '@nestjs/common'; - -import { JobsOptions, OldInstanceBullMqService, Processor, Worker, WorkerOptions } from '@novu/application-generic'; - -const LOG_CONTEXT = 'OldInstanceWebSocketsWorkerService'; - -type WorkerProcessor = string | Processor | undefined; - -/** - * TODO: Temporary for migration to MemoryDB - */ -export class OldInstanceWebSocketsWorkerService { - private instance: OldInstanceBullMqService; - - public readonly DEFAULT_ATTEMPTS = 3; - public readonly topic: JobTopicNameEnum; - - constructor() { - this.topic = JobTopicNameEnum.WEB_SOCKETS; - this.instance = new OldInstanceBullMqService(); - if (this.instance.enabled) { - Logger.log(`Old instance Worker ${this.topic} instantiated`, LOG_CONTEXT); - } else { - Logger.warn( - `Old instance web sockets worker not instantiated as it is only needed for MemoryDB migration`, - LOG_CONTEXT - ); - } - } - - public get bullMqService(): OldInstanceBullMqService { - return this.instance; - } - - public get worker(): Worker { - return this.instance.worker; - } - - public initWorker(processor: WorkerProcessor, options?: WorkerOptions): void { - if (this.instance.enabled) { - this.createWorker(processor, options); - } - } - - public createWorker(processor: WorkerProcessor, options?: WorkerOptions): void { - if (this.instance.enabled) { - this.instance.createWorker(this.topic, processor, options); - } else { - Logger.log( - { enabled: this.instance.enabled }, - 'We are not running OldInstanceWorkflowWorkerService as it is not needed in this environment', - LOG_CONTEXT - ); - } - } - - public async isRunning(): Promise { - return await this.instance.isWorkerRunning(); - } - - public async isPaused(): Promise { - return await this.instance.isWorkerPaused(); - } - - public async pause(): Promise { - if (this.instance.enabled && this.worker) { - await this.instance.pauseWorker(); - } - } - - public async resume(): Promise { - if (this.instance.enabled && this.worker) { - await this.instance.resumeWorker(); - } - } - - public async gracefulShutdown(): Promise { - if (this.instance.enabled) { - Logger.log('Shutting the old web sockets Worker service down', LOG_CONTEXT); - - await this.instance.gracefulShutdown(); - - Logger.log('Shutting down the old web sockets Worker service has finished', LOG_CONTEXT); - } - } - - async onModuleDestroy(): Promise { - await this.gracefulShutdown(); - } -} diff --git a/apps/ws/src/socket/services/old-instance-web-sockets.worker.ts b/apps/ws/src/socket/services/old-instance-web-sockets.worker.ts deleted file mode 100644 index fea4f6138c6..00000000000 --- a/apps/ws/src/socket/services/old-instance-web-sockets.worker.ts +++ /dev/null @@ -1,71 +0,0 @@ -const nr = require('newrelic'); -import { Injectable, Logger } from '@nestjs/common'; - -import { INovuWorker, WebSocketsWorkerService } from '@novu/application-generic'; - -import { ExternalServicesRoute, ExternalServicesRouteCommand } from '../usecases/external-services-route'; -import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; -import { OldInstanceWebSocketsWorkerService } from './old-instance-web-sockets-worker.service'; - -const LOG_CONTEXT = 'OldInstanceWebSocketsWorker'; - -@Injectable() -export class OldInstanceWebSocketsWorker extends OldInstanceWebSocketsWorkerService implements INovuWorker { - constructor(private externalServicesRoute: ExternalServicesRoute) { - super(); - - this.initWorker(this.getWorkerProcessor(), this.getWorkerOpts()); - } - - private getWorkerProcessor() { - return async (job) => { - return new Promise((resolve, reject) => { - // eslint-disable-next-line @typescript-eslint/no-this-alias - const _this = this; - - Logger.verbose( - `Job ${job.id} / ${job.data.event} is being processed in the old instance web sockets worker`, - LOG_CONTEXT - ); - - nr.startBackgroundTransaction( - ObservabilityBackgroundTransactionEnum.WS_SOCKET_QUEUE, - 'WS Service', - function () { - const transaction = nr.getTransaction(); - - _this.externalServicesRoute - .execute( - ExternalServicesRouteCommand.create({ - userId: job.data.userId, - event: job.data.event, - payload: job.data.payload, - _environmentId: job.data._environmentId, - }) - ) - .then(resolve) - .catch((error) => { - Logger.error( - 'Unexpected exception occurred while handling external services route ', - error, - LOG_CONTEXT - ); - - reject(error); - }) - .finally(() => { - transaction.end(); - }); - } - ); - }); - }; - } - - private getWorkerOpts() { - return { - lockDuration: 90000, - concurrency: 100, - }; - } -} diff --git a/apps/ws/src/socket/socket.module.ts b/apps/ws/src/socket/socket.module.ts index 2b29edf2f24..15173f355a1 100644 --- a/apps/ws/src/socket/socket.module.ts +++ b/apps/ws/src/socket/socket.module.ts @@ -6,17 +6,11 @@ import { WSGateway } from './ws.gateway'; import { SharedModule } from '../shared/shared.module'; import { ExternalServicesRoute } from './usecases/external-services-route'; -import { OldInstanceWebSocketsWorker, OldInstanceWebSocketsWorkerService, WebSocketWorker } from './services'; +import { WebSocketWorker } from './services'; const USE_CASES: Provider[] = [ExternalServicesRoute]; -const PROVIDERS: Provider[] = [ - WSGateway, - OldInstanceWebSocketsWorker, - OldInstanceWebSocketsWorkerService, - WebSocketsWorkerService, - WebSocketWorker, -]; +const PROVIDERS: Provider[] = [WSGateway, WebSocketsWorkerService, WebSocketWorker]; @Module({ imports: [SharedModule], diff --git a/packages/application-generic/src/custom-providers/index.ts b/packages/application-generic/src/custom-providers/index.ts index ba39d9671ad..0a36030d08c 100644 --- a/packages/application-generic/src/custom-providers/index.ts +++ b/packages/application-generic/src/custom-providers/index.ts @@ -6,7 +6,6 @@ import { DistributedLockService, FeatureFlagsService, ReadinessService, - OldInstanceBullMqService, StandardQueueService, SubscriberProcessQueueService, WebSocketsQueueService, @@ -69,17 +68,6 @@ export const bullMqService = { }, }; -export const oldInstanceBullMqService = { - provide: OldInstanceBullMqService, - useFactory: async (): Promise => { - const service = new OldInstanceBullMqService(); - - await service.initialize(); - - return service; - }, -}; - export const cacheService = { provide: CacheService, useFactory: async (): Promise => { diff --git a/packages/application-generic/src/modules/queues.module.ts b/packages/application-generic/src/modules/queues.module.ts index 3379ca5eefd..ecc6e34098d 100644 --- a/packages/application-generic/src/modules/queues.module.ts +++ b/packages/application-generic/src/modules/queues.module.ts @@ -10,7 +10,7 @@ import { WebSocketsQueueServiceHealthIndicator, WorkflowQueueServiceHealthIndicator, } from '../health'; -import { OldInstanceBullMqService, ReadinessService } from '../services'; +import { ReadinessService } from '../services'; import { ActiveJobsMetricQueueService, CompletedJobsMetricQueueService, @@ -28,8 +28,6 @@ import { SubscriberProcessWorkerService, WebSocketsWorkerService, WorkflowWorkerService, - OldInstanceStandardWorkerService, - OldInstanceWorkflowWorkerService, } from '../services/workers'; const PROVIDERS: Provider[] = [ @@ -53,9 +51,6 @@ const PROVIDERS: Provider[] = [ WorkflowQueueService, WorkflowQueueServiceHealthIndicator, WorkflowWorkerService, - OldInstanceStandardWorkerService, - OldInstanceWorkflowWorkerService, - OldInstanceBullMqService, SubscriberProcessQueueService, SubscriberProcessWorkerService, SubscriberProcessQueueHealthIndicator, diff --git a/packages/application-generic/src/services/bull-mq/index.ts b/packages/application-generic/src/services/bull-mq/index.ts index 27460e45b0b..b5934a5062a 100644 --- a/packages/application-generic/src/services/bull-mq/index.ts +++ b/packages/application-generic/src/services/bull-mq/index.ts @@ -1,2 +1 @@ export * from './bull-mq.service'; -export { OldInstanceBullMqService } from './old-instance-bull-mq.service'; diff --git a/packages/application-generic/src/services/bull-mq/old-instance-bull-mq.service.ts b/packages/application-generic/src/services/bull-mq/old-instance-bull-mq.service.ts deleted file mode 100644 index d4224382420..00000000000 --- a/packages/application-generic/src/services/bull-mq/old-instance-bull-mq.service.ts +++ /dev/null @@ -1,327 +0,0 @@ -import { - ConnectionOptions as RedisConnectionOptions, - Job, - JobsOptions, - Metrics, - MetricsTime, - Processor, - Queue, - QueueBaseOptions, - QueueOptions, - Worker, - WorkerOptions, -} from 'bullmq'; -import { Injectable, Logger } from '@nestjs/common'; -import { - getRedisPrefix, - IEventJobData, - IJobData, - JobTopicNameEnum, -} from '@novu/shared'; - -import { - InMemoryProviderEnum, - InMemoryProviderService, -} from '../in-memory-provider'; -import { IRedisProviderConfig } from '../in-memory-provider/providers/redis-provider'; - -interface IQueueMetrics { - completed: Metrics; - failed: Metrics; -} - -type BullMqJobData = undefined | IJobData | IEventJobData; - -const LOG_CONTEXT = 'OldInstanceBullMqService'; -/** - * TODO: Temporary to migrate to MemoryDB - */ -@Injectable() -export class OldInstanceBullMqService { - private _queue: Queue; - private _worker: Worker; - private inMemoryProviderService: InMemoryProviderService; - public enabled: boolean; - - public static readonly pro: boolean = - process.env.NOVU_MANAGED_SERVICE !== undefined; - - constructor() { - if (this.shouldInstantiate()) { - this.inMemoryProviderService = new InMemoryProviderService( - InMemoryProviderEnum.OLD_INSTANCE_REDIS, - true - ); - this.enabled = true; - } else { - this.enabled = false; - } - } - - private shouldInstantiate(): boolean { - const isDecommissioned = - process.env.IS_OLD_CLUSTER_DECOMMISSIONED === 'true'; - const isNotDockerHosted = !process.env.IS_DOCKER_HOSTED; - const hasMemoryDbClusterServiceHost = - !!process.env.MEMORY_DB_CLUSTER_SERVICE_HOST; - - const shouldInstantiate = - !isDecommissioned && isNotDockerHosted && hasMemoryDbClusterServiceHost; - - Logger.warn( - { shouldInstantiate }, - `OldInstanceBullMqService should ${ - shouldInstantiate ? '' : 'not' - } be instantiated`, - LOG_CONTEXT - ); - - return shouldInstantiate; - } - - public async initialize() { - if (this.enabled) { - await this.inMemoryProviderService.delayUntilReadiness(); - } - } - - public get worker(): Worker { - return this._worker; - } - - public get queue(): Queue { - return this._queue; - } - - public get queuePrefix(): string { - return this._queue?.opts?.prefix; - } - - public get workerPrefix(): string { - return this._worker?.opts?.prefix; - } - - public static haveProInstalled(): boolean { - if (!OldInstanceBullMqService.pro) { - return false; - } - - require('@taskforcesh/bullmq-pro'); - - return true; - } - - private runningWithProQueue(): boolean { - return ( - OldInstanceBullMqService.pro && - OldInstanceBullMqService.haveProInstalled() - ); - } - - public async getQueueMetrics(): Promise { - if (this.enabled) { - return { - completed: await this._queue?.getMetrics('completed'), - failed: await this._queue?.getMetrics('failed'), - }; - } else { - return { - completed: undefined, - failed: undefined, - }; - } - } - - public createQueue(topic: JobTopicNameEnum, queueOptions: QueueOptions) { - if (this.enabled) { - const config = { - connection: this.inMemoryProviderService.inMemoryProviderClient, - ...(queueOptions?.defaultJobOptions && { - defaultJobOptions: { - ...queueOptions.defaultJobOptions, - }, - }), - }; - - // eslint-disable-next-line @typescript-eslint/naming-convention - const QueueClass = !OldInstanceBullMqService.pro - ? Queue - : require('@taskforcesh/bullmq-pro').QueuePro; - - Logger.log( - `Creating queue ${topic} for old instance. BullMQ pro is ${ - this.runningWithProQueue() ? 'Enabled' : 'Disabled' - }`, - LOG_CONTEXT - ); - - this._queue = new QueueClass(topic, { - ...config, - }); - } - - return this._queue; - } - - public createWorker( - topic: JobTopicNameEnum, - processor?: string | Processor, - workerOptions?: WorkerOptions - ) { - if (this.enabled) { - // eslint-disable-next-line @typescript-eslint/naming-convention - const WorkerClass = !OldInstanceBullMqService.pro - ? Worker - : require('@taskforcesh/bullmq-pro').WorkerPro; - - const { concurrency, connection, lockDuration, settings } = workerOptions; - - const config = { - connection: this.inMemoryProviderService.inMemoryProviderClient, - ...(concurrency && { concurrency }), - ...(lockDuration && { lockDuration }), - ...(settings && { settings }), - metrics: { maxDataPoints: MetricsTime.ONE_MONTH }, - ...(OldInstanceBullMqService.pro - ? { - group: {}, - } - : {}), - }; - - Logger.log( - `Creating worker for old instance. BullMQ pro is ${ - this.runningWithProQueue() ? 'Enabled' : 'Disabled' - }`, - LOG_CONTEXT - ); - - this._worker = new WorkerClass(topic, processor, config); - - return this._worker; - } - } - - public add( - id: string, - data: BullMqJobData, - options: JobsOptions = {}, - groupId?: string - ) { - if (this.enabled) { - this._queue.add(id, data, { - ...options, - ...(OldInstanceBullMqService.pro && groupId - ? { - group: { - id: groupId, - }, - } - : {}), - }); - } - } - - public async gracefulShutdown(): Promise { - Logger.log('Shutting the BullMQ service down', LOG_CONTEXT); - - if (this._queue) { - await this._queue.close(); - } - if (this._worker) { - await this._worker.close(); - } - - await this.inMemoryProviderService.shutdown(); - - Logger.log('Shutting down the BullMQ service has finished', LOG_CONTEXT); - } - - public async getStatus(): Promise<{ - queueIsPaused: boolean | undefined; - queueName: string | undefined; - workerIsPaused: boolean | undefined; - workerIsRunning: boolean | undefined; - workerName: string | undefined; - }> { - if (this.enabled) { - const [queueIsPaused, workerIsPaused, workerIsRunning] = - await Promise.all([ - this.isQueuePaused(), - this.isWorkerPaused(), - this.isWorkerRunning(), - ]); - - return { - queueIsPaused, - queueName: this._queue?.name, - workerIsPaused, - workerIsRunning, - workerName: this._worker?.name, - }; - } else { - return { - queueIsPaused: undefined, - queueName: undefined, - workerIsRunning: undefined, - workerIsPaused: undefined, - workerName: undefined, - }; - } - } - - public isClientReady(): boolean { - return this.inMemoryProviderService.isClientReady(); - } - - public async isQueuePaused(): Promise { - return await this._queue?.isPaused(); - } - - public async isWorkerPaused(): Promise { - return await this._worker?.isPaused(); - } - - public async isWorkerRunning(): Promise { - return await this._worker?.isRunning(); - } - - public async pauseWorker(): Promise { - if (this.enabled && this._worker) { - try { - /** - * We will only execute this in the cold start, therefore we will - * expect jobs not being processed in the Worker. - * Reference: https://api.docs.bullmq.io/classes/v4.Worker.html#pause.pause-1 - */ - const doNotWaitActive = true; - - await this._worker.pause(doNotWaitActive); - } catch (error) { - Logger.error( - error, - `Worker ${this._worker.name} pause failed`, - LOG_CONTEXT - ); - - throw error; - } - } - } - - public async resumeWorker(): Promise { - if (this.enabled && this._worker) { - try { - await this._worker.resume(); - } catch (error) { - Logger.error( - error, - `Worker ${this._worker.name} resume failed`, - LOG_CONTEXT - ); - - throw error; - } - } - } -} diff --git a/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts b/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts index a3e4b3f0479..c45c69fa52a 100644 --- a/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts +++ b/packages/application-generic/src/services/in-memory-provider/in-memory-provider.service.ts @@ -55,11 +55,6 @@ export class InMemoryProviderService { } private buildClient(provider: InMemoryProviderEnum): InMemoryProviderClient { - // TODO: Temporary while migrating to MemoryDB - if (provider === InMemoryProviderEnum.OLD_INSTANCE_REDIS) { - return this.oldInstanceInMemoryProviderSetup(); - } - return this.isCluster ? this.inMemoryClusterProviderSetup(provider) : this.inMemoryProviderSetup(); @@ -115,10 +110,7 @@ export class InMemoryProviderService { public getOptions(): RedisOptions | undefined { if (this.inMemoryProviderClient) { - if ( - this.provider === InMemoryProviderEnum.OLD_INSTANCE_REDIS || - !this.isCluster - ) { + if (!this.isCluster) { const options: RedisOptions = this.inMemoryProviderClient.options; return options; @@ -291,95 +283,6 @@ export class InMemoryProviderService { } } - /** - * TODO: Temporary while we migrate to MemoryDB - */ - private oldInstanceInMemoryProviderSetup(): Redis | undefined { - Logger.verbose( - this.descriptiveLogMessage('In-memory old instance service set up'), - LOG_CONTEXT - ); - - const { getClient, getConfig, isClientReady } = getClientAndConfig(); - - this.isProviderClientReady = isClientReady; - this.inMemoryProviderConfig = getConfig(); - const { host, port, ttl } = getConfig(); - - if (!host) { - Logger.warn( - this.descriptiveLogMessage( - 'Missing host for in-memory provider old instance' - ), - LOG_CONTEXT - ); - } - - const inMemoryProviderClient = getClient(); - if (host && inMemoryProviderClient) { - Logger.log( - this.descriptiveLogMessage( - `Connecting to old instance to ${host}:${port}` - ), - LOG_CONTEXT - ); - - inMemoryProviderClient.on('connect', () => { - Logger.log( - this.descriptiveLogMessage('REDIS CONNECTED to old instance'), - LOG_CONTEXT - ); - }); - - inMemoryProviderClient.on('reconnecting', () => { - Logger.verbose( - this.descriptiveLogMessage('Redis reconnecting to old instance'), - LOG_CONTEXT - ); - }); - - inMemoryProviderClient.on('close', () => { - Logger.verbose( - this.descriptiveLogMessage('Redis close old instance'), - LOG_CONTEXT - ); - }); - - inMemoryProviderClient.on('end', () => { - Logger.verbose( - this.descriptiveLogMessage('Redis end old instance'), - LOG_CONTEXT - ); - }); - - inMemoryProviderClient.on('error', (error) => { - Logger.error( - error, - this.descriptiveLogMessage( - 'There has been an error in the InMemory provider client for the old instance' - ), - LOG_CONTEXT - ); - }); - - inMemoryProviderClient.on('ready', () => { - Logger.log( - this.descriptiveLogMessage('Redis ready for old instance'), - LOG_CONTEXT - ); - }); - - inMemoryProviderClient.on('wait', () => { - Logger.verbose( - this.descriptiveLogMessage('Redis wait for old instance'), - LOG_CONTEXT - ); - }); - - return inMemoryProviderClient; - } - } - public inMemoryScan(pattern: string): ScanStream { if (this.isCluster) { const client = this.inMemoryProviderClient as Cluster; diff --git a/packages/application-generic/src/services/in-memory-provider/types.ts b/packages/application-generic/src/services/in-memory-provider/types.ts index 40f95d80a66..1db263a7f56 100644 --- a/packages/application-generic/src/services/in-memory-provider/types.ts +++ b/packages/application-generic/src/services/in-memory-provider/types.ts @@ -17,7 +17,6 @@ export enum InMemoryProviderEnum { MEMORY_DB = 'MemoryDB', REDIS = 'Redis', REDIS_CLUSTER = 'RedisCluster', - OLD_INSTANCE_REDIS = 'OldInstanceRedis', } export type Pipeline = ChainableCommander; diff --git a/packages/application-generic/src/services/index.ts b/packages/application-generic/src/services/index.ts index a6d6f0dcf89..25582f4f071 100644 --- a/packages/application-generic/src/services/index.ts +++ b/packages/application-generic/src/services/index.ts @@ -22,6 +22,5 @@ export { QueueOptions, Worker, WorkerOptions, - OldInstanceBullMqService, } from './bull-mq'; export * from './auth'; diff --git a/packages/application-generic/src/services/workers/index.ts b/packages/application-generic/src/services/workers/index.ts index 72cc4e78aea..e2180497f29 100644 --- a/packages/application-generic/src/services/workers/index.ts +++ b/packages/application-generic/src/services/workers/index.ts @@ -8,23 +8,19 @@ import { ActiveJobsMetricWorkerService } from './active-jobs-metric-worker.servi import { CompletedJobsMetricWorkerService } from './completed-jobs-metric-worker.service'; import { InboundParseWorkerService } from './inbound-parse-worker.service'; import { StandardWorkerService } from './standard-worker.service'; +import { SubscriberProcessWorkerService } from './subscriber-process-worker.service'; import { WebSocketsWorkerService } from './web-sockets-worker.service'; import { WorkflowWorkerService } from './workflow-worker.service'; -import { OldInstanceStandardWorkerService } from './old-instance-standard-worker.service'; -import { OldInstanceWorkflowWorkerService } from './old-instance-workflow-worker.service'; -import { SubscriberProcessWorkerService } from './subscriber-process-worker.service'; export { ActiveJobsMetricWorkerService, CompletedJobsMetricWorkerService, InboundParseWorkerService as InboundParseWorker, StandardWorkerService, + SubscriberProcessWorkerService, WebSocketsWorkerService, WorkerBaseService, WorkerOptions, WorkerProcessor, WorkflowWorkerService, - OldInstanceStandardWorkerService, - OldInstanceWorkflowWorkerService, - SubscriberProcessWorkerService, }; diff --git a/packages/application-generic/src/services/workers/old-instance-standard-worker.service.ts b/packages/application-generic/src/services/workers/old-instance-standard-worker.service.ts deleted file mode 100644 index 03852a75223..00000000000 --- a/packages/application-generic/src/services/workers/old-instance-standard-worker.service.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { IJobData, JobTopicNameEnum } from '@novu/shared'; -import { Inject, Injectable, Logger } from '@nestjs/common'; - -import { - JobsOptions, - OldInstanceBullMqService, - Processor, - Worker, - WorkerOptions, -} from '../bull-mq'; - -const LOG_CONTEXT = 'OldInstanceStandardWorkerService'; - -type WorkerProcessor = string | Processor | undefined; - -/** - * TODO: Temporary for migration to MemoryDB - */ -export class OldInstanceStandardWorkerService { - private instance: OldInstanceBullMqService; - - public readonly DEFAULT_ATTEMPTS = 3; - public readonly topic: JobTopicNameEnum; - - constructor() { - this.topic = JobTopicNameEnum.STANDARD; - this.instance = new OldInstanceBullMqService(); - if (this.instance.enabled) { - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); - } else { - Logger.warn( - `Old instance standard worker not instantiated as it is only needed for MemoryDB migration`, - LOG_CONTEXT - ); - } - } - - public get bullMqService(): OldInstanceBullMqService { - return this.instance; - } - - public get worker(): Worker { - return this.instance.worker; - } - - public initWorker(processor: WorkerProcessor, options?: WorkerOptions): void { - if (this.instance.enabled) { - this.createWorker(processor, options); - } - } - - public createWorker( - processor: WorkerProcessor, - options: WorkerOptions - ): void { - if (this.instance.enabled) { - this.instance.createWorker(this.topic, processor, options); - } else { - Logger.log( - { enabled: this.instance.enabled }, - 'We are not running OldInstanceStandardWorkerService as it is not needed in this environment', - LOG_CONTEXT - ); - } - } - - public async isRunning(): Promise { - return await this.instance.isWorkerRunning(); - } - - public async isPaused(): Promise { - return await this.instance.isWorkerPaused(); - } - - public async pause(): Promise { - if (this.instance.enabled && this.worker) { - await this.instance.pauseWorker(); - } - } - - public async resume(): Promise { - if (this.instance.enabled && this.worker) { - await this.instance.resumeWorker(); - } - } - - public async gracefulShutdown(): Promise { - if (this.instance.enabled) { - Logger.log( - 'Shutting the old instance standard worker service down', - LOG_CONTEXT - ); - - await this.instance.gracefulShutdown(); - - Logger.log( - 'Shutting down the old instance standard worker service has finished', - LOG_CONTEXT - ); - } - } - - async onModuleDestroy(): Promise { - await this.gracefulShutdown(); - } -} diff --git a/packages/application-generic/src/services/workers/old-instance-workflow-worker.service.ts b/packages/application-generic/src/services/workers/old-instance-workflow-worker.service.ts deleted file mode 100644 index 74a04372f35..00000000000 --- a/packages/application-generic/src/services/workers/old-instance-workflow-worker.service.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { IJobData, JobTopicNameEnum } from '@novu/shared'; -import { Inject, Injectable, Logger } from '@nestjs/common'; - -import { - JobsOptions, - OldInstanceBullMqService, - Processor, - Worker, - WorkerOptions, -} from '../bull-mq'; - -const LOG_CONTEXT = 'OldInstanceWorkflowWorkerService'; - -type WorkerProcessor = string | Processor | undefined; - -/** - * TODO: Temporary for migration to MemoryDB - */ -export class OldInstanceWorkflowWorkerService { - private instance: OldInstanceBullMqService; - - public readonly DEFAULT_ATTEMPTS = 3; - public readonly topic: JobTopicNameEnum; - - constructor() { - this.topic = JobTopicNameEnum.WORKFLOW; - this.instance = new OldInstanceBullMqService(); - if (this.instance.enabled) { - Logger.log(`Worker ${this.topic} instantiated`, LOG_CONTEXT); - } else { - Logger.warn( - `Old instance workflow worker not instantiated as it is only needed for MemoryDB migration`, - LOG_CONTEXT - ); - } - } - - public get bullMqService(): OldInstanceBullMqService { - return this.instance; - } - - public get worker(): Worker { - return this.instance.worker; - } - - public initWorker(processor: WorkerProcessor, options?: WorkerOptions): void { - if (this.instance.enabled) { - this.createWorker(processor, options); - } - } - - public createWorker( - processor: WorkerProcessor, - options: WorkerOptions - ): void { - if (this.instance.enabled) { - this.instance.createWorker(this.topic, processor, options); - } else { - Logger.log( - { enabled: this.instance.enabled }, - 'We are not running OldInstanceWorkflowWorkerService as it is not needed in this environment', - LOG_CONTEXT - ); - } - } - - public async isRunning(): Promise { - return await this.instance.isWorkerRunning(); - } - - public async isPaused(): Promise { - return await this.instance.isWorkerPaused(); - } - - public async pause(): Promise { - if (this.instance.enabled && this.worker) { - await this.instance.pauseWorker(); - } - } - - public async resume(): Promise { - if (this.instance.enabled && this.worker) { - await this.instance.resumeWorker(); - } - } - - public async gracefulShutdown(): Promise { - if (this.instance.enabled) { - Logger.log( - 'Shutting the old instance workflow worker service down', - LOG_CONTEXT - ); - - await this.instance.gracefulShutdown(); - - Logger.log( - 'Shutting down the old instance workflow worker service has finished', - LOG_CONTEXT - ); - } - } - - async onModuleDestroy(): Promise { - await this.gracefulShutdown(); - } -}