|
| 1 | +#! /usr/bin/env python |
| 2 | + |
| 3 | +import argparse |
| 4 | +import random |
| 5 | +import sys |
| 6 | +from pathlib import Path |
| 7 | +from typing import Sequence, Tuple, TextIO, Iterator |
| 8 | + |
| 9 | +from Bio import SeqIO |
| 10 | +from Bio.Seq import Seq |
| 11 | +from Bio.SeqRecord import SeqRecord |
| 12 | + |
| 13 | +NUCLEOTIDES = "ACGT" |
| 14 | + |
| 15 | + |
| 16 | +def introduce_errors(seq: str, |
| 17 | + qualities: Sequence[int], |
| 18 | + subst_rate: float, |
| 19 | + ins_rate: float, |
| 20 | + del_rate: float, |
| 21 | + ins_quality: int) -> Tuple[str, Sequence[int]]: |
| 22 | + |
| 23 | + """ |
| 24 | + Introduce substitution, insertion, and deletion errors into a sequence. |
| 25 | +
|
| 26 | + Args: |
| 27 | + seq: Original sequence string. |
| 28 | + qualities: List of phred quality values corresponding to each base. |
| 29 | + subst_rate: Per-base probability of substituting the base. |
| 30 | + ins_rate: Per-base probability (after processing a base) |
| 31 | + of inserting an extra nucleotide. |
| 32 | + del_rate: Per-base probability of deleting the base. |
| 33 | + ins_quality: Quality score for any inserted bases. |
| 34 | +
|
| 35 | + Returns: |
| 36 | + A tuple (new_seq, new_qualities) where new_seq is the |
| 37 | + error-introduced sequence and new_qualities is a list of quality |
| 38 | + scores for the new sequence. |
| 39 | + """ |
| 40 | + |
| 41 | + new_seq_chars = [] |
| 42 | + new_quals = [] |
| 43 | + |
| 44 | + for base, q in zip(seq, qualities): |
| 45 | + # First, decide whether to drop (delete) this base. |
| 46 | + if random.random() < del_rate: |
| 47 | + # Base is dropped (no substitution or insertion for this |
| 48 | + # base). You could decide to also sometimes insert an |
| 49 | + # extra unwanted base even if deletion occurs. |
| 50 | + continue |
| 51 | + |
| 52 | + # Otherwise, decide whether to substitute the base. |
| 53 | + if random.random() < subst_rate: |
| 54 | + # Choose a new nucleotide that is different from the original. |
| 55 | + possible = tuple(nuc for nuc in NUCLEOTIDES if nuc != base.upper()) |
| 56 | + new_base = random.choice(possible) |
| 57 | + else: |
| 58 | + new_base = base |
| 59 | + |
| 60 | + new_seq_chars.append(new_base) |
| 61 | + new_quals.append(q) |
| 62 | + |
| 63 | + # Now, decide whether to insert an extra (random) nucleotide |
| 64 | + # *after* this base. |
| 65 | + if random.random() < ins_rate: |
| 66 | + inserted_base = random.choice(NUCLEOTIDES) |
| 67 | + new_seq_chars.append(inserted_base) |
| 68 | + new_quals.append(ins_quality) |
| 69 | + |
| 70 | + return "".join(new_seq_chars), new_quals |
| 71 | + |
| 72 | + |
| 73 | +def process_records(input_handle: TextIO, |
| 74 | + subst_rate: float, |
| 75 | + ins_rate: float, |
| 76 | + del_rate: float, |
| 77 | + ins_quality: int, |
| 78 | + rng: random.Random, |
| 79 | + ) -> Iterator[SeqRecord]: |
| 80 | + |
| 81 | + for record in SeqIO.parse(input_handle, "fastq"): |
| 82 | + # Get original sequence and quality digits. |
| 83 | + original_seq = str(record.seq) |
| 84 | + original_quals = record.letter_annotations["phred_quality"] |
| 85 | + |
| 86 | + # Introduce errors into this read. |
| 87 | + new_seq_str, new_quals = introduce_errors(original_seq, |
| 88 | + original_quals, |
| 89 | + subst_rate, |
| 90 | + ins_rate, |
| 91 | + del_rate, |
| 92 | + ins_quality) |
| 93 | + |
| 94 | + # Create a new SeqRecord with updated sequence and quality. |
| 95 | + new_record = SeqRecord(Seq(new_seq_str), |
| 96 | + id=record.id, |
| 97 | + name=record.name, |
| 98 | + description=record.description) |
| 99 | + new_record.letter_annotations["phred_quality"] = new_quals |
| 100 | + yield new_record |
| 101 | + |
| 102 | + |
| 103 | +def process_fastq(in_fastq: Path, |
| 104 | + out_fastq: Path, |
| 105 | + subst_rate: float, |
| 106 | + ins_rate: float, |
| 107 | + del_rate: float, |
| 108 | + ins_quality: int, |
| 109 | + rng: random.Random, |
| 110 | + ) -> None: |
| 111 | + |
| 112 | + """ |
| 113 | + Process an input FASTQ file, introducing errors into each read, |
| 114 | + and write the modified reads to an output FASTQ file. |
| 115 | +
|
| 116 | + Args: |
| 117 | + in_fastq: Path to the input FASTQ file. |
| 118 | + out_fastq: Path to the output FASTQ file. |
| 119 | + subst_rate: Substitution error rate. |
| 120 | + ins_rate: Insertion error rate. |
| 121 | + del_rate: Deletion error rate. |
| 122 | + ins_quality: Base quality score for inserted bases. |
| 123 | + """ |
| 124 | + |
| 125 | + with open(in_fastq, "r") as input_handle, \ |
| 126 | + open(out_fastq, "w") as output_handle: |
| 127 | + new_records = process_records(input_handle=input_handle, |
| 128 | + subst_rate=subst_rate, |
| 129 | + ins_rate=ins_rate, |
| 130 | + del_rate=del_rate, |
| 131 | + ins_quality=ins_quality, |
| 132 | + rng=rng, |
| 133 | + ) |
| 134 | + SeqIO.write(new_records, output_handle, "fastq") |
| 135 | + |
| 136 | + |
| 137 | +def get_parser() -> argparse.ArgumentParser: |
| 138 | + parser = argparse.ArgumentParser( |
| 139 | + description="Introduce errors into a FASTQ file to generate test data for error-prone reads.", |
| 140 | + formatter_class=argparse.ArgumentDefaultsHelpFormatter, |
| 141 | + ) |
| 142 | + parser.add_argument("in_fastq", type=Path, |
| 143 | + help="Input FASTQ file containing original reads.") |
| 144 | + parser.add_argument("out_fastq", type=Path, |
| 145 | + help="Output FASTQ file that will contain reads with errors.") |
| 146 | + parser.add_argument("--subst_rate", type=float, default=0.005, |
| 147 | + help="Per-base substitution error rate.") |
| 148 | + parser.add_argument("--ins_rate", type=float, default=0.0, |
| 149 | + help="Per-base insertion error rate.") |
| 150 | + parser.add_argument("--del_rate", type=float, default=0.0, |
| 151 | + help="Per-base deletion error rate.") |
| 152 | + parser.add_argument("--ins_quality", type=int, default=20, |
| 153 | + help="Quality score assigned to any inserted bases.") |
| 154 | + parser.add_argument("--seed", type=int, default=42, |
| 155 | + help="Random seed for reproducibility.") |
| 156 | + return parser |
| 157 | + |
| 158 | + |
| 159 | +def main(argv: Sequence[str]) -> int: |
| 160 | + parser = get_parser() |
| 161 | + args = parser.parse_args(argv) |
| 162 | + rng = random.Random(args.seed) |
| 163 | + process_fastq(in_fastq=args.in_fastq, |
| 164 | + out_fastq=args.out_fastq, |
| 165 | + subst_rate=args.subst_rate, |
| 166 | + ins_rate=args.ins_rate, |
| 167 | + del_rate=args.del_rate, |
| 168 | + ins_quality=args.ins_quality, |
| 169 | + rng=rng, |
| 170 | + ) |
| 171 | + return 0 |
| 172 | + |
| 173 | + |
| 174 | +def entry() -> None: |
| 175 | + sys.exit(main(sys.argv[1:])) |
| 176 | + |
| 177 | + |
| 178 | +if __name__ == '__main__': entry() # noqa |
0 commit comments