|
| 1 | +import random |
| 2 | +import pytest |
| 3 | +from pathlib import Path |
| 4 | +from io import StringIO |
| 5 | + |
| 6 | +from Bio import SeqIO |
| 7 | +from Bio.Seq import Seq |
| 8 | +from Bio.SeqRecord import SeqRecord |
| 9 | + |
| 10 | +from micall.utils.randomize_fastq import ( |
| 11 | + introduce_errors, |
| 12 | + process_records, |
| 13 | + process_fastq, |
| 14 | + get_parser, |
| 15 | + main, |
| 16 | + entry, |
| 17 | + NUCLEOTIDES, |
| 18 | +) |
| 19 | + |
| 20 | + |
| 21 | +# --- Tests for introduce_errors --- |
| 22 | +def test_introduce_errors_no_error(): |
| 23 | + seq = "ACGTACGT" |
| 24 | + qualities = [30] * len(seq) |
| 25 | + # no errors: all error rates set to 0. |
| 26 | + new_seq, new_quals = introduce_errors(seq, qualities, |
| 27 | + subst_rate=0.0, |
| 28 | + ins_rate=0.0, |
| 29 | + del_rate=0.0, |
| 30 | + ins_quality=20) |
| 31 | + assert new_seq == seq |
| 32 | + assert new_quals == qualities |
| 33 | + |
| 34 | + |
| 35 | +def test_introduce_errors_full_deletion(): |
| 36 | + seq = "ACGTACGT" |
| 37 | + qualities = [30] * len(seq) |
| 38 | + new_seq, new_quals = introduce_errors(seq, qualities, |
| 39 | + subst_rate=0.0, |
| 40 | + ins_rate=0.0, |
| 41 | + del_rate=1.0, |
| 42 | + ins_quality=20) |
| 43 | + # With full deletion, expect empty sequence and empty quality list. |
| 44 | + assert new_seq == "" |
| 45 | + assert new_quals == [] |
| 46 | + |
| 47 | + |
| 48 | +def test_introduce_errors_full_substitution(): |
| 49 | + seq = "ACGT" |
| 50 | + qualities = [30, 31, 32, 33] |
| 51 | + # Set subst_rate=1 so every base is substituted. |
| 52 | + new_seq, new_quals = introduce_errors(seq, qualities, |
| 53 | + subst_rate=1.0, |
| 54 | + ins_rate=0.0, |
| 55 | + del_rate=0.0, |
| 56 | + ins_quality=20) |
| 57 | + assert len(new_seq) == len(seq) |
| 58 | + # for each base, new base should be different than original. |
| 59 | + for original, new in zip(seq, new_seq): |
| 60 | + assert new.upper() in NUCLEOTIDES |
| 61 | + assert new.upper() != original.upper() |
| 62 | + assert new_quals == qualities |
| 63 | + |
| 64 | + |
| 65 | +def test_introduce_errors_full_insertion(): |
| 66 | + seq = "ACGT" |
| 67 | + qualities = [30] * len(seq) |
| 68 | + new_seq, new_quals = introduce_errors(seq, qualities, |
| 69 | + subst_rate=0.0, |
| 70 | + ins_rate=1.0, |
| 71 | + del_rate=0.0, |
| 72 | + ins_quality=15) |
| 73 | + # Every base should be followed by an inserted base: length 2*len(seq) |
| 74 | + assert len(new_seq) == 2 * len(seq) |
| 75 | + # And the bases at even indices (0-indexed) should be the original bases. |
| 76 | + for i, base in enumerate(seq): |
| 77 | + assert new_seq[2*i] == base |
| 78 | + # Check inserted qualities: |
| 79 | + # The inserted qualities (at odd indices) should equal ins_quality. |
| 80 | + for i in range(len(seq)): |
| 81 | + assert new_quals[2*i] == qualities[i] |
| 82 | + assert new_quals[2*i+1] == 15 |
| 83 | + |
| 84 | + |
| 85 | +# --- Test for process_records using an in-memory FASTQ string --- |
| 86 | +def test_process_records_no_error(): |
| 87 | + fastq_str = ( |
| 88 | + "@read1\n" |
| 89 | + "ACGT\n" |
| 90 | + "+\n" |
| 91 | + "!!!!\n" |
| 92 | + ) |
| 93 | + |
| 94 | + # in FASTQ, qualities are usually encoded by ascii, but BioPython converts |
| 95 | + # them to a list of phred quality scores. |
| 96 | + handle = StringIO(fastq_str) |
| 97 | + # All error rates set to 0, so no changes. |
| 98 | + records = list(process_records(handle, |
| 99 | + subst_rate=0.0, |
| 100 | + ins_rate=0.0, |
| 101 | + del_rate=0.0, |
| 102 | + ins_quality=20, |
| 103 | + rng=random.Random(42))) |
| 104 | + assert len(records) == 1 |
| 105 | + rec = records[0] |
| 106 | + assert str(rec.seq) == "ACGT" |
| 107 | + |
| 108 | + # Check phred qualities: |
| 109 | + # '!' converts to 0 in Phred. |
| 110 | + assert rec.letter_annotations["phred_quality"] == [0]*4 |
| 111 | + |
| 112 | + |
| 113 | +# --- Tests for argument parser --- |
| 114 | +def test_get_parser_defaults(): |
| 115 | + parser = get_parser() |
| 116 | + args = parser.parse_args(["in.fastq", "out.fastq"]) |
| 117 | + assert args.in_fastq == Path("in.fastq") |
| 118 | + assert args.out_fastq == Path("out.fastq") |
| 119 | + # default error rates from the parser. |
| 120 | + assert args.subst_rate == 0.005 |
| 121 | + assert args.ins_rate == 0.0 |
| 122 | + assert args.del_rate == 0.0 |
| 123 | + assert args.ins_quality == 20 |
| 124 | + assert args.seed == 42 |
| 125 | + |
| 126 | + |
| 127 | +@pytest.fixture |
| 128 | +def tmp_fastq_file(tmp_path: Path) -> Path: |
| 129 | + fastq_path = tmp_path / "input.fastq" |
| 130 | + records = [ |
| 131 | + SeqRecord(Seq("ACGTACGT"), |
| 132 | + id="read1", |
| 133 | + description="", |
| 134 | + letter_annotations={"phred_quality": [30]*8}), |
| 135 | + ] |
| 136 | + with fastq_path.open("w") as fh: |
| 137 | + SeqIO.write(records, fh, "fastq") |
| 138 | + return fastq_path |
| 139 | + |
| 140 | + |
| 141 | +def test_process_fastq_no_error(tmp_fastq_file: Path, tmp_path: Path): |
| 142 | + output_path = tmp_path / "output.fastq" |
| 143 | + # Run process_fastq with error rates set to 0. |
| 144 | + process_fastq(in_fastq=tmp_fastq_file, |
| 145 | + out_fastq=output_path, |
| 146 | + subst_rate=0.0, |
| 147 | + ins_rate=0.0, |
| 148 | + del_rate=0.0, |
| 149 | + ins_quality=20, |
| 150 | + rng=random.Random(42)) |
| 151 | + # Read back the output file. |
| 152 | + recs = list(SeqIO.parse(str(output_path), "fastq")) |
| 153 | + assert len(recs) == 1 |
| 154 | + # Without errors, the sequence should be identical. |
| 155 | + assert str(recs[0].seq) == "ACGTACGT" |
| 156 | + |
| 157 | + |
| 158 | +def test_main_integration(tmp_fastq_file: Path, tmp_path: Path, monkeypatch): |
| 159 | + # Build a fake command line to run main. |
| 160 | + out_path = tmp_path / "output2.fastq" |
| 161 | + args = [ |
| 162 | + str(tmp_fastq_file), |
| 163 | + str(out_path), |
| 164 | + "--subst_rate", "0.0", |
| 165 | + "--ins_rate", "0.0", |
| 166 | + "--del_rate", "0.0", |
| 167 | + "--ins_quality", "20", |
| 168 | + "--seed", "42" |
| 169 | + ] |
| 170 | + # Call main directly. |
| 171 | + exit_code = main(args) |
| 172 | + assert exit_code == 0 |
| 173 | + recs = list(SeqIO.parse(str(out_path), "fastq")) |
| 174 | + assert len(recs) == 1 |
| 175 | + assert str(recs[0].seq) == "ACGTACGT" |
| 176 | + |
| 177 | + |
| 178 | +def test_entry_integration(tmp_fastq_file: Path, tmp_path: Path, monkeypatch): |
| 179 | + out_path = tmp_path / "output3.fastq" |
| 180 | + monkeypatch.setattr("sys.argv", [ |
| 181 | + "error_introducer.py", |
| 182 | + str(tmp_fastq_file), |
| 183 | + str(out_path), |
| 184 | + "--subst_rate", "0.0", |
| 185 | + "--ins_rate", "0.0", |
| 186 | + "--del_rate", "0.0", |
| 187 | + "--ins_quality", "20", |
| 188 | + "--seed", "42" |
| 189 | + ]) |
| 190 | + |
| 191 | + # entry() calls sys.exit; we can catch the SystemExit exception. |
| 192 | + with pytest.raises(SystemExit) as e: |
| 193 | + entry() |
| 194 | + |
| 195 | + assert e.value.code == 0 |
| 196 | + |
| 197 | + recs = list(SeqIO.parse(str(out_path), "fastq")) |
| 198 | + assert len(recs) == 1 |
| 199 | + assert str(recs[0].seq) == "ACGTACGT" |
0 commit comments