From 143f0c1781a10fca30f13bb5971fb4807f8ed9fe Mon Sep 17 00:00:00 2001 From: Etienne Boileau Date: Wed, 8 May 2024 15:55:23 +0200 Subject: [PATCH] WIP #80 re-implement service, not working --- server/src/scimodom/api/public.py | 9 +- server/src/scimodom/services/comparison.py | 308 +++++++++++++++++++++ server/src/scimodom/services/public.py | 68 ----- server/src/scimodom/utils/operations.py | 208 ++------------ 4 files changed, 342 insertions(+), 251 deletions(-) create mode 100644 server/src/scimodom/services/comparison.py diff --git a/server/src/scimodom/api/public.py b/server/src/scimodom/api/public.py index 29af323b..54f6d1fe 100644 --- a/server/src/scimodom/api/public.py +++ b/server/src/scimodom/api/public.py @@ -5,6 +5,7 @@ from flask_cors import cross_origin from scimodom.services.public import get_public_service +from scimodom.services.comparison import get_comparison_service api = Blueprint("api", __name__) @@ -126,11 +127,9 @@ def get_compare(): upload_path = request.args.get("upload", type=str) query_operation = request.args.get("operation", type=str) - public_service = get_public_service() - response = public_service.get_comparison( - reference_ids, comparison_ids, upload_path, query_operation - ) - return response + # TODO + compare_service = get_comparison_service() + return "Not implemented", 404 @api.route("/upload", methods=["POST"]) diff --git a/server/src/scimodom/services/comparison.py b/server/src/scimodom/services/comparison.py new file mode 100644 index 00000000..4d5ffcbd --- /dev/null +++ b/server/src/scimodom/services/comparison.py @@ -0,0 +1,308 @@ +from collections.abc import Sequence +from logging import getLogger +from pathlib import Path +from typing import Any, Optional, Literal, get_args + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from scimodom.database.database import get_session +from scimodom.database.models import Data, Association +from scimodom.utils.operations import to_bedtool, _remove_extra_feature +from scimodom.services.importer import get_bed_importer +import scimodom.utils.utils as utils + +logger = getLogger(__name__) + + +class FailedUploadError(Exception): + pass + + +class NoRecordsFoundError(Exception): + pass + + +class ComparisonService: + """Utility class to handle dataset comparison. + The EUFID(s) are assumed to be well-defined, + avoiding repeated validation queries to the + database. + + :param session: SQLAlchemy ORM session + :type session: Session + :param reference_ids: Reference EUFID(s) + :type reference_ids: list of str + :param comparison_ids: EUFID(s) to compare + :type comparison_ids: list of str + :param query_operation: Comparison to perform: + intersect, closest, or subtract (difference) + :type query_operation: str + :param is_strand: Perform strand-aware query + :type is_strand: bool + """ + + OPERATIONS = Literal["intersect", "closest", "subtract"] + + def __init__( + self, + session: Session, + reference_ids: list[str], + comparison_ids: list[str], + query_operation: OPERATIONS, + is_strand: bool, + ) -> None: + """Initializer method.""" + self._session = session + self._reference_ids = reference_ids + self._comparison_ids = comparison_ids + self._query_operation = query_operation + self._is_strand = is_strand + + self._reference_records: Sequence[Any] + self._comparison_records: Sequence[Any] + + operations = get_args(self.OPERATIONS) + assert ( + query_operation in operations + ), f"Undefined '{query_operation}'. Allowed values are {operations}." + + @classmethod + def from_upload( + cls, + session: Session, + reference_ids: list[str], + query_operation: str, + is_strand: bool, + upload: str | Path, + ): + """Provides ComparisonService factory to + instantiate class upload instead of EUFID(s). + + :param session: SQLAlchemy ORM session + :type session: Session + :param reference_ids: Reference EUFID(s) + :type reference_ids: list of str + :param upload: Uploaded dataset to compare + :type upload: str | Path + :param query_operation: Comparison to perform: + intersect, closest, or subtract (difference) + :type query_operation: str + :param is_strand: Perform strand-aware query + :type is_strand: bool + """ + upload_path = Path(upload) + if not upload_path.is_file(): + msg = f"No such file or directory: {upload_path.as_posix()}" + raise FileNotFoundError(msg) + try: + db_records = ComparisonService.upload_file() + except Exception as exc: + # upload itself is "fail-safe", catch eveything else... + msg = f"Failed to upload {upload_path.as_posix()}. " + raise FailedUploadError(msg) + records = [tuple([val for key, val in record.items()]) for record in db_records] + # ... but records might be "skipped" for various reasons + if not records: + raise NoRecordsFoundError + service = cls(session, reference_ids, [], query_operation, is_strand) + service._comparison_records = records + return service + + @staticmethod + def upload_file(file_path: Path) -> list[dict[str, Any]]: + """Upload bed-like file. + + :param file_path: Path to file + :type file_path: Path + :return: Uploaded records + :rtype: list of dict of {str: Any} + """ + importer = get_bed_importer(file_path) + importer.parse_records() + importer.close() + return importer.get_buffer() + + def get_records(self) -> None: + """Query the database for selected records.""" + + def _construct_query(idx): + query = ( + select( + Data.chrom, + Data.start, + Data.end, + Data.name, + Data.score, + Data.strand, + Association.dataset_id, + Data.coverage, + Data.frequency, + ) + .join_from(Data, Association, Data.inst_association) + .where(Association.dataset_id.in_(idx)) + ) + return query + + query = _construct_query(self._reference_ids) + self._reference_records = self._session.execute(query).all() + + if self._comparison_ids: + self._comparison_records = [] + for idx in self._comparison_ids: + query = _construct_query([idx]) + self._comparison_records.append(self._session.execute(query).all()) + + # check for empty records? + + def compare_dataset(self): # -> + """Execute comparison. + + :return: Result form comparison + :rtype: + """ + func = eval(f"_get_{self._query_operation}") + records = func(self._reference_records, self._comparison_records) + + # TODO + # from scimodom.utils.models import records_factory + # op, strand = query_operation.split("S") + # c_records = get_op(op)(a_records, b_records, s=eval(strand)) + # records = [records_factory(op.capitalize(), r)._asdict() for r in c_records] + + return records + + def _get_intersect( + a_records: Sequence[Any], + b_records: Sequence[Any], + s: bool = True, + sorted: bool = True, + ) -> list[Any]: + """Wrapper for pybedtools.bedtool.BedTool.intersect + + Relies on the behaviour of bedtools -wa -wb option: the first + column after the complete -a record lists the file number + from which the overlap came. + + :param a_records: DB records (A features) + :type a_records: Sequence (list of tuples) + :param b_records: DB records (B features) + :type b_records: Sequence (list of tuples) + :param s: Force strandedness + :type s: bool + :param sorted: Invoked sweeping algorithm + :type sorted: bool + :returns: records + :rtype: list of tuples + """ + + # required options + # write the original entry in A for each overlap + wa: bool = True + # write the original entry in B for each overlap + wb: bool = True + + a_bedtool, b_bedtool = to_bedtool(a_records), to_bedtool( + b_records, as_list=True + ) + bedtool = a_bedtool.intersect( + b=[b.fn for b in b_bedtool], wa=wa, wb=wb, s=s, sorted=sorted + ) + stream = bedtool.each(_remove_extra_feature) + records = [tuple(s.fields) for s in stream] + return records + + def _get_closest( + a_records: Sequence[Any], + b_records: Sequence[Any], + s: bool = True, + sorted: bool = True, + ) -> list[Any]: + """Wrapper for pybedtools.bedtool.BedTool.closest + + Relies on the behaviour of bedtools -io -t -mdb -D options: the first + column after the complete -a record lists the file number + from which the closest interval came. + + :param a_records: DB records (A features) + :type a_records: Sequence (list of tuples) + :param b_records: DB records (B features) + :type b_records: Sequence (list of tuples) + :param s: Force strandedness + :type s: bool + :param sorted: Invoked sweeping algorithm + :type sorted: bool + :returns: records + :rtype: list of tuples + """ + + # required options + # Ignore features in B that overlap A + io: bool = True + # Report all ties + t: str = "all" + # Report closest records among all databases + mdb: str = "all" + # Report distance with respect to A + D: str = "a" + + a_bedtool, b_bedtool = to_bedtool(a_records), to_bedtool( + b_records, as_list=True + ) + bedtool = a_bedtool.closest( + b=[b.fn for b in b_bedtool], io=io, t=t, mdb=mdb, D=D, s=s, sorted=sorted + ) + stream = bedtool.each(_remove_extra_feature) + records = [tuple(s.fields) for s in stream] + + # TODO + # Reports “none” for chrom (?) and “-1” for all other fields (?) when a feature + # is not found in B on the same chromosome as the feature in A. + # Note that "start" (fields) is a string! + # 9 + 0/1 + 1 + # c_bedtool = c_bedtool.filter(lambda c: c.fields[(offset + filnum_idx + 1)] != "-1") + + return records + + def _get_subtract( + a_records: Sequence[Any], + b_records: Sequence[Any], + s: bool = True, + sorted: bool = True, + ) -> list[Any]: + """Wrapper for pybedtools.bedtool.BedTool.subtract + + :param a_records: DB records (A features) + :type a_records: Sequence (list of tuples) + :param b_records: DB records (B features) + :type b_records: Sequence (list of tuples) + :param s: Force strandedness + :type s: bool + :param sorted: Invoked sweeping algorithm + :type sorted: bool + :param n_fields: Number of other fields attribute in addition to BED6 + :type n_fields: int + :returns: records + :rtype: list of tuples + """ + + a_bedtool, b_bedtool = to_bedtool(a_records), to_bedtool( + utils.flatten_list(b_records) + ) + c_bedtool = a_bedtool.subtract(b_bedtool, s=s, sorted=sorted) + # records = [(i.fields[:offset]) for i in c_bedtool] + # TODO ??? offset = 6 + 3 + # records = [tuple(i.fields[:offset]) for i in c_bedtool] + # return records + + +_cached_service: Optional[ComparisonService] = None + + +def get_comparison_service() -> ComparisonService: + global _cached_service + if _cached_service is None: + # TODO + # _cached_service = ComparisonService(session=get_session()) + pass + return _cached_service diff --git a/server/src/scimodom/services/public.py b/server/src/scimodom/services/public.py index e4266bdc..d26e0aa5 100644 --- a/server/src/scimodom/services/public.py +++ b/server/src/scimodom/services/public.py @@ -29,15 +29,10 @@ Selection, ) import scimodom.database.queries as queries -from scimodom.services.importer import get_bed_importer from scimodom.services.annotation import AnnotationService from scimodom.services.assembly import AssemblyService import scimodom.utils.specifications as specs -# TODO -from scimodom.utils.models import records_factory -from scimodom.utils.operations import get_op - class PublicService: """Utility class to handle all public requests. @@ -465,69 +460,6 @@ def get_dataset(self): return self._dump(query) - def get_comparison(self, reference_ids, comparison_ids, upload, query_operation): - """Retrieve ...""" - # TODO: refactor - # API call in compare, thenquery_operation pass as params to SPA components - # but sending all datasets may be too large? - # final call after dataset selection + query - # + lazy loading of results? - - query = ( - select( - Data.chrom, - Data.start, - Data.end, - Data.name, - Data.score, - Data.strand, - Association.dataset_id, - # Data.dataset_id, - Data.coverage, - Data.frequency, - ) - .join_from(Data, Association, Data.inst_association) - .where(Association.dataset_id.in_(reference_ids)) - # .order_by(Data.chrom.asc(), Data.start.asc()) - ) - a_records = self._session.execute(query).all() - - # AD HOC - EUF VERSION SHOULD COME FROM SOMEWHERE ELSE! - if upload: - importer = get_bed_importer(upload) - importer.parse_records() - importer.close() - b_records = importer.get_buffer() - # records = [tuple([val for key, val in record.items()]) for record in b_records] - # print(b_records) - else: - b_records = [] - for idx in comparison_ids: - query = ( - select( - Data.chrom, - Data.start, - Data.end, - Data.name, - Data.score, - Data.strand, - Association.dataset_id, - # Data.dataset_id, - Data.coverage, - Data.frequency, - ) - .join_from(Data, Association, Data.inst_association) - .where(Association.dataset_id == idx) - # .where(Data.dataset_id == idx) - ) - b_records.append(get_session().execute(query).all()) - - op, strand = query_operation.split("S") - c_records = get_op(op)(a_records, b_records, s=eval(strand)) - records = [records_factory(op.capitalize(), r)._asdict() for r in c_records] - - return records - def _dump(self, query): """Serialize a query from a select statement using individual columns of an ORM entity, i.e. using execute(), diff --git a/server/src/scimodom/utils/operations.py b/server/src/scimodom/utils/operations.py index 1e726c31..ea7a92e4 100644 --- a/server/src/scimodom/utils/operations.py +++ b/server/src/scimodom/utils/operations.py @@ -22,7 +22,7 @@ pybedtools.helpers.set_tempdir(tempdir) -def _to_bedtool(records, asl: bool = False): +def to_bedtool(records, as_list: bool = False): """Convert records to BedTool and sort TODO: records can be str | Path | Sequence[Any], see below get_genomic_annotation! @@ -31,189 +31,18 @@ def _to_bedtool(records, asl: bool = False): :param records: Database records (or list of records) :type records: Sequence - :returns: bedtool + :param as_list: Return results as a list of BedTool + :type as_list: bool + :return: bedtool :rtype: BedTool or list of BedTool """ - if asl: + if as_list: bedtool = [pybedtools.BedTool(record).sort() for record in records] else: bedtool = pybedtools.BedTool(records).sort() return bedtool -def get_op(op: str): - """Function selection - - :param op: operation - :type op: str - :returns: selected function - :rtype: function - """ - return eval(f"get_{op}") - - -def get_intersect( - a_records: Sequence[Any], - b_records: Sequence[Any], - s: bool = True, - sorted: bool = True, - n_fields: int = 3, -) -> list[Any]: - """Wrapper for pybedtools.bedtool.BedTool.intersect - - Relies on the behaviour of bedtools -wa -wb option: the first - column after the complete -a record lists the file number - from which the overlap came. - - :param a_records: DB records (A features) - :type a_records: Sequence (list of tuples) - :param b_records: DB records (B features) - :type b_records: Sequence (list of tuples) - :param s: Force strandedness - :type s: bool - :param sorted: Invoked sweeping algorithm - :type sorted: bool - :param n_fields: Number of other fields attribute in addition to BED6 - :type n_fields: int - :returns: c_records - :rtype: list of tuples - """ - - # required options - # write the original entry in A for each overlap - wa: bool = True - # write the original entry in B for each overlap - wb: bool = True - - # file number index - offset = 6 + n_fields - filnum_idx = 1 - if len(b_records) == 1: - filnum_idx = 0 - - a_bedtool, b_bedtool = _to_bedtool(a_records), _to_bedtool(b_records, asl=True) - c_bedtool = a_bedtool.intersect( - b=[b.fn for b in b_bedtool], wa=wa, wb=wb, s=s, sorted=sorted - ) - c_records = [ - tuple( - sum( - ( - [i.chrom, i.start, i.end, i.name, i.score, i.strand], - i.fields[6:offset], - i.fields[(offset + filnum_idx) :], - ), - [], - ) - ) - for i in c_bedtool - ] - return c_records - - -def get_closest( - a_records: Sequence[Any], - b_records: Sequence[Any], - s: bool = True, - sorted: bool = True, - n_fields: int = 3, -) -> list[Any]: - """Wrapper for pybedtools.bedtool.BedTool.closest - - Relies on the behaviour of bedtools -io -t -mdb -D options: the first - column after the complete -a record lists the file number - from which the closest interval came. - - :param a_records: DB records (A features) - :type a_records: Sequence (list of tuples) - :param b_records: DB records (B features) - :type b_records: Sequence (list of tuples) - :param s: Force strandedness - :type s: bool - :param sorted: Invoked sweeping algorithm - :type sorted: bool - :param n_fields: Number of other fields attribute in addition to BED6 - :type n_fields: int - :returns: c_records - :rtype: list of tuples - """ - - # required options - # Ignore features in B that overlap A - io: bool = True - # Report all ties - t: str = "all" - # Report closest records among all databases - mdb: str = "all" - # Report distance with respect to A - D: str = "a" - - # file number index - offset = 6 + n_fields - filnum_idx = 1 - if len(b_records) == 1: - filnum_idx = 0 - - a_bedtool, b_bedtool = _to_bedtool(a_records), _to_bedtool(b_records, asl=True) - c_bedtool = a_bedtool.closest( - b=[b.fn for b in b_bedtool], io=io, t=t, mdb=mdb, D=D, s=s, sorted=sorted - ) - - # Reports “none” for chrom (?) and “-1” for all other fields (?) when a feature - # is not found in B on the same chromosome as the feature in A. - # Note that "start" (fields) is a string! - c_bedtool = c_bedtool.filter(lambda c: c.fields[(offset + filnum_idx + 1)] != "-1") - c_records = [ - tuple( - sum( - ( - [i.chrom, i.start, i.end, i.name, i.score, i.strand], - i.fields[6:offset], - i.fields[(offset + filnum_idx) :], - ), - [], - ) - ) - for i in c_bedtool - ] - return c_records - - -def get_subtract( - a_records: Sequence[Any], - b_records: Sequence[Any], - s: bool = True, - sorted: bool = True, - n_fields: int = 3, -) -> list[Any]: - """Wrapper for pybedtools.bedtool.BedTool.subtract - - :param a_records: DB records (A features) - :type a_records: Sequence (list of tuples) - :param b_records: DB records (B features) - :type b_records: Sequence (list of tuples) - :param s: Force strandedness - :type s: bool - :param sorted: Invoked sweeping algorithm - :type sorted: bool - :param n_fields: Number of other fields attribute in addition to BED6 - :type n_fields: int - :returns: c_records - :rtype: list of tuples - """ - - # file number index - offset = 6 + n_fields - - a_bedtool, b_bedtool = _to_bedtool(a_records), _to_bedtool( - utils.flatten_list(b_records) - ) - c_bedtool = a_bedtool.subtract(b_bedtool, s=s, sorted=sorted) - # c_records = [(i.fields[:offset]) for i in c_bedtool] - c_records = [tuple(i.fields[:offset]) for i in c_bedtool] - return c_records - - def annotate_data_to_records( annotation_path: Path, features: dict[str, str], records: Sequence[Any], error ) -> Sequence[Any]: @@ -248,7 +77,7 @@ def _intersect(annotation, feature): if gene_id is not None ] - data_bedtool = _to_bedtool(records) + data_bedtool = to_bedtool(records) try: intergenic_feature = features.pop("intergenic") except KeyError as exc: @@ -409,7 +238,7 @@ def liftover_to_file( :returns: File with liftedOver features :rtype: str """ - bedtool = _to_bedtool(records) + bedtool = to_bedtool(records) result = pybedtools.BedTool._tmp() if unmapped is None: unmapped = pybedtools.BedTool._tmp() @@ -427,6 +256,29 @@ def liftover_to_file( return result +def _remove_extra_feature(feature, n_fields=9): + """This function is to be passed + as argument to BedTool.each(), to + generate a BED-like Interval. This is used + to "strip" the returned interval from the + "additional column describing the file number" + when calling intersect or closest (with -wa and + -wb). The default format is BED6+3, where + 3 additional fields are "dataset_id", "coverage", + and "frequency". + + :param feature: A feature from a BED file. + :type feature: pybedtools.Interval + :return: New interval + :rtype: pybedtools interval + """ + target = 2 * n_fields + 1 + line = [f for f in feature.fields] + if len(feature.fields) == target: + line.pop(n_fields) + return pybedtools.cbedtools.create_interval_from_list(line) + + def _get_gtf_attrs(feature): """This function is to be passed as argument to BedTool.each(), to