-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure that our queue services get validated object data #4843
Ensure that our queue services get validated object data #4843
Conversation
NV-3012 Ensure that our queue services get validated object data
What?At the moment we do not validate the data we pass queue services for example SubscriberProcessQueueService. Suggestions: Why? (Context)We could provide an invalid object by mistake. Definition of Done |
if ((command as ParseEventRequestMulticastCommand).to?.length > 0) { | ||
jobData = jobData as ParseEventRequestMulticastCommand; | ||
jobData.to = (command as ParseEventRequestMulticastCommand).to; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done by me this month, it is not needed as it will be destructured above in ...command,
.
More than that i regret that i created two types of one usecase command, Apologies in advance, i will revisit it in order to think of a better way.
return async ({ data }: { data: IInboundParseDataDto }) => { | ||
Logger.verbose({ data }, 'Processing the inbound parsed email', LOG_CONTEXT); | ||
await this.emailParseUsecase.execute(InboundEmailParseCommand.create({ ...data })); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should do the execution of Command create in this PR or not. it could be a breaking change.
export interface IHeaders { | ||
'content-type': string; | ||
from: string; | ||
to: string; | ||
subject: string; | ||
'message-id': string; | ||
date: string; | ||
'mime-version': string; | ||
} | ||
|
||
export interface IFrom { | ||
address: string; | ||
name: string; | ||
} | ||
|
||
export interface ITo { | ||
address: string; | ||
name: string; | ||
} | ||
|
||
export interface ITlsOptions { | ||
name: string; | ||
standardName: string; | ||
version: string; | ||
} | ||
|
||
export interface IMailFrom { | ||
address: string; | ||
args: boolean; | ||
} | ||
|
||
export interface IRcptTo { | ||
address: string; | ||
args: boolean; | ||
} | ||
|
||
export interface IEnvelope { | ||
mailFrom: IMailFrom; | ||
rcptTo: IRcptTo[]; | ||
} | ||
|
||
export interface IConnection { | ||
id: string; | ||
remoteAddress: string; | ||
remotePort: number; | ||
clientHostname: string; | ||
openingCommand: string; | ||
hostNameAppearsAs: string; | ||
xClient: any; | ||
xForward: any; | ||
transmissionType: string; | ||
tlsOptions: ITlsOptions; | ||
envelope: IEnvelope; | ||
transaction: number; | ||
mailPath: string; | ||
} | ||
|
||
export interface IEnvelopeFrom { | ||
address: string; | ||
args: boolean; | ||
} | ||
|
||
export interface IEnvelopeTo { | ||
address: string; | ||
args: boolean; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted to application-generic so we could reuse it.
const countQuery = isUnreadCountChanged ? { read: false } : { seen: false }; | ||
|
||
const count = await this.messageRepository.getCount( | ||
command.environmentId, | ||
subscriber._id, | ||
ChannelTypeEnum.IN_APP, | ||
countQuery | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was not used.
await inboundMailService.inboundParseQueueService.add(jobId, jobData, _organizationId); | ||
await inboundMailService.inboundParseQueueService.add({ | ||
name: jobId, | ||
data: jobData as unknown as IInboundParseDataDto, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make for the test, IInboundParseDataDto contained a lot of attributes that needed to be added. for now, added casting as those attributes were not tested.
@@ -45,7 +41,7 @@ export class StandardWorker extends StandardWorkerService implements INovuWorker | |||
|
|||
this.initWorker(this.getWorkerProcessor(), this.getWorkerOptions()); | |||
|
|||
this.worker.on('failed', async (job: Job<IJobData, void, string>, error: Error): Promise<void> => { | |||
this.worker.on('failed', async (job: Job<IStandardDataDto, void, string>, error: Error): Promise<void> => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we will need either merge IJobData and IStandardDataDto, or connect them, as they share the same interface.
if (!message) { | ||
throw new Error('Job data is missing required fields' + JSON.stringify(data)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should not happen. if it does we need to throw it as no message exists.
|
||
if (!environmentId || !jobId || !organizationId || !userId) { | ||
const message = job.payload.message; | ||
const message = data.payload?.message; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This object is structure data.payload?.message
deprecated, i could not locate any place in the code that passes this data.
@@ -44,7 +45,7 @@ export class WorkflowWorker extends WorkflowWorkerService implements INovuWorker | |||
|
|||
storage.run(new Store(PinoLogger.root), () => { | |||
_this.triggerEventUsecase | |||
.execute(data) | |||
.execute(data as TriggerEventCommand) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should create a command here, but before and solve this issue of complex command interface https://github.com/novuhq/novu/pull/4843/files#r1392301028
@@ -37,14 +39,16 @@ export class WebSocketWorker extends WebSocketsWorkerService implements INovuWor | |||
'WS Service', | |||
function () { | |||
const transaction = nr.getTransaction(); | |||
const { data: jobData } = job; | |||
const data = jobData as IWebSocketDataDto; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure what is the type of job
above therefore i created this mapping so we could at least get the data
object right here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created those DTO's of our queue-worker communication for us to use.
export interface IProcessSubscriberJobDto extends IJobParams { | ||
data?: IProcessSubscriberDataDto; | ||
} | ||
|
||
export interface IProcessSubscriberBulkJobDto extends IBulkJobParams { | ||
data: IProcessSubscriberDataDto; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is part of the complexity i was talking about in the issue description, due to the metrics add execution allowing undefined data objects we need to create two separate interfaces because we do not want to allow undefined in the data as much as possible e.x the bulk DTO.
await this.instance.add( | ||
name, | ||
jobData, | ||
{ | ||
removeOnComplete: true, | ||
removeOnFail: true, | ||
...options, | ||
}, | ||
groupId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the dependency of the add
method.
groupId?: string, | ||
options: JobsOptions = {} | ||
) { | ||
public async addMinimalJob(params: IJobParams) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored to get object instead of params list so we could be explicit about the params.
groupId?: string, | ||
options: JobsOptions = {} | ||
) { | ||
public async add(params: IJobParams) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored to get object instead of params list so we could be explicit about the params.
export interface IJobParams { | ||
name: string; | ||
data?: any; | ||
groupId?: string; | ||
options?: JobsOptions; | ||
} | ||
|
||
export interface IBulkJobParams { | ||
name: string; | ||
data: any; | ||
groupId?: string; | ||
options?: BulkJobOptions; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is part of the complexity i was talking about in the issue description, due to the metrics add execution allowing undefined data objects we need to create two separate interfaces because we do not want to allow undefined in the data as much as possible e.x the bulk DTO.
- We need to find a better place for those interfaces.
] | ||
) { | ||
|
||
public async addBulk(data: IBulkJobParams[]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored to get object instead of params list so we could be explicit about the params.
@@ -58,14 +59,17 @@ describe('Standard Queue service', () => { | |||
const _environmentId = 'standard-environment-id'; | |||
const _organizationId = 'standard-organization-id'; | |||
const _userId = 'standard-user-id'; | |||
const jobData = { | |||
_id: jobId, | |||
test: 'standard-job-data', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
invalid param
public async add(data: unknown) { | ||
// in order to implement we need to know what should be the data dto first | ||
throw new Error('Not implemented'); | ||
} | ||
|
||
public async addBulk(data: unknown[]) { | ||
// in order to implement we need to know what should be the data dto first | ||
throw new Error('Not implemented'); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do not use those methods at the moment, therefore i can not tell what should be the DTO here, i wonder if we should keep the exception or allow any here.
await this.instance.addBulk(data); | ||
} | ||
} | ||
|
||
export interface IJobParams { | ||
name: string; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the id
to name
as it is the perm that is used in pull mq so i wanted to align with it. Same for IBulkJobParams.
…ur-queue-services-get-validated-object-data # Conflicts: # packages/application-generic/src/custom-providers/index.ts # packages/application-generic/src/services/queues/index.ts # packages/application-generic/src/usecases/add-job/add-job.usecase.ts
What change does this PR introduce?
At the moment we do not validate the data we pass queue services for example SubscriberProcessQueueService.
Suggestions:
What we could do is create a middleware function 'add' in the child class, for example, SubscriberProcessQueueService with command and after then pass it to the parent 'add'.
Why was this change needed?
We could provide an invalid object to the queue services by mistake.
Other information and DOD's (Screenshots)
The remaining things that still need to be done:
undefined
data. IMO it was made in order to create metric cron jobs that do not pass any data, however, this option adds complexity to the code and could lead to a failure point where someone won't pass the data object. Instead, we could add minimal data on the metric identifier._
at the moment we sometimes use it and sometimes not. for example userId _userId. This will be a breaking change that will require backward combability support so I'd prefer to do it in separate dedicated PR.