From 437d43290ed8ab94fe87ac3b94322b4b675b62bb Mon Sep 17 00:00:00 2001 From: Gosha Date: Tue, 14 Nov 2023 11:24:51 +0200 Subject: [PATCH 1/5] feat: typed the queue add method and worker data object --- .../parse-event-request.usecase.ts | 10 +- .../services/inbound-parse.queue.service.ts | 5 +- .../inbound-email-parse.command.ts | 82 +++----------- .../mark-all-messages-as.usecase.ts | 19 +--- .../mark-message-as.usecase.ts | 12 +-- .../remove-message/remove-message.usecase.ts | 10 +- .../remove-all-messages.usecase.ts | 10 +- .../src/server/inbound-mail.service.spec.ts | 13 ++- apps/inbound-mail/src/server/index.ts | 6 +- .../services/active-jobs-metric.service.ts | 23 ++-- .../services/completed-jobs-metric.service.ts | 23 ++-- .../app/workflow/services/standard.worker.ts | 28 ++--- .../services/subscriber-process.worker.ts | 4 +- .../app/workflow/services/workflow.worker.ts | 7 +- .../send-message-in-app.usecase.ts | 12 +-- .../socket/services/web-socket.worker.spec.ts | 12 ++- .../src/socket/services/web-socket.worker.ts | 12 ++- .../src/custom-providers/index.ts | 7 +- .../src/dtos/inbound-parse-job.dto.ts | 100 ++++++++++++++++++ .../application-generic/src/dtos/index.ts | 8 ++ .../src/dtos/process-subscriber-job.dto.ts | 29 +++++ .../src/dtos/standard-job.dto.ts | 23 ++++ .../src/dtos/web-sockets-job.dto.ts | 30 ++++++ .../src/dtos/workflow-job.dto.ts | 34 ++++++ ...bscriber-process-queue.health-indicator.ts | 2 +- packages/application-generic/src/index.ts | 2 + .../src/modules/queues.module.ts | 2 +- .../src/services/bull-mq/bull-mq.service.ts | 2 +- .../application-generic/src/services/index.ts | 1 - .../active-jobs-metric-queue.service.spec.ts | 0 .../active-jobs-metric-queue.service.ts | 5 +- ...ompleted-jobs-metric-queue.service.spec.ts | 0 .../completed-jobs-metric-queue.service.ts | 5 +- .../inbound-parse-queue.service.spec.ts | 17 +-- .../inbound-parse-queue.service.ts | 16 ++- .../src/services/queues/index.ts | 15 ++- .../src/services/queues/queue-base.service.ts | 68 ++++++------ .../services/queues/standard-queue.service.ts | 17 --- .../standard-queue.service.spec.ts | 25 +++-- .../queues/standard/standard-queue.service.ts | 31 ++++++ .../subscriber-process-queue.service.ts | 15 ++- .../queues/web-sockets-queue.service.ts | 17 --- .../web-sockets-queue.service.spec.ts | 27 +++-- .../web-sockets/web-sockets-queue.service.ts | 28 +++++ .../services/queues/workflow-queue.service.ts | 17 --- .../workflow-queue.service.spec.ts | 13 ++- .../queues/workflow/workflow-queue.service.ts | 26 +++++ .../readiness/readiness.service.spec.ts | 7 +- .../src/usecases/add-job/add-job.usecase.ts | 22 ++-- .../subscriber-job-bound.command.ts | 9 +- .../trigger-event/trigger-event.usecase.ts | 20 ++-- 51 files changed, 598 insertions(+), 330 deletions(-) create mode 100644 packages/application-generic/src/dtos/inbound-parse-job.dto.ts create mode 100644 packages/application-generic/src/dtos/index.ts create mode 100644 packages/application-generic/src/dtos/process-subscriber-job.dto.ts create mode 100644 packages/application-generic/src/dtos/standard-job.dto.ts create mode 100644 packages/application-generic/src/dtos/web-sockets-job.dto.ts create mode 100644 packages/application-generic/src/dtos/workflow-job.dto.ts rename packages/application-generic/src/services/queues/{ => active-jobs-metric}/active-jobs-metric-queue.service.spec.ts (100%) rename packages/application-generic/src/services/queues/{ => active-jobs-metric}/active-jobs-metric-queue.service.ts (74%) rename packages/application-generic/src/services/queues/{ => completed-jobs-metric}/completed-jobs-metric-queue.service.spec.ts (100%) rename packages/application-generic/src/services/queues/{ => completed-jobs-metric}/completed-jobs-metric-queue.service.ts (74%) rename packages/application-generic/src/services/queues/{ => inbound-parse}/inbound-parse-queue.service.spec.ts (92%) rename packages/application-generic/src/services/queues/{ => inbound-parse}/inbound-parse-queue.service.ts (60%) delete mode 100644 packages/application-generic/src/services/queues/standard-queue.service.ts rename packages/application-generic/src/services/queues/{ => standard}/standard-queue.service.spec.ts (89%) create mode 100644 packages/application-generic/src/services/queues/standard/standard-queue.service.ts rename packages/application-generic/src/services/queues/{ => subscriber-process}/subscriber-process-queue.service.ts (52%) delete mode 100644 packages/application-generic/src/services/queues/web-sockets-queue.service.ts rename packages/application-generic/src/services/queues/{ => web-sockets}/web-sockets-queue.service.spec.ts (87%) create mode 100644 packages/application-generic/src/services/queues/web-sockets/web-sockets-queue.service.ts delete mode 100644 packages/application-generic/src/services/queues/workflow-queue.service.ts rename packages/application-generic/src/services/queues/{ => workflow}/workflow-queue.service.spec.ts (93%) create mode 100644 packages/application-generic/src/services/queues/workflow/workflow-queue.service.ts diff --git a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts index 9d0d6b9c9a6..52f930767e7 100644 --- a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts +++ b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts @@ -104,18 +104,12 @@ export class ParseEventRequest { command.payload = merge({}, defaultPayload, command.payload); - let jobData = { + const jobData = { ...command, - actor: command.actor, transactionId, } as ParseEventRequestCommand; - if ((command as ParseEventRequestMulticastCommand).to?.length > 0) { - jobData = jobData as ParseEventRequestMulticastCommand; - jobData.to = (command as ParseEventRequestMulticastCommand).to; - } - - await this.workflowQueueService.add(transactionId, jobData, command.organizationId); + await this.workflowQueueService.add({ name: transactionId, data: jobData, groupId: command.organizationId }); return { acknowledged: true, diff --git a/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts b/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts index 208ede0aff3..f1e61a1a9de 100644 --- a/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts +++ b/apps/api/src/app/inbound-parse/services/inbound-parse.queue.service.ts @@ -3,15 +3,14 @@ import { InboundParseQueue, InboundParseWorker, Queue, - QueueOptions, Worker, WorkerOptions, } from '@novu/application-generic'; -import { JobTopicNameEnum } from '@novu/shared'; import { Injectable, Logger } from '@nestjs/common'; import { InboundEmailParse } from '../usecases/inbound-email-parse/inbound-email-parse.usecase'; import { InboundEmailParseCommand } from '../usecases/inbound-email-parse/inbound-email-parse.command'; +import { IInboundParseDataDto } from '@novu/application-generic/build/main/dtos/inbound-parse-job.dto'; const LOG_CONTEXT = 'InboundParseQueueService'; @@ -34,7 +33,7 @@ export class InboundParseQueueService { } public getWorkerProcessor() { - return async ({ data }: { data: InboundEmailParseCommand }) => { + return async ({ data }: { data: IInboundParseDataDto }) => { Logger.verbose({ data }, 'Processing the inbound parsed email', LOG_CONTEXT); await this.emailParseUsecase.execute(InboundEmailParseCommand.create({ ...data })); }; diff --git a/apps/api/src/app/inbound-parse/usecases/inbound-email-parse/inbound-email-parse.command.ts b/apps/api/src/app/inbound-parse/usecases/inbound-email-parse/inbound-email-parse.command.ts index 6514c9eb3cb..eced0917d13 100644 --- a/apps/api/src/app/inbound-parse/usecases/inbound-email-parse/inbound-email-parse.command.ts +++ b/apps/api/src/app/inbound-parse/usecases/inbound-email-parse/inbound-email-parse.command.ts @@ -1,7 +1,16 @@ import { IsDefined, IsNumber, IsOptional, IsString } from 'class-validator'; -import { BaseCommand } from '@novu/application-generic'; - -export class InboundEmailParseCommand extends BaseCommand { +import { + BaseCommand, + IConnection, + IEnvelopeFrom, + IEnvelopeTo, + IFrom, + IHeaders, + IInboundParseDataDto, + ITo, +} from '@novu/application-generic'; + +export class InboundEmailParseCommand extends BaseCommand implements IInboundParseDataDto { @IsDefined() @IsString() html: string; @@ -66,70 +75,3 @@ export class InboundEmailParseCommand extends BaseCommand { @IsDefined() envelopeTo: IEnvelopeTo[]; } - -export interface IHeaders { - 'content-type': string; - from: string; - to: string; - subject: string; - 'message-id': string; - date: string; - 'mime-version': string; -} - -export interface IFrom { - address: string; - name: string; -} - -export interface ITo { - address: string; - name: string; -} - -export interface ITlsOptions { - name: string; - standardName: string; - version: string; -} - -export interface IMailFrom { - address: string; - args: boolean; -} - -export interface IRcptTo { - address: string; - args: boolean; -} - -export interface IEnvelope { - mailFrom: IMailFrom; - rcptTo: IRcptTo[]; -} - -export interface IConnection { - id: string; - remoteAddress: string; - remotePort: number; - clientHostname: string; - openingCommand: string; - hostNameAppearsAs: string; - xClient: any; - xForward: any; - transmissionType: string; - tlsOptions: ITlsOptions; - envelope: IEnvelope; - transaction: number; - mailPath: string; -} - -export interface IEnvelopeFrom { - address: string; - args: boolean; -} - -export interface IEnvelopeTo { - address: string; - args: boolean; -} diff --git a/apps/api/src/app/widgets/usecases/mark-all-messages-as/mark-all-messages-as.usecase.ts b/apps/api/src/app/widgets/usecases/mark-all-messages-as/mark-all-messages-as.usecase.ts index 767ea7b7d57..d02391528ba 100644 --- a/apps/api/src/app/widgets/usecases/mark-all-messages-as/mark-all-messages-as.usecase.ts +++ b/apps/api/src/app/widgets/usecases/mark-all-messages-as/mark-all-messages-as.usecase.ts @@ -56,24 +56,15 @@ export class MarkAllMessagesAs { const isUnreadCountChanged = command.markAs === MarkMessagesAsEnum.READ || command.markAs === MarkMessagesAsEnum.UNREAD; - const countQuery = isUnreadCountChanged ? { read: false } : { seen: false }; - - const count = await this.messageRepository.getCount( - command.environmentId, - subscriber._id, - ChannelTypeEnum.IN_APP, - countQuery - ); - - this.webSocketsQueueService.add( - 'sendMessage', - { + this.webSocketsQueueService.add({ + name: 'sendMessage', + data: { event: isUnreadCountChanged ? WebSocketEventEnum.UNREAD : WebSocketEventEnum.UNSEEN, userId: subscriber._id, _environmentId: command.environmentId, }, - subscriber._organizationId - ); + groupId: subscriber._organizationId, + }); this.analyticsService.track( `Mark all messages as ${command.markAs}- [Notification Center]`, diff --git a/apps/api/src/app/widgets/usecases/mark-message-as/mark-message-as.usecase.ts b/apps/api/src/app/widgets/usecases/mark-message-as/mark-message-as.usecase.ts index 35bb46f4f83..f5b9952b644 100644 --- a/apps/api/src/app/widgets/usecases/mark-message-as/mark-message-as.usecase.ts +++ b/apps/api/src/app/widgets/usecases/mark-message-as/mark-message-as.usecase.ts @@ -1,6 +1,6 @@ import { Injectable, NotFoundException } from '@nestjs/common'; import { MessageEntity, MessageRepository, SubscriberRepository, SubscriberEntity, MemberRepository } from '@novu/dal'; -import { ChannelTypeEnum, WebSocketEventEnum } from '@novu/shared'; +import { WebSocketEventEnum } from '@novu/shared'; import { WebSocketsQueueService, AnalyticsService, @@ -85,15 +85,15 @@ export class MarkMessageAs { private updateSocketCount(subscriber: SubscriberEntity, mark: MarkEnum) { const eventMessage = mark === MarkEnum.READ ? WebSocketEventEnum.UNREAD : WebSocketEventEnum.UNSEEN; - this.webSocketsQueueService.add( - 'sendMessage', - { + this.webSocketsQueueService.add({ + name: 'sendMessage', + data: { event: eventMessage, userId: subscriber._id, _environmentId: subscriber._environmentId, }, - subscriber._organizationId - ); + groupId: subscriber._organizationId, + }); } @CachedEntity({ builder: (command: { subscriberId: string; _environmentId: string }) => diff --git a/apps/api/src/app/widgets/usecases/remove-message/remove-message.usecase.ts b/apps/api/src/app/widgets/usecases/remove-message/remove-message.usecase.ts index eff7b3eac25..b38e5e08d4d 100644 --- a/apps/api/src/app/widgets/usecases/remove-message/remove-message.usecase.ts +++ b/apps/api/src/app/widgets/usecases/remove-message/remove-message.usecase.ts @@ -98,14 +98,14 @@ export class RemoveMessage { private updateSocketCount(subscriber: SubscriberEntity, mark: MarkEnum) { const eventMessage = mark === MarkEnum.READ ? WebSocketEventEnum.UNREAD : WebSocketEventEnum.UNSEEN; - this.webSocketsQueueService.add( - 'sendMessage', - { + this.webSocketsQueueService.add({ + name: 'sendMessage', + data: { event: eventMessage, userId: subscriber._id, _environmentId: subscriber._environmentId, }, - subscriber._organizationId - ); + groupId: subscriber._organizationId, + }); } } diff --git a/apps/api/src/app/widgets/usecases/remove-messages/remove-all-messages.usecase.ts b/apps/api/src/app/widgets/usecases/remove-messages/remove-all-messages.usecase.ts index 8025a66b4e9..4c25d6af4ed 100644 --- a/apps/api/src/app/widgets/usecases/remove-messages/remove-all-messages.usecase.ts +++ b/apps/api/src/app/widgets/usecases/remove-messages/remove-all-messages.usecase.ts @@ -100,14 +100,14 @@ export class RemoveAllMessages { private updateSocketCount(subscriber: SubscriberEntity, mark: string) { const eventMessage = mark === MarkEnum.READ ? WebSocketEventEnum.UNREAD : WebSocketEventEnum.UNSEEN; - this.webSocketsQueueService.add( - 'sendMessage', - { + this.webSocketsQueueService.add({ + name: 'sendMessage', + data: { event: eventMessage, userId: subscriber._id, _environmentId: subscriber._environmentId, }, - subscriber._organizationId - ); + groupId: subscriber._organizationId, + }); } } diff --git a/apps/inbound-mail/src/server/inbound-mail.service.spec.ts b/apps/inbound-mail/src/server/inbound-mail.service.spec.ts index f0e1a674848..15bd893ece1 100644 --- a/apps/inbound-mail/src/server/inbound-mail.service.spec.ts +++ b/apps/inbound-mail/src/server/inbound-mail.service.spec.ts @@ -1,6 +1,7 @@ import { expect } from 'chai'; import { InboundMailService } from './inbound-mail.service'; +import { IInboundParseDataDto } from '@novu/application-generic'; let inboundMailService: InboundMailService; @@ -65,7 +66,11 @@ describe('Inbound Mail Service', () => { _organizationId, _userId, }; - await inboundMailService.inboundParseQueueService.add(jobId, jobData, _organizationId); + await inboundMailService.inboundParseQueueService.add({ + name: jobId, + data: jobData as unknown as IInboundParseDataDto, + groupId: _organizationId, + }); expect(await inboundMailService.inboundParseQueueService.queue.getActiveCount()).to.equal(0); expect(await inboundMailService.inboundParseQueueService.queue.getWaitingCount()).to.equal(1); @@ -93,7 +98,11 @@ describe('Inbound Mail Service', () => { _organizationId, _userId, }; - await inboundMailService.inboundParseQueueService.addMinimalJob(jobId, jobData, _organizationId); + await inboundMailService.inboundParseQueueService.addMinimalJob({ + name: jobId, + data: jobData, + groupId: _organizationId, + }); expect(await inboundMailService.inboundParseQueueService.queue.getActiveCount()).to.equal(0); expect(await inboundMailService.inboundParseQueueService.queue.getWaitingCount()).to.equal(1); diff --git a/apps/inbound-mail/src/server/index.ts b/apps/inbound-mail/src/server/index.ts index d8ec1afc28d..aa4d5d137ca 100644 --- a/apps/inbound-mail/src/server/index.ts +++ b/apps/inbound-mail/src/server/index.ts @@ -364,7 +364,11 @@ class Mailin extends events.EventEmitter { const username: string = parts[0]; const environmentId = username.split('-nv-e=').at(-1); - inboundMailService.inboundParseQueueService.add(finalizedMessage.messageId, finalizedMessage, environmentId); + inboundMailService.inboundParseQueueService.add({ + name: finalizedMessage.messageId, + data: finalizedMessage, + groupId: environmentId, + }); return resolve(); }); diff --git a/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts b/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts index 36bb21f1d9e..026f8cfe5f8 100644 --- a/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts +++ b/apps/worker/src/app/workflow/services/active-jobs-metric.service.ts @@ -58,16 +58,21 @@ export class ActiveJobsMetricService { if (!exists) { Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT); - return await this.activeJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', { - jobId: METRIC_JOB_ID, - repeatJobKey: METRIC_JOB_ID, - repeat: { - immediately: true, - pattern: '* * * * * *', + return await this.activeJobsMetricQueueService.add({ + name: METRIC_JOB_ID, + data: undefined, + groupId: '', + options: { + jobId: METRIC_JOB_ID, + repeatJobKey: METRIC_JOB_ID, + repeat: { + immediately: true, + pattern: '* * * * * *', + }, + removeOnFail: true, + removeOnComplete: true, + attempts: 1, }, - removeOnFail: true, - removeOnComplete: true, - attempts: 1, }); } diff --git a/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts b/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts index 0e08b22ec99..e79248b5dc7 100644 --- a/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts +++ b/apps/worker/src/app/workflow/services/completed-jobs-metric.service.ts @@ -59,16 +59,21 @@ export class CompletedJobsMetricService { if (!exists) { Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT); - return await this.completedJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', { - jobId: METRIC_JOB_ID, - repeatJobKey: METRIC_JOB_ID, - repeat: { - immediately: true, - pattern: '0 * * * * *', + return await this.completedJobsMetricQueueService.add({ + name: METRIC_JOB_ID, + data: undefined, + groupId: '', + options: { + jobId: METRIC_JOB_ID, + repeatJobKey: METRIC_JOB_ID, + repeat: { + immediately: true, + pattern: '0 * * * * *', + }, + removeOnFail: true, + removeOnComplete: true, + attempts: 1, }, - removeOnFail: true, - removeOnComplete: true, - attempts: 1, }); } diff --git a/apps/worker/src/app/workflow/services/standard.worker.ts b/apps/worker/src/app/workflow/services/standard.worker.ts index 9253993de62..8563dac86aa 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.ts @@ -1,14 +1,10 @@ const nr = require('newrelic'); import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common'; -import { - ExecutionDetailsSourceEnum, - ExecutionDetailsStatusEnum, - IJobData, - ObservabilityBackgroundTransactionEnum, -} from '@novu/shared'; +import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; import { getStandardWorkerOptions, INovuWorker, + IStandardDataDto, Job, PinoLogger, StandardWorkerService, @@ -45,7 +41,7 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); - this.worker.on('failed', async (job: Job, error: Error): Promise => { + this.worker.on('failed', async (job: Job, error: Error): Promise => { await this.jobHasFailed(job, error); }); } @@ -59,22 +55,26 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker }; } - private extractMinimalJobData(job: any): { + private extractMinimalJobData(data: IStandardDataDto): { environmentId: string; organizationId: string; jobId: string; userId: string; } { - const { _environmentId: environmentId, _id: jobId, _organizationId: organizationId, _userId: userId } = job; + const { _environmentId: environmentId, _id: jobId, _organizationId: organizationId, _userId: userId } = data; if (!environmentId || !jobId || !organizationId || !userId) { - const message = job.payload.message; + const message = data.payload?.message; + + if (!message) { + throw new Error('Job data is missing required fields' + JSON.stringify(data)); + } return { environmentId: message._environmentId, jobId: message._jobId, organizationId: message._organizationId, - userId: job.userId, + userId: userId, }; } @@ -87,7 +87,7 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker } private getWorkerProcessor() { - return async ({ data }: { data: IJobData | any }) => { + return async ({ data }: { data: IStandardDataDto }) => { const minimalJobData = this.extractMinimalJobData(data); Logger.verbose(`Job ${minimalJobData.jobId} is being processed in the new instance standard worker`, LOG_CONTEXT); @@ -125,7 +125,7 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker }; } - private async jobHasCompleted(job: Job): Promise { + private async jobHasCompleted(job: Job): Promise { let jobId; try { @@ -146,7 +146,7 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker } } - private async jobHasFailed(job: Job, error: Error): Promise { + private async jobHasFailed(job: Job, error: Error): Promise { let jobId; try { diff --git a/apps/worker/src/app/workflow/services/subscriber-process.worker.ts b/apps/worker/src/app/workflow/services/subscriber-process.worker.ts index f70cb8574d4..991efb359fe 100644 --- a/apps/worker/src/app/workflow/services/subscriber-process.worker.ts +++ b/apps/worker/src/app/workflow/services/subscriber-process.worker.ts @@ -6,10 +6,10 @@ import { SubscriberJobBound, SubscriberProcessWorkerService, PinoLogger, - SubscriberJobBoundCommand, storage, Store, WorkerOptions, + IProcessSubscriberDataDto, } from '@novu/application-generic'; const LOG_CONTEXT = 'SubscriberProcessWorker'; @@ -22,7 +22,7 @@ export class SubscriberProcessWorker extends SubscriberProcessWorkerService { } public getWorkerProcessor() { - return async ({ data }: { data: SubscriberJobBoundCommand }) => { + return async ({ data }: { data: IProcessSubscriberDataDto }) => { return await new Promise(async (resolve, reject) => { // eslint-disable-next-line @typescript-eslint/no-this-alias const _this = this; diff --git a/apps/worker/src/app/workflow/services/workflow.worker.ts b/apps/worker/src/app/workflow/services/workflow.worker.ts index 7bd417a00ef..cefe69f1593 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.ts @@ -7,10 +7,11 @@ import { storage, Store, TriggerEvent, - TriggerEventCommand, WorkflowWorkerService, WorkerOptions, WorkerProcessor, + IWorkflowDataDto, + TriggerEventCommand, } from '@novu/application-generic'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; @@ -29,7 +30,7 @@ export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker } private getWorkerProcessor(): WorkerProcessor { - return async ({ data }: { data: TriggerEventCommand }) => { + return async ({ data }: { data: IWorkflowDataDto }) => { return await new Promise(async (resolve, reject) => { // eslint-disable-next-line @typescript-eslint/no-this-alias const _this = this; @@ -44,7 +45,7 @@ export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker storage.run(new Store(PinoLogger.root), () => { _this.triggerEventUsecase - .execute(data) + .execute(data as TriggerEventCommand) .then(resolve) .catch(reject) .finally(() => { diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts index b5afc4b0ba0..059287988fe 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts @@ -260,9 +260,9 @@ export class SendMessageInApp extends SendMessageBase { }) ); - await this.webSocketsQueueService.bullMqService.add( - 'sendMessage', - { + await this.webSocketsQueueService.add({ + name: 'sendMessage', + data: { event: WebSocketEventEnum.RECEIVED, userId: command._subscriberId, _environmentId: command.environmentId, @@ -270,12 +270,12 @@ export class SendMessageInApp extends SendMessageBase { messageId: message._id, }, }, - { + options: { removeOnComplete: true, removeOnFail: true, }, - command.organizationId - ); + groupId: command.organizationId, + }); await this.createExecutionDetails.execute( CreateExecutionDetailsCommand.create({ diff --git a/apps/ws/src/socket/services/web-socket.worker.spec.ts b/apps/ws/src/socket/services/web-socket.worker.spec.ts index 6a8f7bedf6a..6ce3d5f954b 100644 --- a/apps/ws/src/socket/services/web-socket.worker.spec.ts +++ b/apps/ws/src/socket/services/web-socket.worker.spec.ts @@ -8,6 +8,8 @@ import { WebSocketWorker } from './web-socket.worker'; import { SocketModule } from '../socket.module'; import { ExternalServicesRoute } from '../usecases/external-services-route'; +import { IWebSocketDataDto } from '@novu/application-generic/build/main/dtos/web-sockets-job.dto'; +import { WebSocketEventEnum } from '@novu/shared'; let webSocketsQueueService: WebSocketsQueueService; let webSocketWorker: WebSocketWorker; @@ -57,15 +59,15 @@ describe('WebSocket Worker', () => { const _environmentId = 'web-socket-queue-environment-id'; const _organizationId = 'web-socket-queue-organization-id'; const _userId = 'web-socket-queue-user-id'; - const jobData = { - _id: jobId, - test: 'web-socket-queue-job-data', + const jobData: IWebSocketDataDto = { + payload: { messageId: 'web-socket-queue-job-message-id' }, + event: WebSocketEventEnum.RECEIVED, _environmentId, _organizationId, - _userId, + userId: _userId, }; - await webSocketsQueueService.add(jobId, jobData, _organizationId); + await webSocketsQueueService.add({ name: jobId, data: jobData, groupId: _organizationId }); expect(await webSocketsQueueService.queue.getActiveCount()).to.equal(1); expect(await webSocketsQueueService.queue.getWaitingCount()).to.equal(0); diff --git a/apps/ws/src/socket/services/web-socket.worker.ts b/apps/ws/src/socket/services/web-socket.worker.ts index 47dcb560ca6..dea3942ea47 100644 --- a/apps/ws/src/socket/services/web-socket.worker.ts +++ b/apps/ws/src/socket/services/web-socket.worker.ts @@ -1,3 +1,5 @@ +import { IWebSocketDataDto } from '@novu/application-generic/build/main/dtos/web-sockets-job.dto'; + const nr = require('newrelic'); import { Injectable, Logger } from '@nestjs/common'; @@ -37,14 +39,16 @@ export class WebSocketWorker extends WebSocketsWorkerService implements INovuWor 'WS Service', function () { const transaction = nr.getTransaction(); + const { data: jobData } = job; + const data = jobData as IWebSocketDataDto; _this.externalServicesRoute .execute( ExternalServicesRouteCommand.create({ - userId: job.data.userId, - event: job.data.event, - payload: job.data.payload, - _environmentId: job.data._environmentId, + userId: data.userId, + event: data.event, + payload: data.payload, + _environmentId: data._environmentId, }) ) .then(resolve) diff --git a/packages/application-generic/src/custom-providers/index.ts b/packages/application-generic/src/custom-providers/index.ts index 0a36030d08c..d27dc5215e0 100644 --- a/packages/application-generic/src/custom-providers/index.ts +++ b/packages/application-generic/src/custom-providers/index.ts @@ -5,12 +5,13 @@ import { CacheService, DistributedLockService, FeatureFlagsService, - ReadinessService, +} from '../services'; +import { StandardQueueService, - SubscriberProcessQueueService, WebSocketsQueueService, WorkflowQueueService, -} from '../services'; + SubscriberProcessQueueService, +} from '../services/queues'; import { GetIsTopicNotificationEnabled, GetUseMergedDigestId, diff --git a/packages/application-generic/src/dtos/inbound-parse-job.dto.ts b/packages/application-generic/src/dtos/inbound-parse-job.dto.ts new file mode 100644 index 00000000000..8d289de2acc --- /dev/null +++ b/packages/application-generic/src/dtos/inbound-parse-job.dto.ts @@ -0,0 +1,100 @@ +import { + IBulkJobParams, + IJobParams, +} from '../services/queues/queue-base.service'; + +export interface IInboundParseDataDto { + html: string; + text: string; + headers: IHeaders; + subject: string; + messageId: string; + priority: string; + from: IFrom[]; + to: ITo[]; + date: Date; + dkim: string; + spf: string; + spamScore: number; + language: string; + cc: any[]; + attachments?: any[]; + connection: IConnection; + envelopeFrom: IEnvelopeFrom; + envelopeTo: IEnvelopeTo[]; +} + +export interface IHeaders { + 'content-type': string; + from: string; + to: string; + subject: string; + 'message-id': string; + date: string; + 'mime-version': string; +} + +export interface IFrom { + address: string; + name: string; +} + +export interface ITo { + address: string; + name: string; +} + +export interface ITlsOptions { + name: string; + standardName: string; + version: string; +} + +export interface IMailFrom { + address: string; + args: boolean; +} + +export interface IRcptTo { + address: string; + args: boolean; +} + +export interface IEnvelope { + mailFrom: IMailFrom; + rcptTo: IRcptTo[]; +} + +export interface IConnection { + id: string; + remoteAddress: string; + remotePort: number; + clientHostname: string; + openingCommand: string; + hostNameAppearsAs: string; + xClient: any; + xForward: any; + transmissionType: string; + tlsOptions: ITlsOptions; + envelope: IEnvelope; + transaction: number; + mailPath: string; +} + +export interface IEnvelopeFrom { + address: string; + args: boolean; +} + +export interface IEnvelopeTo { + address: string; + args: boolean; +} + +export interface IInboundParseJobDto extends IJobParams { + data?: IInboundParseDataDto; +} + +export interface IInboundParseBulkJobDto extends IBulkJobParams { + data: IInboundParseDataDto; +} diff --git a/packages/application-generic/src/dtos/index.ts b/packages/application-generic/src/dtos/index.ts new file mode 100644 index 00000000000..143c9cfb863 --- /dev/null +++ b/packages/application-generic/src/dtos/index.ts @@ -0,0 +1,8 @@ +export * from './inbound-parse-job.dto'; +export { + IProcessSubscriberJobDto, + IProcessSubscriberDataDto, +} from './process-subscriber-job.dto'; +export { IStandardJobDto, IStandardDataDto } from './standard-job.dto'; +export * from './web-sockets-job.dto'; +export { IWorkflowJobDto, IWorkflowDataDto } from './workflow-job.dto'; diff --git a/packages/application-generic/src/dtos/process-subscriber-job.dto.ts b/packages/application-generic/src/dtos/process-subscriber-job.dto.ts new file mode 100644 index 00000000000..2d3337db7ca --- /dev/null +++ b/packages/application-generic/src/dtos/process-subscriber-job.dto.ts @@ -0,0 +1,29 @@ +import { ISubscribersDefine, ITenantDefine } from '@novu/shared'; +import { SubscriberEntity } from '@novu/dal'; + +import { + IBulkJobParams, + IJobParams, +} from '../services/queues/queue-base.service'; + +export interface IProcessSubscriberDataDto { + environmentId: string; + organizationId: string; + userId: string; + transactionId: string; + identifier: string; + payload: any; + overrides: Record>; + tenant?: ITenantDefine | null; + actor?: SubscriberEntity; + subscriber: ISubscribersDefine; + templateId: string; +} + +export interface IProcessSubscriberJobDto extends IJobParams { + data?: IProcessSubscriberDataDto; +} + +export interface IProcessSubscriberBulkJobDto extends IBulkJobParams { + data: IProcessSubscriberDataDto; +} diff --git a/packages/application-generic/src/dtos/standard-job.dto.ts b/packages/application-generic/src/dtos/standard-job.dto.ts new file mode 100644 index 00000000000..3bf47239061 --- /dev/null +++ b/packages/application-generic/src/dtos/standard-job.dto.ts @@ -0,0 +1,23 @@ +import { IJobParams } from '../services/queues/queue-base.service'; + +export class IStandardDataDto { + _userId: string; + _environmentId: string; + _organizationId: string; + _id: string; + /* + * payload is deprecated - todo remove 'payload' once the queue renewed + * payload was added due backwards compatibility, the legacy use is in standard-worker + */ + payload?: { + message: { + _jobId: string; + _environmentId: string; + _organizationId: string; + }; + }; +} + +export interface IStandardJobDto extends IJobParams { + data?: IStandardDataDto; +} diff --git a/packages/application-generic/src/dtos/web-sockets-job.dto.ts b/packages/application-generic/src/dtos/web-sockets-job.dto.ts new file mode 100644 index 00000000000..e7a1a0f493f --- /dev/null +++ b/packages/application-generic/src/dtos/web-sockets-job.dto.ts @@ -0,0 +1,30 @@ +import { WebSocketEventEnum } from '@novu/shared'; + +import { + IBulkJobParams, + IJobParams, +} from '../services/queues/queue-base.service'; +import { JobsOptions } from '../services/bull-mq'; + +export interface IWebSocketDataDto { + event: WebSocketEventEnum; + userId: string; + _environmentId: string; + _organizationId?: string; + payload?: { messageId: string }; +} + +export interface IWebSocketJob extends IJobParams { + name: string; + data: any; + groupId?: string; + options?: JobsOptions; +} + +export interface IWebSocketJobDto extends IWebSocketJob { + data: IWebSocketDataDto; +} + +export interface IWebSocketBulkJobDto extends IBulkJobParams { + data: IWebSocketDataDto; +} diff --git a/packages/application-generic/src/dtos/workflow-job.dto.ts b/packages/application-generic/src/dtos/workflow-job.dto.ts new file mode 100644 index 00000000000..785592b51f3 --- /dev/null +++ b/packages/application-generic/src/dtos/workflow-job.dto.ts @@ -0,0 +1,34 @@ +import { + AddressingTypeEnum, + TriggerRecipients, + TriggerRecipientSubscriber, + TriggerTenantContext, +} from '@novu/shared'; +import { + IBulkJobParams, + IJobParams, +} from '../services/queues/queue-base.service'; + +export type Addressing = + | { to: TriggerRecipients } + | { addressingType: AddressingTypeEnum }; + +export type IWorkflowDataDto = { + environmentId: string; + organizationId: string; + userId: string; + identifier: string; + payload: any; // eslint-disable-line @typescript-eslint/no-explicit-any + overrides: Record>; + transactionId?: string; + actor?: TriggerRecipientSubscriber | null; + tenant?: TriggerTenantContext | null; +} & Addressing; + +export interface IWorkflowJobDto extends IJobParams { + data?: IWorkflowDataDto; +} + +export interface IWorkflowBulkJobDto extends IBulkJobParams { + data: IWorkflowDataDto; +} diff --git a/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts b/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts index d9e6a9384ee..7e5149f99f8 100644 --- a/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts +++ b/packages/application-generic/src/health/subscriber-process-queue.health-indicator.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { SubscriberProcessQueueService } from '../services/queues'; +import { SubscriberProcessQueueService } from '../services/queues/subscriber-process/subscriber-process-queue.service'; import { ObservabilityBackgroundTransactionEnum } from '@novu/shared'; import { QueueHealthIndicator } from './queue-health-indicator.service'; diff --git a/packages/application-generic/src/index.ts b/packages/application-generic/src/index.ts index ab892147cfa..d29ca659e38 100644 --- a/packages/application-generic/src/index.ts +++ b/packages/application-generic/src/index.ts @@ -5,6 +5,7 @@ export * from './factories/index'; export * from './health/index'; export * from './encryption/index'; export * from './services'; +export * from './services/queues'; export * from './logging/index'; export * from './modules'; export * from './usecases'; @@ -17,3 +18,4 @@ export * from './utils/exceptions'; export * from './utils/email-normalization'; export * from './decorators/external-api.decorator'; export * from './decorators/user-session.decorator'; +export * from './dtos'; diff --git a/packages/application-generic/src/modules/queues.module.ts b/packages/application-generic/src/modules/queues.module.ts index ecc6e34098d..d3565ae4092 100644 --- a/packages/application-generic/src/modules/queues.module.ts +++ b/packages/application-generic/src/modules/queues.module.ts @@ -16,9 +16,9 @@ import { CompletedJobsMetricQueueService, InboundParseQueue, StandardQueueService, - SubscriberProcessQueueService, WebSocketsQueueService, WorkflowQueueService, + SubscriberProcessQueueService, } from '../services/queues'; import { ActiveJobsMetricWorkerService, diff --git a/packages/application-generic/src/services/bull-mq/bull-mq.service.ts b/packages/application-generic/src/services/bull-mq/bull-mq.service.ts index 190b9ef18a0..d3e21570652 100644 --- a/packages/application-generic/src/services/bull-mq/bull-mq.service.ts +++ b/packages/application-generic/src/services/bull-mq/bull-mq.service.ts @@ -210,7 +210,7 @@ export class BullMqService { data: { name: string; data: BullMqJobData; - options: BulkJobOptions; + options?: BulkJobOptions; groupId?: string; }[] ) { diff --git a/packages/application-generic/src/services/index.ts b/packages/application-generic/src/services/index.ts index 25582f4f071..04a68c19c39 100644 --- a/packages/application-generic/src/services/index.ts +++ b/packages/application-generic/src/services/index.ts @@ -2,7 +2,6 @@ export * from './in-memory-provider'; export * from './launch-darkly.service'; export * from './feature-flags.service'; export * from './cache'; -export * from './queues'; export * from './workers'; export { INovuWorker, ReadinessService } from './readiness'; export { AnalyticsService } from './analytics.service'; diff --git a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts b/packages/application-generic/src/services/queues/active-jobs-metric/active-jobs-metric-queue.service.spec.ts similarity index 100% rename from packages/application-generic/src/services/queues/active-jobs-metric-queue.service.spec.ts rename to packages/application-generic/src/services/queues/active-jobs-metric/active-jobs-metric-queue.service.spec.ts diff --git a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts b/packages/application-generic/src/services/queues/active-jobs-metric/active-jobs-metric-queue.service.ts similarity index 74% rename from packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts rename to packages/application-generic/src/services/queues/active-jobs-metric/active-jobs-metric-queue.service.ts index 21571e33425..daf9a0b6fd2 100644 --- a/packages/application-generic/src/services/queues/active-jobs-metric-queue.service.ts +++ b/packages/application-generic/src/services/queues/active-jobs-metric/active-jobs-metric-queue.service.ts @@ -1,7 +1,6 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; - -import { QueueBaseService } from './queue-base.service'; +import { QueueBaseService } from '../queue-base.service'; const LOG_CONTEXT = 'ActiveJobsMetricQueueService'; diff --git a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts b/packages/application-generic/src/services/queues/completed-jobs-metric/completed-jobs-metric-queue.service.spec.ts similarity index 100% rename from packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.spec.ts rename to packages/application-generic/src/services/queues/completed-jobs-metric/completed-jobs-metric-queue.service.spec.ts diff --git a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts b/packages/application-generic/src/services/queues/completed-jobs-metric/completed-jobs-metric-queue.service.ts similarity index 74% rename from packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts rename to packages/application-generic/src/services/queues/completed-jobs-metric/completed-jobs-metric-queue.service.ts index 1ef74d97871..02ba111a7e5 100644 --- a/packages/application-generic/src/services/queues/completed-jobs-metric-queue.service.ts +++ b/packages/application-generic/src/services/queues/completed-jobs-metric/completed-jobs-metric-queue.service.ts @@ -1,7 +1,6 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; - -import { QueueBaseService } from './queue-base.service'; +import { QueueBaseService } from '../queue-base.service'; const LOG_CONTEXT = 'CompletedJobsMetricQueueService'; diff --git a/packages/application-generic/src/services/queues/inbound-parse-queue.service.spec.ts b/packages/application-generic/src/services/queues/inbound-parse/inbound-parse-queue.service.spec.ts similarity index 92% rename from packages/application-generic/src/services/queues/inbound-parse-queue.service.spec.ts rename to packages/application-generic/src/services/queues/inbound-parse/inbound-parse-queue.service.spec.ts index 5ed92a3231d..d91971bdb76 100644 --- a/packages/application-generic/src/services/queues/inbound-parse-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/inbound-parse/inbound-parse-queue.service.spec.ts @@ -1,6 +1,7 @@ import { Test } from '@nestjs/testing'; import { InboundParseQueueService } from './inbound-parse-queue.service'; +import { IInboundParseDataDto } from '../../../dtos'; let inboundParseQueueService: InboundParseQueueService; @@ -65,7 +66,11 @@ describe('Inbound Parse Queue service', () => { _organizationId, _userId, }; - await inboundParseQueueService.add(jobId, jobData, _organizationId); + await inboundParseQueueService.add({ + name: jobId, + data: jobData as unknown as IInboundParseDataDto, + groupId: _organizationId, + }); expect(await inboundParseQueueService.queue.getActiveCount()).toEqual(0); expect(await inboundParseQueueService.queue.getWaitingCount()).toEqual(1); @@ -96,11 +101,11 @@ describe('Inbound Parse Queue service', () => { _organizationId, _userId, }; - await inboundParseQueueService.addMinimalJob( - jobId, - jobData, - _organizationId - ); + await inboundParseQueueService.addMinimalJob({ + name: jobId, + data: jobData, + groupId: _organizationId, + }); expect(await inboundParseQueueService.queue.getActiveCount()).toEqual(0); expect(await inboundParseQueueService.queue.getWaitingCount()).toEqual(1); diff --git a/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts b/packages/application-generic/src/services/queues/inbound-parse/inbound-parse-queue.service.ts similarity index 60% rename from packages/application-generic/src/services/queues/inbound-parse-queue.service.ts rename to packages/application-generic/src/services/queues/inbound-parse/inbound-parse-queue.service.ts index 184b99dd09a..b75801a9cd2 100644 --- a/packages/application-generic/src/services/queues/inbound-parse-queue.service.ts +++ b/packages/application-generic/src/services/queues/inbound-parse/inbound-parse-queue.service.ts @@ -1,9 +1,9 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; +import { QueueOptions } from '../../bull-mq'; -import { QueueBaseService } from './queue-base.service'; - -import { QueueOptions } from '../bull-mq'; +import { QueueBaseService } from '../queue-base.service'; +import { IInboundParseBulkJobDto, IInboundParseJobDto } from '../../../dtos'; const LOG_CONTEXT = 'InboundParseQueueService'; @@ -17,6 +17,14 @@ export class InboundParseQueueService extends QueueBaseService { this.createQueue(this.getOverrideOptions()); } + public async add(data: IInboundParseJobDto) { + return await super.add(data); + } + + public async addBulk(data: IInboundParseBulkJobDto[]) { + return await super.addBulk(data); + } + private getOverrideOptions(): QueueOptions { return { defaultJobOptions: { diff --git a/packages/application-generic/src/services/queues/index.ts b/packages/application-generic/src/services/queues/index.ts index 695693a996d..dd77afff295 100644 --- a/packages/application-generic/src/services/queues/index.ts +++ b/packages/application-generic/src/services/queues/index.ts @@ -1,12 +1,11 @@ +import { ActiveJobsMetricQueueService } from './active-jobs-metric/active-jobs-metric-queue.service'; +import { CompletedJobsMetricQueueService } from './completed-jobs-metric/completed-jobs-metric-queue.service'; +import { InboundParseQueueService } from './inbound-parse/inbound-parse-queue.service'; import { QueueBaseService } from './queue-base.service'; - -import { ActiveJobsMetricQueueService } from './active-jobs-metric-queue.service'; -import { CompletedJobsMetricQueueService } from './completed-jobs-metric-queue.service'; -import { InboundParseQueueService } from './inbound-parse-queue.service'; -import { StandardQueueService } from './standard-queue.service'; -import { WebSocketsQueueService } from './web-sockets-queue.service'; -import { WorkflowQueueService } from './workflow-queue.service'; -import { SubscriberProcessQueueService } from './subscriber-process-queue.service'; +import { StandardQueueService } from './standard/standard-queue.service'; +import { WebSocketsQueueService } from './web-sockets/web-sockets-queue.service'; +import { WorkflowQueueService } from './workflow/workflow-queue.service'; +import { SubscriberProcessQueueService } from './subscriber-process/subscriber-process-queue.service'; export { QueueBaseService, diff --git a/packages/application-generic/src/services/queues/queue-base.service.ts b/packages/application-generic/src/services/queues/queue-base.service.ts index 65f3ce4e094..d470d584dde 100644 --- a/packages/application-generic/src/services/queues/queue-base.service.ts +++ b/packages/application-generic/src/services/queues/queue-base.service.ts @@ -71,52 +71,60 @@ export class QueueBaseService { await this.gracefulShutdown(); } - public async addMinimalJob( - id: string, - data?: any, - groupId?: string, - options: JobsOptions = {} - ) { + public async addMinimalJob(params: IJobParams) { + const { name, groupId, data, options } = params; + const jobData = data ? { _environmentId: data._environmentId, - _id: id, + _id: name, _organizationId: data._organizationId, _userId: data._userId, } : undefined; - await this.add(id, jobData, groupId, { - removeOnComplete: true, - removeOnFail: true, - ...options, - }); + await this.instance.add( + name, + jobData, + { + removeOnComplete: true, + removeOnFail: true, + ...options, + }, + groupId + ); } - public async add( - name: string, - data?: any, - groupId?: string, - options: JobsOptions = {} - ) { + public async add(params: IJobParams) { const jobOptions = { removeOnComplete: true, removeOnFail: true, - ...options, + ...params.options, }; - await this.instance.add(name, data, jobOptions, groupId); + await this.instance.add( + params.name, + params.data, + jobOptions, + params.groupId + ); } - public async addBulk( - data: [ - { - name: string; - data: any; - options: BulkJobOptions; - groupId?: string; - } - ] - ) { + + public async addBulk(data: IBulkJobParams[]) { await this.instance.addBulk(data); } } + +export interface IJobParams { + name: string; + data?: any; + groupId?: string; + options?: JobsOptions; +} + +export interface IBulkJobParams { + name: string; + data: any; + groupId?: string; + options?: BulkJobOptions; +} diff --git a/packages/application-generic/src/services/queues/standard-queue.service.ts b/packages/application-generic/src/services/queues/standard-queue.service.ts deleted file mode 100644 index 75febb14e1b..00000000000 --- a/packages/application-generic/src/services/queues/standard-queue.service.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { JobTopicNameEnum } from '@novu/shared'; - -import { QueueBaseService } from './queue-base.service'; - -const LOG_CONTEXT = 'StandardQueueService'; - -@Injectable() -export class StandardQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.STANDARD); - - Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); - - this.createQueue(); - } -} diff --git a/packages/application-generic/src/services/queues/standard-queue.service.spec.ts b/packages/application-generic/src/services/queues/standard/standard-queue.service.spec.ts similarity index 89% rename from packages/application-generic/src/services/queues/standard-queue.service.spec.ts rename to packages/application-generic/src/services/queues/standard/standard-queue.service.spec.ts index 94becbba5fd..3e4b51970e8 100644 --- a/packages/application-generic/src/services/queues/standard-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/standard/standard-queue.service.spec.ts @@ -1,6 +1,7 @@ import { Test } from '@nestjs/testing'; import { StandardQueueService } from './standard-queue.service'; +import { IStandardDataDto } from '../../../dtos/standard-job.dto'; let standardQueueService: StandardQueueService; @@ -58,14 +59,17 @@ describe('Standard Queue service', () => { const _environmentId = 'standard-environment-id'; const _organizationId = 'standard-organization-id'; const _userId = 'standard-user-id'; - const jobData = { - _id: jobId, - test: 'standard-job-data', + const jobData: IStandardDataDto = { + _id: 'standard-job-data', _environmentId, _organizationId, - _userId, + _userId: _userId, }; - await standardQueueService.add(jobId, jobData, _organizationId); + await standardQueueService.add({ + name: jobId, + data: jobData, + groupId: _organizationId, + }); expect(await standardQueueService.queue.getActiveCount()).toEqual(0); expect(await standardQueueService.queue.getWaitingCount()).toEqual(1); @@ -88,14 +92,17 @@ describe('Standard Queue service', () => { const _environmentId = 'standard-environment-id'; const _organizationId = 'standard-organization-id'; const _userId = 'standard-user-id'; - const jobData = { + const jobData: IStandardDataDto = { _id: jobId, - test: 'standard-job-data-2', _environmentId, _organizationId, - _userId, + _userId: _userId, }; - await standardQueueService.addMinimalJob(jobId, jobData, _organizationId); + await standardQueueService.addMinimalJob({ + name: jobId, + data: jobData, + groupId: _organizationId, + }); expect(await standardQueueService.queue.getActiveCount()).toEqual(0); expect(await standardQueueService.queue.getWaitingCount()).toEqual(1); diff --git a/packages/application-generic/src/services/queues/standard/standard-queue.service.ts b/packages/application-generic/src/services/queues/standard/standard-queue.service.ts new file mode 100644 index 00000000000..27cfd63a801 --- /dev/null +++ b/packages/application-generic/src/services/queues/standard/standard-queue.service.ts @@ -0,0 +1,31 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { JobTopicNameEnum } from '@novu/shared'; +import { QueueBaseService } from '../queue-base.service'; +import { IStandardJobDto } from '../../../dtos/standard-job.dto'; + +const LOG_CONTEXT = 'StandardQueueService'; + +@Injectable() +export class StandardQueueService extends QueueBaseService { + constructor() { + super(JobTopicNameEnum.STANDARD); + + Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); + + this.createQueue(); + } + + public async addMinimalJob(data: IStandardJobDto) { + return await super.addMinimalJob(data); + } + + public async add(data: unknown) { + // in order to implement we need to know what should be the data dto first + throw new Error('Not implemented'); + } + + public async addBulk(data: unknown[]) { + // in order to implement we need to know what should be the data dto first + throw new Error('Not implemented'); + } +} diff --git a/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts b/packages/application-generic/src/services/queues/subscriber-process/subscriber-process-queue.service.ts similarity index 52% rename from packages/application-generic/src/services/queues/subscriber-process-queue.service.ts rename to packages/application-generic/src/services/queues/subscriber-process/subscriber-process-queue.service.ts index 94e5670b973..01ac7a90be6 100644 --- a/packages/application-generic/src/services/queues/subscriber-process-queue.service.ts +++ b/packages/application-generic/src/services/queues/subscriber-process/subscriber-process-queue.service.ts @@ -1,7 +1,10 @@ import { Injectable, Logger } from '@nestjs/common'; import { JobTopicNameEnum } from '@novu/shared'; - -import { QueueBaseService } from './queue-base.service'; +import { QueueBaseService } from '../queue-base.service'; +import { + IProcessSubscriberBulkJobDto, + IProcessSubscriberJobDto, +} from '../../../dtos/process-subscriber-job.dto'; @Injectable() export class SubscriberProcessQueueService extends QueueBaseService { @@ -13,4 +16,12 @@ export class SubscriberProcessQueueService extends QueueBaseService { this.createQueue(); } + + public async add(data: IProcessSubscriberJobDto) { + return await super.add(data); + } + + public async addBulk(data: IProcessSubscriberBulkJobDto[]) { + return await super.addBulk(data); + } } diff --git a/packages/application-generic/src/services/queues/web-sockets-queue.service.ts b/packages/application-generic/src/services/queues/web-sockets-queue.service.ts deleted file mode 100644 index da5e4ee81cd..00000000000 --- a/packages/application-generic/src/services/queues/web-sockets-queue.service.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { JobTopicNameEnum } from '@novu/shared'; - -import { QueueBaseService } from './queue-base.service'; - -const LOG_CONTEXT = 'WebSocketsQueueService'; - -@Injectable() -export class WebSocketsQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.WEB_SOCKETS); - - Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); - - this.createQueue(); - } -} diff --git a/packages/application-generic/src/services/queues/web-sockets-queue.service.spec.ts b/packages/application-generic/src/services/queues/web-sockets/web-sockets-queue.service.spec.ts similarity index 87% rename from packages/application-generic/src/services/queues/web-sockets-queue.service.spec.ts rename to packages/application-generic/src/services/queues/web-sockets/web-sockets-queue.service.spec.ts index 5d8bddc2c67..063dc381982 100644 --- a/packages/application-generic/src/services/queues/web-sockets-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/web-sockets/web-sockets-queue.service.spec.ts @@ -1,6 +1,8 @@ import { Test } from '@nestjs/testing'; import { WebSocketsQueueService } from './web-sockets-queue.service'; +import { IWebSocketDataDto } from '../../../dtos/web-sockets-job.dto'; +import { WebSocketEventEnum } from '@novu/shared'; let webSocketsQueueService: WebSocketsQueueService; @@ -47,14 +49,19 @@ describe('WebSockets Queue service', () => { const _environmentId = 'web-sockets-queue-environment-id'; const _organizationId = 'web-sockets-queue-organization-id'; const _userId = 'web-sockets-queue-user-id'; - const jobData = { - _id: jobId, - test: 'web-sockets-queue-job-data', + const jobData: IWebSocketDataDto = { + event: WebSocketEventEnum.RECEIVED, + payload: { messageId: 'web-sockets-queue-job-message-id' }, _environmentId, _organizationId, - _userId, + userId: _userId, }; - await webSocketsQueueService.add(jobId, jobData, _organizationId); + + await webSocketsQueueService.add({ + name: jobId, + data: jobData, + groupId: _organizationId, + }); expect(await webSocketsQueueService.queue.getActiveCount()).toEqual(0); expect(await webSocketsQueueService.queue.getWaitingCount()).toEqual(1); @@ -84,11 +91,11 @@ describe('WebSockets Queue service', () => { _organizationId, _userId, }; - await webSocketsQueueService.addMinimalJob( - jobId, - jobData, - _organizationId - ); + await webSocketsQueueService.addMinimalJob({ + name: jobId, + data: jobData, + groupId: _organizationId, + }); expect(await webSocketsQueueService.queue.getActiveCount()).toEqual(0); expect(await webSocketsQueueService.queue.getWaitingCount()).toEqual(1); diff --git a/packages/application-generic/src/services/queues/web-sockets/web-sockets-queue.service.ts b/packages/application-generic/src/services/queues/web-sockets/web-sockets-queue.service.ts new file mode 100644 index 00000000000..e3559e3a176 --- /dev/null +++ b/packages/application-generic/src/services/queues/web-sockets/web-sockets-queue.service.ts @@ -0,0 +1,28 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { JobTopicNameEnum } from '@novu/shared'; +import { QueueBaseService } from '../queue-base.service'; +import { + IWebSocketBulkJobDto, + IWebSocketJobDto, +} from '../../../dtos/web-sockets-job.dto'; + +const LOG_CONTEXT = 'WebSocketsQueueService'; + +@Injectable() +export class WebSocketsQueueService extends QueueBaseService { + constructor() { + super(JobTopicNameEnum.WEB_SOCKETS); + + Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); + + this.createQueue(); + } + + public async add(data: IWebSocketJobDto) { + return await super.add(data); + } + + public async addBulk(data: IWebSocketBulkJobDto[]) { + return await super.addBulk(data); + } +} diff --git a/packages/application-generic/src/services/queues/workflow-queue.service.ts b/packages/application-generic/src/services/queues/workflow-queue.service.ts deleted file mode 100644 index 77d48b6de9c..00000000000 --- a/packages/application-generic/src/services/queues/workflow-queue.service.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { JobTopicNameEnum } from '@novu/shared'; - -import { QueueBaseService } from './queue-base.service'; - -const LOG_CONTEXT = 'WorkflowQueueService'; - -@Injectable() -export class WorkflowQueueService extends QueueBaseService { - constructor() { - super(JobTopicNameEnum.WORKFLOW); - - Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); - - this.createQueue(); - } -} diff --git a/packages/application-generic/src/services/queues/workflow-queue.service.spec.ts b/packages/application-generic/src/services/queues/workflow/workflow-queue.service.spec.ts similarity index 93% rename from packages/application-generic/src/services/queues/workflow-queue.service.spec.ts rename to packages/application-generic/src/services/queues/workflow/workflow-queue.service.spec.ts index 115bb71f8d8..6bf424d5a40 100644 --- a/packages/application-generic/src/services/queues/workflow-queue.service.spec.ts +++ b/packages/application-generic/src/services/queues/workflow/workflow-queue.service.spec.ts @@ -65,7 +65,12 @@ describe('Workflow Queue service', () => { _organizationId, _userId, }; - await workflowQueueService.add(jobId, jobData, _organizationId); + + await workflowQueueService.addMinimalJob({ + name: jobId, + data: jobData, + groupId: _organizationId, + }); expect(await workflowQueueService.queue.getActiveCount()).toEqual(0); expect(await workflowQueueService.queue.getWaitingCount()).toEqual(1); @@ -95,7 +100,11 @@ describe('Workflow Queue service', () => { _organizationId, _userId, }; - await workflowQueueService.addMinimalJob(jobId, jobData, _organizationId); + await workflowQueueService.addMinimalJob({ + name: jobId, + data: jobData, + groupId: _organizationId, + }); expect(await workflowQueueService.queue.getActiveCount()).toEqual(0); expect(await workflowQueueService.queue.getWaitingCount()).toEqual(1); diff --git a/packages/application-generic/src/services/queues/workflow/workflow-queue.service.ts b/packages/application-generic/src/services/queues/workflow/workflow-queue.service.ts new file mode 100644 index 00000000000..adfc7896bc1 --- /dev/null +++ b/packages/application-generic/src/services/queues/workflow/workflow-queue.service.ts @@ -0,0 +1,26 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { JobTopicNameEnum } from '@novu/shared'; +import { QueueBaseService } from '../queue-base.service'; +import { IWorkflowJobDto } from '../../../dtos'; +import { IWorkflowBulkJobDto } from '../../../dtos/workflow-job.dto'; + +const LOG_CONTEXT = 'WorkflowQueueService'; + +@Injectable() +export class WorkflowQueueService extends QueueBaseService { + constructor() { + super(JobTopicNameEnum.WORKFLOW); + + Logger.log(`Creating queue ${this.topic}`, LOG_CONTEXT); + + this.createQueue(); + } + + public async add(data: IWorkflowJobDto) { + return await super.add(data); + } + + public async addBulk(data: IWorkflowBulkJobDto[]) { + return await super.addBulk(data); + } +} diff --git a/packages/application-generic/src/services/readiness/readiness.service.spec.ts b/packages/application-generic/src/services/readiness/readiness.service.spec.ts index ef30899d704..64e51158bdf 100644 --- a/packages/application-generic/src/services/readiness/readiness.service.spec.ts +++ b/packages/application-generic/src/services/readiness/readiness.service.spec.ts @@ -1,11 +1,8 @@ import { ReadinessService } from './readiness.service'; import { BullMqService } from '../bull-mq'; -import { - StandardQueueService, - SubscriberProcessQueueService, - WorkflowQueueService, -} from '../queues'; +import { StandardQueueService, WorkflowQueueService } from '../queues'; +import { SubscriberProcessQueueService } from '../queues/subscriber-process/subscriber-process-queue.service'; import { StandardWorkerService, WorkerBaseService } from '../workers'; import { StandardQueueServiceHealthIndicator, diff --git a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts index 1a9a534b68d..047e85c129f 100644 --- a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts @@ -16,14 +16,12 @@ import { CreateExecutionDetailsCommand, DetailEnum, } from '../../usecases'; -import { - CalculateDelayService, - JobsOptions, - StandardQueueService, -} from '../../services'; +import { CalculateDelayService, JobsOptions } from '../../services'; +import { StandardQueueService } from '../../services/queues'; import { LogDecorator } from '../../logging'; import { InstrumentUsecase } from '../../instrumentation'; import { validateDigest } from './validation'; +import { IStandardDataDto } from '../../dtos/standard-job.dto'; export enum BackoffStrategiesEnum { WEBHOOK_FILTER_BACKOFF = 'webhookFilterBackoff', @@ -170,7 +168,7 @@ export class AddJob { options.attempts = this.standardQueueService.DEFAULT_ATTEMPTS; } - const jobData = { + const jobData: IStandardDataDto = { _environmentId: job._environmentId, _id: job._id, _organizationId: job._organizationId, @@ -183,12 +181,12 @@ export class AddJob { LOG_CONTEXT ); - await this.standardQueueService.addMinimalJob( - job._id, - jobData, - command.organizationId, - options - ); + await this.standardQueueService.addMinimalJob({ + name: job._id, + data: jobData, + groupId: command.organizationId, + options: options, + }); if (delay) { const logMessage = diff --git a/packages/application-generic/src/usecases/subscriber-job-bound/subscriber-job-bound.command.ts b/packages/application-generic/src/usecases/subscriber-job-bound/subscriber-job-bound.command.ts index 1095e40290b..b0e5bdda4cb 100644 --- a/packages/application-generic/src/usecases/subscriber-job-bound/subscriber-job-bound.command.ts +++ b/packages/application-generic/src/usecases/subscriber-job-bound/subscriber-job-bound.command.ts @@ -9,8 +9,12 @@ import { ISubscribersDefine, ITenantDefine } from '@novu/shared'; import { SubscriberEntity } from '@novu/dal'; import { EnvironmentWithUserCommand } from '../../commands'; +import { IProcessSubscriberDataDto } from '../../dtos'; -export class SubscriberJobBoundCommand extends EnvironmentWithUserCommand { +export class SubscriberJobBoundCommand + extends EnvironmentWithUserCommand + implements IProcessSubscriberDataDto +{ @IsString() @IsDefined() transactionId: string; @@ -32,9 +36,6 @@ export class SubscriberJobBoundCommand extends EnvironmentWithUserCommand { @IsOptional() actor?: SubscriberEntity | undefined; - @IsDefined() - to: ISubscribersDefine[]; - @IsDefined() @IsMongoId() templateId: string; diff --git a/packages/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts b/packages/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts index dff8ab17537..285544f34d6 100644 --- a/packages/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts +++ b/packages/application-generic/src/usecases/trigger-event/trigger-event.usecase.ts @@ -39,7 +39,8 @@ import { ApiException } from '../../utils/exceptions'; import { ProcessTenant, ProcessTenantCommand } from '../process-tenant'; import { MapTriggerRecipients } from '../map-trigger-recipients/map-trigger-recipients.use-case'; import { MapTriggerRecipientsCommand } from '../map-trigger-recipients/map-trigger-recipients.command'; -import { SubscriberProcessQueueService } from '../../services/queues/subscriber-process-queue.service'; +import { SubscriberProcessQueueService } from '../../services/queues/subscriber-process/subscriber-process-queue.service'; +import { IProcessSubscriberBulkJobDto } from '../../dtos/process-subscriber-job.dto'; const LOG_CONTEXT = 'TriggerEventUseCase'; const QUEUE_CHUNK_SIZE = 100; @@ -329,11 +330,11 @@ export class TriggerEvent { } private mapSubscribersToJobs( - subscribers: { subscriberId: string }[], + subscribers: ISubscribersDefine[], command: TriggerEventCommand, - actorProcessed, - template - ) { + actorProcessed: SubscriberEntity, + template: NotificationTemplateEntity + ): IProcessSubscriberBulkJobDto[] { return subscribers.map((subscriber) => { return { name: command.transactionId + subscriber.subscriberId, @@ -365,10 +366,13 @@ export class TriggerEvent { return tenant; } - private async subscriberProcessQueueAddBulk(jobs) { + private async subscriberProcessQueueAddBulk( + jobs: IProcessSubscriberBulkJobDto[] + ) { return await Promise.all( - _.chunk(jobs, QUEUE_CHUNK_SIZE).map((chunk) => - this.subscriberProcessQueueService.addBulk(chunk) + _.chunk(jobs, QUEUE_CHUNK_SIZE).map( + (chunk: IProcessSubscriberBulkJobDto[]) => + this.subscriberProcessQueueService.addBulk(chunk) ) ); } From 85e6e618556c689b4aa9f022e1c7a68c7f6cfe94 Mon Sep 17 00:00:00 2001 From: Gosha Date: Tue, 14 Nov 2023 13:08:43 +0200 Subject: [PATCH 2/5] fix: update method input --- apps/worker/src/app/workflow/services/standard.worker.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/worker/src/app/workflow/services/standard.worker.spec.ts b/apps/worker/src/app/workflow/services/standard.worker.spec.ts index 869a428e9af..1786c856d3f 100644 --- a/apps/worker/src/app/workflow/services/standard.worker.spec.ts +++ b/apps/worker/src/app/workflow/services/standard.worker.spec.ts @@ -204,7 +204,7 @@ describe('Standard Worker', () => { _userId: jobCreated._userId, }; - await standardQueueService.addMinimalJob(jobCreated._id, jobData, '0'); + await standardQueueService.addMinimalJob({ name: jobCreated._id, data: jobData, groupId: '0' }); await jobsService.awaitRunningJobs({ templateId: _templateId, @@ -267,7 +267,7 @@ describe('Standard Worker', () => { _userId: jobCreated._userId, }; - await standardQueueService.addMinimalJob(jobCreated._id, jobData, '0'); + await standardQueueService.addMinimalJob({ name: jobCreated._id, data: jobData, groupId: '0' }); await jobsService.awaitRunningJobs({ templateId: _templateId, From a019d3babd4334c3092c1e3511e101fd4ca4c8fe Mon Sep 17 00:00:00 2001 From: Gosha Date: Sun, 26 Nov 2023 16:46:27 +0200 Subject: [PATCH 3/5] fix: after next merge --- .../handle-last-failed-job.usecase.ts | 10 +- .../message-matcher.usecase.ts | 20 ++-- .../send-message/digest/digest.usecase.ts | 10 +- .../digest/get-digest-events.usecase.ts | 10 +- .../send-message/send-message-chat.usecase.ts | 91 ++++++++-------- .../send-message-delay.usecase.ts | 10 +- .../send-message-email.usecase.ts | 101 +++++++++--------- .../send-message-in-app.usecase.ts | 30 +++--- .../send-message/send-message-push.usecase.ts | 40 +++---- .../send-message/send-message-sms.usecase.ts | 70 ++++++------ .../send-message/send-message.base.ts | 40 +++---- .../send-message/send-message.usecase.ts | 40 +++---- ...webhook-filter-backoff-strategy.usecase.ts | 10 +- .../execution-log-queue.service.spec.ts | 0 .../execution-log-queue.service.ts | 5 +- .../src/services/queues/index.ts | 2 +- .../src/usecases/add-job/add-job.usecase.ts | 27 +++-- .../add-job/merge-or-create-digest.usecase.ts | 16 ++- 18 files changed, 264 insertions(+), 268 deletions(-) rename packages/application-generic/src/services/queues/{ => execution-log}/execution-log-queue.service.spec.ts (100%) rename packages/application-generic/src/services/queues/{ => execution-log}/execution-log-queue.service.ts (73%) diff --git a/apps/worker/src/app/workflow/usecases/handle-last-failed-job/handle-last-failed-job.usecase.ts b/apps/worker/src/app/workflow/usecases/handle-last-failed-job/handle-last-failed-job.usecase.ts index d984489dd6e..cb56be84846 100644 --- a/apps/worker/src/app/workflow/usecases/handle-last-failed-job/handle-last-failed-job.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/handle-last-failed-job/handle-last-failed-job.usecase.ts @@ -41,9 +41,9 @@ export class HandleLastFailedJob { throw new PlatformException(message); } const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.WEBHOOK_FILTER_FAILED_LAST_RETRY, @@ -53,8 +53,8 @@ export class HandleLastFailedJob { isRetry: true, raw: JSON.stringify({ message: JSON.parse(error.message).message }), }), - job._organizationId - ); + groupId: job._organizationId, + }); if (!job?.step?.shouldStopOnFail) { await this.queueNextJob.execute( diff --git a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts index a45dde929f1..ea379d45d00 100644 --- a/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/message-matcher/message-matcher.usecase.ts @@ -102,9 +102,9 @@ export class MessageMatcher extends Filter { const result = await this.processFilter(variables, children[0], command, filterProcessingDetails); if (!prefiltering) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.PROCESSING_STEP_FILTER, @@ -114,8 +114,8 @@ export class MessageMatcher extends Filter { isRetry: false, raw: filterProcessingDetails.toString(), }), - command.organizationId - ); + groupId: command.organizationId, + }); } details.push(filterProcessingDetails); @@ -126,9 +126,9 @@ export class MessageMatcher extends Filter { const result = await this.handleGroupFilters(filter, variables, command, filterProcessingDetails); if (!prefiltering) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.PROCESSING_STEP_FILTER, @@ -138,8 +138,8 @@ export class MessageMatcher extends Filter { isRetry: false, raw: filterProcessingDetails.toString(), }), - command.organizationId - ); + groupId: command.organizationId, + }); } details.push(filterProcessingDetails); diff --git a/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts index aadf24983b4..67f67b1c11c 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/digest/digest.usecase.ts @@ -57,9 +57,9 @@ export class Digest extends SendMessageType { const events = await getEvents(command, currentJob); const nextJobs = await this.getJobsToUpdate(command); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(currentJob), detail: DetailEnum.DIGEST_TRIGGERED_EVENTS, @@ -69,8 +69,8 @@ export class Digest extends SendMessageType { isRetry: false, raw: JSON.stringify(events), }), - currentJob._organizationId - ); + groupId: currentJob._organizationId, + }); await this.jobRepository.update( { diff --git a/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts index dbff746390f..1acb5a37ed9 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/digest/get-digest-events.usecase.ts @@ -61,9 +61,9 @@ export abstract class GetDigestEvents { if (!currentTrigger) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(currentJob), detail: DetailEnum.DIGEST_TRIGGERED_EVENTS, @@ -72,8 +72,8 @@ export abstract class GetDigestEvents { isTest: false, isRetry: false, }), - currentJob._organizationId - ); + groupId: currentJob._organizationId, + }); const message = `Trigger job for jobId ${currentJob._id} is not found`; Logger.error(message, LOG_CONTEXT); throw new PlatformException(message); diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-chat.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-chat.usecase.ts index 7f827b88f60..a49f2786ea0 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-chat.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-chat.usecase.ts @@ -20,7 +20,6 @@ import { import { InstrumentUsecase, DetailEnum, - CreateExecutionDetails, CreateExecutionDetailsCommand, CompileTemplate, CompileTemplateCommand, @@ -118,9 +117,9 @@ export class SendMessageChat extends SendMessageBase { if (chatChannels.length === 0) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.SUBSCRIBER_NO_ACTIVE_CHANNEL, @@ -129,8 +128,8 @@ export class SendMessageChat extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -151,9 +150,9 @@ export class SendMessageChat extends SendMessageBase { if (allFailed) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.CHAT_ALL_CHANNELS_FAILED, @@ -162,8 +161,8 @@ export class SendMessageChat extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); } } @@ -190,9 +189,9 @@ export class SendMessageChat extends SendMessageBase { if (!chatWebhookUrl) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.CHAT_WEBHOOK_URL_MISSING, @@ -204,8 +203,8 @@ export class SendMessageChat extends SendMessageBase { reason: `webhookUrl for integrationId: ${subscriberChannel?._integrationId} is missing`, }), }), - command.organizationId - ); + groupId: command.organizationId, + }); } const message: MessageEntity = await this.messageRepository.create({ @@ -225,9 +224,9 @@ export class SendMessageChat extends SendMessageBase { if (!integration) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.SUBSCRIBER_NO_ACTIVE_INTEGRATION, @@ -239,8 +238,8 @@ export class SendMessageChat extends SendMessageBase { reason: `Integration with integrationId: ${subscriberChannel?._integrationId} is either deleted or not active`, }), }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -248,9 +247,9 @@ export class SendMessageChat extends SendMessageBase { await this.sendSelectedIntegrationExecution(command.job, integration); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.MESSAGE_CREATED, @@ -261,8 +260,8 @@ export class SendMessageChat extends SendMessageBase { isRetry: false, raw: this.storeContent() ? JSON.stringify(content) : null, }), - command.organizationId - ); + groupId: command.organizationId, + }); if (chatWebhookUrl && integration) { await this.sendMessage(chatWebhookUrl, integration, content, message, command, channelSpecification); @@ -290,9 +289,9 @@ export class SendMessageChat extends SendMessageBase { ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -305,8 +304,8 @@ export class SendMessageChat extends SendMessageBase { reason: `webhookUrl for integrationId: ${integration?.identifier} is missing`, }), }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -320,9 +319,9 @@ export class SendMessageChat extends SendMessageBase { LogCodeEnum.MISSING_CHAT_INTEGRATION ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -335,8 +334,8 @@ export class SendMessageChat extends SendMessageBase { reason: 'Integration is either deleted or not active', }), }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -364,9 +363,9 @@ export class SendMessageChat extends SendMessageBase { }); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -377,8 +376,8 @@ export class SendMessageChat extends SendMessageBase { isRetry: false, raw: JSON.stringify(result), }), - command.organizationId - ); + groupId: command.organizationId, + }); } catch (e) { await this.sendErrorStatus( message, @@ -391,9 +390,9 @@ export class SendMessageChat extends SendMessageBase { ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -404,8 +403,8 @@ export class SendMessageChat extends SendMessageBase { isRetry: false, raw: JSON.stringify(e), }), - command.organizationId - ); + groupId: command.organizationId, + }); } } } diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-delay.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-delay.usecase.ts index dec75730ff3..33ebc2ffb8c 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-delay.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-delay.usecase.ts @@ -25,9 +25,9 @@ export class SendMessageDelay extends SendMessageType { @InstrumentUsecase() public async execute(command: SendMessageCommand) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.DELAY_FINISHED, @@ -36,7 +36,7 @@ export class SendMessageDelay extends SendMessageType { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); } } diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-email.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-email.usecase.ts index b92c6a10b1c..ba7296fa385 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-email.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-email.usecase.ts @@ -23,7 +23,6 @@ import * as Sentry from '@sentry/node'; import { InstrumentUsecase, DetailEnum, - CreateExecutionDetails, CreateExecutionDetailsCommand, SelectIntegration, CompileEmailTemplate, @@ -91,9 +90,9 @@ export class SendMessageEmail extends SendMessageBase { }); } catch (e) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.LIMIT_PASSED_NOVU_INTEGRATION, @@ -103,8 +102,8 @@ export class SendMessageEmail extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -121,9 +120,9 @@ export class SendMessageEmail extends SendMessageBase { if (!integration) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.SUBSCRIBER_NO_ACTIVE_INTEGRATION, @@ -139,8 +138,8 @@ export class SendMessageEmail extends SendMessageBase { } : {}), }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -261,9 +260,9 @@ export class SendMessageEmail extends SendMessageBase { } const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.MESSAGE_CREATED, @@ -274,8 +273,8 @@ export class SendMessageEmail extends SendMessageBase { isRetry: false, raw: this.storeContent() ? JSON.stringify(payload) : null, }), - command.organizationId - ); + groupId: command.organizationId, + }); const attachments = (command.payload.attachments)?.map( (attachment) => @@ -324,9 +323,9 @@ export class SendMessageEmail extends SendMessageBase { private async getReplyTo(command: SendMessageCommand, messageId: string): Promise { if (!command.step.replyCallback?.url) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: messageId, @@ -336,8 +335,8 @@ export class SendMessageEmail extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return null; } @@ -358,9 +357,9 @@ export class SendMessageEmail extends SendMessageBase { : DetailEnum.REPLY_CALLBACK_MISSING_MX_ROUTE_DOMAIN_CONFIGURATION; const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: messageId, @@ -370,8 +369,8 @@ export class SendMessageEmail extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return null; } @@ -395,9 +394,9 @@ export class SendMessageEmail extends SendMessageBase { ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -407,8 +406,8 @@ export class SendMessageEmail extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -426,9 +425,9 @@ export class SendMessageEmail extends SendMessageBase { ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -438,8 +437,8 @@ export class SendMessageEmail extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -461,9 +460,9 @@ export class SendMessageEmail extends SendMessageBase { Logger.verbose({ command }, 'Email message has been sent', LOG_CONTEXT); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -474,8 +473,8 @@ export class SendMessageEmail extends SendMessageBase { isRetry: false, raw: JSON.stringify(result), }), - command.organizationId - ); + groupId: command.organizationId, + }); Logger.verbose({ command }, 'Execution details of sending an email message have been stored', LOG_CONTEXT); @@ -503,9 +502,9 @@ export class SendMessageEmail extends SendMessageBase { ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -516,8 +515,8 @@ export class SendMessageEmail extends SendMessageBase { isRetry: false, raw: JSON.stringify(error), }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -536,9 +535,9 @@ export class SendMessageEmail extends SendMessageBase { ); if (!layoutOverride) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.LAYOUT_NOT_FOUND, @@ -550,8 +549,8 @@ export class SendMessageEmail extends SendMessageBase { layoutIdentifier: overrideLayoutIdentifier, }), }), - command.organizationId - ); + groupId: command.organizationId, + }); } return layoutOverride?._id; diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts index d160364a270..7a7d638724e 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-in-app.usecase.ts @@ -94,9 +94,9 @@ export class SendMessageInApp extends SendMessageBase { if (!integration) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.SUBSCRIBER_NO_ACTIVE_INTEGRATION, @@ -105,8 +105,8 @@ export class SendMessageInApp extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -253,9 +253,9 @@ export class SendMessageInApp extends SendMessageBase { if (!message) throw new PlatformException('Message not found'); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), ...metadata, messageId: message._id, @@ -266,8 +266,8 @@ export class SendMessageInApp extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); await this.webSocketsQueueService.add({ name: 'sendMessage', @@ -287,9 +287,9 @@ export class SendMessageInApp extends SendMessageBase { }); const meta = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - meta._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: meta._id, + data: CreateExecutionDetailsCommand.create({ ...meta, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -300,8 +300,8 @@ export class SendMessageInApp extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); } private async compileInAppTemplate( diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-push.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-push.usecase.ts index 42dfa7db786..226dc46873f 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-push.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-push.usecase.ts @@ -230,9 +230,9 @@ export class SendMessagePush extends SendMessageBase { private async sendNotificationError(job: JobEntity): Promise { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.NOTIFICATION_ERROR, @@ -241,8 +241,8 @@ export class SendMessagePush extends SendMessageBase { isTest: false, isRetry: false, }), - job._organizationId - ); + groupId: job._organizationId, + }); } private async sendPushMissingDeviceTokensError(job: JobEntity, channel: IChannelSettings): Promise { @@ -277,9 +277,9 @@ export class SendMessagePush extends SendMessageBase { // We avoid to throw the errors to be able to execute all actions in the loop try { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...CreateExecutionDetailsCommand.getDetailsFromJob(job), ...metadata, detail, @@ -291,8 +291,8 @@ export class SendMessagePush extends SendMessageBase { ...(contextData?.messageId && { messageId: contextData.messageId }), ...(contextData?.raw && { raw: contextData.raw }), }), - job._organizationId - ); + groupId: job._organizationId, + }); } catch (error) {} } @@ -321,9 +321,9 @@ export class SendMessagePush extends SendMessageBase { }); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -334,8 +334,8 @@ export class SendMessagePush extends SendMessageBase { isRetry: false, raw: JSON.stringify({ result, deviceToken }), }), - command.organizationId - ); + groupId: command.organizationId, + }); return true; } catch (e) { @@ -384,9 +384,9 @@ export class SendMessagePush extends SendMessageBase { }); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: `${DetailEnum.MESSAGE_CREATED}: ${integration.providerId}`, @@ -397,8 +397,8 @@ export class SendMessagePush extends SendMessageBase { isRetry: false, raw: this.storeContent() ? JSON.stringify(content) : null, }), - command.organizationId - ); + groupId: command.organizationId, + }); return message; } diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message-sms.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message-sms.usecase.ts index 106d99df3b1..8cf24b5bf2a 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message-sms.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message-sms.usecase.ts @@ -125,9 +125,9 @@ export class SendMessageSms extends SendMessageBase { if (!integration) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.SUBSCRIBER_NO_ACTIVE_INTEGRATION, @@ -143,8 +143,8 @@ export class SendMessageSms extends SendMessageBase { } : {}), }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -178,9 +178,9 @@ export class SendMessageSms extends SendMessageBase { }); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.MESSAGE_CREATED, @@ -191,8 +191,8 @@ export class SendMessageSms extends SendMessageBase { isRetry: false, raw: this.storeContent() ? JSON.stringify(messagePayload) : null, }), - command.organizationId - ); + groupId: command.organizationId, + }); if (phone && integration) { await this.sendMessage(phone, integration, content, message, command, overrides); @@ -215,9 +215,9 @@ export class SendMessageSms extends SendMessageBase { ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -227,8 +227,8 @@ export class SendMessageSms extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -243,9 +243,9 @@ export class SendMessageSms extends SendMessageBase { ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -255,8 +255,8 @@ export class SendMessageSms extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -270,9 +270,9 @@ export class SendMessageSms extends SendMessageBase { LogCodeEnum.MISSING_SMS_PROVIDER ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -282,8 +282,8 @@ export class SendMessageSms extends SendMessageBase { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); return; } @@ -312,9 +312,9 @@ export class SendMessageSms extends SendMessageBase { }); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -325,8 +325,8 @@ export class SendMessageSms extends SendMessageBase { isRetry: false, raw: JSON.stringify(result), }), - command.organizationId - ); + groupId: command.organizationId, + }); if (!result?.id) { return; @@ -352,9 +352,9 @@ export class SendMessageSms extends SendMessageBase { ); const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), messageId: message._id, @@ -365,8 +365,8 @@ export class SendMessageSms extends SendMessageBase { isRetry: false, raw: JSON.stringify(e), }), - command.organizationId - ); + groupId: command.organizationId, + }); } } diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message.base.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message.base.ts index c7b6609c2da..1e0f857f137 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message.base.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message.base.ts @@ -90,9 +90,9 @@ export abstract class SendMessageBase extends SendMessageType { protected async sendErrorHandlebars(job: JobEntity, error: string) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.MESSAGE_CONTENT_NOT_GENERATED, @@ -102,15 +102,15 @@ export abstract class SendMessageBase extends SendMessageType { isRetry: false, raw: JSON.stringify({ error }), }), - job._organizationId - ); + groupId: job._organizationId, + }); } protected async sendSelectedIntegrationExecution(job: JobEntity, integration: IntegrationEntity) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.INTEGRATION_INSTANCE_SELECTED, @@ -126,15 +126,15 @@ export abstract class SendMessageBase extends SendMessageType { _id: integration?._id, }), }), - job._organizationId - ); + groupId: job._organizationId, + }); } protected async sendSelectedTenantExecution(job: JobEntity, tenant: TenantEntity) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.TENANT_CONTEXT_SELECTED, @@ -152,8 +152,8 @@ export abstract class SendMessageBase extends SendMessageType { _id: tenant?._id, }), }), - job._organizationId - ); + groupId: job._organizationId, + }); } protected async handleTenantExecution(job: JobEntity): Promise { @@ -167,9 +167,9 @@ export abstract class SendMessageBase extends SendMessageType { }); if (!tenant) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.TENANT_NOT_FOUND, @@ -181,8 +181,8 @@ export abstract class SendMessageBase extends SendMessageType { tenantIdentifier: tenantIdentifier, }), }), - job._organizationId - ); + groupId: job._organizationId, + }); return null; } diff --git a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts index cc69d8b570d..1d4f32c4517 100644 --- a/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/send-message/send-message.usecase.ts @@ -123,9 +123,9 @@ export class SendMessage { if (stepType !== StepTypeEnum.DELAY) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: stepType === StepTypeEnum.DIGEST ? DetailEnum.START_DIGESTING : DetailEnum.START_SENDING, @@ -134,8 +134,8 @@ export class SendMessage { isTest: false, isRetry: false, }), - command.organizationId - ); + groupId: command.organizationId, + }); } switch (stepType) { @@ -175,9 +175,9 @@ export class SendMessage { if (!shouldRun.passed) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(command.job), detail: DetailEnum.FILTER_STEPS, @@ -190,8 +190,8 @@ export class SendMessage { filters: command.step.filters, }), }), - command.organizationId - ); + groupId: command.organizationId, + }); } return shouldRun; @@ -229,9 +229,9 @@ export class SendMessage { if (!globalPreferenceResult) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.STEP_FILTERED_BY_GLOBAL_PREFERENCES, @@ -241,8 +241,8 @@ export class SendMessage { isRetry: false, raw: JSON.stringify(globalPreference), }), - job._organizationId - ); + groupId: job._organizationId, + }); return false; } @@ -262,9 +262,9 @@ export class SendMessage { if (!result) { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.STEP_FILTERED_BY_PREFERENCES, @@ -274,8 +274,8 @@ export class SendMessage { isRetry: false, raw: JSON.stringify(preference), }), - job._organizationId - ); + groupId: job._organizationId, + }); } return result; diff --git a/apps/worker/src/app/workflow/usecases/webhook-filter-backoff-strategy/webhook-filter-backoff-strategy.usecase.ts b/apps/worker/src/app/workflow/usecases/webhook-filter-backoff-strategy/webhook-filter-backoff-strategy.usecase.ts index 19967f18cfe..0a3c6787164 100644 --- a/apps/worker/src/app/workflow/usecases/webhook-filter-backoff-strategy/webhook-filter-backoff-strategy.usecase.ts +++ b/apps/worker/src/app/workflow/usecases/webhook-filter-backoff-strategy/webhook-filter-backoff-strategy.usecase.ts @@ -14,9 +14,9 @@ export class WebhookFilterBackoffStrategy { try { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.WEBHOOK_FILTER_FAILED_RETRY, @@ -26,8 +26,8 @@ export class WebhookFilterBackoffStrategy { isRetry: true, raw: JSON.stringify({ message: JSON.parse(error?.message).message, attempt: attemptsMade }), }), - job._organizationId - ); + groupId: job._organizationId, + }); } catch (anotherError) { Logger.error( anotherError, diff --git a/packages/application-generic/src/services/queues/execution-log-queue.service.spec.ts b/packages/application-generic/src/services/queues/execution-log/execution-log-queue.service.spec.ts similarity index 100% rename from packages/application-generic/src/services/queues/execution-log-queue.service.spec.ts rename to packages/application-generic/src/services/queues/execution-log/execution-log-queue.service.spec.ts diff --git a/packages/application-generic/src/services/queues/execution-log-queue.service.ts b/packages/application-generic/src/services/queues/execution-log/execution-log-queue.service.ts similarity index 73% rename from packages/application-generic/src/services/queues/execution-log-queue.service.ts rename to packages/application-generic/src/services/queues/execution-log/execution-log-queue.service.ts index cdfed90982a..468a51d6f59 100644 --- a/packages/application-generic/src/services/queues/execution-log-queue.service.ts +++ b/packages/application-generic/src/services/queues/execution-log/execution-log-queue.service.ts @@ -1,7 +1,8 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; + import { JobTopicNameEnum } from '@novu/shared'; -import { QueueBaseService } from './queue-base.service'; +import { QueueBaseService } from '../queue-base.service'; const LOG_CONTEXT = 'ExecutionLogQueueService'; diff --git a/packages/application-generic/src/services/queues/index.ts b/packages/application-generic/src/services/queues/index.ts index 74dbeb9a153..566e05d8337 100644 --- a/packages/application-generic/src/services/queues/index.ts +++ b/packages/application-generic/src/services/queues/index.ts @@ -6,7 +6,7 @@ import { StandardQueueService } from './standard/standard-queue.service'; import { WebSocketsQueueService } from './web-sockets/web-sockets-queue.service'; import { WorkflowQueueService } from './workflow/workflow-queue.service'; import { SubscriberProcessQueueService } from './subscriber-process/subscriber-process-queue.service'; -import { ExecutionLogQueueService } from './execution-log-queue.service'; +import { ExecutionLogQueueService } from './execution-log/execution-log-queue.service'; export { QueueBaseService, diff --git a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts index 41481a38161..c8cf7578506 100644 --- a/packages/application-generic/src/usecases/add-job/add-job.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/add-job.usecase.ts @@ -12,12 +12,11 @@ import { MergeOrCreateDigestCommand } from './merge-or-create-digest.command'; import { MergeOrCreateDigest } from './merge-or-create-digest.usecase'; import { AddJobCommand } from './add-job.command'; import { CreateExecutionDetailsCommand, DetailEnum } from '../../usecases'; +import { CalculateDelayService, JobsOptions } from '../../services'; import { - CalculateDelayService, ExecutionLogQueueService, - JobsOptions, -} from '../../services'; -import { StandardQueueService } from '../../services/queues'; + StandardQueueService, +} from '../../services/queues'; import { LogDecorator } from '../../logging'; import { InstrumentUsecase } from '../../instrumentation'; import { validateDigest } from './validation'; @@ -138,9 +137,9 @@ export class AddJob { } const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.STEP_QUEUED, @@ -149,8 +148,8 @@ export class AddJob { isTest: false, isRetry: false, }), - job._organizationId - ); + groupId: job._organizationId, + }); const delay = command.filtered ? 0 : digestAmount ?? delayAmount; @@ -203,9 +202,9 @@ export class AddJob { Logger.verbose(logMessage, LOG_CONTEXT); const meta = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - meta._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: meta._id, + data: CreateExecutionDetailsCommand.create({ ...meta, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: @@ -218,8 +217,8 @@ export class AddJob { isRetry: false, raw: JSON.stringify({ delay }), }), - job._organizationId - ); + groupId: job._organizationId, + }); } } diff --git a/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts b/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts index 1c5c99dc52a..b7746027318 100644 --- a/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts +++ b/packages/application-generic/src/usecases/add-job/merge-or-create-digest.usecase.ts @@ -11,16 +11,14 @@ import { import { MergeOrCreateDigestCommand } from './merge-or-create-digest.command'; import { ApiException } from '../../utils/exceptions'; -import { - EventsDistributedLockService, - ExecutionLogQueueService, -} from '../../services'; +import { EventsDistributedLockService } from '../../services'; import { DigestFilterSteps } from '../digest-filter-steps'; import { DetailEnum, CreateExecutionDetailsCommand, } from '../create-execution-details'; import { Instrument, InstrumentUsecase } from '../../instrumentation'; +import { ExecutionLogQueueService } from '../../services/queues'; interface IFindAndUpdateResponse { matched: number; @@ -179,9 +177,9 @@ export class MergeOrCreateDigest { private async digestMergedExecutionDetails(job: JobEntity): Promise { const metadata = CreateExecutionDetailsCommand.getExecutionLogMetadata(); - await this.executionLogQueueService.add( - metadata._id, - CreateExecutionDetailsCommand.create({ + await this.executionLogQueueService.add({ + name: metadata._id, + data: CreateExecutionDetailsCommand.create({ ...metadata, ...CreateExecutionDetailsCommand.getDetailsFromJob(job), detail: DetailEnum.DIGEST_MERGED, @@ -190,7 +188,7 @@ export class MergeOrCreateDigest { isTest: false, isRetry: false, }), - job._organizationId - ); + groupId: job._organizationId, + }); } } From 670f15429f785aa6db8b4f83693213321c84fc62 Mon Sep 17 00:00:00 2001 From: Gosha Date: Sun, 26 Nov 2023 17:22:04 +0200 Subject: [PATCH 4/5] fix: spec function input --- .../src/app/workflow/services/execution-log.worker.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/worker/src/app/workflow/services/execution-log.worker.spec.ts b/apps/worker/src/app/workflow/services/execution-log.worker.spec.ts index 2f54909be0d..a7d75db83b1 100644 --- a/apps/worker/src/app/workflow/services/execution-log.worker.spec.ts +++ b/apps/worker/src/app/workflow/services/execution-log.worker.spec.ts @@ -65,7 +65,7 @@ describe('ExecutionLog Worker', () => { _userId, }; - await executionLogQueueService.add(jobId, jobData, _organizationId); + await executionLogQueueService.add({ name: jobId, data: jobData, groupId: _organizationId }); expect(await executionLogQueueService.queue.getActiveCount()).to.equal(1); expect(await executionLogQueueService.queue.getWaitingCount()).to.equal(0); From 1025f1897d834e8e2b2bb00ce8c6159e437cf85b Mon Sep 17 00:00:00 2001 From: Gosha Date: Sun, 26 Nov 2023 20:03:43 +0200 Subject: [PATCH 5/5] fix: spec function input --- .../parse-event-request.usecase.ts | 4 ++-- .../workflow/services/workflow.worker.spec.ts | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts index 2248907a09c..dd277e36337 100644 --- a/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts +++ b/apps/api/src/app/events/usecases/parse-event-request/parse-event-request.usecase.ts @@ -129,10 +129,10 @@ export class ParseEventRequest { command.payload = merge({}, defaultPayload, command.payload); - const jobData = { + const jobData: ParseEventRequestCommand = { ...command, transactionId, - } as ParseEventRequestCommand; + }; await this.workflowQueueService.add({ name: transactionId, data: jobData, groupId: command.organizationId }); diff --git a/apps/worker/src/app/workflow/services/workflow.worker.spec.ts b/apps/worker/src/app/workflow/services/workflow.worker.spec.ts index 65b56bd5d00..425d38b3915 100644 --- a/apps/worker/src/app/workflow/services/workflow.worker.spec.ts +++ b/apps/worker/src/app/workflow/services/workflow.worker.spec.ts @@ -2,7 +2,7 @@ import { Test } from '@nestjs/testing'; import { expect } from 'chai'; import { setTimeout } from 'timers/promises'; -import { TriggerEvent, WorkflowQueueService } from '@novu/application-generic'; +import { IWorkflowDataDto, TriggerEvent, WorkflowQueueService } from '@novu/application-generic'; import { WorkflowWorker } from './workflow.worker'; @@ -56,15 +56,17 @@ describe('Workflow Worker', () => { const _environmentId = 'trigger-processor-queue-environment-id'; const _organizationId = 'trigger-processor-queue-organization-id'; const _userId = 'trigger-processor-queue-user-id'; - const jobData = { - _id: jobId, - test: 'trigger-processor-queue-job-data', - _environmentId, - _organizationId, - _userId, + const jobData: IWorkflowDataDto = { + environmentId: _environmentId, + organizationId: _organizationId, + userId: _userId, + payload: {}, + overrides: {}, + to: ['trigger-processor-queue-job-to'], + identifier: 'trigger-processor-queue-job-identifier', }; - await workflowQueueService.add(jobId, jobData, _organizationId); + await workflowQueueService.add({ name: jobId, data: jobData, groupId: _organizationId }); expect(await workflowQueueService.queue.getActiveCount()).to.equal(1); expect(await workflowQueueService.queue.getWaitingCount()).to.equal(0);