Skip to content

Commit

Permalink
[TECH] Amélioration de l'utilisation de pgBoss.
Browse files Browse the repository at this point in the history
  • Loading branch information
pix-service-auto-merge authored Feb 6, 2025
2 parents c659647 + e495e2d commit a0801c7
Show file tree
Hide file tree
Showing 30 changed files with 154 additions and 204 deletions.
9 changes: 0 additions & 9 deletions api/db/database-builder/database-builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ class DatabaseBuilder {

const publicResults = await this.knex.raw(_constructRawQuery('public'));
const learningContentResults = await this.knex.raw(_constructRawQuery('learningcontent'));
const pgbossResults = await this.knex.raw(_constructRawQuery('pgboss'));

this.tablesOrderedByDependencyWithDirtinessMap = [];

Expand All @@ -209,13 +208,6 @@ class DatabaseBuilder {
isDirty: false,
});
});
pgbossResults.rows.forEach(({ table_name }) => {
if (table_name === 'version') return;
this.tablesOrderedByDependencyWithDirtinessMap.push({
table: `pgboss.${table_name}`,
isDirty: false,
});
});
}

_setTableAsDirty(table) {
Expand All @@ -240,7 +232,6 @@ class DatabaseBuilder {
const tableName = databaseHelpers.getTableNameFromInsertSqlQuery(queryData.sql);

if (!_.isEmpty(tableName)) {
if (tableName === 'pgboss.version') return;
this._setTableAsDirty(tableName);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { AnswerJob } from '../../../quest/domain/models/AnwserJob.js';
import { config } from '../../../shared/config.js';
import { DomainTransaction } from '../../../shared/domain/DomainTransaction.js';
import { temporaryStorage } from '../../../shared/infrastructure/key-value-storages/index.js';
import { JobRepository } from '../../../shared/infrastructure/repositories/jobs/job-repository.js';

Expand All @@ -19,17 +18,8 @@ export class AnswerJobRepository extends JobRepository {
async performAsync(job) {
if (!config.featureToggles.isAsyncQuestRewardingCalculationEnabled || !config.featureToggles.isQuestEnabled) return;

const knexConn = DomainTransaction.getConnection();

if (knexConn.isTransaction) {
await super.performAsync(job);
await this.#profileRewardTemporaryStorage.increment(job.userId);
} else {
await DomainTransaction.execute(async () => {
await super.performAsync(job);
await this.#profileRewardTemporaryStorage.increment(job.userId);
});
}
await super.performAsync(job);
await this.#profileRewardTemporaryStorage.increment(job.userId);
}
}

Expand Down
2 changes: 2 additions & 0 deletions api/src/shared/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ const configuration = (function () {
exportSenderJobEnabled: process.env.PGBOSS_EXPORT_SENDER_JOB_ENABLED
? toBoolean(process.env.PGBOSS_EXPORT_SENDER_JOB_ENABLED)
: true,
databaseUrl: process.env.DATABASE_URL,
},
poleEmploi: {
clientId: process.env.POLE_EMPLOI_CLIENT_ID,
Expand Down Expand Up @@ -633,6 +634,7 @@ const configuration = (function () {
config.identityProviderConfigKey = null;

config.apiManager.url = 'http://external-partners-access/';
config.pgBoss.databaseUrl = process.env.TEST_DATABASE_URL;
}

return config;
Expand Down
27 changes: 11 additions & 16 deletions api/src/shared/infrastructure/repositories/jobs/job-repository.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Joi from 'joi';

import { DomainTransaction } from '../../../domain/DomainTransaction.js';
import { EntityValidationError } from '../../../domain/errors.js';
import { pgBoss } from './pg-boss.js';

export class JobRepository {
#schema = Joi.object({
Expand Down Expand Up @@ -46,24 +46,19 @@ export class JobRepository {
#buildPayload(data) {
return {
name: this.name,
retrylimit: this.retry.retryLimit,
retrydelay: this.retry.retryDelay,
retrybackoff: this.retry.retryBackoff,
expirein: this.expireIn,
data,
on_complete: true,
retryLimit: this.retry.retryLimit,
retryDelay: this.retry.retryDelay,
retryBackoff: this.retry.retryBackoff,
expireInSeconds: this.expireIn,
onComplete: true,
priority: this.priority,
};
}

async #send(jobs) {
const knexConn = DomainTransaction.getConnection();

const results = await knexConn.batchInsert('pgboss.job', jobs);

const rowCount = results.reduce((total, batchResult) => total + (batchResult.rowCount || 0), 0);

return { rowCount };
await pgBoss.insert(jobs);
return { rowCount: jobs.length };
}

async performAsync(...datas) {
Expand Down Expand Up @@ -123,12 +118,12 @@ export const JobRetry = Object.freeze({
});

/**
* Job expireIn. define few config to set expireIn field
* Job expireIn. define few config to set expireInSeconds field
* @see https://github.com/timgit/pg-boss/blob/9.0.3/docs/readme.md#insertjobs
* @readonly
* @enum {string}
*/
export const JobExpireIn = Object.freeze({
DEFAULT: '00:15:00',
HIGH: '00:30:00',
DEFAULT: 15 * 60,
HIGH: 30 * 60,
});
12 changes: 12 additions & 0 deletions api/src/shared/infrastructure/repositories/jobs/pg-boss.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import PgBoss from 'pg-boss';

import { config } from '../../../config.js';

const monitorStateIntervalSeconds = config.pgBoss.monitorStateIntervalSeconds;

export const pgBoss = new PgBoss({
connectionString: config.pgBoss.databaseUrl,
max: config.pgBoss.connexionPoolMaxSize,
...(monitorStateIntervalSeconds ? { monitorStateIntervalSeconds } : {}),
archiveFailedAfterSeconds: config.pgBoss.archiveFailedAfterSeconds,
});
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ describe('Integration | Certification | Infrastructure | Repository | Jobs | cer

// then
await expect(CertificationRescoringByScriptJob.name).to.have.been.performed.withJob({
retrylimit: 0,
retrydelay: 0,
retrybackoff: false,
retryLimit: 0,
retryDelay: 0,
retryBackoff: false,
data: {
certificationCourseId: 777,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { AnswerJobRepository } from '../../../../../src/evaluation/infrastructure/repositories/answer-job-repository.js';
import { config } from '../../../../../src/shared/config.js';
import { DomainTransaction } from '../../../../../src/shared/domain/DomainTransaction.js';
import { pgBoss } from '../../../../../src/shared/infrastructure/repositories/jobs/pg-boss.js';
import { expect, sinon } from '../../../../test-helper.js';

describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepository', function () {
beforeEach(function () {
sinon.stub(config, 'featureToggles');
sinon.stub(pgBoss, 'insert').resolves([]);
config.featureToggles.isQuestEnabled = true;
config.featureToggles.isAsyncQuestRewardingCalculationEnabled = true;
});
Expand All @@ -14,11 +15,6 @@ describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepositor
it('should do nothing if quests are disabled', async function () {
// given
const profileRewardTemporaryStorageStub = { increment: sinon.stub() };
const knexStub = { batchInsert: sinon.stub().resolves([]) };
sinon.stub(DomainTransaction, 'getConnection').returns(knexStub);
sinon.stub(DomainTransaction, 'execute').callsFake((callback) => {
return callback();
});
config.featureToggles.isQuestEnabled = false;
const userId = Symbol('userId');
const answerJobRepository = new AnswerJobRepository({
Expand All @@ -35,11 +31,6 @@ describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepositor
it('should do nothing if quests are in sync mode', async function () {
// given
const profileRewardTemporaryStorageStub = { increment: sinon.stub() };
const knexStub = { batchInsert: sinon.stub().resolves([]) };
sinon.stub(DomainTransaction, 'getConnection').returns(knexStub);
sinon.stub(DomainTransaction, 'execute').callsFake((callback) => {
return callback();
});
config.featureToggles.isAsyncQuestRewardingCalculationEnabled = false;
const userId = Symbol('userId');
const answerJobRepository = new AnswerJobRepository({
Expand All @@ -56,11 +47,6 @@ describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepositor
it("should increment user's jobs count in temporary storage", async function () {
// given
const profileRewardTemporaryStorageStub = { increment: sinon.stub() };
const knexStub = { batchInsert: sinon.stub().resolves([]) };
sinon.stub(DomainTransaction, 'getConnection').returns(knexStub);
sinon.stub(DomainTransaction, 'execute').callsFake((callback) => {
return callback();
});
const userId = Symbol('userId');
const answerJobRepository = new AnswerJobRepository({
dependencies: { profileRewardTemporaryStorage: profileRewardTemporaryStorageStub },
Expand All @@ -72,47 +58,5 @@ describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepositor
// then
expect(profileRewardTemporaryStorageStub.increment).to.have.been.calledWith(userId);
});

describe('should use transaction in all cases', function () {
it('should use existing transaction', async function () {
// given
const profileRewardTemporaryStorageStub = { increment: sinon.stub() };
const knexStub = { batchInsert: sinon.stub().resolves([]), isTransaction: true };
sinon.stub(DomainTransaction, 'getConnection').returns(knexStub);
sinon.stub(DomainTransaction, 'execute').callsFake((callback) => {
return callback();
});
const userId = Symbol('userId');
const answerJobRepository = new AnswerJobRepository({
dependencies: { profileRewardTemporaryStorage: profileRewardTemporaryStorageStub },
});

// when
await answerJobRepository.performAsync({ userId });

// then
expect(DomainTransaction.execute).to.have.not.been.called;
});

it('should create new transaction', async function () {
// given
const profileRewardTemporaryStorageStub = { increment: sinon.stub() };
const knexStub = { batchInsert: sinon.stub().resolves([]), isTransaction: false };
sinon.stub(DomainTransaction, 'getConnection').returns(knexStub);
sinon.stub(DomainTransaction, 'execute').callsFake((callback) => {
return callback();
});
const userId = Symbol('userId');
const answerJobRepository = new AnswerJobRepository({
dependencies: { profileRewardTemporaryStorage: profileRewardTemporaryStorageStub },
});

// when
await answerJobRepository.performAsync({ userId });

// then
expect(DomainTransaction.execute).to.have.been.called;
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ describe('Integration | Prescription | Application | Jobs | garAnonymizedBatchEv

// then
await expect(GarAnonymizedBatchEventsLoggingJob.name).to.have.been.performed.withJob({
retrylimit: 10,
retrydelay: 30,
retrybackoff: true,
retryLimit: 10,
retryDelay: 30,
retryBackoff: true,
data: {
userIds: [13, 42],
updatedByUserId: 777,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ describe('Integration | Repository | Jobs | CertificationCompletedJobRepository'

// then
await expect(CertificationCompletedJob.name).to.have.been.performed.withJob({
retrylimit: 10,
retrydelay: 30,
retrybackoff: true,
retryLimit: 10,
retryDelay: 30,
retryBackoff: true,
priority: JobPriority.HIGH,
data,
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RescoreCertificationScript } from '../../../../scripts/certification/rescore-certifications.js';
import { createTempFile, expect, knex, sinon } from '../../../test-helper.js';
import { createTempFile, expect, sinon } from '../../../test-helper.js';

describe('Integration | Scripts | Certification | rescore-certfication', function () {
it('should parse input file', async function () {
Expand Down Expand Up @@ -28,17 +28,9 @@ describe('Integration | Scripts | Certification | rescore-certfication', functio
await script.handle({ logger, options: { file } });

// then
const [job1, job2] = await knex('pgboss.job')
.where({ name: 'CertificationRescoringByScriptJob' })
.orderBy('createdon', 'asc');

expect([job1.data, job2.data]).to.have.deep.members([
{
certificationCourseId: 1,
},
{
certificationCourseId: 2,
},
await expect('CertificationRescoringByScriptJob').to.have.been.performed.withJobPayloads([
{ certificationCourseId: 1 },
{ certificationCourseId: 2 },
]);
});
});
14 changes: 10 additions & 4 deletions api/tests/integration/tooling/jobs/expect-job.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ describe('Integration | Tooling | Expect Job', function () {
await expect('JobTest').to.have.been.performed.withJob({
name: 'JobTest',
data: { foo: 'bar' },
retrylimit: job.retry.retryLimit,
retrydelay: job.retry.retryDelay,
retrybackoff: job.retry.retryBackoff,
expirein: job.expireIn,
retryLimit: job.retry.retryLimit,
retryDelay: job.retry.retryDelay,
retryBackoff: job.retry.retryBackoff,
expireIn: job.expireIn,
});
});

Expand Down Expand Up @@ -208,6 +208,12 @@ describe('Integration | Tooling | Expect Job', function () {
data: { my_data: 'awesome_data' },
options: { tz: 'Europe/Paris' },
});
await jobQueue.scheduleCronJob({
name: 'otherJob',
cron: '*/5 * * * *',
data: { my_data: 'awesome_data' },
options: { tz: 'Europe/Paris' },
});

// then
await expect(jobName).to.have.been.schedule.withCronJobsCount(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ describe('Learning Content | Integration | Repository | Jobs | LcmsCreateRelease

// then
await expect(LcmsCreateReleaseJob.name).to.have.been.performed.withJob({
retrylimit: 0,
retrydelay: 0,
retrybackoff: false,
retryLimit: 0,
retryDelay: 0,
retryBackoff: false,
data: { userId: 123 },
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ describe('Learning Content | Integration | Repository | Jobs | LcmsRefreshCacheJ

// then
await expect(LcmsRefreshCacheJob.name).to.have.been.performed.withJob({
retrylimit: 0,
retrydelay: 0,
retrybackoff: false,
retryLimit: 0,
retryDelay: 0,
retryBackoff: false,
data: { userId: 123 },
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | part

// then
await expect(ParticipationCompletedJob.name).to.have.been.performed.withJob({
retrylimit: 0,
retrydelay: 0,
retrybackoff: false,
retryLimit: 0,
retryDelay: 0,
retryBackoff: false,
data,
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | part
// then
await expect(ParticipationResultCalculationJob.name).to.have.been.performed.withJob({
name: ParticipationResultCalculationJob.name,
retrylimit: JobRetry.FEW_RETRY.retryLimit,
retrydelay: JobRetry.FEW_RETRY.retryDelay,
retrybackoff: JobRetry.FEW_RETRY.retryBackoff,
retryLimit: JobRetry.FEW_RETRY.retryLimit,
retryDelay: JobRetry.FEW_RETRY.retryDelay,
retryBackoff: JobRetry.FEW_RETRY.retryBackoff,
data: { campaignParticipationId: 3 },
});
});
Expand Down
Loading

0 comments on commit a0801c7

Please sign in to comment.