-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcreate_completion_matrix.py
211 lines (180 loc) · 8.65 KB
/
create_completion_matrix.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# -*- coding: utf-8 -*-
#!/usr/bin/env/ python3
import os
import sys
import pythoncyc
import argparse
import subprocess
from pythoncyc import PTools as PTools
from pythoncyc.PTools import PToolsError as PToolsError
from pythoncyc.PTools import PythonCycError as PythonCycError
#Usage : python3 dev_pythoncyc.py -p META
def close_pgdb_wosaving(pgdb):
try:
r = PTools.sendQueryToPTools('(close-kb :kb (kb-of-organism "'+pgdb._orgid+'") :save-updates-p nil)')
except PToolsError as msg:
raise PythonCycError('Pathway Tools was unable to close KB of organism (orgid) {orgid}. More specifically: {msg}'.format(orgid=pgdb._orgid, msg=msg))
return(r)
def get_genes_of_reaction(rxn, pgdb):
genes = set()
for gene in pgdb.genes_of_reaction(rxn):
genes.add(gene)
for enz in pgdb.enzymes_of_reaction(rxn):
for gene in pgdb.genes_of_protein(enz):
genes.add(gene)
return(genes)
def write_pathway(pgdb):
with open("metacyc_pathways.tsv", "w") as pathways:
with open("metacyc_reactions_by_pathway.tsv", "w") as reactions:
header1="Pathway_Id\tCommon_name\n"
pathways.write(header1)
header2="Pathway_Id\tReaction_Id\tSpontaneous\tOrphan\tOrphan_in_Metacyc\n"
reactions.write(header2)
for path in pgdb.all_pathways(selector='all', base=True):
to_write=path.split("|")[1]+"\t"+pgdb[path].common_name+"\n"
pathways.write(to_write)
for rxn in pgdb[path].reaction_list:
orphan_in_metacyc = "FALSE"
orphan = "NA"
spontaneous = "FALSE"
if pgdb[rxn].spontaneous_p != None and pgdb[rxn].spontaneous_p == True:
spontaneous = "TRUE"
if len(get_genes_of_reaction(rxn, pgdb)) == 0 and spontaneous == "FALSE":
orphan_in_metacyc = "TRUE"
if pgdb[rxn].orphan_p != None:
if pgdb[rxn].orphan_p[0] == "|NO|":
orphan = "FALSE"
else:
orphan = "TRUE"
to_write_r = path.split("|")[1]+"\t"+rxn.split("|")[1]+"\t"+spontaneous+"\t"+orphan+"\t"+orphan_in_metacyc+"\n"
reactions.write(to_write_r)
def get_pathways_none_spontaneous_reactions(pgdb):
pathways = dict()
for path in pgdb.all_pathways(selector='all', base=True):
pathways[path] = dict()
pathways[path]['Name'] = pgdb[path].common_name
pathways[path]['Reactions'] = set()
for rxn in pgdb[path].reaction_list:
if pgdb[rxn].spontaneous_p != None:
if pgdb[rxn].spontaneous_p != True:
pathways[path]['Reactions'].add(rxn)
else:
pathways[path]['Reactions'].add(rxn)
return(pathways)
def get_pathways_none_spontaneous_orphan_reactions(pgdb):
pathways = dict()
for path in pgdb.all_pathways(selector='all', base=True):
pathways[path] = dict()
pathways[path]['Name'] = pgdb[path].common_name
pathways[path]['Reactions'] = set()
for rxn in pgdb[path].reaction_list:
is_spontaneous = False
is_orphan = None
is_orphan_in_metacyc = False
if pgdb[rxn].spontaneous_p != None and pgdb[rxn].spontaneous_p == True:
is_spontaneous = True
if is_spontaneous == False:
if len(get_genes_of_reaction(rxn, pgdb)) == 0:
is_orphan_in_metacyc = True
if pgdb[rxn].orphan_p != None and pgdb[rxn].orphan_p[0] == "|NO|":
is_orphan = False
else:
is_orphan = True
if not(is_orphan_in_metacyc and (is_orphan == None or is_orphan)) and not is_spontaneous:
pathways[path]['Reactions'].add(rxn)
return(pathways)
def get_reactions_with_genes(pgdb):
reactions = set()
for rxn in pgdb.all_rxns(type_of_reactions = ':all'):
if len(get_genes_of_reaction(rxn, pgdb)) != 0:
reactions.add(rxn)
return(reactions)
def get_pathways(pgdb):
pathways = set()
for path in pgdb.all_pathways(selector='all', base=True):
pathways.add(path)
return(pathways)
def write_pgdb_pathway_completion(pgdb, pgdb_name, use_orphan, completion_dict, position):
print(completion_dict)
if use_orphan:
meta_pathways = get_pathways_none_spontaneous_reactions(pgdb)
file_name = pgdb_name+"_pathway_completion.tsv"
else:
meta_pathways = get_pathways_none_spontaneous_orphan_reactions(pgdb)
file_name = pgdb_name+"_pathway_completion_wo_orphan.tsv"
pgdb_reactions = get_reactions_with_genes(pgdb)
pgdb_pathways = get_pathways(pgdb)
with open(file_name, "w") as pgdb_write:
header="PGDB\tPathway\tPathway_name\tIs_predicted\tCompletion\n"
pgdb_write.write(header)
for path in meta_pathways:
path_predicted = path in pgdb_pathways
if len(meta_pathways[path]['Reactions']) == 0:
completion = 0.0
else:
completion = len(meta_pathways[path]['Reactions'].intersection(pgdb_reactions))/len(meta_pathways[path]['Reactions'])
to_write=pgdb_name+"\t"+path.split("|")[1]+"\t"+meta_pathways[path]['Name']+"\t"+str(path_predicted)+"\t"+str(completion)+"\n"
pgdb_write.write(to_write)
completion_dict[pgdb[path].common_name][position] = (str(completion))
return(completion_dict)
def write_completion_matrix(completion_dict, file_name, header):
with open(file_name, "w") as matrix_file:
matrix_file.write(header)
for pathway in completion_dict:
values_to_write = "\t".join(completion_dict[pathway])
to_write = pathway+"\t"+values_to_write+"\n"
matrix_file.write(to_write)
def init_completion_dict(pgdb_list, completion_dict, completion_dict_wo_orphan):
for p in pgdb_list:
name = p.strip()
pgdb_name = '|'+name+'|'
pgdb = pythoncyc.select_organism(pgdb_name)
pgdb_pathways = get_pathways(pgdb)
for path in pgdb_pathways:
path_name = pgdb[path].common_name
if path_name not in completion_dict.keys():
completion_dict[path_name] = ["0.0"] * len(pgdb_list)
completion_dict_wo_orphan[path_name] = ["0.0"] * len(pgdb_list)
return(completion_dict, completion_dict_wo_orphan)
def main():
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("-p", help="Enter the name of the PGDB (example : META)", required=False, type=str)
parser.add_argument("-l", help="Enter file with list of PGDB ", required=False, type=str)
parser.add_argument("-m", help="Argument for missing value : O or NA - 0 by default", required=False, type=str)
args = parser.parse_args()
## write completion in separate files
header = "#NAMES"
completion_dict = dict()
completion_dict_wo_orphan = dict()
with open(args.l) as pgdb_file:
pgdb_list = [line.strip() for line in pgdb_file]
(completion_dict, completion_dict_wo_orphan) = init_completion_dict(pgdb_list, completion_dict, completion_dict_wo_orphan)
if args.l:
for (name, position) in zip(pgdb_list, range(len(pgdb_list))):
name = name.strip()
print(name)
header = header +"\t"+name
print(header)
pgdb_name = '|'+name+'|'
print(pgdb_name)
pgdb = pythoncyc.select_organism(pgdb_name)
print(pgdb)
#write_reactions_kegg_cross_ref(pgdb)
write_pathway(pgdb)
completion_dict = write_pgdb_pathway_completion(pgdb, name, True, completion_dict, position)
completion_dict_wo_orphan = write_pgdb_pathway_completion(pgdb, name , False, completion_dict_wo_orphan, position)
#print(pgdb._orgid)
#close_pgdb_wosaving(pgdb)
elif args.p:
pgdb_name = '|'+args.p+'|'
pgdb = pythoncyc.select_organism(pgdb_name)
write_pathway(pgdb)
write_pgdb_pathway_completion(pgdb, meta, args.p, True)
write_pgdb_pathway_completion(pgdb, meta, args.p, False)
else:
sys.exit("Please select an option!")
header = header+"\n"
write_completion_matrix(completion_dict, "completion_matrix.txt", header)
write_completion_matrix(completion_dict_wo_orphan, "completion_matrix_wo_orphan.tsv", header)
if __name__ == "__main__":
main()