modules/pymol/internal.py (562 lines of code) (raw):

from __future__ import print_function, absolute_import import os import sys cmd = sys.modules["pymol.cmd"] from pymol import _cmd import threading import traceback if sys.version_info[0] == 2: import thread import urllib2 else: import _thread as thread import urllib.request as urllib2 import re import time import pymol import chempy.io from .cmd import DEFAULT_ERROR, DEFAULT_SUCCESS, loadable, _load2str, Shortcut, \ is_string, is_ok # cache management: def _cache_validate(_self=cmd): r = DEFAULT_SUCCESS try: _self.lock_data(_self) _pymol = _self._pymol if not hasattr(_pymol,"_cache"): _pymol._cache = [] if not hasattr(_pymol,"_cache_memory"): _pymol._cache_memory = 0 finally: _self.unlock_data(_self) def _cache_clear(_self=cmd): r = DEFAULT_SUCCESS try: _self.lock_data(_self) _pymol = _self._pymol _pymol._cache = [] _pymol._cache_memory = 0 finally: _self.unlock_data(_self) return r def _cache_mark(_self=cmd): r = DEFAULT_SUCCESS try: _self.lock_data(_self) _pymol = _self._pymol _cache_validate(_self) for entry in _self._pymol._cache: entry[5] = 0.0 finally: _self.unlock_data(_self) return r def _cache_purge(max_size, _self=cmd): r = DEFAULT_SUCCESS try: _self.lock_data(_self) _pymol = _self._pymol _cache_validate(_self) if len(_pymol._cache): cur_size = sum(x[0] for x in _pymol._cache) if max_size>=0: # purge to reduce size now = time.time() # sort by last access time new_cache = [[(now-x[5])/x[4],x] for x in _pymol._cache] new_cache.sort() new_cache = [x[1] for x in new_cache] # remove oldest entries one by one until size requirement is met while (cur_size>max_size) and (len(new_cache)>1): entry = new_cache.pop() cur_size = cur_size - entry[0] _pymol._cache = new_cache _pymol._cache_memory = cur_size else: # purge to eliminate unused entries new_cache = [] for entry in _pymol._cache: if entry[5] == 0.0: cur_size = cur_size - entry[0] else: new_cache.append(entry) _pymol._cache = new_cache _pymol._cache_memory = cur_size result = _pymol._cache_memory finally: _self.unlock_data(_self) return result def _cache_get(target, hash_size = None, _self=cmd): result = None try: _self.lock_data(_self) try: if hash_size == None: hash_size = len(target[1]) key = target[1][0:hash_size] # should optimize this with a dictionary lookup, key -> index in _cache for entry in _self._pymol._cache: if entry[1][0:hash_size] == key: if entry[2] == target[2]: while len(entry)<6: entry.append(0) entry[4] = entry[4] + 1 # access count entry[5] = time.time() # timestamp result = entry[3] break except: traceback.print_exc() finally: _self.unlock_data(_self) return result def _cache_set(new_entry, max_size, _self=cmd): r = DEFAULT_SUCCESS try: _self.lock_data(_self) _pymol = _self._pymol _cache_validate(_self) try: hash_size = len(new_entry[1]) key = new_entry[1][0:hash_size] count = 0 found = 0 new_entry[4] = new_entry[4] + 1 # incr access count new_entry[5] = time.time() # timestamp for entry in _pymol._cache: if entry[1][0:hash_size] == key: if entry[2] == new_entry[2]: # dupe (shouldn't happen) entry[3] = new_entry[3] found = 1 break count = count + 1 if not found: _pymol._cache.append(new_entry) _pymol._cache_memory = _pymol._cache_memory + new_entry[0] if max_size > 0: if _pymol._cache_memory > max_size: _cache_purge(max_size, _self) except: traceback.print_exc() finally: _self.unlock_data(_self) return r # ray tracing threads def _ray_anti_spawn(thread_info,_self=cmd): # WARNING: internal routine, subject to change # internal routine to support multithreaded raytracing thread_list = [] for a in thread_info[1:]: t = threading.Thread(target=_cmd.ray_anti_thread, args=(_self._COb,a)) t.setDaemon(1) thread_list.append(t) for t in thread_list: t.start() _cmd.ray_anti_thread(_self._COb,thread_info[0]) for t in thread_list: t.join() def _ray_hash_spawn(thread_info,_self=cmd): # WARNING: internal routine, subject to change # internal routine to support multithreaded raytracing thread_list = [] for a in thread_info[1:]: if a != None: t = threading.Thread(target=_cmd.ray_hash_thread, args=(_self._COb,a)) t.setDaemon(1) thread_list.append(t) for t in thread_list: t.start() if thread_info[0] != None: _cmd.ray_hash_thread(_self._COb,thread_info[0]) for t in thread_list: t.join() def _ray_spawn(thread_info,_self=cmd): # WARNING: internal routine, subject to change # internal routine to support multithreaded raytracing thread_list = [] for a in thread_info[1:]: t = threading.Thread(target=_cmd.ray_trace_thread, args=(_self._COb,a)) t.setDaemon(1) thread_list.append(t) for t in thread_list: t.start() _cmd.ray_trace_thread(_self._COb,thread_info[0]) for t in thread_list: t.join() def _coordset_update_thread(list_lock,thread_info,_self=cmd): # WARNING: internal routine, subject to change while 1: list_lock.acquire() if not len(thread_info): list_lock.release() break else: info = thread_info.pop(0) list_lock.release() _cmd.coordset_update_thread(_self._COb,info) def _coordset_update_spawn(thread_info,n_thread,_self=cmd): # WARNING: internal routine, subject to change if len(thread_info): list_lock = threading.Lock() # mutex for list thread_list = [] for a in range(1,n_thread): t = threading.Thread(target=_coordset_update_thread, args=(list_lock,thread_info)) t.setDaemon(1) thread_list.append(t) for t in thread_list: t.start() _coordset_update_thread(list_lock,thread_info) for t in thread_list: t.join() def _object_update_thread(list_lock,thread_info,_self=cmd): # WARNING: internal routine, subject to change while 1: list_lock.acquire() if not len(thread_info): list_lock.release() break else: info = thread_info.pop(0) list_lock.release() _cmd.object_update_thread(_self._COb,info) def _object_update_spawn(thread_info,n_thread,_self=cmd): # WARNING: internal routine, subject to change if len(thread_info): list_lock = threading.Lock() # mutex for list thread_list = [] for a in range(1,n_thread): t = threading.Thread(target=_object_update_thread, args=(list_lock,thread_info)) t.setDaemon(1) thread_list.append(t) for t in thread_list: t.start() _object_update_thread(list_lock,thread_info) for t in thread_list: t.join() # status reporting # do command (while API already locked) def _do(cmmd,log=0,echo=1,_self=cmd): return _cmd.do(_self._COb,cmmd,log,echo) # movie rendering def _mpng(prefix, first=-1, last=-1, preserve=0, modal=0, format=-1, mode=-1, quiet=1, width=0, height=0, _self=cmd): # INTERNAL format = int(format) # WARNING: internal routine, subject to change try: _self.lock(_self) fname = prefix if re.search("[0-9]*\.png$",fname): # remove numbering, etc. fname = re.sub("[0-9]*\.png$","",fname) if re.search("[0-9]*\.ppm$",fname): if format<0: format = 1 # PPM fname = re.sub("[0-9]*\.ppm$","",fname) if format<0: format = 0 # default = PNG fname = cmd.exp_path(fname) r = _cmd.mpng_(_self._COb,str(fname),int(first), int(last),int(preserve),int(modal), format,int(mode),int(quiet), int(width), int(height)) finally: _self.unlock(-1,_self) return r # copy image def _copy_image(_self=cmd,quiet=1): r = DEFAULT_ERROR try: _self.lock(_self) r = _cmd.copy_image(_self._COb,int(quiet)) finally: _self.unlock(r,_self) return r # loading def file_read(finfo, _self=cmd): ''' Read a file, possibly gzipped or bzipped, and return the uncompressed file contents as a string. finfo may be a filename, URL or open file handle. ''' try: if not is_string(finfo): handle = finfo elif '://' in finfo: req = urllib2.Request(finfo, headers={'User-Agent': 'PyMOL/' + _self.get_version()[0]}) handle = urllib2.urlopen(req) else: handle = open(finfo, 'rb') contents = handle.read() handle.close() except IOError: raise pymol.CmdException('failed to open file "%s"' % finfo) if contents[:2] == b'\x1f\x8b': # gzip magic number import io, gzip fakestream = io.BytesIO(contents) return gzip.GzipFile(fileobj=fakestream).read() if contents[:2] == b'BZ' and contents[4:10] == b'1AY&SY': # bzip magic import bz2 return bz2.decompress(contents) return contents def download_chem_comp(resn, quiet=1, _self=cmd): ''' WARNING: internal routine, subject to change Download the chemical components CIF for the given residue name and return its local filename, or an empty string on failure. ''' filename = os.path.join(_self.get('fetch_path'), resn + ".cif") if os.path.exists(filename): return filename url = "ftp://ftp.ebi.ac.uk/pub/databases/msd/pdbechem/files/mmcif/" + resn + ".cif" url = "http://files.rcsb.org/ligands/download/" + resn + ".cif" if not quiet: print(' Downloading ' + url) try: contents = _self.file_read(url) if not contents: raise except: print(' Error: Download failed') return '' try: with open(filename, 'wb') as handle: handle.write(contents) except IOError as e: print(e) print('Your "fetch_path" setting might point to a read-only directory') return '' if not quiet: print(' ->' + filename) return filename def _load(oname,finfo,state,ftype,finish,discrete, quiet=1,multiplex=0,zoom=-1,mimic=1, plugin='', object_props=None, atom_props=None, _self=cmd): # WARNING: internal routine, subject to change # caller must already hold API lock # NOTE: state index assumes 1-based state r = DEFAULT_ERROR size = 0 if ftype not in (loadable.model,loadable.brick): if True: if ftype in _load2str: finfo = _self.file_read(finfo) ftype = _load2str[ftype] r = _cmd.load(_self._COb,str(oname),finfo,int(state)-1,int(ftype), int(finish),int(discrete),int(quiet), int(multiplex),int(zoom), plugin, object_props, atom_props, int(mimic)) else: try: x = chempy.io.pkl.fromFile(finfo) if isinstance(x, (list, tuple)): for a in x: r = _cmd.load_object(_self._COb,str(oname),a,int(state)-1, int(ftype),0,int(discrete),int(quiet), int(zoom)) if(state>0): state = state + 1 _cmd.finish_object(_self._COb,str(oname)) else: r = _cmd.load_object(_self._COb,str(oname),x, int(state)-1,int(ftype), int(finish),int(discrete), int(quiet),int(zoom)) except: # traceback.print_exc() print("Load-Error: Unable to load file '%s'." % finfo) return r # function keys and other specials modifier_keys = [ '', 'SHFT', 'CTRL', 'CTSH', 'ALT', ] special_key_codes = { # GLUT special key codes (see glutSpecialFunc) 1 : 'F1', 2 : 'F2', 3 : 'F3', 4 : 'F4', 5 : 'F5', 6 : 'F6', 7 : 'F7', 8 : 'F8', 9 : 'F9', 10 : 'F10', 11 : 'F11', 12 : 'F12', 100 : 'left', 101 : 'up', 102 : 'right', 103 : 'down', 104 : 'pgup', 105 : 'pgdn', 106 : 'home', 107 : 'end', 108 : 'insert', } special_key_names = set(special_key_codes.values()) def _invoke_key(key, quiet=0, _self=cmd): '''Invoke a function that was mapped with cmd.set_key()''' try: mapping = _self.key_mappings[key] except KeyError: mapping = None if not mapping: if not quiet: print(" No key mapping for '%s'" % (key)) return False if is_string(mapping): _self.do(mapping) else: fn, args, kwargs = mapping fn(*args, **kwargs) return True def _special(k,x,y,m=0,_self=cmd): # INTERNAL (invoked when special key is pressed) pymol=_self._pymol # WARNING: internal routine, subject to change k=int(k) m=int(m) # convert numeric codes to string key try: key = special_key_codes[k] if m: key = modifier_keys[m] + '-' + key except KeyError: return False # check for explicit mapping if _invoke_key(key, 1, _self): return True # check for scenes and views for (fn, sc) in [ (_self.scene, pymol._scene_dict_sc), (_self.view, pymol._view_dict_sc), ]: if key in sc.keywords: fn(key) return True autocomp = sc.interpret(key + '-') if is_string(autocomp): fn(autocomp) return True print(" No key mapping and no scene or view for '%s'" % (key)) return False # control keys def _ctrl(k,_self=cmd): # WARNING: internal routine, subject to change _invoke_key('CTRL-' + k, 0, _self) # alt keys def _alt(k,_self=cmd): # WARNING: internal routine, subject to change _invoke_key('ALT-' + k.upper(), 0, _self) # command (apple) keys def _cmmd(k,_self=cmd): # WARNING: internal routine, subject to change # command-key on macs if k in _self.cmmd: ak = _self.cmmd[k] if ak[0]!=None: ak[0](*ak[1], **ak[2]) return None def _ctsh(k,_self=cmd): # WARNING: internal routine, subject to change _invoke_key('CTSH-' + k, 0, _self) # writing PNG files (thread-unsafe) def _png(a,width=0,height=0,dpi=-1.0,ray=0,quiet=1,prior=0,format=-1,_self=cmd): # INTERNAL - can only be safely called by GLUT thread (unless prior == 1) # WARNING: internal routine, subject to change try: _self.lock(_self) fname = a if re.search("\.ppm$",fname): if format<0: format = 1 # PPM elif not re.search("\.png$",fname): if a[0:1] != chr(1): # not an encoded file descriptor (integer) fname = fname +".png" if format<0: format = 0 # PNG fname = cmd.exp_path(fname) r = _cmd.png(_self._COb,str(fname),int(width),int(height), float(dpi),int(ray),int(quiet),int(prior),int(format)) finally: _self.unlock(-1,_self) return r # quitting (thread-specific) def _quit(code=0, _self=cmd): pymol=_self._pymol # WARNING: internal routine, subject to change try: _self.lock(_self) try: # flush and close log if possible to avoid threading exception if pymol._log_file!=None: try: pymol._log_file.flush() except: pass pymol._log_file.close() del pymol._log_file except: pass if _self.reaper!=None: try: _self.reaper.join() except: pass r = _cmd.quit(_self._COb, int(code)) finally: _self.unlock(-1,_self) return r # screen redraws (thread-specific) def _refresh(swap_buffers=1,_self=cmd): # Only call with GLUT thread! # WARNING: internal routine, subject to change r = None try: _self.lock(_self) if hasattr(_self._pymol,'glutThread'): if thread.get_ident() == _self._pymol.glutThread: if swap_buffers: r = _cmd.refresh_now(_self._COb) else: r = _cmd.refresh(_self._COb) else: r = _cmd.refresh_later(_self._COb) else: r = _cmd.refresh_later(_self._COb) finally: _self.unlock(-1,_self) return r # stereo (platform dependent ) def _sgi_stereo(flag): # SGI-SPECIFIC - bad bad bad # WARNING: internal routine, subject to change if sys.platform[0:4]=='irix': if os.path.exists("/usr/gfx/setmon"): if flag: mode = os.environ.get('PYMOL_SGI_STEREO','1024x768_96s') os.system("/usr/gfx/setmon -n "+mode) else: mode = os.environ.get('PYMOL_SGI_MONO','72hz') os.system("/usr/gfx/setmon -n "+mode) # color alias interpretation def _interpret_color(_self,color): # WARNING: internal routine, subject to change _validate_color_sc(_self) new_color = _self.color_sc.interpret(color) if new_color: if is_string(new_color): return new_color else: _self.color_sc.auto_err(color,'color') else: return color def _validate_color_sc(_self=cmd): # WARNING: internal routine, subject to change if _self.color_sc == None: # update color shortcuts if needed lst = _self.get_color_indices() names = [x[0] for x in lst] names.extend(['default', 'auto', 'current', 'atomic']) names.extend(_self.get_names_of_type('object:ramp')) _self.color_sc = Shortcut(names) def _invalidate_color_sc(_self=cmd): # WARNING: internal routine, subject to change _self.color_sc = None def _get_color_sc(_self=cmd): # WARNING: internal routine, subject to change _validate_color_sc(_self=_self) return _self.color_sc def _get_feedback(_self=cmd): # INTERNAL # WARNING: internal routine, subject to change l = [] if _self.lock_attempt(_self): try: r = _cmd.get_feedback(_self._COb) while r: l.append(r) r = _cmd.get_feedback(_self._COb) finally: _self.unlock(-1,_self) else: l = None return l get_feedback = _get_feedback # for legacy compatibility def _fake_drag(_self=cmd): # internal _self.lock(_self) try: _cmd.fake_drag(_self._COb) finally: _self.unlock(-1,_self) return 1 def _sdof(tx,ty,tz,rx,ry,rz,_self=cmd): _cmd._sdof(_self._COb,tx,ty,tz,rx,ry,rz) # testing tools # for comparing floating point numbers calculated using # different FPUs and which may show some wobble... def _dump_floats(lst,format="%7.3f",cnt=9): # WARNING: internal routine, subject to change c = cnt for a in lst: print(format%a, end=' ') c = c -1 if c<=0: print() c=cnt if c!=cnt: print() def _dump_ufloats(lst,format="%7.3f",cnt=9): # WARNING: internal routine, subject to change c = cnt for a in lst: print(format%abs(a), end=' ') c = c -1 if c<=0: print() c=cnt if c!=cnt: print()