2
2
import logging
3
3
from functools import cache
4
4
from posixpath import join as urljoin
5
- from typing import Any , Sequence
5
+ from typing import Any , Sequence , TextIO
6
6
7
7
from sqlalchemy import select , func
8
8
from sqlalchemy .exc import NoResultFound
25
25
26
26
27
27
class AssemblyNotFoundError(Exception):
    """Raised when a requested Assembly does not exist."""
31
31
32
32
33
33
class AssemblyVersionError(Exception):
    """Raised on an Assembly version mismatch, e.g. when a liftover is
    requested for the latest assembly."""
37
+
38
+
39
class AssemblyAbortedError(Exception):
    """Raised for general failures while preparing or adding an assembly,
    e.g. an error during request streaming."""
37
44
@@ -86,6 +93,36 @@ def get_assembly_by_id(self, assembly_id: int) -> Assembly:
86
93
except NoResultFound :
87
94
raise AssemblyNotFoundError (f"No such assembly with ID: { assembly_id } ." )
88
95
96
def get_assembly_by_name(
    self, taxa_id: int, assembly_name: str, fail_safe: bool = True
) -> Assembly:
    """Look up an assembly by organism and name.

    When no matching row exists, the outcome depends on fail_safe: if True,
    the assembly is created through _add_assembly and returned; if False,
    AssemblyNotFoundError is raised instead.

    :param taxa_id: Taxonomy ID
    :type taxa_id: int
    :param assembly_name: Assembly name
    :type assembly_name: str
    :param fail_safe: If True (default), add assembly if not found.
    :type fail_safe: bool
    :returns: Newly created or existing assembly
    :rtype: Assembly

    :raises: AssemblyNotFoundError
    """
    query = select(Assembly).filter_by(taxa_id=taxa_id, name=assembly_name)
    try:
        return self._session.execute(query).scalar_one()
    except NoResultFound:
        # Guard clause: only a strict lookup (fail_safe=False) reports the miss.
        if not fail_safe:
            raise AssemblyNotFoundError(
                f"No such assembly '{assembly_name}' for organism '{taxa_id}'."
            )
        return self._add_assembly(taxa_id, assembly_name)
125
+
89
126
def get_assemblies_by_taxa (self , taxa_id : int ) -> Sequence [Assembly ]:
90
127
"""Retrieve all assemblies for a given organism.
91
128
@@ -158,13 +195,13 @@ def _yield_chroms():
158
195
159
196
return list (_yield_chroms ())
160
197
161
- def liftover (
198
+ def create_lifted_file (
162
199
self ,
163
200
assembly : Assembly ,
164
201
raw_file : str ,
165
202
unmapped_file : str | None = None ,
166
203
threshold : float = ImportLimits .LIFTOVER .max ,
167
- ) -> str :
204
+ ) -> TextIO :
168
205
"""Liftover records to current assembly.
169
206
170
207
:param assembly: Assembly instance
@@ -175,8 +212,8 @@ def liftover(
175
212
:type unmapped_file: str | None
176
213
:param threshold: Threshold for raising LiftOverError
177
214
:type threshold: float
178
- :returns: Files pointing to the liftedOver features
179
- :rtype: str
215
+ :returns: File handle pointing to the liftedOver features
216
+ :rtype: TextIO
180
217
"""
181
218
if self .is_latest_assembly (assembly ):
182
219
raise AssemblyVersionError ("Cannot liftover for latest assembly." )
@@ -207,60 +244,7 @@ def liftover(
207
244
f"{ unmapped_lines } records could not be mapped... "
208
245
"Contact the system administrator if you have questions."
209
246
)
210
- return lifted_file
211
-
212
- def add_assembly (self , taxa_id : int , assembly_name : str ) -> int :
213
- """Add an alternative assembly to the database.
214
-
215
- If assembly exists, nothing is done.
216
-
217
- :param taxa_id: Taxonomy ID
218
- :type taxa_id: int
219
- :param assembly_name: Assembly name
220
- :type assembly_name: str
221
- :returns: Newly created or existing assembly ID
222
- :rtype: int
223
- """
224
- try :
225
- assembly = self ._session .execute (
226
- select (Assembly ).filter_by (taxa_id = taxa_id , name = assembly_name )
227
- ).scalar_one ()
228
- return assembly .id
229
- except NoResultFound :
230
- pass
231
-
232
- if self ._file_service .check_if_assembly_exists (taxa_id , assembly_name ):
233
- raise FileExistsError (
234
- f"Directory exists, but assembly '{ assembly_name } ' does not exist!"
235
- )
236
-
237
- chain_file_name = self ._get_chain_file_name (
238
- assembly_name , self .get_name_for_version (taxa_id )
239
- )
240
- url = self ._get_ensembl_chain_file_url (taxa_id , chain_file_name )
241
-
242
- logger .info (f"Setting up a new assembly for { assembly_name } ..." )
243
- try :
244
- with self ._file_service .create_chain_file (
245
- taxa_id , chain_file_name , assembly_name
246
- ) as fh :
247
- self ._web_service .stream_request_to_file (url , fh )
248
- version_nums = (
249
- self ._session .execute (select (func .distinct (Assembly .version )))
250
- .scalars ()
251
- .all ()
252
- )
253
- version_num = gen_short_uuid (Identifiers .ASSEMBLY .length , version_nums )
254
- assembly = Assembly (
255
- name = assembly_name , taxa_id = taxa_id , version = version_num
256
- )
257
- self ._session .add (assembly )
258
- self ._session .commit ()
259
- return assembly .id
260
- except Exception :
261
- self ._session .rollback ()
262
- self ._file_service .delete_assembly (taxa_id , assembly_name )
263
- raise
247
+ return self ._file_service .open_file_for_reading (lifted_file )
264
248
265
249
def prepare_assembly_for_version (self , assembly_id : int ) -> None :
266
250
"""Prepare directories and files for the latest version.
@@ -288,14 +272,56 @@ def prepare_assembly_for_version(self, assembly_id: int) -> None:
288
272
try :
289
273
self ._handle_gene_build (assembly )
290
274
self ._handle_release (assembly )
291
- except Exception :
275
+ except AssemblyVersionError :
292
276
self ._file_service .delete_assembly (assembly .taxa_id , assembly .name )
293
277
raise
278
+ except Exception as exc :
279
+ self ._file_service .delete_assembly (assembly .taxa_id , assembly .name )
280
+ raise AssemblyAbortedError (
281
+ f"Adding assembly for ID '{ assembly_id } ' aborted."
282
+ ) from exc
294
283
295
284
@staticmethod
296
285
def _get_chain_file_name (source_assembly_name , target_assembly_name ):
297
286
return f"{ source_assembly_name } _to_{ target_assembly_name } .chain.gz"
298
287
288
def _add_assembly(self, taxa_id: int, assembly_name: str) -> Assembly:
    """Download the chain file for a new assembly and register it.

    This method does not look the assembly up in the database; it refuses
    to proceed if the file service reports that files for the assembly are
    already present. Otherwise the chain file is streamed from the Ensembl
    chain file URL into a file created by the file service, and a new
    Assembly row is added and committed. If any of these steps fails, the
    session is rolled back and the assembly files are deleted through the
    file service.

    :param taxa_id: Taxonomy ID
    :type taxa_id: int
    :param assembly_name: Assembly name
    :type assembly_name: str
    :returns: Newly created assembly
    :rtype: Assembly

    :raises: AssemblyAbortedError
    """
    # Files without a database record: raise before anything is created,
    # so nothing is deleted on this path.
    if self._file_service.check_if_assembly_exists(taxa_id, assembly_name):
        raise AssemblyAbortedError(
            "Suspected incomplete or inconsistent data: files were found on the "
            f"system for '{assembly_name}', but assembly does not exist in the database."
        )

    chain_file_name = self._get_chain_file_name(
        assembly_name, self.get_name_for_version(taxa_id)
    )
    url = self._get_ensembl_chain_file_url(taxa_id, chain_file_name)

    logger.info(f"Setting up a new assembly for {assembly_name}...")
    try:
        with self._file_service.create_chain_file(
            taxa_id, chain_file_name, assembly_name
        ) as fh:
            self._web_service.stream_request_to_file(url, fh)
        # Existing version identifiers are handed to gen_short_uuid
        # (presumably so the new identifier does not collide; confirm in helper).
        version_nums = (
            self._session.execute(select(func.distinct(Assembly.version)))
            .scalars()
            .all()
        )
        version_num = gen_short_uuid(Identifiers.ASSEMBLY.length, version_nums)
        assembly = Assembly(
            name=assembly_name, taxa_id=taxa_id, version=version_num
        )
        self._session.add(assembly)
        self._session.commit()
        return assembly
    except Exception as exc:
        # The file clean-up must run even if the rollback itself fails;
        # leftover files would make every later attempt for this assembly
        # fail on the existence check above.
        try:
            self._session.rollback()
        finally:
            self._file_service.delete_assembly(taxa_id, assembly_name)
        raise AssemblyAbortedError(
            f"Adding assembly for '{assembly_name}' aborted."
        ) from exc
324
+
299
325
def _get_ensembl_chain_file_url (self , taxa_id : int , chain_file_name ):
300
326
return urljoin (
301
327
Ensembl .FTP .value ,
0 commit comments