Allow for multiple inputs from the config file
j23414 committed Feb 22, 2025
1 parent de4b930 commit 98cd799
Showing 7 changed files with 158 additions and 50 deletions.
2 changes: 1 addition & 1 deletion phylogenetic/Snakefile
@@ -4,8 +4,8 @@ rule all:
input:
auspice_json = "auspice/zika.json"

include: "rules/prepare_sequences.smk"
include: "rules/merge_sequences_usvi.smk"
include: "rules/prepare_sequences.smk"
include: "rules/construct_phylogeny.smk"
include: "rules/annotate_phylogeny.smk"
include: "rules/export.smk"
12 changes: 10 additions & 2 deletions phylogenetic/defaults/config_zika.yaml
@@ -1,7 +1,15 @@
# Sequences must be FASTA and metadata must be TSV
# Both files must be zstd compressed
sequences_url: "https://data.nextstrain.org/files/workflows/zika/sequences.fasta.zst"
metadata_url: "https://data.nextstrain.org/files/workflows/zika/metadata.tsv.zst"

inputs:
- name: ncbi
metadata: "s3://nextstrain-data/files/workflows/zika/metadata.tsv.zst"
sequences: "s3://nextstrain-data/files/workflows/zika/sequences.fasta.zst"

additional_inputs:
- name: usvi
metadata: "data/metadata_usvi.tsv"
sequences: "data/sequences_usvi.fasta"

strain_id_field: "accession"

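Once Snakemake parses this YAML, both lists are plain Python data. A minimal sketch of how the helper code in merge_sequences_usvi.smk (shown below) combines them, assuming exactly the inputs defined above:

    config = {
        "inputs": [
            {"name": "ncbi",
             "metadata": "s3://nextstrain-data/files/workflows/zika/metadata.tsv.zst",
             "sequences": "s3://nextstrain-data/files/workflows/zika/sequences.fasta.zst"},
        ],
        "additional_inputs": [
            {"name": "usvi",
             "metadata": "data/metadata_usvi.tsv",
             "sequences": "data/sequences_usvi.fasta"},
        ],
    }

    all_inputs = [*config["inputs"], *config.get("additional_inputs", [])]
    assert all(isinstance(i, dict) and "name" in i for i in all_inputs)
    assert len({i["name"] for i in all_inputs}) == len(all_inputs)  # names must be unique
    # Two uniquely named inputs, so the merge_* rules below will run.
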
2 changes: 1 addition & 1 deletion phylogenetic/rules/annotate_phylogeny.smk
@@ -73,7 +73,7 @@ rule traits:
"""
input:
tree = "results/tree.nwk",
metadata = "data/metadata_all.tsv"
metadata = input_metadata,
output:
node_data = "results/traits.json",
params:
2 changes: 1 addition & 1 deletion phylogenetic/rules/construct_phylogeny.smk
@@ -43,7 +43,7 @@ rule refine:
input:
tree = "results/tree_raw.nwk",
alignment = "results/aligned.fasta",
metadata = "data/metadata_all.tsv"
metadata = input_metadata,
output:
tree = "results/tree.nwk",
node_data = "results/branch_lengths.json"
2 changes: 1 addition & 1 deletion phylogenetic/rules/export.smk
@@ -29,7 +29,7 @@ rule export:
"""Exporting data files for for auspice"""
input:
tree = "results/tree.nwk",
metadata = "data/metadata_all.tsv",
metadata = input_metadata,
branch_lengths = "results/branch_lengths.json",
traits = "results/traits.json",
nt_muts = "results/nt_muts.json",
156 changes: 142 additions & 14 deletions phylogenetic/rules/merge_sequences_usvi.smk
@@ -21,22 +21,150 @@ This part of the workflow usually includes the following steps:
"""

rule append_usvi:
"""Appending USVI sequences"""
input:
sequences = "data/sequences.fasta",
metadata = "data/metadata.tsv",
usvi_sequences = "data/sequences_usvi.fasta",
usvi_metadata = "data/metadata_usvi.tsv"
# ------------- helper functions to collect, merge & download input files ------------------- #
NEXTSTRAIN_PUBLIC_BUCKET = "s3://nextstrain-data/"

def _parse_config_input(input):
"""
Parses information from an individual config-defined input, i.e. an element within `config.inputs` or `config.additional_inputs`
and returns information snakemake rules can use to obtain the underlying data.
The structure of `input` is a dictionary with keys:
- name:string (required)
- metadata:string (optional) - a s3 URI or a local file path
- sequences:string|dict[string,string] (optional) - either a s3 URI or a local file path, in which case
it must include a '{segment}' wildcard substring, or a dict of segment → s3 URI or local file path,
in which case it must not include the wildcard substring.
Returns a dictionary with optional keys:
- metadata:string - the relative path to the metadata file. If the original data was remote then this represents
the output of a rule which downloads the file
- metadata_location:string - the URI for the remote file if applicable else `None`
- sequences:function. Takes in wildcards and returns the relative path to the sequences FASTA for the provided
segment wildcard, or returns `None` if this input doesn't define sequences for the provided segment.
- sequences_location:function. Takes in wildcards and returns the URI for the remote file, or `None`, where applicable.
Raises InvalidConfigError
"""
name = input['name']

info = {'metadata': None, 'metadata_location': None, 'sequences': None, 'sequences_location': None}

def _source(uri, *, s3, local):
if uri.startswith('s3://'):
return s3
elif uri.lower().startswith('http://') or uri.lower().startswith('https://'):
raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs")
return local

if location:=input.get('metadata', False):
info['metadata'] = _source(location, s3=f"data/{name}/metadata.tsv", local=location)
info['metadata_location'] = _source(location, s3=location, local=None)

if location:=input.get('sequences', False):
info['sequences'] = _source(location, s3=f"data/{name}/sequences.fasta", local=location)
info['sequences_location'] = _source(location, s3=location, local=None)

#if isinstance(location, dict):
# info['sequences'] = _source(location, s3=f"data/{name}/sequences.fasta", local=location)
# info['sequences_location'] = _source(location, s3=location, local=None) \
#elif isinstance(location, str):
# info['sequences'] = _source(location, s3=f"data/{name}/sequences.fasta", local=location)
# info['sequences_location'] = _source(location, s3=location, local=None)
#else:
# raise InvalidConfigError(f"Config input for {name} specifies sequences in an unknown format; must be dict or string")

return info
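
# Illustrative sketch, assuming the two inputs from defaults/config_zika.yaml
# (sequences behave the same way as metadata):
#
#   _parse_config_input({"name": "ncbi",
#       "metadata": "s3://nextstrain-data/files/workflows/zika/metadata.tsv.zst", ...})
#   # metadata          → "data/ncbi/metadata.tsv" (output of rule download_s3_metadata)
#   # metadata_location → "s3://nextstrain-data/files/workflows/zika/metadata.tsv.zst"
#
#   _parse_config_input({"name": "usvi", "metadata": "data/metadata_usvi.tsv", ...})
#   # metadata          → "data/metadata_usvi.tsv" (local file, used directly)
#   # metadata_location → None (nothing to download)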


def _gather_inputs():
all_inputs = [*config['inputs'], *config.get('additional_inputs', [])]

if len(all_inputs)==0:
raise InvalidConfigError("Config must define at least one element in config.inputs or config.additional_inputs lists")
if not all([isinstance(i, dict) for i in all_inputs]):
raise InvalidConfigError("All of the elements in config.inputs and config.additional_inputs lists must be dictionaries"
"If you've used a command line '--config' double check your quoting.")
if len({i['name'] for i in all_inputs})!=len(all_inputs):
raise InvalidConfigError("Names of inputs (config.inputs and config.additional_inputs) must be unique")
if not all(['name' in i and ('sequences' in i or 'metadata' in i) for i in all_inputs]):
raise InvalidConfigError("Each input (config.inputs and config.additional_inputs) must have a 'name' and 'metadata' and/or 'sequences'")

return {i['name']: _parse_config_input(i) for i in all_inputs}

input_sources = _gather_inputs()

def input_metadata(wildcards):
inputs = [info['metadata'] for info in input_sources.values() if info.get('metadata', None)]
return inputs[0] if len(inputs)==1 else "results/metadata_merged.tsv"

def input_sequences(wildcards):
inputs = [info['sequences'] for info in input_sources.values() if info.get('sequences', None)]
return inputs[0] if len(inputs)==1 else "results/sequences_merged.fasta"

# def input_sequences(wildcards):
# inputs = list(filter(None, [info['sequences'](wildcards) for info in input_sources.values() if info.get('sequences', None)]))
# return inputs[0] if len(inputs)==1 else "results/sequences_merged_{segment}.fasta"
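
# Resolution sketch for the default ncbi + usvi config: both inputs define metadata
# and sequences, so each list above has two entries and the functions fall through
# to the merged files, forcing the merge_* rules to run:
#   input_metadata(None)   # → "results/metadata_merged.tsv"
#   input_sequences(None)  # → "results/sequences_merged.fasta"
# With a single configured input they return its path directly and the merge rules
# are never scheduled.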

rule download_s3_sequences:
output:
sequences = "data/sequences_all.fasta",
metadata = "data/metadata_all.tsv"
sequences = "data/{input_name}/sequences.fasta",
params:
address = lambda w: input_sources[w.input_name]['sequences_location'],
no_sign_request=lambda w: "--no-sign-request" \
if input_sources[w.input_name]['sequences_location'].startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
else "",
shell:
"""
seqkit rmdup {input.usvi_sequences} {input.sequences} > {output.sequences}
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences}
"""

rule download_s3_metadata:
output:
metadata = "data/{input_name}/metadata.tsv",
params:
address = lambda w: input_sources[w.input_name]['metadata_location'],
no_sign_request=lambda w: "--no-sign-request" \
if input_sources[w.input_name]['metadata_location'].startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
else "",
shell:
"""
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata}
"""

rule merge_metadata:
"""
This rule should only be invoked if there are multiple defined metadata inputs
(config.inputs + config.additional_inputs)
"""
input:
**{name: info['metadata'] for name,info in input_sources.items() if info.get('metadata', None)}
params:
metadata = lambda w, input: list(map("=".join, input.items())),
id_field = config['strain_id_field'],
output:
metadata = "results/metadata_merged.tsv"
shell:
r"""
augur merge \
--metadata ingest={input.metadata} usvi={input.usvi_metadata} \
--metadata-id-columns accession \
--output-metadata {output.metadata}
"""
--metadata {params.metadata:q} \
--metadata-id-columns {params.id_field} \
--output-metadata {output.metadata}
"""

rule merge_sequences:
"""
This rule should only be invoked if there are multiple defined sequences inputs
(config.inputs + config.additional_inputs)
"""
input:
**{name: info['sequences'] for name,info in input_sources.items() if info.get('sequences', None)}
output:
sequences = "results/sequences_merged.fasta"
shell:
r"""
seqkit rmdup {input:q} > {output.sequences:q}
"""

# -------------------------------------------------------------------------------------------- #
32 changes: 2 additions & 30 deletions phylogenetic/rules/prepare_sequences.smk
@@ -21,34 +21,6 @@ This part of the workflow usually includes the following steps:
See Augur's usage docs for these commands for more details.
"""

rule download:
"""Downloading sequences and metadata from data.nextstrain.org"""
output:
sequences = "data/sequences.fasta.zst",
metadata = "data/metadata.tsv.zst"
params:
sequences_url = config["sequences_url"],
metadata_url = config["metadata_url"],
shell:
"""
curl -fsSL --compressed {params.sequences_url:q} --output {output.sequences}
curl -fsSL --compressed {params.metadata_url:q} --output {output.metadata}
"""

rule decompress:
"""Decompressing sequences and metadata"""
input:
sequences = "data/sequences.fasta.zst",
metadata = "data/metadata.tsv.zst"
output:
sequences = "data/sequences.fasta",
metadata = "data/metadata.tsv"
shell:
"""
zstd -d -c {input.sequences} > {output.sequences}
zstd -d -c {input.metadata} > {output.metadata}
"""

rule filter:
"""
Filtering to
@@ -58,8 +30,8 @@ rule filter:
- minimum genome length of {params.min_length} (50% of Zika virus genome)
"""
input:
sequences = "data/sequences_all.fasta",
metadata = "data/metadata_all.tsv",
sequences = input_sequences,
metadata = input_metadata,
exclude = "defaults/dropped_strains.txt",
output:
sequences = "results/filtered.fasta"
