utils/indigo-service/backend/service/v2/db/BingoPostgresAdapter.py (279 lines of code) (raw):

import hashlib
import json
import logging
import re

import psycopg2  # type: ignore
import psycopg2.extras  # type: ignore

from ..bingo_ql.query import QueryBuilder
from ..common.util import merge_dicts
from .database import db_session
from .models import LibraryMeta

bingo_logger = logging.getLogger("bingo")
# bingo_logger.addHandler(logging.FileHandler('/srv/api/app.log'))

# The structure search type is spliced into the SQL text as ``bingo.<type>``
# (it cannot be sent as a bound parameter), so only plain identifiers are
# accepted.
_BINGO_TYPE_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


class BingoPostgresAdapter(object):
    """Data-access layer for structure libraries stored in PostgreSQL.

    Library metadata lives in ``indigoservice.library_metadata``; the
    structures of each library live in a dedicated
    ``indigoservice.structures_<library_id>`` table with columns ``s``
    (serial id), ``m`` (structure text) and ``p`` (jsonb properties).

    Every public method commits the psycopg2 connection in a ``finally``
    clause, whether or not the statement succeeded.
    """

    def __init__(self, settings, indigo, indigo_inchi):
        """Store the collaborators; the DB connection is opened lazily.

        :param settings: mapping whose ``"BINGO_POSTGRES"`` entry holds the
            keyword arguments passed to ``psycopg2.connect``.
        :param indigo: stored on the instance; not used by this class itself.
        :param indigo_inchi: stored on the instance; not used by this class
            itself.
        """
        self._indigo = indigo
        self._indigo_inchi = indigo_inchi
        self._builder = QueryBuilder()
        self._settings = settings.get("BINGO_POSTGRES")
        self._connection = None

    @property
    def connection(self):
        """Return the psycopg2 connection, opening it on first access."""
        if not self._connection:
            self._connection = psycopg2.connect(**self._settings)
        return self._connection

    def _get_structure_sql(self, structure, params):
        """Build the structure-matching SQL predicate and its bind values.

        :param structure: query structure, passed as a bound parameter.
        :param params: search parameters. ``params["type"]`` selects the
            operator; ``"sim"`` additionally needs ``min_sim``, ``max_sim``
            and ``metric``, every other type needs ``options``.
        :returns: ``(sql, bind)`` - a predicate fragment with named
            placeholders and the dict of values for them.
        :raises ValueError: if the search type is not a plain identifier.
        """
        stype = params["type"]
        if stype != "sim":
            if stype.lower() == "molformula":
                stype = "gross"
            # The type becomes part of the SQL text, so refuse anything that
            # could smuggle extra SQL into the statement.
            if not _BINGO_TYPE_RE.fullmatch(stype):
                raise ValueError(
                    "Unsupported structure search type: {0!r}".format(stype)
                )
            sql = "m @ (%(structure)s, %(options)s) :: bingo.{0}".format(stype)
            bind = {
                "structure": structure,
                "options": params["options"],
            }
        else:
            sql = (
                "m @ (%(min_sim)s, %(max_sim)s, %(structure)s, %(metric)s)"
                " :: bingo.sim"
            )
            bind = {
                "min_sim": params["min_sim"],
                "max_sim": params["max_sim"],
                "structure": structure,
                "metric": params["metric"],
            }
        return sql, bind

    def _get_property_sql(self, input_query):
        """Translate a textual property query into SQL via the QueryBuilder.

        :returns: ``(sql, bind)`` - the built query wrapped in parentheses
            and the builder's ``bind_params``.
        """
        query = self._builder.build_query(input_query)
        sql = "({0})".format(query)
        return sql, self._builder.bind_params

    @staticmethod
    def get_table_name_for_id(library_id):
        """Return the structures table name for ``library_id``.

        Dashes are replaced by underscores. NOTE: the result is interpolated
        into SQL text by the callers, so ``library_id`` must come from a
        trusted source.
        """
        return "indigoservice.structures_{0}".format(
            library_id.replace("-", "_")
        )

    @staticmethod
    def get_index_name(table_name):
        """Return the index name for a table: ``idx_`` + SHA-1 hex digest."""
        return "idx_{0}".format(
            hashlib.sha1(table_name.encode("utf-8")).hexdigest()
        )

    # Library

    def library_get_info(self, library_id):
        """Fetch the metadata of one library.

        :returns: dict with ``"service_data"`` and ``"user_data"``;
            ``service_data["properties"]`` is filled from the ``properties``
            entry of the stored index data (``{}`` when there is none).
        :raises TypeError: when no metadata row matches ``library_id``
            (``fetchone()`` returns ``None``).
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(
                    "select service_data, user_data, index_data"
                    " from indigoservice.library_metadata"
                    " where library_id = %s",
                    (library_id,),
                )
                result = cursor.fetchone()
            result_dict = {
                "service_data": result[0],
                "user_data": result[1],
            }
            props = result[2].get("properties", {}) if result[2] else {}
            result_dict["service_data"]["properties"] = props
            return result_dict
        finally:
            self.connection.commit()

    def library_create(self, library_name, user_data):
        """Register a new library and create its structures table.

        The ``LibraryMeta`` row is added and committed through ``db_session``
        first; its ``library_id`` then determines the table name.

        :returns: the ``library_id`` of the new library.
        """
        try:
            metalib = LibraryMeta(name=library_name, user_data=user_data)
            db_session.add(metalib)
            db_session.commit()
            with self.connection.cursor() as cursor:
                cursor.execute(
                    "create table {0}(s serial, m text not null, p jsonb not null)".format(
                        self.get_table_name_for_id(metalib.library_id)
                    )
                )
            return metalib.library_id
        finally:
            self.connection.commit()

    def library_get_properties(self, library_id):
        """Return the distinct ``'a'`` values found in the library's
        ``p`` jsonb arrays, as a list."""
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(
                    "select distinct elems->>'a' from {0}, jsonb_array_elements(p) elems".format(
                        self.get_table_name_for_id(library_id)
                    )
                )
                return [item[0] for item in cursor.fetchall()]
        finally:
            self.connection.commit()

    def library_update(self, library_id, new_data, index_data=None):
        """Merge new metadata into a library's stored metadata.

        :param library_id: id of the library to update.
        :param new_data: dict of service-data updates; an optional
            ``"user_data"`` entry is popped from it (the dict is mutated) and
            merged into the stored user data instead.
        :param index_data: replacement for the ``index_data`` column; ignored
            when falsy.
        :returns: ``"OK"``.
        :raises TypeError: when the library does not exist (propagated from
            ``library_get_info``).
        """
        try:
            current_data = self.library_get_info(library_id)
            new_userdata = new_data.pop("user_data", None)
            service_data = userdata = None
            if new_data:
                service_data = current_data["service_data"]
                service_data.update(new_data)
            if new_userdata:
                userdata = current_data["user_data"]
                userdata.update(new_userdata)

            assignments = []
            values = []
            for column, value in (
                ("user_data", userdata),
                ("index_data", index_data),
                ("service_data", service_data),
            ):
                if value:
                    assignments.append("{0} = %s".format(column))
                    values.append(json.dumps(value))

            # Nothing to change: an UPDATE with an empty SET list would be
            # a syntax error, so skip the statement altogether.
            if not assignments:
                return "OK"

            # Values travel as bound parameters so that quotes inside the
            # JSON (or a hostile library_id) cannot alter the statement.
            values.append(library_id)
            with self.connection.cursor() as cursor:
                cursor.execute(
                    "update indigoservice.library_metadata set {0}"
                    " where library_id = %s".format(", ".join(assignments)),
                    values,
                )
            return "OK"
        finally:
            self.connection.commit()

    def library_delete(self, library_id):
        """Delete a library's metadata row and drop its structures table.

        :returns: ``"OK"``.
        """
        try:
            with self.connection.cursor() as cursor:
                LibraryMeta.query.filter(
                    LibraryMeta.library_id == library_id
                ).delete(synchronize_session=False)
                db_session.commit()
                cursor.execute(
                    "drop table {0}".format(
                        self.get_table_name_for_id(library_id)
                    )
                )
            return "OK"
        finally:
            self.connection.commit()

    # Library upload

    def library_upload(self, library_id, stream):
        """Not implemented by this adapter."""
        raise NotImplementedError()

    def library_upload_exists(self, library_id, upload_id):
        """Not implemented by this adapter."""
        raise NotImplementedError()

    def library_upload_get_status(self, library_id, upload_id):
        """Not implemented by this adapter."""
        raise NotImplementedError()

    # Search

    def make_full_sql(self, subquery, library_id, q_type, idx):
        """Wrap a predicate into a full SELECT over one library table.

        :param subquery: SQL predicate placed in the join/where condition.
        :param library_id: library to search; also bound as the
            ``library_id`` output column.
        :param q_type: ``"total"`` (ids only), ``"property"`` (rows plus the
            names of matched properties) or anything else (plain structure
            search).
        :param idx: string suffix that keeps the ``library_id_<idx>`` bind
            name unique across the sub-selects of one statement.
        :returns: ``(sql, bind)`` for this library.
        """
        table_name = self.get_table_name_for_id(library_id)
        # Two-stage formatting: the first pass inserts the predicate and
        # turns ``{{0}}``/``{{1}}`` into ``{0}``/``{1}``, which the second
        # pass fills with the bind-name suffix and the table name.
        if q_type == "total":
            template = (
                " select s, %(library_id_{{0}})s as library_id"
                " from {{1}} struct"
                " inner join jsonb_array_elements(struct.p) elems on {0}"
                " group by s, library_id"
            )
        elif q_type == "property":
            template = (
                " select struct.s as id, struct.m as data,"
                " struct.p as properties,"
                " %(library_id_{{0}})s as library_id,"
                " init_struct.matched as matched"
                " from {{1}} struct inner join"
                " ( select str.s as id, json_agg(elems->>'a') as matched"
                " from {{1}} str"
                " inner join jsonb_array_elements(str.p) elems on {0}"
                " group by str.s ) as init_struct"
                " on init_struct.id = struct.s"
            )
        else:
            template = (
                " select s as id, m as data, p as properties,"
                " %(library_id_{{0}})s as library_id"
                " from {{1}} where {0}"
            )
        template = template.format(subquery)
        return template.format(idx, table_name), {
            "library_id_" + idx: library_id
        }

    def do_search(self, params):
        """Run a structure and/or property search over several libraries.

        :param params: search parameters. Reads ``query_text``,
            ``query_structure``, ``library_ids`` and the optional ``total``,
            ``limit`` and ``offset`` entries, plus whatever
            ``_get_structure_sql`` needs.
        :returns: when ``total`` is truthy, the fetched rows of
            ``(library_id, id_list)``; otherwise the executed cursor, which
            the caller iterates.
        """
        try:
            bind_params = {}
            q_text = params["query_text"]
            q_structure = params["query_structure"]
            want_total = params.get("total", False)

            if q_text:
                prop_sql, bind = self._get_property_sql(q_text)
                bind_params = merge_dicts(bind_params, bind)
            else:
                prop_sql = ""

            if q_structure:
                struct_sql, bind = self._get_structure_sql(q_structure, params)
                bind_params = merge_dicts(bind_params, bind)
            else:
                struct_sql = "true"

            if prop_sql:
                subquery = " AND ".join([prop_sql, struct_sql])
                q_type = "total" if want_total else "property"
            else:
                subquery = struct_sql
                q_type = "total" if want_total else "structure"

            sqlqueries = []
            for idx, library_id in enumerate(params["library_ids"]):
                query, bind = self.make_full_sql(
                    subquery, library_id, q_type, str(idx)
                )
                bind_params = merge_dicts(bind_params, bind)
                sqlqueries.append(query)
            sql_query = "(" + " UNION ALL ".join(sqlqueries) + ")"

            # Not a ``with`` block: in the non-total case the cursor itself
            # is handed back to the caller and must stay open.
            cursor = self.connection.cursor()
            if want_total:
                total_query = (
                    " select library_id, json_agg(s) as id_list"
                    " from {} as combined group by library_id"
                ).format(sql_query)
                bingo_logger.info(total_query)
                cursor.execute(total_query, bind_params)
                return cursor.fetchall()

            if params.get("limit", None):
                sql_query += " limit %(limit)s"
                bind_params = merge_dicts(
                    bind_params, {"limit": params["limit"]}
                )
            if params.get("offset", None):
                sql_query += " offset %(offset)s"
                bind_params = merge_dicts(
                    bind_params, {"offset": params["offset"]}
                )
            bingo_logger.info(sql_query)
            bingo_logger.info(bind_params)
            cursor.execute(sql_query, bind_params)
            return cursor
        finally:
            self.connection.commit()

    def insert_sdf(self, library_id, data):
        """Bulk-insert rows into the library's structures table.

        :param data: sequence of row tuples for the ``(m, p)`` columns,
            passed to ``psycopg2.extras.execute_values`` in pages of 1000.
        """
        try:
            insert_query = "insert into {0}(m, p) values %s".format(
                self.get_table_name_for_id(library_id)
            )
            with self.connection.cursor() as cursor:
                psycopg2.extras.execute_values(
                    cursor, insert_query, data, template=None, page_size=1000
                )
        finally:
            self.connection.commit()

    def create_indices(self, table_name):
        """Create the Bingo molecule index and the ``s`` index on a table.

        Both statements use ``if not exists``.
        """
        try:
            index_name = self.get_index_name(table_name)
            with self.connection.cursor() as cursor:
                cursor.execute(
                    "create index if not exists {0} on {1}"
                    " using bingo_idx (m bingo.molecule)"
                    " with (IGNORE_STEREOCENTER_ERRORS=1,"
                    "IGNORE_CISTRANS_ERRORS=1,FP_TAU_SIZE=0)".format(
                        index_name, table_name
                    )
                )
                cursor.execute(
                    "create index if not exists {0} on {1} (s)".format(
                        "id_" + index_name, table_name
                    )
                )
        finally:
            self.connection.commit()

    def drop_indices(self, table_name):
        """Drop both indices of a table, if they exist."""
        try:
            index_name = self.get_index_name(table_name)
            with self.connection.cursor() as cursor:
                cursor.execute("drop index if exists {0}".format(index_name))
                cursor.execute(
                    "drop index if exists {0}".format("id_" + index_name)
                )
        finally:
            self.connection.commit()

    def user_all(self):
        """Return all users as dicts with ``id``, ``username``, ``email``."""
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(
                    "select user_id, username, email from indigoservice.users"
                )
                return [
                    {
                        "id": item[0],
                        "username": item[1],
                        "email": item[2],
                    }
                    for item in cursor.fetchall()
                ]
        finally:
            self.connection.commit()