Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that our queue services get validated object data #4843

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,12 @@ export class ParseEventRequest {

command.payload = merge({}, defaultPayload, command.payload);

let jobData = {
const jobData: ParseEventRequestCommand = {
...command,
actor: command.actor,
transactionId,
} as ParseEventRequestCommand;

if ((command as ParseEventRequestMulticastCommand).to?.length > 0) {
jobData = jobData as ParseEventRequestMulticastCommand;
jobData.to = (command as ParseEventRequestMulticastCommand).to;
Comment on lines -138 to -140
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done by me this month, it is not needed as it will be destructured above in ...command,.
More than that i regret that i created two types of one usecase command, Apologies in advance, i will revisit it in order to think of a better way.

}
};

await this.workflowQueueService.add(transactionId, jobData, command.organizationId);
await this.workflowQueueService.add({ name: transactionId, data: jobData, groupId: command.organizationId });

return {
acknowledged: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ import {
InboundParseQueue,
InboundParseWorker,
Queue,
QueueOptions,
Worker,
WorkerOptions,
} from '@novu/application-generic';
import { JobTopicNameEnum } from '@novu/shared';
import { Injectable, Logger } from '@nestjs/common';

import { InboundEmailParse } from '../usecases/inbound-email-parse/inbound-email-parse.usecase';
import { InboundEmailParseCommand } from '../usecases/inbound-email-parse/inbound-email-parse.command';
import { IInboundParseDataDto } from '@novu/application-generic/build/main/dtos/inbound-parse-job.dto';

const LOG_CONTEXT = 'InboundParseQueueService';

Expand All @@ -34,7 +33,7 @@ export class InboundParseQueueService {
}

public getWorkerProcessor() {
return async ({ data }: { data: InboundEmailParseCommand }) => {
return async ({ data }: { data: IInboundParseDataDto }) => {
Logger.verbose({ data }, 'Processing the inbound parsed email', LOG_CONTEXT);
await this.emailParseUsecase.execute(InboundEmailParseCommand.create({ ...data }));
Comment on lines +36 to 38
Copy link
Contributor Author

@djabarovgeorge djabarovgeorge Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should do the execution of Command create in this PR or not. it could be a breaking change.

};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import { IsDefined, IsNumber, IsOptional, IsString } from 'class-validator';
import { BaseCommand } from '@novu/application-generic';

export class InboundEmailParseCommand extends BaseCommand {
import {
BaseCommand,
IConnection,
IEnvelopeFrom,
IEnvelopeTo,
IFrom,
IHeaders,
IInboundParseDataDto,
ITo,
} from '@novu/application-generic';

export class InboundEmailParseCommand extends BaseCommand implements IInboundParseDataDto {
@IsDefined()
@IsString()
html: string;
Expand Down Expand Up @@ -66,70 +75,3 @@ export class InboundEmailParseCommand extends BaseCommand {
@IsDefined()
envelopeTo: IEnvelopeTo[];
}

export interface IHeaders {
'content-type': string;
from: string;
to: string;
subject: string;
'message-id': string;
date: string;
'mime-version': string;
}

export interface IFrom {
address: string;
name: string;
}

export interface ITo {
address: string;
name: string;
}

export interface ITlsOptions {
name: string;
standardName: string;
version: string;
}

export interface IMailFrom {
address: string;
args: boolean;
}

export interface IRcptTo {
address: string;
args: boolean;
}

export interface IEnvelope {
mailFrom: IMailFrom;
rcptTo: IRcptTo[];
}

export interface IConnection {
id: string;
remoteAddress: string;
remotePort: number;
clientHostname: string;
openingCommand: string;
hostNameAppearsAs: string;
xClient: any;
xForward: any;
transmissionType: string;
tlsOptions: ITlsOptions;
envelope: IEnvelope;
transaction: number;
mailPath: string;
}

export interface IEnvelopeFrom {
address: string;
args: boolean;
}

export interface IEnvelopeTo {
address: string;
args: boolean;
}
Comment on lines -70 to -135
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted to application-generic so we could reuse it.

Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,15 @@ export class MarkAllMessagesAs {
const isUnreadCountChanged =
command.markAs === MarkMessagesAsEnum.READ || command.markAs === MarkMessagesAsEnum.UNREAD;

const countQuery = isUnreadCountChanged ? { read: false } : { seen: false };

const count = await this.messageRepository.getCount(
command.environmentId,
subscriber._id,
ChannelTypeEnum.IN_APP,
countQuery
);
Comment on lines -59 to -66
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was not used.


this.webSocketsQueueService.add(
'sendMessage',
{
this.webSocketsQueueService.add({
name: 'sendMessage',
data: {
event: isUnreadCountChanged ? WebSocketEventEnum.UNREAD : WebSocketEventEnum.UNSEEN,
userId: subscriber._id,
_environmentId: command.environmentId,
},
subscriber._organizationId
);
groupId: subscriber._organizationId,
});

this.analyticsService.track(
`Mark all messages as ${command.markAs}- [Notification Center]`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Injectable, NotFoundException } from '@nestjs/common';
import { MessageEntity, MessageRepository, SubscriberRepository, SubscriberEntity, MemberRepository } from '@novu/dal';
import { ChannelTypeEnum, WebSocketEventEnum } from '@novu/shared';
import { WebSocketEventEnum } from '@novu/shared';
import {
WebSocketsQueueService,
AnalyticsService,
Expand Down Expand Up @@ -85,15 +85,15 @@ export class MarkMessageAs {
private updateSocketCount(subscriber: SubscriberEntity, mark: MarkEnum) {
const eventMessage = mark === MarkEnum.READ ? WebSocketEventEnum.UNREAD : WebSocketEventEnum.UNSEEN;

this.webSocketsQueueService.add(
'sendMessage',
{
this.webSocketsQueueService.add({
name: 'sendMessage',
data: {
event: eventMessage,
userId: subscriber._id,
_environmentId: subscriber._environmentId,
},
subscriber._organizationId
);
groupId: subscriber._organizationId,
});
}
@CachedEntity({
builder: (command: { subscriberId: string; _environmentId: string }) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ export class RemoveMessage {
private updateSocketCount(subscriber: SubscriberEntity, mark: MarkEnum) {
const eventMessage = mark === MarkEnum.READ ? WebSocketEventEnum.UNREAD : WebSocketEventEnum.UNSEEN;

this.webSocketsQueueService.add(
'sendMessage',
{
this.webSocketsQueueService.add({
name: 'sendMessage',
data: {
event: eventMessage,
userId: subscriber._id,
_environmentId: subscriber._environmentId,
},
subscriber._organizationId
);
groupId: subscriber._organizationId,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ export class RemoveAllMessages {
private updateSocketCount(subscriber: SubscriberEntity, mark: string) {
const eventMessage = mark === MarkEnum.READ ? WebSocketEventEnum.UNREAD : WebSocketEventEnum.UNSEEN;

this.webSocketsQueueService.add(
'sendMessage',
{
this.webSocketsQueueService.add({
name: 'sendMessage',
data: {
event: eventMessage,
userId: subscriber._id,
_environmentId: subscriber._environmentId,
},
subscriber._organizationId
);
groupId: subscriber._organizationId,
});
}
}
13 changes: 11 additions & 2 deletions apps/inbound-mail/src/server/inbound-mail.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { expect } from 'chai';

import { InboundMailService } from './inbound-mail.service';
import { IInboundParseDataDto } from '@novu/application-generic';

let inboundMailService: InboundMailService;

Expand Down Expand Up @@ -65,7 +66,11 @@ describe('Inbound Mail Service', () => {
_organizationId,
_userId,
};
await inboundMailService.inboundParseQueueService.add(jobId, jobData, _organizationId);
await inboundMailService.inboundParseQueueService.add({
name: jobId,
data: jobData as unknown as IInboundParseDataDto,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make for the test, IInboundParseDataDto contained a lot of attributes that needed to be added. for now, added casting as those attributes were not tested.

groupId: _organizationId,
});

expect(await inboundMailService.inboundParseQueueService.queue.getActiveCount()).to.equal(0);
expect(await inboundMailService.inboundParseQueueService.queue.getWaitingCount()).to.equal(1);
Expand Down Expand Up @@ -93,7 +98,11 @@ describe('Inbound Mail Service', () => {
_organizationId,
_userId,
};
await inboundMailService.inboundParseQueueService.addMinimalJob(jobId, jobData, _organizationId);
await inboundMailService.inboundParseQueueService.addMinimalJob({
name: jobId,
data: jobData,
groupId: _organizationId,
});

expect(await inboundMailService.inboundParseQueueService.queue.getActiveCount()).to.equal(0);
expect(await inboundMailService.inboundParseQueueService.queue.getWaitingCount()).to.equal(1);
Expand Down
6 changes: 5 additions & 1 deletion apps/inbound-mail/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,11 @@ class Mailin extends events.EventEmitter {
const username: string = parts[0];
const environmentId = username.split('-nv-e=').at(-1);

inboundMailService.inboundParseQueueService.add(finalizedMessage.messageId, finalizedMessage, environmentId);
inboundMailService.inboundParseQueueService.add({
name: finalizedMessage.messageId,
data: finalizedMessage,
groupId: environmentId,
});

return resolve();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,21 @@ export class ActiveJobsMetricService {
if (!exists) {
Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT);

return await this.activeJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', {
jobId: METRIC_JOB_ID,
repeatJobKey: METRIC_JOB_ID,
repeat: {
immediately: true,
pattern: '* * * * * *',
return await this.activeJobsMetricQueueService.add({
name: METRIC_JOB_ID,
data: undefined,
groupId: '',
options: {
jobId: METRIC_JOB_ID,
repeatJobKey: METRIC_JOB_ID,
repeat: {
immediately: true,
pattern: '* * * * * *',
},
removeOnFail: true,
removeOnComplete: true,
attempts: 1,
},
removeOnFail: true,
removeOnComplete: true,
attempts: 1,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,21 @@ export class CompletedJobsMetricService {
if (!exists) {
Logger.debug(`metricJob doesn't exist, creating it`, LOG_CONTEXT);

return await this.completedJobsMetricQueueService.add(METRIC_JOB_ID, undefined, '', {
jobId: METRIC_JOB_ID,
repeatJobKey: METRIC_JOB_ID,
repeat: {
immediately: true,
pattern: '0 * * * * *',
return await this.completedJobsMetricQueueService.add({
name: METRIC_JOB_ID,
data: undefined,
groupId: '',
options: {
jobId: METRIC_JOB_ID,
repeatJobKey: METRIC_JOB_ID,
repeat: {
immediately: true,
pattern: '0 * * * * *',
},
removeOnFail: true,
removeOnComplete: true,
attempts: 1,
},
removeOnFail: true,
removeOnComplete: true,
attempts: 1,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ describe('ExecutionLog Worker', () => {
_userId,
};

await executionLogQueueService.add(jobId, jobData, _organizationId);
await executionLogQueueService.add({ name: jobId, data: jobData, groupId: _organizationId });

expect(await executionLogQueueService.queue.getActiveCount()).to.equal(1);
expect(await executionLogQueueService.queue.getWaitingCount()).to.equal(0);
Expand Down
4 changes: 2 additions & 2 deletions apps/worker/src/app/workflow/services/standard.worker.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ describe('Standard Worker', () => {
_userId: jobCreated._userId,
};

await standardQueueService.addMinimalJob(jobCreated._id, jobData, '0');
await standardQueueService.addMinimalJob({ name: jobCreated._id, data: jobData, groupId: '0' });

await jobsService.awaitRunningJobs({
templateId: _templateId,
Expand Down Expand Up @@ -267,7 +267,7 @@ describe('Standard Worker', () => {
_userId: jobCreated._userId,
};

await standardQueueService.addMinimalJob(jobCreated._id, jobData, '0');
await standardQueueService.addMinimalJob({ name: jobCreated._id, data: jobData, groupId: '0' });

await jobsService.awaitRunningJobs({
templateId: _templateId,
Expand Down
Loading
Loading