Commit 0f6e731

Merge fastqs: support arbitrary number of inputs
Also improve the compression handling.
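
The new interface replaces six positional FASTQ arguments with three CSV plan files. Below is a minimal sketch of an invocation, inferred from the argument parser and the CSV field names in the diff that follows; all file and directory names are illustrative, and running the module directly assumes its usual main() entry point.

    # trimplan.csv: one row per group of files that are trimmed together.
    # Read-file columns must be named r1, r2, ... (case-insensitive);
    # the bad_cycles column is optional and may be left empty.
    r1,r2,bad_cycles
    a_R1.fastq,a_R2.fastq,bad_cycles_a.csv
    b_R1.fastq,b_R2.fastq,

    # mergeplan.csv: one row per (input, output) pair; every input that
    # names the same output is concatenated into it. Outputs need a
    # directory component, since the script calls os.makedirs on it.
    input,output
    a_R1.fastq,merged/output_R1.fastq.gz
    b_R1.fastq,merged/output_R1.fastq.gz
    a_R2.fastq,merged/output_R2.fastq.gz
    b_R2.fastq,merged/output_R2.fastq.gz

    # zipfile.csv: paths treated as gzip-compressed. Listed inputs are
    # uncompressed before trimming; listed outputs are compressed after
    # merging. Paths must match the plan files exactly.
    file
    merged/output_R1.fastq.gz
    merged/output_R2.fastq.gz

    python micall/core/merge_fastqs.py trimplan.csv mergeplan.csv --zipfile zipfile.csv

Compression is now declared per file through --zipfile instead of globally through the old --unzipped flag, so compressed and uncompressed files can be mixed in a single run.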
1 parent 6840f80 commit 0f6e731

3 files changed: +426 -169 lines changed

micall/core/merge_fastqs.py (+134 -89)

@@ -1,127 +1,172 @@
-from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import argparse
 import logging
 import shutil
 import sys
 import os
 import tempfile
 import gzip
+import csv
+from typing import TextIO, List, Optional, Dict, IO, Set, Tuple

 from micall.core.trim_fastqs import TrimSteps, trim


 logging.basicConfig(level=logging.INFO,
                     format='%(asctime)s[%(levelname)s]%(name)s.%(funcName)s(): %(message)s')
-logger = logging.getLogger('micall')
+logger = logging.getLogger(__name__)


-def concatenate_files(input_file1, input_file2, output_file):
-    with open(input_file1, 'rb') as src1, \
-            open(input_file2, 'rb') as src2, \
-            open(output_file, 'wb') as dst:
+def concatenate_files(input_files: List[str], output_file: str) -> None:
+    with open(output_file, 'wb') as dst:
+        for input_file in input_files:
+            with open(input_file, 'rb') as src:
+                shutil.copyfileobj(src, dst)

-        shutil.copyfileobj(src1, dst)
-        shutil.copyfileobj(src2, dst)

+def parse_inputs_and_merge_fastqs(trim_file: TextIO, mergeplan_file: TextIO, zip_file: Optional[TextIO]) -> None:
+    mergeplan: Dict[str, List[str]] = {}
+    trimplan: Set[Tuple[str, ...]] = set()
+    zipped: Set[str] = set()
+    bad_cycles: Dict[str, str] = {}

-def merge_fastqs(args):
-    with tempfile.NamedTemporaryFile() as trimmed_fastq1_a, \
-            tempfile.NamedTemporaryFile() as trimmed_fastq2_a, \
-            tempfile.NamedTemporaryFile() as trimmed_fastq1_b, \
-            tempfile.NamedTemporaryFile() as trimmed_fastq2_b:
+    mergeplan_reader = csv.DictReader(mergeplan_file)
+    trim_reader = csv.DictReader(trim_file)
+    zip_reader = csv.DictReader(zip_file) if zip_file is not None else None

-        logger.info('Processing reads of Sample A.')
+    for row in mergeplan_reader:
+        input_path = row["input"]
+        output_path = row["output"]

-        trim((args.fastq1_a, args.fastq2_a),
-             args.bad_cycles_a_csv,
-             (trimmed_fastq1_a.name, trimmed_fastq2_a.name),
-             use_gzip=not args.unzipped)
+        if output_path not in mergeplan:
+            mergeplan[output_path] = []
+        mergeplan[output_path].append(input_path)

-        logger.info('Processing reads of Sample B.')
+    no_badcycles_fields = list(sorted(
+        (field for field in trim_reader.fieldnames or [] if field != "bad_cycles"),
+        key=lambda field: field.lower()))
+    expected_no_badcycles_fields = [f"r{i + 1}" for i in range(len(no_badcycles_fields))]

-        trim((args.fastq1_b, args.fastq2_b),
-             args.bad_cycles_b_csv,
-             (trimmed_fastq1_b.name, trimmed_fastq2_b.name),
-             use_gzip=not args.unzipped)
+    if [field.lower() for field in no_badcycles_fields] \
+       != expected_no_badcycles_fields:
+        raise ValueError(f"Bad field names: {no_badcycles_fields}, expected {expected_no_badcycles_fields}")

-        logger.info('Merging resulting reads files.')
+    for row in trim_reader:
+        input_paths = tuple(row[field] for field in no_badcycles_fields)
+        trimplan.add(input_paths)
+        bad_cycles_path = row.get("bad_cycles", "")
+        if bad_cycles_path:
+            for input_path in input_paths:
+                bad_cycles[input_path] = bad_cycles_path

-        os.makedirs(os.path.dirname(args.fastq1_result), exist_ok=True)
-        os.makedirs(os.path.dirname(args.fastq2_result), exist_ok=True)
+    if zip_reader is not None:
+        for row in zip_reader:
+            zipped.add(row["file"])

-        if args.unzipped:
-            concatenate_files(trimmed_fastq1_a.name, trimmed_fastq1_b.name,
-                              args.fastq1_result)
-            concatenate_files(trimmed_fastq2_a.name, trimmed_fastq2_b.name,
-                              args.fastq2_result)
+    return merge_fastqs(trimplan, mergeplan, zipped, bad_cycles)

-        else:
-            with tempfile.NamedTemporaryFile() as temp_fastq1, \
-                    tempfile.NamedTemporaryFile() as temp_fastq2:

-                temp_fastq1.close()
-                temp_fastq2.close()
+def compress_file(input_path: str, output_path: str) -> None:
+    with open(input_path, 'rb') as input_file, \
+            open(output_path, 'wb') as output_file:
+        with gzip.GzipFile(fileobj=output_file, mode='wb') as gzip_file:
+            shutil.copyfileobj(input_file, gzip_file)

-                concatenate_files(trimmed_fastq1_a.name, trimmed_fastq1_b.name,
-                                  temp_fastq1.name)
-                concatenate_files(trimmed_fastq2_a.name, trimmed_fastq2_b.name,
-                                  temp_fastq2.name)

-                logger.info('Compressing final outputs.')
+def uncompress_file(input_path: str, output_path: str) -> None:
+    with open(input_path, 'rb') as compressed_file, \
+            open(output_path, 'w+b') as ret:
+        with gzip.GzipFile(fileobj=compressed_file, mode='rb') as gzip_file:
+            shutil.copyfileobj(gzip_file, ret)

-                with open(temp_fastq1.name, 'rb') as f_in, gzip.open(args.fastq1_result, 'wb') as f_out:
-                    shutil.copyfileobj(f_in, f_out)

-                with open(temp_fastq2.name, 'rb') as f_in, gzip.open(args.fastq2_result, 'wb') as f_out:
-                    shutil.copyfileobj(f_in, f_out)
+def get_transitive(graph: Dict[str, str], key: str) -> str:
+    if key in graph:
+        return get_transitive(graph, graph[key])
+    else:
+        return key

-    logger.info('Done.')

+def merge_fastqs(trimplan: Set[Tuple[str, ...]],
+                 mergeplan: Dict[str, List[str]],
+                 zipped: Set[str] = set(),
+                 bad_cycles: Dict[str, str] = {},
+                 ) -> None:
+
+    inputs = [value for values in mergeplan.values() for value in values]
+    outputs = list(mergeplan.keys())
+    name_mapping: Dict[str, str] = {}
+    temporary: List[IO] = []
+
+    for output_path in outputs:
+        os.makedirs(os.path.dirname(output_path), exist_ok=True)
+
+    for input_path in inputs:
+        if input_path in zipped:
+            logger.info('Uncompressing %s.', input_path)
+            uncompressed = tempfile.NamedTemporaryFile(mode="w+b")
+            uncompressed_path = uncompressed.name
+            uncompress_file(input_path, uncompressed_path)
+            temporary.append(uncompressed)
+            name_mapping[input_path] = uncompressed_path
+
+    for to_trim in trimplan:
+        assert len(to_trim) > 0
+
+        trim_outputs: List[str] = []
+        trim_inputs: List[str] = []

-def main(argv) -> int:
-    parser = ArgumentParser(
-        description="Combine and filter the FASTQ files from two samples into a single output file.",
-        formatter_class=ArgumentDefaultsHelpFormatter)
-
-    parser.add_argument(
-        "fastq1_a",
-        help="FASTQ file containing forward reads of sample A",
-    )
-    parser.add_argument(
-        "fastq2_a",
-        help="FASTQ file containing reverse reads of sample A",
-    )
-    parser.add_argument(
-        "fastq1_b",
-        help="FASTQ file containing forward reads of sample B",
-    )
-    parser.add_argument(
-        "fastq2_b",
-        help="FASTQ file containing reverse reads of sample B",
-    )
-    parser.add_argument(
-        "fastq1_result",
-        help="Resulting combined FASTQ file containing forward reads",
-    )
-    parser.add_argument(
-        "fastq2_result",
-        help="Resulting combined FASTQ file containing reverse reads",
-    )
-    parser.add_argument(
-        "--bad_cycles_a_csv",
-        help="list of tiles and cycles rejected for poor quality in sample A",
-    )
-    parser.add_argument(
-        "--bad_cycles_b_csv",
-        help="list of tiles and cycles rejected for poor quality in sample B",
-    )
-    parser.add_argument(
-        '--unzipped', '-u',
-        action='store_true',
-        help='Set if the original FASTQ files are not compressed',
-    )
+        all_bad_cycles_paths = set(bad_cycles[path] for path in to_trim if path in bad_cycles)
+        if len(all_bad_cycles_paths) == 0:
+            bad_cycles_path = None
+        elif len(all_bad_cycles_paths) == 1:
+            bad_cycles_path = next(iter(all_bad_cycles_paths))
+        else:
+            raise ValueError(f"Ambiguous bad_cycles for {to_trim}: {all_bad_cycles_paths}.")
+
+        for path in to_trim:
+            path = get_transitive(name_mapping, path)
+            tmp = tempfile.NamedTemporaryFile()
+            trim_inputs.append(path)
+            trim_outputs.append(tmp.name)
+            temporary.append(tmp)
+            name_mapping[path] = tmp.name
+
+        logger.info('Trimming samples %s.', ','.join(map(repr, to_trim)))
+        trim(trim_inputs, bad_cycles_path, trim_outputs, use_gzip=False)
+
+    for output_path in mergeplan:
+        input_files = mergeplan[output_path]
+        logger.info('Merging results %s to %s.', ','.join(map(repr, input_files)), output_path)
+        input_files = [get_transitive(name_mapping, path) for path in input_files]
+        tmp = tempfile.NamedTemporaryFile()
+        temporary.append(tmp)
+        name_mapping[output_path] = tmp.name
+        output_path = tmp.name
+        concatenate_files(input_files, output_path)
+
+    for output_path in mergeplan:
+        concatenated = name_mapping[output_path]
+        if output_path in zipped:
+            logger.info('Compressing output %s.', output_path)
+            compress_file(concatenated, output_path)
+        else:
+            os.rename(concatenated, output_path)

+    for toclose in temporary:
+        try: toclose.close()
+        except: pass
+
+    logger.info('Done.')
+
+
+def main(argv: List[str]) -> int:
+    parser = argparse.ArgumentParser(description="Combine and filter the FASTQ files from multiple samples into single output files.")
+    parser.add_argument("trimplan", type=argparse.FileType('rt'), help="A CSV file containing the lists of files to be trimmed.")
+    parser.add_argument("mergeplan", type=argparse.FileType('rt'), help="A CSV file containing merge plan.")
+    parser.add_argument("--zipfile", type=argparse.FileType('rt'), help="A CSV file containing a list of files that are compressed or need to be compressed.")
     args = parser.parse_args(argv)
-    merge_fastqs(args)
+
+    parse_inputs_and_merge_fastqs(args.trimplan, args.mergeplan, args.zipfile)
     return 0
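
The rewrite tracks each file substitution in name_mapping: a zipped input maps to its uncompressed temporary copy, that copy maps to its trimmed copy, and each merge output maps to the temporary file its inputs were concatenated into. get_transitive follows those links to the end of the chain, so later passes always see the newest version of a file. A small illustration with hypothetical paths:

    # A zipped input was first uncompressed, then trimmed; resolving it
    # yields the trimmed copy that should be concatenated.
    name_mapping = {
        'sample_R1.fastq.gz': '/tmp/uncompressed_abc',  # recorded by the unzip pass
        '/tmp/uncompressed_abc': '/tmp/trimmed_def',    # recorded by the trim pass
    }
    assert get_transitive(name_mapping, 'sample_R1.fastq.gz') == '/tmp/trimmed_def'
    # A path with no recorded substitutions resolves to itself.
    assert get_transitive(name_mapping, 'plain.fastq') == 'plain.fastq'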

micall/core/trim_fastqs.py (+5 -4)

@@ -16,6 +16,7 @@
 from pathlib import Path

 import typing
+from typing import Optional

 from Bio import SeqIO

@@ -57,12 +58,12 @@ def parse_args():


 def trim(original_fastq_filenames: typing.Sequence[str],
-         bad_cycles_filename: str,
+         bad_cycles_filename: Optional[str],
          trimmed_fastq_filenames: typing.Sequence[str],
          use_gzip: bool = True,
-         summary_file: typing.TextIO = None,
-         skip: typing.Tuple[str] = (),
-         project_code: str = None):
+         summary_file: Optional[typing.TextIO] = None,
+         skip: typing.Tuple[str, ...] = (),
+         project_code: Optional[str] = None):
     """

     :param original_fastq_filenames: sequence of two filenames, containing
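
The Optional annotations match the new caller in merge_fastqs.py, which passes bad_cycles_filename=None whenever a trim-plan row leaves bad_cycles empty, and always passes use_gzip=False because compression is now handled separately. A sketch of such a call, with illustrative file names:

    # Trim one read pair without a bad-cycles filter; inputs and outputs
    # are plain (uncompressed) FASTQ files.
    trim(['a_R1.fastq', 'a_R2.fastq'],
         None,
         ['trimmed_R1.fastq', 'trimmed_R2.fastq'],
         use_gzip=False)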
