Skip to content

Commit e5a8f73

Browse files
authored
feat: add tasks for writing parquet tables. (#1076)
* Formatting function enums
* improve function name
* handle lookups that propagate missing
* format
* ruff
* rename func
* second func
* start test
* finish tests
* ruff
* Add camelcase
* add camelcase
* ruff
* missing init py
* move functions to export
* mostly functioning entries task
* import
* improve private reference datasets logic
* first pass
* progress
* tests passing
* key
* annotations table
* test transcripts
* closer
* missed some tests
* add a parquet reader requirement
* use project tables
* finish off first pass at test!
* reformat test
* another batch
* add filters
* ruff
* handle hgmd edge case
* add hgmd
* sort it
* ruff
* bugfixes
* update test to new format
* ruff
* v03
* merge
* no longer used
* lint
* formatting
* special case the export
* remove gene/map
* ruff
* add new annotations
* print
* ruff
* canonical is not a float
* add new clinvar variants parquet export (#1092)
* feat: grch37 SNV_INDEL export (#1095)
* support grch37
* enable grch37
* feat: alphabetize nested field (#1096)
* Support alphabetization of nested field
* better sorting
* Update misc_test.py
* ruff
1 parent 803e189 commit e5a8f73

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

55 files changed

+2270
-4
lines changed

requirements-dev.in

+1
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ responses>=0.23.1
77
ruff>=0.1.8
88
shellcheck-py>=0.10.0
99
pysam
10+
pyarrow

requirements-dev.txt

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ packaging==24.2
5959
# sphinx
6060
pip-tools==7.4.1
6161
# via -r requirements-dev.in
62+
pyarrow==19.0.1
63+
# via -r requirements-dev.in
6264
pygments==2.19.1
6365
# via
6466
# -c requirements.txt

v03_pipeline/lib/model/dataset_type.py

+36
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from collections import OrderedDict
12
from collections.abc import Callable
23
from enum import StrEnum
34

@@ -390,6 +391,10 @@ def filter_invalid_sites(self):
390391
def should_export_to_vcf(self):
391392
return self == DatasetType.SV
392393

394+
@property
395+
def should_export_to_parquet(self):
396+
return self == DatasetType.SNV_INDEL
397+
393398
@property
394399
def export_vcf_annotation_fns(self) -> list[Callable[..., hl.Expression]]:
395400
return {
@@ -400,6 +405,37 @@ def export_vcf_annotation_fns(self) -> list[Callable[..., hl.Expression]]:
400405
],
401406
}[self]
402407

408+
def export_parquet_filterable_transcripts_fields(
409+
self,
410+
reference_genome: ReferenceGenome,
411+
) -> OrderedDict[str, str]:
412+
fields = ['geneId']
413+
if self in {DatasetType.SV, DatasetType.GCNV}:
414+
fields = [
415+
*fields,
416+
'majorConsequence',
417+
]
418+
if self in {DatasetType.SNV_INDEL, DatasetType.MITO}:
419+
fields = [
420+
*fields,
421+
'canonical',
422+
'consequenceTerms',
423+
]
424+
fields = {
425+
# above fields are renamed to themselves
426+
k: k
427+
for k in fields
428+
}
429+
if self == DatasetType.SNV_INDEL and reference_genome == ReferenceGenome.GRCh38:
430+
fields = {
431+
**fields,
432+
'alphamissensePathogenicity': 'alphamissense.pathogenicity',
433+
'extendedIntronicSpliceRegionVariant': 'spliceregion.extended_intronic_splice_region_variant',
434+
'fiveutrConsequence': 'utrannotator.fiveutrConsequence',
435+
}
436+
# Parquet export expects all fields sorted alphabetically
437+
return OrderedDict(sorted(fields.items()))
438+
403439
@property
404440
def overwrite_male_non_par_calls(self) -> None:
405441
return self == DatasetType.SV

v03_pipeline/lib/model/feature_flag.py

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
)
88
CHECK_SEX_AND_RELATEDNESS = os.environ.get('CHECK_SEX_AND_RELATEDNESS') == '1'
99
EXPECT_TDR_METRICS = os.environ.get('EXPECT_TDR_METRICS') == '1'
10+
EXPORT_TO_PARQUET = os.environ.get('EXPORT_TO_PARQUET') == '1'
1011
INCLUDE_PIPELINE_VERSION_IN_PREFIX = (
1112
os.environ.get('INCLUDE_PIPELINE_VERSION_IN_PREFIX') == '1'
1213
)
@@ -21,6 +22,7 @@ class FeatureFlag:
2122
ACCESS_PRIVATE_REFERENCE_DATASETS: bool = ACCESS_PRIVATE_REFERENCE_DATASETS
2223
CHECK_SEX_AND_RELATEDNESS: bool = CHECK_SEX_AND_RELATEDNESS
2324
EXPECT_TDR_METRICS: bool = EXPECT_TDR_METRICS
25+
EXPORT_TO_PARQUET: bool = EXPORT_TO_PARQUET
2426
INCLUDE_PIPELINE_VERSION_IN_PREFIX: bool = INCLUDE_PIPELINE_VERSION_IN_PREFIX
2527
RUN_PIPELINE_ON_DATAPROC: bool = RUN_PIPELINE_ON_DATAPROC
2628
SHOULD_TRIGGER_HAIL_BACKEND_RELOAD: bool = SHOULD_TRIGGER_HAIL_BACKEND_RELOAD

v03_pipeline/lib/paths.py

+60
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,66 @@ def variant_annotations_vcf_path(
363363
)
364364

365365

366+
def new_clinvar_variants_parquet_path(
367+
reference_genome: ReferenceGenome,
368+
dataset_type: DatasetType,
369+
run_id: str,
370+
) -> str:
371+
return os.path.join(
372+
runs_path(
373+
reference_genome,
374+
dataset_type,
375+
),
376+
run_id,
377+
'new_clinvar_variants.parquet',
378+
)
379+
380+
381+
def new_entries_parquet_path(
382+
reference_genome: ReferenceGenome,
383+
dataset_type: DatasetType,
384+
run_id: str,
385+
) -> str:
386+
return os.path.join(
387+
runs_path(
388+
reference_genome,
389+
dataset_type,
390+
),
391+
run_id,
392+
'new_entries.parquet',
393+
)
394+
395+
396+
def new_transcripts_parquet_path(
397+
reference_genome: ReferenceGenome,
398+
dataset_type: DatasetType,
399+
run_id: str,
400+
) -> str:
401+
return os.path.join(
402+
runs_path(
403+
reference_genome,
404+
dataset_type,
405+
),
406+
run_id,
407+
'new_transcripts.parquet',
408+
)
409+
410+
411+
def new_variants_parquet_path(
412+
reference_genome: ReferenceGenome,
413+
dataset_type: DatasetType,
414+
run_id: str,
415+
) -> str:
416+
return os.path.join(
417+
runs_path(
418+
reference_genome,
419+
dataset_type,
420+
),
421+
run_id,
422+
'new_variants.parquet',
423+
)
424+
425+
366426
def new_variants_table_path(
367427
reference_genome: ReferenceGenome,
368428
dataset_type: DatasetType,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import luigi
2+
3+
from v03_pipeline.lib.misc.io import checkpoint
4+
from v03_pipeline.lib.tasks.files import GCSorLocalFolderTarget
5+
6+
7+
class BaseWriteParquetTask(luigi.Task):
8+
def complete(self) -> luigi.Target:
9+
return GCSorLocalFolderTarget(self.output().path).exists()
10+
11+
def run(self) -> None:
12+
ht = self.create_table()
13+
ht, _ = checkpoint(ht)
14+
df = ht.to_spark(flatten=False)
15+
df = df.withColumnRenamed('key_', 'key')
16+
df.write.parquet(
17+
self.output().path,
18+
mode='overwrite',
19+
)

v03_pipeline/lib/tasks/exports/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)