[TECH] Record the history of cold data replications (PIX-16710)(PIX-16712). #11527

Merged: 10 commits, Feb 28, 2025
1 change: 1 addition & 0 deletions api/.ls-lint.yml
@@ -29,6 +29,7 @@ ls:
ignore:
- .git
- node_modules
- datamart/migrations
- db/migrations
- db/seeds
- scripts
3 changes: 2 additions & 1 deletion api/Procfile-maddo
@@ -4,4 +4,5 @@ postdeploy: npm run postdeploy:maddo
# see https://github.com/1024pix/pix/pull/796
# and https://github.com/npm/npm/issues/4603
# for more information
web: exec node maddo.js
web: exec node index.maddo.js
worker: exec node worker.js maddo
@@ -0,0 +1,48 @@
const TABLE_NAME = 'sco_certification_results';

/**
 * @param { import("knex").Knex } knex
 * @returns { Promise<void> }
 */
const up = async function (knex) {
  await knex.schema.createTable(TABLE_NAME, function (table) {
    table.string('national_student_id');
    table.index('national_student_id');
    table.string('organization_uai');
    table.string('last_name');
    table.string('first_name');
    table.date('birthdate');
    table.text('status');
    table.integer('pix_score');
    table.timestamp('certification_date');
    table.integer('competence_level');
    table.index('organization_uai');
    table.string('competence_name');
    table.string('competence_code');
    table.string('area_name');
    table.string('certification_courses_id');
  });
  await knex.schema.raw('drop collation if exists "sco_certification_results_case_accent_punctuation_insensitive"');
  await knex.schema.raw(
    'create collation "sco_certification_results_case_accent_punctuation_insensitive" (provider = icu, locale = "und-u-ka-shifted-ks-level1-kv-punct", deterministic = false);',
  );

  await knex.schema.raw(
    'ALTER TABLE :tableName: alter column first_name type varchar(255) COLLATE sco_certification_results_case_accent_punctuation_insensitive;',
    { tableName: TABLE_NAME },
  );
  await knex.schema.raw(
    'ALTER TABLE :tableName: alter column last_name type varchar(255) COLLATE sco_certification_results_case_accent_punctuation_insensitive;',
    { tableName: TABLE_NAME },
  );
};

/**
 * @param { import("knex").Knex } knex
 * @returns { Promise<void> }
 */
const down = async function (knex) {
  await knex.schema.dropTable(TABLE_NAME);
};

export { down, up };
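The ICU collation created above (primary strength, with spaces and punctuation shifted to ignorable, non-deterministic) makes equality on first_name and last_name insensitive to case, accents, spaces and punctuation. A minimal sketch of the effect, assuming a configured datamart knex connection; the import path and values are illustrative:

import { knex as datamartKnex } from '../../datamart/knex-database-connection.js'; // illustrative path

// Under the non-deterministic collation, 'Le Noël-Dupont' compares equal to 'le noel dupont',
// so lookups with loosely normalised input still match the replicated rows.
const rows = await datamartKnex('sco_certification_results')
  .where({ last_name: 'le noel dupont', first_name: 'eric' })
  .select('last_name', 'first_name', 'pix_score', 'certification_date');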
@@ -0,0 +1,35 @@
const TABLE_NAME = 'certification_results';

/**
 * @param { import("knex").Knex } knex
 * @returns { Promise<void> }
 */
const up = async function (knex) {
  await knex.schema.createTable(TABLE_NAME, function (table) {
    table.string('certification_code_verification');
    table.string('last_name');
    table.string('first_name');
    table.date('birthdate');
    table.text('status');
    table.integer('pix_score');
    table.timestamp('certification_date');
    table.integer('competence_level');
    table.index('certification_code_verification');
    table.string('competence_name');
    table.string('competence_code');
    table.string('area_name');
    table.string('certification_courses_id');
    table.string('national_student_id');
    table.string('organization_uai');
  });
};

/**
 * @param { import("knex").Knex } knex
 * @returns { Promise<void> }
 */
const down = async function (knex) {
  await knex.schema.dropTable(TABLE_NAME);
};

export { down, up };
2 changes: 1 addition & 1 deletion api/db/seeds/data/common/common-builder.js
@@ -85,7 +85,7 @@ function createClientApplications(databaseBuilder) {
    name: 'pixData',
    clientId: 'pixData',
    clientSecret: 'pixdatasecret',
    scopes: ['statistics'],
    scopes: ['statistics', 'replication'],
  });
  databaseBuilder.factory.buildClientApplication({
    name: 'parcoursup',
4 changes: 3 additions & 1 deletion api/index.js
@@ -3,12 +3,14 @@ import 'dotenv/config';
import { databaseConnections } from './db/database-connections.js';
import { databaseConnection as liveDatabaseConnection } from './db/knex-database-connection.js';
import { createServer } from './server.js';
import { JobGroup } from './src/shared/application/jobs/job-controller.js';
import { config, schema as configSchema } from './src/shared/config.js';
import { learningContentCache } from './src/shared/infrastructure/caches/learning-content-cache.js';
import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js';
import { logger } from './src/shared/infrastructure/utils/logger.js';
import { redisMonitor } from './src/shared/infrastructure/utils/redis-monitor.js';
import { validateEnvironmentVariables } from './src/shared/infrastructure/validate-environment-variables.js';
import { registerJobs } from './worker.js';

validateEnvironmentVariables(configSchema);

@@ -62,7 +64,7 @@ process.on('SIGINT', () => {
  try {
    await start();
    if (config.infra.startJobInWebProcess) {
      import('./worker.js');
      registerJobs({ jobGroups: [JobGroup.DEFAULT, JobGroup.FAST] });
    }
  } catch (error) {
    logger.error(error);
7 changes: 6 additions & 1 deletion api/maddo.js → api/index.maddo.js
@@ -2,11 +2,13 @@ import 'dotenv/config';

import { databaseConnections } from './db/database-connections.js';
import { createMaddoServer } from './server.maddo.js';
import { schema as configSchema } from './src/shared/config.js';
import { JobGroup } from './src/shared/application/jobs/job-controller.js';
import { config, schema as configSchema } from './src/shared/config.js';
import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js';
import { logger } from './src/shared/infrastructure/utils/logger.js';
import { redisMonitor } from './src/shared/infrastructure/utils/redis-monitor.js';
import { validateEnvironmentVariables } from './src/shared/infrastructure/validate-environment-variables.js';
import { registerJobs } from './worker.js';

validateEnvironmentVariables(configSchema);

@@ -45,6 +47,9 @@ process.on('SIGINT', () => {
(async () => {
  try {
    await start();
    if (config.infra.startJobInWebProcess) {
      registerJobs({ jobGroups: [JobGroup.MADDO] });
    }
  } catch (error) {
    logger.error(error);
    throw error;
2 changes: 1 addition & 1 deletion api/package.json
@@ -172,7 +172,7 @@
"scalingo-postbuild": "node scripts/generate-cron > cron.json && node scripts/generate-procfile",
"dev": "nodemon index.js",
"start": "node index.js",
"start:maddo": "MADDO=true node maddo.js",
"start:maddo": "MADDO=true node index.maddo.js",
"start:watch": "npm run dev",
"start:job": "node worker.js",
"start:job:fast": "node worker.js fast",
3 changes: 2 additions & 1 deletion api/server.maddo.js
@@ -4,6 +4,7 @@ import { parse } from 'neoqs';

import { setupErrorHandling } from './config/server-setup-error-handling.js';
import { knex } from './db/knex-database-connection.js';
import * as authenticationRoutes from './lib/application/authentication/index.js';
Reviewer comment (Member): to be revisited one of these days, to avoid needlessly importing all the authentication routes.

import { authentication } from './lib/infrastructure/authentication.js';
import * as replicationRoutes from './src/maddo/application/replications-routes.js';
import { Metrics } from './src/monitoring/infrastructure/metrics.js';
@@ -178,7 +179,7 @@ const setupAuthentication = function (server) {
};

const setupRoutesAndPlugins = async function (server) {
  await server.register([...plugins, healthcheckRoutes, replicationRoutes]);
  await server.register([...plugins, healthcheckRoutes, authenticationRoutes, replicationRoutes]);
};

const setupOpenApiSpecification = async function (server) {
25 changes: 25 additions & 0 deletions api/src/maddo/application/jobs/replication-job-controller.js
@@ -0,0 +1,25 @@
import { knex as datamartKnex } from '../../../../datamart/knex-database-connection.js';
import { knex as datawarehouseKnex } from '../../../../datawarehouse/knex-database-connection.js';
import { JobController, JobGroup } from '../../../shared/application/jobs/job-controller.js';
import { ReplicationJob } from '../../domain/models/ReplicationJob.js';
import { extractTransformAndLoadData } from '../../domain/usecases/extract-transform-and-load-data.js';
import * as replicationRepository from '../../infrastructure/repositories/replication-repository.js';

export class ReplicationJobController extends JobController {
  constructor() {
    super(ReplicationJob.name, { jobGroup: JobGroup.MADDO });
  }

  async handle({
    data: { replicationName },
    dependencies = { extractTransformAndLoadData, replicationRepository, datamartKnex, datawarehouseKnex },
  }) {
    const { extractTransformAndLoadData, replicationRepository, datamartKnex, datawarehouseKnex } = dependencies;
    return extractTransformAndLoadData({
      replicationName,
      replicationRepository,
      datamartKnex,
      datawarehouseKnex,
    });
  }
}
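A minimal sketch of exercising handle with stubbed dependencies, e.g. in a unit test; the stubs and values are illustrative, not part of this PR:

import { ReplicationJobController } from './replication-job-controller.js'; // illustrative path

const controller = new ReplicationJobController();
const result = await controller.handle({
  data: { replicationName: 'certification_results' },
  dependencies: {
    // stubbed usecase: returns a fake row count instead of touching any database
    extractTransformAndLoadData: async ({ replicationName }) => ({ count: 42, replicationName }),
    replicationRepository: { getByName: () => undefined }, // not reached by the stub
    datamartKnex: {},
    datawarehouseKnex: {},
  },
});
// result -> { count: 42, replicationName: 'certification_results' }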
34 changes: 6 additions & 28 deletions api/src/maddo/application/replications-controller.js
@@ -1,47 +1,25 @@
import { knex as datamartKnex } from '../../../datamart/knex-database-connection.js';
import { knex as datawarehouseKnex } from '../../../datawarehouse/knex-database-connection.js';
import { logger } from '../../shared/infrastructure/utils/logger.js';
import { extractTransformAndLoadData } from '../domain/usecases/extract-transform-and-load-data.js';
import { ReplicationJob } from '../domain/models/ReplicationJob.js';
import { replicationJobRepository } from '../infrastructure/repositories/jobs/replication-job-repository.js';
import * as replicationRepository from '../infrastructure/repositories/replication-repository.js';

export async function replicate(
  request,
  h,
  dependencies = {
    extractTransformAndLoadData,
    replicationRepository,
    datamartKnex,
    datawarehouseKnex,
    logger,
    replicationJobRepository,
  },
) {
  const { replicationRepository, replicationJobRepository } = dependencies;
  const { replicationName } = request.params;

  const replication = dependencies.replicationRepository.getByName(replicationName);
  const replication = replicationRepository.getByName(replicationName);

  if (!replication) {
    return h.response().code(404);
  }

  const promise = dependencies
    .extractTransformAndLoadData({
      replication,
      datamartKnex: dependencies.datamartKnex,
      datawarehouseKnex: dependencies.datawarehouseKnex,
    })
    .catch((err) =>
      dependencies.logger.error(
        {
          event: 'replication',
          err,
        },
        'Error during replication',
      ),
    );

  if (!request.query.async) {
    await promise;
  }
  await replicationJobRepository.performAsync(new ReplicationJob({ replicationName }));

  return h.response().code(204);
}
5 changes: 5 additions & 0 deletions api/src/maddo/domain/models/ReplicationJob.js
@@ -0,0 +1,5 @@
export class ReplicationJob {
  constructor({ replicationName }) {
    this.replicationName = replicationName;
  }
}
12 changes: 11 additions & 1 deletion api/src/maddo/domain/usecases/extract-transform-and-load-data.js
@@ -1,6 +1,13 @@
const DEFAULT_CHUNK_SIZE = 1_000;

export async function extractTransformAndLoadData({ replication, datawarehouseKnex, datamartKnex }) {
export async function extractTransformAndLoadData({
  replicationName,
  replicationRepository,
  datawarehouseKnex,
  datamartKnex,
}) {
  const replication = replicationRepository.getByName(replicationName);

  let context = { datawarehouseKnex, datamartKnex };

  const additionnalContext = await replication.before?.(context);
@@ -9,11 +16,13 @@ export async function extractTransformAndLoadData({ replication, datawarehouseKn

  const queryBuilder = replication.from(context);
  let chunk = [];
  let count = 0;
  const chunkSize = replication.chunkSize ?? DEFAULT_CHUNK_SIZE;
  const connection = await datamartKnex.context.client.acquireConnection();
  try {
    for await (const data of queryBuilder.stream()) {
      chunk.push(replication.transform?.(data) ?? data);
      count += 1;
      if (chunk.length === chunkSize) {
        await replication.to(context, chunk).connection(connection);
        chunk = [];
@@ -23,6 +32,7 @@ export async function extractTransformAndLoadData({ replication, datawarehouseKn
    if (chunk.length) {
      await replication.to(context, chunk).connection(connection);
    }
    return { count };
  } finally {
    await datamartKnex.context.client.releaseConnection(connection);
  }
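The usecase above only requires from and to on a replication definition; before, transform and chunkSize are optional hooks. A hypothetical definition using all of them (the table and column names are invented for illustration):

// Hypothetical replication definition, for illustration only (not part of this PR).
const exampleReplication = {
  name: 'example_results',
  chunkSize: 500, // overrides DEFAULT_CHUNK_SIZE (1_000)
  before: async ({ datamartKnex }) => {
    await datamartKnex('example_results').delete();
  },
  from: ({ datawarehouseKnex }) => datawarehouseKnex('example_source').select('*'),
  transform: (row) => ({ ...row, last_name: row.last_name?.trim() }),
  to: ({ datamartKnex }, chunk) => datamartKnex('example_results').insert(chunk),
};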
@@ -0,0 +1,4 @@
import { JobRepository, JobRetry } from '../../../../shared/infrastructure/repositories/jobs/job-repository.js';
import { ReplicationJob } from '../../../domain/models/ReplicationJob.js';

export const replicationJobRepository = new JobRepository({ name: ReplicationJob.name, retry: JobRetry.FEW_RETRY });
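Enqueuing a run from application code then mirrors the controller above; a minimal usage sketch (import paths are illustrative):

import { ReplicationJob } from '../../../domain/models/ReplicationJob.js'; // illustrative path
import { replicationJobRepository } from './replication-job-repository.js'; // illustrative path

// Schedules the job with FEW_RETRY semantics; a worker registered with JobGroup.MADDO
// picks it up and runs ReplicationJobController.handle.
await replicationJobRepository.performAsync(new ReplicationJob({ replicationName: 'sco_certification_results' }));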
@@ -1,4 +1,53 @@
export const replications = [];
export const replications = [
  {
    name: 'sco_certification_results',
    before: async ({ datamartKnex }) => {
      await datamartKnex('sco_certification_results').delete();
    },
    from: ({ datawarehouseKnex }) => {
      return datawarehouseKnex('data_export_parcoursup_certif_result').select('*');
    },
    to: ({ datamartKnex }, chunk) => {
      return datamartKnex('sco_certification_results').insert(chunk);
    },
  },
  {
    name: 'data_export_parcoursup_certif_result',
    before: async ({ datamartKnex }) => {
      await datamartKnex('data_export_parcoursup_certif_result').delete();
    },
    from: ({ datawarehouseKnex }) => {
      return datawarehouseKnex('data_export_parcoursup_certif_result').select('*');
    },
    to: ({ datamartKnex }, chunk) => {
      return datamartKnex('data_export_parcoursup_certif_result').insert(chunk);
    },
  },
  {
    name: 'certification_results',
    before: async ({ datamartKnex }) => {
      await datamartKnex('certification_results').delete();
    },
    from: ({ datawarehouseKnex }) => {
      return datawarehouseKnex('data_export_parcoursup_certif_result_code_validation').select('*');
    },
    to: ({ datamartKnex }, chunk) => {
      return datamartKnex('certification_results').insert(chunk);
    },
  },
  {
    name: 'data_export_parcoursup_certif_result_code_validation',
    before: async ({ datamartKnex }) => {
      await datamartKnex('data_export_parcoursup_certif_result_code_validation').delete();
    },
    from: ({ datawarehouseKnex }) => {
      return datawarehouseKnex('data_export_parcoursup_certif_result_code_validation').select('*');
    },
    to: ({ datamartKnex }, chunk) => {
      return datamartKnex('data_export_parcoursup_certif_result_code_validation').insert(chunk);
    },
  },
];

export function getByName(name, dependencies = { replications }) {
  return dependencies.replications.find((replication) => replication.name === name);
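A quick sketch of how these definitions are resolved at runtime (illustrative):

import * as replicationRepository from './replication-repository.js'; // illustrative path

const replication = replicationRepository.getByName('certification_results');
// -> the { name, before, from, to } definition above; getByName returns undefined for
//    unknown names, which the replicate controller turns into a 404.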
4 changes: 3 additions & 1 deletion api/src/monitoring/infrastructure/metrics.js
@@ -1,6 +1,8 @@
import metrics from 'datadog-metrics';

import { logger } from '../../shared/infrastructure/utils/logger.js';
import { child } from '../../shared/infrastructure/utils/logger.js';

const logger = child('metrics', { event: 'metrics' });

export class Metrics {
  static intervals = [];
1 change: 1 addition & 0 deletions api/src/shared/application/jobs/job-controller.js
@@ -7,6 +7,7 @@ import { JobExpireIn } from '../../infrastructure/repositories/jobs/job-reposito
export const JobGroup = {
  DEFAULT: 'default',
  FAST: 'fast',
  MADDO: 'maddo',
};

export class JobController {