From eae31d27237959b7d052c8576b847f0b58fd9f74 Mon Sep 17 00:00:00 2001 From: Nicolas Lepage <19571875+nlepage@users.noreply.github.com> Date: Thu, 27 Feb 2025 14:14:08 +0100 Subject: [PATCH 01/10] feat: adds maddo jobgroup in pgboss and maddo worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Aurélie Crouillebois Co-authored-by: Ce que tu veux Co-authored-by: Vincent Hardouin --- api/Procfile-maddo | 1 + api/src/shared/application/jobs/job-controller.js | 1 + 2 files changed, 2 insertions(+) diff --git a/api/Procfile-maddo b/api/Procfile-maddo index b33d5c0844f..53a4598536f 100644 --- a/api/Procfile-maddo +++ b/api/Procfile-maddo @@ -5,3 +5,4 @@ postdeploy: npm run postdeploy:maddo # and https://github.com/npm/npm/issues/4603 # for more information web: exec node maddo.js +worker: exec node worker.js maddo diff --git a/api/src/shared/application/jobs/job-controller.js b/api/src/shared/application/jobs/job-controller.js index 6000431d231..7c6565c4480 100644 --- a/api/src/shared/application/jobs/job-controller.js +++ b/api/src/shared/application/jobs/job-controller.js @@ -7,6 +7,7 @@ import { JobExpireIn } from '../../infrastructure/repositories/jobs/job-reposito export const JobGroup = { DEFAULT: 'default', FAST: 'fast', + MADDO: 'maddo', }; export class JobController { From 5e9c974eee4d89faa9ab8d055fc9061aa9d666f0 Mon Sep 17 00:00:00 2001 From: Nicolas Lepage <19571875+nlepage@users.noreply.github.com> Date: Thu, 27 Feb 2025 15:09:15 +0100 Subject: [PATCH 02/10] feat(maddo): runs replications in pgboss jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Aurélie Crouillebois Co-authored-by: Ce que tu veux Co-authored-by: Vincent Hardouin --- api/db/seeds/data/common/common-builder.js | 2 +- api/server.maddo.js | 3 +- .../jobs/replication-job-controller.js | 25 ++++++ .../application/replications-controller.js | 34 ++------ api/src/maddo/domain/models/ReplicationJob.js | 5 ++ .../extract-transform-and-load-data.js | 9 ++- .../jobs/replication-job-repository.js | 4 + .../acceptance/replications-routes_test.js | 63 --------------- .../unit/replications-controller_test.js | 77 ++++-------------- .../extract-transform-and-load-data_test.js | 55 +++++++++++++ .../extract-transform-and-load-data_test.js | 80 ++++++++++++------- 11 files changed, 175 insertions(+), 182 deletions(-) create mode 100644 api/src/maddo/application/jobs/replication-job-controller.js create mode 100644 api/src/maddo/domain/models/ReplicationJob.js create mode 100644 api/src/maddo/infrastructure/repositories/jobs/replication-job-repository.js delete mode 100644 api/tests/maddo/application/acceptance/replications-routes_test.js create mode 100644 api/tests/maddo/domain/integration/usecases/extract-transform-and-load-data_test.js diff --git a/api/db/seeds/data/common/common-builder.js b/api/db/seeds/data/common/common-builder.js index 722915171a3..31b1974f74e 100644 --- a/api/db/seeds/data/common/common-builder.js +++ b/api/db/seeds/data/common/common-builder.js @@ -85,7 +85,7 @@ function createClientApplications(databaseBuilder) { name: 'pixData', clientId: 'pixData', clientSecret: 'pixdatasecret', - scopes: ['statistics'], + scopes: ['statistics', 'replication'], }); databaseBuilder.factory.buildClientApplication({ name: 'parcoursup', diff --git a/api/server.maddo.js b/api/server.maddo.js index e45e9abbebe..42680c0d8d9 100644 --- a/api/server.maddo.js +++ b/api/server.maddo.js @@ -4,6 +4,7 @@ import { parse } from 'neoqs'; import { setupErrorHandling } from './config/server-setup-error-handling.js'; import { knex } from './db/knex-database-connection.js'; +import * as authenticationRoutes from './lib/application/authentication/index.js'; import { authentication } from './lib/infrastructure/authentication.js'; import * as replicationRoutes from './src/maddo/application/replications-routes.js'; import { Metrics } from './src/monitoring/infrastructure/metrics.js'; @@ -178,7 +179,7 @@ const setupAuthentication = function (server) { }; const setupRoutesAndPlugins = async function (server) { - await server.register([...plugins, healthcheckRoutes, replicationRoutes]); + await server.register([...plugins, healthcheckRoutes, authenticationRoutes, replicationRoutes]); }; const setupOpenApiSpecification = async function (server) { diff --git a/api/src/maddo/application/jobs/replication-job-controller.js b/api/src/maddo/application/jobs/replication-job-controller.js new file mode 100644 index 00000000000..eb9ec5cd5b4 --- /dev/null +++ b/api/src/maddo/application/jobs/replication-job-controller.js @@ -0,0 +1,25 @@ +import { knex as datamartKnex } from '../../../../datamart/knex-database-connection.js'; +import { knex as datawarehouseKnex } from '../../../../datawarehouse/knex-database-connection.js'; +import { JobController, JobGroup } from '../../../shared/application/jobs/job-controller.js'; +import { ReplicationJob } from '../../domain/models/ReplicationJob.js'; +import { extractTransformAndLoadData } from '../../domain/usecases/extract-transform-and-load-data.js'; +import * as replicationRepository from '../../infrastructure/repositories/replication-repository.js'; + +export class ReplicationJobController extends JobController { + constructor() { + super(ReplicationJob.name, { jobGroup: JobGroup.MADDO }); + } + + async handle({ + data: { replicationName }, + dependencies = { extractTransformAndLoadData, replicationRepository, datamartKnex, datawarehouseKnex }, + }) { + const { extractTransformAndLoadData, replicationRepository, datamartKnex, datawarehouseKnex } = dependencies; + await extractTransformAndLoadData({ + replicationName, + replicationRepository, + datamartKnex, + datawarehouseKnex, + }); + } +} diff --git a/api/src/maddo/application/replications-controller.js b/api/src/maddo/application/replications-controller.js index a23ab8f1fa9..544550b0d71 100644 --- a/api/src/maddo/application/replications-controller.js +++ b/api/src/maddo/application/replications-controller.js @@ -1,47 +1,25 @@ -import { knex as datamartKnex } from '../../../datamart/knex-database-connection.js'; -import { knex as datawarehouseKnex } from '../../../datawarehouse/knex-database-connection.js'; -import { logger } from '../../shared/infrastructure/utils/logger.js'; -import { extractTransformAndLoadData } from '../domain/usecases/extract-transform-and-load-data.js'; +import { ReplicationJob } from '../domain/models/ReplicationJob.js'; +import { replicationJobRepository } from '../infrastructure/repositories/jobs/replication-job-repository.js'; import * as replicationRepository from '../infrastructure/repositories/replication-repository.js'; export async function replicate( request, h, dependencies = { - extractTransformAndLoadData, replicationRepository, - datamartKnex, - datawarehouseKnex, - logger, + replicationJobRepository, }, ) { + const { replicationRepository, replicationJobRepository } = dependencies; const { replicationName } = request.params; - const replication = dependencies.replicationRepository.getByName(replicationName); + const replication = replicationRepository.getByName(replicationName); if (!replication) { return h.response().code(404); } - const promise = dependencies - .extractTransformAndLoadData({ - replication, - datamartKnex: dependencies.datamartKnex, - datawarehouseKnex: dependencies.datawarehouseKnex, - }) - .catch((err) => - dependencies.logger.error( - { - event: 'replication', - err, - }, - 'Error during replication', - ), - ); - - if (!request.query.async) { - await promise; - } + await replicationJobRepository.performAsync(new ReplicationJob({ replicationName })); return h.response().code(204); } diff --git a/api/src/maddo/domain/models/ReplicationJob.js b/api/src/maddo/domain/models/ReplicationJob.js new file mode 100644 index 00000000000..88b999a8251 --- /dev/null +++ b/api/src/maddo/domain/models/ReplicationJob.js @@ -0,0 +1,5 @@ +export class ReplicationJob { + constructor({ replicationName }) { + this.replicationName = replicationName; + } +} diff --git a/api/src/maddo/domain/usecases/extract-transform-and-load-data.js b/api/src/maddo/domain/usecases/extract-transform-and-load-data.js index c06d61230b0..d40c323ddf0 100644 --- a/api/src/maddo/domain/usecases/extract-transform-and-load-data.js +++ b/api/src/maddo/domain/usecases/extract-transform-and-load-data.js @@ -1,6 +1,13 @@ const DEFAULT_CHUNK_SIZE = 1_000; -export async function extractTransformAndLoadData({ replication, datawarehouseKnex, datamartKnex }) { +export async function extractTransformAndLoadData({ + replicationName, + replicationRepository, + datawarehouseKnex, + datamartKnex, +}) { + const replication = replicationRepository.getByName(replicationName); + let context = { datawarehouseKnex, datamartKnex }; const additionnalContext = await replication.before?.(context); diff --git a/api/src/maddo/infrastructure/repositories/jobs/replication-job-repository.js b/api/src/maddo/infrastructure/repositories/jobs/replication-job-repository.js new file mode 100644 index 00000000000..0cfd52f5148 --- /dev/null +++ b/api/src/maddo/infrastructure/repositories/jobs/replication-job-repository.js @@ -0,0 +1,4 @@ +import { JobRepository, JobRetry } from '../../../../shared/infrastructure/repositories/jobs/job-repository.js'; +import { ReplicationJob } from '../../../domain/models/ReplicationJob.js'; + +export const replicationJobRepository = new JobRepository({ name: ReplicationJob.name, retry: JobRetry.FEW_RETRY }); diff --git a/api/tests/maddo/application/acceptance/replications-routes_test.js b/api/tests/maddo/application/acceptance/replications-routes_test.js deleted file mode 100644 index f7133b535b0..00000000000 --- a/api/tests/maddo/application/acceptance/replications-routes_test.js +++ /dev/null @@ -1,63 +0,0 @@ -import { replications } from '../../../../src/maddo/infrastructure/repositories/replication-repository.js'; -import { - createMaddoServer, - datamartKnex, - datawarehouseKnex, - expect, - generateValidRequestAuthorizationHeaderForApplication, -} from '../../../test-helper.js'; - -describe('Maddo | Application | Acceptance | Replications', function () { - describe('POST /api/replications/{replication}', function () { - let server; - - beforeEach(async function () { - const schema = (t) => { - t.string('firstName').notNullable(); - t.string('lastName').notNullable(); - }; - await datawarehouseKnex.schema.dropTableIfExists('to-replicate'); - await datawarehouseKnex.schema.createTable('to-replicate', schema); - await datamartKnex.schema.dropTableIfExists('replication'); - await datamartKnex.schema.createTable('replication', schema); - server = await createMaddoServer(); - }); - - it('should run given replication', async function () { - // given - const replication = 'my-replication'; - await datawarehouseKnex('to-replicate').insert([ - { firstName: 'first1', lastName: 'last1' }, - { firstName: 'first2', lastName: 'last2' }, - ]); - - replications.push({ - name: 'my-replication', - from: ({ datawarehouseKnex }) => { - return datawarehouseKnex('to-replicate').select('*'); - }, - to: ({ datamartKnex }, chunk) => { - return datamartKnex('replication').insert(chunk); - }, - }); - - // when - const response = await server.inject({ - method: 'POST', - url: `/api/replications/${replication}?async=false`, - headers: { - authorization: generateValidRequestAuthorizationHeaderForApplication( - 'pix-client', - 'pix-client', - 'replication', - ), - }, - }); - - // then - expect(response.statusCode).to.equal(204); - const { count } = await datamartKnex('replication').count().first(); - expect(count).to.equal(2); - }); - }); -}); diff --git a/api/tests/maddo/application/unit/replications-controller_test.js b/api/tests/maddo/application/unit/replications-controller_test.js index 8b2edbfa764..063c33e2fe9 100644 --- a/api/tests/maddo/application/unit/replications-controller_test.js +++ b/api/tests/maddo/application/unit/replications-controller_test.js @@ -1,9 +1,10 @@ import { replicate } from '../../../../src/maddo/application/replications-controller.js'; +import { ReplicationJob } from '../../../../src/maddo/domain/models/ReplicationJob.js'; import { expect, sinon } from '../../../test-helper.js'; describe('Maddo | Application | Unit | Controller | Replication', function () { describe('#replicate', function () { - it('should call use-case with given replication', async function () { + it('should create async replication job', async function () { // given const replicationName = 'foo'; const replication = Symbol('replication'); @@ -21,74 +22,27 @@ describe('Maddo | Application | Unit | Controller | Replication', function () { code: codeStub, }), }; - const extractTransformAndLoadData = sinon.stub().resolves(); const replicationRepository = { - getByName: sinon.stub().withArgs(replicationName).returns(replication), + getByName: sinon.stub().returns(replication), + }; + const replicationJobRepository = { + performAsync: sinon.stub().resolves(replication), }; - const datawarehouseKnex = Symbol('datawarehouse-knex'); - const datamartKnex = Symbol('datamart-knex'); // when await replicate(request, h, { - extractTransformAndLoadData, replicationRepository, - datamartKnex, - datawarehouseKnex, + replicationJobRepository, }); // then - expect(extractTransformAndLoadData).to.have.been.calledWithExactly({ - replication, - datamartKnex, - datawarehouseKnex, - }); + expect(replicationRepository.getByName).to.have.been.calledOnceWithExactly(replicationName); + expect(replicationJobRepository.performAsync).to.have.been.calledOnceWithExactly( + new ReplicationJob({ replicationName }), + ); expect(codeStub).to.have.been.calledWithExactly(204); }); - context('when usecase throw an error', function () { - it('should log error', async function () { - // given - const replicationName = 'foo'; - const replication = Symbol('replication'); - const request = { - params: { - replicationName, - }, - query: { - async: false, - }, - }; - const codeStub = sinon.stub(); - const h = { - response: () => ({ - code: codeStub, - }), - }; - - const error = new Error('extract-error'); - const extractTransformAndLoadData = sinon.stub().rejects(error); - const logger = { - error: sinon.stub(), - }; - const replicationRepository = { - getByName: sinon.stub().withArgs(replicationName).returns(replication), - }; - - // when - await replicate(request, h, { extractTransformAndLoadData, replicationRepository, logger }); - - // then - expect(logger.error).to.have.been.calledWithExactly( - { - event: 'replication', - err: error, - }, - 'Error during replication', - ); - expect(codeStub).to.have.been.calledWithExactly(204); - }); - }); - context('when replication name is unknown', function () { it('should return 404 status code', async function () { // given @@ -107,18 +61,19 @@ describe('Maddo | Application | Unit | Controller | Replication', function () { code: codeStub, }), }; - const replicationRepository = { getByName: sinon.stub().withArgs(replicationName).returns(undefined), }; - const extractTransformAndLoadData = sinon.stub(); + const replicationJobRepository = { + performAsync: sinon.stub(), + }; // when - await replicate(request, h, { extractTransformAndLoadData, replicationRepository }); + await replicate(request, h, { replicationRepository, replicationJobRepository }); // then expect(codeStub).to.have.been.calledWithExactly(404); - expect(extractTransformAndLoadData).not.to.have.been.called; + expect(replicationJobRepository.performAsync).not.to.have.been.called; }); }); }); diff --git a/api/tests/maddo/domain/integration/usecases/extract-transform-and-load-data_test.js b/api/tests/maddo/domain/integration/usecases/extract-transform-and-load-data_test.js new file mode 100644 index 00000000000..5d2ba2b7c66 --- /dev/null +++ b/api/tests/maddo/domain/integration/usecases/extract-transform-and-load-data_test.js @@ -0,0 +1,55 @@ +import _ from 'lodash'; + +import { extractTransformAndLoadData } from '../../../../../src/maddo/domain/usecases/extract-transform-and-load-data.js'; +import { datamartKnex, datawarehouseKnex, expect } from '../../../../test-helper.js'; + +describe('Maddo | Domain | Usecases | Integration | extract-transform-and-load-data', function () { + beforeEach(async function () { + const schema = (t) => { + t.string('firstName').notNullable(); + t.string('lastName').notNullable(); + }; + await datawarehouseKnex.schema.dropTableIfExists('to-replicate'); + await datawarehouseKnex.schema.createTable('to-replicate', schema); + await datamartKnex.schema.dropTableIfExists('replication'); + await datamartKnex.schema.createTable('replication', schema); + }); + + it('should run given replication', async function () { + // given + const replication = { + name: 'my-replication', + before: async ({ datamartKnex }) => { + await datamartKnex('replication').delete(); + }, + from: ({ datawarehouseKnex }) => { + return datawarehouseKnex('to-replicate').select('*'); + }, + transform: ({ firstName, lastName }) => ({ + firstName: _.capitalize(firstName), + lastName: _.capitalize(lastName), + }), + to: ({ datamartKnex }, chunk) => { + return datamartKnex('replication').insert(chunk); + }, + }; + await datamartKnex('replication').insert([ + { firstName: 'oldfirst1', lastName: 'oldlast1' }, + { firstName: 'oldfirst2', lastName: 'oldlast2' }, + ]); + await datawarehouseKnex('to-replicate').insert([ + { firstName: 'first1', lastName: 'last1' }, + { firstName: 'first2', lastName: 'last2' }, + ]); + + // when + await extractTransformAndLoadData({ replication, datamartKnex, datawarehouseKnex }); + + // then + const results = await datamartKnex.select().from('replication').orderBy('firstName'); + expect(results).to.deep.equal([ + { firstName: 'First1', lastName: 'Last1' }, + { firstName: 'First2', lastName: 'Last2' }, + ]); + }); +}); diff --git a/api/tests/maddo/domain/unit/usecases/extract-transform-and-load-data_test.js b/api/tests/maddo/domain/unit/usecases/extract-transform-and-load-data_test.js index aa76a7ebe6a..ea1b43de9d4 100644 --- a/api/tests/maddo/domain/unit/usecases/extract-transform-and-load-data_test.js +++ b/api/tests/maddo/domain/unit/usecases/extract-transform-and-load-data_test.js @@ -2,6 +2,7 @@ import { extractTransformAndLoadData } from '../../../../../src/maddo/domain/use import { expect, sinon } from '../../../../test-helper.js'; describe('Maddo | Domain | Usecases | Unit | extract-transform-and-load-data', function () { + let replicationRepository; let fromQueryBuilder; let fromFunction; @@ -14,6 +15,9 @@ describe('Maddo | Domain | Usecases | Unit | extract-transform-and-load-data', f let datawarehouseKnex; beforeEach(function () { + replicationRepository = { + getByName: sinon.stub(), + }; fromQueryBuilder = { async *stream() { for (let i = 0; i < 5; i++) { @@ -40,19 +44,25 @@ describe('Maddo | Domain | Usecases | Unit | extract-transform-and-load-data', f it('should insert into database with given query', async function () { // given + const replicationName = 'foo'; + const replication = { + from: fromFunction, + to: toFunction, + chunkSize: 2, + }; + replicationRepository.getByName.returns(replication); // when await extractTransformAndLoadData({ - replication: { - from: fromFunction, - to: toFunction, - chunkSize: 2, - }, + replicationName, + replicationRepository, datamartKnex, datawarehouseKnex, }); // then + expect(replicationRepository.getByName).to.have.been.calledOnceWithExactly(replicationName); + expect(fromFunction).to.have.been.calledOnce; expect(fromFunction).to.have.been.calledWithExactly({ datawarehouseKnex, datamartKnex }); expect(fromFunction).to.have.been.calledBefore(toFunction); @@ -71,11 +81,16 @@ describe('Maddo | Domain | Usecases | Unit | extract-transform-and-load-data', f describe('when chunkSize are not provided', function () { it('should use default chunkSize', async function () { // given + const replication = { + from: fromFunction, + to: toFunction, + }; + replicationRepository.getByName.returns(replication); + + // when await extractTransformAndLoadData({ - replication: { - from: fromFunction, - to: toFunction, - }, + replicationName: 'foo', + replicationRepository, datamartKnex, datawarehouseKnex, }); @@ -89,14 +104,18 @@ describe('Maddo | Domain | Usecases | Unit | extract-transform-and-load-data', f // given const beforeFunction = sinon.stub().resolves(); + const replication = { + from: fromFunction, + before: beforeFunction, + to: toFunction, + chunkSize: 2, + }; + replicationRepository.getByName.returns(replication); + // when await extractTransformAndLoadData({ - replication: { - from: fromFunction, - before: beforeFunction, - to: toFunction, - chunkSize: 2, - }, + replicationName: 'foo', + replicationRepository, datamartKnex, datawarehouseKnex, }); @@ -115,14 +134,18 @@ describe('Maddo | Domain | Usecases | Unit | extract-transform-and-load-data', f // given const beforeFunction = sinon.stub().resolves({ foo: 'foo', bar: 'bar' }); + const replication = { + from: fromFunction, + to: toFunction, + before: beforeFunction, + chunkSize: 2, + }; + replicationRepository.getByName.returns(replication); + // when await extractTransformAndLoadData({ - replication: { - from: fromFunction, - to: toFunction, - before: beforeFunction, - chunkSize: 2, - }, + replicationName: 'foo', + replicationRepository, datamartKnex, datawarehouseKnex, }); @@ -160,15 +183,18 @@ describe('Maddo | Domain | Usecases | Unit | extract-transform-and-load-data', f it('should call it for each row', async function () { // given const transform = (row) => row + 1; + const replication = { + from: fromFunction, + to: toFunction, + transform, + chunkSize: 2, + }; + replicationRepository.getByName.returns(replication); // when await extractTransformAndLoadData({ - replication: { - from: fromFunction, - to: toFunction, - transform, - chunkSize: 2, - }, + replicationName: 'foo', + replicationRepository, datamartKnex, datawarehouseKnex, }); From b195044c4a93fbdfee57c0079ddff51ff7e02eaf Mon Sep 17 00:00:00 2001 From: Vincent Hardouin Date: Thu, 27 Feb 2025 16:16:32 +0100 Subject: [PATCH 03/10] feat(api): add replications for certifications_results MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Nicolas Lepage Co-authored-by: Guillaume Lagorce Co-authored-by: Aurélie Crouillebois --- ...create-table-sco_certifications_results.js | 48 +++++++++++++++++ ...2903_create-table-certification_results.js | 35 +++++++++++++ .../repositories/replication-repository.js | 51 ++++++++++++++++++- 3 files changed, 133 insertions(+), 1 deletion(-) create mode 100644 api/datamart/migrations/20250227142721_create-table-sco_certifications_results.js create mode 100644 api/datamart/migrations/20250227142903_create-table-certification_results.js diff --git a/api/datamart/migrations/20250227142721_create-table-sco_certifications_results.js b/api/datamart/migrations/20250227142721_create-table-sco_certifications_results.js new file mode 100644 index 00000000000..92d49fae0bf --- /dev/null +++ b/api/datamart/migrations/20250227142721_create-table-sco_certifications_results.js @@ -0,0 +1,48 @@ +const TABLE_NAME = 'sco_certification_results'; + +/** + * @param { import("knex").Knex } knex + * @returns { Promise } + */ +const up = async function (knex) { + await knex.schema.createTable(TABLE_NAME, function (table) { + table.string('national_student_id'); + table.index('national_student_id'); + table.string('organization_uai'); + table.string('last_name'); + table.string('first_name'); + table.date('birthdate'); + table.text('status'); + table.integer('pix_score'); + table.timestamp('certification_date'); + table.integer('competence_level'); + table.index('organization_uai'); + table.string('competence_name'); + table.string('competence_code'); + table.string('area_name'); + table.string('certification_courses_id'); + }); + await knex.schema.raw('drop collation if exists "sco_certification_results_case_accent_punctuation_insensitive"'); + await knex.schema.raw( + 'create collation "sco_certification_results_case_accent_punctuation_insensitive" (provider = icu, locale = "und-u-ka-shifted-ks-level1-kv-punct", deterministic = false);', + ); + + await knex.schema.raw( + 'ALTER TABLE :tableName: alter column first_name type varchar(255) COLLATE sco_certification_results_case_accent_punctuation_insensitive;', + { tableName: TABLE_NAME }, + ); + await knex.schema.raw( + 'ALTER TABLE :tableName: alter column last_name type varchar(255) COLLATE sco_certification_results_case_accent_punctuation_insensitive;', + { tableName: TABLE_NAME }, + ); +}; + +/** + * @param { import("knex").Knex } knex + * @returns { Promise } + */ +const down = async function (knex) { + await knex.schema.dropTable(TABLE_NAME); +}; + +export { down, up }; diff --git a/api/datamart/migrations/20250227142903_create-table-certification_results.js b/api/datamart/migrations/20250227142903_create-table-certification_results.js new file mode 100644 index 00000000000..3fafce4c1bc --- /dev/null +++ b/api/datamart/migrations/20250227142903_create-table-certification_results.js @@ -0,0 +1,35 @@ +const TABLE_NAME = 'certification_results'; + +/** + * @param { import("knex").Knex } knex + * @returns { Promise } + */ +const up = async function (knex) { + await knex.schema.createTable(TABLE_NAME, function (table) { + table.string('certification_code_verification'); + table.string('last_name'); + table.string('first_name'); + table.date('birthdate'); + table.text('status'); + table.integer('pix_score'); + table.timestamp('certification_date'); + table.integer('competence_level'); + table.index('certification_code_verification'); + table.string('competence_name'); + table.string('competence_code'); + table.string('area_name'); + table.string('certification_courses_id'); + table.string('national_student_id'); + table.string('organization_uai'); + }); +}; + +/** + * @param { import("knex").Knex } knex + * @returns { Promise } + */ +const down = async function (knex) { + await knex.schema.dropTable(TABLE_NAME); +}; + +export { down, up }; diff --git a/api/src/maddo/infrastructure/repositories/replication-repository.js b/api/src/maddo/infrastructure/repositories/replication-repository.js index b4f8eed0095..f1ad2964fe6 100644 --- a/api/src/maddo/infrastructure/repositories/replication-repository.js +++ b/api/src/maddo/infrastructure/repositories/replication-repository.js @@ -1,4 +1,53 @@ -export const replications = []; +export const replications = [ + { + name: 'sco_certification_results', + before: async ({ datamartKnex }) => { + await datamartKnex('sco_certification_results').delete(); + }, + from: ({ datawarehouseKnex }) => { + return datawarehouseKnex('data_export_parcoursup_certif_result').select('*'); + }, + to: ({ datamartKnex }, chunk) => { + return datamartKnex('sco_certification_results').insert(chunk); + }, + }, + { + name: 'data_export_parcoursup_certif_result', + before: async ({ datamartKnex }) => { + await datamartKnex('data_export_parcoursup_certif_result').delete(); + }, + from: ({ datawarehouseKnex }) => { + return datawarehouseKnex('data_export_parcoursup_certif_result').select('*'); + }, + to: ({ datamartKnex }, chunk) => { + return datamartKnex('data_export_parcoursup_certif_result').insert(chunk); + }, + }, + { + name: 'certification_results', + before: async ({ datamartKnex }) => { + await datamartKnex('certification_results').delete(); + }, + from: ({ datawarehouseKnex }) => { + return datawarehouseKnex('data_export_parcoursup_certif_result_code_validation').select('*'); + }, + to: ({ datamartKnex }, chunk) => { + return datamartKnex('certification_results').insert(chunk); + }, + }, + { + name: 'data_export_parcoursup_certif_result_code_validation', + before: async ({ datamartKnex }) => { + await datamartKnex('data_export_parcoursup_certif_result_code_validation').delete(); + }, + from: ({ datawarehouseKnex }) => { + return datawarehouseKnex('data_export_parcoursup_certif_result_code_validation').select('*'); + }, + to: ({ datamartKnex }, chunk) => { + return datamartKnex('data_export_parcoursup_certif_result_code_validation').insert(chunk); + }, + }, +]; export function getByName(name, dependencies = { replications }) { return dependencies.replications.find((replication) => replication.name === name); From 2c4a3781d5474d6a631d27d3f21ac6b344363aff Mon Sep 17 00:00:00 2001 From: Aurelie Crouillebois Date: Thu, 27 Feb 2025 16:56:47 +0100 Subject: [PATCH 04/10] feat(maddo): return line count at the end of the replication job --- .../jobs/replication-job-controller.js | 2 +- .../extract-transform-and-load-data.js | 3 +++ .../extract-transform-and-load-data_test.js | 20 +++++++++++++++---- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/api/src/maddo/application/jobs/replication-job-controller.js b/api/src/maddo/application/jobs/replication-job-controller.js index eb9ec5cd5b4..b66df92dd95 100644 --- a/api/src/maddo/application/jobs/replication-job-controller.js +++ b/api/src/maddo/application/jobs/replication-job-controller.js @@ -15,7 +15,7 @@ export class ReplicationJobController extends JobController { dependencies = { extractTransformAndLoadData, replicationRepository, datamartKnex, datawarehouseKnex }, }) { const { extractTransformAndLoadData, replicationRepository, datamartKnex, datawarehouseKnex } = dependencies; - await extractTransformAndLoadData({ + return extractTransformAndLoadData({ replicationName, replicationRepository, datamartKnex, diff --git a/api/src/maddo/domain/usecases/extract-transform-and-load-data.js b/api/src/maddo/domain/usecases/extract-transform-and-load-data.js index d40c323ddf0..5241aa62de7 100644 --- a/api/src/maddo/domain/usecases/extract-transform-and-load-data.js +++ b/api/src/maddo/domain/usecases/extract-transform-and-load-data.js @@ -16,11 +16,13 @@ export async function extractTransformAndLoadData({ const queryBuilder = replication.from(context); let chunk = []; + let count = 0; const chunkSize = replication.chunkSize ?? DEFAULT_CHUNK_SIZE; const connection = await datamartKnex.context.client.acquireConnection(); try { for await (const data of queryBuilder.stream()) { chunk.push(replication.transform?.(data) ?? data); + count += 1; if (chunk.length === chunkSize) { await replication.to(context, chunk).connection(connection); chunk = []; @@ -30,6 +32,7 @@ export async function extractTransformAndLoadData({ if (chunk.length) { await replication.to(context, chunk).connection(connection); } + return { count }; } finally { await datamartKnex.context.client.releaseConnection(connection); } diff --git a/api/tests/maddo/domain/integration/usecases/extract-transform-and-load-data_test.js b/api/tests/maddo/domain/integration/usecases/extract-transform-and-load-data_test.js index 5d2ba2b7c66..be28e1afe38 100644 --- a/api/tests/maddo/domain/integration/usecases/extract-transform-and-load-data_test.js +++ b/api/tests/maddo/domain/integration/usecases/extract-transform-and-load-data_test.js @@ -1,6 +1,7 @@ import _ from 'lodash'; import { extractTransformAndLoadData } from '../../../../../src/maddo/domain/usecases/extract-transform-and-load-data.js'; +import * as replicationRepository from '../../../../../src/maddo/infrastructure/repositories/replication-repository.js'; import { datamartKnex, datawarehouseKnex, expect } from '../../../../test-helper.js'; describe('Maddo | Domain | Usecases | Integration | extract-transform-and-load-data', function () { @@ -17,8 +18,9 @@ describe('Maddo | Domain | Usecases | Integration | extract-transform-and-load-d it('should run given replication', async function () { // given + const replicationName = 'my-replication'; const replication = { - name: 'my-replication', + name: replicationName, before: async ({ datamartKnex }) => { await datamartKnex('replication').delete(); }, @@ -33,6 +35,9 @@ describe('Maddo | Domain | Usecases | Integration | extract-transform-and-load-d return datamartKnex('replication').insert(chunk); }, }; + + replicationRepository.replications.push(replication); + await datamartKnex('replication').insert([ { firstName: 'oldfirst1', lastName: 'oldlast1' }, { firstName: 'oldfirst2', lastName: 'oldlast2' }, @@ -43,11 +48,18 @@ describe('Maddo | Domain | Usecases | Integration | extract-transform-and-load-d ]); // when - await extractTransformAndLoadData({ replication, datamartKnex, datawarehouseKnex }); + const result = await extractTransformAndLoadData({ + replicationName, + replicationRepository, + datamartKnex, + datawarehouseKnex, + }); // then - const results = await datamartKnex.select().from('replication').orderBy('firstName'); - expect(results).to.deep.equal([ + expect(result).to.deep.equal({ count: 2 }); + + const replicatedData = await datamartKnex.select().from('replication').orderBy('firstName'); + expect(replicatedData).to.deep.equal([ { firstName: 'First1', lastName: 'Last1' }, { firstName: 'First2', lastName: 'Last2' }, ]); From 3f7da9ee6cd1ecd3f286f3191cdccfbf7e49c41f Mon Sep 17 00:00:00 2001 From: Guillaume Lagorce Date: Thu, 27 Feb 2025 17:10:56 +0100 Subject: [PATCH 05/10] chore: close db connections when worker is stopped Co-authored-by: Nicolas Lepage --- api/worker.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/worker.js b/api/worker.js index d6826b653fd..4b2e52047d3 100644 --- a/api/worker.js +++ b/api/worker.js @@ -7,6 +7,7 @@ import { glob } from 'glob'; import _ from 'lodash'; import PgBoss from 'pg-boss'; +import { databaseConnections } from './db/database-connections.js'; import { Metrics } from './src/monitoring/infrastructure/metrics.js'; import { JobGroup } from './src/shared/application/jobs/job-controller.js'; import { config } from './src/shared/config.js'; @@ -52,6 +53,7 @@ function createJobQueues(pgBoss) { await quitAllStorages(); await metrics.clearMetrics(); await jobQueues.stop(); + await databaseConnections.disconnect(); // Make sure pgBoss stopped before quitting pgBoss.on('stopped', () => { From 2fcd7474f7eceb6f390d7f0c32a9e71d5fba985e Mon Sep 17 00:00:00 2001 From: Nicolas Lepage <19571875+nlepage@users.noreply.github.com> Date: Fri, 28 Feb 2025 09:39:28 +0100 Subject: [PATCH 06/10] refactor: renames maddo.js to index.maddo.js MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Aurélie Crouillebois Co-authored-by: Vincent Hardouin --- api/Procfile-maddo | 2 +- api/{maddo.js => index.maddo.js} | 0 api/package.json | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename api/{maddo.js => index.maddo.js} (100%) diff --git a/api/Procfile-maddo b/api/Procfile-maddo index 53a4598536f..de0d198f439 100644 --- a/api/Procfile-maddo +++ b/api/Procfile-maddo @@ -4,5 +4,5 @@ postdeploy: npm run postdeploy:maddo # see https://github.com/1024pix/pix/pull/796 # and https://github.com/npm/npm/issues/4603 # for more information -web: exec node maddo.js +web: exec node index.maddo.js worker: exec node worker.js maddo diff --git a/api/maddo.js b/api/index.maddo.js similarity index 100% rename from api/maddo.js rename to api/index.maddo.js diff --git a/api/package.json b/api/package.json index ffc2df071c2..7535344a0af 100644 --- a/api/package.json +++ b/api/package.json @@ -172,7 +172,7 @@ "scalingo-postbuild": "node scripts/generate-cron > cron.json && node scripts/generate-procfile", "dev": "nodemon index.js", "start": "node index.js", - "start:maddo": "MADDO=true node maddo.js", + "start:maddo": "MADDO=true node index.maddo.js", "start:watch": "npm run dev", "start:job": "node worker.js", "start:job:fast": "node worker.js fast", From 4b52bdc80ab7b7dc56bb2ae13e7e6ea20d4a0117 Mon Sep 17 00:00:00 2001 From: Nicolas Lepage <19571875+nlepage@users.noreply.github.com> Date: Fri, 28 Feb 2025 10:04:38 +0100 Subject: [PATCH 07/10] refactor: inverts control in worker.js MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Aurélie Crouillebois Co-authored-by: Vincent Hardouin --- api/index.js | 4 +- api/src/monitoring/infrastructure/metrics.js | 4 +- api/tests/unit/worker_test.js | 22 +++++----- api/worker.js | 45 +++++++++++--------- 4 files changed, 43 insertions(+), 32 deletions(-) diff --git a/api/index.js b/api/index.js index c1869e15a72..34ffa198d08 100644 --- a/api/index.js +++ b/api/index.js @@ -3,12 +3,14 @@ import 'dotenv/config'; import { databaseConnections } from './db/database-connections.js'; import { databaseConnection as liveDatabaseConnection } from './db/knex-database-connection.js'; import { createServer } from './server.js'; +import { JobGroup } from './src/shared/application/jobs/job-controller.js'; import { config, schema as configSchema } from './src/shared/config.js'; import { learningContentCache } from './src/shared/infrastructure/caches/learning-content-cache.js'; import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js'; import { logger } from './src/shared/infrastructure/utils/logger.js'; import { redisMonitor } from './src/shared/infrastructure/utils/redis-monitor.js'; import { validateEnvironmentVariables } from './src/shared/infrastructure/validate-environment-variables.js'; +import { registerJobs } from './worker.js'; validateEnvironmentVariables(configSchema); @@ -62,7 +64,7 @@ process.on('SIGINT', () => { try { await start(); if (config.infra.startJobInWebProcess) { - import('./worker.js'); + registerJobs({ jobGroup: JobGroup.DEFAULT }); } } catch (error) { logger.error(error); diff --git a/api/src/monitoring/infrastructure/metrics.js b/api/src/monitoring/infrastructure/metrics.js index 833664dd820..36388d46b43 100644 --- a/api/src/monitoring/infrastructure/metrics.js +++ b/api/src/monitoring/infrastructure/metrics.js @@ -1,6 +1,8 @@ import metrics from 'datadog-metrics'; -import { logger } from '../../shared/infrastructure/utils/logger.js'; +import { child } from '../../shared/infrastructure/utils/logger.js'; + +const logger = child('metrics', { event: 'metrics' }); export class Metrics { static intervals = []; diff --git a/api/tests/unit/worker_test.js b/api/tests/unit/worker_test.js index 404efa7eb99..e885793bd10 100644 --- a/api/tests/unit/worker_test.js +++ b/api/tests/unit/worker_test.js @@ -11,15 +11,15 @@ import { registerJobs } from '../../worker.js'; import { catchErr, expect, sinon } from '../test-helper.js'; describe('#registerJobs', function () { - let startPgBossStub, createJobQueuesStub, jobQueueStub; + let startPgBossStub, createJobQueueStub, jobQueueStub; beforeEach(function () { const pgBossStub = Symbol('pgBoss'); jobQueueStub = { register: sinon.stub(), scheduleCronJob: sinon.stub(), unscheduleCronJob: sinon.stub() }; startPgBossStub = sinon.stub(); startPgBossStub.resolves(pgBossStub); - createJobQueuesStub = sinon.stub(); - createJobQueuesStub.withArgs(pgBossStub).returns(jobQueueStub); + createJobQueueStub = sinon.stub(); + createJobQueueStub.withArgs(pgBossStub).returns(jobQueueStub); }); afterEach(function () { @@ -32,7 +32,7 @@ describe('#registerJobs', function () { jobGroup: JobGroup.DEFAULT, dependencies: { startPgBoss: startPgBossStub, - createJobQueues: createJobQueuesStub, + createJobQueue: createJobQueueStub, }, }); @@ -53,7 +53,7 @@ describe('#registerJobs', function () { jobGroup: JobGroup.DEFAULT, dependencies: { startPgBoss: startPgBossStub, - createJobQueues: createJobQueuesStub, + createJobQueue: createJobQueueStub, }, }); @@ -74,7 +74,7 @@ describe('#registerJobs', function () { jobGroup: JobGroup.DEFAULT, dependencies: { startPgBoss: startPgBossStub, - createJobQueues: createJobQueuesStub, + createJobQueue: createJobQueueStub, }, }); @@ -95,7 +95,7 @@ describe('#registerJobs', function () { jobGroup: JobGroup.DEFAULT, dependencies: { startPgBoss: startPgBossStub, - createJobQueues: createJobQueuesStub, + createJobQueue: createJobQueueStub, }, }); @@ -111,7 +111,7 @@ describe('#registerJobs', function () { const error = await catchErr(registerJobs)({ dependencies: { startPgBoss: startPgBossStub, - createJobQueues: createJobQueuesStub, + createJobQueue: createJobQueueStub, }, }); @@ -129,7 +129,7 @@ describe('#registerJobs', function () { jobGroup: JobGroup.DEFAULT, dependencies: { startPgBoss: startPgBossStub, - createJobQueues: createJobQueuesStub, + createJobQueue: createJobQueueStub, }, }); @@ -153,7 +153,7 @@ describe('#registerJobs', function () { jobGroup: JobGroup.DEFAULT, dependencies: { startPgBoss: startPgBossStub, - createJobQueues: createJobQueuesStub, + createJobQueue: createJobQueueStub, }, }); @@ -173,7 +173,7 @@ describe('#registerJobs', function () { jobGroup: JobGroup.DEFAULT, dependencies: { startPgBoss: startPgBossStub, - createJobQueues: createJobQueuesStub, + createJobQueue: createJobQueueStub, }, }); diff --git a/api/worker.js b/api/worker.js index 4b2e52047d3..773b50fe20e 100644 --- a/api/worker.js +++ b/api/worker.js @@ -11,12 +11,14 @@ import { databaseConnections } from './db/database-connections.js'; import { Metrics } from './src/monitoring/infrastructure/metrics.js'; import { JobGroup } from './src/shared/application/jobs/job-controller.js'; import { config } from './src/shared/config.js'; +import { learningContentCache } from './src/shared/infrastructure/caches/learning-content-cache.js'; import { JobQueue } from './src/shared/infrastructure/jobs/JobQueue.js'; import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js'; import { importNamedExportFromFile } from './src/shared/infrastructure/utils/import-named-exports-from-directory.js'; -import { logger } from './src/shared/infrastructure/utils/logger.js'; +import { child } from './src/shared/infrastructure/utils/logger.js'; + +const logger = child('worker', { event: 'worker' }); -const isTestEnv = process.env.NODE_ENV === 'test'; const isJobInWebProcess = process.env.START_JOB_IN_WEB_PROCESS === 'true'; const workerDirPath = dirname(fileURLToPath(import.meta.url)); @@ -47,36 +49,27 @@ async function startPgBoss() { return pgBoss; } -function createJobQueues(pgBoss) { - const jobQueues = new JobQueue(pgBoss); +function createJobQueue(pgBoss) { + const jobQueue = new JobQueue(pgBoss); process.on('SIGINT', async () => { - await quitAllStorages(); - await metrics.clearMetrics(); - await jobQueues.stop(); - await databaseConnections.disconnect(); - - // Make sure pgBoss stopped before quitting - pgBoss.on('stopped', () => { - // eslint-disable-next-line n/no-process-exit - process.exit(0); - }); + await jobQueue.stop(); }); - return jobQueues; + return jobQueue; } function checkJobGroup(jobGroup) { - if (!jobGroup) { + if (!Object.values(JobGroup).includes(jobGroup)) { throw new Error(`Job group invalid, allowed Job groups are [${Object.values(JobGroup)}]`); } logger.info(`Job group "${jobGroup}"`); } -export async function registerJobs({ jobGroup, dependencies = { startPgBoss, createJobQueues } }) { +export async function registerJobs({ jobGroup, dependencies = { startPgBoss, createJobQueue } }) { checkJobGroup(jobGroup); const pgBoss = await dependencies.startPgBoss(); - const jobQueues = dependencies.createJobQueues(pgBoss); + const jobQueues = dependencies.createJobQueue(pgBoss); const globPattern = `${workerDirPath}/src/**/application/**/*job-controller.js`; @@ -138,7 +131,21 @@ export async function registerJobs({ jobGroup, dependencies = { startPgBoss, cre logger.info(`${cronJobCount} cron jobs scheduled for group "${jobGroup}".`); } -if (!isTestEnv) { +const isRunningFromCli = import.meta.filename === process.argv[1]; + +async function main() { const jobGroup = process.argv[2] ? JobGroup[process.argv[2]?.toUpperCase()] : JobGroup.DEFAULT; await registerJobs({ jobGroup }); + process.on('SIGINT', async () => { + await quitAllStorages(); + await metrics.clearMetrics(); + await databaseConnections.disconnect(); + await learningContentCache.quit(); + }); +} + +if (isRunningFromCli) { + main().catch((err) => { + logger.error({ err }, 'worker crashed'); + }); } From 5d4cc4a4de9d5c8e8dcc19454f06f83fb7a64753 Mon Sep 17 00:00:00 2001 From: Nicolas Lepage <19571875+nlepage@users.noreply.github.com> Date: Fri, 28 Feb 2025 10:17:24 +0100 Subject: [PATCH 08/10] feat(api): process maddo pgboss jobs in maddo server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Aurélie Crouillebois Co-authored-by: Vincent Hardouin --- api/index.js | 2 +- api/index.maddo.js | 7 ++++++- api/tests/unit/worker_test.js | 29 ++++++++++++++++++++++------- api/worker.js | 19 +++++++++++-------- 4 files changed, 40 insertions(+), 17 deletions(-) diff --git a/api/index.js b/api/index.js index 34ffa198d08..7747edb29d0 100644 --- a/api/index.js +++ b/api/index.js @@ -64,7 +64,7 @@ process.on('SIGINT', () => { try { await start(); if (config.infra.startJobInWebProcess) { - registerJobs({ jobGroup: JobGroup.DEFAULT }); + registerJobs({ jobGroups: [JobGroup.DEFAULT, JobGroup.FAST] }); } } catch (error) { logger.error(error); diff --git a/api/index.maddo.js b/api/index.maddo.js index c2f9691fe37..9df2b8663d3 100644 --- a/api/index.maddo.js +++ b/api/index.maddo.js @@ -2,11 +2,13 @@ import 'dotenv/config'; import { databaseConnections } from './db/database-connections.js'; import { createMaddoServer } from './server.maddo.js'; -import { schema as configSchema } from './src/shared/config.js'; +import { JobGroup } from './src/shared/application/jobs/job-controller.js'; +import { config, schema as configSchema } from './src/shared/config.js'; import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js'; import { logger } from './src/shared/infrastructure/utils/logger.js'; import { redisMonitor } from './src/shared/infrastructure/utils/redis-monitor.js'; import { validateEnvironmentVariables } from './src/shared/infrastructure/validate-environment-variables.js'; +import { registerJobs } from './worker.js'; validateEnvironmentVariables(configSchema); @@ -45,6 +47,9 @@ process.on('SIGINT', () => { (async () => { try { await start(); + if (config.infra.startJobInWebProcess) { + registerJobs({ jobGroups: [JobGroup.MADDO] }); + } } catch (error) { logger.error(error); throw error; diff --git a/api/tests/unit/worker_test.js b/api/tests/unit/worker_test.js index e885793bd10..cd066432292 100644 --- a/api/tests/unit/worker_test.js +++ b/api/tests/unit/worker_test.js @@ -29,7 +29,7 @@ describe('#registerJobs', function () { it('should register UserAnonymizedEventLoggingJob', async function () { // when await registerJobs({ - jobGroup: JobGroup.DEFAULT, + jobGroups: [JobGroup.DEFAULT], dependencies: { startPgBoss: startPgBossStub, createJobQueue: createJobQueueStub, @@ -50,7 +50,7 @@ describe('#registerJobs', function () { .stub(UserAnonymizedEventLoggingJobController.prototype, 'legacyName') .get(() => 'legyNameForUserAnonymizedEventLoggingJobController'); await registerJobs({ - jobGroup: JobGroup.DEFAULT, + jobGroups: [JobGroup.DEFAULT], dependencies: { startPgBoss: startPgBossStub, createJobQueue: createJobQueueStub, @@ -71,7 +71,7 @@ describe('#registerJobs', function () { // when await registerJobs({ - jobGroup: JobGroup.DEFAULT, + jobGroups: [JobGroup.DEFAULT], dependencies: { startPgBoss: startPgBossStub, createJobQueue: createJobQueueStub, @@ -92,7 +92,7 @@ describe('#registerJobs', function () { // when await registerJobs({ - jobGroup: JobGroup.DEFAULT, + jobGroups: [JobGroup.DEFAULT], dependencies: { startPgBoss: startPgBossStub, createJobQueue: createJobQueueStub, @@ -106,9 +106,24 @@ describe('#registerJobs', function () { ); }); + it('should throws an error when no groups is invalid', async function () { + // given + const error = await catchErr(registerJobs)({ + dependencies: { + startPgBoss: startPgBossStub, + createJobQueue: createJobQueueStub, + }, + }); + + // then + expect(error).to.be.instanceOf(Error); + expect(error.message).to.equal('Job groups are mandatory'); + }); + it('should throws an error when group is invalid', async function () { // given const error = await catchErr(registerJobs)({ + jobGroups: ['pouet'], dependencies: { startPgBoss: startPgBossStub, createJobQueue: createJobQueueStub, @@ -126,7 +141,7 @@ describe('#registerJobs', function () { sinon.stub(config.features.scheduleComputeOrganizationLearnersCertificability, 'cron').value('0 21 * * *'); await registerJobs({ - jobGroup: JobGroup.DEFAULT, + jobGroups: [JobGroup.DEFAULT], dependencies: { startPgBoss: startPgBossStub, createJobQueue: createJobQueueStub, @@ -150,7 +165,7 @@ describe('#registerJobs', function () { sinon.stub(config.features.scheduleComputeOrganizationLearnersCertificability, 'cron').value('0 21 * * *'); await registerJobs({ - jobGroup: JobGroup.DEFAULT, + jobGroups: [JobGroup.DEFAULT], dependencies: { startPgBoss: startPgBossStub, createJobQueue: createJobQueueStub, @@ -170,7 +185,7 @@ describe('#registerJobs', function () { sinon.stub(config.pgBoss, 'exportSenderJobEnabled').value(false); await registerJobs({ - jobGroup: JobGroup.DEFAULT, + jobGroups: [JobGroup.DEFAULT], dependencies: { startPgBoss: startPgBossStub, createJobQueue: createJobQueueStub, diff --git a/api/worker.js b/api/worker.js index 773b50fe20e..c9b65062036 100644 --- a/api/worker.js +++ b/api/worker.js @@ -19,7 +19,6 @@ import { child } from './src/shared/infrastructure/utils/logger.js'; const logger = child('worker', { event: 'worker' }); -const isJobInWebProcess = process.env.START_JOB_IN_WEB_PROCESS === 'true'; const workerDirPath = dirname(fileURLToPath(import.meta.url)); const metrics = new Metrics({ config }); @@ -57,15 +56,19 @@ function createJobQueue(pgBoss) { return jobQueue; } +function checkJobGroups(jobGroups) { + if (!jobGroups) throw new Error('Job groups are mandatory'); + jobGroups.forEach((jobGroup) => checkJobGroup(jobGroup)); +} + function checkJobGroup(jobGroup) { if (!Object.values(JobGroup).includes(jobGroup)) { throw new Error(`Job group invalid, allowed Job groups are [${Object.values(JobGroup)}]`); } - logger.info(`Job group "${jobGroup}"`); } -export async function registerJobs({ jobGroup, dependencies = { startPgBoss, createJobQueue } }) { - checkJobGroup(jobGroup); +export async function registerJobs({ jobGroups, dependencies = { startPgBoss, createJobQueue } }) { + checkJobGroups(jobGroups); const pgBoss = await dependencies.startPgBoss(); @@ -88,7 +91,7 @@ export async function registerJobs({ jobGroup, dependencies = { startPgBoss, cre for (const [moduleName, ModuleClass] of Object.entries(jobModules)) { const job = new ModuleClass(); - if (!isJobInWebProcess && job.jobGroup !== jobGroup) continue; + if (!jobGroups.includes(job.jobGroup)) continue; if (job.isJobEnabled) { logger.info(`Job "${job.jobName}" registered from module "${moduleName}."`); @@ -127,15 +130,15 @@ export async function registerJobs({ jobGroup, dependencies = { startPgBoss, cre } } - logger.info(`${jobRegisteredCount} jobs registered for group "${jobGroup}".`); - logger.info(`${cronJobCount} cron jobs scheduled for group "${jobGroup}".`); + logger.info(`${jobRegisteredCount} jobs registered for groups "${jobGroups}".`); + logger.info(`${cronJobCount} cron jobs scheduled for groups "${jobGroups}".`); } const isRunningFromCli = import.meta.filename === process.argv[1]; async function main() { const jobGroup = process.argv[2] ? JobGroup[process.argv[2]?.toUpperCase()] : JobGroup.DEFAULT; - await registerJobs({ jobGroup }); + await registerJobs({ jobGroups: [jobGroup] }); process.on('SIGINT', async () => { await quitAllStorages(); await metrics.clearMetrics(); From 7680ef3598cf43541fb59f66feb085b242eeecdc Mon Sep 17 00:00:00 2001 From: Nicolas Lepage <19571875+nlepage@users.noreply.github.com> Date: Fri, 28 Feb 2025 10:25:40 +0100 Subject: [PATCH 09/10] feat(api): validate environment variables in workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Aurélie Crouillebois Co-authored-by: Vincent Hardouin --- api/worker.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/api/worker.js b/api/worker.js index c9b65062036..da786d7afbf 100644 --- a/api/worker.js +++ b/api/worker.js @@ -10,12 +10,13 @@ import PgBoss from 'pg-boss'; import { databaseConnections } from './db/database-connections.js'; import { Metrics } from './src/monitoring/infrastructure/metrics.js'; import { JobGroup } from './src/shared/application/jobs/job-controller.js'; -import { config } from './src/shared/config.js'; +import { config, schema as configSchema } from './src/shared/config.js'; import { learningContentCache } from './src/shared/infrastructure/caches/learning-content-cache.js'; import { JobQueue } from './src/shared/infrastructure/jobs/JobQueue.js'; import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js'; import { importNamedExportFromFile } from './src/shared/infrastructure/utils/import-named-exports-from-directory.js'; import { child } from './src/shared/infrastructure/utils/logger.js'; +import { validateEnvironmentVariables } from './src/shared/infrastructure/validate-environment-variables.js'; const logger = child('worker', { event: 'worker' }); @@ -137,8 +138,11 @@ export async function registerJobs({ jobGroups, dependencies = { startPgBoss, cr const isRunningFromCli = import.meta.filename === process.argv[1]; async function main() { + validateEnvironmentVariables(configSchema); + const jobGroup = process.argv[2] ? JobGroup[process.argv[2]?.toUpperCase()] : JobGroup.DEFAULT; await registerJobs({ jobGroups: [jobGroup] }); + process.on('SIGINT', async () => { await quitAllStorages(); await metrics.clearMetrics(); @@ -150,5 +154,6 @@ async function main() { if (isRunningFromCli) { main().catch((err) => { logger.error({ err }, 'worker crashed'); + process.exit(1); // eslint-disable-line n/no-process-exit }); } From 1102e8bbdd734472cc375304d6e75b1271ce9aa8 Mon Sep 17 00:00:00 2001 From: Nicolas Lepage <19571875+nlepage@users.noreply.github.com> Date: Fri, 28 Feb 2025 11:31:27 +0100 Subject: [PATCH 10/10] chore: ignores api/datamart/migrations in lslint --- api/.ls-lint.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/api/.ls-lint.yml b/api/.ls-lint.yml index 0c0fa652571..cdf49044a5c 100644 --- a/api/.ls-lint.yml +++ b/api/.ls-lint.yml @@ -29,6 +29,7 @@ ls: ignore: - .git - node_modules + - datamart/migrations - db/migrations - db/seeds - scripts