Skip to content

Commit f5a92f9

Browse files
authored
feat: refactor transcript export to exclude MITO/GCNV/SV (#1115)
* feat: refactor transcript export to exclude MITO/GCNV/SV
* ruff
* bugfix: mito annotations incorporate variant fields
* reorder gcnvs
1 parent 599faa7 commit f5a92f9

File tree

9 files changed

+129
-347
lines changed

9 files changed

+129
-347
lines changed

v03_pipeline/lib/misc/clickhouse.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def src_path_fn(self) -> Callable:
5555
ClickHouseTable.ENTRIES: new_entries_parquet_path,
5656
}[self]
5757

58-
def should_load(
58+
def should_load( # noqa: PLR0911
5959
self,
6060
reference_genome: ReferenceGenome,
6161
dataset_type: DatasetType,
@@ -73,11 +73,12 @@ def should_load(
7373
dataset_type,
7474
)
7575
)
76+
if self == ClickHouseTable.TRANSCRIPTS:
77+
return dataset_type.should_write_new_transcripts
7678
return self in {
7779
ClickHouseTable.ANNOTATIONS_DISK,
7880
ClickHouseTable.ANNOTATIONS_MEMORY,
7981
ClickHouseTable.KEY_LOOKUP,
80-
ClickHouseTable.TRANSCRIPTS,
8182
}
8283
msg = f'Unhandled ClickHouseMigrationType: {migration_type.value}'
8384
raise ValueError(
@@ -91,11 +92,12 @@ def should_load(
9192
dataset_type,
9293
)
9394
)
95+
if self == ClickHouseTable.TRANSCRIPTS:
96+
return dataset_type.should_write_new_transcripts
9497
return self in {
9598
ClickHouseTable.ANNOTATIONS_DISK,
9699
ClickHouseTable.ANNOTATIONS_MEMORY,
97100
ClickHouseTable.KEY_LOOKUP,
98-
ClickHouseTable.TRANSCRIPTS,
99101
ClickHouseTable.ENTRIES,
100102
}
101103

v03_pipeline/lib/model/dataset_type.py

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from collections import OrderedDict
21
from collections.abc import Callable
32
from enum import StrEnum
43

@@ -406,36 +405,9 @@ def export_vcf_annotation_fns(self) -> list[Callable[..., hl.Expression]]:
406405
],
407406
}[self]
408407

409-
def export_parquet_filterable_transcripts_fields(
410-
self,
411-
reference_genome: ReferenceGenome,
412-
) -> OrderedDict[str, str]:
413-
fields = ['geneId']
414-
if self in {DatasetType.SV, DatasetType.GCNV}:
415-
fields = [
416-
*fields,
417-
'majorConsequence',
418-
]
419-
if self in {DatasetType.SNV_INDEL, DatasetType.MITO}:
420-
fields = [
421-
*fields,
422-
'canonical',
423-
'consequenceTerms',
424-
]
425-
fields = {
426-
# above fields are renamed to themselves
427-
k: k
428-
for k in fields
429-
}
430-
if self == DatasetType.SNV_INDEL and reference_genome == ReferenceGenome.GRCh38:
431-
fields = {
432-
**fields,
433-
'alphamissensePathogenicity': 'alphamissense.pathogenicity',
434-
'extendedIntronicSpliceRegionVariant': 'spliceregion.extended_intronic_splice_region_variant',
435-
'fiveutrConsequence': 'utrannotator.fiveutrConsequence',
436-
}
437-
# Parquet export expects all fields sorted alphabetically
438-
return OrderedDict(sorted(fields.items()))
408+
@property
409+
def should_write_new_transcripts(self):
410+
return self == DatasetType.SNV_INDEL
439411

440412
@property
441413
def overwrite_male_non_par_calls(self) -> None:

v03_pipeline/lib/tasks/exports/fields.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
import hail as hl
22

33
from v03_pipeline.lib.model import DatasetType, ReferenceGenome, SampleType
4-
from v03_pipeline.lib.tasks.exports.misc import (
5-
transcripts_field_name,
6-
)
4+
from v03_pipeline.lib.tasks.exports.misc import reformat_transcripts_for_export
75

86

97
def reference_independent_contig(locus: hl.LocusExpression):
@@ -256,11 +254,11 @@ def get_populations_export_fields(ht: hl.Table, dataset_type: DatasetType):
256254
},
257255
DatasetType.GCNV: lambda ht: {
258256
'seqrPop': hl.Struct(
259-
af=ht.gt_stats.AF,
260257
ac=ht.gt_stats.AC,
258+
af=ht.gt_stats.AF,
261259
an=ht.gt_stats.AN,
262-
Hom=ht.gt_stats.Hom,
263-
Het=ht.gt_stats.Het,
260+
het=ht.gt_stats.Het,
261+
hom=ht.gt_stats.Hom,
264262
),
265263
},
266264
}[dataset_type](ht)
@@ -313,17 +311,32 @@ def get_consequences_fields(
313311
reference_genome: ReferenceGenome,
314312
dataset_type: DatasetType,
315313
):
316-
consequences_field = transcripts_field_name(reference_genome, dataset_type)
317-
if (
318-
reference_genome == ReferenceGenome.GRCh38
319-
and dataset_type == DatasetType.SNV_INDEL
320-
):
321-
return {
322-
'sortedMotifFeatureConsequences': ht.sortedMotifFeatureConsequences,
323-
'sortedRegulatoryFeatureConsequences': ht.sortedRegulatoryFeatureConsequences,
324-
consequences_field: ht[consequences_field],
325-
}
326-
return {consequences_field: ht[consequences_field]}
314+
return {
315+
DatasetType.SNV_INDEL: lambda ht: {
316+
**(
317+
{
318+
'sortedMotifFeatureConsequences': ht.sortedMotifFeatureConsequences,
319+
'sortedRegulatoryFeatureConsequences': ht.sortedRegulatoryFeatureConsequences,
320+
}
321+
if reference_genome == ReferenceGenome.GRCh38
322+
else {}
323+
),
324+
'sortedTranscriptConsequences': ht.sortedTranscriptConsequences,
325+
},
326+
DatasetType.MITO: lambda ht: {
327+
# MITO transcripts are not exported to their own table,
328+
# but the structure should be preserved here.
329+
'sortedTranscriptConsequences': hl.enumerate(
330+
ht.sortedTranscriptConsequences,
331+
).starmap(reformat_transcripts_for_export),
332+
},
333+
DatasetType.SV: lambda ht: {
334+
'sortedGeneConsequences': ht.sortedGeneConsequences,
335+
},
336+
DatasetType.GCNV: lambda ht: {
337+
'sortedGeneConsequences': ht.sortedGeneConsequences,
338+
},
339+
}[dataset_type](ht)
327340

328341

329342
def get_variants_export_fields(

v03_pipeline/lib/tasks/exports/misc.py

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from collections import OrderedDict
2+
13
import hail as hl
24

35
from v03_pipeline.lib.annotations.enums import (
@@ -44,40 +46,67 @@ def array_structexpression_fields(ht: hl.Table):
4446
]
4547

4648

47-
def transcripts_field_name(
49+
def reformat_transcripts_for_export(i: int, s: hl.StructExpression):
50+
formatted_s = (
51+
s.annotate(
52+
majorConsequence=s.consequenceTerms.first(),
53+
transcriptRank=i,
54+
)
55+
if hasattr(s, 'loftee')
56+
else s.annotate(
57+
loftee=hl.Struct(
58+
isLofNagnag=s.isLofNagnag,
59+
lofFilters=s.lofFilters,
60+
),
61+
majorConsequence=s.consequenceTerms.first(),
62+
transcriptRank=i,
63+
).drop('isLofNagnag', 'lofFilters')
64+
)
65+
return sorted_hl_struct(formatted_s)
66+
67+
68+
def export_parquet_filterable_transcripts_fields(
4869
reference_genome: ReferenceGenome,
49-
dataset_type: DatasetType,
50-
) -> str:
51-
formatting_annotation_names = {
52-
fa.__name__ for fa in dataset_type.formatting_annotation_fns(reference_genome)
70+
) -> OrderedDict[str, str]:
71+
fields = {
72+
k: k
73+
for k in [
74+
'canonical',
75+
'consequenceTerms',
76+
'geneId',
77+
]
5378
}
54-
if 'sorted_gene_consequences' in formatting_annotation_names:
55-
return snake_to_camelcase('sorted_gene_consequences')
56-
return snake_to_camelcase('sorted_transcript_consequences')
79+
if reference_genome == ReferenceGenome.GRCh38:
80+
fields = {
81+
**fields,
82+
'alphamissensePathogenicity': 'alphamissense.pathogenicity',
83+
'extendedIntronicSpliceRegionVariant': 'spliceregion.extended_intronic_splice_region_variant',
84+
'fiveutrConsequence': 'utrannotator.fiveutrConsequence',
85+
}
86+
# Parquet export expects all fields sorted alphabetically
87+
return OrderedDict(sorted(fields.items()))
5788

5889

59-
def subset_filterable_transcripts_fields(
90+
def subset_sorted_transcript_consequences_fields(
6091
ht: hl.Table,
6192
reference_genome: ReferenceGenome,
62-
dataset_type: DatasetType,
6393
) -> hl.Table:
64-
field_name = transcripts_field_name(reference_genome, dataset_type)
6594
return ht.annotate(
66-
**{
67-
field_name: hl.enumerate(ht[field_name]).starmap(
68-
lambda idx, c: c.select(
69-
**{
70-
new_nested_field_name: parse_nested_field(
71-
ht[field_name],
72-
existing_nested_field_name,
73-
)[idx]
74-
for new_nested_field_name, existing_nested_field_name in dataset_type.export_parquet_filterable_transcripts_fields(
75-
reference_genome,
76-
).items()
77-
},
78-
),
95+
sortedTranscriptConsequences=hl.enumerate(
96+
ht.sortedTranscriptConsequences,
97+
).starmap(
98+
lambda idx, c: c.select(
99+
**{
100+
new_field_name: parse_nested_field(
101+
ht.sortedTranscriptConsequences,
102+
existing_field_name,
103+
)[idx]
104+
for new_field_name, existing_field_name in export_parquet_filterable_transcripts_fields(
105+
reference_genome,
106+
).items()
107+
},
79108
),
80-
},
109+
),
81110
)
82111

83112

v03_pipeline/lib/tasks/exports/write_new_transcripts_parquet.py

Lines changed: 9 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,23 @@
22
import luigi
33
import luigi.util
44

5-
from v03_pipeline.lib.misc.callsets import get_callset_ht
65
from v03_pipeline.lib.paths import (
76
new_transcripts_parquet_path,
87
new_variants_table_path,
9-
variant_annotations_table_path,
108
)
119
from v03_pipeline.lib.tasks.base.base_loading_run_params import (
1210
BaseLoadingRunParams,
1311
)
1412
from v03_pipeline.lib.tasks.base.base_write_parquet import BaseWriteParquetTask
1513
from v03_pipeline.lib.tasks.exports.misc import (
1614
camelcase_array_structexpression_fields,
17-
sorted_hl_struct,
18-
transcripts_field_name,
15+
reformat_transcripts_for_export,
1916
unmap_formatting_annotation_enums,
2017
)
2118
from v03_pipeline.lib.tasks.files import GCSorLocalFolderTarget, GCSorLocalTarget
2219
from v03_pipeline.lib.tasks.update_new_variants_with_caids import (
2320
UpdateNewVariantsWithCAIDsTask,
2421
)
25-
from v03_pipeline.lib.tasks.update_variant_annotations_table_with_new_samples import (
26-
UpdateVariantAnnotationsTableWithNewSamplesTask,
27-
)
28-
from v03_pipeline.lib.tasks.write_new_variants_table import WriteNewVariantsTableTask
2922

3023

3124
@luigi.util.inherits(BaseLoadingRunParams)
@@ -43,35 +36,16 @@ def complete(self) -> luigi.Target:
4336
return GCSorLocalFolderTarget(self.output().path).exists()
4437

4538
def requires(self) -> luigi.Task:
46-
if self.dataset_type.export_all_callset_variants:
47-
return self.clone(UpdateVariantAnnotationsTableWithNewSamplesTask)
48-
if self.dataset_type.should_send_to_allele_registry:
49-
return self.clone(UpdateNewVariantsWithCAIDsTask)
50-
return self.clone(WriteNewVariantsTableTask)
39+
return self.clone(UpdateNewVariantsWithCAIDsTask)
5140

5241
def create_table(self) -> None:
53-
if self.dataset_type.export_all_callset_variants:
54-
ht = hl.read_table(
55-
variant_annotations_table_path(
56-
self.reference_genome,
57-
self.dataset_type,
58-
),
59-
)
60-
callset_ht = get_callset_ht(
42+
ht = hl.read_table(
43+
new_variants_table_path(
6144
self.reference_genome,
6245
self.dataset_type,
63-
self.callset_path,
64-
self.project_guids,
65-
)
66-
ht = ht.semi_join(callset_ht)
67-
else:
68-
ht = hl.read_table(
69-
new_variants_table_path(
70-
self.reference_genome,
71-
self.dataset_type,
72-
self.run_id,
73-
),
74-
)
46+
self.run_id,
47+
),
48+
)
7549
ht = unmap_formatting_annotation_enums(
7650
ht,
7751
self.reference_genome,
@@ -86,17 +60,6 @@ def create_table(self) -> None:
8660
return ht.select(
8761
key_=ht.key_,
8862
transcripts=hl.enumerate(
89-
ht[transcripts_field_name(self.reference_genome, self.dataset_type)],
90-
)
91-
.starmap(
92-
lambda i, s: (
93-
s
94-
if hasattr(s, 'majorConsequence')
95-
else s.annotate(
96-
majorConsequence=s.consequenceTerms.first(),
97-
transcriptRank=i,
98-
)
99-
),
100-
)
101-
.map(sorted_hl_struct),
63+
ht.sortedTranscriptConsequences,
64+
).starmap(reformat_transcripts_for_export),
10265
)

0 commit comments

Comments (0)