# modules/chempy/dictdb.py

#A* -------------------------------------------------------------------
#B* This file contains source code for the PyMOL computer program
#C* copyright 1998-2000 by Warren Lyford Delano of DeLano Scientific.
#D* -------------------------------------------------------------------
#E* It is unlawful to modify or remove this copyright notice.
#F* -------------------------------------------------------------------
#G* Please see the accompanying LICENSE file for further information.
#H* -------------------------------------------------------------------
#I* Additional authors of this source file include:
#-*
#-*
#-*
#Z* -------------------------------------------------------------------

from __future__ import print_function

import threading
import socket  # also needed for gethostbyaddr()
import sys

if sys.version_info[0] == 2:
    import cPickle
    import SocketServer
else:
    import pickle as cPickle
    import socketserver as SocketServer

import traceback
import copy
import os

# Dictionary Emulator Object Database
#
# After being constructed, DictDBLocal and DictDBClient objects
# work like dictionary objects, but with the following additional methods:
#
# standby()  optimizes the index file to incorporate changes and purges
#            indexes from memory (indexes are re-read upon the next call, if any)
#
# purge()    repacks the database and index files to eliminate wasted space
#
# get_info() returns a DictDBInfo object containing database statistics
#
# shutdown() terminates the database server (only useful for remote clients)
#
# reset()    clears the database
#
# (a short usage sketch appears at the end of this file)

# PART 1
# Database Engine, with a thread-safe API

class DictDBInfo:

    def __init__(self):
        self.used = 0    # bytes containing data
        self.wasted = 0  # bytes wasted (from deleted/overwritten records)

class DictDBLocal:

    __magic__ = b'*8#~'         # 32-bit record stamp for a record
    __delete_magic__ = b'*8#@'  # 32-bit record stamp for a deleted record

    def __init__(self,prefix,bin=1,read_only=0):
        # store important information
        self.index_file = prefix + ".dbi"  # database index information (can be reconstructed from .dbf)
        self.data_file = prefix + ".dbf"   # database data
        self.bin = bin
        self.changed = 0
        self.read_only = read_only
        # create lock
        self.lock = threading.RLock()  # NOTE: recursive for convenience
        # restore indexes into memory
        self._restore()

    def get_info(self):
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            # generate independent copy for calling thread
            result = copy.deepcopy(self.info)
        finally:
            self.lock.release()
        return result

    def has_key(self,key):
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            result = key in self.rec
        finally:
            self.lock.release()
        return result

    __contains__ = has_key

    def keys(self):
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            # get list of keys (thread safe...result is a new object)
            result = list(self.rec.keys())
        finally:
            self.lock.release()
        return result

    def __delitem__(self,key):
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            if key in self.rec:
                # account for space
                self.info.wasted = self.info.wasted + self.rec[key][1]
                # delete record
                del self.rec[key]
                if not self.read_only:
                    # write delete magic to data file for recovery
                    f=open(self.data_file,'ab')
                    f.write(self.__delete_magic__)
                    cPickle.dump(key,f,self.bin)
                    f.close()
                    # write blank index to index file
                    f=open(self.index_file,'ab')
                    cPickle.dump((key,None),f,self.bin)
                    f.close()
                # note change
                self.changed = 1
            else:
                raise KeyError(key)
        finally:
            self.lock.release()
        return result

    def standby(self):
        result = None
        # only do something if dictionary is already in RAM
        if hasattr(self,'rec') and not self.read_only:
            try:
                self.lock.acquire()
                # write index file
                f=open(self.index_file,'wb')
                cPickle.dump(self.info,f,self.bin)
                cPickle.dump(self.rec,f,self.bin)
                f.close()
                # free memory
                del self.rec
                del self.info
            finally:
                self.lock.release()
        return result

    def reset(self):
        result = None
        try:
            self.lock.acquire()
            if not self.read_only:
                # make sure files exist and are writable
                f = open(self.index_file,'wb')
                f.close()
                f = open(self.data_file,'wb')
                f.close()
                # new database information
                self.info = DictDBInfo()
                self.rec = {}
                self.changed = 1
                self.standby()  # write out blank indexes (important)
        finally:
            self.lock.release()
        return result

    def purge(self):
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            if not self.read_only:
                # open source and temporary data files
                tmp_data = self.data_file + '_tmp'
                f = open(self.data_file,'rb')
                g = open(tmp_data,'wb')
                rec = self.rec
                used = 0
                # iterate through records, copying only those which are extant
                for key in list(self.rec.keys()):
                    f.seek(rec[key][0])
                    g.write(self.__magic__)
                    cPickle.dump(key,g,self.bin)
                    start = g.tell()
                    g.write(f.read(rec[key][1]))
                    used = used + rec[key][1]
                    rec[key]=(start,rec[key][1])  # generate new record entry
                f.close()
                g.close()
                # now perform the switch-over
                os.unlink(self.index_file)          # delete old index file...
                os.unlink(self.data_file)           # delete old data file
                os.rename(tmp_data,self.data_file)  # move new data file over old
                # update database information
                self.info.used = used
                self.info.wasted = 0
                # write new index and information file
                f=open(self.index_file,'wb')
                cPickle.dump(self.info,f,self.bin)
                cPickle.dump(self.rec,f,self.bin)
                f.close()
        finally:
            self.lock.release()
        return result

    def shutdown(self):
        # dummy
        return None

    def __getitem__(self,key):  # get with object
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            if key in self.rec:
                # locate and retrieve object
                f=open(self.data_file,'rb')
                f.seek(self.rec[key][0])
                result = cPickle.load(f)
                f.close()
            else:
                raise KeyError(key)
        finally:
            self.lock.release()
        return result

    def _get(self,key):  # get with string (for remote connections)
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            if key in self.rec:
                # locate and retrieve string
                f=open(self.data_file,'rb')
                f.seek(self.rec[key][0])
                result = f.read(self.rec[key][1])
                f.close()
        finally:
            self.lock.release()
        return result

    def __setitem__(self,key,object):
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            # append data onto data file
            f=open(self.data_file,'ab')
            f.write(self.__magic__)       # for recovery
            cPickle.dump(key,f,self.bin)  # for recovery
            start = f.tell()
            cPickle.dump(object,f,self.bin)
            record_info = (start,f.tell()-start)
            f.close()
            # account for space (if replacing)
            if key in self.rec:
                self.info.wasted = self.info.wasted + self.rec[key][1]
            # update record info
            self.rec[key] = record_info
            # account for space
            self.info.used = self.info.used + record_info[1]
            # append new record onto index file
            f=open(self.index_file,'ab')
            cPickle.dump((key,record_info),f,self.bin)
            f.close()
            # note change
            self.changed = 1
        finally:
            self.lock.release()
        return result

    def _set(self,key,data_string):  # set with string (for remote connections)
        result = None
        # restore dictionary (if nec.)
        if not hasattr(self,'rec'): self._restore()
        try:
            self.lock.acquire()
            self.changed = 1
            if not self.read_only:
                # append data onto data file
                f=open(self.data_file,'ab')
                f.write(self.__magic__)       # for recovery
                cPickle.dump(key,f,self.bin)  # for recovery
                start = f.tell()
                f.write(data_string)
                record_info = (start,f.tell()-start)
                f.close()
            # account for space (if replacing)
            if key in self.rec:
                self.info.wasted = self.info.wasted + self.rec[key][1]
            # update record info
            self.rec[key] = record_info
            # account for space
            self.info.used = self.info.used + record_info[1]
            if not self.read_only:
                # append new record onto index file
                f=open(self.index_file,'ab')
                cPickle.dump((key,record_info),f,self.bin)
                f.close()
        finally:
            self.lock.release()
        return result

    def _recover(self):  # rebuild index from data file
        result = None
        self.info = DictDBInfo()
        self.rec = {}
        try:
            try:
                self.lock.acquire()
                f=open(self.data_file,'rb')
                # find length of file
                f.seek(0,2)
                eof = f.tell()
                f.seek(0,0)
                # create locals for better performance
                rec = self.rec
                self_info = self.info
                while f.tell()!=eof:
                    chk = f.read(4)
                    if chk==self.__magic__:
                        # recover extant record
                        key = cPickle.load(f)
                        start = f.tell()
                        data = cPickle.load(f)
                        record_info = (start,f.tell()-start)
                        # account for space (if replacing)
                        if key in rec:
                            self_info.wasted = self_info.wasted + rec[key][1]
                        rec[key] = record_info
                        # account for space
                        self_info.used = self_info.used + record_info[1]
                    elif chk==self.__delete_magic__:
                        # delete already recovered record
                        key = cPickle.load(f)
                        if key in rec:
                            # account for space
                            self_info.wasted = self_info.wasted + rec[key][1]
                            del rec[key]
                    else:
                        raise RuntimeError('Bad Magic')
            except:
                print(" dictdb error: database recovery failed.")
                traceback.print_exc()
                sys.exit(1)
            if not self.read_only:
                # write recovered indexes to a new index file
                f = open(self.index_file,'wb')
                cPickle.dump(self.info,f,self.bin)
                cPickle.dump(self.rec,f,self.bin)
                f.close()
        finally:
            self.lock.release()
        return result

    def _restore(self):
        result = None
        try:
            self.lock.acquire()
            # make sure files exist and are writable
            f = open(self.index_file,'ab')
            f.close()
            f = open(self.data_file,'ab')
            f.close()
            # read current indexes
            f = open(self.index_file,'rb')
            # find length of file
            f.seek(0,2)
            eof = f.tell()
            f.seek(0,0)
            # start reading and appending recent changes (if any)
            try:
                self.info = cPickle.load(f)
                self.rec = cPickle.load(f)
                # use locals for better performance
                rec = self.rec
                cPickle_load = cPickle.load
                self_info = self.info
                while f.tell()!=eof:
                    key, info = cPickle_load(f)
                    if info!=None:
                        # account for space (if replacing)
                        if key in rec:
                            self_info.wasted = self_info.wasted + rec[key][1]
                        # set record info
                        rec[key] = info
                        # account for space
                        self_info.used = self_info.used + info[1]
                    else:
                        # account for space (if deleting object)
                        self_info.wasted = self_info.wasted + rec[key][1]
                        del rec[key]
                f.close()
            except EOFError:
                # if an error occurs when reading the indexes, do a datafile-based recovery
                f.close()
                self._recover()
        finally:
            self.lock.release()
        return result

# PART 2
# Database client

class DictDBClient:

    def __init__(self,host='localhost',port=8000):
        self.host=host
        self.port=port
        self.sock = None

    def _remote_call(self,meth,args,kwds):
        result = None
        if self.sock == None:
            self.sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            self.sock.connect((self.host,self.port))
            self.send = self.sock.makefile('wb')  # binary mode: pickle streams are bytes
            self.recv = self.sock.makefile('rb')
        cPickle.dump(meth,self.send,1)  # binary by default
        cPickle.dump(args,self.send,1)
        cPickle.dump(kwds,self.send,1)
        self.send.flush()
        result = cPickle.load(self.recv)
        return result

    def __getitem__(self,key):
        if self._remote_call('has_key',(key,),{}):
            return cPickle.loads(self._remote_call('_get',(key,),{}))
        else:
            raise KeyError(key)

    def __setitem__(self,key,object):
        self.changed = 1
        return self._remote_call('_set',(key,cPickle.dumps(object)),{})

    def __delitem__(self,key):
        if self._remote_call('has_key',(key,),{}):
            return self._remote_call('__delitem__',(key,),{})
        else:
            raise KeyError(key)

    def standby(self):
        return self._remote_call('standby',(),{})

    def shutdown(self):
        try:
            # multiple connections are sometimes required...
            self._remote_call('shutdown',(),{})
            self.sock=None
            self._remote_call('shutdown',(),{})
            self.sock=None
            self._remote_call('shutdown',(),{})
            self.sock=None
            self._remote_call('shutdown',(),{})
        except:
            pass
        return None

    def purge(self):
        return self._remote_call('purge',(),{})

    def reset(self):
        return self._remote_call('reset',(),{})

    def keys(self):
        return self._remote_call('keys',(),{})

    def has_key(self,key):
        return self._remote_call('has_key',(key,),{})

    __contains__ = has_key

    def get_info(self):
        return self._remote_call('get_info',(),{})

# PART 3
# Database Socket Server

class DictDBServer:

    def __init__(self,prefix,port=8000):  # default port matches DictDBClient
        if hasattr(sys,'setcheckinterval'):  # removed in newer Python 3 releases
            sys.setcheckinterval(0)
        server_address = ('', port)
        ddbs = _DictDBServer(server_address, DictDBRequestHandler)
        # assign dict database to this server
        ddbs.dictdb = DictDBLocal(prefix)
        # now serve requests forever
        ddbs.keep_alive = 1
        while ddbs.keep_alive:
            ddbs.handle_request()

class _DictDBServer(SocketServer.ThreadingTCPServer):

    def server_bind(self):
        """Override server_bind to store the server name."""
        SocketServer.ThreadingTCPServer.server_bind(self)
        host, port = self.socket.getsockname()
        if not host or host == '0.0.0.0':
            host = socket.gethostname()
        hostname, hostnames, hostaddrs = socket.gethostbyaddr(host)
        if '.' not in hostname:
            for host in hostnames:
                if '.' in host:
                    hostname = host
                    break
        self.server_name = hostname
        self.server_port = port

class DictDBRequestHandler(SocketServer.StreamRequestHandler):

    def handle(self):
        while self.server.keep_alive:
            # get method name from client
            try:
                method = cPickle.load(self.rfile)
            except (EOFError, socket.error):
                break
            if method == 'shutdown':
                self.server.keep_alive = 0
            # get arguments from client
            args = cPickle.load(self.rfile)
            kw = cPickle.load(self.rfile)
            # get method pointer
            meth_obj = getattr(self.server.dictdb,method)
            # print(method,args,kw)
            # call method and return result
            cPickle.dump(meth_obj(*args, **kw),self.wfile,1)  # binary by default
            self.wfile.flush()

def server_test(port = 8000,prefix='test_dictdb'):
    print('Testing DictDBServer on port',str(port))
    fp = DictDBServer(prefix,port=port)
    # socket servers don't terminate

def client_test(host,port=8000):
    print('Testing Client with Server on port',str(port))
    tdb = DictDBClient(port=port)
    tdb['test']='hello'
    print(tdb['test'])
    try:
        print(tdb['nonexistent'])
    except KeyError:
        print(" key error 1 as expected")
    try:
        del tdb['nonexistent']
    except KeyError:
        print(" key error 2 as expected")
    tdb['extra'] = 'hi'
    print('extra' in tdb)
    print(list(tdb.keys()))

if __name__=='__main__':

    import os

    print('***Testing DictDB***:')
    ddb = DictDBLocal('test_dictdb')
    print(list(ddb.keys()))
    ddb['test']='some data object'
    print(ddb['test'])
    ddb['another']='another data object'
    print(ddb['another'])
    ddb['test']='some updated data object'
    print(ddb['test'])
    del ddb

    ddb = DictDBLocal('test_dictdb')
    ddb['whoa']='whoa data object'
    print(ddb['test'])
    ddb['dude']='dude data object'
    print(ddb['dude'])
    ddb['number'] = 9999
    print(ddb['number'])
    ddb.standby()
    del ddb

    ddb = DictDBLocal('test_dictdb')
    print('Current keys:'+str(list(ddb.keys())))
    print(ddb['test'])
    print(ddb['another'])

    print('***Testing raw string methods:')
    print(cPickle.loads(ddb._get('test')))
    print(cPickle.loads(ddb._get('another')))
    ddb._set('some_key',cPickle.dumps("raw test1"))
    print(ddb['some_key'])
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    del ddb

    os.unlink("test_dictdb.dbi")
    ddb = DictDBLocal('test_dictdb')
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    del ddb

    os.unlink("test_dictdb.dbi")
    ddb = DictDBLocal('test_dictdb')
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    ddb.standby()
    del ddb

    ddb = DictDBLocal('test_dictdb')
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    del ddb

    ddb = DictDBLocal('test_dictdb')
    ddb.standby()
    ddb['reactivated'] = 1234
    print('reactivated' in ddb)
    ddb.standby()
    ddb['reactivated'] = 495
    print(ddb['reactivated'])
    try:
        print(ddb['nonexistent'])
    except KeyError:
        print(" key error 1 as expected")
    try:
        del ddb['nonexistent']
    except KeyError:
        print(" key error 2 as expected")

    print("purge test:")
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    ddb.purge()
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    del ddb

    ddb = DictDBLocal('test_dictdb')
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    ddb['hello']=10
    ddb[123] = 'bacon'
    ddb['green']= 'color'
    print(ddb['hello'])
    print(list(ddb.keys()))
    print('green' in ddb)
    print('blue' in ddb)
    del ddb['hello']
    try:
        print(ddb['hello'])
    except KeyError:
        print(' got expected key error 1')
    try:
        del ddb['hello']
    except KeyError:
        print(' got expected key error 2')
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    ddb.purge()
    info = ddb.get_info()
    print(info.used,info.wasted,info.used-info.wasted)
    print(ddb['green'])

    import random

    ddc=DictDBLocal('test_dictdb')
    print("loading...")
    lst = []
    for a in range(1,250):
        ddc[a]=str(a)
        lst.append([random.random(),a])
        ddc[str(a)] = a
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    print('verifying...')
    for a in range(1,250):
        if ddc[a]!=str(a):
            raise RuntimeError
    for a in range(1,250):
        del ddc[str(a)]
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    print('verifying...')
    for a in range(1,250):
        if ddc[a]!=str(a):
            raise RuntimeError
    del ddc

    ddc=DictDBLocal('test_dictdb')
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    print('verifying...')
    for a in range(1,250):
        if ddc[a]!=str(a):
            raise RuntimeError
    ddc.standby()
    del ddc

    os.unlink("test_dictdb.dbi")
    print("unlink test")
    ddc=DictDBLocal('test_dictdb')
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    print('verifying...')
    for a in range(1,250):
        if ddc[a]!=str(a):
            raise RuntimeError
    ddc.purge()
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    print('verifying...')
    for a in range(1,250):
        if ddc[a]!=str(a):
            raise RuntimeError
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    lst.sort()
    for a in lst:
        del ddc[a[1]]
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    del ddc

    ddc=DictDBLocal('test_dictdb')
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    ddc.purge()
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    del ddc

    ddc=DictDBLocal('test_dictdb')
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
    print(list(ddc.keys()))
    ddc.reset()
    del ddc

    ddc=DictDBLocal('test_dictdb')
    info = ddc.get_info()
    print('used:',info.used,'wasted:',info.wasted,'extant:',info.used-info.wasted)
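
# ----------------------------------------------------------------------
# Usage sketch (illustrative addition, separate from the test code above).
# It exercises the dictionary-style API summarized in the comments at the
# top of this file.  The prefix 'example_db' and the helper name
# _usage_example are hypothetical; nothing else refers to them.

def _usage_example():
    # creates (or reopens) example_db.dbi / example_db.dbf in the current directory
    db = DictDBLocal('example_db')
    db['ligand'] = {'smiles': 'CCO', 'charge': 0}  # store any picklable object
    print(db['ligand'])                            # retrieve by key
    print('ligand' in db, list(db.keys()))         # dictionary-style queries
    info = db.get_info()                           # database statistics
    print('used:', info.used, 'wasted:', info.wasted)
    db.standby()   # flush indexes to disk and release them from memory
    db.purge()     # repack data and index files to reclaim wasted space
    # For remote access, one process would run DictDBServer('example_db', port=8000)
    # and another would connect with DictDBClient(port=8000), which exposes the
    # same dictionary-style methods over a socket.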