-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathuc2biom
executable file
·123 lines (110 loc) · 4.3 KB
/
uc2biom
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python
import sys
import biom
import skbio
import os.path
import collections
def get_next_record_type(lines, types):
for line in lines:
line = line.strip()
if line and line[0] in types:
yield line.split('\t')
return
def build_seq_to_otu_id_map(fasta_fp):
"""
fasta_fp:
"""
results = {}
for s in skbio.io.read(fasta_fp, format='fasta'):
otu_id = s.metadata['id']
seq_id = s.metadata['description'].split()[0]
results[seq_id] = otu_id
return results
def uc2biom(uc_lines,
seed_to_otu_ids=None,
error_on_multiple_hits=True):
""" Given an open .uc file, return lists (clusters, failures, new_seeds)
uc_lines: open .uc file, or similar object -- this is the output
generated by uclust's -uc parameter
error_on_multiple_hits: if True (default), when a single query hits
to multiple seeds, as can happen when --allhits is passed to uclust,
throw a UclustParseError. if False, when a single query hits to
multiple seeds, it will appear in each cluster.
This function processes all hit (H), seed (S), and no hit (N) lines
to return all clusters, failures, and new_seeds generated in
a uclust run. failures should only arise when users have passed
--lib and --libonly, and a sequence doesn't cluster to any existing
reference database sequences.
"""
data = collections.defaultdict(int)
sample_idxs = {}
sample_ids = []
observation_idxs = {}
observation_ids = []
dn_otu_count = 0
# the types of hit lines we're interested in here
# are hit (H), seed (S), library seed (L) and no hit (N)
hit_types = {}.fromkeys(list('HSNL'))
for record in get_next_record_type(uc_lines, hit_types):
hit_type = record[0]
# sequence identifier lines from the fasta header lines only.
# strip off any comments here as this value
# is used in several places
query_id = record[8].split()[0]
sample_id = query_id.split('_')[0]
observation_id = record[9].split()[0]
if observation_id == '*':
if hit_type == 'S':
observation_id = query_id
else:
continue
try:
sample_idx = sample_idxs[sample_id]
except KeyError:
sample_idx = len(sample_ids)
sample_idxs[sample_id] = sample_idx
sample_ids.append(sample_id)
if seed_to_otu_ids is not None:
try:
observation_id = seed_to_otu_ids[observation_id]
except(KeyError):
raise KeyError("Unknown seed identifier observed: %s." % observation_id)
try:
observation_idx = observation_idxs[observation_id]
except KeyError:
observation_idx = len(observation_ids)
observation_ids.append(observation_id)
observation_idxs[observation_id] = observation_idx
if hit_type == 'H' or hit_type == 'S':
data[(observation_idx, sample_idx)] += 1
elif hit_type == 'L' or hit_type == 'N':
# we don't actually need to do anything here right now
pass
else:
# shouldn't be possible to get here, but provided for
# clarity
raise ValueError(
"Unexpected result parsing line:\n%s" % '\t'.join(record))
return biom.Table(data,
observation_ids=observation_ids,
sample_ids=sample_ids)
if __name__ == "__main__":
usage = "Usage: uc2biom <input-uc> <output-biom> [representative-set-fasta]"
if len(sys.argv) != 3 and len(sys.argv) != 4:
sys.exit(usage)
uc_fp = sys.argv[1]
biom_fp = sys.argv[2]
if len(sys.argv) == 4:
fasta_fp = sys.argv[3]
print(fasta_fp)
seq_to_otu_id_map = build_seq_to_otu_id_map(fasta_fp)
else:
seq_to_otu_id_map = None
if os.path.exists(biom_fp):
sys.exit("Output filepath exists. "
"Please specify a different output filepath.")
t = uc2biom(open(uc_fp, 'U'),
seed_to_otu_ids=seq_to_otu_id_map)
with open(biom_fp, 'w') as out_f:
t.to_json("[uc2biom](https://github.com/gregcaporaso/scaling-octo-robot)",
out_f)