Skip to content

Commit 40d5aa9

Browse files
[TECH] Enregistrer l'historique des réplications des données froides (PIX-16710)(PIX-16712).
#11527
2 parents 4a4ec7f + 1102e8b commit 40d5aa9

23 files changed

+416
-232
lines changed

api/.ls-lint.yml

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ ls:
2929
ignore:
3030
- .git
3131
- node_modules
32+
- datamart/migrations
3233
- db/migrations
3334
- db/seeds
3435
- scripts

api/Procfile-maddo

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ postdeploy: npm run postdeploy:maddo
44
# see https://github.com/1024pix/pix/pull/796
55
# and https://github.com/npm/npm/issues/4603
66
# for more information
7-
web: exec node maddo.js
7+
web: exec node index.maddo.js
8+
worker: exec node worker.js maddo
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const TABLE_NAME = 'sco_certification_results';
2+
3+
/**
4+
* @param { import("knex").Knex } knex
5+
* @returns { Promise<void> }
6+
*/
7+
const up = async function (knex) {
8+
await knex.schema.createTable(TABLE_NAME, function (table) {
9+
table.string('national_student_id');
10+
table.index('national_student_id');
11+
table.string('organization_uai');
12+
table.string('last_name');
13+
table.string('first_name');
14+
table.date('birthdate');
15+
table.text('status');
16+
table.integer('pix_score');
17+
table.timestamp('certification_date');
18+
table.integer('competence_level');
19+
table.index('organization_uai');
20+
table.string('competence_name');
21+
table.string('competence_code');
22+
table.string('area_name');
23+
table.string('certification_courses_id');
24+
});
25+
await knex.schema.raw('drop collation if exists "sco_certification_results_case_accent_punctuation_insensitive"');
26+
await knex.schema.raw(
27+
'create collation "sco_certification_results_case_accent_punctuation_insensitive" (provider = icu, locale = "und-u-ka-shifted-ks-level1-kv-punct", deterministic = false);',
28+
);
29+
30+
await knex.schema.raw(
31+
'ALTER TABLE :tableName: alter column first_name type varchar(255) COLLATE sco_certification_results_case_accent_punctuation_insensitive;',
32+
{ tableName: TABLE_NAME },
33+
);
34+
await knex.schema.raw(
35+
'ALTER TABLE :tableName: alter column last_name type varchar(255) COLLATE sco_certification_results_case_accent_punctuation_insensitive;',
36+
{ tableName: TABLE_NAME },
37+
);
38+
};
39+
40+
/**
41+
* @param { import("knex").Knex } knex
42+
* @returns { Promise<void> }
43+
*/
44+
const down = async function (knex) {
45+
await knex.schema.dropTable(TABLE_NAME);
46+
};
47+
48+
export { down, up };
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
const TABLE_NAME = 'certification_results';
2+
3+
/**
4+
* @param { import("knex").Knex } knex
5+
* @returns { Promise<void> }
6+
*/
7+
const up = async function (knex) {
8+
await knex.schema.createTable(TABLE_NAME, function (table) {
9+
table.string('certification_code_verification');
10+
table.string('last_name');
11+
table.string('first_name');
12+
table.date('birthdate');
13+
table.text('status');
14+
table.integer('pix_score');
15+
table.timestamp('certification_date');
16+
table.integer('competence_level');
17+
table.index('certification_code_verification');
18+
table.string('competence_name');
19+
table.string('competence_code');
20+
table.string('area_name');
21+
table.string('certification_courses_id');
22+
table.string('national_student_id');
23+
table.string('organization_uai');
24+
});
25+
};
26+
27+
/**
28+
* @param { import("knex").Knex } knex
29+
* @returns { Promise<void> }
30+
*/
31+
const down = async function (knex) {
32+
await knex.schema.dropTable(TABLE_NAME);
33+
};
34+
35+
export { down, up };

api/db/seeds/data/common/common-builder.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ function createClientApplications(databaseBuilder) {
8585
name: 'pixData',
8686
clientId: 'pixData',
8787
clientSecret: 'pixdatasecret',
88-
scopes: ['statistics'],
88+
scopes: ['statistics', 'replication'],
8989
});
9090
databaseBuilder.factory.buildClientApplication({
9191
name: 'parcoursup',

api/index.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ import 'dotenv/config';
33
import { databaseConnections } from './db/database-connections.js';
44
import { databaseConnection as liveDatabaseConnection } from './db/knex-database-connection.js';
55
import { createServer } from './server.js';
6+
import { JobGroup } from './src/shared/application/jobs/job-controller.js';
67
import { config, schema as configSchema } from './src/shared/config.js';
78
import { learningContentCache } from './src/shared/infrastructure/caches/learning-content-cache.js';
89
import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js';
910
import { logger } from './src/shared/infrastructure/utils/logger.js';
1011
import { redisMonitor } from './src/shared/infrastructure/utils/redis-monitor.js';
1112
import { validateEnvironmentVariables } from './src/shared/infrastructure/validate-environment-variables.js';
13+
import { registerJobs } from './worker.js';
1214

1315
validateEnvironmentVariables(configSchema);
1416

@@ -62,7 +64,7 @@ process.on('SIGINT', () => {
6264
try {
6365
await start();
6466
if (config.infra.startJobInWebProcess) {
65-
import('./worker.js');
67+
registerJobs({ jobGroups: [JobGroup.DEFAULT, JobGroup.FAST] });
6668
}
6769
} catch (error) {
6870
logger.error(error);

api/maddo.js api/index.maddo.js

+6-1
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ import 'dotenv/config';
22

33
import { databaseConnections } from './db/database-connections.js';
44
import { createMaddoServer } from './server.maddo.js';
5-
import { schema as configSchema } from './src/shared/config.js';
5+
import { JobGroup } from './src/shared/application/jobs/job-controller.js';
6+
import { config, schema as configSchema } from './src/shared/config.js';
67
import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js';
78
import { logger } from './src/shared/infrastructure/utils/logger.js';
89
import { redisMonitor } from './src/shared/infrastructure/utils/redis-monitor.js';
910
import { validateEnvironmentVariables } from './src/shared/infrastructure/validate-environment-variables.js';
11+
import { registerJobs } from './worker.js';
1012

1113
validateEnvironmentVariables(configSchema);
1214

@@ -45,6 +47,9 @@ process.on('SIGINT', () => {
4547
(async () => {
4648
try {
4749
await start();
50+
if (config.infra.startJobInWebProcess) {
51+
registerJobs({ jobGroups: [JobGroup.MADDO] });
52+
}
4853
} catch (error) {
4954
logger.error(error);
5055
throw error;

api/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172
"scalingo-postbuild": "node scripts/generate-cron > cron.json && node scripts/generate-procfile",
173173
"dev": "nodemon index.js",
174174
"start": "node index.js",
175-
"start:maddo": "MADDO=true node maddo.js",
175+
"start:maddo": "MADDO=true node index.maddo.js",
176176
"start:watch": "npm run dev",
177177
"start:job": "node worker.js",
178178
"start:job:fast": "node worker.js fast",

api/server.maddo.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { parse } from 'neoqs';
44

55
import { setupErrorHandling } from './config/server-setup-error-handling.js';
66
import { knex } from './db/knex-database-connection.js';
7+
import * as authenticationRoutes from './lib/application/authentication/index.js';
78
import { authentication } from './lib/infrastructure/authentication.js';
89
import * as replicationRoutes from './src/maddo/application/replications-routes.js';
910
import { Metrics } from './src/monitoring/infrastructure/metrics.js';
@@ -178,7 +179,7 @@ const setupAuthentication = function (server) {
178179
};
179180

180181
const setupRoutesAndPlugins = async function (server) {
181-
await server.register([...plugins, healthcheckRoutes, replicationRoutes]);
182+
await server.register([...plugins, healthcheckRoutes, authenticationRoutes, replicationRoutes]);
182183
};
183184

184185
const setupOpenApiSpecification = async function (server) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import { knex as datamartKnex } from '../../../../datamart/knex-database-connection.js';
2+
import { knex as datawarehouseKnex } from '../../../../datawarehouse/knex-database-connection.js';
3+
import { JobController, JobGroup } from '../../../shared/application/jobs/job-controller.js';
4+
import { ReplicationJob } from '../../domain/models/ReplicationJob.js';
5+
import { extractTransformAndLoadData } from '../../domain/usecases/extract-transform-and-load-data.js';
6+
import * as replicationRepository from '../../infrastructure/repositories/replication-repository.js';
7+
8+
export class ReplicationJobController extends JobController {
9+
constructor() {
10+
super(ReplicationJob.name, { jobGroup: JobGroup.MADDO });
11+
}
12+
13+
async handle({
14+
data: { replicationName },
15+
dependencies = { extractTransformAndLoadData, replicationRepository, datamartKnex, datawarehouseKnex },
16+
}) {
17+
const { extractTransformAndLoadData, replicationRepository, datamartKnex, datawarehouseKnex } = dependencies;
18+
return extractTransformAndLoadData({
19+
replicationName,
20+
replicationRepository,
21+
datamartKnex,
22+
datawarehouseKnex,
23+
});
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,25 @@
1-
import { knex as datamartKnex } from '../../../datamart/knex-database-connection.js';
2-
import { knex as datawarehouseKnex } from '../../../datawarehouse/knex-database-connection.js';
3-
import { logger } from '../../shared/infrastructure/utils/logger.js';
4-
import { extractTransformAndLoadData } from '../domain/usecases/extract-transform-and-load-data.js';
1+
import { ReplicationJob } from '../domain/models/ReplicationJob.js';
2+
import { replicationJobRepository } from '../infrastructure/repositories/jobs/replication-job-repository.js';
53
import * as replicationRepository from '../infrastructure/repositories/replication-repository.js';
64

75
export async function replicate(
86
request,
97
h,
108
dependencies = {
11-
extractTransformAndLoadData,
129
replicationRepository,
13-
datamartKnex,
14-
datawarehouseKnex,
15-
logger,
10+
replicationJobRepository,
1611
},
1712
) {
13+
const { replicationRepository, replicationJobRepository } = dependencies;
1814
const { replicationName } = request.params;
1915

20-
const replication = dependencies.replicationRepository.getByName(replicationName);
16+
const replication = replicationRepository.getByName(replicationName);
2117

2218
if (!replication) {
2319
return h.response().code(404);
2420
}
2521

26-
const promise = dependencies
27-
.extractTransformAndLoadData({
28-
replication,
29-
datamartKnex: dependencies.datamartKnex,
30-
datawarehouseKnex: dependencies.datawarehouseKnex,
31-
})
32-
.catch((err) =>
33-
dependencies.logger.error(
34-
{
35-
event: 'replication',
36-
err,
37-
},
38-
'Error during replication',
39-
),
40-
);
41-
42-
if (!request.query.async) {
43-
await promise;
44-
}
22+
await replicationJobRepository.performAsync(new ReplicationJob({ replicationName }));
4523

4624
return h.response().code(204);
4725
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export class ReplicationJob {
2+
constructor({ replicationName }) {
3+
this.replicationName = replicationName;
4+
}
5+
}

api/src/maddo/domain/usecases/extract-transform-and-load-data.js

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
const DEFAULT_CHUNK_SIZE = 1_000;
22

3-
export async function extractTransformAndLoadData({ replication, datawarehouseKnex, datamartKnex }) {
3+
export async function extractTransformAndLoadData({
4+
replicationName,
5+
replicationRepository,
6+
datawarehouseKnex,
7+
datamartKnex,
8+
}) {
9+
const replication = replicationRepository.getByName(replicationName);
10+
411
let context = { datawarehouseKnex, datamartKnex };
512

613
const additionnalContext = await replication.before?.(context);
@@ -9,11 +16,13 @@ export async function extractTransformAndLoadData({ replication, datawarehouseKn
916

1017
const queryBuilder = replication.from(context);
1118
let chunk = [];
19+
let count = 0;
1220
const chunkSize = replication.chunkSize ?? DEFAULT_CHUNK_SIZE;
1321
const connection = await datamartKnex.context.client.acquireConnection();
1422
try {
1523
for await (const data of queryBuilder.stream()) {
1624
chunk.push(replication.transform?.(data) ?? data);
25+
count += 1;
1726
if (chunk.length === chunkSize) {
1827
await replication.to(context, chunk).connection(connection);
1928
chunk = [];
@@ -23,6 +32,7 @@ export async function extractTransformAndLoadData({ replication, datawarehouseKn
2332
if (chunk.length) {
2433
await replication.to(context, chunk).connection(connection);
2534
}
35+
return { count };
2636
} finally {
2737
await datamartKnex.context.client.releaseConnection(connection);
2838
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
import { JobRepository, JobRetry } from '../../../../shared/infrastructure/repositories/jobs/job-repository.js';
2+
import { ReplicationJob } from '../../../domain/models/ReplicationJob.js';
3+
4+
export const replicationJobRepository = new JobRepository({ name: ReplicationJob.name, retry: JobRetry.FEW_RETRY });

api/src/maddo/infrastructure/repositories/replication-repository.js

+50-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,53 @@
1-
export const replications = [];
1+
export const replications = [
2+
{
3+
name: 'sco_certification_results',
4+
before: async ({ datamartKnex }) => {
5+
await datamartKnex('sco_certification_results').delete();
6+
},
7+
from: ({ datawarehouseKnex }) => {
8+
return datawarehouseKnex('data_export_parcoursup_certif_result').select('*');
9+
},
10+
to: ({ datamartKnex }, chunk) => {
11+
return datamartKnex('sco_certification_results').insert(chunk);
12+
},
13+
},
14+
{
15+
name: 'data_export_parcoursup_certif_result',
16+
before: async ({ datamartKnex }) => {
17+
await datamartKnex('data_export_parcoursup_certif_result').delete();
18+
},
19+
from: ({ datawarehouseKnex }) => {
20+
return datawarehouseKnex('data_export_parcoursup_certif_result').select('*');
21+
},
22+
to: ({ datamartKnex }, chunk) => {
23+
return datamartKnex('data_export_parcoursup_certif_result').insert(chunk);
24+
},
25+
},
26+
{
27+
name: 'certification_results',
28+
before: async ({ datamartKnex }) => {
29+
await datamartKnex('certification_results').delete();
30+
},
31+
from: ({ datawarehouseKnex }) => {
32+
return datawarehouseKnex('data_export_parcoursup_certif_result_code_validation').select('*');
33+
},
34+
to: ({ datamartKnex }, chunk) => {
35+
return datamartKnex('certification_results').insert(chunk);
36+
},
37+
},
38+
{
39+
name: 'data_export_parcoursup_certif_result_code_validation',
40+
before: async ({ datamartKnex }) => {
41+
await datamartKnex('data_export_parcoursup_certif_result_code_validation').delete();
42+
},
43+
from: ({ datawarehouseKnex }) => {
44+
return datawarehouseKnex('data_export_parcoursup_certif_result_code_validation').select('*');
45+
},
46+
to: ({ datamartKnex }, chunk) => {
47+
return datamartKnex('data_export_parcoursup_certif_result_code_validation').insert(chunk);
48+
},
49+
},
50+
];
251

352
export function getByName(name, dependencies = { replications }) {
453
return dependencies.replications.find((replication) => replication.name === name);

api/src/monitoring/infrastructure/metrics.js

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import metrics from 'datadog-metrics';
22

3-
import { logger } from '../../shared/infrastructure/utils/logger.js';
3+
import { child } from '../../shared/infrastructure/utils/logger.js';
4+
5+
const logger = child('metrics', { event: 'metrics' });
46

57
export class Metrics {
68
static intervals = [];

api/src/shared/application/jobs/job-controller.js

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { JobExpireIn } from '../../infrastructure/repositories/jobs/job-reposito
77
export const JobGroup = {
88
DEFAULT: 'default',
99
FAST: 'fast',
10+
MADDO: 'maddo',
1011
};
1112

1213
export class JobController {

0 commit comments

Comments
 (0)