modules/pymol/commanding.py (521 lines of code) (raw):

#A* ------------------------------------------------------------------- #B* This file contains source code for the PyMOL computer program #C* Copyright (c) Schrodinger, LLC. #D* ------------------------------------------------------------------- #E* It is unlawful to modify or remove this copyright notice. #F* ------------------------------------------------------------------- #G* Please see the accompanying LICENSE file for further information. #H* ------------------------------------------------------------------- #I* Additional authors of this source file include: #-* #-* #-* #Z* ------------------------------------------------------------------- from __future__ import print_function, absolute_import if __name__=='pymol.commanding': import sys if sys.version_info[0] == 2: import thread import urllib2 else: import _thread as thread import urllib.request as urllib2 from io import FileIO as file import re import os import time import threading import traceback from . import parsing cmd = sys.modules["pymol.cmd"] import pymol from .cmd import _cmd, Shortcut, QuietException, \ fb_module, fb_mask, is_list, \ DEFAULT_ERROR, DEFAULT_SUCCESS, is_ok, is_error, is_string def resume(filename, _self=cmd): ''' DESCRIPTION "resume" executes a log file and opens it for recording of additional commands. USAGE resume filename SEE ALSO log, log_close ''' r = DEFAULT_ERROR if os.path.exists(filename): if(re.search(r"\.py$|\.PY$|\.pym$|.PYM$",filename)): r = _self.do("run %s"%filename) else: r = _self.do("@%s"%filename) if is_ok(r): r = _self.do("log_open %s,a"%filename) if _self._raising(r,_self): raise pymol.CmdException return r class QueueFile: def __init__(self,queue): self.queue = queue def write(self,command): self.queue.put(command) def flush(self): pass def close(self): del self.queue re_fetch = re.compile(r'(\bfetch\s+\w[^;\r\n\'"]+)') class LogFile(file): def _append_async0(self, m): s = m.group() if 'async' in s: return s return s.rstrip(',') + ', async=0' def write(self, s): s = re_fetch.sub(self._append_async0, s) file.write(self, s.encode()) def log_open(filename='log.pml', mode='w', _self=cmd): ''' DESCRIPTION "log_open" opens a log file for writing. USAGE log_open filename SEE ALSO log, log_close ''' pymol=_self._pymol if not is_string(filename): # we're logging to Queue, not a file. pymol._log_file = QueueFile(filename) _self.set("logging",1,quiet=1) else: try: try: if hasattr(pymol,"_log_file"): if pymol._log_file!=None: pymol._log_file.close() del pymol._log_file except: pass pymol._log_file = LogFile(filename,mode) if _self._feedback(fb_module.cmd,fb_mask.details): # redundant if mode!='a': print(" Cmd: logging to '%s'."%filename) else: print(" Cmd: appending to '%s'."%filename) if mode=='a': pymol._log_file.write("\n") # always start on a new line if(re.search(r"\.py$|\.PY$|\.pym$|\.PYM$",filename)): _self.set("logging",2,quiet=1) else: _self.set("logging",1,quiet=1) except: print("Error: unable to open log file '%s'"%filename) pymol._log_file = None _self.set("logging",0,quiet=1) traceback.print_exc() raise QuietException def log(text, alt_text=None, _self=cmd): ''' DESCRIPTION "log" writes a command to the log file (if one is open). `text` and/or `alt_text` must include the terminating line feed. ARGUMENTS text = str: PyMOL command (optional if alt_text is given) alt_text = str: Python expression (optional) SEE ALSO log_open, log_close ''' # See also equivalent C impelemtation: PLog log_file = getattr(_self._pymol, "_log_file", None) if log_file is None: return mode = _self.get_setting_int("logging") if mode == 1: # .pml if not text and alt_text: text = '/' + alt_text elif mode == 2: # .py if alt_text: text = alt_text elif text.startswith('/'): text = text[1:] else: text = "cmd.do(" + repr(text.strip()) + ")\n" else: return if text: log_file.write(text) log_file.flush() def log_close(_self=cmd): ''' DESCRIPTION "log_close" closes the current log file (if one is open). USAGE log_close SEE ALSO log, log_open ''' pymol=_self._pymol cmd=_self if hasattr(pymol,"_log_file"): if pymol._log_file!=None: pymol._log_file.close() del pymol._log_file _self.set("logging",0,quiet=1) if _self._feedback(fb_module.cmd,fb_mask.details): # redundant print(" Cmd: log closed.") def cls(_self=cmd): ''' DESCRIPTION "cls" clears the output buffer. USAGE cls ''' r = DEFAULT_ERROR try: _self.lock(_self) r = _cmd.cls(_self._COb) finally: _self.unlock(r,_self) if _self._raising(r,_self): raise pymol.CmdException return r def _load_splash_image(filename, url, _self=cmd): import tempfile import struct tmp_filename = "" contents = None if url: try: handle = urllib2.urlopen(url) contents = handle.read() handle.close() # png magic number if contents[:4] != b'\x89\x50\x4e\x47': raise IOError shape = struct.unpack('>II', contents[16:24]) tmp_filename = tempfile.mktemp('.png') with open(tmp_filename, 'wb') as handle: handle.write(contents) filename = tmp_filename except IOError: pass if os.path.exists(filename) and not _self.get_names(): # hide text splash print() # fit window to image try: if not contents: contents = open(filename, 'rb').read(24) shape = struct.unpack('>II', contents[16:24]) scale = _self.get_setting_int('display_scale_factor') _self.viewport(shape[0] * scale, shape[1] * scale) except Exception as e: print(e) # load image _self.load_png(filename, 0, quiet=1) if tmp_filename: os.unlink(tmp_filename) def splash(mode=0, _self=cmd): cmd=_self ''' DESCRIPTION "splash" shows the splash screen information. USAGE splash ''' r = DEFAULT_ERROR mode = int(mode) if mode == 2: # query try: _self.lock(_self) r = _cmd.splash(_self._COb,1) finally: _self.unlock(0,_self) elif mode == 1: # just show PNG show_splash = 1 try: _self.lock(_self) show_splash = _cmd.splash(_self._COb,1) finally: _self.unlock(0,_self) r = DEFAULT_SUCCESS png_url = "" if show_splash==1: # generic / open-source png_path = _self.exp_path("$PYMOL_DATA/pymol/splash.png") elif show_splash==2: # evaluation builds png_path = _self.exp_path("$PYMOL_DATA/pymol/epymol.png") elif show_splash==3: # edu builds png_path = _self.exp_path("$PYMOL_DATA/pymol/splash_edu.png") png_url = "http://pymol.org/splash/splash_edu_2.png" else: # incentive builds png_path = _self.exp_path("$PYMOL_DATA/pymol/ipymol.png") t = threading.Thread(target=_load_splash_image, args=(png_path, png_url, _self)) t.setDaemon(1) t.start() else: if _self.get_setting_int("internal_feedback") > 0: _self.set("text","1",quiet=1) print() try: _self.lock(_self) r = _cmd.splash(_self._COb,0) finally: _self.unlock(r,_self) if _self._raising(r,_self): raise pymol.CmdException return r reinit_code = { 'everything' : 0, 'settings' : 1, 'store_defaults' : 2, 'original_settings' : 3, 'purge_defaults' : 4, } reinit_sc = Shortcut(reinit_code.keys()) def reinitialize(what='everything', object='', _self=cmd): ''' DESCRIPTION "reinitialize" reinitializes the program by deleting all objects and restoring the default program settings. USAGE reinitialize ''' r = DEFAULT_ERROR what = reinit_code[reinit_sc.auto_err(str(what),'option')] try: _self.lock(_self) r = _cmd.reinitialize(_self._COb,int(what),str(object)) finally: _self.unlock(r,_self) if _self._raising(r,_self): raise pymol.CmdException return r def sync(timeout=1.0,poll=0.05,_self=cmd): ''' DESCRIPTION "sync" is an API-only function which waits until all current commmands have been executed before returning. A timeout can be used to insure that this command eventually returns. PYMOL API cmd.sync(float timeout=1.0,float poll=0.05) SEE ALSO frame ''' for t in async_threads: t.join() now = time.time() timeout = float(timeout) poll = float(poll) # first, make sure there aren't any commands waiting... if _cmd.wait_queue(_self._COb): # commands waiting to be executed? while 1: if not _cmd.wait_queue(_self._COb): break e = threading.Event() # using this for portable delay e.wait(poll) del e if (timeout>=0.0) and ((time.time()-now)>timeout): break if _cmd.wait_deferred(_self._COb): # deferred tasks waiting for a display event? if thread.get_ident() == pymol.glutThread: _self.refresh() else: while 1: if not _cmd.wait_queue(_self._COb): break e = threading.Event() # using this for portable delay e.wait(poll) del e if (timeout>=0.0) and ((time.time()-now)>timeout): break # then make sure we can grab the API while 1: if _self.lock_attempt(_self): _self.unlock(_self) break e = threading.Event() # using this for portable delay e.wait(poll) del e if (timeout>=0.0) and ((time.time()-now)>timeout): break def do(commands,log=1,echo=1,flush=0,_self=cmd): # WARNING: don't call this routine if you already have the API lock # use cmd._do instead ''' DESCRIPTION "do" makes it possible for python programs to issue simple PyMOL commands as if they were entered on the command line. PYMOL API cmd.do( commands ) USAGE (PYTHON) from pymol import cmd cmd.do("load file.pdb") ''' r = DEFAULT_SUCCESS log = int(log) if is_list(commands): cmmd_list = commands else: cmmd_list = [ commands ] n_cmmd = len(cmmd_list) if n_cmmd>1: # if processing a list of commands, defer updates defer = _self.get_setting_int("defer_updates") _self.set('defer_updates',1) for cmmd in cmmd_list: lst = cmmd.splitlines() if len(lst)<2: for a in lst: if(len(a)): try: _self.lock(_self) r = _cmd.do(_self._COb,a,log,echo) finally: _self.unlock(r,_self) else: try: _self.lock(_self) do_flush = flush or ((thread.get_ident() == _self._pymol.glutThread) and _self.lock_api_allow_flush) for a in lst: if len(a): r = _cmd.do(_self._COb,a,log,echo) if do_flush: _self.unlock(r,_self) # flushes _self.lock(_self) finally: _self.unlock(r,_self) if n_cmmd>1: _self.set('defer_updates',defer) if _self._raising(r,_self): raise pymol.CmdException return r def quit(code=0, _self=cmd): ''' DESCRIPTION "quit" terminates the program. USAGE quit [code] ARGUMENTS code = int: exit the application with status "code" {default: 0} PYMOL API cmd.quit(int code) ''' code = int(code) if thread.get_ident() == pymol.glutThread: _self._quit(code, _self) else: try: _self.lock(_self) _cmd.do(_self._COb,"_ time.sleep(0.100);cmd._quit(%d)" % (code),0,0) # allow time for a graceful exit from the calling thread try: thread.exit() except SystemExit: pass finally: _self.unlock(-1,_self=_self) return None def delete(name,_self=cmd): ''' DESCRIPTION "delete" removes objects and named selections USAGE delete name ARGUMENTS name = name(s) of object(s) or selection(s), supports wildcards (*) EXAMPLES delete measure* # delete all objects which names start with "measure" delete all # delete all objects and selections PYMOL API cmd.delete (string name = object-or-selection-name ) SEE ALSO remove ''' r = DEFAULT_ERROR try: _self.lock(_self) r = _cmd.delete(_self._COb,str(name)) finally: _self.unlock(r,_self) if _self._raising(r,_self): raise pymol.CmdException return r def extend(name, function=None, _self=cmd): ''' DESCRIPTION "extend" is an API-only function which binds a new external function as a command into the PyMOL scripting language. PYMOL API cmd.extend(string name,function function) PYTHON EXAMPLE def foo(moo=2): print moo cmd.extend('foo',foo) The following would now work within PyMOL: PyMOL>foo 2 PyMOL>foo 3 3 PyMOL>foo moo=5 5 PyMOL>foo ? Usage: foo [ moo ] NOTES For security reasons, new PyMOL commands created using "extend" are not saved or restored in sessions. SEE ALSO alias, api ''' if function is None: name, function = name.__name__, name _self.keyword[name] = [function, 0,0,',',parsing.STRICT] _self.kwhash.append(name) _self.help_sc.append(name) return function # for aliasing compound commands to a single keyword def extendaa(*arg, **kw): ''' DESCRIPTION API-only function to decorate a function as a PyMOL command with argument auto-completion. EXAMPLE @cmd.extendaa(cmd.auto_arg[0]['zoom']) def zoom_organic(selection='*'): cmd.zoom('organic & (%s)' % selection) ''' _self = kw.get('_self', cmd) auto_arg = _self.auto_arg def wrapper(func): name = func.__name__ _self.extend(name, func) for (i, aa) in enumerate(arg): if i == len(auto_arg): auto_arg.append({}) if aa is not None: auto_arg[i][name] = aa return func return wrapper def alias(name, command, _self=cmd): ''' DESCRIPTION "alias" binds routinely-used command inputs to a new command keyword. USAGE alias name, command ARGUMENTS name = string: new keyword command = string: literal input with commands separated by semicolons. EXAMPLE alias my_scene, hide; show ribbon, polymer; show sticks, organic; show nonbonded, solvent my_scene NOTES For security reasons, aliased commands are not saved or restored in sessions. SEE ALSO cmd.extend, api ''' _self.keyword[name] = [eval("lambda :do('''%s ''')"%command.replace("'''","")), 0,0,',',parsing.STRICT] _self.kwhash.append(name) async_threads = [] def async_(func, *args, **kwargs): ''' DESCRIPTION Run function threaded and show "please wait..." message. ''' from .wizard.message import Message _self = kwargs.pop('_self', cmd) wiz = Message(['please wait ...'], dismiss=0, _self=_self) try: _self.set_wizard(wiz) except: wiz = None if isinstance(func, str): func = _self.keyword[func][0] def wrapper(): async_threads.append(t) try: func(*args, **kwargs) except (pymol.CmdException, cmd.QuietException) as e: if e.args: print(e) finally: if wiz is not None: try: _self.set_wizard_stack([w for w in _self.get_wizard_stack() if w != wiz]) except: _self.do('_ wizard') else: _self.refresh_wizard() async_threads.remove(t) t = threading.Thread(target=wrapper) t.setDaemon(1) t.start()