tbapi/__init__.py (1,440 lines of code) (raw):

# This file was automatically generated by SWIG (http://www.swig.org). # Version 3.0.12 # # Do not make changes to this file unless you know what you are doing--modify # the SWIG interface file instead. from sys import version_info as _swig_python_version_info from sys import platform as _swig_python_platform platform = 'windows' if _swig_python_platform.startswith('linux'): platform = 'linux' elif _swig_python_platform.startswith('darwin'): platform = 'darwin' if _swig_python_version_info >= (3, 6) and _swig_python_version_info < (3, 7): subdir = 'py36' elif _swig_python_version_info >= (3, 7) and _swig_python_version_info < (3, 8): subdir = 'py37' elif _swig_python_version_info >= (3, 8) and _swig_python_version_info < (3, 9): subdir = 'py38' elif _swig_python_version_info >= (3, 9) and _swig_python_version_info < (3, 10): subdir = 'py39' elif _swig_python_version_info >= (3, 10) and _swig_python_version_info < (3, 11): subdir = 'py310' else: raise Exception('Version of python (' + str(_swig_python_version_info) + ') is not supported') if _swig_python_version_info >= (3, 7, 0): def swig_import_helper(): import importlib mname = '.'.join((__name__, platform, 'x64', subdir, '_tbapi')).lstrip('.') try: return importlib.import_module(mname) except ImportError: return importlib.import_module('_tbapi') _tbapi = swig_import_helper() del swig_import_helper elif _swig_python_version_info >= (2, 6, 0): def swig_import_helper(): from os.path import dirname import imp fp = None try: directory = '/'.join((dirname(__file__), platform, 'x64', subdir)) fp, pathname, description = imp.find_module('_tbapi', [directory]) except ImportError: import _tbapi return _tbapi try: _mod = imp.load_module('_tbapi', fp, pathname, description) finally: if fp is not None: fp.close() return _mod _tbapi = swig_import_helper() del swig_import_helper else: import _tbapi del _swig_python_version_info del _swig_python_platform from os import path def version(): tbapi_dir = path.dirname(__file__) if tbapi_dir != '': tbapi_dir = tbapi_dir + '/project.properties' with open(tbapi_dir) as file: lines = file.readlines() for line in lines: split_line = line.split('=') if len(split_line) == 2: key = split_line[0].strip() value = split_line[1].strip() if key.strip() == 'version' and value != None: return value return 'UNKNOWN' try: _swig_property = property except NameError: pass # Python < 2.2 doesn't have 'property'. try: import builtins as __builtin__ except ImportError: import __builtin__ def _swig_setattr_nondynamic(self, class_type, name, value, static=1): if (name == "thisown"): return self.this.own(value) if (name == "this"): if type(value).__name__ == 'SwigPyObject': self.__dict__[name] = value return method = class_type.__swig_setmethods__.get(name, None) if method: return method(self, value) if (not static): if _newclass: object.__setattr__(self, name, value) else: self.__dict__[name] = value else: raise AttributeError("You cannot add attributes to %s" % self) def _swig_setattr(self, class_type, name, value): return _swig_setattr_nondynamic(self, class_type, name, value, 0) def _swig_getattr(self, class_type, name): if (name == "thisown"): return self.this.own() method = class_type.__swig_getmethods__.get(name, None) if method: return method(self) raise AttributeError("'%s' object has no attribute '%s'" % (class_type.__name__, name)) def _swig_repr(self): try: strthis = "proxy of " + self.this.__repr__() except __builtin__.Exception: strthis = "" return "<%s.%s; %s >" % (self.__class__.__module__, self.__class__.__name__, strthis,) try: _object = object _newclass = 1 except __builtin__.Exception: class _object: pass _newclass = 0 try: import weakref weakref_proxy = weakref.proxy except __builtin__.Exception: weakref_proxy = lambda x: x class SwigPyIterator(_object): __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, SwigPyIterator, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, SwigPyIterator, name) def __init__(self, *args, **kwargs): raise AttributeError("No constructor defined - class is abstract") __repr__ = _swig_repr __swig_destroy__ = _tbapi.delete_SwigPyIterator __del__ = lambda self: None def value(self): return _tbapi.SwigPyIterator_value(self) def incr(self, n=1): return _tbapi.SwigPyIterator_incr(self, n) def decr(self, n=1): return _tbapi.SwigPyIterator_decr(self, n) def distance(self, x): return _tbapi.SwigPyIterator_distance(self, x) def equal(self, x): return _tbapi.SwigPyIterator_equal(self, x) def copy(self): return _tbapi.SwigPyIterator_copy(self) def next(self): return _tbapi.SwigPyIterator_next(self) def __next__(self): return _tbapi.SwigPyIterator___next__(self) def previous(self): return _tbapi.SwigPyIterator_previous(self) def advance(self, n): return _tbapi.SwigPyIterator_advance(self, n) def __eq__(self, x): return _tbapi.SwigPyIterator___eq__(self, x) def __ne__(self, x): return _tbapi.SwigPyIterator___ne__(self, x) def __iadd__(self, n): return _tbapi.SwigPyIterator___iadd__(self, n) def __isub__(self, n): return _tbapi.SwigPyIterator___isub__(self, n) def __add__(self, n): return _tbapi.SwigPyIterator___add__(self, n) def __sub__(self, *args): return _tbapi.SwigPyIterator___sub__(self, *args) def __iter__(self): return self SwigPyIterator_swigregister = _tbapi.SwigPyIterator_swigregister SwigPyIterator_swigregister(SwigPyIterator) from contextlib import contextmanager JAVA_LONG_MIN_VALUE = -9223372036854775808 JAVA_LONG_MAX_VALUE = 9223372036854775807 class InstrumentMessage(object): def __str__(self): return str(vars(self)) class StreamScope(_object): """ Determines the scope of a stream's durability, if any. Example: ``` scope = tbapi.StreamScope('TRANSIENT') ``` Possible values: ``` DURABLE, EXTERNAL_FILE, TRANSIENT, RUNTIME ``` """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, StreamScope, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, StreamScope, name) __repr__ = _swig_repr DURABLE = _tbapi.StreamScope_DURABLE EXTERNAL_FILE = _tbapi.StreamScope_EXTERNAL_FILE TRANSIENT = _tbapi.StreamScope_TRANSIENT RUNTIME = _tbapi.StreamScope_RUNTIME def __init__(self, *args): this = _tbapi.new_StreamScope(*args) try: self.this.append(this) except __builtin__.Exception: self.this = this def __int__(self): return _tbapi.StreamScope___int__(self) def __str__(self): return _tbapi.StreamScope___str__(self) __swig_destroy__ = _tbapi.delete_StreamScope __del__ = lambda self: None StreamScope_swigregister = _tbapi.StreamScope_swigregister StreamScope_swigregister(StreamScope) class WriteMode(_object): """ APPEND: Adds only new data into a stream without truncations. REPLACE: Adds data into a stream and removes previous data older that first message time [truncate(first message time + 1)]. REWRITE: Default. Adds data into a stream and removes previous data by truncating using first message time. [truncate(first message time)]. TRUNCATE: Stream truncated every time when loader writes a messages earlier than last message time. Example: ``` mode = tbapi.StreamScope('TRUNCATE') ``` Possible values: ``` APPEND, REPLACE, REWRITE, TRUNCATE ``` """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, WriteMode, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, WriteMode, name) __repr__ = _swig_repr APPEND = _tbapi.WriteMode_APPEND REPLACE = _tbapi.WriteMode_REPLACE REWRITE = _tbapi.WriteMode_REWRITE TRUNCATE = _tbapi.WriteMode_TRUNCATE def __init__(self, *args): this = _tbapi.new_WriteMode(*args) try: self.this.append(this) except __builtin__.Exception: self.this = this def __int__(self): return _tbapi.WriteMode___int__(self) def __str__(self): return _tbapi.WriteMode___str__(self) __swig_destroy__ = _tbapi.delete_WriteMode __del__ = lambda self: None WriteMode_swigregister = _tbapi.WriteMode_swigregister WriteMode_swigregister(WriteMode) class SelectionOptions(_object): """ Options for selecting data from a stream. Example: ``` so = tbapi.SelectionOptions() so._from = 0 so.to = 100000 so.useCompression = False ... ``` """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, SelectionOptions, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, SelectionOptions, name) __repr__ = _swig_repr def __init__(self): this = _tbapi.new_SelectionOptions() try: self.this.append(this) except __builtin__.Exception: self.this = this __swig_setmethods__["_from"] = _tbapi.SelectionOptions__from_set __swig_getmethods__["_from"] = _tbapi.SelectionOptions__from_get if _newclass: _from = _swig_property(_tbapi.SelectionOptions__from_get, _tbapi.SelectionOptions__from_set) __swig_setmethods__["to"] = _tbapi.SelectionOptions_to_set __swig_getmethods__["to"] = _tbapi.SelectionOptions_to_get if _newclass: to = _swig_property(_tbapi.SelectionOptions_to_get, _tbapi.SelectionOptions_to_set) __swig_setmethods__["useCompression"] = _tbapi.SelectionOptions_useCompression_set __swig_getmethods__["useCompression"] = _tbapi.SelectionOptions_useCompression_get if _newclass: useCompression = _swig_property(_tbapi.SelectionOptions_useCompression_get, _tbapi.SelectionOptions_useCompression_set) __swig_setmethods__["live"] = _tbapi.SelectionOptions_live_set __swig_getmethods__["live"] = _tbapi.SelectionOptions_live_get if _newclass: live = _swig_property(_tbapi.SelectionOptions_live_get, _tbapi.SelectionOptions_live_set) __swig_setmethods__["reverse"] = _tbapi.SelectionOptions_reverse_set __swig_getmethods__["reverse"] = _tbapi.SelectionOptions_reverse_get if _newclass: reverse = _swig_property(_tbapi.SelectionOptions_reverse_get, _tbapi.SelectionOptions_reverse_set) __swig_setmethods__["allowLateOutOfOrder"] = _tbapi.SelectionOptions_allowLateOutOfOrder_set __swig_getmethods__["allowLateOutOfOrder"] = _tbapi.SelectionOptions_allowLateOutOfOrder_get if _newclass: allowLateOutOfOrder = _swig_property(_tbapi.SelectionOptions_allowLateOutOfOrder_get, _tbapi.SelectionOptions_allowLateOutOfOrder_set) __swig_setmethods__["realTimeNotification"] = _tbapi.SelectionOptions_realTimeNotification_set __swig_getmethods__["realTimeNotification"] = _tbapi.SelectionOptions_realTimeNotification_get if _newclass: realTimeNotification = _swig_property(_tbapi.SelectionOptions_realTimeNotification_get, _tbapi.SelectionOptions_realTimeNotification_set) __swig_setmethods__["minLatency"] = _tbapi.SelectionOptions_minLatency_set __swig_getmethods__["minLatency"] = _tbapi.SelectionOptions_minLatency_get if _newclass: minLatency = _swig_property(_tbapi.SelectionOptions_minLatency_get, _tbapi.SelectionOptions_minLatency_set) __swig_destroy__ = _tbapi.delete_SelectionOptions __del__ = lambda self: None SelectionOptions_swigregister = _tbapi.SelectionOptions_swigregister SelectionOptions_swigregister(SelectionOptions) class LoadingOptions(_object): """ Options for loading data into a stream. Example: ``` lo = tbapi.LoadingOptions() lo.writeMode = tbapi.WriteMode('TRUNCATE') so.space = 'myspace' ... ``` """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, LoadingOptions, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, LoadingOptions, name) __repr__ = _swig_repr __swig_setmethods__["writeMode"] = _tbapi.LoadingOptions_writeMode_set __swig_getmethods__["writeMode"] = _tbapi.LoadingOptions_writeMode_get if _newclass: writeMode = _swig_property(_tbapi.LoadingOptions_writeMode_get, _tbapi.LoadingOptions_writeMode_set) __swig_setmethods__["minLatency"] = _tbapi.LoadingOptions_minLatency_set __swig_getmethods__["minLatency"] = _tbapi.LoadingOptions_minLatency_get if _newclass: minLatency = _swig_property(_tbapi.LoadingOptions_minLatency_get, _tbapi.LoadingOptions_minLatency_set) __swig_setmethods__["space"] = _tbapi.LoadingOptions_space_set __swig_getmethods__["space"] = _tbapi.LoadingOptions_space_get if _newclass: space = _swig_property(_tbapi.LoadingOptions_space_get, _tbapi.LoadingOptions_space_set) def __init__(self): this = _tbapi.new_LoadingOptions() try: self.this.append(this) except __builtin__.Exception: self.this = this __swig_destroy__ = _tbapi.delete_LoadingOptions __del__ = lambda self: None LoadingOptions_swigregister = _tbapi.LoadingOptions_swigregister LoadingOptions_swigregister(LoadingOptions) class StreamOptions(_object): """ Stream definition attributes. Example: ``` so = tbapi.StreamOptions() so.name = key so.description = key so.scope = tbapi.StreamScope('DURABLE') so.distributionFactor = 1 so.highAvailability = False so.polymorphic = False so.metadata = schema db.createStream(key, options) ``` """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, StreamOptions, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, StreamOptions, name) __repr__ = _swig_repr def name(self, name: str = None) -> None: '''Optional user-readable name.''' if name == None: return self.__getName() else: self.__setName(name) return name def description(self, description: str = None) -> None: '''Optional multi-line description.''' if description == None: return self.__getDescription() else: self.__setDescription(description) return description def owner(self, owner: str = None) -> None: '''Optional owner of stream. During stream creation it will be set equals to authenticated user name. ''' if owner == None: return self.__getOwner() else: self.__setOwner(owner) return owner def location(self, location: str = None) -> None: '''Location of the stream (by default null). When defined this attribute provides alternative stream location (rather than default location under QuantServerHome)''' if location == None: return self.__getLocation() else: self.__setLocation(location) return location def distributionRuleName(self, distributionRuleName: str = None) -> None: '''Class name of the distribution rule''' if distributionRuleName == None: return self.__getDistributionRuleName() else: self.__setDistributionRuleName(distributionRuleName) return distributionRuleName def metadata(self, metadata: str = None) -> None: '''Stream metadata in XML format. To build metadata programatically, use tbapi.SchemaDef class.''' if metadata == None: return self.__getMetadata() else: self.__setMetadata(metadata) return metadata __swig_setmethods__["scope"] = _tbapi.StreamOptions_scope_set __swig_getmethods__["scope"] = _tbapi.StreamOptions_scope_get if _newclass: scope = _swig_property(_tbapi.StreamOptions_scope_get, _tbapi.StreamOptions_scope_set) __swig_setmethods__["distributionFactor"] = _tbapi.StreamOptions_distributionFactor_set __swig_getmethods__["distributionFactor"] = _tbapi.StreamOptions_distributionFactor_get if _newclass: distributionFactor = _swig_property(_tbapi.StreamOptions_distributionFactor_get, _tbapi.StreamOptions_distributionFactor_set) __swig_setmethods__["duplicatesAllowed"] = _tbapi.StreamOptions_duplicatesAllowed_set __swig_getmethods__["duplicatesAllowed"] = _tbapi.StreamOptions_duplicatesAllowed_get if _newclass: duplicatesAllowed = _swig_property(_tbapi.StreamOptions_duplicatesAllowed_get, _tbapi.StreamOptions_duplicatesAllowed_set) __swig_setmethods__["highAvailability"] = _tbapi.StreamOptions_highAvailability_set __swig_getmethods__["highAvailability"] = _tbapi.StreamOptions_highAvailability_get if _newclass: highAvailability = _swig_property(_tbapi.StreamOptions_highAvailability_get, _tbapi.StreamOptions_highAvailability_set) __swig_setmethods__["unique"] = _tbapi.StreamOptions_unique_set __swig_getmethods__["unique"] = _tbapi.StreamOptions_unique_get if _newclass: unique = _swig_property(_tbapi.StreamOptions_unique_get, _tbapi.StreamOptions_unique_set) __swig_setmethods__["polymorphic"] = _tbapi.StreamOptions_polymorphic_set __swig_getmethods__["polymorphic"] = _tbapi.StreamOptions_polymorphic_get if _newclass: polymorphic = _swig_property(_tbapi.StreamOptions_polymorphic_get, _tbapi.StreamOptions_polymorphic_set) __swig_setmethods__["periodicity"] = _tbapi.StreamOptions_periodicity_set __swig_getmethods__["periodicity"] = _tbapi.StreamOptions_periodicity_get if _newclass: periodicity = _swig_property(_tbapi.StreamOptions_periodicity_get, _tbapi.StreamOptions_periodicity_set) def __eq__(self, value): return _tbapi.StreamOptions___eq__(self, value) def __init__(self): this = _tbapi.new_StreamOptions() try: self.this.append(this) except __builtin__.Exception: self.this = this def __getName(self): return _tbapi.StreamOptions___getName(self) def __setName(self, name): return _tbapi.StreamOptions___setName(self, name) def __getDescription(self): return _tbapi.StreamOptions___getDescription(self) def __setDescription(self, description): return _tbapi.StreamOptions___setDescription(self, description) def __getOwner(self): return _tbapi.StreamOptions___getOwner(self) def __setOwner(self, owner): return _tbapi.StreamOptions___setOwner(self, owner) def __getLocation(self): return _tbapi.StreamOptions___getLocation(self) def __setLocation(self, location): return _tbapi.StreamOptions___setLocation(self, location) def __getDistributionRuleName(self): return _tbapi.StreamOptions___getDistributionRuleName(self) def __setDistributionRuleName(self, distributionRuleName): return _tbapi.StreamOptions___setDistributionRuleName(self, distributionRuleName) def __getMetadata(self): return _tbapi.StreamOptions___getMetadata(self) def __setMetadata(self, metadata): return _tbapi.StreamOptions___setMetadata(self, metadata) __swig_destroy__ = _tbapi.delete_StreamOptions __del__ = lambda self: None StreamOptions_swigregister = _tbapi.StreamOptions_swigregister StreamOptions_swigregister(StreamOptions) class QueryParameter(_object): """Input parameter definition for a prepared statement.""" __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, QueryParameter, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, QueryParameter, name) __repr__ = _swig_repr __swig_setmethods__["name"] = _tbapi.QueryParameter_name_set __swig_getmethods__["name"] = _tbapi.QueryParameter_name_get if _newclass: name = _swig_property(_tbapi.QueryParameter_name_get, _tbapi.QueryParameter_name_set) __swig_setmethods__["type"] = _tbapi.QueryParameter_type_set __swig_getmethods__["type"] = _tbapi.QueryParameter_type_get if _newclass: type = _swig_property(_tbapi.QueryParameter_type_get, _tbapi.QueryParameter_type_set) def __init__(self, *args): this = _tbapi.new_QueryParameter(*args) try: self.this.append(this) except __builtin__.Exception: self.this = this def value(self, *args): return _tbapi.QueryParameter_value(self, *args) __swig_destroy__ = _tbapi.delete_QueryParameter __del__ = lambda self: None QueryParameter_swigregister = _tbapi.QueryParameter_swigregister QueryParameter_swigregister(QueryParameter) class TickDb(_object): """ The top-level implementation to the methods of the Deltix Tick Database engine. Instances of this class are created by static method createFromUrl: ``` db = tbapi.TickDb.createFromUrl('dxtick://localhost:8011') ``` or ``` db = tbapi.TickDb.createFromUrl('dxtick://localhost:8011', 'user', 'password') ``` """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, TickDb, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, TickDb, name) def __init__(self, *args, **kwargs): raise AttributeError("No constructor defined") __repr__ = _swig_repr @staticmethod def createFromUrl(url: str, user: str = None, password: str = None) -> "TickDb": '''Creates a new database instance with the specified root folder, or URL. Args: url (str): Connection URL. user (str): User. password (str): Password. Returns: TickDb: An un-opened TickDB instance. ''' if user == None: return _tbapi.TickDb___createFromUrl(url) else: return _tbapi.TickDb___createFromUrlWithUser(url, user, password) @staticmethod @contextmanager def openFromUrl(url: str, readonly: bool, user: str = None, password: str = None): '''Creates a new database instance with the specified root folder, or URL, and opens it. Args: url (str): Connection URL. readonly (bool): Open data store in read-only mode. user (str): User. password (str): Password. Returns: TickDb: An opened TickDB instance. ''' db = TickDb.createFromUrl(url, user, password) try: db.open(readonly) yield db finally: if db.isOpen(): db.close() def isReadOnly(self) -> bool: '''Determines whether the store is open as read-only.''' return self.__isReadOnly() def isOpen(self) -> bool: '''Determines whether the store is open.''' return self.__isOpen() def open(self, readOnlyMode: bool) -> bool: '''Open the data store. Args: readOnlyMode (bool): Open data store in read-only mode. ''' return self.__open(readOnlyMode) def close(self) -> None: '''Closes data store.''' return self.__close() def format(self) -> bool: '''Create a new object on disk and format internally. The data store is left open for read-write at the end of this method. ''' return self.__format() def listStreams(self) -> 'list[TickStream]': '''Enumerates existing streams. Returns: list[TickStream]: An array of existing stream objects. ''' return self.__listStreams() def getStream(self, key: str) -> 'TickStream': '''Looks up an existing stream by key. Args: key (str): Identifies the stream. Returns: TickStream: A stream object, or None if the key was not found. ''' return self.__getStream(key) def createStream(self, key: str, options: StreamOptions) -> 'TickStream': '''Creates a new stream within the database. Args: key (str): A required key later used to identify the stream. options (StreamOptions): Options for creating the stream. Returns: TickStream: A new instance of TickStream. ''' return self.__createStream(key, options) def createFileStream(self, key: str, dataFile: str) -> 'TickStream': '''Creates a new stream mount to the given data file. Args: key (str): A required key later used to identify the stream. dataFile (str): Path to the data file (on server side). Returns: TickStream: A new instance of TickStream. ''' return self.__createFileStream(key) def createCursor(self, stream: 'TickStream', options: SelectionOptions) -> 'TickCursor': '''Opens an initially empty cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact timestamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic. The cursor is returned initially empty and must be reset. The TickCursor class provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp. Args: stream (TickStream): Stream from which data will be selected. options (SelectionOptions): Selection options. Returns: TickCursor: A cursor used to read messages. ''' return self.__createCursor(stream, options) @contextmanager def tryCursor(self, stream: 'TickStream', options: SelectionOptions) -> 'TickCursor': '''contextmanager version of createCursor. Usage: ``` with db.newCursor(stream, options) as cursor: while cursor.next(): message = cursor.getMessage() ``` ''' cursor = None try: cursor = self.__createCursor(stream, options) yield cursor finally: cursor.close() def select(self, timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor': '''Opens a cursor for reading data from multiple streams, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic. Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor clsas provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp. Args: timestamp (int): The start timestamp in millis. streams (list[TickStream]): Streams from which data will be selected. options (SelectionOptions): Selection options. types (list[str]): Specified message types to be subscribed. If null, then all types will be subscribed. entities (list[str]): Specified entities to be subscribed. If null, then all entities will be subscribed. Returns: TickCursor: A cursor used to read messages. ''' return self.__select(timestamp, streams, options, types, entities) @contextmanager def trySelect(self, timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor': '''Contextmanager version of select. Usage: ``` with db.newSelect(timestamp, streams, options, types, entities) as cursor: while cursor.next(): message = cursor.getMessage() ``` ''' cursor = None try: cursor = self.__select(timestamp, streams, options, types, entities) yield cursor finally: cursor.close() def createLoader(self, stream: 'TickStream', options: LoadingOptions) -> 'TickLoader': '''Creates a channel for loading data. The loader must be closed when the loading process is finished. Args: stream (TickStream): stream for loading data. options (SelectionOptions): Loading Options. Returns: TickLoader: created loader. ''' return self.__createLoader(stream, options) @contextmanager def tryLoader(self, stream: 'TickStream', options: LoadingOptions) -> 'TickLoader': '''Contextmanager version of createLoader. Usage: with db.newLoader(stream, options) as loader: loader.send(message) ''' loader = None try: loader = self.__createLoader(stream, options) yield loader finally: loader.close() def executeQuery(self, query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[str]' = None, params: 'list[QueryParameter]' = []) -> 'TickCursor': '''Execute Query and creates a message source for reading data from it, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic. Args: query (str): Query text element. options (SelectionOptions): Selection options. timestamp (int): The start timestamp in millis. entities (list[str]): Specified entities to be subscribed. If null, then all entities will be subscribed. params (list[QueryParameter]): The parameter values of the query. Returns: TickCursor: An iterable message source to read messages. ''' if options == None: return self.__executeQuery(query) else: return self.__executeQueryFull(query, options, timestamp, entities, params); @contextmanager def tryExecuteQuery(self, query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[str]' = None, params: 'list[QueryParameter]' = []) -> 'TickCursor': '''Contextmanager version of executeQuery. Usage: ``` with db.newExecuteQuery('select * from stream') as cursor: while cursor.next(): message = cursor.getMessage() ``` ''' cursor = None try: if options == None: cursor = self.__executeQuery(query) else: cursor = self.__executeQueryFull(query, options, timestamp, entities, params); yield cursor finally: cursor.close() if _newclass: __createFromUrl = staticmethod(_tbapi.TickDb___createFromUrl) else: __createFromUrl = _tbapi.TickDb___createFromUrl if _newclass: __createFromUrlWithUser = staticmethod(_tbapi.TickDb___createFromUrlWithUser) else: __createFromUrlWithUser = _tbapi.TickDb___createFromUrlWithUser def __isReadOnly(self): return _tbapi.TickDb___isReadOnly(self) def __isOpen(self): return _tbapi.TickDb___isOpen(self) def __open(self, readOnlyMode): return _tbapi.TickDb___open(self, readOnlyMode) def __close(self): return _tbapi.TickDb___close(self) def __format(self): return _tbapi.TickDb___format(self) def __listStreams(self): return _tbapi.TickDb___listStreams(self) def __getStream(self, key): return _tbapi.TickDb___getStream(self, key) def __createStream(self, key, options): return _tbapi.TickDb___createStream(self, key, options) def __createFileStream(self, key, dataFile): return _tbapi.TickDb___createFileStream(self, key, dataFile) def __createCursor(self, stream, options): return _tbapi.TickDb___createCursor(self, stream, options) def __select(self, time, streams, options, types, entities): return _tbapi.TickDb___select(self, time, streams, options, types, entities) def __createLoader(self, stream, options): return _tbapi.TickDb___createLoader(self, stream, options) def __executeQueryFull(self, qql, options, time, instruments, params): return _tbapi.TickDb___executeQueryFull(self, qql, options, time, instruments, params) __swig_destroy__ = _tbapi.delete_TickDb __del__ = lambda self: None def __executeQuery(self, qql): return _tbapi.TickDb___executeQuery(self, qql) TickDb_swigregister = _tbapi.TickDb_swigregister TickDb_swigregister(TickDb) def TickDb___createFromUrl(url): return _tbapi.TickDb___createFromUrl(url) TickDb___createFromUrl = _tbapi.TickDb___createFromUrl def TickDb___createFromUrlWithUser(url, username, password): return _tbapi.TickDb___createFromUrlWithUser(url, username, password) TickDb___createFromUrlWithUser = _tbapi.TickDb___createFromUrlWithUser class TickStream(_object): """ The stream is a time series of messages for a number of financial instruments ('entities'). Messages can be price bars, trade ticks, bid/offer ticks, or any of the many more built-in and user-defined types. In the simplest case, a database will have a single stream of data. Multiple streams can be used to represent data of different frequencies, or completely different factors. For instance, separate streams can represent 1-minute price bars and ticks for the same set of entities. Or, you can have price bars and volatility bars in separate streams. Get stream: ``` stream = tickdb.getStream('stream_key') ``` List stream: ``` streams = tickdb.listStreams() ``` """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, TickStream, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, TickStream, name) def __init__(self, *args, **kwargs): raise AttributeError("No constructor defined") __repr__ = _swig_repr def key(self) -> str: '''Returns the key, which uniquely identifies the stream within its database.''' return self.__key() def name(self) -> str: '''Returns a user-readable short name.''' return self.__name() def distributionFactor(self) -> int: '''Returns the target number of files to be used for storing data.''' return self.__distributionFactor() def description(self) -> str: '''Returns a user-readable multi-line description.''' return self.__description() def owner(self) -> str: '''Returns stream owner.''' return self.__owner() def location(self) -> str: '''Returns stream location.''' return self.__location() def metadata(self) -> str: '''Returns stream schema (in xml format).''' return self.__metadata() def scope(self) -> StreamScope: '''Returns stream schema (in xml format).''' return self.__scope() def highAvailability(self) -> bool: '''Returns stream memory caching parameter. High availability durable streams are cached on startup.''' return self.__highAvailability() def unique(self) -> bool: '''Unique streams maintain in-memory cache of resent messages. This concept assumes that stream messages will have some field(s) marked as primary key. Primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID). For each key TimeBase runtime maintains a copy of the last message received for this key (cache). Each new consumer will receive a snapshot of current cache at the beginning of live data subscription. ''' return self.__unique() def polymorphic(self) -> bool: '''Returns whether the stream is configured as polymorphic.''' return self.__polymorphic() def periodicity(self) -> str: '''Returns Stream periodicity, if known.''' return self.__periodicity() def options(self) -> StreamOptions: '''Returns stream options object.''' return self.__options() def describe(self) -> str: '''Returns stream DDL description.''' return self.__describe() def setSchema(self, options: StreamOptions) -> bool: '''Changes stream schema. Args: options (StreamOptions): Stream options, that contains new schema xml. Returns bool: True, if schema was changed successfully. ''' return self.__setSchema(options) def listEntities(self) -> 'list[str]': '''Return an inclusive range of times for which the specified entities have data in the database. Returns: list[str]: selected entities. ''' return self.__listEntities() def truncate(self, timestamp: int, entities: 'list[str]' = None) -> bool: '''Truncates stream data for the given entities from given time Args: timestamp (int): Timestamp in millis. If time less than stream start time, then all stream data will be deleted. entities (list[str]): A list of entities. If None, all stream entities will be used. Returns: bool: true, if stream was truncated successfully. ''' if entities == None: return self.__truncate(timestamp) else: return self.__truncateEntities(timestamp, entities) def clear(self, entities: 'list[str]' = None) -> bool: '''Clear stream data for the given entities. Args: entities (list[str]): A list of entities. If None, all stream entities will be used. ''' if entities == None: return self.__clear() else: return self.__clearEntities(entities) def purge(self, timestamp: int) -> bool: '''Deletes stream data that is older than a specified time Args: timestamp (int):Purge time in milliseconds. Returns: bool: true, if stream was purged successfully. ''' return self.__purge(timestamp) def deleteStream(self) -> bool: '''Deletes this stream Returns: bool: true, if stream was deleted successfully. ''' return self.__deleteStream() def abortBackgroundProcess(self) -> bool: '''Aborts active background process if any exists''' return self.__abortBackgroundProcess() def select(self, timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor': '''Opens a cursor for reading data from this stream, according to the specified options. The messages are returned from the cursor strictly ordered by time. Within the same exact time stamp, the order of messages is undefined and may vary from call to call, i.e. it is non-deterministic. Note that the arguments of this method only determine the initial configuration of the cursor. The TickCursor interface provides methods for dynamically re-configuring the subscription, or jumping to a different timestamp. Args: timestamp (int): The start timestamp in millis. options (SelectionOptions): Selection options. types (list[str]): Specified message types to be subscribed. If null, then all types will be subscribed. entities (list[str]): Specified entities to be subscribed. If null, then all entities will be subscribed. Returns: TickCursor: A cursor used to read messages. ''' return self.__select(timestamp, options, types, entities) @contextmanager def trySelect(self, timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor': '''Contextmanager version of select. Usage: ``` with stream.newSelect(timestamp, options, types, entities) as cursor: while cursor.next(): message = cursor.getMessage() ``` ''' cursor = None try: cursor = self.__select(timestamp, options, types, entities) yield cursor finally: cursor.close() def createCursor(self, options: SelectionOptions) -> 'TickCursor': '''Creates a cursor for reading data from this stream, according to the specified options, but initially with a fully restricted filter. The user must call TickCursor.reset at least once, in order to begin retrieving data. This method is equivalent to (but is slightly more optimal than) calling createCursor(options) Args: options (SelectionOptions): Selection Options. Returns: A cursor used to read messages. Never null. ''' return self.__createCursor(options) @contextmanager def tryCursor(self, options: SelectionOptions) -> 'TickCursor': '''contextmanager version of createCursor. Usage: ``` with stream.newCursor(options) as cursor: while cursor.next(): message = cursor.getMessage() ``` ''' cursor = None try: cursor = self.__createCursor(options) yield cursor finally: cursor.close() def createLoader(self, options: LoadingOptions) -> 'TickLoader': '''Creates a channel for loading data. The loader must be closed when the loading process is finished. Args: options (SelectionOptions): Loading Options. Returns: TickLoader: created loader. ''' return self.__createLoader(options) @contextmanager def tryLoader(self, options: LoadingOptions) -> 'TickLoader': '''Contextmanager version of createLoader. Usage: ``` with stream.newLoader(options) as loader: loader.send(message) ``` ''' loader = None try: loader = self.__createLoader(options) yield loader finally: loader.close() def listSpaces(self) -> 'list[str]': '''Returns all created "spaces" for the stream. Default space returns as "" (empty string). If backing stream does not support spaces None will be returned. ''' return self.__listSpaces() def renameSpace(self, newName: str, oldName: str) -> None: '''Rename existing space. Args: nameName (str): space to rename. oldName (str): new space name. ''' return self.__renameSpace(newName, oldName) def deleteSpaces(self, spaces: 'list[str]') -> None: '''Removed given 'spaces' permanently. Args: spaces (list[str]): list of spaces names to delete. ''' return self.__deleteSpaces(spaces) def getTimeRange(self, entities: 'list[str]' = None) -> 'list[int]': '''Return an inclusive range of times for which the specified entities have data in the database. Args: entities (list[str]): A list of entities. If empty, return for all. ''' if entities == None: return self.__getTimeRange() else: return self.__getTimeRangeEntities(entities) def getSpaceTimeRange(self, space: str) -> 'list[int]': '''An array consisting of two long timestamps (from and to) or None if no data was found. Args: space (str): space name. ''' return self.__getSpaceTimeRange(space) def __key(self): return _tbapi.TickStream___key(self) def __distributionFactor(self): return _tbapi.TickStream___distributionFactor(self) def __name(self): return _tbapi.TickStream___name(self) def __description(self): return _tbapi.TickStream___description(self) def __owner(self): return _tbapi.TickStream___owner(self) def __location(self): return _tbapi.TickStream___location(self) def __metadata(self): return _tbapi.TickStream___metadata(self) def __scope(self): return _tbapi.TickStream___scope(self) def __highAvailability(self): return _tbapi.TickStream___highAvailability(self) def __unique(self): return _tbapi.TickStream___unique(self) def __polymorphic(self): return _tbapi.TickStream___polymorphic(self) def __periodicity(self): return _tbapi.TickStream___periodicity(self) def __options(self): return _tbapi.TickStream___options(self) def __describe(self): return _tbapi.TickStream___describe(self) def __setSchema(self, options): return _tbapi.TickStream___setSchema(self, options) def __listEntities(self): return _tbapi.TickStream___listEntities(self) def __truncate(self, millisecondTime): return _tbapi.TickStream___truncate(self, millisecondTime) def __truncateEntities(self, millisecondTime, entities): return _tbapi.TickStream___truncateEntities(self, millisecondTime, entities) def __clear(self): return _tbapi.TickStream___clear(self) def __clearEntities(self, entities): return _tbapi.TickStream___clearEntities(self, entities) def __purge(self, millisecondTime): return _tbapi.TickStream___purge(self, millisecondTime) def __deleteStream(self): return _tbapi.TickStream___deleteStream(self) def __abortBackgroundProcess(self): return _tbapi.TickStream___abortBackgroundProcess(self) def __select(self, millisecondTime, options, types, entities): return _tbapi.TickStream___select(self, millisecondTime, options, types, entities) def __createCursor(self, options): return _tbapi.TickStream___createCursor(self, options) def __createLoader(self, options): return _tbapi.TickStream___createLoader(self, options) def __listSpaces(self): return _tbapi.TickStream___listSpaces(self) def __renameSpace(self, newName, oldName): return _tbapi.TickStream___renameSpace(self, newName, oldName) def __deleteSpaces(self, spaces): return _tbapi.TickStream___deleteSpaces(self, spaces) __swig_destroy__ = _tbapi.delete_TickStream __del__ = lambda self: None def __getTimeRange(self): return _tbapi.TickStream___getTimeRange(self) def __getTimeRangeEntities(self, entities): return _tbapi.TickStream___getTimeRangeEntities(self, entities) def __getSpaceTimeRange(self, space): return _tbapi.TickStream___getSpaceTimeRange(self, space) TickStream_swigregister = _tbapi.TickStream_swigregister TickStream_swigregister(TickStream) OK = _tbapi.OK END_OF_CURSOR = _tbapi.END_OF_CURSOR UNAVAILABLE = _tbapi.UNAVAILABLE class TickCursor(_object): """ A cursor (also known as iterator, or result set) for reading data from a stream. This class provides methods for dynamically reconfiguring the feed, as well as method reset for essentially re-opening the cursor on a completely different timestamp. To get a cursor, use select method from TickDb or TickStream objects, or call executeQuery to open cursor to QQL result set. Also cursor can be created with createCursor method, but it will be not initialized cursor, so cursor should be configured with types, entities and read time calling reset: ``` options = tbapi.SelectionOptions() cursor = tickdb.createCursor(stream, options) cursor.subscribeToAllEntities() cursor.subscribeToAllTypes() cursor.reset(timestamp) ``` """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, TickCursor, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, TickCursor, name) def __init__(self, *args, **kwargs): raise AttributeError("No constructor defined") __repr__ = _swig_repr __swig_destroy__ = _tbapi.delete_TickCursor __del__ = lambda self: None def next(self) -> bool: '''Moves cursor on to the next message. This method blocks until the next message becomes available, or until the cursor is determined to be at the end of the sequence. This method is illegal to call if isAtEnd() returns true. Returns: bool: false if at the end of the cursor. ''' return self.__next() def getMessage(self) -> 'InstrumentMessage': '''Returns an InstrumentMessage object cursor points at.''' return self.__getMessage() def isAtEnd(self) -> bool: '''Returns true if the last call to next() returned false. Returns false if next() has not been called yet. This method is legal to call any number of times at any point in the cursor's lifecycle. ''' return self.__isAtEnd() def nextIfAvailable(self) -> int: '''Moves cursor on to the next message, but this method NOT blocks until the next message becomes available. Returns: NextResult: OK (0) if new message is available, END_OF_CURSOR (1) if cursor was closed, otherwise, UNAVAILABLE (2) ''' return self.__nextIfAvailable() def isClosed(self) -> bool: '''Returns true, if cursor was closed''' return self.__isClosed() def close(self) -> None: '''Close the cursor''' return self.__close() def getCurrentStreamKey(self) -> str: '''Return the key of the stream that is the source of the current message.''' return self.__getCurrentStreamKey() def reset(self, timestamp: int, entities: 'list[str]' = None) -> None: '''Reposition the message source to a new point in time, while preserving current subscription. Args: timestamp (int): The new position in time in millis. entities ('list[str]'): list of entities to reset ''' if entities == None: return self.__reset(timestamp) else: return self.__resetEntities(timestamp, entities) def subscribeToAllEntities(self) -> None: '''Subscribe to all available entities.''' return self.__subscribeToAllEntities() def clearAllEntities(self) -> None: '''Switch to selective subscription mode (if necessary) and clear the list.''' return self.__clearAllEntities() def addEntity(self, entity: str) -> None: '''Add the specified entity to subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call. Special note about options: The following fragment will subscribe to specific option contract "DAV 100417P00085000": cursor.addEntity('DAV 100417P00085000'); While the following will subscribe to option root (and you will get all instruments with this root): cursor.addEntity("DAV "); ''' return self.__addEntity(entity) def addEntities(self, entities: 'list[str]') -> None: '''Bulk add the specified entities to subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call. ''' return self.__addEntities(entities) def removeEntities(self, entities: 'list[str]') -> None: '''Remove the specified entities from subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call. ''' return self.__removeEntities(entities) def removeEntity(self, entity: str) -> None: '''Remove the specified entity from subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call. ''' return self.__removeEntity(entity) def subscribeToAllTypes(self) -> None: '''Subscribe to all available types (no filtering).''' return self.__subscribeToAllTypes() def addTypes(self, types: 'list[str]') -> None: '''Add the specified type names to subscription.''' return self.__addTypes(types) def removeTypes(self, types: 'list[str]') -> None: '''Remove the specified types from subscription.''' return self.__removeTypes(types) def setTypes(self, types: 'list[str]') -> None: '''Subscribe to specified types.''' return self.__setTypes(types) def add(self, types: 'list[str]', entities: 'list[str]') -> None: '''Add the specified entities and types to subscription. The type and symbol are copied from the incoming object, if necessary, so the argument can be re-used after the call. Args: types (list[str]): not-null array of type names to subscribe. entities (list[str]): not-null array of instruments to subscribe. ''' return self.__add(types, entities) def remove(self, types: 'list[str]', entities: 'list[str]') -> None: '''Remove the specified entities and types from subscription. The type and symbol are copied from the incoming objects, if necessary, so the arguments can be re-used after the call. Args: types (list[str]): not-null array of type names to unsubscribe. entities (list[str]): not-null array of instruments to unsubscribe. ''' return self.__remove(types, entities) def addStreams(self, streams: 'list[TickStream]') -> None: '''Add streams to subscription. Current time and filter is used to query data from new sources. Args: streams ('list[TickStream]'): Streams to add. ''' return self.__addStreams(streams) def removeStreams(self, streams: 'list[TickStream]') -> None: '''Remove streams from subscription. Args: streams (list[TickStream]): Streams to remove. ''' return self.__removeStreams(streams) def removeAllStreams(self) -> None: '''Remove all streams from subscription.''' return self.__removeAllStreams() def setTimeForNewSubscriptions(self, timestamp: int) -> None: '''This method affects subsequent "add subscription" methods, such as, for instance, addEntity(). New subscriptions start at the specified time. Args: timestamp (int): The time to use. ''' return self.__setTimeForNewSubscriptions(timestamp) def __next(self): return _tbapi.TickCursor___next(self) def __nextIfAvailable(self): return _tbapi.TickCursor___nextIfAvailable(self) def __getMessage(self): return _tbapi.TickCursor___getMessage(self) def __isAtEnd(self): return _tbapi.TickCursor___isAtEnd(self) def __isClosed(self): return _tbapi.TickCursor___isClosed(self) def __close(self): return _tbapi.TickCursor___close(self) def __getCurrentStreamKey(self): return _tbapi.TickCursor___getCurrentStreamKey(self) def __reset(self, dt): return _tbapi.TickCursor___reset(self, dt) def __resetEntities(self, dt, entities): return _tbapi.TickCursor___resetEntities(self, dt, entities) def __subscribeToAllEntities(self): return _tbapi.TickCursor___subscribeToAllEntities(self) def __clearAllEntities(self): return _tbapi.TickCursor___clearAllEntities(self) def __addEntities(self, entities): return _tbapi.TickCursor___addEntities(self, entities) def __addEntity(self, entity): return _tbapi.TickCursor___addEntity(self, entity) def __removeEntities(self, entities): return _tbapi.TickCursor___removeEntities(self, entities) def __removeEntity(self, entity): return _tbapi.TickCursor___removeEntity(self, entity) def __subscribeToAllTypes(self): return _tbapi.TickCursor___subscribeToAllTypes(self) def __addTypes(self, types): return _tbapi.TickCursor___addTypes(self, types) def __removeTypes(self, types): return _tbapi.TickCursor___removeTypes(self, types) def __setTypes(self, types): return _tbapi.TickCursor___setTypes(self, types) def __add(self, types, entities): return _tbapi.TickCursor___add(self, types, entities) def __remove(self, types, entities): return _tbapi.TickCursor___remove(self, types, entities) def __addStreams(self, streams): return _tbapi.TickCursor___addStreams(self, streams) def __removeStreams(self, streams): return _tbapi.TickCursor___removeStreams(self, streams) def __removeAllStreams(self): return _tbapi.TickCursor___removeAllStreams(self) def __setTimeForNewSubscriptions(self, dt): return _tbapi.TickCursor___setTimeForNewSubscriptions(self, dt) TickCursor_swigregister = _tbapi.TickCursor_swigregister TickCursor_swigregister(TickCursor) class TickLoader(_object): """ Object which consumes messages. Create loader from TickDb: options = tbapi.LoadingOptions() stream = tickdb.createLoader(stream, options) Create loader from TickStream: options = tbapi.LoadingOptions() stream = stream.createLoader(options) """ __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, TickLoader, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, TickLoader, name) __repr__ = _swig_repr def __init__(self, loader): this = _tbapi.new_TickLoader(loader) try: self.this.append(this) except __builtin__.Exception: self.this = this __swig_destroy__ = _tbapi.delete_TickLoader __del__ = lambda self: None def send(self, message: InstrumentMessage) -> None: '''This method is invoked to send a message to the object. Args: message (InstrumentMessage): A temporary buffer with the message. By convention, the message is only valid for the duration of this call. ''' return self.__send(message) def flush(self) -> None: '''Flushes all buffered messages by sending them to server. Note that calling 'send' method not guaranty that all messages will be delivered and stored to server. ''' return self.__flush() def close(self) -> None: '''Flushes and closes the loader''' return self.__close() def addListener(self, listener: 'ErrorListener') -> None: '''Register error listener. All writing data errors will be delivered to the listener. Args: listener (ErrorListener): error listener to register. ''' return self.__addListener(listener) def removeListener(self, listener: 'ErrorListener') -> None: '''Unsubscribe registered error listener. Args: listener (ErrorListener): error listener to unsubscribe. ''' return self.__removeListener(listener) def nErrorListeners(self) -> int: '''Returns number of registered error listeners''' return self.__nErrorListeners() def registerType(self, type: str) -> int: '''Register type of sending message to get type id. For performance reasons, you could specify type id instead of type name, for example: ``` message = tbapi.InstrumentMessage() message.typeId = loader.registerType("deltix.timebase.api.messages.universal.PackageHeader") // as alternative, you could write: // message.typeName = "deltix.timebase.api.messages.universal.PackageHeader" loader.send(message) ``` Args: type (str): name of type to register. Returns: int: id of registered type. ''' return self.__registerType(type) def registerInstrument(self, symbol: str) -> int: '''Register instrument of sending message to get instrument id. For performance reasons, you could specify instrument id instead of symbol and instrument type, for example: ``` message = tbapi.InstrumentMessage() message.instrumentId = loader.registerInstrument('AAPL') // as alternative, you could write: // message.symbol = 'AAPL' loader.send(message) ``` Args: symbol (str): instrument ticker. Returns: int: id of registered instrument. ''' return self.__registerInstrument(symbol) def __registerType(self, type_name): return _tbapi.TickLoader___registerType(self, type_name) def __registerInstrument(self, instrument): return _tbapi.TickLoader___registerInstrument(self, instrument) def __send(self, message): return _tbapi.TickLoader___send(self, message) def __flush(self): return _tbapi.TickLoader___flush(self) def __close(self): return _tbapi.TickLoader___close(self) def __addListener(self, listener): return _tbapi.TickLoader___addListener(self, listener) def __removeListener(self, listener): return _tbapi.TickLoader___removeListener(self, listener) def __nErrorListeners(self): return _tbapi.TickLoader___nErrorListeners(self) TickLoader_swigregister = _tbapi.TickLoader_swigregister TickLoader_swigregister(TickLoader) class ErrorListener(_object): __swig_setmethods__ = {} __setattr__ = lambda self, name, value: _swig_setattr(self, ErrorListener, name, value) __swig_getmethods__ = {} __getattr__ = lambda self, name: _swig_getattr(self, ErrorListener, name) __repr__ = _swig_repr def onError(self, errorClass: str, errorMsg: str) -> None: return self.__onError() def __onError(self, errorClass, errorMsg): return _tbapi.ErrorListener___onError(self, errorClass, errorMsg) __swig_destroy__ = _tbapi.delete_ErrorListener __del__ = lambda self: None def __init__(self): if self.__class__ == ErrorListener: _self = None else: _self = self this = _tbapi.new_ErrorListener(_self, ) try: self.this.append(this) except __builtin__.Exception: self.this = this def __disown__(self): self.this.disown() _tbapi.disown_ErrorListener(self) return weakref_proxy(self) ErrorListener_swigregister = _tbapi.ErrorListener_swigregister ErrorListener_swigregister(ErrorListener) from collections import defaultdict def stream_to_dict(db, stream, fields=None, ts_from=0, ts_to=JAVA_LONG_MAX_VALUE): if ts_to > JAVA_LONG_MAX_VALUE: ts_to = JAVA_LONG_MAX_VALUE if not db.isOpen(): raise Exception('Database is not opened.') options = SelectionOptions() options.to = ts_to messages = [] table = defaultdict(list) with open_TickCursor(stream, ts_from, options) as cursor: counter = 0 while cursor.next(): message = vars(cursor.getMessage()) messages.append(message) if fields is None: def to_write(x): return True else: def to_write(x): return x in fields for key in table.keys(): if key in message: table[key].append(message[key]) del message[key] else: table[key].append(None) for key in message: if to_write(key): table[key] = [None] * counter table[key].append(message[key]) counter += 1 return table import xml.etree.ElementTree as ETree import re class ClassType: ENUM_CLASS = 'enumClass' RECORD_CLASS = 'recordClass' class FieldType: NONSTATIC = 'nonStaticDataField' STATIC = 'staticDataField' class SchemaDef: def __init__(self, types, all_types): self.types = types self.all_types = all_types class ClassDescDef: def __init__(self, name): self.name = name class TypeDef(ClassDescDef): def __init__(self, name, fields): ClassDescDef.__init__(self, name) self.fields = fields class EnumDef(ClassDescDef): def __init__(self, name, values): ClassDescDef.__init__(self, name) self.values = values class FieldDef: def __init__(self, name, data_type, nullable, field_type): self.name = name self.data_type = data_type self.nullable = nullable self.field_type = field_type class SchemaParser: @staticmethod def parse_Schema(stream): xml = stream.metadata() tree = ETree.fromstring(xml) return SchemaParser.parse_classSet(tree) @staticmethod def parse_value(value, prefix): return value.find('prefix:symbol', prefix).text @staticmethod def parse_field(field, prefix): name = field.find('prefix:name', prefix) field_type = SchemaParser.rm_brackets(list(field.attrib.values())[0]) if field_type != FieldType.STATIC and field_type != FieldType.NONSTATIC: raise Exception('Unknown field type {}'.format(field_type)) data_type = field.find('prefix:type', prefix) nullable = bool(data_type.find('prefix:nullable', prefix).text) data_type = list(data_type.attrib.values())[0] return FieldDef(name.text, data_type, nullable, field_type) @staticmethod def parse_classDescriptor(class_desc, prefix): name = class_desc.find('prefix:name', prefix) type_ = list(class_desc.attrib.values())[0] if type_ == ClassType.RECORD_CLASS: fields = [SchemaParser.parse_field(field, prefix) for field in class_desc.findall('prefix:field', prefix)] return TypeDef(name.text, fields) elif type_ == ClassType.ENUM_CLASS: values = [SchemaParser.parse_value(value, prefix) for value in class_desc.findall('prefix:value', prefix)] return EnumDef(name.text, values) else: raise Exception('Unknown class descriptor type {}'.format(type_)) @staticmethod def parse_classSet(class_set): types = [] all_types = [] prefix = {} key = class_set.tag prefix['prefix'] = re.search(r'(?<={).*(?=})', key).group(0) for class_desc in class_set.findall('prefix:classDescriptor', prefix): class_desc = SchemaParser.parse_classDescriptor(class_desc, prefix) all_types.append(class_desc) if isinstance(class_desc, TypeDef): types.append(class_desc) return SchemaDef(types, all_types) @staticmethod def rm_brackets(obj): if isinstance(obj, str): return re.sub('^{.*}', '', obj) elif isinstance(obj, dict): return {(SchemaParser.rm_brackets(key), value) for (key, value) in obj.items()} else: raise Exception('rm_brackets for {} is not implemented'.format(type(obj))) def __getSchema(self): return SchemaParser.parse_Schema(self) TickStream.getSchema = __getSchema # This file is compatible with both classic and new-style classes.