diff --git a/flask/cluster.py b/flask/cluster.py index 02da210..7e5db86 100644 --- a/flask/cluster.py +++ b/flask/cluster.py @@ -1,32 +1,32 @@ from xml.etree import ElementTree import subprocess -import utils +from configManager import ConfigManager +from logger import Logger import query from sys import platform - -uclust_identity = utils.get_config()['uclust_identity'] # how similar sequences in the same cluster must be +config_manager = ConfigManager() +config = config_manager.load_config() # Load config once +uclust_identity = config['uclust_identity'] # Get the uclust identity value +logger_ = Logger() sequences_filename = 'dumps/sequences.fsa' -if 'which_search' not in utils.get_config(): - explorerConfig = utils.get_config() - explorerConfig['which_search'] = 'vsearch' - utils.set_config(explorerConfig) +# Ensure 'which_search' is set in config +if 'which_search' not in config: + config['which_search'] = 'vsearch' + config_manager.save_config(config) -whichSearch = utils.get_config()['which_search'] +whichSearch = config['which_search'] -if platform == "linux" or platform == "linux2": - if whichSearch == 'usearch': - usearch_binary_filename = 'usearch/usearch10.0.240_i86linux32' - elif whichSearch == 'vsearch': - usearch_binary_filename = 'usearch/vsearch_linux' +# Determine the correct binary filename based on OS and search tool +usearch_binary_filename = None +if platform.startswith("linux"): + usearch_binary_filename = 'usearch/vsearch_linux' if whichSearch == 'vsearch' else 'usearch/usearch10.0.240_i86linux32' elif platform == "darwin": - if whichSearch == 'usearch': - usearch_binary_filename = 'usearch/usearch11.0.667_i86osx32' - elif whichSearch == 'vsearch': - usearch_binary_filename = 'usearch/vsearch_macos' + usearch_binary_filename = 'usearch/vsearch_macos' if whichSearch == 'vsearch' else 'usearch/usearch11.0.667_i86osx32' else: - utils.log("Sorry, your OS is not supported for sequence based-search.") + logger_.log("Sorry, your OS is not supported for sequence-based search.") + raise SystemExit uclust_results_filename = 'usearch/uclust_results.uc' @@ -40,115 +40,73 @@ } ''' - def write_fasta(sequences): - f = open(sequences_filename, 'w') - - for sequence in sequences: - f.write('>%s\n' % sequence['subject']) - f.write('%s\n' % sequence['sequence']) - - f.close() - + with open(sequences_filename, 'w') as f: + for sequence in sequences: + f.write(f">{sequence['subject']}\n{sequence['sequence']}\n") def run_uclust(): args = [usearch_binary_filename, '-cluster_fast', sequences_filename, '-id', uclust_identity, '-sort', 'length', '-uc', uclust_results_filename] - popen = subprocess.Popen(args, stdout=subprocess.PIPE) - popen.wait() - output = popen.stdout.read() - utils.log_indexing(str(output)) - + # result = subprocess.run(args, capture_output=True, text=True) # Python3.7 + result = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + logger_.log(result.stdout, True) def analyze_uclust(): - f = open(uclust_results_filename, 'r') - results = f.read() - total_parts = 0 total_identity = 0.0 hits = 0 - lines = results.splitlines() - for line in lines: - line = line.split() - record_type = line[0] - - if record_type in ('H', 'S'): - total_parts += 1 - - if line[0] is 'H': - total_identity += float(line[3]) - hits += 1 - - f.close() - utils.log_indexing('parts: ' + str(total_parts)) - utils.log_indexing('hits: ' + str(hits)) - + with open(uclust_results_filename, 'r') as f: + for line in f: + parts = line.split() + record_type = parts[0] 
+ if record_type in ('H', 'S'): + total_parts += 1 + if record_type == 'H': + total_identity += float(parts[3]) + hits += 1 + + logger_.log(f'parts: {total_parts}', True) + logger_.log(f'hits: {hits}', True) if hits > 0: - utils.log_indexing('average hit identity: ' + str(total_identity / hits)) - + logger_.log(f'average hit identity: {total_identity / hits}', True) def uclust2uris(fileName): uris = set() - - f = open(fileName, 'r') - results = f.read() - lines = results.splitlines() - - for line in lines: - line = line.split() - - if line[0] is 'H': - partURI = line[9] - - uris.add(partURI) - - f.close() - + with open(fileName, 'r') as f: + for line in f: + parts = line.split() + if parts[0] == 'H': + uris.add(parts[9]) return uris def uclust2clusters(): - # populate cluster2parts cluster2parts = {} - f = open(uclust_results_filename, 'r') - results = f.read() - lines = results.splitlines() - - for line in lines: - line = line.split() - - if line[0] is 'H' or line[0] is 'S': - part, cluster = line[8], line[1] + with open(uclust_results_filename, 'r') as f: + for line in f: + parts = line.split() + if parts[0] in ('H', 'S'): + part, cluster = parts[8], parts[1] + if cluster not in cluster2parts: + cluster2parts[cluster] = set() + cluster2parts[cluster].add(part) - if cluster not in cluster2parts: - cluster2parts[cluster] = set() - cluster2parts[cluster].add(part) - - f.close() - - # transform cluster2parts to clusters - clusters = {} - - for cluster in cluster2parts: - parts = cluster2parts[cluster] - for part in parts: - clusters[part] = parts.difference({part}) + clusters = {part: parts.difference({part}) for cluster, parts in cluster2parts.items() for part in parts} return clusters - def update_clusters(): - utils.log_indexing('------------ Updating clusters ------------') - utils.log_indexing('******** Query for sequences ********') + logger_.log('------------ Updating clusters ------------', True) + logger_.log('******** Query for sequences ********', True) sequences_response = query.query_sparql(sequence_query) - utils.log_indexing('******** Query for sequences complete ********') + logger_.log('******** Query for sequences complete ********', True) write_fasta(sequences_response) - utils.log_indexing('******** Running uclust ********') + logger_.log('******** Running uclust ********', True) run_uclust() - utils.log_indexing('******** Running uclust complete ********') + logger_.log('******** Running uclust complete ********', True) analyze_uclust() - utils.log_indexing('------------ Successsfully updated clusters ------------\n') + logger_.log('------------ Successfully updated clusters ------------\n', True) return uclust2clusters() - diff --git a/flask/config.json b/flask/config.json index a188e64..ceceb42 100644 --- a/flask/config.json +++ b/flask/config.json @@ -2,7 +2,7 @@ "uclust_identity": "0.8", "elasticsearch_index_name": "part", "pagerank_tolerance": "0.0001", - "elasticsearch_endpoint": "http://localhost:9200/", + "elasticsearch_endpoint": "http://elasticsearch:9200/", "sparql_endpoint": "http://localhost:8890/sparql?", "last_update_start": "none", "last_update_end": "none", diff --git a/flask/configManager.py b/flask/configManager.py new file mode 100644 index 0000000..096ff41 --- /dev/null +++ b/flask/configManager.py @@ -0,0 +1,63 @@ +import json +import datetime + +class ConfigManager: + def __init__(self, config_file='config.json'): + self.config_file = config_file + self._config = None + + def load_config(self): + """ + Gets a copy of the config file + Returns: 
Config file in JSON + + """ + if self._config is None: + with open(self.config_file) as f: + self._config = json.load(f) + return self._config + + def save_config(self, new_config): + """ + Overwrites the existing config with a new config file + Args: + new_config: New config file with the updated information + Returns: + """ + config = self.load_config() + config.update(new_config) + with open(self.config_file, 'w') as f: + json.dump(config, f) + + def save_time(self, attribute): + """ + Saves the current time to an attribute in the config + Args: + attribute: Config attribute to save current time to + + Returns: + + """ + config = self.load_config() + config[attribute] = datetime.datetime.now().isoformat() + self.save_config(config) + + def get_es_endpoint(self): + return self.load_config().get('elasticsearch_endpoint') + + def save_update_end_time(self): + """ + Save end time of indexing + Returns: + + """ + return self.save_time("last_update_end") + + + def save_update_start_time(self): + """ + Save start time of indexing + Returns: + + """ + return self.save_time("last_update_start") diff --git a/flask/dataManager.py b/flask/dataManager.py new file mode 100644 index 0000000..82e50e0 --- /dev/null +++ b/flask/dataManager.py @@ -0,0 +1,76 @@ +import pickle +import os +class DataManager: + def __init__(self, clusters_filename='dumps/clusters_dump', uri2rank_filename='dumps/uri2rank_dump'): + self.clusters_filename = clusters_filename + self.uri2rank_filename = uri2rank_filename + self._clusters = None + self._uri2rank = None + + def save_clusters(self, clusters): + """ + Save clusters of parts + Args: + new_clusters: Clusters to be saved + + Returns: + + """ + self._clusters = clusters + self._serialize(self._clusters, self.clusters_filename) + + def get_clusters(self): + if self._clusters is None: + self._clusters = self._deserialize(self.clusters_filename) + return self._clusters + + def save_uri2rank(self, uri2rank): + """ + Saves the pagerank of all URI's + Args: + new_uri2rank: + + Returns: + + """ + self._uri2rank = uri2rank + self._serialize(self._uri2rank, self.uri2rank_filename) + + def get_uri2rank(self): + """ + Gets all pageranks of URI's + Returns: + + """ + if self._uri2rank is None: + self._uri2rank = self._deserialize(self.uri2rank_filename) + return self._uri2rank + + @staticmethod + def _serialize(data, filename): + """ + Serializes some data to a file + Args: + data: Data to be written + filename: File to be written to + + Returns: + + """ + with open(filename, 'wb') as f: + pickle.dump(data, f) + + @staticmethod + def _deserialize(filename): + """ + Deserializes data from a serialized file + Args: + filename: Serialized file + + Returns: Deserialized data from file + + """ + if os.path.exists(filename): + with open(filename, 'rb') as f: + return pickle.load(f) + return {} diff --git a/flask/docker/Dockerfile b/flask/docker/Dockerfile index 60926a1..b6f5883 100644 --- a/flask/docker/Dockerfile +++ b/flask/docker/Dockerfile @@ -1,13 +1,25 @@ -FROM ubuntu:16.04 -MAINTAINER Michael Zhang +FROM ubuntu:22.04 + +# Set the timezone environment variables to avoid interaction +ENV DEBIAN_FRONTEND=noninteractive +ENV TZ=America/New_York + +# Install tzdata without interaction +RUN apt-get update && apt-get install -y tzdata + +# Set timezone +RUN ln -fs /usr/share/zoneinfo/America/New_York /etc/localtime && \ + dpkg-reconfigure -f noninteractive tzdata RUN apt-get update && \ - apt-get install -y software-properties-common && \ + apt-get install -y software-properties-common 
coreutils && \ add-apt-repository ppa:deadsnakes/ppa && \ apt-get update && \ - apt-get install -y git python3.6 python3.6-pip && \ - python3.6 -m pip install pip --upgrade && \ - git clone https://github.com/michael13162/SBOLExplorer.git && \ + apt-get install -y git cron python3.11 python3-pip python3.11-venv && \ + python3.11 -m pip install pip --upgrade && \ + python3.11 -m venv jammy && \ + . jammy/bin/activate && \ + git clone https://github.com/SynBioDex/SBOLExplorer.git && \ cd SBOLExplorer/flask && \ pip install -r requirements.txt && \ crontab update.cron @@ -26,5 +38,4 @@ RUN mkdir /mnt/config && \ rm -rf dumps && \ ln -s /mnt/data dumps -CMD "./start.sh" - +CMD sh -c ". ../../jammy/bin/activate && ./start.sh" diff --git a/flask/docker/Dockerfile-synbiohub b/flask/docker/Dockerfile-synbiohub index ed54445..cfe8566 100644 --- a/flask/docker/Dockerfile-synbiohub +++ b/flask/docker/Dockerfile-synbiohub @@ -1,13 +1,25 @@ -FROM ubuntu:16.04 -MAINTAINER Michael Zhang +FROM ubuntu:22.04 + +#Set the timezone environment variables to avoid interaction +ENV DEBIAN_FRONTEND=noninteractive +ENV TZ=America/New_York + +# Install tzdata without interaction +RUN apt-get update && apt-get install -y tzdata + +# Set timezone +RUN ln -fs /usr/share/zoneinfo/America/New_York /etc/localtime && \ + dpkg-reconfigure -f noninteractive tzdata RUN apt-get update && \ apt-get install -y software-properties-common && \ add-apt-repository ppa:deadsnakes/ppa && \ apt-get update && \ - apt-get install -y git python3.6 python3-pip && \ - python3.6 -m pip install pip --upgrade && \ - git clone https://github.com/michael13162/SBOLExplorer.git && \ + apt-get install -y git cron python3.11 python3-pip python3.11-venv && \ + python3.11 -m pip install pip --upgrade && \ + python3.11 -m venv jammy && \ + . jammy/bin/activate && \ + git clone https://github.com/SynBioDex/SBOLExplorer.git && \ cd SBOLExplorer/flask && \ pip install -r requirements.txt && \ crontab update.cron @@ -28,5 +40,4 @@ RUN mkdir /mnt/config && \ ADD config-synbiohub.json /mnt/config/config.json -CMD "./start.sh" - +CMD sh -c ". 
../../jammy/bin/activate && ./start.sh" diff --git a/flask/docker/docker-compose.yml b/flask/docker/docker-compose.yml index b57a530..9af118a 100644 --- a/flask/docker/docker-compose.yml +++ b/flask/docker/docker-compose.yml @@ -1,7 +1,7 @@ version: "3" services: sbolexplorer: - image: michael13162/sbolexplorer:latest + image: myersresearchgroup/sbolexplorer:snapshot ports: - "13162:13162" depends_on: diff --git a/flask/elasticsearchManager.py b/flask/elasticsearchManager.py new file mode 100644 index 0000000..985dc94 --- /dev/null +++ b/flask/elasticsearchManager.py @@ -0,0 +1,17 @@ +from elasticsearch import Elasticsearch + +class ElasticsearchManager: + def __init__(self, config_manager): + self.config_manager = config_manager + self._es = None + + def get_es(self): + """ + Gets an instance of elasticsearch + Returns: The instance of elasticsearch + """ + if self._es is None: + self._es = Elasticsearch([self.config_manager.get_es_endpoint()], verify_certs=True) + if not self._es.ping(): + raise ValueError('Elasticsearch connection failed') + return self._es \ No newline at end of file diff --git a/flask/explorer.py b/flask/explorer.py index 7401d7e..d32129f 100644 --- a/flask/explorer.py +++ b/flask/explorer.py @@ -1,150 +1,148 @@ #!/usr/bin/python3 -from flask import Flask, request, jsonify, abort +from flask import Flask, request, jsonify, abort, render_template from werkzeug.exceptions import HTTPException - import os import traceback import logging +import threading +import time import cluster import pagerank import index import search -import utils import query +from configManager import ConfigManager +from dataManager import DataManager +from elasticsearchManager import ElasticsearchManager +from logger import Logger -import threading -import time +# Configure logging, This will affect all loggers in your application, not just the Werkzeug logger. log = logging.getLogger('werkzeug') log.setLevel(logging.ERROR) +config_manager = ConfigManager() +data_manager = DataManager() +elasticsearch_manager = ElasticsearchManager(config_manager) +logger_ = Logger() + app = Flask(__name__) + +# Error handler @app.errorhandler(Exception) def handle_error(e): - utils.log('[ERROR] Returning error ' + str(e) + "\n Traceback:\n" + traceback.format_exc()) - + log.error(f'[ERROR] Returning error {e}\n Traceback:\n{traceback.format_exc()}') if isinstance(e, HTTPException): return jsonify(error=str(e.name + ": " + e.description)), e.code - else: - return jsonify(error=str(type(e).__name__) + str(e)), 500 + return jsonify(error=str(type(e).__name__) + str(e)), 500 -@app.before_first_request +@app.before_request def startup(): - # Method for running auto indexing def auto_update_index(): + update_interval = int(config_manager.load_config().get('updateTimeInDays', 0)) * 86400 while True: - time.sleep(int(utils.get_config()['updateTimeInDays']) * 86400) - if utils.get_config()['autoUpdateIndex'] and utils.get_config()['updateTimeInDays'] > 0: - utils.log('Updating index automatically. 
To disable, set the \"autoUpdateIndex\" property in config.json to false.') - update() + time.sleep(update_interval) + # Implement your update logic here + if config_manager.load_config().get('autoUpdateIndex', False): + update_index() - # Thread for automatically updaing the index periodically + # Start the background thread for auto-updating the index update_thread = threading.Thread(target=auto_update_index, daemon=True) update_thread.start() - if os.path.exists('log.txt') and os.path.getsize('log.txt') > 20000000: # Delete the log if it is > 20 MB - os.remove('log.txt') - - if os.path.exists('indexing_log.txt') and os.path.getsize('indexing_log.txt') > 20000000: # Delete the log if it is > 20 MB - os.remove('indexing_log.txt') + # Manage log file sizes + for log_file in ['log.txt', 'indexing_log.txt']: + if os.path.exists(log_file) and os.path.getsize(log_file) > 20000000: # 20 MB + os.remove(log_file) - utils.log('SBOLExplorer started :)') + logger_.log('SBOLExplorer started :)') + # Check and create index if necessary try: - if utils.get_es().indices.exists(index=utils.get_config()['elasticsearch_index_name']) is False: - utils.log('Index not found, creating new index.') - update() - except: + es = elasticsearch_manager.get_es() + index_name = config_manager.load_config().get('elasticsearch_index_name') + if not es.indices.exists(index=index_name): + logger_.log('Index not found, creating new index.') + update_index() + except Exception as e: + log.error(f'Error during startup: {e}') raise -@app.errorhandler(Exception) -def handle_error(e): - utils.log('[ERROR] Returning error ' + str(e) + "\n Traceback:\n" + traceback.format_exc()) - return jsonify(error=str(e)), 500 +def update_index(): + logger_.log('============ STARTING INDEXING ============\n\n', True) + config_manager.save_update_start_time() + clusters = cluster.update_clusters() + data_manager.save_clusters(clusters) + + uri2rank = pagerank.update_pagerank() + data_manager.save_uri2rank(uri2rank) + + index.update_index(data_manager.get_uri2rank()) + + query.memoized_query_sparql.cache_clear() + logger_.log('Cache cleared', True) + + config_manager.save_update_end_time() + logger_.log('============ INDEXING COMPLETED ============\n\n', True) @app.route('/info', methods=['GET']) def info(): - utils.log('Explorer up!!! Virtutoso ' + str(query.memoized_query_sparql.cache_info())) - return utils.get_log() + logger_.log('Explorer up!!! 
Virtuoso ' + str(query.memoized_query_sparql.cache_info())) + return logger_.get_log() @app.route('/indexinginfo', methods=['GET']) def indexinginfo(): - return utils.get_indexing_log() + return logger_.get_indexing_log() @app.route('/config', methods=['POST', 'GET']) -def config(): +def config_route(): if request.method == 'POST': new_config = request.get_json() - utils.set_config(new_config) - utils.log('Successfully updated config') - - config = utils.get_config() - return jsonify(config) + config_manager.save_config(new_config) + logger_.log('Successfully updated config') + return jsonify(config_manager.load_config()) @app.route('/update', methods=['GET']) def update(): try: subject = request.args.get('subject') - - if subject is None: - utils.log_indexing('============ STARTING INDEXING ============\n\n') - utils.log('============ STARTING INDEXING ============\n\n') - utils.save_update_start_time() - - clusters = cluster.update_clusters() - utils.save_clusters(clusters) - - - uri2rank = pagerank.update_pagerank() - utils.save_uri2rank(uri2rank) - - index.update_index(utils.get_uri2rank()) - - query.memoized_query_sparql.cache_clear() - utils.log_indexing('Cache cleared') - - utils.save_update_end_time() - success_message = 'Successfully updated entire index' + if subject: + index.refresh_index(subject, data_manager.get_uri2rank()) + success_message = f'Successfully refreshed: {subject}' else: - index.refresh_index(subject, utils.get_uri2rank()) - success_message = 'Successfully refreshed: ' + subject - - utils.log_indexing('============ INDEXING COMPLETED ============\n\n') - utils.log('============ INDEXING COMPLETED ============\n\n') + update_index() + success_message = 'Successfully updated entire index' return success_message except Exception as e: - utils.log_indexing('[ERROR] Returning error ' + str(e) + "\n Traceback:\n" + traceback.format_exc()) - + log.error(f'Error during update: {e}') + raise @app.route('/incrementalupdate', methods=['POST']) def incremental_update(): try: updates = request.get_json() - - index.incremental_update(updates, utils.get_uri2rank()) - + index.incremental_update(updates, data_manager.get_uri2rank()) success_message = 'Successfully incrementally updated parts' - utils.log(success_message) - return - except: + logger_.log(success_message) + return success_message + except Exception as e: + log.error(f'Error during incremental update: {e}') raise - @app.route('/incrementalremove', methods=['GET']) def incremental_remove(): try: subject = request.args.get('subject') - index.incremental_remove(subject) - - success_message = 'Successfully incrementally removed: ' + subject - utils.log(success_message) + success_message = f'Successfully incrementally removed: {subject}' + logger_.log(success_message) return success_message - except: + except Exception as e: + log.error(f'Error during incremental remove: {e}') raise @app.route('/incrementalremovecollection', methods=['GET']) @@ -152,50 +150,62 @@ def incremental_remove_collection(): try: subject = request.args.get('subject') uri_prefix = request.args.get('uriPrefix') - index.incremental_remove_collection(subject, uri_prefix) - - success_message = 'Successfully incrementally removed collection and members: ' + subject - utils.log(success_message) + success_message = f'Successfully incrementally removed collection and members: {subject}' + logger_.log(success_message) return success_message - except: + except Exception as e: + log.error(f'Error during incremental remove collection: {e}') raise 
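For reference, a minimal client-side sketch of how the update and incremental-indexing endpoints above can be exercised. The host and port follow the "13162:13162" mapping in docker-compose.yml; the subject URI and uriPrefix values are hypothetical placeholders, not identifiers from this repository.

    import requests

    BASE = 'http://localhost:13162'  # port mapped in docker-compose.yml

    # Hypothetical identifiers, for illustration only
    subject = 'https://synbiohub.example.org/public/demo/part_1/1'
    prefix = 'https://synbiohub.example.org/public/demo/'

    # Refresh a single part (omit 'subject' to rebuild the entire index)
    print(requests.get(f'{BASE}/update', params={'subject': subject}).text)

    # Remove a single part from the index
    print(requests.get(f'{BASE}/incrementalremove', params={'subject': subject}).text)

    # Remove a collection and every member sharing the given URI prefix
    print(requests.get(f'{BASE}/incrementalremovecollection',
                       params={'subject': subject, 'uriPrefix': prefix}).text)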
+@app.route('/test', methods=['GET']) +def SBOLExplore_test_endpoint(): + return render_template('index.html') @app.route('/', methods=['GET']) def sparql_search_endpoint(): try: - # make sure index is built, or throw exception - if utils.get_es().indices.exists(index=utils.get_config()['elasticsearch_index_name']) is False or utils.get_es().cat.indices(format='json')[0]['health'] is 'red': + es = elasticsearch_manager.get_es() + index_name = config_manager.load_config().get('elasticsearch_index_name') + if not es.indices.exists(index=index_name) or es.cat.indices(format='json')[0]['health'] == 'red': abort(503, 'Elasticsearch is not working or the index does not exist.') sparql_query = request.args.get('query') - - if sparql_query is not None: + if sparql_query: default_graph_uri = request.args.get('default-graph-uri') - response = jsonify(search.search(sparql_query, utils.get_uri2rank(), utils.get_clusters(), default_graph_uri)) + response = jsonify(search.search( + sparql_query, + data_manager.get_uri2rank(), + data_manager.get_clusters(), + default_graph_uri + )) return response - else: - return "

Welcome to SBOLExplorer! The available indices in Elasticsearch are shown below: "\
-                + str(utils.get_es().cat.indices(format='json'))\
+        return "Welcome to SBOLExplorer! The available indices in Elasticsearch are shown below: "\
+            + str(elasticsearch_manager.get_es().cat.indices(format='json'))\
+            + " The config options are set to: "\
-                + str(utils.get_config())\
+            + str(config_manager.load_config())\
+            + " Visit our GitHub repository!"\
+            + " Any issues can be reported to our issue tracker."\
+            + "
Used by SynBioHub." - except: + + except Exception as e: + log.error(f'Error during SPARQL search: {e}') raise @app.route('/search', methods=['GET']) def search_by_string(): try: - if utils.get_es().indices.exists(index=utils.get_config()['elasticsearch_index_name']) is False or utils.get_es().cat.indices(format='json')[0]['health'] is 'red': + es = elasticsearch_manager.get_es() + index_name = config_manager.load_config().get('elasticsearch_index_name') + if not es.indices.exists(index=index_name) or es.cat.indices(format='json')[0]['health'] == 'red': abort(503, 'Elasticsearch is not working or the index does not exist.') query = request.args.get('query') - response = jsonify(search.search_es(query)['hits']) - return response - except: + except Exception as e: + log.error(f'Error during search by string: {e}') raise + +if __name__ == "__main__": + app.run(debug=False, threaded=True) # threaded=True diff --git a/flask/index.py b/flask/index.py index 070ee7b..8724552 100644 --- a/flask/index.py +++ b/flask/index.py @@ -1,103 +1,94 @@ from elasticsearch import helpers -import utils +from configManager import ConfigManager +from elasticsearchManager import ElasticsearchManager import query import json +from logger import Logger +# Load config and initialize managers once +config_manager = ConfigManager() +config = config_manager.load_config() +elasticsearch_manager = ElasticsearchManager(config_manager) +logger_ = Logger() def add_pagerank(parts_response, uri2rank): """ - Adds the pagerank score for each part + Adds the pagerank score for each part. Arguments: parts_response {List} -- List containing all parts from the SPARQL query - uri2rank {List} -- List of each part and its calculated pagerank score + uri2rank {Dict} -- Dictionary of each part and its calculated pagerank score """ - for part in parts_response: - subject = part['subject'] - - if subject in uri2rank: - part['pagerank'] = uri2rank[subject] - else: - part['pagerank'] = 1 + part['pagerank'] = uri2rank.get(part['subject'], 1) def add_keywords(parts_response): """ - Adds the displayId to the 'keyword' category - + Adds the displayId to the 'keyword' category. + Arguments: parts_response {List} -- List containing all parts from the SPARQL query """ - for part in parts_response: - keywords = [] - - displayId = part.get('displayId') - if displayId is not None: - keywords.extend(displayId.split('_')) + display_id = part.get('displayId') + if display_id: + part['keywords'] = ' '.join(display_id.split('_')) + else: + part['keywords'] = '' - part['keywords'] = ' '.join(keywords) -def add_roles(parts_response): +def add_roles(parts_response, term_list): """ - Adds the synonyms from the SO-Ontologies list to each part's keyword category - + Adds the synonyms from the SO-Ontologies list to each part's keyword category. 
+ Arguments: parts_response {List} -- List containing all parts from the SPARQL query + term_list {List} -- List of terms from the SO-Ontologies """ - with open('so-simplified.json','r') as so_json: - term_list = json.load(so_json) - - for part in parts_response: - # Split the CSV of roles from sparql - role = part.get('role') + for part in parts_response: + # Split the CSV of roles from sparql + role = part.get('role') + if role and 'identifiers.org' in role: + keywords_list = [] + so_term = role[-10:].replace(':','_') + + for term in term_list: + if so_term in term['id']: + keywords_list.append(term['lbl']) + synonyms = term.get('synonyms', []) + if synonyms: + for synonym in synonyms: + # remove the annoying header from the synonyms + if 'INSDC' in synonym: + synonym = synonym.replace('INSDC_qualifier:', '') + if synonym not in keywords_list: + keywords_list.append(synonym) + + part['keywords'] += ' ' + ' '.join(keywords_list) - if role is not None and 'identifiers.org' in role: - keywords_list = [] - so_term = role[-10:] - so_term = so_term.replace(':','_') - - for term in term_list: - if so_term in term['id']: - keywords_list.append(term['lbl']) - - if 'synonyms' in term and term['synonyms'] is not None: - for synonym in term['synonyms']: - - # remove the annoying header from the synonyms - if 'INSDC' in synonym: - synonym = synonym.replace('INSDC_qualifier:', '') - - if synonym not in keywords_list: - keywords_list.append(synonym) - - for keyword in keywords_list: - part['keywords'] += ' ' + keyword def add_sbol_type(parts_response): for part in parts_response: sbol_type = part.get('sboltype') + if sbol_type and 'http://www.biopax.org/release/biopax-level3.owl#' in sbol_type: + type_ = sbol_type[48:] + if 'region' in type_: + type_ = type_.replace('Region','') + part['keywords'] += ' ' + type_ - if sbol_type is not None and 'http://www.biopax.org/release/biopax-level3.owl#' in sbol_type: - type = sbol_type[48:] - - if 'region' in type: - type = type.replace('Region','') - - part['keywords'] += ' ' + type def create_parts_index(index_name): """ - Creates a new index + Creates a new index. Arguments: index_name {String} -- Name of the new index """ - - if utils.get_es().indices.exists(index_name): - utils.log_indexing('Index already exists -> deleting') - utils.get_es().indices.delete(index=index_name) + es = elasticsearch_manager.get_es() + if es.indices.exists(index_name): + logger_.log('Index already exists -> deleting', True) + es.indices.delete(index=index_name) body = { 'mappings': { @@ -112,86 +103,83 @@ def create_parts_index(index_name): }, } }, - "settings": { - "number_of_shards": 1 + 'settings': { + 'number_of_shards': 1 } - } - utils.get_es().indices.create(index=index_name, body=body) - utils.log_indexing('Index created') + es.indices.create(index=index_name, body=body) + + logger_.log('Index created', True) def bulk_index_parts(parts_response, index_name): """ - Adds each part as a document to the index - + Adds each part as a document to the index. 
+ Arguments: parts_response {List} -- List containing all parts from the SPARQL query index_name {String} -- Name of the index - - Raises: - Exception -- Indexing fails """ + es = elasticsearch_manager.get_es() + + def actions(): + for part in parts_response: + yield { + '_index': index_name, + '_type': index_name, + '_id': part['subject'], + '_source': part + } - actions = [] - for i in range(len(parts_response)): - action = { - '_index': index_name, - '_type': index_name, - '_id': parts_response[i].get('subject'), - '_source': parts_response[i] - } - - actions.append(action) - - utils.log_indexing('Bulk indexing') + logger_.log('Bulk indexing', True) try: - stats = helpers.bulk(utils.get_es(), actions) - utils.log_indexing('Bulk indexing complete') - except: - utils.log_indexing('[ERROR] Error_messages: ' + '\n'.join(stats[1])) - raise Exception("Bulk indexing failed") + stats = helpers.bulk(es, actions()) + logger_.log('Bulk indexing complete', True) + except Exception as e: + logger_.log(f'[ERROR] Error during bulk indexing: {str(e)}' + '\n'.join(stats[1]), True) + raise + def update_index(uri2rank): """ - Main method - Args: - uri2rank: List of pageranks for each URI - - Returns: + Main method to update the index. + Args: + uri2rank: Dictionary of pageranks for each URI """ - index_name = utils.get_config()['elasticsearch_index_name'] + index_name = config['elasticsearch_index_name'] - utils.log_indexing('------------ Updating index ------------') + logger_.log('------------ Updating index ------------', True) + logger_.log('******** Query for parts ********', True) + parts_response = query.query_parts(indexing=True) + logger_.log('******** Query for parts complete ********', True) - utils.log_indexing('******** Query for parts ********') - parts_response = query.query_parts(indexing = True) - utils.log_indexing('******** Query for parts complete ********') - - utils.log_indexing('******** Adding parts to new index ********') + logger_.log('******** Adding parts to new index ********', True) add_pagerank(parts_response, uri2rank) add_keywords(parts_response) - add_roles(parts_response) + + # Load the SO-Ontologies list once + with open('so-simplified.json', 'r') as so_json: + term_list = json.load(so_json) + add_roles(parts_response, term_list) + add_sbol_type(parts_response) create_parts_index(index_name) bulk_index_parts(parts_response, index_name) - utils.log_indexing('******** Finished adding ' + str(len(parts_response)) + ' parts to index ********') - - utils.log_indexing('------------ Successfully updated index ------------\n') + logger_.log(f'******** Finished adding {len(parts_response)} parts to index ********', True) + logger_.log('------------ Successfully updated index ------------\n', True) def delete_subject(subject): """ - Delete part for incremental indexing - Args: - subject: - - Returns: + Delete part for incremental indexing. + Args: + subject: The subject to delete from the index. 
""" - index_name = utils.get_config()['elasticsearch_index_name'] + index_name = config['elasticsearch_index_name'] + es = elasticsearch_manager.get_es() body = { 'query': { @@ -203,19 +191,19 @@ def delete_subject(subject): }, 'conflicts': 'proceed' } - utils.get_es().delete_by_query(index=index_name, doc_type=index_name, body=body) + es.delete_by_query(index=index_name, doc_type=index_name, body=body) def index_part(part): delete_subject(part['subject']) - index_name = utils.get_config()['elasticsearch_index_name'] - utils.get_es().index(index=index_name, doc_type=index_name, id=part['subject'], body=part) + index_name = config['elasticsearch_index_name'] + es = elasticsearch_manager.get_es() + es.index(index=index_name, doc_type=index_name, id=part['subject'], body=part) def refresh_index(subject, uri2rank): delete_subject(subject) - - part_response = query.query_parts('', 'FILTER (?subject = <' + subject + '>)', True) + part_response = query.query_parts('', f'FILTER (?subject = <{subject}>)', True) if len(part_response) == 1: add_pagerank(part_response, uri2rank) @@ -241,18 +229,16 @@ def incremental_remove(subject): def incremental_remove_collection(subject, uri_prefix): - collection_membership_query = ''' + collection_membership_query = f''' SELECT ?s - WHERE { - <''' + subject + '''> sbol2:member ?s . - FILTER(STRSTARTS(str(?s),''' + "'" + uri_prefix + "'" + ''')) - } + WHERE {{ + <{subject}> sbol2:member ?s . + FILTER(STRSTARTS(str(?s), '{uri_prefix}')) + }} ''' members = query.query_sparql(collection_membership_query) delete_subject(subject) for member in members: delete_subject(member['s']) - - diff --git a/flask/logger.py b/flask/logger.py new file mode 100644 index 0000000..00c7259 --- /dev/null +++ b/flask/logger.py @@ -0,0 +1,39 @@ +import datetime +import os +class Logger: + def __init__(self, log_file='log.txt', indexing_log_file='indexing_log.txt'): + self.log_file = log_file + self.indexing_log_file = indexing_log_file + + def log(self, message, to_indexing_log=False): + """ + Writes a message to the log + Args: + message: Message to write + + Returns: + """ + log_message = f'[{datetime.datetime.now().isoformat()}] {message}\n' + print(log_message, end='') # Output to console + + file = self.indexing_log_file if to_indexing_log else self.log_file + with open(file, 'a+') as f: + f.write(log_message) + + def get_log(self): + """ + Gets a copy of the log + Returns: Stream from the read() method + + """ + return self._read_file(self.log_file) + + def get_indexing_log(self): + return self._read_file(self.indexing_log_file) + + @staticmethod + def _read_file(filename): + if os.path.exists(filename): + with open(filename, 'r') as f: + return f.read() + return "" diff --git a/flask/pagerank.py b/flask/pagerank.py index 816c9dd..ccd3040 100644 --- a/flask/pagerank.py +++ b/flask/pagerank.py @@ -1,8 +1,10 @@ -from xml.etree import ElementTree import numpy as np -import utils import query +from logger import Logger +from configManager import ConfigManager +config_manager = ConfigManager() +logger_ = Logger() link_query = ''' SELECT DISTINCT ?parent ?child @@ -22,84 +24,34 @@ } ''' - -class graph: - # create uri to index mapping - def init_mapping(self, adjacency_list): - uris = set() - for parent in adjacency_list: - uris.add(parent) - for child in adjacency_list[parent]: - uris.add(child) - - self.index2uri = list(uris) - self.uri2index = {} - - for i in range(len(self.index2uri)): - uri = self.index2uri[i] - self.uri2index[uri] = i - - # assert mappings are correct - for i in 
range(len(self.index2uri)): - uri = self.index2uri[i] - index = self.uri2index[uri] - assert(index == i) - - - def init_in_links(self, adjacency_list): - for j in range(self.size): - self.in_links[j] = [] - - for parent in adjacency_list: - for child in adjacency_list[parent]: - parent_idx = self.uri2index[parent] - child_idx = self.uri2index[child] - self.in_links[child_idx].append(parent_idx) - - - def init_number_out_links(self, adjacency_list): - for j in range(self.size): - self.number_out_links[j] = 0 - - for parent in adjacency_list: - parent_idx = self.uri2index[parent] - number_children = len(adjacency_list[parent]) - self.number_out_links[parent_idx] = number_children - - - def init_dangling_pages(self, adjacency_list): - for parent in adjacency_list: - number_children = len(adjacency_list[parent]) - if number_children == 0: - self.dangling_pages.add(self.uri2index[parent]) - - +class Graph: def __init__(self, adjacency_list): - self.index2uri = [] - self.uri2index = {} - self.init_mapping(adjacency_list) - + self.uri2index = {uri: idx for idx, uri in enumerate(adjacency_list)} + self.index2uri = list(adjacency_list.keys()) self.size = len(self.index2uri) - - self.in_links = {} - self.init_in_links(adjacency_list) - - self.number_out_links = {} - self.init_number_out_links(adjacency_list) - + + self.in_links = {_:[] for _ in range(self.size)} + self.number_out_links = {_:0 for _ in range(self.size)} self.dangling_pages = set() - self.init_dangling_pages(adjacency_list) + for parent, children in adjacency_list.items(): + parent_idx = self.uri2index[parent] + if children: + self.number_out_links[parent_idx] = len(children) + for child in children: + child_idx = self.uri2index[child] + self.in_links[child_idx].append(parent_idx) + else: + self.dangling_pages.add(parent_idx) -# add uris as keys to adjacency_list -def populate_uris(uri_response): - adjacency_list = {} + def get_dangling_contrib(self, p): + return sum([p[j] for j in self.dangling_pages]) / self.size - for uri in uri_response: - adjacency_list[uri['subject']] = set() - - return adjacency_list + def get_teleportation_contrib(self): + return 1.0 / self.size +def populate_uris(uri_response): + return {uri['subject']: set() for uri in uri_response} # add edges def populate_links(link_response, adjacency_list): @@ -108,74 +60,58 @@ def populate_links(link_response, adjacency_list): adjacency_list[link['parent']].add(link['child']) except: raise - def pagerank(g, s=0.85, tolerance=0.001): n = g.size - p = np.matrix(np.ones((n, 1))) / n - + p = np.ones(n) / n # Initial probability distribution vector if n == 0: - utils.log_indexing('no iterations: empty graph') + logger_.log('no iterations: empty graph', True) return p iteration = 1 delta = 2 - - while delta > tolerance: - v = np.matrix(np.zeros((n, 1))) - dangling_contrib = sum([p[j] for j in g.dangling_pages]) / n - teleportation_contrib = 1 / n - + while delta > tolerance: + v = np.zeros(n) + dangling_contrib = g.get_dangling_contrib(p) + teleportation_contrib = g.get_teleportation_contrib() + for j in range(n): - link_contrib = sum([p[k] / g.number_out_links[k] for k in g.in_links[j]]) - v[j] = s * link_contrib + s * dangling_contrib + (1 - s) * teleportation_contrib - new_p = v / np.sum(v) - - delta = np.sum(np.abs(p - new_p)) - utils.log_indexing('Iteration ' + str(iteration) + ': L1 norm delta is ' + str(delta)) + in_link_contrib = np.sum(p[k] / g.number_out_links[k] for k in g.in_links[j]) + v[j] = s * (in_link_contrib + dangling_contrib) + (1 - s) * 
teleportation_contrib + + v /= np.sum(v) + delta = np.sum(np.abs(p - v)) + logger_.log(f'Iteration {iteration}: L1 norm delta is {delta}', True) - p = new_p + p = v iteration += 1 - - return p + return p def make_uri2rank(pr_vector, uri2index): - uri2rank = {} - - try: - for uri in uri2index: - uri2rank[uri] = pr_vector[uri2index[uri]] - except: - raise - - return uri2rank - + return {uri: pr_vector[idx] for uri, idx in uri2index.items()} def update_pagerank(): - utils.log_indexing('------------ Updating pagerank ------------') - utils.log_indexing('******** Query for uris ********') + logger_.log('------------ Updating pagerank ------------', True) + logger_.log('******** Query for uris ********', True) uri_response = query.query_sparql(uri_query) - utils.log_indexing('******** Query for uris complete ********') + logger_.log('******** Query for uris complete ********', True) + adjacency_list = populate_uris(uri_response) - utils.log_indexing('******** Query for links ********') + logger_.log('******** Query for links ********', True) link_response = query.query_sparql(link_query) - utils.log_indexing('******** Query for links complete ********') + logger_.log('******** Query for links complete ********', True) + populate_links(link_response, adjacency_list) - g = graph(adjacency_list) - utils.log_indexing('******** Running pagerank ********') - pr = pagerank(g, tolerance=float(utils.get_config()['pagerank_tolerance'])) - utils.log_indexing('******** Running pagerank complete ********') - utils.log_indexing('------------ Successfully updated pagerank ------------\n') - pr_vector = np.squeeze(np.asarray(pr)) + g = Graph(adjacency_list) - # after squeeze, make sure it at least has a dimension in the case that there is only one element - if pr_vector.shape == (): - pr_vector = np.array([pr_vector]) + logger_.log('******** Running pagerank ********', True) + pr_vector = pagerank(g, tolerance=float(config_manager.load_config()['pagerank_tolerance'])) + logger_.log('******** Running pagerank complete ********', True) + logger_.log('------------ Successfully updated pagerank ------------\n', True) return make_uri2rank(pr_vector, g.uri2index) - diff --git a/flask/query.py b/flask/query.py old mode 100755 new mode 100644 index 75c53d1..9f00e1b --- a/flask/query.py +++ b/flask/query.py @@ -1,23 +1,29 @@ import requests import urllib.parse from functools import lru_cache -import json -import utils +from wor_client import WORClient import re +from configManager import ConfigManager +from logger import Logger +# Load config once and reuse +config_manager = ConfigManager() +config = config_manager.load_config() -def query_parts(_from = '', criteria = '', indexing = False): +logger_ = Logger() +wor_client_ = WORClient() + +def query_parts(_from='', criteria='', indexing=False): """ - Gets all parts from Virtuoso + Gets all parts from Virtuoso. Args: _from: Graph the parts are from criteria: Any additional criteria indexing: Whether this query is being called during indexing Returns: Formatted list of all parts from Virtuoso - """ - query = ''' + query = f''' SELECT DISTINCT ?subject ?displayId @@ -28,117 +34,85 @@ def query_parts(_from = '', criteria = '', indexing = False): ?graph ?role ?sboltype - ''' + _from + ''' - WHERE { - ''' + criteria + ''' + {_from} + WHERE {{ + {criteria} ?subject a ?type . - ?subject sbh:topLevel ?subject .''' + ('''\n GRAPH ?graph { ?subject ?a ?t } .''' if indexing else "") + ''' - OPTIONAL { ?subject sbol2:displayId ?displayId . 
} - OPTIONAL { ?subject sbol2:version ?version . } - OPTIONAL { ?subject dcterms:title ?name . } - OPTIONAL { ?subject dcterms:description ?description . } - OPTIONAL { ?subject sbol2:role ?role . } - OPTIONAL { ?subject sbol2:type ?sboltype . } - } + ?subject sbh:topLevel ?subject . + {("GRAPH ?graph { ?subject ?a ?t } ." if indexing else "")} + OPTIONAL {{ ?subject sbol2:displayId ?displayId . }} + OPTIONAL {{ ?subject sbol2:version ?version . }} + OPTIONAL {{ ?subject dcterms:title ?name . }} + OPTIONAL {{ ?subject dcterms:description ?description . }} + OPTIONAL {{ ?subject sbol2:role ?role . }} + OPTIONAL {{ ?subject sbol2:type ?sboltype . }} + }} ''' - return memoized_query_sparql(query) - @lru_cache(maxsize=32) def memoized_query_sparql(query): """ - Speeds up SPARQL queries using a LRU cache + Speeds up SPARQL queries using a LRU cache. Args: query: SPARQL Query Returns: Results of the SPARQL query - """ return query_sparql(query) - def query_sparql(query): """ - Query instances of Virtuoso + Query instances of Virtuoso. Args: query: SPARQL query - Returns: - + Returns: Deduplicated results of the SPARQL query """ - endpoints = [utils.get_config()['sparql_endpoint']] + endpoints = [config['sparql_endpoint']] - if utils.get_config()['distributed_search']: - instances = utils.get_wor() - for instance in instances: - endpoints.append(instance['instanceUrl'] + '/sparql?') + if config.get('distributed_search'): + instances = wor_client_.get_wor_instance() + endpoints.extend(instance['instanceUrl'] + '/sparql?' for instance in instances) results = [] for endpoint in endpoints: try: results.extend(page_query(query, endpoint)) - except: - utils.log('[ERROR] failed querying:' + endpoint) - raise Exception("Endpoint not responding") + except Exception as e: + logger_.log(f'[ERROR] failed querying: {endpoint} - {str(e)}') + continue return deduplicate_results(results) - def deduplicate_results(results): """ - Removes duplicates from all SPARQL queries to various Virtuoso instances + Removes duplicates from all SPARQL queries to various Virtuoso instances. Args: results: List of results which may contain duplicates Returns: Deduplicated list of results - """ - deduped = set() - + seen = set() + deduped = [] for result in results: - deduped.add(json.dumps(result, sort_keys=True)) - - return [json.loads(result) for result in deduped] - + result_tuple = tuple(sorted(result.items())) + if result_tuple not in seen: + seen.add(result_tuple) + deduped.append(result) + return deduped def page_query(query, endpoint): """ - Query to get results from a particular page in SynBioHub + Query to get results from a particular page in SynBioHub. 
Args: query: Query to run endpoint: Virtuoso endpoint to hit Returns: List of parts - """ - utils.log('Current endpoint: ' + endpoint) - - bar = [ - "[ ]", - "[= ]", - "[=== ]", - "[==== ]", - "[===== ]", - "[====== ]", - "[======= ]", - "[========]", - "[ =======]", - "[ ======]", - "[ =====]", - "[ ====]", - "[ ===]", - "[ ==]", - "[ =]", - "[ ]", - "[ ]" - ] - bar_counter = 0 - - if endpoint != utils.get_config()['sparql_endpoint']: - query = re.sub(r'''FROM.*\n''', '', query) - + logger_.log(f'Current endpoint: {endpoint}') query_prefix = ''' PREFIX rdf: PREFIX dcterms: @@ -156,61 +130,50 @@ def page_query(query, endpoint): offset = 0 limit = 10000 - results = [] - while True: - print(bar[bar_counter % len(bar)], end="\r") - bar_counter+= 1 + if endpoint != config['sparql_endpoint']: + query = re.sub(r'''FROM.*\n''', '', query) - full_query = query_prefix + query + 'OFFSET ' + str(offset) + ' LIMIT ' + str(limit) + while True: + full_query = f"{query_prefix} {query} OFFSET {offset} LIMIT {limit}" new_results = send_query(full_query, endpoint) results.extend(new_results) - if len(new_results) != limit: + if len(new_results) < limit: break offset += limit return results - def send_query(query, endpoint): """ - Sends a query to Virtuoso + Sends a query to Virtuoso. Args: query: Query to be sent endpoint: Endpoint where Virtuoso resides Returns: List of parts from Virtuoso - """ params = {'query': query} - if endpoint == utils.get_config()['sparql_endpoint']: - params['default-graph-uri'] = '' # utils.get_config()['synbiohub_public_graph'] + if endpoint == config['sparql_endpoint']: + params['default-graph-uri'] = '' # Modify this if needed - url = endpoint + urllib.parse.urlencode(params) + url = f"{endpoint}{urllib.parse.urlencode(params)}" headers = {'Accept': 'application/json'} try: r = requests.get(url, headers=headers) - except Exception as e: - utils.log("[ERROR] exception when connecting: " + str(e)) + r.raise_for_status() # Raises an error for bad HTTP responses + except requests.RequestException as e: + logger_.log(f"[ERROR] exception when connecting: {str(e)}") raise Exception("Local SynBioHub isn't responding") - if r.status_code != 200: - utils.log('[ERROR] Got status code when querying: ' + str(r.status_code)) - utils.log(r.text) - raise Exception(url + ' is not responding') - - results = [] - - for binding in r.json()['results']['bindings']: - result = {} - for key in binding: - result[key] = binding[key]['value'] - results.append(result) + results = [ + {key: binding[key]['value'] for key in binding} + for binding in r.json()['results']['bindings'] + ] return results - diff --git a/flask/requirements.txt b/flask/requirements.txt index 41a7f8e..828e708 100644 --- a/flask/requirements.txt +++ b/flask/requirements.txt @@ -1,17 +1,18 @@ certifi==2018.4.16 chardet==3.0.4 -click==6.7 +click==8.1.7 elasticsearch==6.3.0 elasticsearch-dsl==6.1.0 -Flask==1.0.2 +Flask==3.0.3 idna==2.7 ipaddress==1.0.22 -itsdangerous==0.24 -Jinja2==2.10 -MarkupSafe==1.0 -numpy==1.14.5 +itsdangerous==2.2.0 +Jinja2==3.1.4 +MarkupSafe==2.1.5 +numpy==2.1.1 python-dateutil==2.7.3 requests==2.19.1 six==1.11.0 urllib3==1.23 -Werkzeug==0.14.1 +Werkzeug==3.0.4 +apscheduler==3.10.4 diff --git a/flask/search.py b/flask/search.py index dd6a272..2e314ed 100644 --- a/flask/search.py +++ b/flask/search.py @@ -1,18 +1,35 @@ import re -import utils +from typing import List, Dict, Tuple, Optional import query import sequencesearch +from wor_client import WORClient +from elasticsearchManager import ElasticsearchManager 
+from configManager import ConfigManager +from logger import Logger +config_manager = ConfigManager() +elasticsearch_manager = ElasticsearchManager(config_manager) +logger_ = Logger() +wor_client_ = WORClient() -def search_es(es_query): +# Compile regex patterns +FROM_COUNT_PATTERN = re.compile(r'SELECT \(count\(distinct \?subject\) as \?tempcount\)\s*(.*)\s*WHERE {') +FROM_NORMAL_PATTERN = re.compile(r'\?type\n(.*)\s*WHERE {') +CRITERIA_PATTERN = re.compile(r'WHERE {\s*(.*)\s*\?subject a \?type \.') +OFFSET_PATTERN = re.compile(r'OFFSET (\d+)') +LIMIT_PATTERN = re.compile(r'LIMIT (\d+)') +SEQUENCE_PATTERN = re.compile(r'\s*\?subject sbol2:sequence \?seq \.\s*\?seq sbol2:elements \"([a-zA-Z]*)\"') +FLAG_PATTERN = re.compile(r'# flag_([a-zA-Z0-9._]*): ([a-zA-Z0-9./-_]*)') +KEYWORD_PATTERN = re.compile(r"CONTAINS\(lcase\(\?displayId\), lcase\('([^']*)'\)\)") + + +def extract_offset(sparql_query): + offset_match = OFFSET_PATTERN.search(sparql_query) + return int(offset_match.group(1)) if offset_match else 0 + +def search_es(es_query: str) -> Dict: """ - String query for ES searches - - Arguments: - es_query {string} -- String to search for - - Returns: - List -- List of all search results + String query for ES searches. """ body = { 'query': { @@ -44,15 +61,14 @@ def search_es(es_query): 'size': 10000 } try: - return utils.get_es().search(index=utils.get_config()['elasticsearch_index_name'], body=body) + return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) except: + logger_.log("search_es(es_query: str)") raise - -def empty_search_es(offset, limit, allowed_graphs): +def empty_search_es(offset: int, limit: int, allowed_graphs: List[str]) -> Dict: """ - Empty string search based solely on pagerank - + Empty string search based solely on pagerank. Arguments: offset {int} -- Offset for search results limit {int} -- Size of search @@ -61,10 +77,7 @@ def empty_search_es(offset, limit, allowed_graphs): Returns: List -- List of search results """ - if len(allowed_graphs) == 1: - query = {'term': {'graph': allowed_graphs[0]}} - else: - query = {'terms': {'graph': allowed_graphs}} + query = {'term': {'graph': allowed_graphs[0]}} if len(allowed_graphs) == 1 else {'terms': {'graph': allowed_graphs}} body = { 'query': { @@ -81,15 +94,14 @@ def empty_search_es(offset, limit, allowed_graphs): 'size': limit } try: - return utils.get_es().search(index=utils.get_config()['elasticsearch_index_name'], body=body) + return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) except: + logger_.log("empty_search_es(offset: int, limit: int, allowed_graphs: List[str])") raise - -def search_es_allowed_subjects(es_query, allowed_subjects): +def search_es_allowed_subjects(es_query: str, allowed_subjects: List[str]) -> Dict: """ - String query for ES searches limited to allowed parts - + String query for ES searches limited to allowed parts. 
Arguments: es_query {string} -- String to search for allowed_subjects {list} - list of allowed subjects from Virtuoso @@ -107,7 +119,7 @@ def search_es_allowed_subjects(es_query, allowed_subjects): 'query': es_query, 'fields': [ 'subject', - 'displayId^3', # caret indicates displayId is 3 times as important during search + 'displayId^3', 'version', 'name', 'description', @@ -123,26 +135,23 @@ def search_es_allowed_subjects(es_query, allowed_subjects): }, 'script_score': { 'script': { - 'source': "_score * Math.log(doc['pagerank'].value + 1)" # Math.log is a natural log + 'source': "_score * Math.log(doc['pagerank'].value + 1)" } }, - }, - }, 'from': 0, 'size': 10000 } try: - return utils.get_es().search(index=utils.get_config()['elasticsearch_index_name'], body=body) + return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) except: + logger_.log("search_es_allowed_subjects(es_query: str, allowed_subjects: List[str])") raise - -def search_es_allowed_subjects_empty_string(allowed_subjects): +def search_es_allowed_subjects_empty_string(allowed_subjects: List[str]): """ - ES search purely limited to allowed parts - + ES search purely limited to allowed parts. Arguments: allowed_subjects {list} - list of allowed subjects from Virtuoso @@ -161,21 +170,51 @@ def search_es_allowed_subjects_empty_string(allowed_subjects): }, 'script_score': { 'script': { - 'source': "_score * Math.log(doc['pagerank'].value + 1)" # Math.log is a natural log + 'source': "_score * Math.log(doc['pagerank'].value + 1)" } }, - }, - }, 'from': 0, 'size': 10000 } try: - return utils.get_es().search(index=utils.get_config()['elasticsearch_index_name'], body=body) + return elasticsearch_manager.get_es().search(index=config_manager.load_config()['elasticsearch_index_name'], body=body) except: + logger_.log("search_es_allowed_subjects_empty_string") raise +def parse_sparql_query(sparql_query, is_count_query): + # Find FROM clause + _from_search = FROM_COUNT_PATTERN.search(sparql_query) if is_count_query else FROM_NORMAL_PATTERN.search(sparql_query) + _from = _from_search.group(1).strip() if _from_search else '' + + # Find criteria + criteria_search = CRITERIA_PATTERN.search(sparql_query) + criteria = criteria_search.group(1).strip() if criteria_search else '' + + # Find offset + offset_match = OFFSET_PATTERN.search(sparql_query) + offset = int(offset_match.group(1)) if offset_match else 0 + + # Find limit + limit_match = LIMIT_PATTERN.search(sparql_query) + limit = int(limit_match.group(1)) if limit_match else 50 + + # Find sequence + sequence_match = SEQUENCE_PATTERN.search(sparql_query) + sequence = sequence_match.group(1) if sequence_match else '' + + # Extract flags + flags = {match.group(1): match.group(2) for match in FLAG_PATTERN.finditer(sparql_query)} + # Extract keywords + keywords = KEYWORD_PATTERN.findall(criteria) + + # Construct es_query + es_query = ' '.join(keywords).strip() + #print("Hello es_query: ", es_query) + + return es_query, _from, criteria, offset, limit, sequence, flags def extract_query(sparql_query): """ @@ -187,54 +226,11 @@ def extract_query(sparql_query): Returns: List -- List of information extracted """ - _from = '' - if is_count_query(sparql_query): - _from_search = re.search(r'''SELECT \(count\(distinct \?subject\) as \?tempcount\)\s*(.*)\s*WHERE {''', - sparql_query) - else: - _from_search = re.search(r'''\?type\n(.*)\s*WHERE {''', sparql_query) - if _from_search: - _from = _from_search.group(1).strip() - - criteria = '' - 
criteria_search = re.search(r'''WHERE {\s*(.*)\s*\?subject a \?type \.''', sparql_query) - if criteria_search: - criteria = criteria_search.group(1).strip() - - offset = 0 - offset_search = re.search(r'''OFFSET (\d*)''', sparql_query) - if offset_search: - offset = int(offset_search.group(1)) - - limit = 50 - limit_search = re.search(r'''LIMIT (\d*)''', sparql_query) - if limit_search: - limit = int(limit_search.group(1)) - - sequence = '' - sequence_search = re.search(r'''\s*\?subject sbol2:sequence \?seq \.\s*\?seq sbol2:elements \"([a-zA-Z]*)\"''', - sparql_query) - if sequence_search: - sequence = sequence_search.group(1) - - flags = {} - flag_search = re.finditer(r'''# flag_([a-zA-Z0-9._]*): ([a-zA-Z0-9./-_]*)''', sparql_query) - for flag in flag_search: - flags[flag.group(1)] = flag.group(2) - - extract_keyword_re = re.compile(r'''CONTAINS\(lcase\(\?displayId\), lcase\('([^']*)'\)\)''') - keywords = [] - for keyword in re.findall(extract_keyword_re, criteria): - keywords.append(keyword) - es_query = ' '.join(keywords).strip() + return parse_sparql_query(sparql_query, is_count_query(sparql_query)) - return es_query, _from, criteria, offset, limit, sequence, flags - - -def extract_allowed_graphs(_from, default_graph_uri): +def extract_allowed_graphs(_from: str, default_graph_uri: str) -> List[str]: """ - Extracts the allowed graphs to search over - + Extracts the allowed graphs to search over. Arguments: _from {string} -- Graph where search originated default_graph_uri {string} -- The default graph URI pulled from SBH @@ -242,34 +238,17 @@ def extract_allowed_graphs(_from, default_graph_uri): Returns: List -- List of allowed graphs """ - allowed_graphs = [] + allowed_graphs = [default_graph_uri] if not _from else [graph.strip()[1:-1] for graph in _from.split('FROM') if graph.strip()[1:-1]] + if config_manager.load_config()['distributed_search']: + allowed_graphs.extend(instance['instanceUrl'] + '/public' for instance in wor_client_.get_wor_instance()) + return allowed_graphs - if utils.get_config()['distributed_search']: - instances = utils.get_wor() - for instance in instances: - allowed_graphs.append(instance['instanceUrl'] + '/public') - - if _from == '': - allowed_graphs.append(default_graph_uri) - return allowed_graphs - else: - for graph in _from.split('FROM'): - graph = graph.strip() - graph = graph[1:len(graph) - 1] - - if graph != '': - allowed_graphs.append(graph) - - return allowed_graphs - - -def is_count_query(sparql_query): +def is_count_query(sparql_query: str) -> bool: return 'SELECT (count(distinct' in sparql_query - -def create_response(count, bindings, return_count): +def create_response(count: int, bindings: List[Dict], return_count: bool) -> Dict: """ - Creates response to be sent back to SBH + Creates response to be sent back to SBH. Arguments: count {int} -- ? @@ -280,30 +259,33 @@ def create_response(count, bindings, return_count): ? -- ? 
""" if return_count: - response = {"head": - {"link": [], "vars": ["count"]}, - "results": {"distinct": False, "ordered": True, - "bindings": [{"count": { - "type": "typed-literal", - "datatype": "http://www.w3.org/2001/XMLSchema#integer", - "value": "10"}}]}} - response['results']['bindings'][0]['count']['value'] = str(count) - else: - response = {"head": {"link": [], - "vars": ["subject", "displayId", "version", "name", "description", "type", "percentMatch", - "strandAlignment", "CIGAR"]}, - "results": {"distinct": False, "ordered": True, "bindings": []}} - response['results']['bindings'] = bindings - - return response - + return { + "head": {"link": [], "vars": ["count"]}, + "results": { + "distinct": False, + "ordered": True, + "bindings": [{"count": { + "type": "typed-literal", + "datatype": "http://www.w3.org/2001/XMLSchema#integer", + "value": str(count) + } + }] + } + } + return { + "head": { + "link": [], + "vars": ["subject", "displayId", "version", "name", "description", "type", "percentMatch", "strandAlignment", "CIGAR"] + }, + "results": {"distinct": False, "ordered": True, "bindings": bindings} + } -def create_binding(subject, displayId, version, name, description, _type, role, sbol_type, order_by, percentMatch=-1, - strandAlignment='N/A', CIGAR='N/A'): +def create_binding(subject: str, displayId: Optional[str], version: Optional[int], name: Optional[str], description: Optional[str], + _type: Optional[str], role: Optional[str], sbol_type: Optional[str], order_by: Optional[float], + percentMatch: float = -1, strandAlignment: str = 'N/A', CIGAR: str = 'N/A') -> Dict: """ - Creates bindings to be sent to SBH - - Arguments: + Creates bindings to be sent to SBH. + Arguments: subject {string} -- URI of part displayId {string} -- DisplayId of part version {int} -- Version of part @@ -320,92 +302,30 @@ def create_binding(subject, displayId, version, name, description, _type, role, Returns: Dict -- Part and its information + """ binding = {} - - if subject is not None: - binding["subject"] = { - "type": "uri", - "datatype": "http://www.w3.org/2001/XMLSchema#uri", - "value": subject - } - - if displayId is not None: - binding["displayId"] = { - "type": "literal", - "datatype": "http://www.w3.org/2001/XMLSchema#string", - "value": displayId - } - - if version is not None: - binding["version"] = { - "type": "literal", - "datatype": "http://www.w3.org/2001/XMLSchema#string", - "value": version - } - - if name is not None: - binding["name"] = { - "type": "literal", - "datatype": "http://www.w3.org/2001/XMLSchema#string", - "value": name - } - - if description is not None: - binding["description"] = { - "type": "literal", - "datatype": "http://www.w3.org/2001/XMLSchema#string", - "value": description - } - - if _type is not None: - binding["type"] = { - "type": "uri", - "datatype": "http://www.w3.org/2001/XMLSchema#uri", - "value": _type - } - - if role is not None: - binding["role"] = { - "type": "uri", - "datatype": "http://www.w3.org/2001/XMLSchema#uri", - "value": role - } - - if sbol_type is not None: - binding["sboltype"] = { - "type": "uri", - "datatype": "http://www.w3.org/2001/XMLSchema#uri", - "value": sbol_type - } - - if order_by is not None: - binding["order_by"] = order_by - - if percentMatch != -1: - binding["percentMatch"] = { - "type": "literal", - "datatype": "http://www.w3.org/2001/XMLSchema#string", - "value": str(percentMatch) - } - - if strandAlignment != 'N/A': - binding["strandAlignment"] = { - "type": "literal", - "datatype": 
"http://www.w3.org/2001/XMLSchema#string", - "value": strandAlignment - } - - if CIGAR != 'N/A': - binding["CIGAR"] = { - "type": "literal", - "datatype": "http://www.w3.org/2001/XMLSchema#string", - "value": CIGAR - } - + attributes = { + "subject": subject, + "displayId": displayId, + "version": str(version) if version is not None else None, + "name": name, + "description": description, + "type": _type, + "role": role, + "sboltype": sbol_type, + "order_by": order_by, + "percentMatch": str(percentMatch) if percentMatch != -1 else None, + "strandAlignment": strandAlignment if strandAlignment != 'N/A' else None, + "CIGAR": CIGAR if CIGAR != 'N/A' else None + } + for key, value in attributes.items(): + if value is not None: + datatype = "http://www.w3.org/2001/XMLSchema#uri" if key in ["subject", "type", "role", "sboltype"] else "http://www.w3.org/2001/XMLSchema#string" + ltype = "uri" if key in ["subject", "type", "role", "sboltype"] else "literal" + binding[key] = {"type": ltype, "value": str(value), "datatype": datatype} if not key=="order_by" else order_by return binding - def create_bindings(es_response, clusters, allowed_graphs, allowed_subjects=None): """ Creates the mass binding consisting of all parts in the search @@ -421,43 +341,50 @@ def create_bindings(es_response, clusters, allowed_graphs, allowed_subjects=None Returns: Dict -- All parts and their corresponding information """ + if es_response is None or 'hits' not in es_response or 'hits' not in es_response['hits']: + logger_.log("[ERROR] Elasticsearch response is None or malformed.") + return [] + bindings = [] cluster_duplicates = set() + allowed_subjects_set = set(allowed_subjects) if allowed_subjects else None + for hit in es_response['hits']['hits']: _source = hit['_source'] _score = hit['_score'] subject = _source['subject'] - if allowed_subjects is not None and subject not in allowed_subjects: + if allowed_subjects_set and subject not in allowed_subjects_set: continue - if _source.get('graph') not in allowed_graphs: + graph = _source.get('graph') + if graph not in allowed_graphs: continue if subject in cluster_duplicates: - _score = _score / 2.0 + _score /= 2.0 elif subject in clusters: cluster_duplicates.update(clusters[subject]) if _source.get('type') is not None and 'http://sbols.org/v2#Sequence' in _source.get('type'): - _score = _score / 10.0 - - binding = create_binding(subject, - _source.get('displayId'), - _source.get('version'), - _source.get('name'), - _source.get('description'), - _source.get('type'), - _source.get('role'), - _source.get('sboltype'), - _score - ) + _score /= 10.0 + + binding = create_binding( + subject, + _source.get('displayId'), + _source.get('version'), + _source.get('name'), + _source.get('description'), + _source.get('type'), + _source.get('role'), + _source.get('sboltype'), + _score + ) bindings.append(binding) return bindings - def create_criteria_bindings(criteria_response, uri2rank, sequence_search=False, ucTableName=''): """ Creates binding for all non-string or non-empty searches @@ -477,44 +404,43 @@ def create_criteria_bindings(criteria_response, uri2rank, sequence_search=False, parts = (p for p in criteria_response if p.get('role') is None or 'http://wiki.synbiohub.org' in p.get('role')) for part in parts: subject = part.get('subject') + pagerank = uri2rank.get(subject, 1) - if subject not in uri2rank: - pagerank = 1 - else: - pagerank = uri2rank[subject] - - if part.get('type') is not None and 'http://sbols.org/v2#Sequence' in part.get('type'): - pagerank = pagerank / 10.0 + if 
'http://sbols.org/v2#Sequence' in part.get('type', ''): + pagerank /= 10.0 if sequence_search: - pagerank = pagerank * (float(get_percent_match(part.get('subject'), ucTableName)) / 100) - binding = create_binding(part.get('subject'), - part.get('displayId'), - part.get('version'), - part.get('name'), - part.get('description'), - part.get('type'), - part.get('role'), - part.get('sboltype'), - pagerank, - get_percent_match(part.get('subject'), ucTableName), - get_strand_alignment(part.get('subject'), ucTableName), - get_cigar_data(part.get('subject'), ucTableName)) - + percent_match = float(get_percent_match(subject, ucTableName)) / 100 + binding = create_binding( + subject, + part.get('displayId'), + part.get('version'), + part.get('name'), + part.get('description'), + part.get('type'), + part.get('role'), + part.get('sboltype'), + pagerank * percent_match, + percent_match, + get_strand_alignment(subject, ucTableName), + get_cigar_data(subject, ucTableName) + ) else: - binding = create_binding(part.get('subject'), - part.get('displayId'), - part.get('version'), - part.get('name'), - part.get('description'), - part.get('type'), - part.get('role'), - part.get('sboltype'), - pagerank) + binding = create_binding( + subject, + part.get('displayId'), + part.get('version'), + part.get('name'), + part.get('description'), + part.get('type'), + part.get('role'), + part.get('sboltype'), + pagerank + ) bindings.append(binding) - return bindings + return bindings def get_allowed_subjects(criteria_response): """ @@ -525,15 +451,10 @@ def get_allowed_subjects(criteria_response): Returns: Parts the user is allowed to see """ - subjects = set() - - for part in criteria_response: - subjects.add(part['subject']) - - return subjects - + return {part['subject'] for part in criteria_response} def create_similar_criteria(criteria, clusters): + """ Adds filter to query to be sent to Virtuoso Args: @@ -548,7 +469,8 @@ def create_similar_criteria(criteria, clusters): if subject not in clusters or not clusters[subject]: return 'FILTER (?subject != ?subject)' - return 'FILTER (' + ' || '.join(['?subject = <' + duplicate + '>' for duplicate in clusters[subject]]) + ')' + filters = ' || '.join(f'?subject = <{duplicate}>' for duplicate in clusters[subject]) + return f'FILTER ({filters})' def create_sequence_criteria(criteria, uris): @@ -561,10 +483,10 @@ def create_sequence_criteria(criteria, uris): Returns: String containing a SPARQL filter """ - if len(uris) == 0: + if not uris: return '' - - return 'FILTER (' + ' || '.join(['?subject = <' + uri + '>' for uri in uris]) + ')' + filters = ' || '.join(f'?subject = <{uri}>' for uri in uris) + return f'FILTER ({filters})' def parse_allowed_graphs(allowed_graphs): @@ -576,12 +498,7 @@ def parse_allowed_graphs(allowed_graphs): Returns: List of allowed graphs """ - result = '' - for allowed_graph in allowed_graphs: - if allowed_graph is not None: - result += 'FROM <' + allowed_graph + '> ' - return result - + return ' '.join(f'FROM <{graph}>' for graph in allowed_graphs if graph) def search(sparql_query, uri2rank, clusters, default_graph_uri): """ @@ -596,7 +513,7 @@ def search(sparql_query, uri2rank, clusters, default_graph_uri): """ es_query, _from, criteria, offset, limit, sequence, flags = extract_query(sparql_query) - + if criteria.strip() == 'FILTER ()': criteria = '' @@ -610,17 +527,16 @@ def search(sparql_query, uri2rank, clusters, default_graph_uri): allowed_uris = filter_sequence_search_subjects(_from, results) criteria_response = query.query_parts(_from) # Filter 
searches by URI to hide private parts here instead of on Virtuoso - criteria_response_filtered = [c for c in criteria_response if any(f for f in allowed_uris if f in c.get('subject'))] + criteria_response_filtered = [c for c in criteria_response if any(f in c.get('subject', '') for f in allowed_uris)] bindings = create_criteria_bindings(criteria_response_filtered, uri2rank, True, filename[:-4] + '.uc') - elif len(sequence.strip()) > 0: + elif sequence.strip(): # send sequence search to search.py temp_filename = sequencesearch.write_to_temp(sequence) results = sequencesearch.sequence_search(flags, temp_filename) - allowed_uris = filter_sequence_search_subjects(_from, results) criteria_response = query.query_parts(_from) - criteria_response_filtered = [c for c in criteria_response if any(f for f in allowed_uris if f in c.get('subject'))] + criteria_response_filtered = [c for c in criteria_response if any(f in c.get('subject', '') for f in allowed_uris)] bindings = create_criteria_bindings(criteria_response_filtered, uri2rank, True, temp_filename[:-4] + '.uc') elif 'SIMILAR' in criteria: @@ -634,37 +550,40 @@ def search(sparql_query, uri2rank, clusters, default_graph_uri): criteria_response = query.query_parts(_from, criteria) bindings = create_criteria_bindings(criteria_response, uri2rank) - elif es_query == '' and filterless_criteria == '': + elif es_query == '' and not filterless_criteria: # empty search es_response = empty_search_es(offset, limit, allowed_graphs) bindings = create_bindings(es_response, clusters, allowed_graphs) - bindings.sort(key=lambda binding: binding['order_by'], reverse=True) + bindings.sort(key=lambda b: b['order_by'], reverse=True) return create_response(es_response['hits']['total'], bindings, is_count_query(sparql_query)) else: - - if filterless_criteria == '': + if not filterless_criteria: es_response = search_es(es_query) # pure string search bindings = create_bindings(es_response, clusters, allowed_graphs) - else: # advanced search and string search criteria_response = query.query_parts(_from, filterless_criteria) allowed_subjects = get_allowed_subjects(criteria_response) - if es_query == '': - es_allowed_subject = search_es_allowed_subjects_empty_string(allowed_subjects) - else: - es_allowed_subject = search_es_allowed_subjects(es_query, allowed_subjects) + es_allowed_subject = (search_es_allowed_subjects_empty_string(allowed_subjects) + if es_query == '' + else search_es_allowed_subjects(es_query, allowed_subjects)) bindings = create_bindings(es_allowed_subject, clusters, allowed_graphs, allowed_subjects) - utils.log('Advanced string search complete.') - - bindings.sort(key=lambda binding: binding['order_by'], reverse=True) + logger_.log('Advanced string search complete.') + bindings.sort(key=lambda b: b['order_by'], reverse=True) return create_response(len(bindings), bindings[offset:offset + limit], is_count_query(sparql_query)) +def get_info_from_uc_table(uri, ucTableName, column_index): + with open(ucTableName, 'r') as file: + for line in file: + parts = line.split() + if parts[9] == uri: + return parts[column_index] + return 'N/A' def get_percent_match(uri, ucTableName): """ @@ -676,16 +595,7 @@ def get_percent_match(uri, ucTableName): Returns: Percent match if available, else -1 """ - with open(ucTableName, 'r') as read: - uc_reader = read.read() - lines = uc_reader.splitlines() - - for line in lines: - line = line.split() - if line[9] == uri: - return line[3] - - return -1 + return get_info_from_uc_table(uri, ucTableName, 3) def 
get_strand_alignment(uri, ucTableName): @@ -698,38 +608,10 @@ def get_strand_alignment(uri, ucTableName): Returns: + or - """ - with open(ucTableName, 'r') as read: - uc_reader = read.read() - lines = uc_reader.splitlines() - - for line in lines: - line = line.split() - if line[9] == uri: - return line[4] - - return 'N/A' - + return get_info_from_uc_table(uri, ucTableName, 4) def get_cigar_data(uri, ucTableName): - """ - Gets the CIGAR data of a part (see https://genome.sph.umich.edu/wiki/SAM) - Args: - uri: URI of the part - ucTableName: UClust table - - Returns: CIGAR data if found, or N/A - - """ - with open(ucTableName, 'r') as read: - uc_reader = read.read() - lines = uc_reader.splitlines() - - for line in lines: - line = line.split() - if line[9] == uri: - return line[7] - - return 'N/A' + return get_info_from_uc_table(uri, ucTableName, 7) def filter_sequence_search_subjects(_from, uris): """ @@ -741,9 +623,5 @@ def filter_sequence_search_subjects(_from, uris): _from {list} -- List of allowed graphs uris {list} -- List of URI's from sequence search """ - from_uris = [] - result = re.findall(r"\<([A-Za-z0-9:\/.]+)\>*", _from) - for r in result: - from_uris.append(r) - - return [uri for uri in uris if any(f for f in from_uris if f in uri)] + from_uris = set(re.findall(r"\<([A-Za-z0-9:\/.]+)\>*", _from)) + return [uri for uri in uris if any(f in uri for f in from_uris)] \ No newline at end of file diff --git a/flask/sequencesearch.py b/flask/sequencesearch.py index 6cb899b..f9c0dec 100644 --- a/flask/sequencesearch.py +++ b/flask/sequencesearch.py @@ -1,41 +1,46 @@ -from xml.etree import ElementTree +import os import subprocess -import utils -import query -import cluster -import search -from sys import platform -import base64 import tempfile +from sys import platform +from logger import Logger +import cluster - -# handling selection of VSEARCH binary -if platform == "linux" or platform == "linux2": - vsearch_binary_filename = 'usearch/vsearch_linux' -elif platform == "darwin": - vsearch_binary_filename = 'usearch/vsearch_macos' -else: - utils.log("Sorry, your OS is not supported for sequence based-search.") - -# add valid flags to here -globalFlags = {'maxaccepts': '50', 'id': '0.8', 'iddef': '2', 'maxrejects': '0', 'maxseqlength': '5000', 'minseqlength': '20'} -exactFlags = {} +logger_ = Logger() + +# Handling selection of VSEARCH binary +vsearch_binaries = { + "linux": "usearch/vsearch_linux", + "darwin": "usearch/vsearch_macos" +} + +vsearch_binary_filename = vsearch_binaries.get(platform, None) +if not vsearch_binary_filename: + logger_.log("Sorry, your OS is not supported for sequence-based search.") + +# Predefined global and exact search flags +global_flags = { + 'maxaccepts': '50', + 'id': '0.8', + 'iddef': '2', + 'maxrejects': '0', + 'maxseqlength': '5000', + 'minseqlength': '20' +} +exact_flags = {} def write_to_temp(sequence): """ - Writes text sequence to temp FASTA file for search + Writes a text sequence to a temporary FASTA file for search. 
     Arguments:
-        sequence {string} -- Sequence to write to file
+        sequence {str} -- Sequence to write to file
 
     Returns:
-        string -- file path
+        str -- Path to the temp file
     """
-    temp = tempfile.NamedTemporaryFile(suffix=".fsa",delete=False)
-    with open(temp.name, 'w') as f:
-        f.write('>sequence_to_search\n')
-        f.write('%s\n' % sequence)
-    return temp.name
+    with tempfile.NamedTemporaryFile(suffix=".fsa", delete=False, mode='w') as temp_file:
+        temp_file.write(f'>sequence_to_search\n{sequence}\n')
+    return temp_file.name
 
 # pass in the sequence to this function, replace searchsequence.fsa with the query sequence
 def run_vsearch_global(fileName):
     """
@@ -45,14 +50,16 @@ def run_vsearch_global(fileName):
     Arguments:
         fileName {string} -- Path to file
     """
+    # setting maxaccepts to 0 disables the limit (searches for all possible matches)
     args = [vsearch_binary_filename, '--usearch_global', fileName, '--db', 'dumps/sequences.fsa','--uc', fileName[:-4] + '.uc', '--uc_allhits',]
-    args = append_flags_to_args(args, globalFlags)
-    popen = subprocess.Popen(args, stdout=subprocess.PIPE)
+    args = append_flags_to_args(args, global_flags)
+
+    popen = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
     popen.wait()
     output = popen.stdout.read()
-    utils.log(output)
-
+    logger_.log(output)
+
 def run_vsearch_exact(fileName):
     """
     Runs the "search_exact" command
@@ -62,12 +69,11 @@ def run_vsearch_exact(fileName):
     """
     # setting maxaccepts to 0 disables the limit (searches for all possible matches)
     args = [vsearch_binary_filename, '--search_exact', fileName, '--db', 'dumps/sequences.fsa','--uc', fileName[:-4] + '.uc', '--uc_allhits']
-    args = append_flags_to_args(args, exactFlags)
+    args = append_flags_to_args(args, exact_flags)
     popen = subprocess.Popen(args, stdout=subprocess.PIPE)
     popen.wait()
     output = popen.stdout.read()
-    utils.log(output)
-
+    logger_.log(output)
 
 def append_flags_to_args(argsList, flags):
     """
@@ -93,9 +99,8 @@ def add_global_flags(userFlags):
         userFlags {dict} -- flags selected by user
     """
     for flag in userFlags:
-        if flag in globalFlags:
-            globalFlags[flag] = userFlags[flag]
-
+        if flag in global_flags:
+            global_flags[flag] = userFlags[flag]
 
 def add_exact_flags(userFlags):
     """
@@ -107,32 +112,29 @@ def add_exact_flags(userFlags):
         userFlags {dict} -- flags selected by user
     """
     for flag in userFlags:
-        if flag in exactFlags:
-            exactFlags[flag] = userFlags[flag]
+        if flag in exact_flags:
+            exact_flags[flag] = userFlags[flag]
 
-
-def sequence_search(userFlags, fileName):
+def sequence_search(user_flags, file_name):
     """
-    Main method
-
-    Handles all search queries
+    Handles all search queries.
 
     Arguments:
-        userFlags {dict} -- flags selected by user
-        fileName {string} -- path to temp file
+        user_flags {dict} -- Flags selected by the user
+        file_name {str} -- Path to the temp file
 
     Returns:
-        set -- search results by URI
+        set -- Search results by URI
     """
-    utils.log('Starting sequence search')
-
-    if "search_exact" in userFlags:
-        add_exact_flags(userFlags)
-        run_vsearch_exact(fileName)
+    logger_.log('Starting sequence search')
+
+    if "search_exact" in user_flags:
+        add_exact_flags(user_flags)
+        run_vsearch_exact(file_name)
     else:
-        add_global_flags(userFlags)
-        run_vsearch_global(fileName)
-    utils.log('Sequence search complete')
-
-    return cluster.uclust2uris(fileName[:-4] + '.uc')
+        add_global_flags(user_flags)
+        run_vsearch_global(file_name)
+    logger_.log('Sequence search complete')
+
+    return cluster.uclust2uris(file_name[:-4] + '.uc')
diff --git a/flask/start.sh b/flask/start.sh
index 66c1465..2b9765f 100755
--- a/flask/start.sh
+++ b/flask/start.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 
 echo "Starting SBOLExplorer"
-
+source ../../jammy/bin/activate
 export FLASK_APP=explorer.py
 export FLASK_ENV=development
 flask run --host=0.0.0.0 --port=13162
diff --git a/flask/utils.py b/flask/utils.py
deleted file mode 100644
index db0efc8..0000000
--- a/flask/utils.py
+++ /dev/null
@@ -1,249 +0,0 @@
-from elasticsearch import Elasticsearch
-import json
-import pickle
-import requests
-import datetime
-import os
-
-config = None
-
-def get_config():
-    """
-    Gets a copy of the config file
-    Returns: Config file in JSON
-
-    """
-    global config
-
-    if not config:
-        with open('config.json') as f:
-            config = json.load(f)
-
-    return config
-
-
-def set_config(new_config):
-    """
-    Overwrites the existing config with a new config file
-    Args:
-        new_config: New config file with the updated information
-
-    Returns:
-
-    """
-    global config
-
-    config = get_config()
-
-    for key in new_config:
-        if key in config:
-            config[key] = new_config[key]
-
-    with open('config.json', 'w') as f:
-        json.dump(config, f)
-
-
-def save_time(attribute):
-    """
-    Saves the current time to an attribute in the config
-    Args:
-        attribute: Config attribute to save current time to
-
-    Returns:
-
-    """
-    config = get_config()
-
-    now = datetime.datetime.now()
-
-    config[attribute] = str(now)
-
-    set_config(config)
-
-def save_update_end_time():
-    """
-    Save end time of indexing
-    Returns:
-
-    """
-    save_time("last_update_end")
-
-
-def save_update_start_time():
-    """
-    Save start time of indexing
-    Returns:
-
-    """
-    save_time("last_update_start")
-
-
-def get_wor():
-    """
-    Gets all instances of SynBioHub from the Web of Registries
-    Returns:
-
-    """
-    try:
-        instances = requests.get('https://wor.synbiohub.org/instances/')
-    except Exception:
-        log('[ERROR] Web of Registries had a problem!')
-        return []
-
-    if instances.status_code != 200:
-        log('[ERROR] Web of Registries had a problem!')
-        return []
-
-    return instances.json()
-
-
-def get_es():
-    """
-    Gets an instance of elasticsearch
-    Returns: The instance of elasticsearch
-
-    """
-    es = Elasticsearch([get_config()['elasticsearch_endpoint']], verify_certs=True)
-
-    if not es.ping():
-        raise ValueError('Elasticsearch connection failed')
-
-    return es
-
-
-def log(message):
-    """
-    Writes a message to the log
-    Args:
-        message: Message to write
-
-    Returns:
-
-    """
-    log_message = '[' + str(datetime.datetime.now()) + '] ' + str(message) + '\n'
-    print(log_message)
-
-    with open('log.txt', 'a+') as f:
-        f.write(log_message)
-
-def log_indexing(message):
-    log_message = '[' + str(datetime.datetime.now()) + '] ' + str(message) + '\n'
-    print(log_message)
-
-    with open('indexing_log.txt', 'a+') as f:
-        f.write(log_message)
-
-def get_log():
-    """
-    Gets a copy of the log
-    Returns: Stream from the read() method
-
-    """
-    try:
-        with open('log.txt', 'r') as f:
-            return f.read()
-    except:
-        return ""
-
-def get_indexing_log():
-    try:
-        with open('indexing_log.txt', 'r') as f:
-            return f.read()
-    except:
-        return ""
-
-clusters = None
-clusters_filename = 'dumps/clusters_dump'
-
-uri2rank = None
-uri2rank_filename = 'dumps/uri2rank_dump'
-
-
-def save_clusters(new_clusters):
-    """
-    Save clusters of parts
-    Args:
-        new_clusters: Clusters to be saved
-
-    Returns:
-
-    """
-    global clusters
-    clusters = new_clusters
-    serialize(clusters, clusters_filename)
-
-
-def get_clusters():
-    """
-    Gets all clusters of parts
-    Returns:
-
-    """
-    global clusters
-
-    if clusters is None:
-        clusters = deserialize(clusters_filename)
-
-    return clusters
-
-
-def save_uri2rank(new_uri2rank):
-    """
-    Saves the pagerank of all URI's
-    Args:
-        new_uri2rank:
-
-    Returns:
-
-    """
-    global uri2rank
-    uri2rank = new_uri2rank
-    serialize(uri2rank, uri2rank_filename)
-
-
-def get_uri2rank():
-    """
-    Gets all pageranks of URI's
-    Returns:
-
-    """
-    global uri2rank
-
-    if uri2rank is None:
-        uri2rank = deserialize(uri2rank_filename)
-
-    return uri2rank
-
-
-def serialize(data, filename):
-    """
-    Serializes some data to a file
-    Args:
-        data: Data to be written
-        filename: File to be written to
-
-    Returns:
-
-    """
-    f = open(filename, 'wb')
-    pickle.dump(data, f)
-    f.close()
-
-
-def deserialize(filename):
-    """
-    Deserializes data from a serialized file
-    Args:
-        filename: Serialized file
-
-    Returns: Deserialized data from file
-
-    """
-    if not os.path.exists(filename):
-        return {}
-
-    f = open(filename, 'rb')
-    data = pickle.load(f)
-    f.close()
-    return data
-
\ No newline at end of file
diff --git a/flask/wor_client.py b/flask/wor_client.py
new file mode 100644
index 0000000..3f0f607
--- /dev/null
+++ b/flask/wor_client.py
@@ -0,0 +1,19 @@
+import requests
+from logger import Logger
+
+logger_ = Logger()
+class WORClient:
+    @staticmethod
+    def get_wor_instances():
+        """
+        Gets all instances of SynBioHub from the Web of Registries
+        Returns:
+
+        """
+        try:
+            response = requests.get('https://wor.synbiohub.org/instances/')
+            response.raise_for_status()
+            return response.json()
+        except requests.RequestException:
+            logger_.log('[ERROR] Web of Registries had a problem!')
+            return []
\ No newline at end of file
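
Note: the new parse_sparql_query in search.py references module-level compiled patterns (FROM_COUNT_PATTERN, FROM_NORMAL_PATTERN, CRITERIA_PATTERN, OFFSET_PATTERN, LIMIT_PATTERN, SEQUENCE_PATTERN, FLAG_PATTERN, KEYWORD_PATTERN) that are defined outside the hunks shown above. A minimal sketch of plausible definitions, derived from the inline regexes removed from extract_query in this same patch (the constant names are taken from the call sites; their exact placement is an assumption):

import re

# Assumed module-level constants for search.py; each regex mirrors the one
# previously inlined in extract_query.
FROM_COUNT_PATTERN = re.compile(r'SELECT \(count\(distinct \?subject\) as \?tempcount\)\s*(.*)\s*WHERE {')
FROM_NORMAL_PATTERN = re.compile(r'\?type\n(.*)\s*WHERE {')
CRITERIA_PATTERN = re.compile(r'WHERE {\s*(.*)\s*\?subject a \?type \.')
OFFSET_PATTERN = re.compile(r'OFFSET (\d*)')
LIMIT_PATTERN = re.compile(r'LIMIT (\d*)')
SEQUENCE_PATTERN = re.compile(r'\s*\?subject sbol2:sequence \?seq \.\s*\?seq sbol2:elements \"([a-zA-Z]*)\"')
FLAG_PATTERN = re.compile(r'# flag_([a-zA-Z0-9._]*): ([a-zA-Z0-9./-_]*)')
KEYWORD_PATTERN = re.compile(r"CONTAINS\(lcase\(\?displayId\), lcase\('([^']*)'\)\)")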
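
Similarly, logger.py and configManager.py are referenced throughout but are not included in this patch. A rough sketch of the interface the new code appears to expect, modeled on the utils.py helpers deleted above (file names, method signatures, and the optional indexing flag are assumptions):

import datetime
import json

class Logger:
    def log(self, message, indexing=False):
        # Timestamped logging to log.txt (or indexing_log.txt), standing in for
        # the deleted utils.log / utils.log_indexing helpers.
        entry = '[' + str(datetime.datetime.now()) + '] ' + str(message) + '\n'
        print(entry)
        with open('indexing_log.txt' if indexing else 'log.txt', 'a+') as f:
            f.write(entry)

class ConfigManager:
    def load_config(self):
        # Reads config.json, standing in for the deleted utils.get_config.
        with open('config.json') as f:
            return json.load(f)

    def save_config(self, config):
        # Persists config.json, standing in for the deleted utils.set_config.
        with open('config.json', 'w') as f:
            json.dump(config, f)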