feat: add tasks for writing parquet tables. #1076

Open · wants to merge 65 commits into base: main
Commits (65, all by bpblanken):
b482487  Formatting function enums (Apr 9, 2025)
f1a5a6c  improve function name (Apr 9, 2025)
18a38c9  handle lookups that propagate missing (Apr 9, 2025)
e006df0  format (Apr 9, 2025)
27d2464  ruff (Apr 9, 2025)
345a619  rename func (Apr 9, 2025)
f4c611c  second func (Apr 9, 2025)
6931585  start test (Apr 10, 2025)
e5880c9  finish tests (Apr 10, 2025)
2937808  Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelin… (Apr 10, 2025)
2c60e10  ruff (Apr 10, 2025)
e08ce89  Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelin… (Apr 17, 2025)
21e2d95  Add camelcase (Apr 17, 2025)
2a05241  add camelcase (Apr 18, 2025)
236d5bb  ruff (Apr 18, 2025)
135d0b3  missing init py (Apr 18, 2025)
085e886  move functions to export (Apr 18, 2025)
f2165ce  mostly functioning entries task (Apr 20, 2025)
4880195  import (Apr 20, 2025)
f28ec6a  improve private reference datasets logic (Apr 20, 2025)
359d773  first pass (Apr 20, 2025)
2d4420f  progress (Apr 20, 2025)
3964c8f  tests passing (Apr 21, 2025)
6f40918  Merge branch 'benb/add_key_to_pipeline' into benb/deenumerate_for_export (Apr 21, 2025)
8b67aa7  key (Apr 21, 2025)
752b123  annotations table (Apr 21, 2025)
fcaba23  test transcripts (Apr 22, 2025)
dd0c024  closer (Apr 22, 2025)
23ee29a  missed some tests (Apr 22, 2025)
b9b87a0  Merge branch 'benb/add_key_to_pipeline' of github.com:broadinstitute/… (Apr 22, 2025)
81e9ba4  add a parquet reader requirement (Apr 22, 2025)
bd67120  use project tables (Apr 22, 2025)
e4b6535  Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelin… (Apr 23, 2025)
7768409  finish off first pass at test! (Apr 24, 2025)
69e7bf1  reformat test (Apr 24, 2025)
3e79ff2  another batch (Apr 24, 2025)
5181848  add filters (Apr 24, 2025)
84e8472  ruff (Apr 24, 2025)
7985be8  handle hgmd edge case (Apr 24, 2025)
76de8b6  add hgmd (Apr 24, 2025)
125ea95  sort it (Apr 24, 2025)
9528d3c  ruff (Apr 24, 2025)
bb5aab3  bugfixes (Apr 24, 2025)
9d9a138  update test to new format (Apr 25, 2025)
1469298  ruff (Apr 25, 2025)
ac967b6  v03 (Apr 25, 2025)
7b594ac  merge (Apr 28, 2025)
bd1a037  merge (Apr 28, 2025)
0de3398  Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelin… (Apr 28, 2025)
58b3355  no longer used (Apr 28, 2025)
944b102  Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelin… (Apr 28, 2025)
ae807c2  lint (Apr 29, 2025)
026c1ba  formatting (Apr 29, 2025)
6b172ca  special case the export (Apr 29, 2025)
6277649  Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelin… (Apr 29, 2025)
6f4b736  remove gene/map (Apr 29, 2025)
5af48f5  ruff (Apr 29, 2025)
585724f  add new annotations (Apr 29, 2025)
459e510  print (Apr 29, 2025)
c19c16b  ruff (Apr 29, 2025)
e972898  Merge branch 'main' of github.com:broadinstitute/seqr-loading-pipelin… (May 4, 2025)
ca2daf5  canonical is not a float (May 5, 2025)
1b4b265  add new clinvar variants parquet export (#1092) (May 6, 2025)
ccbc80d  feat: grch37 SNV_INDEL export (#1095) (May 6, 2025)
c25dbc6  feat: alphabetize nested field (#1096) (May 8, 2025)
1 change: 1 addition & 0 deletions requirements-dev.in
@@ -7,3 +7,4 @@ responses>=0.23.1
ruff>=0.1.8
shellcheck-py>=0.10.0
pysam
pyarrow
2 changes: 2 additions & 0 deletions requirements-dev.txt
@@ -59,6 +59,8 @@ packaging==24.2
# sphinx
pip-tools==7.4.1
# via -r requirements-dev.in
pyarrow==19.0.1
# via -r requirements-dev.in
pygments==2.19.1
# via
# -c requirements.txt
36 changes: 36 additions & 0 deletions v03_pipeline/lib/model/dataset_type.py
@@ -1,3 +1,4 @@
from collections import OrderedDict
from collections.abc import Callable
from enum import StrEnum

@@ -390,6 +391,10 @@ def filter_invalid_sites(self):
def should_export_to_vcf(self):
return self == DatasetType.SV

@property
def should_export_to_parquet(self):
return self == DatasetType.SNV_INDEL

@property
def export_vcf_annotation_fns(self) -> list[Callable[..., hl.Expression]]:
return {
@@ -400,6 +405,37 @@ def export_vcf_annotation_fns(self) -> list[Callable[..., hl.Expression]]:
],
}[self]

def export_parquet_filterable_transcripts_fields(
self,
reference_genome: ReferenceGenome,
) -> OrderedDict[str, str]:
fields = ['geneId']
if self in {DatasetType.SV, DatasetType.GCNV}:
fields = [
*fields,
'majorConsequence',
]
if self in {DatasetType.SNV_INDEL, DatasetType.MITO}:
fields = [
*fields,
'canonical',
'consequenceTerms',
]
fields = {
# above fields are renamed to themselves
k: k
for k in fields
}
if self == DatasetType.SNV_INDEL and reference_genome == ReferenceGenome.GRCh38:
fields = {
**fields,
'alphamissensePathogenicity': 'alphamissense.pathogenicity',
'extendedIntronicSpliceRegionVariant': 'spliceregion.extended_intronic_splice_region_variant',
'fiveutrConsequence': 'utrannotator.fiveutrConsequence',
}
# Parquet export expects all fields sorted alphabetically
return OrderedDict(sorted(fields.items()))

@property
def overwrite_male_non_par_calls(self) -> None:
return self == DatasetType.SV
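For context, a minimal sketch of how the returned mapping might be consumed when flattening transcript structs for export. The enclosing export code is not part of this diff; the helper below is hypothetical, and importing DatasetType and ReferenceGenome from v03_pipeline.lib.model is assumed:

import functools

import hail as hl

from v03_pipeline.lib.model import DatasetType, ReferenceGenome


def select_filterable_fields(transcript: hl.StructExpression) -> hl.StructExpression:
    # Hypothetical helper: resolve each dotted source path against the
    # transcript struct and rename it to its export-facing camelCase name.
    fields = DatasetType.SNV_INDEL.export_parquet_filterable_transcripts_fields(
        ReferenceGenome.GRCh38,
    )
    return hl.struct(**{
        dst: functools.reduce(lambda s, f: s[f], src.split('.'), transcript)
        for dst, src in fields.items()
    })

The OrderedDict sort on the last line of the new method matters here: as the inline comment notes, the parquet export expects all fields sorted alphabetically, which is what commit c25dbc6 ("feat: alphabetize nested field") enforces.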
2 changes: 2 additions & 0 deletions v03_pipeline/lib/model/feature_flag.py
@@ -7,6 +7,7 @@
)
CHECK_SEX_AND_RELATEDNESS = os.environ.get('CHECK_SEX_AND_RELATEDNESS') == '1'
EXPECT_TDR_METRICS = os.environ.get('EXPECT_TDR_METRICS') == '1'
EXPORT_TO_PARQUET = os.environ.get('EXPORT_TO_PARQUET') == '1'
INCLUDE_PIPELINE_VERSION_IN_PREFIX = (
os.environ.get('INCLUDE_PIPELINE_VERSION_IN_PREFIX') == '1'
)
@@ -21,6 +22,7 @@ class FeatureFlag:
ACCESS_PRIVATE_REFERENCE_DATASETS: bool = ACCESS_PRIVATE_REFERENCE_DATASETS
CHECK_SEX_AND_RELATEDNESS: bool = CHECK_SEX_AND_RELATEDNESS
EXPECT_TDR_METRICS: bool = EXPECT_TDR_METRICS
EXPORT_TO_PARQUET: bool = EXPORT_TO_PARQUET
INCLUDE_PIPELINE_VERSION_IN_PREFIX: bool = INCLUDE_PIPELINE_VERSION_IN_PREFIX
RUN_PIPELINE_ON_DATAPROC: bool = RUN_PIPELINE_ON_DATAPROC
SHOULD_TRIGGER_HAIL_BACKEND_RELOAD: bool = SHOULD_TRIGGER_HAIL_BACKEND_RELOAD
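Note that the flag is evaluated once, at module import time (os.environ.get runs at the top of feature_flag.py), so EXPORT_TO_PARQUET must be present in the environment before any pipeline module is imported. A minimal sanity check:

import os

# Set the variable before the module is first imported, since
# feature_flag.py reads os.environ at import time.
os.environ['EXPORT_TO_PARQUET'] = '1'

from v03_pipeline.lib.model.feature_flag import FeatureFlag  # noqa: E402

assert FeatureFlag.EXPORT_TO_PARQUET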
60 changes: 60 additions & 0 deletions v03_pipeline/lib/paths.py
@@ -363,6 +363,66 @@ def variant_annotations_vcf_path(
)


def new_clinvar_variants_parquet_path(
reference_genome: ReferenceGenome,
dataset_type: DatasetType,
run_id: str,
) -> str:
return os.path.join(
runs_path(
reference_genome,
dataset_type,
),
run_id,
'new_clinvar_variants.parquet',
)


def new_entries_parquet_path(
reference_genome: ReferenceGenome,
dataset_type: DatasetType,
run_id: str,
) -> str:
return os.path.join(
runs_path(
reference_genome,
dataset_type,
),
run_id,
'new_entries.parquet',
)


def new_transcripts_parquet_path(
reference_genome: ReferenceGenome,
dataset_type: DatasetType,
run_id: str,
) -> str:
return os.path.join(
runs_path(
reference_genome,
dataset_type,
),
run_id,
'new_transcripts.parquet',
)


def new_variants_parquet_path(
reference_genome: ReferenceGenome,
dataset_type: DatasetType,
run_id: str,
) -> str:
return os.path.join(
runs_path(
reference_genome,
dataset_type,
),
run_id,
'new_variants.parquet',
)


def new_variants_table_path(
reference_genome: ReferenceGenome,
dataset_type: DatasetType,
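Taken together, the four helpers place every parquet artifact for a run under the same run directory. An illustrative call follows; the run id is made up, and importing DatasetType and ReferenceGenome from v03_pipeline.lib.model is assumed:

from v03_pipeline.lib.model import DatasetType, ReferenceGenome
from v03_pipeline.lib.paths import (
    new_clinvar_variants_parquet_path,
    new_entries_parquet_path,
    new_transcripts_parquet_path,
    new_variants_parquet_path,
)

run_id = '20250506-000000-abcd1234'  # hypothetical run id
for path_fn in (
    new_clinvar_variants_parquet_path,
    new_entries_parquet_path,
    new_transcripts_parquet_path,
    new_variants_parquet_path,
):
    # Each resolves to <runs_path>/<run_id>/<artifact>.parquet.
    print(path_fn(ReferenceGenome.GRCh38, DatasetType.SNV_INDEL, run_id))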
19 changes: 19 additions & 0 deletions v03_pipeline/lib/tasks/base/base_write_parquet.py
@@ -0,0 +1,19 @@
import luigi

from v03_pipeline.lib.misc.io import checkpoint
from v03_pipeline.lib.tasks.files import GCSorLocalFolderTarget


class BaseWriteParquetTask(luigi.Task):
def complete(self) -> bool:
return GCSorLocalFolderTarget(self.output().path).exists()

def run(self) -> None:
ht = self.create_table()
ht, _ = checkpoint(ht)
df = ht.to_spark(flatten=False)
df = df.withColumnRenamed('key_', 'key')
df.write.parquet(
self.output().path,
mode='overwrite',
)
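BaseWriteParquetTask leaves output() and create_table() to subclasses; run() checkpoints the Hail table, converts it to a Spark DataFrame without flattening nested structs, renames Hail's key_ column to key, and overwrites any existing output. A hedged sketch of a concrete subclass, with a hypothetical task name and placeholder path, and assuming a GCSorLocalTarget exists alongside GCSorLocalFolderTarget in v03_pipeline.lib.tasks.files:

import hail as hl
import luigi

from v03_pipeline.lib.tasks.base.base_write_parquet import BaseWriteParquetTask
from v03_pipeline.lib.tasks.files import GCSorLocalTarget


class WriteToyParquetTask(BaseWriteParquetTask):  # hypothetical subclass
    run_id = luigi.Parameter()

    def output(self) -> luigi.Target:
        # Placeholder path; real tasks would use the helpers in paths.py.
        return GCSorLocalTarget(f'/tmp/{self.run_id}/toy.parquet')

    def create_table(self) -> hl.Table:
        # The base class handles checkpointing, the key_ -> key rename,
        # and the overwrite-mode parquet write.
        return hl.Table.parallelize(
            [{'key_': 1}, {'key_': 2}],
            hl.tstruct(key_=hl.tint32),
        )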