From a1cafa5e310955cac89898fd383d20784511cfa1 Mon Sep 17 00:00:00 2001 From: Felix Hennig Date: Mon, 17 Feb 2025 16:18:55 +0100 Subject: [PATCH 1/4] Reapply "Make pipeline version per Organism" (#3702) This reverts commit 0f24409b39e0275b8735ca9ea58e67cc4afd7051. --- backend/docs/db/schema.sql | 86 ++++++------- .../backend/config/BackendSpringConfig.kt | 17 ++- .../controller/SubmissionController.kt | 2 +- .../debug/DeleteSequenceDataService.kt | 12 +- .../CurrentProcessingPipelineTable.kt | 42 +++++++ .../submission/SequenceEntriesTable.kt | 8 ++ .../submission/SubmissionDatabaseService.kt | 93 ++++++++++---- .../UseNewerProcessingPipelineVersionTask.kt | 7 +- .../V1.10__pipeline_version_per_organism.sql | 116 ++++++++++++++++++ .../controller/EndpointTestExtension.kt | 8 +- .../DeleteAllSequenceDataEndpointTest.kt | 4 +- .../submission/GetReleasedDataEndpointTest.kt | 28 +++-- ...eNewerProcessingPipelineVersionTaskTest.kt | 67 +++++++++- docs/astro.config.mjs | 12 +- .../build-new-preprocessing-pipeline.md | 2 +- .../existing-preprocessing-pipelines.md | 8 +- .../for-administrators/pipeline-concept.md | 28 +++++ kubernetes/loculus/values.yaml | 20 +-- 18 files changed, 435 insertions(+), 125 deletions(-) create mode 100644 backend/src/main/resources/db/migration/V1.10__pipeline_version_per_organism.sql create mode 100644 docs/src/content/docs/for-administrators/pipeline-concept.md diff --git a/backend/docs/db/schema.sql b/backend/docs/db/schema.sql index 6dd23e97bd..27bb1eb6fa 100644 --- a/backend/docs/db/schema.sql +++ b/backend/docs/db/schema.sql @@ -2,8 +2,8 @@ -- PostgreSQL database dump -- --- Dumped from database version 15.11 (Debian 15.11-1.pgdg120+1) --- Dumped by pg_dump version 16.7 (Debian 16.7-1.pgdg120+1) +-- Dumped from database version 15.10 (Debian 15.10-1.pgdg120+1) +-- Dumped by pg_dump version 16.6 (Debian 16.6-1.pgdg120+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -171,7 +171,8 @@ ALTER SEQUENCE public.audit_log_id_seq OWNED BY public.audit_log.id; CREATE TABLE public.current_processing_pipeline ( version bigint NOT NULL, - started_using_at timestamp without time zone NOT NULL + started_using_at timestamp without time zone NOT NULL, + organism text NOT NULL ); @@ -192,6 +193,28 @@ CREATE TABLE public.data_use_terms_table ( ALTER TABLE public.data_use_terms_table OWNER TO postgres; +-- +-- Name: sequence_entries; Type: TABLE; Schema: public; Owner: postgres +-- + +CREATE TABLE public.sequence_entries ( + accession text NOT NULL, + version bigint NOT NULL, + organism text NOT NULL, + submission_id text NOT NULL, + submitter text NOT NULL, + approver text, + group_id integer NOT NULL, + submitted_at timestamp without time zone NOT NULL, + released_at timestamp without time zone, + is_revocation boolean DEFAULT false NOT NULL, + original_data jsonb, + version_comment text +); + + +ALTER TABLE public.sequence_entries OWNER TO postgres; + -- -- Name: sequence_entries_preprocessed_data; Type: TABLE; Schema: public; Owner: postgres -- @@ -223,18 +246,18 @@ CREATE VIEW public.external_metadata_view AS WHEN (all_external_metadata.external_metadata IS NULL) THEN jsonb_build_object('metadata', (cpd.processed_data -> 'metadata'::text)) ELSE jsonb_build_object('metadata', ((cpd.processed_data -> 'metadata'::text) || all_external_metadata.external_metadata)) END AS joint_metadata - FROM (( SELECT sequence_entries_preprocessed_data.accession, - sequence_entries_preprocessed_data.version, - sequence_entries_preprocessed_data.pipeline_version, - sequence_entries_preprocessed_data.processed_data, - sequence_entries_preprocessed_data.errors, - sequence_entries_preprocessed_data.warnings, - sequence_entries_preprocessed_data.processing_status, - sequence_entries_preprocessed_data.started_processing_at, - sequence_entries_preprocessed_data.finished_processing_at - FROM public.sequence_entries_preprocessed_data - WHERE (sequence_entries_preprocessed_data.pipeline_version = ( SELECT current_processing_pipeline.version - FROM public.current_processing_pipeline))) cpd + FROM (( SELECT sepd.accession, + sepd.version, + sepd.pipeline_version, + sepd.processed_data, + sepd.errors, + sepd.warnings, + sepd.processing_status, + sepd.started_processing_at, + sepd.finished_processing_at + FROM ((public.sequence_entries_preprocessed_data sepd + JOIN public.sequence_entries se ON (((sepd.accession = se.accession) AND (sepd.version = se.version)))) + JOIN public.current_processing_pipeline cpp ON (((se.organism = cpp.organism) AND (sepd.pipeline_version = cpp.version))))) cpd LEFT JOIN public.all_external_metadata ON (((all_external_metadata.accession = cpd.accession) AND (all_external_metadata.version = cpd.version)))); @@ -421,28 +444,6 @@ CREATE TABLE public.seqsets ( ALTER TABLE public.seqsets OWNER TO postgres; --- --- Name: sequence_entries; Type: TABLE; Schema: public; Owner: postgres --- - -CREATE TABLE public.sequence_entries ( - accession text NOT NULL, - version bigint NOT NULL, - organism text NOT NULL, - submission_id text NOT NULL, - submitter text NOT NULL, - approver text, - group_id integer NOT NULL, - submitted_at timestamp without time zone NOT NULL, - released_at timestamp without time zone, - is_revocation boolean DEFAULT false NOT NULL, - original_data jsonb, - version_comment text -); - - -ALTER TABLE public.sequence_entries OWNER TO postgres; - -- -- Name: sequence_entries_view; Type: VIEW; Schema: public; Owner: postgres -- @@ -466,7 +467,8 @@ CREATE VIEW public.sequence_entries_view AS (sepd.processed_data || em.joint_metadata) AS joint_metadata, CASE WHEN se.is_revocation THEN ( SELECT current_processing_pipeline.version - FROM public.current_processing_pipeline) + FROM public.current_processing_pipeline + WHERE (current_processing_pipeline.organism = se.organism)) ELSE sepd.pipeline_version END AS pipeline_version, sepd.errors, @@ -484,9 +486,9 @@ CREATE VIEW public.sequence_entries_view AS WHEN ((sepd.warnings IS NOT NULL) AND (jsonb_array_length(sepd.warnings) > 0)) THEN 'HAS_WARNINGS'::text ELSE 'NO_ISSUES'::text END AS processing_result - FROM ((public.sequence_entries se - LEFT JOIN public.sequence_entries_preprocessed_data sepd ON (((se.accession = sepd.accession) AND (se.version = sepd.version) AND (sepd.pipeline_version = ( SELECT current_processing_pipeline.version - FROM public.current_processing_pipeline))))) + FROM (((public.sequence_entries se + LEFT JOIN public.sequence_entries_preprocessed_data sepd ON (((se.accession = sepd.accession) AND (se.version = sepd.version)))) + LEFT JOIN public.current_processing_pipeline ccp ON (((se.organism = ccp.organism) AND (sepd.pipeline_version = ccp.version)))) LEFT JOIN public.external_metadata_view em ON (((se.accession = em.accession) AND (se.version = em.version)))); @@ -601,7 +603,7 @@ ALTER TABLE ONLY public.audit_log -- ALTER TABLE ONLY public.current_processing_pipeline - ADD CONSTRAINT current_processing_pipeline_pkey PRIMARY KEY (version); + ADD CONSTRAINT current_processing_pipeline_pkey PRIMARY KEY (organism); -- diff --git a/backend/src/main/kotlin/org/loculus/backend/config/BackendSpringConfig.kt b/backend/src/main/kotlin/org/loculus/backend/config/BackendSpringConfig.kt index 6439ed30e4..a6c18cfd34 100644 --- a/backend/src/main/kotlin/org/loculus/backend/config/BackendSpringConfig.kt +++ b/backend/src/main/kotlin/org/loculus/backend/config/BackendSpringConfig.kt @@ -10,8 +10,11 @@ import org.flywaydb.core.Flyway import org.jetbrains.exposed.spring.autoconfigure.ExposedAutoConfiguration import org.jetbrains.exposed.sql.DatabaseConfig import org.jetbrains.exposed.sql.Slf4jSqlDebugLogger +import org.jetbrains.exposed.sql.transactions.transaction import org.loculus.backend.controller.LoculusCustomHeaders import org.loculus.backend.log.REQUEST_ID_HEADER_DESCRIPTION +import org.loculus.backend.service.submission.CurrentProcessingPipelineTable +import org.loculus.backend.utils.DateProvider import org.springdoc.core.customizers.OperationCustomizer import org.springframework.beans.factory.annotation.Value import org.springframework.boot.autoconfigure.ImportAutoConfiguration @@ -67,13 +70,25 @@ class BackendSpringConfig { @Bean @Profile("!test") - fun getFlyway(dataSource: DataSource): Flyway { + fun getFlyway(dataSource: DataSource, backendConfig: BackendConfig, dateProvider: DateProvider): Flyway { val configuration = Flyway.configure() .baselineOnMigrate(true) .dataSource(dataSource) .validateMigrationNaming(true) val flyway = Flyway(configuration) flyway.migrate() + + // Since migration V1.10 we need to initialize the CurrentProcessingPipelineTable + // in code, because the configured organisms are not known in the SQL table definitions. + logger.info("Initializing CurrentProcessingPipelineTable") + transaction { + val insertedRows = CurrentProcessingPipelineTable.setV1ForOrganismsIfNotExist( + backendConfig.organisms.keys, + dateProvider.getCurrentDateTime(), + ) + logger.info("$insertedRows inserted.") + } + return flyway } diff --git a/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt b/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt index f1f00ab502..180aeae7e7 100644 --- a/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt +++ b/backend/src/main/kotlin/org/loculus/backend/controller/SubmissionController.kt @@ -176,7 +176,7 @@ open class SubmissionController( @RequestParam pipelineVersion: Long, @RequestHeader(value = HttpHeaders.IF_NONE_MATCH, required = false) ifNoneMatch: String?, ): ResponseEntity { - val currentProcessingPipelineVersion = submissionDatabaseService.getCurrentProcessingPipelineVersion() + val currentProcessingPipelineVersion = submissionDatabaseService.getCurrentProcessingPipelineVersion(organism) if (pipelineVersion < currentProcessingPipelineVersion) { throw UnprocessableEntityException( "The processing pipeline version $pipelineVersion is not accepted " + diff --git a/backend/src/main/kotlin/org/loculus/backend/service/debug/DeleteSequenceDataService.kt b/backend/src/main/kotlin/org/loculus/backend/service/debug/DeleteSequenceDataService.kt index 24dbee382a..949a370ad5 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/debug/DeleteSequenceDataService.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/debug/DeleteSequenceDataService.kt @@ -1,7 +1,7 @@ package org.loculus.backend.service.debug import org.jetbrains.exposed.sql.deleteAll -import org.jetbrains.exposed.sql.insert +import org.loculus.backend.config.BackendConfig import org.loculus.backend.service.datauseterms.DataUseTermsTable import org.loculus.backend.service.submission.CurrentProcessingPipelineTable import org.loculus.backend.service.submission.MetadataUploadAuxTable @@ -13,7 +13,7 @@ import org.springframework.stereotype.Component import org.springframework.transaction.annotation.Transactional @Component -class DeleteSequenceDataService(private val dateProvider: DateProvider) { +class DeleteSequenceDataService(private val dateProvider: DateProvider, private val config: BackendConfig) { @Transactional fun deleteAllSequenceData() { SequenceEntriesTable.deleteAll() @@ -22,9 +22,9 @@ class DeleteSequenceDataService(private val dateProvider: DateProvider) { SequenceUploadAuxTable.deleteAll() DataUseTermsTable.deleteAll() CurrentProcessingPipelineTable.deleteAll() - CurrentProcessingPipelineTable.insert { - it[versionColumn] = 1 - it[startedUsingAtColumn] = dateProvider.getCurrentDateTime() - } + CurrentProcessingPipelineTable.setV1ForOrganismsIfNotExist( + config.organisms.keys, + dateProvider.getCurrentDateTime(), + ) } } diff --git a/backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt b/backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt index 09293833b7..88cb22da8a 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt @@ -1,11 +1,53 @@ package org.loculus.backend.service.submission +import kotlinx.datetime.LocalDateTime import org.jetbrains.exposed.sql.Table +import org.jetbrains.exposed.sql.andWhere +import org.jetbrains.exposed.sql.batchInsert import org.jetbrains.exposed.sql.kotlin.datetime.datetime +import org.jetbrains.exposed.sql.selectAll +import org.jetbrains.exposed.sql.update const val CURRENT_PROCESSING_PIPELINE_TABLE_NAME = "current_processing_pipeline" object CurrentProcessingPipelineTable : Table(CURRENT_PROCESSING_PIPELINE_TABLE_NAME) { + val organismColumn = varchar("organism", 255) val versionColumn = long("version") val startedUsingAtColumn = datetime("started_using_at") + + /** + * Every organism needs to have a current pipeline version in the CurrentProcessingPipelineTable. + * This function sets V1 for all given organisms, if no version is defined yet. + */ + fun setV1ForOrganismsIfNotExist(organisms: Collection, now: LocalDateTime) = + CurrentProcessingPipelineTable.batchInsert(organisms, ignore = true) { organism -> + this[organismColumn] = organism + this[versionColumn] = 1 + this[startedUsingAtColumn] = now + } + + /** + * Given a version that was found that is potentially newer than the current once, check if the currently stored + * 'current' pipeline version for this organism is less than the one that was found? + * If so, the pipeline needs to 'update' i.e. reprocess older entries. + */ + fun pipelineNeedsUpdate(maybeNewerVersion: Long, organism: String) = CurrentProcessingPipelineTable + .selectAll() + .where { versionColumn less maybeNewerVersion } + .andWhere { organismColumn eq organism } + .empty() + .not() + + /** + * Set the pipeline version for the given organism to newVersion. + */ + fun updatePipelineVersion(organism: String, newVersion: Long, startedUsingAt: LocalDateTime) = + CurrentProcessingPipelineTable.update( + where = { + organismColumn eq organism + }, + ) { + it[versionColumn] = newVersion + it[startedUsingAtColumn] = startedUsingAt + } } diff --git a/backend/src/main/kotlin/org/loculus/backend/service/submission/SequenceEntriesTable.kt b/backend/src/main/kotlin/org/loculus/backend/service/submission/SequenceEntriesTable.kt index a17e996c7b..1643e16533 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/submission/SequenceEntriesTable.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/submission/SequenceEntriesTable.kt @@ -44,6 +44,14 @@ object SequenceEntriesTable : Table(SEQUENCE_ENTRIES_TABLE_NAME) { ) } + fun distinctOrganisms() = SequenceEntriesTable + .select(SequenceEntriesTable.organismColumn) + .withDistinct() + .asSequence() + .map { + it[SequenceEntriesTable.organismColumn] + } + fun accessionVersionIsIn(accessionVersions: List) = Pair(accessionColumn, versionColumn) inList accessionVersions.toPairs() diff --git a/backend/src/main/kotlin/org/loculus/backend/service/submission/SubmissionDatabaseService.kt b/backend/src/main/kotlin/org/loculus/backend/service/submission/SubmissionDatabaseService.kt index a3049127d4..d958075d70 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/submission/SubmissionDatabaseService.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/submission/SubmissionDatabaseService.kt @@ -16,6 +16,7 @@ import org.jetbrains.exposed.sql.SortOrder import org.jetbrains.exposed.sql.SqlExpressionBuilder.less import org.jetbrains.exposed.sql.SqlExpressionBuilder.plus import org.jetbrains.exposed.sql.Transaction +import org.jetbrains.exposed.sql.VarCharColumnType import org.jetbrains.exposed.sql.alias import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.batchInsert @@ -124,10 +125,11 @@ class SubmissionDatabaseService( ) } - fun getCurrentProcessingPipelineVersion(): Long { + fun getCurrentProcessingPipelineVersion(organism: Organism): Long { val table = CurrentProcessingPipelineTable return table .select(table.versionColumn) + .where { table.organismColumn eq organism.name } .map { it[table.versionColumn] } @@ -1115,35 +1117,52 @@ class SubmissionDatabaseService( } } - fun useNewerProcessingPipelineIfPossible(): Long? { - log.info("Checking for newer processing pipeline versions") + fun useNewerProcessingPipelineIfPossible(): Map = + SequenceEntriesTable.distinctOrganisms().map { organismName -> + Pair(organismName, useNewerProcessingPipelineIfPossible(organismName)) + }.toMap() + + /** + * Looks for new preprocessing pipeline version with [findNewPreprocessingPipelineVersion]; + * if a new version is found, the [CurrentProcessingPipelineTable] is updated accordingly. + * If the [CurrentProcessingPipelineTable] is updated, the newly set version is returned. + */ + private fun useNewerProcessingPipelineIfPossible(organismName: String): Long? { + log.info("Checking for newer processing pipeline versions for organism '$organismName'") return transaction { - val newVersion = findNewPreprocessingPipelineVersion() + val newVersion = findNewPreprocessingPipelineVersion(organismName) ?: return@transaction null - val pipelineNeedsUpdate = CurrentProcessingPipelineTable - .selectAll().where { CurrentProcessingPipelineTable.versionColumn neq newVersion } - .limit(1) - .empty() - .not() + val pipelineNeedsUpdate = CurrentProcessingPipelineTable.pipelineNeedsUpdate(newVersion, organismName) if (pipelineNeedsUpdate) { log.info { "Updating current processing pipeline to newer version: $newVersion" } - CurrentProcessingPipelineTable.update( - where = { - CurrentProcessingPipelineTable.versionColumn neq newVersion - }, - ) { - it[versionColumn] = newVersion - it[startedUsingAtColumn] = dateProvider.getCurrentDateTime() - } + CurrentProcessingPipelineTable.updatePipelineVersion( + organismName, + newVersion, + dateProvider.getCurrentDateTime(), + ) } + + val logMessage = "Started using results from new processing pipeline: version $newVersion" + log.info(logMessage) + auditLogger.log(logMessage) newVersion } } } -private fun Transaction.findNewPreprocessingPipelineVersion(): Long? { +private fun Transaction.findNewPreprocessingPipelineVersion(organism: String): Long? { + // Maybe we want to refactor this function: https://github.com/loculus-project/loculus/issues/3571 + + // This query goes into the processed data and finds _any_ processed data that was processed + // with a pipeline version greater than the current one. + // If such a version is found ('newer.pipeline_version'), we go in and check some stuff. + // We look at all the data that was processed successfully with the current pipeline version, + // and then we check whether all of these were also successfully processed with the newer version. + // If any accession.version either was processed unsuccessfully with the new version, or just wasn't + // processed yet -> we _don't_ return the new version yet. + val sql = """ select newest.version as version @@ -1152,21 +1171,32 @@ private fun Transaction.findNewPreprocessingPipelineVersion(): Long? { select max(pipeline_version) as version from ( -- Newer pipeline versions... - select distinct pipeline_version - from sequence_entries_preprocessed_data - where pipeline_version > (select version from current_processing_pipeline) + select distinct sep.pipeline_version + from sequence_entries_preprocessed_data sep + join sequence_entries se + on se.accession = sep.accession + and se.version = sep.version + where + se.organism = ? + and sep.pipeline_version > (select version from current_processing_pipeline + where organism = ?) ) as newer where not exists( -- ...for which no sequence exists... select from ( -- ...that was processed successfully with the current version... - select accession, version - from sequence_entries_preprocessed_data + select sep.accession, sep.version + from sequence_entries_preprocessed_data sep + join sequence_entries se + on se.accession = sep.accession + and se.version = sep.version where - pipeline_version = (select version from current_processing_pipeline) - and processing_status = 'PROCESSED' - and (errors is null or jsonb_array_length(errors) = 0) + se.organism = ? + and sep.pipeline_version = (select version from current_processing_pipeline + where organism = ?) + and sep.processing_status = 'PROCESSED' + and (sep.errors is null or jsonb_array_length(sep.errors) = 0) ) as successful where -- ...but not successfully with the newer version. @@ -1184,7 +1214,16 @@ private fun Transaction.findNewPreprocessingPipelineVersion(): Long? { ) as newest; """.trimIndent() - return exec(sql, explicitStatementType = StatementType.SELECT) { resultSet -> + return exec( + sql, + listOf( + Pair(VarCharColumnType(), organism), + Pair(VarCharColumnType(), organism), + Pair(VarCharColumnType(), organism), + Pair(VarCharColumnType(), organism), + ), + explicitStatementType = StatementType.SELECT, + ) { resultSet -> if (!resultSet.next()) { return@exec null } diff --git a/backend/src/main/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTask.kt b/backend/src/main/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTask.kt index 074bdf843a..1535751c76 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTask.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTask.kt @@ -15,11 +15,6 @@ class UseNewerProcessingPipelineVersionTask( @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS) fun task() { - val newVersion = submissionDatabaseService.useNewerProcessingPipelineIfPossible() - if (newVersion != null) { - val logMessage = "Started using results from new processing pipeline: version $newVersion" - log.info(logMessage) - auditLogger.log(logMessage) - } + submissionDatabaseService.useNewerProcessingPipelineIfPossible() } } diff --git a/backend/src/main/resources/db/migration/V1.10__pipeline_version_per_organism.sql b/backend/src/main/resources/db/migration/V1.10__pipeline_version_per_organism.sql new file mode 100644 index 0000000000..0b144ca137 --- /dev/null +++ b/backend/src/main/resources/db/migration/V1.10__pipeline_version_per_organism.sql @@ -0,0 +1,116 @@ +-- Introduced in https://github.com/loculus-project/loculus/pull/3534 +-- Adds the new organism column to the current_processing_pipeline table, +-- Adds the organism constraint to external_metadata_view and sequence_entries_view + +ALTER TABLE current_processing_pipeline +ADD COLUMN organism text; + +-- Drop old primary key constraint so we can add multiple rows with the same version +ALTER TABLE current_processing_pipeline +DROP CONSTRAINT current_processing_pipeline_pkey; + +-- Add current versions for all organisms +WITH distinct_organisms AS ( + SELECT DISTINCT organism + FROM sequence_entries +), +pipeline_versions AS ( + SELECT version, started_using_at + FROM current_processing_pipeline +) +INSERT INTO current_processing_pipeline (version, started_using_at, organism) +SELECT pv.version, pv.started_using_at, o.organism +FROM pipeline_versions pv +CROSS JOIN distinct_organisms o; + +-- delete old null rows +DELETE FROM current_processing_pipeline +WHERE organism IS NULL; + +-- Now, enforce the NOT NULL constraint +ALTER TABLE current_processing_pipeline +ALTER COLUMN organism SET NOT NULL; + +--- Add new primary key constraint (only 'organism' is primary key!) +ALTER TABLE current_processing_pipeline +ADD CONSTRAINT current_processing_pipeline_pkey PRIMARY KEY (organism); + + +drop view if exists external_metadata_view cascade; + +create view external_metadata_view as +select + cpd.accession, + cpd.version, + all_external_metadata.updated_metadata_at, + -- Combines metadata from preprocessed data with any external metadata updates + -- If there's no external metadata, just use the preprocessed data's metadata + -- If there is external metadata, merge it with preprocessed data (external takes precedence) + case + when all_external_metadata.external_metadata is null then + jsonb_build_object('metadata', (cpd.processed_data->'metadata')) + else + jsonb_build_object( + 'metadata', + (cpd.processed_data->'metadata') || all_external_metadata.external_metadata + ) + end as joint_metadata +from + ( + -- Get only the preprocessed data for the current pipeline version + select sepd.* + from + sequence_entries_preprocessed_data sepd + join sequence_entries se + on sepd.accession = se.accession and sepd.version = se.version + join current_processing_pipeline cpp + on se.organism = cpp.organism and sepd.pipeline_version = cpp.version + ) cpd + left join all_external_metadata on + all_external_metadata.accession = cpd.accession + and all_external_metadata.version = cpd.version; + +drop view if exists sequence_entries_view; + +create view sequence_entries_view as +select + se.*, + sepd.started_processing_at, + sepd.finished_processing_at, + sepd.processed_data as processed_data, + sepd.processed_data || em.joint_metadata as joint_metadata, + case + when se.is_revocation then (select version from current_processing_pipeline + where organism = se.organism) + else sepd.pipeline_version + end as pipeline_version, + sepd.errors, + sepd.warnings, + case + when se.released_at is not null then 'APPROVED_FOR_RELEASE' + when se.is_revocation then 'PROCESSED' + when sepd.processing_status = 'IN_PROCESSING' then 'IN_PROCESSING' + when sepd.processing_status = 'PROCESSED' then 'PROCESSED' + else 'RECEIVED' + end as status, + case + when sepd.processing_status = 'IN_PROCESSING' then null + when sepd.errors is not null and jsonb_array_length(sepd.errors) > 0 then 'HAS_ERRORS' + when sepd.warnings is not null and jsonb_array_length(sepd.warnings) > 0 then 'HAS_WARNINGS' + else 'NO_ISSUES' + end as processing_result +from + sequence_entries se + left join sequence_entries_preprocessed_data sepd on + se.accession = sepd.accession + and se.version = sepd.version + left join current_processing_pipeline ccp on -- join or left join? + se.organism = ccp.organism + and sepd.pipeline_version = ccp.version + left join external_metadata_view em on + se.accession = em.accession + and se.version = em.version; + +update sequence_entries_preprocessed_data +set processing_status = 'PROCESSED' +where processing_status in ('HAS_ERRORS', 'FINISHED'); diff --git a/backend/src/test/kotlin/org/loculus/backend/controller/EndpointTestExtension.kt b/backend/src/test/kotlin/org/loculus/backend/controller/EndpointTestExtension.kt index 8275b90b3a..ab879cceea 100644 --- a/backend/src/test/kotlin/org/loculus/backend/controller/EndpointTestExtension.kt +++ b/backend/src/test/kotlin/org/loculus/backend/controller/EndpointTestExtension.kt @@ -198,8 +198,12 @@ private fun clearDatabaseStatement(): String = """ $USER_GROUPS_TABLE_NAME, $METADATA_UPLOAD_AUX_TABLE_NAME, $SEQUENCE_UPLOAD_AUX_TABLE_NAME, - $DATA_USE_TERMS_TABLE_NAME + $DATA_USE_TERMS_TABLE_NAME, + $CURRENT_PROCESSING_PIPELINE_TABLE_NAME cascade; alter sequence $ACCESSION_SEQUENCE_NAME restart with 1; - update $CURRENT_PROCESSING_PIPELINE_TABLE_NAME set version = 1, started_using_at = now(); + insert into $CURRENT_PROCESSING_PIPELINE_TABLE_NAME values + (1, now(), '$DEFAULT_ORGANISM'), + (1, now(), '$OTHER_ORGANISM'), + (1, now(), '$ORGANISM_WITHOUT_CONSENSUS_SEQUENCES'); """ diff --git a/backend/src/test/kotlin/org/loculus/backend/controller/debug/DeleteAllSequenceDataEndpointTest.kt b/backend/src/test/kotlin/org/loculus/backend/controller/debug/DeleteAllSequenceDataEndpointTest.kt index 1644751f04..f729b4167c 100644 --- a/backend/src/test/kotlin/org/loculus/backend/controller/debug/DeleteAllSequenceDataEndpointTest.kt +++ b/backend/src/test/kotlin/org/loculus/backend/controller/debug/DeleteAllSequenceDataEndpointTest.kt @@ -162,8 +162,8 @@ class DeleteAllSequenceDataEndpointTest( .map { PreparedProcessedData.successfullyProcessed(accession = it.accession, version = it.version) } submissionConvenienceClient.submitProcessedData(processedDataVersion2, pipelineVersion = 2) - val canUpdate = submissionDatabaseService.useNewerProcessingPipelineIfPossible() - assertThat("An update to v2 should be possible", canUpdate, `is`(2L)) + val newVersions = submissionDatabaseService.useNewerProcessingPipelineIfPossible() + assertThat("An update to v2 should be possible", newVersions["dummyOrganism"], `is`(2L)) } @Test diff --git a/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt b/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt index 6e2c41f875..d00d122ffe 100644 --- a/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt +++ b/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt @@ -10,6 +10,7 @@ import com.fasterxml.jackson.module.kotlin.readValue import com.github.luben.zstd.ZstdInputStream import com.ninjasquad.springmockk.MockkBean import io.mockk.every +import io.mockk.mockk import kotlinx.datetime.Clock import kotlinx.datetime.DateTimeUnit import kotlinx.datetime.Instant @@ -60,7 +61,7 @@ import org.loculus.backend.controller.groupmanagement.GroupManagementControllerC import org.loculus.backend.controller.groupmanagement.andGetGroupId import org.loculus.backend.controller.jacksonObjectMapper import org.loculus.backend.controller.jwtForDefaultUser -import org.loculus.backend.controller.submission.GetReleasedDataEndpointWithDataUseTermsUrlTest.ConfigWithModifiedDataUseTermsUrlSpringConfig +import org.loculus.backend.controller.submission.GetReleasedDataEndpointWithDataUseTermsUrlTest.GetReleasedDataEndpointWithDataUseTermsUrlTestConfig import org.loculus.backend.controller.submission.SubmitFiles.DefaultFiles.NUMBER_OF_SEQUENCES import org.loculus.backend.service.KeycloakAdapter import org.loculus.backend.service.submission.SequenceEntriesTable @@ -431,22 +432,14 @@ private const val OPEN_DATA_USE_TERMS_URL = "openUrl" private const val RESTRICTED_DATA_USE_TERMS_URL = "restrictedUrl" @EndpointTest -@Import(ConfigWithModifiedDataUseTermsUrlSpringConfig::class) +@Import(GetReleasedDataEndpointWithDataUseTermsUrlTestConfig::class) @TestPropertySource(properties = ["spring.main.allow-bean-definition-overriding=true"]) class GetReleasedDataEndpointWithDataUseTermsUrlTest( @Autowired val convenienceClient: SubmissionConvenienceClient, @Autowired val dataUseTermsClient: DataUseTermsControllerClient, @Autowired val submissionControllerClient: SubmissionControllerClient, + @Autowired var dateProvider: DateProvider, ) { - @MockkBean - private lateinit var dateProvider: DateProvider - - @BeforeEach - fun setup() { - every { dateProvider.getCurrentDateTime() } answers { callOriginal() } - every { dateProvider.getCurrentDate() } answers { callOriginal() } - } - @Test fun `GIVEN sequence entry WHEN I change data use terms THEN returns updated data use terms`() { every { dateProvider.getCurrentInstant() } answers { callOriginal() } @@ -547,10 +540,10 @@ class GetReleasedDataEndpointWithDataUseTermsUrlTest( } @TestConfiguration - class ConfigWithModifiedDataUseTermsUrlSpringConfig { + class GetReleasedDataEndpointWithDataUseTermsUrlTestConfig { @Bean @Primary - fun backendConfig( + fun configWithModifiedDataUseTermsUrl( objectMapper: ObjectMapper, @Value("\${${BackendSpringProperty.BACKEND_CONFIG_PATH}}") configPath: String, ): BackendConfig { @@ -564,6 +557,15 @@ class GetReleasedDataEndpointWithDataUseTermsUrlTest( ), ) } + + @Bean + @Primary + fun mockedDateProvider(): DateProvider { + val mock = mockk(relaxed = true) + every { mock.getCurrentDateTime() } answers { callOriginal() } + every { mock.getCurrentDate() } answers { callOriginal() } + return mock + } } } diff --git a/backend/src/test/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTaskTest.kt b/backend/src/test/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTaskTest.kt index 6cbbd96b63..d18d6e8de4 100644 --- a/backend/src/test/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTaskTest.kt +++ b/backend/src/test/kotlin/org/loculus/backend/service/submission/UseNewerProcessingPipelineVersionTaskTest.kt @@ -2,11 +2,18 @@ package org.loculus.backend.service.submission import org.hamcrest.MatcherAssert.assertThat import org.hamcrest.Matchers.`is` +import org.jetbrains.exposed.sql.selectAll +import org.jetbrains.exposed.sql.transactions.transaction import org.junit.jupiter.api.Test +import org.loculus.backend.api.Organism +import org.loculus.backend.controller.DEFAULT_ORGANISM import org.loculus.backend.controller.EndpointTest +import org.loculus.backend.controller.ORGANISM_WITHOUT_CONSENSUS_SEQUENCES +import org.loculus.backend.controller.OTHER_ORGANISM import org.loculus.backend.controller.submission.PreparedProcessedData import org.loculus.backend.controller.submission.SubmissionControllerClient import org.loculus.backend.controller.submission.SubmissionConvenienceClient +import org.loculus.backend.utils.DateProvider import org.springframework.beans.factory.annotation.Autowired import org.springframework.test.web.servlet.result.MockMvcResultMatchers.status @@ -16,11 +23,12 @@ class UseNewerProcessingPipelineVersionTaskTest( @Autowired val submissionControllerClient: SubmissionControllerClient, @Autowired val useNewerProcessingPipelineVersionTask: UseNewerProcessingPipelineVersionTask, @Autowired val submissionDatabaseService: SubmissionDatabaseService, + @Autowired val dateProvider: DateProvider, ) { @Test fun `GIVEN error-free data from a newer pipeline WHEN the task is executed THEN the newer pipeline is used`() { - assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(), `is`(1L)) + assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(Organism(DEFAULT_ORGANISM)), `is`(1L)) val accessionVersions = convenienceClient.submitDefaultFiles().submissionIdMappings val processedData = accessionVersions.map { @@ -32,19 +40,70 @@ class UseNewerProcessingPipelineVersionTaskTest( convenienceClient.extractUnprocessedData(pipelineVersion = 1) convenienceClient.submitProcessedData(processedData, pipelineVersion = 1) useNewerProcessingPipelineVersionTask.task() - assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(), `is`(1L)) + assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(Organism(DEFAULT_ORGANISM)), `is`(1L)) convenienceClient.extractUnprocessedData(pipelineVersion = 2) convenienceClient.submitProcessedData(processedDataWithError, pipelineVersion = 2) useNewerProcessingPipelineVersionTask.task() - assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(), `is`(1L)) + assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(Organism(DEFAULT_ORGANISM)), `is`(1L)) convenienceClient.extractUnprocessedData(pipelineVersion = 3) convenienceClient.submitProcessedData(processedData, pipelineVersion = 3) useNewerProcessingPipelineVersionTask.task() - assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(), `is`(3L)) + assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(Organism(DEFAULT_ORGANISM)), `is`(3L)) submissionControllerClient.extractUnprocessedData(numberOfSequenceEntries = 10, pipelineVersion = 2) .andExpect(status().isUnprocessableEntity) } + + @Suppress("ktlint:standard:max-line-length") + @Test + fun `GIVEN the pipeline version for one organism updates THEN the pipeline version for another organism is not updated`() { + assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(Organism(DEFAULT_ORGANISM)), `is`(1L)) + assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(Organism(OTHER_ORGANISM)), `is`(1L)) + + val accessionVersions = convenienceClient.submitDefaultFiles().submissionIdMappings + val processedData = accessionVersions.map { + PreparedProcessedData.successfullyProcessed(it.accession, it.version) + } + convenienceClient.extractUnprocessedData(pipelineVersion = 2) + convenienceClient.submitProcessedData(processedData, pipelineVersion = 2) + useNewerProcessingPipelineVersionTask.task() + + assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(Organism(DEFAULT_ORGANISM)), `is`(2L)) + assertThat(submissionDatabaseService.getCurrentProcessingPipelineVersion(Organism(OTHER_ORGANISM)), `is`(1L)) + } + + @Test + fun `GIVEN the backend restarts THEN no faulty V1 entries are created`() { + val rowCount = transaction { + CurrentProcessingPipelineTable.setV1ForOrganismsIfNotExist( + listOf(DEFAULT_ORGANISM, OTHER_ORGANISM, ORGANISM_WITHOUT_CONSENSUS_SEQUENCES), + dateProvider.getCurrentDateTime(), + ) + + CurrentProcessingPipelineTable.selectAll().count() + } + + // update DEFAULT_ORGANISM to V2 + val accessionVersions = convenienceClient.submitDefaultFiles().submissionIdMappings + val processedData = accessionVersions.map { + PreparedProcessedData.successfullyProcessed(it.accession, it.version) + } + convenienceClient.extractUnprocessedData(pipelineVersion = 2) + convenienceClient.submitProcessedData(processedData, pipelineVersion = 2) + useNewerProcessingPipelineVersionTask.task() + + val rowCountAfterV2 = transaction { + // simulate a DB init by calling this function + CurrentProcessingPipelineTable.setV1ForOrganismsIfNotExist( + listOf(DEFAULT_ORGANISM, OTHER_ORGANISM, ORGANISM_WITHOUT_CONSENSUS_SEQUENCES), + dateProvider.getCurrentDateTime(), + ) + + CurrentProcessingPipelineTable.selectAll().count() + } + + assertThat(rowCount, `is`(rowCountAfterV2)) + } } diff --git a/docs/astro.config.mjs b/docs/astro.config.mjs index 86d85cf33a..8fa96e2d97 100644 --- a/docs/astro.config.mjs +++ b/docs/astro.config.mjs @@ -66,12 +66,12 @@ export default defineConfig({ { label: 'Setup with Kubernetes', link: '/for-administrators/setup-with-kubernetes/' }, { label: 'Schema designs', link: '/for-administrators/schema-designs/' }, { - label: 'Existing preprocessing pipelines', - link: '/for-administrators/existing-preprocessing-pipelines/', - }, - { - label: 'Build new preprocessing pipeline', - link: '/for-administrators/build-new-preprocessing-pipeline/', + label: 'Preprocessing pipelines', + items: [ + 'for-administrators/pipeline-concept', + 'for-administrators/existing-preprocessing-pipelines', + 'for-administrators/build-new-preprocessing-pipeline' + ] }, { label: 'Data use terms', link: '/for-administrators/data-use-terms/' }, { label: 'User administration', link: '/for-administrators/user-administration/' }, diff --git a/docs/src/content/docs/for-administrators/build-new-preprocessing-pipeline.md b/docs/src/content/docs/for-administrators/build-new-preprocessing-pipeline.md index 4d11291ab5..f902c9e603 100644 --- a/docs/src/content/docs/for-administrators/build-new-preprocessing-pipeline.md +++ b/docs/src/content/docs/for-administrators/build-new-preprocessing-pipeline.md @@ -1,5 +1,5 @@ --- -title: Build a new preprocessing pipeline +title: Building your own pipeline --- :::note diff --git a/docs/src/content/docs/for-administrators/existing-preprocessing-pipelines.md b/docs/src/content/docs/for-administrators/existing-preprocessing-pipelines.md index a6047de52b..4ca4b588ee 100644 --- a/docs/src/content/docs/for-administrators/existing-preprocessing-pipelines.md +++ b/docs/src/content/docs/for-administrators/existing-preprocessing-pipelines.md @@ -1,12 +1,8 @@ --- -title: Existing preprocessing pipelines +title: Existing pipelines --- -[Preprocessing pipelines](../../introduction/glossary/#preprocessing-pipeline) hold most of the organism- and domain-specific logic within a Loculus instance. They take the submitted input data and, as a minimum, validate them to ensure that the submitted data follow the defined format. Additionally, they can clean the data and enrich them by adding annotations and sequence alignments. - -The Loculus team maintain a customizable processing pipeline which uses [Nextclade](../../introduction/glossary/#nextclade) to align sequences to a reference and generate statistics, which is discussed in more detail below. - -Using an existing pipeline is the fastest way to get started with Loculus, but it is also easy to develop new pipelines that use custom tooling and logic. For a very brief guide on how to build a new pipeline, please see [here](../build-new-preprocessing-pipeline/). +The Loculus team maintains a customizable processing pipeline which uses [Nextclade](../../introduction/glossary/#nextclade) to align sequences to a reference and generate statistics, which is discussed in more detail below. If you have developed a pipeline and would like it to be added to this list, please contact us! diff --git a/docs/src/content/docs/for-administrators/pipeline-concept.md b/docs/src/content/docs/for-administrators/pipeline-concept.md new file mode 100644 index 0000000000..8247777acb --- /dev/null +++ b/docs/src/content/docs/for-administrators/pipeline-concept.md @@ -0,0 +1,28 @@ +--- +title: What is a preprocessing pipeline? +--- + +Before submitted user data is available for review and release, it is first processed by a [preprocessing pipeline](../../introduction/glossary/#preprocessing-pipeline). The preprocessing pipelines hold most of the organism- and domain-specific logic within a Loculus instance. They take the submitted input data and, as a minimum, validate them to ensure that the submitted data follow the defined format. Additionally, they can clean the data and enrich them by adding annotations and sequence alignments. + +Using an [existing pipeline](../existing-preprocessing-pipelines/) is the fastest way to get started with Loculus, but it is also easy to develop new pipelines that use custom tooling and logic. For a very brief guide on how to build a new pipeline, please see [here](../build-new-preprocessing-pipeline/). + +## Tasks + +The preprocessing pipeline receives the user submitted data and then validates and enriches this data. +While the exact functionality depends on the specific pipeline, generally a pipeline will do the following things: + +**Parsing:** The preprocessing pipeline receives the input data as strings and transforms them into the right format. For example, assuming there is a field `age` of type `integer`, given an input `{"age": "2"}` the preprocessing pipeline should transform it to `{"age": 2}` (simple type conversion). In another case, assuming there is a field `sequencingDate` of type `date`, the preprocessing pipeline might transform `{"sequencingDate": "30 August 2023"}` to the expected format of `{"sequencingDate": "2023-08-30"}`. + +**Validation:** The preprocessing pipeline checks the input data and emits errors or warnings. As mentioned above, the only constraint is that the output of the preprocessing pipeline conforms to the right (technical) format. Otherwise, a pipeline may be generous (e.g., allow every value in the "country" field) or be more restrictive (e.g., only allow a fixed set of values in the "country" field). + +**Alignment and translations:** The submitter only provides unaligned nucleotide sequences. If you want to allow searching by nucleotide and amino acid mutations, the preprocessing pipeline needs to perform the alignment and compute the translations to amino acid sequences. + +**Annotation:** The preprocessing pipeline can add annotations such as clade/lineage classifications. + +**Quality control (QC):** The preprocessing pipeline should check the quality of the sequences (and the metadata). + +## Pipeline versions + +As the preprocessing logic might change over time, preprocessing pipelines are versioned (You specify the pipeline version under `.preprocessing.version`). +The backend keeps track of which sequences have successfully been processed with which pipeline version. +Once all data for an organism has successfully been processed with a new version, that version will also automatically be served to users. \ No newline at end of file diff --git a/kubernetes/loculus/values.yaml b/kubernetes/loculus/values.yaml index cda6236b5d..ec326232a4 100644 --- a/kubernetes/loculus/values.yaml +++ b/kubernetes/loculus/values.yaml @@ -33,6 +33,7 @@ logo: lineageSystemDefinitions: pangoLineage: 1: https://raw.githubusercontent.com/loculus-project/loculus/c400348ea0ba0b8178aa43475d5c7539fc097997/preprocessing/dummy/lineage.yaml + 2: https://raw.githubusercontent.com/loculus-project/loculus/c400348ea0ba0b8178aa43475d5c7539fc097997/preprocessing/dummy/lineage.yaml defaultOrganismConfig: &defaultOrganismConfig schema: &schema loadSequencesAutomatically: true @@ -1177,7 +1178,6 @@ defaultOrganismConfig: &defaultOrganismConfig replicas: 2 configFile: &preprocessingConfigFile log_level: DEBUG - nextclade_dataset_name: nextstrain/ebola/sudan genes: [NP, VP35, VP40, GP, sGP, ssGP, VP30, VP24, L] batch_size: 100 ingest: &ingest @@ -1218,10 +1218,13 @@ defaultOrganisms: <<: *defaultOrganismConfig preprocessing: - <<: *preprocessing - configFile: - <<: *preprocessingConfigFile - nextclade_dataset_server: https://raw.githubusercontent.com/nextstrain/nextclade_data/ebola/data_output - nextclade_dataset_name: nextstrain/ebola/sudan + version: 1 + nextclade_dataset_name: nextstrain/ebola/sudan + nextclade_dataset_server: https://raw.githubusercontent.com/nextstrain/nextclade_data/ebola/data_output + - <<: *preprocessing # test having 2 simultaneous versions + version: 2 + nextclade_dataset_name: nextstrain/ebola/sudan + nextclade_dataset_server: https://raw.githubusercontent.com/nextstrain/nextclade_data/ebola/data_output west-nile: <<: *defaultOrganismConfig schema: @@ -1263,6 +1266,7 @@ defaultOrganisms: defaultOrder: descending preprocessing: - <<: *preprocessing + version: 1 configFile: <<: *preprocessingConfigFile nextclade_dataset_name: nextstrain/wnv/all-lineages @@ -1362,7 +1366,7 @@ defaultOrganisms: silo: dateToSortBy: date preprocessing: - - version: 1 + - version: 2 image: ghcr.io/loculus-project/preprocessing-dummy args: - "--watch" @@ -1448,7 +1452,7 @@ defaultOrganisms: silo: dateToSortBy: date preprocessing: - - version: 1 + - version: 3 image: ghcr.io/loculus-project/preprocessing-nextclade args: - "prepro" @@ -1506,7 +1510,7 @@ defaultOrganisms: silo: dateToSortBy: date preprocessing: - - version: 1 + - version: 4 image: ghcr.io/loculus-project/preprocessing-nextclade args: - "prepro" From 20984c02abeaa7142b12f9efee6a86b3c0fc2936 Mon Sep 17 00:00:00 2001 From: Felix Hennig Date: Mon, 17 Feb 2025 18:08:25 +0100 Subject: [PATCH 2/4] fix(backend): fix SQL query and add test --- backend/docs/db/schema.sql | 37 +++++++++++-------- .../V1.10__pipeline_version_per_organism.sql | 27 ++++++++++---- .../submission/GetReleasedDataEndpointTest.kt | 15 ++++++++ .../submission/SubmissionConvenienceClient.kt | 6 +++ 4 files changed, 63 insertions(+), 22 deletions(-) diff --git a/backend/docs/db/schema.sql b/backend/docs/db/schema.sql index 27bb1eb6fa..38db5fa5f7 100644 --- a/backend/docs/db/schema.sql +++ b/backend/docs/db/schema.sql @@ -2,8 +2,8 @@ -- PostgreSQL database dump -- --- Dumped from database version 15.10 (Debian 15.10-1.pgdg120+1) --- Dumped by pg_dump version 16.6 (Debian 16.6-1.pgdg120+1) +-- Dumped from database version 15.11 (Debian 15.11-1.pgdg120+1) +-- Dumped by pg_dump version 16.7 (Debian 16.7-1.pgdg120+1) SET statement_timeout = 0; SET lock_timeout = 0; @@ -246,18 +246,21 @@ CREATE VIEW public.external_metadata_view AS WHEN (all_external_metadata.external_metadata IS NULL) THEN jsonb_build_object('metadata', (cpd.processed_data -> 'metadata'::text)) ELSE jsonb_build_object('metadata', ((cpd.processed_data -> 'metadata'::text) || all_external_metadata.external_metadata)) END AS joint_metadata - FROM (( SELECT sepd.accession, - sepd.version, - sepd.pipeline_version, - sepd.processed_data, - sepd.errors, - sepd.warnings, - sepd.processing_status, - sepd.started_processing_at, - sepd.finished_processing_at - FROM ((public.sequence_entries_preprocessed_data sepd - JOIN public.sequence_entries se ON (((sepd.accession = se.accession) AND (sepd.version = se.version)))) - JOIN public.current_processing_pipeline cpp ON (((se.organism = cpp.organism) AND (sepd.pipeline_version = cpp.version))))) cpd + FROM (( SELECT sequence_entries_preprocessed_data.accession, + sequence_entries_preprocessed_data.version, + sequence_entries_preprocessed_data.pipeline_version, + sequence_entries_preprocessed_data.processed_data, + sequence_entries_preprocessed_data.errors, + sequence_entries_preprocessed_data.warnings, + sequence_entries_preprocessed_data.processing_status, + sequence_entries_preprocessed_data.started_processing_at, + sequence_entries_preprocessed_data.finished_processing_at + FROM public.sequence_entries_preprocessed_data + WHERE (sequence_entries_preprocessed_data.pipeline_version = ( SELECT current_processing_pipeline.version + FROM public.current_processing_pipeline + WHERE (current_processing_pipeline.organism = ( SELECT se.organism + FROM public.sequence_entries se + WHERE ((se.accession = sequence_entries_preprocessed_data.accession) AND (se.version = sequence_entries_preprocessed_data.version))))))) cpd LEFT JOIN public.all_external_metadata ON (((all_external_metadata.accession = cpd.accession) AND (all_external_metadata.version = cpd.version)))); @@ -487,7 +490,11 @@ CREATE VIEW public.sequence_entries_view AS ELSE 'NO_ISSUES'::text END AS processing_result FROM (((public.sequence_entries se - LEFT JOIN public.sequence_entries_preprocessed_data sepd ON (((se.accession = sepd.accession) AND (se.version = sepd.version)))) + LEFT JOIN public.sequence_entries_preprocessed_data sepd ON (((se.accession = sepd.accession) AND (se.version = sepd.version) AND (sepd.pipeline_version = ( SELECT current_processing_pipeline.version + FROM public.current_processing_pipeline + WHERE (current_processing_pipeline.organism = ( SELECT se_1.organism + FROM public.sequence_entries se_1 + WHERE ((se_1.accession = sepd.accession) AND (se_1.version = sepd.version))))))))) LEFT JOIN public.current_processing_pipeline ccp ON (((se.organism = ccp.organism) AND (sepd.pipeline_version = ccp.version)))) LEFT JOIN public.external_metadata_view em ON (((se.accession = em.accession) AND (se.version = em.version)))); diff --git a/backend/src/main/resources/db/migration/V1.10__pipeline_version_per_organism.sql b/backend/src/main/resources/db/migration/V1.10__pipeline_version_per_organism.sql index 0b144ca137..ca8206ed74 100644 --- a/backend/src/main/resources/db/migration/V1.10__pipeline_version_per_organism.sql +++ b/backend/src/main/resources/db/migration/V1.10__pipeline_version_per_organism.sql @@ -58,13 +58,17 @@ select from ( -- Get only the preprocessed data for the current pipeline version - select sepd.* - from - sequence_entries_preprocessed_data sepd - join sequence_entries se - on sepd.accession = se.accession and sepd.version = se.version - join current_processing_pipeline cpp - on se.organism = cpp.organism and sepd.pipeline_version = cpp.version + select * from sequence_entries_preprocessed_data + where pipeline_version = ( + select version + from current_processing_pipeline + where organism = ( + select organism + from sequence_entries se + where + se.accession = sequence_entries_preprocessed_data.accession + and se.version = sequence_entries_preprocessed_data.version + )) ) cpd left join all_external_metadata on all_external_metadata.accession = cpd.accession @@ -104,6 +108,15 @@ from left join sequence_entries_preprocessed_data sepd on se.accession = sepd.accession and se.version = sepd.version + and sepd.pipeline_version = ( + select version + from current_processing_pipeline + where organism = ( + select organism + from sequence_entries se + where se.accession = sepd.accession + and se.version = sepd.version + )) left join current_processing_pipeline ccp on -- join or left join? se.organism = ccp.organism and sepd.pipeline_version = ccp.version diff --git a/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt b/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt index d00d122ffe..5ccf0c7a94 100644 --- a/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt +++ b/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt @@ -271,6 +271,21 @@ class GetReleasedDataEndpointTest( } } + @Test + fun `GIVEN multiple processing pipelines have submitted data THEN no duplicates are returned`() { + val accessionVersions = convenienceClient.prepareDefaultSequenceEntriesToInProcessing() + val processedData = accessionVersions.map { + PreparedProcessedData.successfullyProcessed(accession = it.accession, version = it.version) + } + convenienceClient.submitProcessedData(processedData, pipelineVersion = 1) + convenienceClient.approveProcessedSequenceEntries(accessionVersions) + convenienceClient.extractUnprocessedData(pipelineVersion = 2) + convenienceClient.submitProcessedData(processedData, pipelineVersion = 2) + val response = submissionControllerClient.getReleasedData() + val responseBody = response.expectNdjsonAndGetContent>() + assertThat(accessionVersions.size, `is`(responseBody.size)) + } + @Test fun `GIVEN revocation version THEN all data is present but mostly null`() { convenienceClient.prepareRevokedSequenceEntries() diff --git a/backend/src/test/kotlin/org/loculus/backend/controller/submission/SubmissionConvenienceClient.kt b/backend/src/test/kotlin/org/loculus/backend/controller/submission/SubmissionConvenienceClient.kt index 61062f479c..cd9f885ec1 100644 --- a/backend/src/test/kotlin/org/loculus/backend/controller/submission/SubmissionConvenienceClient.kt +++ b/backend/src/test/kotlin/org/loculus/backend/controller/submission/SubmissionConvenienceClient.kt @@ -232,6 +232,12 @@ class SubmissionConvenienceClient( return approveProcessedSequenceEntries(revocationVersions, organism = organism) } + /** + * This function returns sequence entries that need to be reprocessed given a pipeline version. + * It also sets the status for these accession versions to IN_PROCESSING! + * You cannot submit data for accession versions without calling this function first. + * @see org.loculus.backend.controller.SubmissionController.extractUnprocessedData + */ fun extractUnprocessedData( numberOfSequenceEntries: Int = DefaultFiles.NUMBER_OF_SEQUENCES, organism: String = DEFAULT_ORGANISM, From a50dc3748853b67013543506a4e5d1386b48f345 Mon Sep 17 00:00:00 2001 From: Felix Hennig Date: Mon, 17 Feb 2025 22:30:46 +0000 Subject: [PATCH 3/4] Update backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt Co-authored-by: Theo Sanderson --- .../service/submission/CurrentProcessingPipelineTable.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt b/backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt index 88cb22da8a..604cd66967 100644 --- a/backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt +++ b/backend/src/main/kotlin/org/loculus/backend/service/submission/CurrentProcessingPipelineTable.kt @@ -27,7 +27,7 @@ object CurrentProcessingPipelineTable : Table(CURRENT_PROCESSING_PIPELINE_TABLE_ } /** - * Given a version that was found that is potentially newer than the current once, check if the currently stored + * Given a version that was found that is potentially newer than the current one, check if the currently stored * 'current' pipeline version for this organism is less than the one that was found? * If so, the pipeline needs to 'update' i.e. reprocess older entries. */ From dd1906d8bb9e6dd09f658034cfe06f88816ed7e4 Mon Sep 17 00:00:00 2001 From: Felix Hennig Date: Tue, 18 Feb 2025 09:59:03 +0100 Subject: [PATCH 4/4] improve test --- .../submission/GetReleasedDataEndpointTest.kt | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt b/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt index 5ccf0c7a94..b38dc9481d 100644 --- a/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt +++ b/backend/src/test/kotlin/org/loculus/backend/controller/submission/GetReleasedDataEndpointTest.kt @@ -65,6 +65,7 @@ import org.loculus.backend.controller.submission.GetReleasedDataEndpointWithData import org.loculus.backend.controller.submission.SubmitFiles.DefaultFiles.NUMBER_OF_SEQUENCES import org.loculus.backend.service.KeycloakAdapter import org.loculus.backend.service.submission.SequenceEntriesTable +import org.loculus.backend.service.submission.SubmissionDatabaseService import org.loculus.backend.utils.Accession import org.loculus.backend.utils.DateProvider import org.loculus.backend.utils.Version @@ -97,6 +98,7 @@ class GetReleasedDataEndpointTest( @Autowired private val submissionControllerClient: SubmissionControllerClient, @Autowired private val groupClient: GroupManagementControllerClient, @Autowired private val dataUseTermsClient: DataUseTermsControllerClient, + @Autowired private val submissionDatabaseService: SubmissionDatabaseService, ) { private val currentDate = Clock.System.now().toLocalDateTime(DateProvider.timeZone).date.toString() @@ -272,7 +274,7 @@ class GetReleasedDataEndpointTest( } @Test - fun `GIVEN multiple processing pipelines have submitted data THEN no duplicates are returned`() { + fun `GIVEN multiple processing pipelines have submitted data THEN only latest data is returned`() { val accessionVersions = convenienceClient.prepareDefaultSequenceEntriesToInProcessing() val processedData = accessionVersions.map { PreparedProcessedData.successfullyProcessed(accession = it.accession, version = it.version) @@ -281,9 +283,13 @@ class GetReleasedDataEndpointTest( convenienceClient.approveProcessedSequenceEntries(accessionVersions) convenienceClient.extractUnprocessedData(pipelineVersion = 2) convenienceClient.submitProcessedData(processedData, pipelineVersion = 2) + submissionDatabaseService.useNewerProcessingPipelineIfPossible() val response = submissionControllerClient.getReleasedData() val responseBody = response.expectNdjsonAndGetContent>() - assertThat(accessionVersions.size, `is`(responseBody.size)) + assertThat(responseBody.size, `is`(accessionVersions.size)) + responseBody.forEach { + assertThat(it.metadata["pipelineVersion"]!!.intValue(), `is`(2)) + } } @Test