Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that our queue services get validated object data #4843

Conversation

djabarovgeorge
Copy link
Contributor

@djabarovgeorge djabarovgeorge commented Nov 14, 2023

What change does this PR introduce?

At the moment we do not validate the data we pass queue services for example SubscriberProcessQueueService.

Suggestions:
What we could do is create a middleware function 'add' in the child class, for example, SubscriberProcessQueueService with command and after then pass it to the parent 'add'.

Why was this change needed?

We could provide an invalid object to the queue services by mistake.

Other information and DOD's (Screenshots)

The remaining things that still need to be done:

  • Refactor the job data object to be defined, at the moment we allow to pass undefined data. IMO it was made in order to create metric cron jobs that do not pass any data, however, this option adds complexity to the code and could lead to a failure point where someone won't pass the data object. Instead, we could add minimal data on the metric identifier.
  • Make a discovery in order to make the base queue service move generic by getting the type from its child classes, this way we won't need to duplicate the methods in the services. in order to do so we will need to remove the mapping of the bull mq in the queue service and move it to the bull mq service.
  • Refactor so that all of the workers will execute the creation of the Command. was not sure if it should be done in this PR or not, because it could be a breaking change.
  • Align all of DTO's params to use align internal conventions, meaning if it is novu internal id it should have a prefix of _ at the moment we sometimes use it and sometimes not. for example userId _userId. This will be a breaking change that will require backward combability support so I'd prefer to do it in separate dedicated PR.

Copy link

linear bot commented Nov 14, 2023

NV-3012 Ensure that our queue services get validated object data

What?

At the moment we do not validate the data we pass queue services for example SubscriberProcessQueueService.

Suggestions:
What we could do is create a middleware function 'add' in the child class, for example, SubscriberProcessQueueService with command and after then pass it to the parent 'add'.

Why? (Context)

We could provide an invalid object by mistake.

Definition of Done

Comment on lines -113 to -140
if ((command as ParseEventRequestMulticastCommand).to?.length > 0) {
jobData = jobData as ParseEventRequestMulticastCommand;
jobData.to = (command as ParseEventRequestMulticastCommand).to;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done by me this month, it is not needed as it will be destructured above in ...command,.
More than that i regret that i created two types of one usecase command, Apologies in advance, i will revisit it in order to think of a better way.

Comment on lines +36 to 38
return async ({ data }: { data: IInboundParseDataDto }) => {
Logger.verbose({ data }, 'Processing the inbound parsed email', LOG_CONTEXT);
await this.emailParseUsecase.execute(InboundEmailParseCommand.create({ ...data }));
Copy link
Contributor Author

@djabarovgeorge djabarovgeorge Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should do the execution of Command create in this PR or not. it could be a breaking change.

Comment on lines -70 to -135
export interface IHeaders {
'content-type': string;
from: string;
to: string;
subject: string;
'message-id': string;
date: string;
'mime-version': string;
}

export interface IFrom {
address: string;
name: string;
}

export interface ITo {
address: string;
name: string;
}

export interface ITlsOptions {
name: string;
standardName: string;
version: string;
}

export interface IMailFrom {
address: string;
args: boolean;
}

export interface IRcptTo {
address: string;
args: boolean;
}

export interface IEnvelope {
mailFrom: IMailFrom;
rcptTo: IRcptTo[];
}

export interface IConnection {
id: string;
remoteAddress: string;
remotePort: number;
clientHostname: string;
openingCommand: string;
hostNameAppearsAs: string;
xClient: any;
xForward: any;
transmissionType: string;
tlsOptions: ITlsOptions;
envelope: IEnvelope;
transaction: number;
mailPath: string;
}

export interface IEnvelopeFrom {
address: string;
args: boolean;
}

export interface IEnvelopeTo {
address: string;
args: boolean;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted to application-generic so we could reuse it.

Comment on lines -59 to -66
const countQuery = isUnreadCountChanged ? { read: false } : { seen: false };

const count = await this.messageRepository.getCount(
command.environmentId,
subscriber._id,
ChannelTypeEnum.IN_APP,
countQuery
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was not used.

await inboundMailService.inboundParseQueueService.add(jobId, jobData, _organizationId);
await inboundMailService.inboundParseQueueService.add({
name: jobId,
data: jobData as unknown as IInboundParseDataDto,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make for the test, IInboundParseDataDto contained a lot of attributes that needed to be added. for now, added casting as those attributes were not tested.

@@ -45,7 +41,7 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker

this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions());

this.worker.on('failed', async (job: Job<IJobData, void, string>, error: Error): Promise<void> => {
this.worker.on('failed', async (job: Job<IStandardDataDto, void, string>, error: Error): Promise<void> => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will need either merge IJobData and IStandardDataDto, or connect them, as they share the same interface.

Comment on lines +69 to +71
if (!message) {
throw new Error('Job data is missing required fields' + JSON.stringify(data));
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not happen. if it does we need to throw it as no message exists.


if (!environmentId || !jobId || !organizationId || !userId) {
const message = job.payload.message;
const message = data.payload?.message;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This object is structure data.payload?.message deprecated, i could not locate any place in the code that passes this data.

@@ -44,7 +45,7 @@ export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker

storage.run(new Store(PinoLogger.root), () => {
_this.triggerEventUsecase
.execute(data)
.execute(data as TriggerEventCommand)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create a command here, but before and solve this issue of complex command interface https://github.com/novuhq/novu/pull/4843/files#r1392301028

@@ -37,14 +39,16 @@ export class WebSocketWorker extends WebSocketsWorkerService implements INovuWor
'WS Service',
function () {
const transaction = nr.getTransaction();
const { data: jobData } = job;
const data = jobData as IWebSocketDataDto;
Copy link
Contributor Author

@djabarovgeorge djabarovgeorge Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what is the type of job above therefore i created this mapping so we could at least get the data object right here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created those DTO's of our queue-worker communication for us to use.

Comment on lines +23 to +29
export interface IProcessSubscriberJobDto extends IJobParams {
data?: IProcessSubscriberDataDto;
}

export interface IProcessSubscriberBulkJobDto extends IBulkJobParams {
data: IProcessSubscriberDataDto;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the complexity i was talking about in the issue description, due to the metrics add execution allowing undefined data objects we need to create two separate interfaces because we do not want to allow undefined in the data as much as possible e.x the bulk DTO.

Comment on lines +86 to +94
await this.instance.add(
name,
jobData,
{
removeOnComplete: true,
removeOnFail: true,
...options,
},
groupId
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the dependency of the add method.

groupId?: string,
options: JobsOptions = {}
) {
public async addMinimalJob(params: IJobParams) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to get object instead of params list so we could be explicit about the params.

groupId?: string,
options: JobsOptions = {}
) {
public async add(params: IJobParams) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to get object instead of params list so we could be explicit about the params.

Comment on lines +118 to +130
export interface IJobParams {
name: string;
data?: any;
groupId?: string;
options?: JobsOptions;
}

export interface IBulkJobParams {
name: string;
data: any;
groupId?: string;
options?: BulkJobOptions;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the complexity i was talking about in the issue description, due to the metrics add execution allowing undefined data objects we need to create two separate interfaces because we do not want to allow undefined in the data as much as possible e.x the bulk DTO.

  • We need to find a better place for those interfaces.

]
) {

public async addBulk(data: IBulkJobParams[]) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to get object instead of params list so we could be explicit about the params.

@@ -58,14 +59,17 @@ describe('Standard Queue service', () => {
const _environmentId = 'standard-environment-id';
const _organizationId = 'standard-organization-id';
const _userId = 'standard-user-id';
const jobData = {
_id: jobId,
test: 'standard-job-data',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalid param

Comment on lines +22 to +30
public async add(data: unknown) {
// in order to implement we need to know what should be the data dto first
throw new Error('Not implemented');
}

public async addBulk(data: unknown[]) {
// in order to implement we need to know what should be the data dto first
throw new Error('Not implemented');
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not use those methods at the moment, therefore i can not tell what should be the DTO here, i wonder if we should keep the exception or allow any here.

await this.instance.addBulk(data);
}
}

export interface IJobParams {
name: string;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the id to name as it is the perm that is used in pull mq so i wanted to align with it. Same for IBulkJobParams.

…ur-queue-services-get-validated-object-data

# Conflicts:
#	packages/application-generic/src/custom-providers/index.ts
#	packages/application-generic/src/services/queues/index.ts
#	packages/application-generic/src/usecases/add-job/add-job.usecase.ts
@djabarovgeorge djabarovgeorge deleted the nv-3012-ensure-that-our-queue-services-get-validated-object-data branch December 10, 2023 10:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant