Skip to content

Commit 39a73f1

Browse files
committed
Contig stitcher: fix type checking errors
1 parent 5875c39 commit 39a73f1

File tree

1 file changed

+37
-30
lines changed

1 file changed

+37
-30
lines changed

micall/core/contig_stitcher.py

+37-30
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ def combine_contigs(parts: List[AlignedContig]) -> AlignedContig:
196196
return ret
197197

198198

199-
def align_to_reference(contig) -> Iterable[GenotypedContig]:
199+
def align_to_reference(contig: GenotypedContig) -> Iterable[GenotypedContig]:
200200
"""
201201
Align a single Contig to its reference sequence, producing potentially multiple aligned contigs.
202202
@@ -212,12 +212,11 @@ def align_to_reference(contig) -> Iterable[GenotypedContig]:
212212

213213
aligner = Aligner(seq=contig.ref_seq, preset='map-ont')
214214
alignments = list(aligner.map(contig.seq))
215-
hits_array = [(CigarHit(Cigar(x.cigar),
216-
min(x.r_st, x.r_en - 1),
217-
max(x.r_st, x.r_en - 1),
218-
min(x.q_st, x.q_en - 1),
219-
max(x.q_st, x.q_en - 1)),
220-
"forward" if x.strand == 1 else "reverse") for x in alignments]
215+
hits_array: List[Tuple[CigarHit, Literal["forward", "reverse"]]] = \
216+
[(CigarHit(Cigar(x.cigar),
217+
min(x.r_st, x.r_en - 1), max(x.r_st, x.r_en - 1),
218+
min(x.q_st, x.q_en - 1), max(x.q_st, x.q_en - 1)),
219+
"forward" if x.strand == 1 else "reverse") for x in alignments]
221220

222221
connected = connect_cigar_hits(list(map(lambda p: p[0], hits_array))) if hits_array else []
223222

@@ -263,35 +262,47 @@ def align_to_reference(contig) -> Iterable[GenotypedContig]:
263262
yield part
264263

265264

266-
def strip_conflicting_mappings(contigs):
265+
def strip_conflicting_mappings(contigs: Iterable[GenotypedContig]) -> Iterable[GenotypedContig]:
267266
contigs = list(contigs)
268267
names = {contig.name: contig for contig in contigs}
269-
reference_indexes = list(sorted(names.keys(), key=lambda name: names[name].alignment.r_st if isinstance(names[name], AlignedContig) else -1))
270-
query_indexes = list(sorted(names.keys(), key=lambda name: names[name].alignment.q_st if isinstance(names[name], AlignedContig) else -1))
271268

272-
def is_out_of_order(name):
273-
return reference_indexes.index(name) != query_indexes.index(name)
269+
def get_indexes(name: str) -> Tuple[int, int]:
270+
contig = names[name]
271+
if isinstance(contig, AlignedContig):
272+
return (contig.alignment.q_st, contig.alignment.r_st)
273+
else:
274+
return (-1, -1)
274275

275-
sorted_by_query = list(sorted(contigs, key=lambda contig: contig.alignment.q_st if isinstance(contig, AlignedContig) else -1))
276+
reference_sorted = list(sorted(names.keys(), key=lambda name: get_indexes(name)[1]))
277+
query_sorted = list(sorted(names.keys(), key=lambda name: get_indexes(name)[0]))
278+
279+
def is_out_of_order(name: str) -> bool:
280+
return reference_sorted.index(name) != query_sorted.index(name)
276281

282+
sorted_by_query = list(sorted(contigs, key=lambda contig: contig.alignment.q_st if isinstance(contig, AlignedContig) else -1))
277283
for prev_contig, contig, next_contig in sliding_window(sorted_by_query):
278-
name = contig.name
279-
if prev_contig is not None or is_out_of_order(name):
280-
contig = contig.lstrip_query()
281-
if next_contig is not None or is_out_of_order(name):
282-
contig = contig.rstrip_query()
284+
if isinstance(contig, AlignedContig):
285+
name = contig.name
286+
if prev_contig is not None or is_out_of_order(name):
287+
contig = contig.lstrip_query()
288+
if next_contig is not None or is_out_of_order(name):
289+
contig = contig.rstrip_query()
290+
283291
yield contig
284292

285293

286-
def align_all_to_reference(contigs):
294+
def align_all_to_reference(contigs: Iterable[GenotypedContig]) -> Iterable[GenotypedContig]:
287295
"""
288296
Align multiple contigs to their respective reference sequences.
289297
290298
Applies align_to_reference to each contig in the given collection,
291299
flattening the result into a single list.
292300
"""
293301

294-
return [contig for parts in map(strip_conflicting_mappings, map(align_to_reference, contigs)) for contig in parts]
302+
groups = map(align_to_reference, contigs)
303+
groups = map(strip_conflicting_mappings, groups)
304+
for group in groups:
305+
yield from group
295306

296307

297308
def align_queries(seq1: str, seq2: str) -> Tuple[str, str]:
@@ -514,7 +525,7 @@ def merge_intervals(intervals: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
514525
return merged_intervals
515526

516527

517-
def find_covered_contig(contigs: List[AlignedContig]) -> Optional[AlignedContig]:
528+
def find_covered_contig(contigs: List[AlignedContig]) -> Tuple[Optional[AlignedContig], List[AlignedContig]]:
518529
"""
519530
Find and return the first contig that is completely covered by other contigs.
520531
@@ -539,7 +550,7 @@ def calculate_cumulative_coverage(contigs) -> List[Tuple[int, int]]:
539550
for cover_interval in cumulative_coverage):
540551
return current, overlaping_contigs
541552

542-
return None, None
553+
return None, []
543554

544555

545556
def drop_completely_covered(contigs: List[AlignedContig]) -> List[AlignedContig]:
@@ -624,7 +635,7 @@ def try_split(contig):
624635
return contigs
625636

626637

627-
def stitch_contigs(contigs: Iterable[GenotypedContig]) -> Iterable[AlignedContig]:
638+
def stitch_contigs(contigs: Iterable[GenotypedContig]) -> Iterable[GenotypedContig]:
628639
contigs = list(contigs)
629640
for contig in contigs:
630641
logger.info("Introduced contig %r of ref %r, group_ref %r, and length %s.",
@@ -634,15 +645,11 @@ def stitch_contigs(contigs: Iterable[GenotypedContig]) -> Iterable[AlignedContig
634645
contig.name, contig.seq, contig.ref_name,
635646
contig.group_ref, contig.ref_seq, len(contig.seq))
636647

637-
aligned = align_all_to_reference(contigs)
648+
maybe_aligned = list(align_all_to_reference(contigs))
638649

639650
# Contigs that did not align do not need any more processing
640-
yield from (x for x in aligned if not isinstance(x, AlignedContig))
641-
aligned = [x for x in aligned if isinstance(x, AlignedContig)]
642-
643-
# Contigs aligned in reverse do not need any more processing
644-
yield from (x for x in aligned if x.strand == "reverse")
645-
aligned = [x for x in aligned if x.strand == "forward"]
651+
yield from (x for x in maybe_aligned if not isinstance(x, AlignedContig))
652+
aligned = [x for x in maybe_aligned if isinstance(x, AlignedContig)]
646653

647654
aligned = split_contigs_with_gaps(aligned)
648655
aligned = drop_completely_covered(aligned)

0 commit comments

Comments
 (0)