@@ -82,6 +82,27 @@ def parse_fasta_str(faa_str, split=DEFAULT_SPLIT, h_func=None):
82
82
return features
83
83
84
84
85
+ def read_gbff_records_from_file (filename : str ):
86
+ if filename .endswith (".gbff" ):
87
+ with open (filename , "r" ) as fh :
88
+ return read_gbff_records (fh )
89
+ elif filename .endswith (".gz" ):
90
+ import gzip
91
+ from io import StringIO
92
+
93
+ with gzip .open (filename , "rb" ) as fh :
94
+ return read_gbff_records (StringIO (fh .read ().decode ("utf-8" )))
95
+
96
+
97
+ def read_gbff_records (handler ):
98
+ from Bio import SeqIO
99
+
100
+ gbff_records = []
101
+ for record in SeqIO .parse (handler , "gb" ):
102
+ gbff_records .append (record )
103
+ return gbff_records
104
+
105
+
85
106
def extract_features (faa_str , split = DEFAULT_SPLIT , h_func = None ):
86
107
features = []
87
108
active_seq = None
@@ -168,6 +189,45 @@ def from_fasta(filename, split=" ", h_func=None):
168
189
genome .features += read_fasta2 (filename , split , h_func )
169
190
return genome
170
191
192
+ @staticmethod
193
+ def from_gbff_sequence (filename ):
194
+ gbff_records = read_gbff_records_from_file (filename )
195
+ genome = MSGenome ()
196
+ features = []
197
+ for rec in gbff_records :
198
+ feature = MSFeature (rec .id , str (rec .seq ), description = rec .description )
199
+ features .append (feature )
200
+ genome .features += features
201
+ return genome
202
+
203
+ @staticmethod
204
+ def from_gbff_features (
205
+ filename , feature_id_qualifier = "protein_id" , description_qualifier = "product"
206
+ ):
207
+ gbff_records = read_gbff_records_from_file (filename )
208
+ genome = MSGenome ()
209
+ features = []
210
+ for rec in gbff_records :
211
+ for f in rec .features :
212
+ if f .type == "CDS" :
213
+ translations = f .qualifiers .get ("translation" , [])
214
+ if len (translations ) == 1 :
215
+ feature_id = f .qualifiers .get (feature_id_qualifier , [None ])[0 ]
216
+ description = f .qualifiers .get (description_qualifier , [None ])[0 ]
217
+ if feature_id :
218
+ feature = MSFeature (
219
+ feature_id , translations [0 ], description = description
220
+ )
221
+ features .append (feature )
222
+ else :
223
+ logger .warning (
224
+ f"skip feature: unable to fetch id from qualifier { feature_id_qualifier } "
225
+ )
226
+ elif len (translations ) > 1 :
227
+ logger .warning (f"skip feature: with multiple sequences { f } " )
228
+ genome .features += features
229
+ return genome
230
+
171
231
def to_fasta (self , filename , l = 80 , fn_header = None ):
172
232
to_fasta (self .features , filename , l , fn_header )
173
233
return filename
@@ -203,3 +263,85 @@ def _repr_html_(self):
203
263
<td>{ len (self .features )} </td>
204
264
</tr>
205
265
</table>"""
266
+
267
+
268
+ class GenomeGff (MSGenome ):
269
+ def __init__ (self , contigs ):
270
+ self .contigs = contigs
271
+ super ().__init__ ()
272
+
273
+ @staticmethod
274
+ def read_sequence (feature_id , gff_record , expected_sequence , contigs ):
275
+ from Bio .Seq import Seq
276
+ from Bio import Align
277
+
278
+ protein_seq_cds = expected_sequence
279
+ feature_contig = contigs .features .get_by_id (gff_record .contig_id )
280
+ seq = Seq (feature_contig .seq [gff_record .start - 1 : gff_record .end ])
281
+ if gff_record .strand == "-" :
282
+ seq = seq .reverse_complement ()
283
+ seq_from_dna = str (seq .translate ())
284
+ if len (seq_from_dna ) > 0 and seq_from_dna [- 1 ] == "*" :
285
+ seq_from_dna = seq_from_dna [:- 1 ]
286
+ if len (protein_seq_cds ) > 0 and protein_seq_cds [- 1 ] == "*" :
287
+ protein_seq_cds = protein_seq_cds [:- 1 ]
288
+ eq = protein_seq_cds == seq_from_dna
289
+
290
+ score = None
291
+ if not eq and len (seq_from_dna ) > 0 :
292
+ try :
293
+ aligner = Align .PairwiseAligner ()
294
+ res = aligner .align (protein_seq_cds , seq_from_dna )
295
+ score = res .score
296
+ except ValueError as ex :
297
+ print ("error" , gff_record )
298
+ raise ex
299
+
300
+ feature = MSFeature (feature_id , protein_seq_cds )
301
+ feature .description = f"score: { score } "
302
+ feature .gff = gff_record
303
+ return feature
304
+
305
+ @staticmethod
306
+ def from_fna_faa_gff (
307
+ filename_fna , filename_faa , filename_gff , _fn_get_id , prodigal = False
308
+ ):
309
+ genome_gff_features = _read_gff_features (filename_gff )
310
+ genome_faa = MSGenome .from_fasta (filename_faa )
311
+ contigs = MSGenome .from_fasta (filename_fna )
312
+
313
+ feature_lookup = {}
314
+ if prodigal :
315
+ for feature in genome_faa .features :
316
+ attr = dict (
317
+ x .split ("=" )
318
+ for x in feature .description .split (" # " )[- 1 ].split (";" )
319
+ )
320
+ if attr ["ID" ] not in feature_lookup :
321
+ feature_lookup [attr ["ID" ]] = feature
322
+ else :
323
+ raise ValueError ("" )
324
+ else :
325
+ feature_lookup = {feature .id : feature for feature in genome_faa .features }
326
+
327
+ features = []
328
+ for gff_record in genome_gff_features :
329
+ if gff_record .feature_type == "CDS" :
330
+ feature_id = gff_record .attr .get ("ID" )
331
+ if _fn_get_id :
332
+ feature_id = _fn_get_id (gff_record )
333
+
334
+ feature_cds = feature_lookup .get (feature_id )
335
+
336
+ if feature_cds :
337
+ protein_seq_cds = feature_cds .seq
338
+ f = GenomeGff .read_sequence (
339
+ feature_id , gff_record , protein_seq_cds , contigs
340
+ )
341
+ features .append (f )
342
+ else :
343
+ print (f"not found { feature_id } " )
344
+
345
+ genome = GenomeGff (contigs )
346
+ genome .features += features
347
+ return genome
0 commit comments