Skip to content

Commit 841df5f

Browse files
committed
Make genome mapping more parallel
The idea behind this is to make genome mapping faster, use less memory, and be more parallelized. It seems some of the new genomes are too large and the hit-selection step fails. We can rework this by selecting the best hits per scanned chunk first. Then, once we merge, we can select the best hits on a much smaller set. This should reduce the memory usage of the final select step (we should only need memory proportional to the number of chunks and the number of sequences, not the total number of hits). It should also be faster since we can run multiple select steps at once.
1 parent 7e57705 commit 841df5f

File tree

3 files changed

+107
-27
lines changed

3 files changed

+107
-27
lines changed

Diff for: main.nf

+24-14
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ process species_to_map {
511511
raw_genomes
512512
.splitCsv()
513513
.filter { s, a, t, d -> !params.genome_mapping.species_excluded_from_mapping.contains(s) }
514-
.into { assemblies; genomes_to_fetch; assembly_tracking }
514+
.into { assemblies; genomes_to_fetch }
515515

516516
assemblies
517517
.combine(Channel.fromPath('files/genome-mapping/find-unmapped.sql'))
@@ -543,13 +543,13 @@ process download_genome {
543543
set val(species), val(assembly), val(taxid), val(division) from genomes_to_fetch
544544

545545
output:
546-
set val(species), file('parts/*.{2bit,ooc}') into genomes
546+
set val(species), val(assembly), file('parts/*.{2bit,ooc}') into genomes
547547

548548
"""
549549
set -o pipefail
550550
551551
rnac genome-mapping url-for --host=$division $species $assembly - |\
552-
xargs -I {} fetch generic '{}' ${species}.fasta.gz
552+
xargs -I {} fetch generic '{}' ${species}.fasta.gz
553553
554554
gzip -d ${species}.fasta.gz
555555
split-sequences ${species}.fasta ${params.genome_mapping.download_genome.chunk_size} parts
@@ -569,10 +569,10 @@ process download_genome {
569569

570570
genomes
571571
.join(split_mappable_sequences)
572-
.flatMap { species, genome_chunks, chunks ->
572+
.flatMap { species, assembly, genome_chunks, chunks ->
573573
[genome_chunks.collate(2), chunks]
574574
.combinations()
575-
.inject([]) { acc, it -> acc << [species] + it.flatten() }
575+
.inject([]) { acc, it -> acc << [species, assembly] + it.flatten() }
576576
}
577577
.set { targets }
578578

@@ -581,12 +581,14 @@ process blat {
581581
errorStrategy 'finish'
582582

583583
input:
584-
set val(species), file(genome), file(ooc), file(chunk) from targets
584+
set val(species), val(assembly), file(genome), file(ooc), file(chunk) from targets
585585

586586
output:
587-
set val(species), file('output.psl') into blat_results
587+
set val(species), file('selected.json') into blat_results
588588

589589
"""
590+
set -o pipefail
591+
590592
blat \
591593
-ooc=$ooc \
592594
-noHead \
@@ -596,38 +598,46 @@ process blat {
596598
-minScore=${params.genome_mapping.blat.options.min_score} \
597599
-minIdentity=${params.genome_mapping.blat.options.min_identity} \
598600
$genome $chunk output.psl
601+
602+
sort -k 10 output.psl |\
603+
rnac genome-mapping blat as-json $assembly - - |\
604+
rnac genome-mapping blat select - selected.json
599605
"""
600606
}
601607

602608
blat_results
603609
.groupTuple()
604-
.join(assembly_tracking)
605-
.map { species, psl, assembly_id, taxid, division -> [psl, species, assembly_id] }
606610
.set { species_results }
607611

608612
process select_mapped_locations {
609613
tag { species }
610614
memory { params.genome_mapping.select_mapped.directives.memory }
611615

612616
input:
613-
set file('output*.psl'), val(species), val(assembly_id) from species_results
617+
set val(species), file('selected*.json') from species_results
614618

615619
output:
616-
file 'locations.csv' into selected_locations
620+
file('locations.csv') into selected_locations
617621

618622
"""
619623
set -o pipefail
620624
621-
sort -k 10 output*.psl > sorted.psl
622-
rnac genome-mapping select-hits $assembly_id sorted.psl locations.csv
625+
find . -name 'selected*.json' |\
626+
xargs cat |\
627+
rnac genome-mapping blat select --sort - - |\
628+
rnac genome-mapping blat as-importable - locations.csv
623629
"""
624630
}
625631

632+
selected_locations
633+
.collect()
634+
.set { blat_to_import }
635+
626636
process load_genome_mapping {
627637
maxForks 1
628638

629639
input:
630-
file('raw*.csv') from selected_locations.collect()
640+
file('raw*.csv') from blat_to_import
631641
file(ctl) from Channel.fromPath('files/genome-mapping/load.ctl')
632642

633643
output:

Diff for: rnacentral_pipeline/cli/genome_mapping.py

+49-3
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,49 @@ def cli():
2929
pass
3030

3131

32-
@cli.command('select-hits')
32+
@cli.group('blat')
def hits():
    """A series of commands for working with blat hits."""
38+
39+
40+
@hits.command('as-json')
@click.argument('assembly_id')
@click.argument('hits', default='-', type=click.File('r'))
@click.argument('output', default='-', type=click.File('w'))
def hits_json(assembly_id, hits, output):
    """
    Convert a PSL file into a JSON-line file (one object per line). This is
    lossy, but it keeps everything needed for later selection.
    """
    blat.as_json(assembly_id, hits, output)
50+
51+
52+
# Bug fix: registered on @cli.command although main.nf invokes
# `rnac genome-mapping blat as-importable`, i.e. under the `blat` group.
@hits.command('as-importable')
@click.argument('hits', default='-', type=click.File('r'))
@click.argument('output', default='-', type=click.File('w'))
def as_importable(hits, output):
    """
    Convert a json-line file into a CSV that can be used for import by
    pgloader. This is lossy as it only keeps the things needed for the
    database.
    """
    # Bug fix: the parameter was named `raw`, but click passes the argument
    # by its declared name ('hits'), which would raise a TypeError.
    # NOTE(review): blat.as_importable is not defined in blat.py in this
    # commit — confirm (parse_json + write_importable may be intended).
    blat.as_importable(hits, output)
61+
62+
63+
@hits.command('select')
# Bug fix: main.nf passes bare `--sort` with no value, so this must be a
# boolean flag; without is_flag=True click would consume the next token
# ('-', the hits argument) as the option's value.
@click.option('--sort', is_flag=True, default=False)
@click.argument('hits', default='-', type=click.File('r'))
@click.argument('output', default='-', type=click.File('w'))
def select_hits(hits, output, sort=False):
    """
    Parse a JSON-line file and select the best hits in the file. The best
    hits are written to the output file. This assumes the file is sorted by
    urs_taxid unless --sort is given, in which case the data is sorted in
    memory. That may be very expensive.
    """
    blat.select_json(hits, output, sort=sort)
3875

3976

4077
@cli.command('url-for')
@@ -43,6 +80,10 @@ def select_hits(assembly_id, hits, output):
4380
@click.argument('assembly_id')
4481
@click.argument('output', default='-', type=click.File('w'))
4582
def find_remote_url(species, assembly_id, output, host=None):
83+
"""
84+
Determine the remote URL to fetch the genome for a given species/assembly.
85+
The url is written to the output file and may include '*'.
86+
"""
4687
url = urls.url_for(species, assembly_id, host=host)
4788
output.write(url)
4889

@@ -51,4 +92,9 @@ def find_remote_url(species, assembly_id, output, host=None):
5192
@click.argument('filename', default='-', type=click.File('r'))
5293
@click.argument('output', default='-', type=click.File('w'))
5394
def find_remote_urls(filename, output):
95+
"""
96+
Determine the remote URLs to fetch the genomes for all entries in a file,
where the file is a CSV of species,assembly. The URLs are written to the
output file and may include '*'.
99+
"""
54100
urls.write_urls_for(filename, output)

Diff for: rnacentral_pipeline/rnacentral/genome_mapping/blat.py

+34-10
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,14 @@ def build(cls, assembly_id, raw):
8484

8585
@property
def name(self):
    # Region name keyed by this hit's URS accession (passed as a UPI).
    return self.region.name(self.upi, is_upi=True)
8888

8989
@property
def match_fraction(self):
    """Fraction of the sequence length covered by matching bases."""
    # float() kept deliberately: the fields may arrive as strings from the
    # PSL parse — TODO confirm upstream types.
    return float(self.matches) / float(self.sequence_length)
9292

9393
def writeable(self):
    # Delegate to the region, emitting rows keyed by the URS accession.
    return self.region.writeable(self.upi, is_upi=True)
9595

9696

9797
def select_possible(hit):
@@ -113,7 +113,7 @@ def select_best(hits):
113113
return hits
114114

115115

116-
def parse(assembly_id, handle):
116+
def parse_psl(assembly_id, handle):
117117
to_split = ['blockSizes', 'qStarts', 'tStarts']
118118
for row in csv.reader(handle, delimiter='\t'):
119119
result = dict(zip(FIELDS, row))
@@ -129,8 +129,13 @@ def parse(assembly_id, handle):
129129
yield BlatHit.build(assembly_id, result)
130130

131131

132-
def select_hits(assembly_id, handle):
133-
hits = parse(assembly_id, handle)
132+
def parse_json(handle):
    """Yield BlatHit objects parsed from a JSON-line file handle."""
    for raw_line in handle:
        yield BlatHit(**json.loads(raw_line))
136+
137+
138+
def select_hits(hits):
134139
hits = six.moves.filter(select_possible, hits)
135140
hits = it.groupby(hits, op.attrgetter('upi'))
136141
hits = six.moves.map(op.itemgetter(1), hits)
@@ -139,8 +144,27 @@ def select_hits(assembly_id, handle):
139144
return hits
140145

141146

142-
def write_selected(assembly_id, hits, output):
143-
selected = select_hits(assembly_id, hits)
144-
selected = six.moves.map(op.methodcaller('writeable'), selected)
145-
selected = it.chain.from_iterable(selected)
146-
csv.writer(output).writerows(selected)
147+
def write_json(hits, output):
    """Write each hit as one JSON object per line to `output`."""
    for entry in hits:
        output.write(json.dumps(entry) + '\n')
151+
152+
153+
def write_importable(hits, output):
    """
    Write hits to `output` as CSV rows for pgloader import. Each hit
    contributes the rows produced by its writeable() method; this is lossy,
    keeping only what the database needs.
    """
    rows = (hit.writeable() for hit in hits)
    # Bug fix: the original chained `hits` (the hit objects themselves) and
    # discarded the mapped writeable() rows entirely.
    csv.writer(output).writerows(it.chain.from_iterable(rows))
157+
158+
159+
def as_json(assembly_id, hits, output):
    """
    Parse a PSL file handle (`hits`) and write each hit to `output` as one
    JSON object per line, tagged with the given assembly id.
    """
    parsed = parse_psl(assembly_id, hits)
    # Bug fix: the original mapped attr.asdict over an undefined name
    # (`selected`) and would raise NameError on first use.
    write_json((attr.asdict(hit) for hit in parsed), output)
163+
164+
165+
def select_json(hits, output, sort=False):
    """
    Select the best hits from a JSON-line file and write them back out as
    JSON lines. Assumes the input is sorted by urs_taxid unless sort=True,
    in which case all hits are sorted in memory (potentially expensive).
    """
    parsed = parse_json(hits)
    if sort:
        # Bug fix: BlatHit is an object, not a mapping — attrgetter, not
        # itemgetter (select_hits groups on the same 'upi' attribute).
        parsed = sorted(parsed, key=op.attrgetter('upi'))
    selected = select_hits(parsed)
    # Serialize back to plain dicts: json.dumps cannot handle BlatHit
    # instances directly. NOTE(review): confirm that round-tripping through
    # BlatHit(**data) preserves any nested region structure.
    write_json((attr.asdict(hit) for hit in selected), output)

0 commit comments

Comments
 (0)