Skip to content

Commit

Permalink
feat(api): process maddo pgboss jobs in maddo server
Browse files Browse the repository at this point in the history
Co-authored-by: Aurélie Crouillebois <[email protected]>
Co-authored-by: Vincent Hardouin <[email protected]>
  • Loading branch information
3 people committed Feb 28, 2025
1 parent e3cdafb commit c0f272e
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 17 deletions.
2 changes: 1 addition & 1 deletion api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ process.on('SIGINT', () => {
try {
await start();
if (config.infra.startJobInWebProcess) {
registerJobs({ jobGroup: JobGroup.DEFAULT });
registerJobs({ jobGroups: [JobGroup.DEFAULT, JobGroup.FAST] });
}
} catch (error) {
logger.error(error);
Expand Down
7 changes: 6 additions & 1 deletion api/index.maddo.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import 'dotenv/config';

import { databaseConnections } from './db/database-connections.js';
import { createMaddoServer } from './server.maddo.js';
import { schema as configSchema } from './src/shared/config.js';
import { JobGroup } from './src/shared/application/jobs/job-controller.js';
import { config, schema as configSchema } from './src/shared/config.js';
import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js';
import { logger } from './src/shared/infrastructure/utils/logger.js';
import { redisMonitor } from './src/shared/infrastructure/utils/redis-monitor.js';
import { validateEnvironmentVariables } from './src/shared/infrastructure/validate-environment-variables.js';
import { registerJobs } from './worker.js';

validateEnvironmentVariables(configSchema);

Expand Down Expand Up @@ -45,6 +47,9 @@ process.on('SIGINT', () => {
(async () => {
try {
await start();
if (config.infra.startJobInWebProcess) {
registerJobs({ jobGroups: [JobGroup.MADDO] });
}
} catch (error) {
logger.error(error);
throw error;
Expand Down
29 changes: 22 additions & 7 deletions api/tests/unit/worker_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ describe('#registerJobs', function () {
it('should register UserAnonymizedEventLoggingJob', async function () {
// when
await registerJobs({
jobGroup: JobGroup.DEFAULT,
jobGroups: [JobGroup.DEFAULT],
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
Expand All @@ -50,7 +50,7 @@ describe('#registerJobs', function () {
.stub(UserAnonymizedEventLoggingJobController.prototype, 'legacyName')
.get(() => 'legyNameForUserAnonymizedEventLoggingJobController');
await registerJobs({
jobGroup: JobGroup.DEFAULT,
jobGroups: [JobGroup.DEFAULT],
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
Expand All @@ -71,7 +71,7 @@ describe('#registerJobs', function () {

// when
await registerJobs({
jobGroup: JobGroup.DEFAULT,
jobGroups: [JobGroup.DEFAULT],
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
Expand All @@ -92,7 +92,7 @@ describe('#registerJobs', function () {

// when
await registerJobs({
jobGroup: JobGroup.DEFAULT,
jobGroups: [JobGroup.DEFAULT],
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
Expand All @@ -106,9 +106,24 @@ describe('#registerJobs', function () {
);
});

it('should throws an error when no groups is invalid', async function () {
// given
const error = await catchErr(registerJobs)({
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
},
});

// then
expect(error).to.be.instanceOf(Error);
expect(error.message).to.equal('Job groups are mandatory');
});

it('should throws an error when group is invalid', async function () {
// given
const error = await catchErr(registerJobs)({
jobGroups: ['pouet'],
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
Expand All @@ -126,7 +141,7 @@ describe('#registerJobs', function () {
sinon.stub(config.features.scheduleComputeOrganizationLearnersCertificability, 'cron').value('0 21 * * *');

await registerJobs({
jobGroup: JobGroup.DEFAULT,
jobGroups: [JobGroup.DEFAULT],
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
Expand All @@ -150,7 +165,7 @@ describe('#registerJobs', function () {
sinon.stub(config.features.scheduleComputeOrganizationLearnersCertificability, 'cron').value('0 21 * * *');

await registerJobs({
jobGroup: JobGroup.DEFAULT,
jobGroups: [JobGroup.DEFAULT],
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
Expand All @@ -170,7 +185,7 @@ describe('#registerJobs', function () {
sinon.stub(config.pgBoss, 'exportSenderJobEnabled').value(false);

await registerJobs({
jobGroup: JobGroup.DEFAULT,
jobGroups: [JobGroup.DEFAULT],
dependencies: {
startPgBoss: startPgBossStub,
createJobQueue: createJobQueueStub,
Expand Down
19 changes: 11 additions & 8 deletions api/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { child } from './src/shared/infrastructure/utils/logger.js';

const logger = child('worker', { event: 'worker' });

const isJobInWebProcess = process.env.START_JOB_IN_WEB_PROCESS === 'true';
const workerDirPath = dirname(fileURLToPath(import.meta.url));

const metrics = new Metrics({ config });
Expand Down Expand Up @@ -57,15 +56,19 @@ function createJobQueue(pgBoss) {
return jobQueue;
}

function checkJobGroups(jobGroups) {
if (!jobGroups) throw new Error('Job groups are mandatory');
jobGroups.forEach((jobGroup) => checkJobGroup(jobGroup));
}

function checkJobGroup(jobGroup) {
if (!Object.values(JobGroup).includes(jobGroup)) {
throw new Error(`Job group invalid, allowed Job groups are [${Object.values(JobGroup)}]`);
}
logger.info(`Job group "${jobGroup}"`);
}

export async function registerJobs({ jobGroup, dependencies = { startPgBoss, createJobQueue } }) {
checkJobGroup(jobGroup);
export async function registerJobs({ jobGroups, dependencies = { startPgBoss, createJobQueue } }) {
checkJobGroups(jobGroups);

const pgBoss = await dependencies.startPgBoss();

Expand All @@ -88,7 +91,7 @@ export async function registerJobs({ jobGroup, dependencies = { startPgBoss, cre
for (const [moduleName, ModuleClass] of Object.entries(jobModules)) {
const job = new ModuleClass();

if (!isJobInWebProcess && job.jobGroup !== jobGroup) continue;
if (!jobGroups.includes(job.jobGroup)) continue;

if (job.isJobEnabled) {
logger.info(`Job "${job.jobName}" registered from module "${moduleName}."`);
Expand Down Expand Up @@ -127,15 +130,15 @@ export async function registerJobs({ jobGroup, dependencies = { startPgBoss, cre
}
}

logger.info(`${jobRegisteredCount} jobs registered for group "${jobGroup}".`);
logger.info(`${cronJobCount} cron jobs scheduled for group "${jobGroup}".`);
logger.info(`${jobRegisteredCount} jobs registered for groups "${jobGroups}".`);
logger.info(`${cronJobCount} cron jobs scheduled for groups "${jobGroups}".`);
}

const isRunningFromCli = import.meta.filename === process.argv[1];

async function main() {
const jobGroup = process.argv[2] ? JobGroup[process.argv[2]?.toUpperCase()] : JobGroup.DEFAULT;
await registerJobs({ jobGroup });
await registerJobs({ jobGroups: [jobGroup] });
process.on('SIGINT', async () => {
await quitAllStorages();
await metrics.clearMetrics();
Expand Down

0 comments on commit c0f272e

Please sign in to comment.