Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
Change Log
==========

3.13.0
======
* Add `conversion-bam-to-cram` MetaWorkflowRun creation function for SMaHT


3.12.0
======
* Add Kinnex alignment MetaWorkflowRun creation function for SMaHT
Expand Down
25 changes: 25 additions & 0 deletions magma_smaht/commands/create_meta_workflow_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
mwfr_short_read_fastqc,
mwfr_custom_qc,
mwfr_sample_identity_check,
mwfr_bam_to_cram,
)
from magma_smaht.utils import get_auth_key

Expand Down Expand Up @@ -430,6 +431,30 @@ def conversion_bam_to_fastq(fileset_accessions, auth_env):
mwfr_bam_to_fastq_paired_end(fileset_accession, smaht_key)


@cli.command()
@click.help_option("--help", "-h")
@click.option(
    "-f", "--fileset-accessions",
    required=True,
    multiple=True,
    type=str,
    help="Fileset accession(s)",
)
@click.option(
    "-e", "--auth-env",
    required=True,
    type=str,
    help="Name of environment in smaht-keys file",
)
def conversion_bam_to_cram(fileset_accessions, auth_env):
    """Conversion MWFR for final output BAMs to CRAM"""
    # NOTE: the docstring above doubles as the command's --help text.
    # All accessions are handed over in a single call (``-f`` may be repeated
    # on the command line, so ``fileset_accessions`` holds every value given).
    mwfr_bam_to_cram(fileset_accessions, get_auth_key(auth_env))



@cli.command()
@click.help_option("--help", "-h")
@click.option(
Expand Down
7 changes: 6 additions & 1 deletion magma_smaht/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
MWF_NAME_FASTQ_SHORT_READ = "short_reads_FASTQ_quality_metrics"
MWF_NAME_CRAM_TO_FASTQ_PAIRED_END = "cram_to_fastq_paired-end"
MWF_NAME_BAM_TO_FASTQ_PAIRED_END = "bam_to_fastq_paired-end"
# Name of the MetaWorkflow that converts final output BAMs to CRAM
MWF_NAME_BAM_TO_CRAM = "bam_to_cram"
MWF_NAME_BAMQC_SHORT_READ = "paired-end_short_reads_BAM_quality_metrics_GRCh38"
MWF_NAME_ULTRA_LONG_BAMQC = "ultra-long_reads_BAM_quality_metrics_GRCh38"
MWF_NAME_LONG_READ_BAMQC = "long_reads_BAM_quality_metrics_GRCh38"
Expand Down Expand Up @@ -54,4 +55,8 @@
STATUS = "status"
FIRST_STRANDED = "First Stranded"
SECOND_STRANDED = "Second Stranded"
FAILED_JOBS = "failed_jobs"

# Assays
WGS = "WGS"
RNASEQ = "RNA-seq"
78 changes: 74 additions & 4 deletions magma_smaht/create_metawfr.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
get_wfr_from_mwfr,
get_tag_for_sample_identity_check,
get_item,
get_item_es,
get_latest_somalier_run_for_donor,
get_alignment_mwfr,
get_final_output_file,
has_bam_to_cram_mwfr,
)

from magma_smaht.constants import (
Expand All @@ -45,6 +49,7 @@
MWF_NAME_FASTQ_SHORT_READ,
MWF_NAME_CRAM_TO_FASTQ_PAIRED_END,
MWF_NAME_BAM_TO_FASTQ_PAIRED_END,
MWF_NAME_BAM_TO_CRAM,
MWF_NAME_BAMQC_SHORT_READ,
MWF_NAME_ULTRA_LONG_BAMQC,
MWF_NAME_LONG_READ_BAMQC,
Expand Down Expand Up @@ -72,6 +77,7 @@
UPLOADED,
FIRST_STRANDED,
SECOND_STRANDED,
WGS
)


Expand Down Expand Up @@ -298,6 +304,69 @@ def mwfr_bam_to_fastq_paired_end(fileset_accession, smaht_key):
create_and_post_mwfr(mwf[UUID], file_set, INPUT_FILES_BAM, mwfr_input, smaht_key)


def mwfr_bam_to_cram(fileset_accessions, smaht_key):
    """Create BAM to CRAM conversion MetaWorkflowRuns for the given filesets.

    For each fileset, the final output file of its alignment MetaWorkflowRun
    is looked up and, if it is a BAM, a conversion MetaWorkflowRun is posted
    with that BAM as its only input. A fileset is skipped when it already has
    a BAM to CRAM MetaWorkflowRun, when no alignment MetaWorkflowRun or final
    output file is found, or when the final output file is already a CRAM (or
    has an unexpected format). The reasons for skipping are collected and
    printed together after all filesets have been processed.

    Args:
        fileset_accessions: Iterable of fileset accessions to process.
        smaht_key: Portal authorization key.
    """
    mwf = get_latest_mwf(MWF_NAME_BAM_TO_CRAM, smaht_key)

    skip_messages = []

    for fileset_accession in fileset_accessions:
        # The embedded frame is used for all checks below, which read fields
        # of the linked MetaWorkflowRuns.
        file_set = get_item_es(fileset_accession, smaht_key, frame="embedded")
        accession = file_set[ACCESSION]

        if has_bam_to_cram_mwfr(file_set, smaht_key):
            skip_messages.append(
                f"Fileset {accession} already has BAM to CRAM MetaWorkflowRun, skipping."
            )
            continue

        alignment_mwfr = get_alignment_mwfr(file_set, smaht_key)
        if not alignment_mwfr:
            skip_messages.append(
                f"Alignment MetaWorkflowRun not found for fileset: {accession}"
            )
            continue

        final_output_file = get_final_output_file(alignment_mwfr, WGS, smaht_key)
        if not final_output_file:
            skip_messages.append(
                f"Final output file not found for fileset: {accession}"
            )
            continue

        file_format = final_output_file["file_format"]["display_title"]
        if file_format not in ["cram", "bam"]:
            skip_messages.append(
                f"Unexpected file format '{file_format}' of final output file for fileset: {accession}"
            )
            continue
        if file_format == "cram":
            skip_messages.append(
                f"Fileset {accession} already has final output CRAM files, skipping conversion."
            )
            continue

        bams = [{"file": final_output_file[UUID], "dimension": "0"}]
        mwfr_input = [
            get_mwfr_file_input_arg(INPUT_FILES_BAM, bams),
        ]

        # The raw frame is only needed for posting, so it is fetched here
        # rather than up front: skipped filesets cost one request less.
        file_set_raw = get_item_es(fileset_accession, smaht_key)
        mwfr_accession = create_and_post_mwfr(
            mwf[UUID], file_set_raw, INPUT_FILES_BAM, mwfr_input, smaht_key, verbose=False
        )
        print(
            f"MetaWorkflowRun for BAM to CRAM conversion created. Fileset/BAM/MWFR: {accession}/{final_output_file[ACCESSION]}/{mwfr_accession}"
        )

    if skip_messages:
        print("\nWarnings:")
        for message in skip_messages:
            print(f"- {message}")



################################################
# QC MetaWorkflowRuns
################################################
Expand Down Expand Up @@ -642,23 +711,24 @@ def get_core_alignment_mwfr_input(file_set, file_input_arg, smaht_key):
return mwfr_input


def create_and_post_mwfr(mwf_uuid, file_set, input_arg, mwfr_input, smaht_key, verbose=True):
    """Build a MetaWorkflowRun from the given input and post it to the portal.

    Args:
        mwf_uuid: UUID of the MetaWorkflow to create a run for.
        file_set: Fileset item the run is associated with. When falsy, the
            run is posted without file set and common fields.
        input_arg: Name of the input argument, passed to ``mwfr_from_input``.
        mwfr_input: Input structure of the run, passed to ``mwfr_from_input``.
        smaht_key: Portal authorization key.
        verbose: When True (default), print a confirmation with the accession
            of the posted MetaWorkflowRun.

    Returns:
        The accession of the posted MetaWorkflowRun.
    """
    mwfr = mwfr_from_input(mwf_uuid, mwfr_input, input_arg, smaht_key)
    if file_set:
        mwfr[FILE_SETS] = [file_set[UUID]]
        mwfr[COMMON_FIELDS] = get_common_fields(file_set)
    # Debugging aids:
    # mwfr['final_status'] = 'stopped'
    # pprint.pprint(mwfr)

    post_response = ff_utils.post_metadata(mwfr, META_WORFLOW_RUN, smaht_key)
    mwfr_accession = post_response["@graph"][0]["accession"]
    if verbose:
        if file_set:
            print(
                f"Posted MetaWorkflowRun {mwfr_accession} for Fileset {file_set[ACCESSION]}."
            )
        else:
            print(f"Posted MetaWorkflowRun {mwfr_accession}.")
    return mwfr_accession


def filter_list_of_dicts(list_of_dics, property_target, target):
Expand Down
88 changes: 88 additions & 0 deletions magma_smaht/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@
UUID,
CONSORTIA,
SUBMISSION_CENTERS,
STATUS,
COMPLETED,
DELETED,
ACCESSION,
WGS,
RNASEQ,
MWF_NAME_BAM_TO_CRAM
)

from packaging import version
Expand Down Expand Up @@ -457,8 +464,89 @@ def generate_input_structure(files):
exit()


def has_bam_to_cram_mwfr(fileset, key):
    """Tell whether a BAM to CRAM MetaWorkflowRun is linked to the fileset.

    Each entry of the fileset's ``meta_workflow_runs`` is resolved to its
    MetaWorkflow item, whose name is compared with the BAM to CRAM
    MetaWorkflow name. The search stops at the first match.

    Args:
        fileset (dict): Fileset item from portal. Its ``meta_workflow_runs``
            entries must expose the UUID of their ``meta_workflow``.
        key (dict): Portal authorization key

    Returns:
        bool: True if a BAM to CRAM MetaWorkflowRun exists, False otherwise
            (including when the fileset has no MetaWorkflowRuns at all)
    """
    return any(
        get_item_es(run["meta_workflow"][UUID], key)["name"] == MWF_NAME_BAM_TO_CRAM
        for run in fileset.get("meta_workflow_runs", [])
    )

def get_alignment_mwfr(fileset, key):
    """Return the alignment MetaWorkflowRun of a fileset, if there is one.

    A MetaWorkflowRun qualifies when it is not deleted, its ``final_status``
    is completed, and its MetaWorkflow has ``"Alignment"`` among its
    categories. If several runs qualify, the last one in the fileset's
    ``meta_workflow_runs`` list is returned and a warning is printed.

    Args:
        fileset (dict): Fileset item from portal
        key (dict): Portal authorization key

    Returns:
        dict: The selected MetaWorkflowRun item (embedded frame), or None if
            no run qualifies
    """
    candidates = []
    for run_ref in fileset.get("meta_workflow_runs", []):
        run = get_item_es(run_ref[UUID], key, frame='embedded')
        is_usable = run[STATUS] != DELETED and run["final_status"] == COMPLETED
        if is_usable and "Alignment" in run["meta_workflow"]["category"]:
            candidates.append(run)

    if not candidates:
        return None

    chosen = candidates[-1]
    if len(candidates) > 1:
        print(
            f"Warning: Fileset {fileset[ACCESSION]} has multiple alignment MWFRs. Taking last one: {chosen[ACCESSION]}"
        )
    return chosen

def get_final_output_file(mwfr, assay, key):
    """Get the final output file from a MetaWorkflowRun based on assay.

    The workflow run that produces the final output is chosen by name: for
    WGS it is ``samtools_merge`` for MetaWorkflow versions up to and
    including 0.3.0 and ``bam_to_cram`` for later versions; for RNA-seq it is
    ``sentieon_Dedup``. Only the first output of a matching workflow run is
    inspected, and it is returned only if its ``output_status`` is
    ``"Final Output"``.

    Args:
        mwfr (dict): MetaWorkflowRun item from portal (embedded frame, so
            that ``meta_workflow`` and the workflow run outputs are expanded)
        assay (str): Assay (WGS or RNASEQ)
        key (dict): Portal authorization key

    Returns:
        dict: Final output file item from portal, or None if not found

    Raises:
        ValueError: If assay is not supported
    """
    if assay not in [WGS, RNASEQ]:
        raise ValueError(f"Unsupported assay: {assay}. Supported assays are: {WGS}, {RNASEQ}")

    if assay == RNASEQ:
        target_workflow = "sentieon_Dedup"
    else:
        # Only WGS depends on the MetaWorkflow version, so the version is
        # parsed only on this branch.
        mwf_version = version.parse(mwfr["meta_workflow"]["version"])
        if mwf_version <= version.parse("0.3.0"):
            target_workflow = "samtools_merge"
        else:
            target_workflow = "bam_to_cram"

    # Find the target workflow run
    for workflow_run in mwfr["workflow_runs"]:
        if workflow_run["name"] != target_workflow:
            continue
        outputs = workflow_run.get("output") or []
        if not outputs or "file" not in outputs[0]:
            # The run has no output file recorded; treat it as "not found"
            # instead of failing with KeyError/IndexError.
            continue
        output_file = get_item_es(outputs[0]["file"][UUID], key, frame='embedded')
        if output_file.get("output_status") == "Final Output":
            return output_file

    return None

def get_item(identifier, key, frame="raw"):
    """Fetch an item's metadata from the portal, requesting the database datastore.

    Args:
        identifier: Identifier of the item, passed to ``ff_utils.get_metadata``
        key: Portal authorization key
        frame: Frame in which the item is requested (default ``"raw"``)

    Returns:
        Whatever ``ff_utils.get_metadata`` returns for the item
    """
    query = f"frame={frame}&datastore=database"
    return ff_utils.get_metadata(identifier, key=key, add_on=query)

def get_item_es(identifier, key, frame="raw"):
    """Fetch an item's metadata from the portal without selecting a datastore.

    Only the frame is sent with the request, so the portal's default
    datastore is used (presumably Elasticsearch, going by the function name;
    confirm against the portal documentation).

    Args:
        identifier: Identifier of the item, passed to ``ff_utils.get_metadata``
        key: Portal authorization key
        frame: Frame in which the item is requested (default ``"raw"``)

    Returns:
        Whatever ``ff_utils.get_metadata`` returns for the item
    """
    query = f"frame={frame}"
    return ff_utils.get_metadata(identifier, key=key, add_on=query)
1 change: 1 addition & 0 deletions magma_smaht/wrangler_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ def sample_identity_check_status(num_files: int, smaht_key: dict):
"&sequencing_center.display_title=BCM+GCC"
"&status=uploaded&status=released"
"&file_format.display_title=bam"
"&file_format.display_title=cram"
"&output_status=Final Output"
f"&limit={num_files}"
"&sort=date_created"
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "magma-suite"
version = "3.12.0"
version = "3.13.0"
description = "Collection of tools to manage meta-workflows automation."
authors = ["Michele Berselli <[email protected]>", "Doug Rioux", "Soo Lee", "CGAP team"]
license = "MIT"
Expand Down