Skip to content

Commit c287614

Browse files
authored
Merge pull request #1247 from cfe-lab/fasta-to-fastq
FASTQ generation tools
2 parents f88c1d8 + 7e148ea commit c287614

6 files changed

+875
-0
lines changed

micall/main.py

+3
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@
8888
"micall/monitor/update_qai.py",
8989
"micall/monitor/micall_watcher.py",
9090
"micall/tcr/igblast.py",
91+
"micall/utils/fasta_to_fastq.py",
92+
"micall/utils/append_primers.py",
93+
"micall/utils/randomize_fastq.py",
9194
]
9295

9396

micall/tests/test_append_primers.py

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
2+
import pytest
3+
from pathlib import Path
4+
from Bio import SeqIO
5+
from Bio.Seq import Seq
6+
from Bio.SeqRecord import SeqRecord
7+
8+
from micall.utils.append_primers import (
9+
append_primers_to_record,
10+
entry,
11+
parse_arguments,
12+
append_primers,
13+
UserError,
14+
DEFAULT_FORWARD_PRIMER,
15+
DEFAULT_REVERSE_PRIMER,
16+
)
17+
18+
19+
def test_append_primers_to_record():
20+
original_seq = "ATGCGT"
21+
record = SeqRecord(Seq(original_seq), id="test", description="dummy")
22+
fwd_primer = "AAA"
23+
rev_primer = "TTT"
24+
new_record = append_primers_to_record(record, fwd_primer, rev_primer)
25+
expected_seq = fwd_primer + original_seq + rev_primer
26+
assert str(new_record.seq) == expected_seq
27+
assert new_record.id == record.id
28+
assert new_record.description == record.description
29+
30+
31+
def test_parse_arguments_defaults():
32+
# simulate command-line arguments,
33+
# note that the first two arguments are input and output files.
34+
test_args = [
35+
"input.fa",
36+
"output.fa",
37+
]
38+
args = parse_arguments(test_args)
39+
# Check that default primers are set.
40+
assert args.input_fasta == Path("input.fa")
41+
assert args.output_fasta == Path("output.fa")
42+
assert args.forward_primer == DEFAULT_FORWARD_PRIMER
43+
assert args.reverse_primer == DEFAULT_REVERSE_PRIMER
44+
45+
46+
def test_parse_arguments_custom_primers():
47+
test_args = [
48+
"in.fa",
49+
"out.fa",
50+
"--forward-primer", "CUSTOMFWD",
51+
"--reverse-primer", "CUSTOMREV"
52+
]
53+
args = parse_arguments(test_args)
54+
assert args.forward_primer == "CUSTOMFWD"
55+
assert args.reverse_primer == "CUSTOMREV"
56+
57+
58+
@pytest.fixture
59+
def temp_fasta_file(tmp_path: Path) -> Path:
60+
# Create a temporary FASTA file with two records.
61+
fasta_path = tmp_path / "input.fa"
62+
records = [
63+
SeqRecord(Seq("ATGC"), id="rec1", description="first"),
64+
SeqRecord(Seq("GATTACA"), id="rec2", description="second"),
65+
]
66+
with fasta_path.open("w") as f:
67+
SeqIO.write(records, f, "fasta")
68+
return fasta_path
69+
70+
71+
def test_append_primers_default(temp_fasta_file: Path, tmp_path: Path):
72+
# Use default primers.
73+
output_path = tmp_path / "output.fa"
74+
append_primers(
75+
input=temp_fasta_file,
76+
output=output_path,
77+
forward_primer=DEFAULT_FORWARD_PRIMER,
78+
reverse_primer=DEFAULT_REVERSE_PRIMER,
79+
)
80+
# Now read back the output.
81+
records = list(SeqIO.parse(str(output_path), "fasta"))
82+
# Check that the primers were appended.
83+
for rec in records:
84+
# The record sequence should start with DEFAULT_FORWARD_PRIMER
85+
# and end with DEFAULT_REVERSE_PRIMER.
86+
seq_str = str(rec.seq)
87+
assert seq_str.startswith(DEFAULT_FORWARD_PRIMER)
88+
assert seq_str.endswith(DEFAULT_REVERSE_PRIMER)
89+
# The middle part should be the original sequence. For instance,
90+
# for rec1, the original sequence was "ATGC".
91+
# We can extract the original: sequence[len(forward): -len(reverse)]
92+
original_seq = seq_str[len(DEFAULT_FORWARD_PRIMER): -len(DEFAULT_REVERSE_PRIMER)]
93+
# Check that the original sequence exists in the input file.
94+
# Without building a dict from the input file, we can simply check that
95+
# the extracted sequence is one of the known ones.
96+
assert original_seq in {"ATGC", "GATTACA"}
97+
98+
99+
def test_append_primers_default_main(temp_fasta_file: Path,
100+
tmp_path: Path,
101+
monkeypatch):
102+
# Use default primers.
103+
output_path = tmp_path / "output.fa"
104+
105+
monkeypatch.setattr("sys.argv", ["append_primers.py",
106+
str(temp_fasta_file),
107+
str(output_path),
108+
])
109+
110+
with pytest.raises(SystemExit) as exinfo:
111+
entry()
112+
113+
ex: SystemExit = exinfo.value
114+
assert ex.code == 0
115+
116+
# Now read back the output.
117+
records = list(SeqIO.parse(str(output_path), "fasta"))
118+
# Check that the primers were appended.
119+
for rec in records:
120+
# The record sequence should start with DEFAULT_FORWARD_PRIMER
121+
# and end with DEFAULT_REVERSE_PRIMER.
122+
seq_str = str(rec.seq)
123+
assert seq_str.startswith(DEFAULT_FORWARD_PRIMER)
124+
assert seq_str.endswith(DEFAULT_REVERSE_PRIMER)
125+
# The middle part should be the original sequence. For instance,
126+
# for rec1, the original sequence was "ATGC".
127+
# We can extract the original: sequence[len(forward): -len(reverse)]
128+
original_seq = seq_str[len(DEFAULT_FORWARD_PRIMER): -len(DEFAULT_REVERSE_PRIMER)]
129+
# Check that the original sequence exists in the input file.
130+
# Without building a dict from the input file, we can simply check that
131+
# the extracted sequence is one of the known ones.
132+
assert original_seq in {"ATGC", "GATTACA"}
133+
134+
135+
def test_append_primers_custom(temp_fasta_file: Path, tmp_path: Path):
136+
output_path = tmp_path / "output_custom.fa"
137+
custom_fwd = "CUSTOMFWD"
138+
custom_rev = "CUSTOMREV"
139+
append_primers(
140+
input=temp_fasta_file,
141+
output=output_path,
142+
forward_primer=custom_fwd,
143+
reverse_primer=custom_rev,
144+
)
145+
records = list(SeqIO.parse(str(output_path), "fasta"))
146+
for rec in records:
147+
seq_str = str(rec.seq)
148+
assert seq_str.startswith(custom_fwd)
149+
assert seq_str.endswith(custom_rev)
150+
151+
152+
def test_append_primers_missing_input(tmp_path: Path):
153+
# Create a non-existent input file path.
154+
missing_file = tmp_path / "non_existent.fa"
155+
output_file = tmp_path / "output.fa"
156+
with pytest.raises(UserError):
157+
append_primers(
158+
input=missing_file,
159+
output=output_file,
160+
forward_primer=DEFAULT_FORWARD_PRIMER,
161+
reverse_primer=DEFAULT_REVERSE_PRIMER,
162+
)

micall/tests/test_randomize_fastq.py

+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import random
2+
import pytest
3+
from pathlib import Path
4+
from io import StringIO
5+
6+
from Bio import SeqIO
7+
from Bio.Seq import Seq
8+
from Bio.SeqRecord import SeqRecord
9+
10+
from micall.utils.randomize_fastq import (
11+
introduce_errors,
12+
process_records,
13+
process_fastq,
14+
get_parser,
15+
main,
16+
entry,
17+
NUCLEOTIDES,
18+
)
19+
20+
21+
# --- Tests for introduce_errors ---
22+
def test_introduce_errors_no_error():
23+
seq = "ACGTACGT"
24+
qualities = [30] * len(seq)
25+
# no errors: all error rates set to 0.
26+
new_seq, new_quals = introduce_errors(seq, qualities,
27+
subst_rate=0.0,
28+
ins_rate=0.0,
29+
del_rate=0.0,
30+
ins_quality=20)
31+
assert new_seq == seq
32+
assert new_quals == qualities
33+
34+
35+
def test_introduce_errors_full_deletion():
36+
seq = "ACGTACGT"
37+
qualities = [30] * len(seq)
38+
new_seq, new_quals = introduce_errors(seq, qualities,
39+
subst_rate=0.0,
40+
ins_rate=0.0,
41+
del_rate=1.0,
42+
ins_quality=20)
43+
# With full deletion, expect empty sequence and empty quality list.
44+
assert new_seq == ""
45+
assert new_quals == []
46+
47+
48+
def test_introduce_errors_full_substitution():
49+
seq = "ACGT"
50+
qualities = [30, 31, 32, 33]
51+
# Set subst_rate=1 so every base is substituted.
52+
new_seq, new_quals = introduce_errors(seq, qualities,
53+
subst_rate=1.0,
54+
ins_rate=0.0,
55+
del_rate=0.0,
56+
ins_quality=20)
57+
assert len(new_seq) == len(seq)
58+
# for each base, new base should be different than original.
59+
for original, new in zip(seq, new_seq):
60+
assert new.upper() in NUCLEOTIDES
61+
assert new.upper() != original.upper()
62+
assert new_quals == qualities
63+
64+
65+
def test_introduce_errors_full_insertion():
66+
seq = "ACGT"
67+
qualities = [30] * len(seq)
68+
new_seq, new_quals = introduce_errors(seq, qualities,
69+
subst_rate=0.0,
70+
ins_rate=1.0,
71+
del_rate=0.0,
72+
ins_quality=15)
73+
# Every base should be followed by an inserted base: length 2*len(seq)
74+
assert len(new_seq) == 2 * len(seq)
75+
# And the bases at even indices (0-indexed) should be the original bases.
76+
for i, base in enumerate(seq):
77+
assert new_seq[2*i] == base
78+
# Check inserted qualities:
79+
# The inserted qualities (at odd indices) should equal ins_quality.
80+
for i in range(len(seq)):
81+
assert new_quals[2*i] == qualities[i]
82+
assert new_quals[2*i+1] == 15
83+
84+
85+
# --- Test for process_records using an in-memory FASTQ string ---
86+
def test_process_records_no_error():
87+
fastq_str = (
88+
"@read1\n"
89+
"ACGT\n"
90+
"+\n"
91+
"!!!!\n"
92+
)
93+
94+
# in FASTQ, qualities are usually encoded by ascii, but BioPython converts
95+
# them to a list of phred quality scores.
96+
handle = StringIO(fastq_str)
97+
# All error rates set to 0, so no changes.
98+
records = list(process_records(handle,
99+
subst_rate=0.0,
100+
ins_rate=0.0,
101+
del_rate=0.0,
102+
ins_quality=20,
103+
rng=random.Random(42)))
104+
assert len(records) == 1
105+
rec = records[0]
106+
assert str(rec.seq) == "ACGT"
107+
108+
# Check phred qualities:
109+
# '!' converts to 0 in Phred.
110+
assert rec.letter_annotations["phred_quality"] == [0]*4
111+
112+
113+
# --- Tests for argument parser ---
114+
def test_get_parser_defaults():
115+
parser = get_parser()
116+
args = parser.parse_args(["in.fastq", "out.fastq"])
117+
assert args.in_fastq == Path("in.fastq")
118+
assert args.out_fastq == Path("out.fastq")
119+
# default error rates from the parser.
120+
assert args.subst_rate == 0.005
121+
assert args.ins_rate == 0.0
122+
assert args.del_rate == 0.0
123+
assert args.ins_quality == 20
124+
assert args.seed == 42
125+
126+
127+
@pytest.fixture
128+
def tmp_fastq_file(tmp_path: Path) -> Path:
129+
fastq_path = tmp_path / "input.fastq"
130+
records = [
131+
SeqRecord(Seq("ACGTACGT"),
132+
id="read1",
133+
description="",
134+
letter_annotations={"phred_quality": [30]*8}),
135+
]
136+
with fastq_path.open("w") as fh:
137+
SeqIO.write(records, fh, "fastq")
138+
return fastq_path
139+
140+
141+
def test_process_fastq_no_error(tmp_fastq_file: Path, tmp_path: Path):
142+
output_path = tmp_path / "output.fastq"
143+
# Run process_fastq with error rates set to 0.
144+
process_fastq(in_fastq=tmp_fastq_file,
145+
out_fastq=output_path,
146+
subst_rate=0.0,
147+
ins_rate=0.0,
148+
del_rate=0.0,
149+
ins_quality=20,
150+
rng=random.Random(42))
151+
# Read back the output file.
152+
recs = list(SeqIO.parse(str(output_path), "fastq"))
153+
assert len(recs) == 1
154+
# Without errors, the sequence should be identical.
155+
assert str(recs[0].seq) == "ACGTACGT"
156+
157+
158+
def test_main_integration(tmp_fastq_file: Path, tmp_path: Path, monkeypatch):
159+
# Build a fake command line to run main.
160+
out_path = tmp_path / "output2.fastq"
161+
args = [
162+
str(tmp_fastq_file),
163+
str(out_path),
164+
"--subst_rate", "0.0",
165+
"--ins_rate", "0.0",
166+
"--del_rate", "0.0",
167+
"--ins_quality", "20",
168+
"--seed", "42"
169+
]
170+
# Call main directly.
171+
exit_code = main(args)
172+
assert exit_code == 0
173+
recs = list(SeqIO.parse(str(out_path), "fastq"))
174+
assert len(recs) == 1
175+
assert str(recs[0].seq) == "ACGTACGT"
176+
177+
178+
def test_entry_integration(tmp_fastq_file: Path, tmp_path: Path, monkeypatch):
179+
out_path = tmp_path / "output3.fastq"
180+
monkeypatch.setattr("sys.argv", [
181+
"error_introducer.py",
182+
str(tmp_fastq_file),
183+
str(out_path),
184+
"--subst_rate", "0.0",
185+
"--ins_rate", "0.0",
186+
"--del_rate", "0.0",
187+
"--ins_quality", "20",
188+
"--seed", "42"
189+
])
190+
191+
# entry() calls sys.exit; we can catch the SystemExit exception.
192+
with pytest.raises(SystemExit) as e:
193+
entry()
194+
195+
assert e.value.code == 0
196+
197+
recs = list(SeqIO.parse(str(out_path), "fastq"))
198+
assert len(recs) == 1
199+
assert str(recs[0].seq) == "ACGTACGT"

0 commit comments

Comments
 (0)