diff --git a/api/db/database-builder/database-builder.js b/api/db/database-builder/database-builder.js index 7d34d5c65b0..c6970742447 100644 --- a/api/db/database-builder/database-builder.js +++ b/api/db/database-builder/database-builder.js @@ -193,7 +193,6 @@ class DatabaseBuilder { const publicResults = await this.knex.raw(_constructRawQuery('public')); const learningContentResults = await this.knex.raw(_constructRawQuery('learningcontent')); - const pgbossResults = await this.knex.raw(_constructRawQuery('pgboss')); this.tablesOrderedByDependencyWithDirtinessMap = []; @@ -209,13 +208,6 @@ class DatabaseBuilder { isDirty: false, }); }); - pgbossResults.rows.forEach(({ table_name }) => { - if (table_name === 'version') return; - this.tablesOrderedByDependencyWithDirtinessMap.push({ - table: `pgboss.${table_name}`, - isDirty: false, - }); - }); } _setTableAsDirty(table) { @@ -240,7 +232,6 @@ class DatabaseBuilder { const tableName = databaseHelpers.getTableNameFromInsertSqlQuery(queryData.sql); if (!_.isEmpty(tableName)) { - if (tableName === 'pgboss.version') return; this._setTableAsDirty(tableName); } } diff --git a/api/src/evaluation/infrastructure/repositories/answer-job-repository.js b/api/src/evaluation/infrastructure/repositories/answer-job-repository.js index 7ec1838079b..daa165ed8a1 100644 --- a/api/src/evaluation/infrastructure/repositories/answer-job-repository.js +++ b/api/src/evaluation/infrastructure/repositories/answer-job-repository.js @@ -1,6 +1,5 @@ import { AnswerJob } from '../../../quest/domain/models/AnwserJob.js'; import { config } from '../../../shared/config.js'; -import { DomainTransaction } from '../../../shared/domain/DomainTransaction.js'; import { temporaryStorage } from '../../../shared/infrastructure/key-value-storages/index.js'; import { JobRepository } from '../../../shared/infrastructure/repositories/jobs/job-repository.js'; @@ -19,17 +18,8 @@ export class AnswerJobRepository extends JobRepository { async performAsync(job) { if (!config.featureToggles.isAsyncQuestRewardingCalculationEnabled || !config.featureToggles.isQuestEnabled) return; - const knexConn = DomainTransaction.getConnection(); - - if (knexConn.isTransaction) { - await super.performAsync(job); - await this.#profileRewardTemporaryStorage.increment(job.userId); - } else { - await DomainTransaction.execute(async () => { - await super.performAsync(job); - await this.#profileRewardTemporaryStorage.increment(job.userId); - }); - } + await super.performAsync(job); + await this.#profileRewardTemporaryStorage.increment(job.userId); } } diff --git a/api/src/shared/config.js b/api/src/shared/config.js index 6d073ab5000..907a70c22c2 100644 --- a/api/src/shared/config.js +++ b/api/src/shared/config.js @@ -388,6 +388,7 @@ const configuration = (function () { exportSenderJobEnabled: process.env.PGBOSS_EXPORT_SENDER_JOB_ENABLED ? toBoolean(process.env.PGBOSS_EXPORT_SENDER_JOB_ENABLED) : true, + databaseUrl: process.env.DATABASE_URL, }, poleEmploi: { clientId: process.env.POLE_EMPLOI_CLIENT_ID, @@ -633,6 +634,7 @@ const configuration = (function () { config.identityProviderConfigKey = null; config.apiManager.url = 'http://external-partners-access/'; + config.pgBoss.databaseUrl = process.env.TEST_DATABASE_URL; } return config; diff --git a/api/src/shared/infrastructure/repositories/jobs/job-repository.js b/api/src/shared/infrastructure/repositories/jobs/job-repository.js index e470c5b321a..4fea4b455b5 100644 --- a/api/src/shared/infrastructure/repositories/jobs/job-repository.js +++ b/api/src/shared/infrastructure/repositories/jobs/job-repository.js @@ -1,7 +1,7 @@ import Joi from 'joi'; -import { DomainTransaction } from '../../../domain/DomainTransaction.js'; import { EntityValidationError } from '../../../domain/errors.js'; +import { pgBoss } from './pg-boss.js'; export class JobRepository { #schema = Joi.object({ @@ -46,24 +46,19 @@ export class JobRepository { #buildPayload(data) { return { name: this.name, - retrylimit: this.retry.retryLimit, - retrydelay: this.retry.retryDelay, - retrybackoff: this.retry.retryBackoff, - expirein: this.expireIn, data, - on_complete: true, + retryLimit: this.retry.retryLimit, + retryDelay: this.retry.retryDelay, + retryBackoff: this.retry.retryBackoff, + expireInSeconds: this.expireIn, + onComplete: true, priority: this.priority, }; } async #send(jobs) { - const knexConn = DomainTransaction.getConnection(); - - const results = await knexConn.batchInsert('pgboss.job', jobs); - - const rowCount = results.reduce((total, batchResult) => total + (batchResult.rowCount || 0), 0); - - return { rowCount }; + await pgBoss.insert(jobs); + return { rowCount: jobs.length }; } async performAsync(...datas) { @@ -123,12 +118,12 @@ export const JobRetry = Object.freeze({ }); /** - * Job expireIn. define few config to set expireIn field + * Job expireIn. define few config to set expireInSeconds field * @see https://github.com/timgit/pg-boss/blob/9.0.3/docs/readme.md#insertjobs * @readonly * @enum {string} */ export const JobExpireIn = Object.freeze({ - DEFAULT: '00:15:00', - HIGH: '00:30:00', + DEFAULT: 15 * 60, + HIGH: 30 * 60, }); diff --git a/api/src/shared/infrastructure/repositories/jobs/pg-boss.js b/api/src/shared/infrastructure/repositories/jobs/pg-boss.js new file mode 100644 index 00000000000..a10e260d59f --- /dev/null +++ b/api/src/shared/infrastructure/repositories/jobs/pg-boss.js @@ -0,0 +1,12 @@ +import PgBoss from 'pg-boss'; + +import { config } from '../../../config.js'; + +const monitorStateIntervalSeconds = config.pgBoss.monitorStateIntervalSeconds; + +export const pgBoss = new PgBoss({ + connectionString: config.pgBoss.databaseUrl, + max: config.pgBoss.connexionPoolMaxSize, + ...(monitorStateIntervalSeconds ? { monitorStateIntervalSeconds } : {}), + archiveFailedAfterSeconds: config.pgBoss.archiveFailedAfterSeconds, +}); diff --git a/api/tests/certification/session-management/integration/infrastructure/repositories/jobs/certification-rescoring-by-script-job-repository_test.js b/api/tests/certification/session-management/integration/infrastructure/repositories/jobs/certification-rescoring-by-script-job-repository_test.js index 4a15fdbf7f5..b4265ef3e56 100644 --- a/api/tests/certification/session-management/integration/infrastructure/repositories/jobs/certification-rescoring-by-script-job-repository_test.js +++ b/api/tests/certification/session-management/integration/infrastructure/repositories/jobs/certification-rescoring-by-script-job-repository_test.js @@ -14,9 +14,9 @@ describe('Integration | Certification | Infrastructure | Repository | Jobs | cer // then await expect(CertificationRescoringByScriptJob.name).to.have.been.performed.withJob({ - retrylimit: 0, - retrydelay: 0, - retrybackoff: false, + retryLimit: 0, + retryDelay: 0, + retryBackoff: false, data: { certificationCourseId: 777, }, diff --git a/api/tests/evaluation/unit/infrastructure/repositories/answer-job-repository_test.js b/api/tests/evaluation/unit/infrastructure/repositories/answer-job-repository_test.js index 7226186cee4..1cb9083f0fb 100644 --- a/api/tests/evaluation/unit/infrastructure/repositories/answer-job-repository_test.js +++ b/api/tests/evaluation/unit/infrastructure/repositories/answer-job-repository_test.js @@ -1,11 +1,12 @@ import { AnswerJobRepository } from '../../../../../src/evaluation/infrastructure/repositories/answer-job-repository.js'; import { config } from '../../../../../src/shared/config.js'; -import { DomainTransaction } from '../../../../../src/shared/domain/DomainTransaction.js'; +import { pgBoss } from '../../../../../src/shared/infrastructure/repositories/jobs/pg-boss.js'; import { expect, sinon } from '../../../../test-helper.js'; describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepository', function () { beforeEach(function () { sinon.stub(config, 'featureToggles'); + sinon.stub(pgBoss, 'insert').resolves([]); config.featureToggles.isQuestEnabled = true; config.featureToggles.isAsyncQuestRewardingCalculationEnabled = true; }); @@ -14,11 +15,6 @@ describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepositor it('should do nothing if quests are disabled', async function () { // given const profileRewardTemporaryStorageStub = { increment: sinon.stub() }; - const knexStub = { batchInsert: sinon.stub().resolves([]) }; - sinon.stub(DomainTransaction, 'getConnection').returns(knexStub); - sinon.stub(DomainTransaction, 'execute').callsFake((callback) => { - return callback(); - }); config.featureToggles.isQuestEnabled = false; const userId = Symbol('userId'); const answerJobRepository = new AnswerJobRepository({ @@ -35,11 +31,6 @@ describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepositor it('should do nothing if quests are in sync mode', async function () { // given const profileRewardTemporaryStorageStub = { increment: sinon.stub() }; - const knexStub = { batchInsert: sinon.stub().resolves([]) }; - sinon.stub(DomainTransaction, 'getConnection').returns(knexStub); - sinon.stub(DomainTransaction, 'execute').callsFake((callback) => { - return callback(); - }); config.featureToggles.isAsyncQuestRewardingCalculationEnabled = false; const userId = Symbol('userId'); const answerJobRepository = new AnswerJobRepository({ @@ -56,11 +47,6 @@ describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepositor it("should increment user's jobs count in temporary storage", async function () { // given const profileRewardTemporaryStorageStub = { increment: sinon.stub() }; - const knexStub = { batchInsert: sinon.stub().resolves([]) }; - sinon.stub(DomainTransaction, 'getConnection').returns(knexStub); - sinon.stub(DomainTransaction, 'execute').callsFake((callback) => { - return callback(); - }); const userId = Symbol('userId'); const answerJobRepository = new AnswerJobRepository({ dependencies: { profileRewardTemporaryStorage: profileRewardTemporaryStorageStub }, @@ -72,47 +58,5 @@ describe('Evaluation | Unit | Infrastructure | Repositories | AnswerJobRepositor // then expect(profileRewardTemporaryStorageStub.increment).to.have.been.calledWith(userId); }); - - describe('should use transaction in all cases', function () { - it('should use existing transaction', async function () { - // given - const profileRewardTemporaryStorageStub = { increment: sinon.stub() }; - const knexStub = { batchInsert: sinon.stub().resolves([]), isTransaction: true }; - sinon.stub(DomainTransaction, 'getConnection').returns(knexStub); - sinon.stub(DomainTransaction, 'execute').callsFake((callback) => { - return callback(); - }); - const userId = Symbol('userId'); - const answerJobRepository = new AnswerJobRepository({ - dependencies: { profileRewardTemporaryStorage: profileRewardTemporaryStorageStub }, - }); - - // when - await answerJobRepository.performAsync({ userId }); - - // then - expect(DomainTransaction.execute).to.have.not.been.called; - }); - - it('should create new transaction', async function () { - // given - const profileRewardTemporaryStorageStub = { increment: sinon.stub() }; - const knexStub = { batchInsert: sinon.stub().resolves([]), isTransaction: false }; - sinon.stub(DomainTransaction, 'getConnection').returns(knexStub); - sinon.stub(DomainTransaction, 'execute').callsFake((callback) => { - return callback(); - }); - const userId = Symbol('userId'); - const answerJobRepository = new AnswerJobRepository({ - dependencies: { profileRewardTemporaryStorage: profileRewardTemporaryStorageStub }, - }); - - // when - await answerJobRepository.performAsync({ userId }); - - // then - expect(DomainTransaction.execute).to.have.been.called; - }); - }); }); }); diff --git a/api/tests/identity-access-management/integration/infrastructure/repositories/jobs/gar-anonymized-batch-events-logging-job.repository.test.js b/api/tests/identity-access-management/integration/infrastructure/repositories/jobs/gar-anonymized-batch-events-logging-job.repository.test.js index 3e084c0e37a..c6bbc5155f1 100644 --- a/api/tests/identity-access-management/integration/infrastructure/repositories/jobs/gar-anonymized-batch-events-logging-job.repository.test.js +++ b/api/tests/identity-access-management/integration/infrastructure/repositories/jobs/gar-anonymized-batch-events-logging-job.repository.test.js @@ -32,9 +32,9 @@ describe('Integration | Prescription | Application | Jobs | garAnonymizedBatchEv // then await expect(GarAnonymizedBatchEventsLoggingJob.name).to.have.been.performed.withJob({ - retrylimit: 10, - retrydelay: 30, - retrybackoff: true, + retryLimit: 10, + retryDelay: 30, + retryBackoff: true, data: { userIds: [13, 42], updatedByUserId: 777, diff --git a/api/tests/integration/infrastructure/repositories/jobs/certification-completed-job-repository_test.js b/api/tests/integration/infrastructure/repositories/jobs/certification-completed-job-repository_test.js index d15038798b4..5353b65bb42 100644 --- a/api/tests/integration/infrastructure/repositories/jobs/certification-completed-job-repository_test.js +++ b/api/tests/integration/infrastructure/repositories/jobs/certification-completed-job-repository_test.js @@ -20,9 +20,9 @@ describe('Integration | Repository | Jobs | CertificationCompletedJobRepository' // then await expect(CertificationCompletedJob.name).to.have.been.performed.withJob({ - retrylimit: 10, - retrydelay: 30, - retrybackoff: true, + retryLimit: 10, + retryDelay: 30, + retryBackoff: true, priority: JobPriority.HIGH, data, }); diff --git a/api/tests/integration/scripts/certification/rescore-certifications_test.js b/api/tests/integration/scripts/certification/rescore-certifications_test.js index e49533d1bbb..7d78100d11e 100644 --- a/api/tests/integration/scripts/certification/rescore-certifications_test.js +++ b/api/tests/integration/scripts/certification/rescore-certifications_test.js @@ -1,5 +1,5 @@ import { RescoreCertificationScript } from '../../../../scripts/certification/rescore-certifications.js'; -import { createTempFile, expect, knex, sinon } from '../../../test-helper.js'; +import { createTempFile, expect, sinon } from '../../../test-helper.js'; describe('Integration | Scripts | Certification | rescore-certfication', function () { it('should parse input file', async function () { @@ -28,17 +28,9 @@ describe('Integration | Scripts | Certification | rescore-certfication', functio await script.handle({ logger, options: { file } }); // then - const [job1, job2] = await knex('pgboss.job') - .where({ name: 'CertificationRescoringByScriptJob' }) - .orderBy('createdon', 'asc'); - - expect([job1.data, job2.data]).to.have.deep.members([ - { - certificationCourseId: 1, - }, - { - certificationCourseId: 2, - }, + await expect('CertificationRescoringByScriptJob').to.have.been.performed.withJobPayloads([ + { certificationCourseId: 1 }, + { certificationCourseId: 2 }, ]); }); }); diff --git a/api/tests/integration/tooling/jobs/expect-job.test.js b/api/tests/integration/tooling/jobs/expect-job.test.js index 417c359318e..f362227e719 100644 --- a/api/tests/integration/tooling/jobs/expect-job.test.js +++ b/api/tests/integration/tooling/jobs/expect-job.test.js @@ -50,10 +50,10 @@ describe('Integration | Tooling | Expect Job', function () { await expect('JobTest').to.have.been.performed.withJob({ name: 'JobTest', data: { foo: 'bar' }, - retrylimit: job.retry.retryLimit, - retrydelay: job.retry.retryDelay, - retrybackoff: job.retry.retryBackoff, - expirein: job.expireIn, + retryLimit: job.retry.retryLimit, + retryDelay: job.retry.retryDelay, + retryBackoff: job.retry.retryBackoff, + expireIn: job.expireIn, }); }); @@ -208,6 +208,12 @@ describe('Integration | Tooling | Expect Job', function () { data: { my_data: 'awesome_data' }, options: { tz: 'Europe/Paris' }, }); + await jobQueue.scheduleCronJob({ + name: 'otherJob', + cron: '*/5 * * * *', + data: { my_data: 'awesome_data' }, + options: { tz: 'Europe/Paris' }, + }); // then await expect(jobName).to.have.been.schedule.withCronJobsCount(1); diff --git a/api/tests/learning-content/integration/infrastructure/repositories/jobs/lcms-create-release-job-repository_test.js b/api/tests/learning-content/integration/infrastructure/repositories/jobs/lcms-create-release-job-repository_test.js index 1dd20c39957..d7f1c4d8323 100644 --- a/api/tests/learning-content/integration/infrastructure/repositories/jobs/lcms-create-release-job-repository_test.js +++ b/api/tests/learning-content/integration/infrastructure/repositories/jobs/lcms-create-release-job-repository_test.js @@ -10,9 +10,9 @@ describe('Learning Content | Integration | Repository | Jobs | LcmsCreateRelease // then await expect(LcmsCreateReleaseJob.name).to.have.been.performed.withJob({ - retrylimit: 0, - retrydelay: 0, - retrybackoff: false, + retryLimit: 0, + retryDelay: 0, + retryBackoff: false, data: { userId: 123 }, }); }); diff --git a/api/tests/learning-content/integration/infrastructure/repositories/jobs/lcms-refresh-cache-job-repository_test.js b/api/tests/learning-content/integration/infrastructure/repositories/jobs/lcms-refresh-cache-job-repository_test.js index 9c92667cea7..bb62a6aeca3 100644 --- a/api/tests/learning-content/integration/infrastructure/repositories/jobs/lcms-refresh-cache-job-repository_test.js +++ b/api/tests/learning-content/integration/infrastructure/repositories/jobs/lcms-refresh-cache-job-repository_test.js @@ -10,9 +10,9 @@ describe('Learning Content | Integration | Repository | Jobs | LcmsRefreshCacheJ // then await expect(LcmsRefreshCacheJob.name).to.have.been.performed.withJob({ - retrylimit: 0, - retrydelay: 0, - retrybackoff: false, + retryLimit: 0, + retryDelay: 0, + retryBackoff: false, data: { userId: 123 }, }); }); diff --git a/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-completed-job-repository_test.js b/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-completed-job-repository_test.js index 99cb05c08c3..3c482f15fb9 100644 --- a/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-completed-job-repository_test.js +++ b/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-completed-job-repository_test.js @@ -13,9 +13,9 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | part // then await expect(ParticipationCompletedJob.name).to.have.been.performed.withJob({ - retrylimit: 0, - retrydelay: 0, - retrybackoff: false, + retryLimit: 0, + retryDelay: 0, + retryBackoff: false, data, }); }); diff --git a/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-result-calculation-job-repository_test.js b/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-result-calculation-job-repository_test.js index 630ad5c2db7..607c43029ae 100644 --- a/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-result-calculation-job-repository_test.js +++ b/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-result-calculation-job-repository_test.js @@ -12,9 +12,9 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | part // then await expect(ParticipationResultCalculationJob.name).to.have.been.performed.withJob({ name: ParticipationResultCalculationJob.name, - retrylimit: JobRetry.FEW_RETRY.retryLimit, - retrydelay: JobRetry.FEW_RETRY.retryDelay, - retrybackoff: JobRetry.FEW_RETRY.retryBackoff, + retryLimit: JobRetry.FEW_RETRY.retryLimit, + retryDelay: JobRetry.FEW_RETRY.retryDelay, + retryBackoff: JobRetry.FEW_RETRY.retryBackoff, data: { campaignParticipationId: 3 }, }); }); diff --git a/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-shared-job-repository_test.js b/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-shared-job-repository_test.js index e33e81bbb8e..9fb4306c090 100644 --- a/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-shared-job-repository_test.js +++ b/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-shared-job-repository_test.js @@ -12,9 +12,9 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | part await expect(ParticipationSharedJob.name).to.have.been.performed.withJob({ name: ParticipationSharedJob.name, - retrylimit: 0, - retrydelay: 0, - retrybackoff: false, + retryLimit: 0, + retryDelay: 0, + retryBackoff: false, data: { campaignParticipationId: 2 }, }); }); diff --git a/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-started-job-repository_test.js b/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-started-job-repository_test.js index 88b1d693783..87c2d26c869 100644 --- a/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-started-job-repository_test.js +++ b/api/tests/prescription/campaign-participation/integration/infrastructure/repositories/jobs/participation-started-job-repository_test.js @@ -14,9 +14,9 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | part // then await expect(ParticipationStartedJob.name).to.have.been.performed.withJob({ - retrylimit: 0, - retrydelay: 0, - retrybackoff: false, + retryLimit: 0, + retryDelay: 0, + retryBackoff: false, data: { campaignParticipationId: 777, }, diff --git a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/compute-certificability-job-repository_test.js b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/compute-certificability-job-repository_test.js index 4e9b4431488..00c8ed750ae 100644 --- a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/compute-certificability-job-repository_test.js +++ b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/compute-certificability-job-repository_test.js @@ -10,9 +10,9 @@ describe('Integration | Prescription | Application | Jobs | computeCertificabili // then await expect(ComputeCertificabilityJob.name).to.have.been.performed.withJob({ - retrylimit: 0, - retrydelay: 0, - retrybackoff: false, + retryLimit: 0, + retryDelay: 0, + retryBackoff: false, data: { organizationLearnerId: 4123132 }, }); }); diff --git a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-common-organization-learners-job-repository_test.js b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-common-organization-learners-job-repository_test.js index f2d65672bbf..694242b13e9 100644 --- a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-common-organization-learners-job-repository_test.js +++ b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-common-organization-learners-job-repository_test.js @@ -16,10 +16,10 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | impo // then await expect(ImportCommonOrganizationLearnersJob.name).to.have.have.been.performed.withJob({ - expirein: JobExpireIn.HIGH, - retrylimit: JobRetry.FEW_RETRY.retryLimit, - retrydelay: JobRetry.FEW_RETRY.retryDelay, - retrybackoff: JobRetry.FEW_RETRY.retryBackoff, + expireIn: JobExpireIn.HIGH, + retryLimit: JobRetry.FEW_RETRY.retryLimit, + retryDelay: JobRetry.FEW_RETRY.retryDelay, + retryBackoff: JobRetry.FEW_RETRY.retryBackoff, data: { organizationImportId: 4123132 }, }); }); diff --git a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-organization-learners-job-repository_test.js b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-organization-learners-job-repository_test.js index 93f4c3ad280..60f4f0d604f 100644 --- a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-organization-learners-job-repository_test.js +++ b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-organization-learners-job-repository_test.js @@ -16,10 +16,10 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | impo // then await expect(ImportOrganizationLearnersJob.name).to.have.have.been.performed.withJob({ - expirein: JobExpireIn.HIGH, - retrylimit: JobRetry.FEW_RETRY.retryLimit, - retrydelay: JobRetry.FEW_RETRY.retryDelay, - retrybackoff: JobRetry.FEW_RETRY.retryBackoff, + expireIn: JobExpireIn.HIGH, + retryLimit: JobRetry.FEW_RETRY.retryLimit, + retryDelay: JobRetry.FEW_RETRY.retryDelay, + retryBackoff: JobRetry.FEW_RETRY.retryBackoff, data: { organizationImportId: 4123132 }, }); }); diff --git a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-sco-csv-organization-learners-job-repository_test.js b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-sco-csv-organization-learners-job-repository_test.js index ce0d7c39f3c..bc258a6d4e6 100644 --- a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-sco-csv-organization-learners-job-repository_test.js +++ b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-sco-csv-organization-learners-job-repository_test.js @@ -16,10 +16,10 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | impo // then await expect(ImportScoCsvOrganizationLearnersJob.name).to.have.have.been.performed.withJob({ - expirein: JobExpireIn.HIGH, - retrylimit: JobRetry.FEW_RETRY.retryLimit, - retrydelay: JobRetry.FEW_RETRY.retryDelay, - retrybackoff: JobRetry.FEW_RETRY.retryBackoff, + expireIn: JobExpireIn.HIGH, + retryLimit: JobRetry.FEW_RETRY.retryLimit, + retryDelay: JobRetry.FEW_RETRY.retryDelay, + retryBackoff: JobRetry.FEW_RETRY.retryBackoff, data: { organizationImportId: 4123132, locale: 'fr' }, }); }); diff --git a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-sup-organization-learners-job-repository_test.js b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-sup-organization-learners-job-repository_test.js index 2f47219ec32..f416fd73cce 100644 --- a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-sup-organization-learners-job-repository_test.js +++ b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/import-sup-organization-learners-job-repository_test.js @@ -16,10 +16,10 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | impo // then await expect(ImportSupOrganizationLearnersJob.name).to.have.have.been.performed.withJob({ - expirein: JobExpireIn.HIGH, - retrylimit: JobRetry.FEW_RETRY.retryLimit, - retrydelay: JobRetry.FEW_RETRY.retryDelay, - retrybackoff: JobRetry.FEW_RETRY.retryBackoff, + expireIn: JobExpireIn.HIGH, + retryLimit: JobRetry.FEW_RETRY.retryLimit, + retryDelay: JobRetry.FEW_RETRY.retryDelay, + retryBackoff: JobRetry.FEW_RETRY.retryBackoff, data: { organizationImportId: 4123132, type: 'REPLACE', locale: 'fr' }, }); }); diff --git a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-common-organization-learners-import-file-job-repository_test.js b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-common-organization-learners-import-file-job-repository_test.js index ad0a8d0b314..edebad96543 100644 --- a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-common-organization-learners-import-file-job-repository_test.js +++ b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-common-organization-learners-import-file-job-repository_test.js @@ -16,10 +16,10 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | vali // then await expect(ValidateCommonOrganizationImportFileJob.name).to.have.been.performed.withJob({ - expirein: JobExpireIn.HIGH, - retrylimit: JobRetry.FEW_RETRY.retryLimit, - retrydelay: JobRetry.FEW_RETRY.retryDelay, - retrybackoff: JobRetry.FEW_RETRY.retryBackoff, + expireIn: JobExpireIn.HIGH, + retryLimit: JobRetry.FEW_RETRY.retryLimit, + retryDelay: JobRetry.FEW_RETRY.retryDelay, + retryBackoff: JobRetry.FEW_RETRY.retryBackoff, data: { organizationImportId: 4123132 }, }); }); diff --git a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-csv-organization-learners-import-file-job-repository_test.js b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-csv-organization-learners-import-file-job-repository_test.js index 5f32d2d1ec0..4e916f294fa 100644 --- a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-csv-organization-learners-import-file-job-repository_test.js +++ b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-csv-organization-learners-import-file-job-repository_test.js @@ -16,10 +16,10 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | vali // then await expect(ValidateCsvOrganizationImportFileJob.name).to.have.been.performed.withJob({ - expirein: JobExpireIn.HIGH, - retrylimit: JobRetry.FEW_RETRY.retryLimit, - retrydelay: JobRetry.FEW_RETRY.retryDelay, - retrybackoff: JobRetry.FEW_RETRY.retryBackoff, + expireIn: JobExpireIn.HIGH, + retryLimit: JobRetry.FEW_RETRY.retryLimit, + retryDelay: JobRetry.FEW_RETRY.retryDelay, + retryBackoff: JobRetry.FEW_RETRY.retryBackoff, data: { organizationImportId: 4123132, type: 'REPLACE', locale: 'fr' }, }); }); diff --git a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-organization-learners-import-file-job-repository_test.js b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-organization-learners-import-file-job-repository_test.js index 9d67cad887c..05ba3af1804 100644 --- a/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-organization-learners-import-file-job-repository_test.js +++ b/api/tests/prescription/learner-management/integration/infrastructure/repositories/jobs/validate-organization-learners-import-file-job-repository_test.js @@ -16,10 +16,10 @@ describe('Integration | Prescription | Infrastructure | Repository | Jobs | vali // then await expect(ValidateOrganizationImportFileJob.name).to.have.been.performed.withJob({ - expirein: JobExpireIn.HIGH, - retrylimit: JobRetry.FEW_RETRY.retryLimit, - retrydelay: JobRetry.FEW_RETRY.retryDelay, - retrybackoff: JobRetry.FEW_RETRY.retryBackoff, + expireIn: JobExpireIn.HIGH, + retryLimit: JobRetry.FEW_RETRY.retryLimit, + retryDelay: JobRetry.FEW_RETRY.retryDelay, + retryBackoff: JobRetry.FEW_RETRY.retryBackoff, data: { organizationImportId: 4123132 }, }); }); diff --git a/api/tests/shared/integration/infrastructure/repositories/jobs/job-repository_test.js b/api/tests/shared/integration/infrastructure/repositories/jobs/job-repository_test.js index 6b7a8bdd5fa..9671d184aac 100644 --- a/api/tests/shared/integration/infrastructure/repositories/jobs/job-repository_test.js +++ b/api/tests/shared/integration/infrastructure/repositories/jobs/job-repository_test.js @@ -23,11 +23,11 @@ describe('Integration | Infrastructure | Repositories | Jobs | job-repository', await expect(name).to.have.been.performed.withJob({ name, data: expectedParams, - expirein: '00:15:00', + expireIn: 900, priority, - retrydelay: 30, - retrylimit: 10, - retrybackoff: true, + retryDelay: 30, + retryLimit: 10, + retryBackoff: true, }); }); diff --git a/api/tests/test-helper.js b/api/tests/test-helper.js index 7f58a1be7cb..57f27dfbd0c 100644 --- a/api/tests/test-helper.js +++ b/api/tests/test-helper.js @@ -32,6 +32,7 @@ import * as areaRepository from '../src/shared/infrastructure/repositories/area- import * as challengeRepository from '../src/shared/infrastructure/repositories/challenge-repository.js'; import * as competenceRepository from '../src/shared/infrastructure/repositories/competence-repository.js'; import * as courseRepository from '../src/shared/infrastructure/repositories/course-repository.js'; +import { pgBoss } from '../src/shared/infrastructure/repositories/jobs/pg-boss.js'; import * as skillRepository from '../src/shared/infrastructure/repositories/skill-repository.js'; import * as tubeRepository from '../src/shared/infrastructure/repositories/tube-repository.js'; import * as customChaiHelpers from './tooling/chai-custom-helpers/index.js'; @@ -54,7 +55,12 @@ chaiUse(sinonChai); _.each(customChaiHelpers, chaiUse); -chaiUse(jobChai(knex)); +chaiUse(jobChai(pgBoss)); +try { + await pgBoss.start(); +} catch { + // pgBoss is not available on unit tests +} const { apimRegisterApplicationsCredentials, jwtConfig } = config; @@ -97,6 +103,11 @@ afterEach(async function () { missionRepository.clearCache(); await featureToggles.resetDefaults(); await datamartBuilder.clean(); + try { + await pgBoss.clearStorage(); + } catch { + // pgBoss is not available on unit tests + } return databaseBuilder.clean(); }); diff --git a/api/tests/tooling/jobs/expect-job.js b/api/tests/tooling/jobs/expect-job.js index 9b9d8cbd222..0067a59cf15 100644 --- a/api/tests/tooling/jobs/expect-job.js +++ b/api/tests/tooling/jobs/expect-job.js @@ -1,6 +1,6 @@ import { assert, Assertion } from 'chai'; -export const jobChai = (knex) => (_chai, utils) => { +export const jobChai = (pgBoss) => (_chai, utils) => { utils.addProperty(Assertion.prototype, 'performed', function () { return this; }); @@ -11,50 +11,68 @@ export const jobChai = (knex) => (_chai, utils) => { Assertion.addMethod('withJobsCount', async function (expectedCount) { const jobName = this._obj; - const jobs = await knex('pgboss.job').where({ name: jobName }); + const jobs = await pgBoss.fetch(jobName, expectedCount + 1, { includeMetadata: true }); + const actualCount = jobs?.length ?? 0; + assert.strictEqual( - jobs.length, + actualCount, expectedCount, - `expected ${jobName} to have been performed ${expectedCount} times, but it was performed ${jobs.length} times`, + `expected ${jobName} to have been performed ${expectedCount} times, but it was performed ${actualCount} times`, + ); + return (jobs ?? []).map( + ({ id, name, data, retrylimit, retrydelay, retrybackoff, expire_in_seconds, priority }) => ({ + id, + name, + data, + retryLimit: retrylimit, + retryDelay: retrydelay, + retryBackoff: retrybackoff, + expireIn: Math.round(expire_in_seconds), + priority, + }), ); }); Assertion.addMethod('withJob', async function (jobData) { - await this.withJobsCount(1); + const jobs = await this.withJobsCount(1); const jobName = this._obj; - const jobs = await knex('pgboss.job').select(knex.raw(`*, expirein::varchar`)).where({ name: jobName }); - assert.deepInclude(jobs[0], jobData, `Job '${jobName}' was performed with a different payload`); + assert.deepOwnInclude( + jobs[0], + jobData, + `Job '${jobName}' was performed with a different payload (${JSON.stringify(jobData)} was expected but performed with ${JSON.stringify(jobs[0])})`, + ); }); Assertion.addMethod('withCronJobsCount', async function (expectedCount) { const jobName = this._obj; - const jobs = await knex('pgboss.schedule').where({ name: jobName }); + const allJobs = (await pgBoss.getSchedules()) ?? []; + const jobs = allJobs.filter(({ name }) => name === jobName); assert.strictEqual( jobs.length, expectedCount, `expected ${jobName} to have been performed ${expectedCount} times, but it was performed ${jobs.length} times`, ); + return jobs; }); Assertion.addMethod('withCronJob', async function (jobData) { - await this.withCronJobsCount(1); + const jobs = await this.withCronJobsCount(1); const jobName = this._obj; - const job = await knex('pgboss.schedule') - .select('name', 'cron', 'data', 'options') - .where({ name: jobName }) - .first(); - assert.deepInclude(job, jobData, `Job '${jobName}' was schedule with a different payload`); + assert.deepOwnInclude( + jobs[0], + jobData, + `Job '${jobName}' was schedule with a different payload (${JSON.stringify(jobData)} was expected but performed with ${JSON.stringify(jobs[0])})`, + ); }); Assertion.addMethod('withJobPayloads', async function (payloads) { - await this.withJobsCount(payloads.length); + const jobs = await this.withJobsCount(payloads.length); const jobName = this._obj; - const jobs = await knex('pgboss.job').where({ name: jobName }); const actualPayloads = jobs.map((job) => job.data); - assert.deepEqual(actualPayloads, payloads, `Job '${jobName}' was performed with a different payload`); + assert.sameDeepMembers(actualPayloads, payloads, `Job '${jobName}' was performed with a different payload`); }); Assertion.addMethod('withJobPayload', async function (payload) { diff --git a/api/tests/unit/tooling/database-builder/database-helpers_test.js b/api/tests/unit/tooling/database-builder/database-helpers_test.js index 244f8f92056..47e138a63c7 100644 --- a/api/tests/unit/tooling/database-builder/database-helpers_test.js +++ b/api/tests/unit/tooling/database-builder/database-helpers_test.js @@ -12,11 +12,6 @@ describe('Unit | Tooling | DatabaseBuilder | database-helpers', function () { insertSqlQuery: '/* path: */ insert into "users" ("cgu", "createdAt", "email", "emailConfirmedAt", "firstName", "hasBeenAnonymised", "hasBeenAnonymisedBy", "hasSeenAssessmentInstructions", "hasSeenFocusedChallengeTooltip", "hasSeenNewDashboardInfo", "hasSeenOtherChallengesTooltip", "id", "isAnonymous", "lang", "lastDataProtectionPolicySeenAt", "lastName", "lastPixCertifTermsOfServiceValidatedAt", "lastPixOrgaTermsOfServiceValidatedAt", "lastTermsOfServiceValidatedAt", "locale", "mustValidateTermsOfService", "pixCertifTermsOfServiceAccepted", "pixOrgaTermsOfServiceAccepted", "updatedAt", "username") values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, DEFAULT, $21, $22, $23, $24, $25)', }, - { - expectedTableName: 'pgboss.job', - insertSqlQuery: - '/* path: /api/campaign-participations/{campaignParticipationId} */ insert into "pgboss"."job" ("data", "expirein", "name", "on_complete", "retrybackoff", "retrydelay", "retrylimit") values ($1, $2, $3, $4, $5, $6, $7)', - }, { expectedTableName: 'user-logins', insertSqlQuery: diff --git a/api/worker.js b/api/worker.js index b2aef66a2e7..327d836db6c 100644 --- a/api/worker.js +++ b/api/worker.js @@ -5,13 +5,13 @@ import { fileURLToPath } from 'node:url'; import { glob } from 'glob'; import _ from 'lodash'; -import PgBoss from 'pg-boss'; import { Metrics } from './src/monitoring/infrastructure/metrics.js'; import { JobGroup } from './src/shared/application/jobs/job-controller.js'; import { config } from './src/shared/config.js'; import { JobQueue } from './src/shared/infrastructure/jobs/JobQueue.js'; import { quitAllStorages } from './src/shared/infrastructure/key-value-storages/index.js'; +import { pgBoss } from './src/shared/infrastructure/repositories/jobs/pg-boss.js'; import { importNamedExportFromFile } from './src/shared/infrastructure/utils/import-named-exports-from-directory.js'; import { logger } from './src/shared/infrastructure/utils/logger.js'; @@ -23,13 +23,7 @@ const metrics = new Metrics({ config }); async function startPgBoss() { logger.info('Starting pg-boss'); - const monitorStateIntervalSeconds = config.pgBoss.monitorStateIntervalSeconds; - const pgBoss = new PgBoss({ - connectionString: process.env.DATABASE_URL, - max: config.pgBoss.connexionPoolMaxSize, - ...(monitorStateIntervalSeconds ? { monitorStateIntervalSeconds } : {}), - archiveFailedAfterSeconds: config.pgBoss.archiveFailedAfterSeconds, - }); + pgBoss.on('monitor-states', (state) => { logger.info({ event: 'pg-boss-state', name: 'global' }, { ...state, queues: undefined }); _.each(state.queues, (queueState, queueName) => {