Skip to content

Commit

Permalink
add suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-parker committed Feb 27, 2025
1 parent c582912 commit c6f6131
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 65 deletions.
114 changes: 56 additions & 58 deletions ingest/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,39 @@ with open("results/config.yaml", "w") as f:

TAXON_ID = config["taxon_id"]
SEGMENTED = config["segmented"]
segment_detection_method: "align" | "minimizer" | None = (
None # align uses nextclade align to determine segments it is slower but more precise, minimizer uses nextclade sort and requires a minimizer index
)
LOG_LEVEL = config.get("log_level", "INFO")
NCBI_API_KEY = os.getenv("NCBI_API_KEY")
NCBI_GATEWAY_URL = config.get("ncbi_gateway_url")
MIRROR_BUCKET = config.get("mirror_bucket", "")
APPROVE_TIMEOUT_MIN = config.get("approve_timeout_min") # time in minutes
CHECK_ENA_DEPOSITION = config.get("check_ena_deposition", False)
ALIGN = True
FILTER = config.get("metadata_filter", None)
GROUPS_OVERRIDE_JSON = config.get("grouping_override", None) # JSON map from group to segments' insdcAccessionFull
GROUPS_OVERRIDE_JSON = config.get(
"grouping_override", None
) # JSON map from group to segments' insdcAccessionFull

dataset_server_map = {}
dataset_name_map = {}

if SEGMENTED:
if config.get("nextclade_sort", None):
ALIGN = False
elif config.get("nextclade_align", None):
ALIGN = True
nextclade_align_params = config["nextclade_align"]
for segment in config["nucleotide_sequences"]:
if nextclade_align_params.get("nextclade_dataset_server_map") and segment in nextclade_align_params["nextclade_dataset_server_map"]:
dataset_server_map[segment] = nextclade_align_params["nextclade_dataset_server_map"][segment]
else:
dataset_server_map[segment] = nextclade_align_params.get("nextclade_dataset_server")
if nextclade_align_params.get("nextclade_dataset_name_map") and segment in nextclade_align_params["nextclade_dataset_name_map"]:
dataset_name_map[segment] = nextclade_align_params["nextclade_dataset_name_map"][segment]
else:
dataset_name_map[segment] = nextclade_align_params.get("nextclade_dataset_name") + "/" + segment
else:
raise ValueError("No alignment or sorting specified for segmented sequences")
dataset_server_map = {}
dataset_name_map = {}

segment_identification = config.get("segment_identification", None)
if not segment_identification:
raise ValueError("Segmented sequences require segment_identification")
segment_detection_method = segment_identification.get("method")
if segment_detection_method == "align":
dataset_server_map[segment] = segment_identification.get(
"nextclade_dataset_server_map", {}
).get(segment, segment_identification.get("nextclade_dataset_server"))
dataset_name_map[segment] = segment_identification.get(
"nextclade_dataset_name_map", {}
).get(
segment,
segment_identification.get("nextclade_dataset_name") + "/" + segment,
)

if os.uname().sysname == "Darwin":
# Don't use conda-forge unzip on macOS
Expand All @@ -60,6 +63,14 @@ else:
unzip = "unzip"


def prepped_metadata():
    """Return the path of the metadata NDJSON to feed into downstream rules.

    Selection order (filtering wins over segment grouping):
    the filtered file when a metadata filter is configured, otherwise the
    post-grouping file for segmented organisms, otherwise the plain
    post-prepare file.
    """
    if FILTER:
        chosen = "results/metadata_filtered.ndjson"
    elif SEGMENTED:
        chosen = "results/metadata_post_group.ndjson"
    else:
        chosen = "results/metadata_post_prepare.ndjson"
    return chosen


rule all:
params:
config=lambda wildcards: str(config),
Expand All @@ -82,8 +93,8 @@ rule fetch_ncbi_dataset_package:
taxon_id=TAXON_ID,
api_key=NCBI_API_KEY,
gateway=f"--gateway-url {NCBI_GATEWAY_URL}" if NCBI_GATEWAY_URL else "",
use_mirror=bool(MIRROR_BUCKET),
mirror_url=f"{MIRROR_BUCKET.strip('/')}/{TAXON_ID}.zip"
use_mirror=bool(MIRROR_BUCKET),
mirror_url=f"{MIRROR_BUCKET.strip('/')}/{TAXON_ID}.zip",
shell:
"""
if [[ "{params.use_mirror}" == "True" ]]; then
Expand Down Expand Up @@ -196,11 +207,12 @@ rule calculate_sequence_hashes:
--output-sequences {output.sequence_json}
"""

if ALIGN:

if SEGMENTED and segment_detection_method == "align":

rule align:
input:
sequences= "results/sequences.fasta",
sequences="results/sequences.fasta",
output:
results="results/nextclade_{segment}.tsv",
params:
Expand Down Expand Up @@ -403,18 +415,24 @@ rule heuristic_group_segments:
fi
"""


if FILTER:

rule metadata_filter:
"""
FILTER allows you to throw out sequences you don't want to ingest based on metadata
E.g. only H5N1, when the genbank download contains all influenza
"""
input:
metadata=(
"results/metadata_post_group.ndjson"
if SEGMENTED
else "results/metadata_post_prepare.ndjson"
"results/metadata_post_group.ndjson"
if SEGMENTED
else "results/metadata_post_prepare.ndjson"
),
sequences=(
"results/sequences_post_group.ndjson"
if SEGMENTED
else "results/sequences.ndjson"
"results/sequences_post_group.ndjson"
if SEGMENTED
else "results/sequences.ndjson"
),
script="scripts/metadata_filter.py",
config="results/config.yaml",
Expand Down Expand Up @@ -454,14 +472,7 @@ rule get_previous_submissions:
# Reduce likelihood of race condition of multi-submission
# By delaying the start of the script
script="scripts/call_loculus.py",
prepped_metadata=(
"results/metadata_filtered.ndjson"
if FILTER
else
"results/metadata_post_group.ndjson"
if SEGMENTED
else "results/metadata_post_prepare.ndjson"
),
prepped_metadata=prepped_metadata(),
config="results/config.yaml",
output:
hashes="results/previous_submissions.json",
Expand All @@ -484,14 +495,7 @@ rule compare_hashes:
script="scripts/compare_hashes.py",
config="results/config.yaml",
old_hashes="results/previous_submissions.json",
metadata=(
"results/metadata_filtered.ndjson"
if FILTER
else
"results/metadata_post_group.ndjson"
if SEGMENTED
else "results/metadata_post_prepare.ndjson"
),
metadata=prepped_metadata(),
output:
to_submit="results/to_submit.json",
to_revise="results/to_revise.json",
Expand Down Expand Up @@ -523,21 +527,15 @@ rule prepare_files:
input:
script="scripts/prepare_files.py",
config="results/config.yaml",
metadata=(
"results/metadata_filtered.ndjson"
if FILTER
else
"results/metadata_post_group.ndjson"
if SEGMENTED
else "results/metadata_post_prepare.ndjson"
),
metadata=prepped_metadata(),
sequences=(
"results/sequences_filtered.ndjson"
if FILTER
else
"results/sequences_post_group.ndjson"
if SEGMENTED
else "results/sequences.ndjson"
else (
"results/sequences_post_group.ndjson"
if SEGMENTED
else "results/sequences.ndjson"
)
),
to_submit="results/to_submit.json",
to_revise="results/to_revise.json",
Expand Down
3 changes: 2 additions & 1 deletion ingest/tests/config_cchf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ insdc_segment_specific_fields:
- insdcAccessionFull
- insdcRawReadsAccession
keycloak_token_url: http://localhost:8083/realms/loculus/protocol/openid-connect/token
nextclade_align:
segment_identification:
method: "align"
nextclade_dataset_name: nextstrain/cchfv/linked
nextclade_dataset_server: https://raw.githubusercontent.com/nextstrain/nextclade_data/cornelius-cchfv/data_output
nucleotide_sequences:
Expand Down
3 changes: 2 additions & 1 deletion kubernetes/loculus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,8 @@ defaultOrganisms:
configFile:
<<: *ingestConfigFile
taxon_id: 3052518
nextclade_align:
segment_identification:
method: "align"
nextclade_dataset_server: https://raw.githubusercontent.com/nextstrain/nextclade_data/cornelius-cchfv/data_output
nextclade_dataset_name: nextstrain/cchfv/linked
grouping_override: https://anna-parker.github.io/influenza-a-groupings/results/cchf_groups.json
Expand Down
10 changes: 5 additions & 5 deletions preprocessing/nextclade/src/loculus_preprocessing/prepro.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ def add_input_metadata(
return None
errors.append(annotation)
return None
spec.args["segment_aligned"] = True
spec.args["some_segment_aligned"] = True
result: str | None = str(
dpath.get(
unprocessed.nextcladeMetadata[segment],
Expand Down Expand Up @@ -647,7 +647,7 @@ def process_single( # noqa: C901
if key in config.processing_spec:
output_metadata[key] = len(sequence) if sequence else 0

segment_aligned = False
some_segment_aligned = False
for output_field, spec_dict in config.processing_spec.items():
length_fields = [
"length" if segment == "main" else "length_" + segment
Expand All @@ -662,7 +662,7 @@ def process_single( # noqa: C901
args=spec_dict.get("args", {}),
)
spec.args = {} if spec.args is None else spec.args
spec.args["segment_aligned"] = segment_aligned
spec.args["some_segment_aligned"] = some_segment_aligned
processing_result = get_metadata(
id,
spec,
Expand All @@ -673,7 +673,7 @@ def process_single( # noqa: C901
config,
)
output_metadata[output_field] = processing_result.datum
segment_aligned = spec.args["segment_aligned"]
some_segment_aligned = spec.args["some_segment_aligned"]
if (
null_per_backend(processing_result.datum)
and spec.required
Expand Down Expand Up @@ -704,7 +704,7 @@ def process_single( # noqa: C901
id, unprocessed, config, output_metadata, errors, warnings
)

if not segment_aligned:
if not some_segment_aligned:
errors.append(
ProcessingAnnotation(
unprocessedFields=[
Expand Down

0 comments on commit c6f6131

Please sign in to comment.