tbapi/__init__.py (1,440 lines of code) (raw):
# This file was automatically generated by SWIG (http://www.swig.org).
# Version 3.0.12
#
# Do not make changes to this file unless you know what you are doing--modify
# the SWIG interface file instead.
# Interpreter details are bound to private names so they can be removed from
# the package namespace once the native extension has been loaded.
from sys import version_info as _swig_python_version_info
from sys import platform as _swig_python_platform

# Sub-package holding the binaries for the current OS. Any sys.platform value
# that starts with neither 'linux' nor 'darwin' is treated as Windows.
platform = 'windows'
if _swig_python_platform.startswith('linux'):
    platform = 'linux'
elif _swig_python_platform.startswith('darwin'):
    platform = 'darwin'

# Sub-package holding the binaries for the running interpreter. Only
# Python 3.6 through 3.10 are mapped; any other version aborts the import.
if _swig_python_version_info >= (3, 6) and _swig_python_version_info < (3, 7):
    subdir = 'py36'
elif _swig_python_version_info >= (3, 7) and _swig_python_version_info < (3, 8):
    subdir = 'py37'
elif _swig_python_version_info >= (3, 8) and _swig_python_version_info < (3, 9):
    subdir = 'py38'
elif _swig_python_version_info >= (3, 9) and _swig_python_version_info < (3, 10):
    subdir = 'py39'
elif _swig_python_version_info >= (3, 10) and _swig_python_version_info < (3, 11):
    subdir = 'py310'
else:
    raise Exception('Version of python (' + str(_swig_python_version_info) + ') is not supported')

if _swig_python_version_info >= (3, 7, 0):
    def swig_import_helper():
        # Prefer the bundled binary at <package>.<platform>.x64.<subdir>._tbapi
        # and fall back to a top-level `_tbapi` module if that import fails.
        import importlib
        mname = '.'.join((__name__, platform, 'x64', subdir, '_tbapi')).lstrip('.')
        try:
            return importlib.import_module(mname)
        except ImportError:
            return importlib.import_module('_tbapi')
    _tbapi = swig_import_helper()
    del swig_import_helper
elif _swig_python_version_info >= (2, 6, 0):
    # Given the version check above, only Python 3.6 can reach this branch.
    def swig_import_helper():
        # Same lookup as the branch above, but through the legacy `imp` module:
        # search <package dir>/<platform>/x64/<subdir> for `_tbapi`.
        from os.path import dirname
        import imp
        fp = None
        try:
            directory = '/'.join((dirname(__file__), platform, 'x64', subdir))
            fp, pathname, description = imp.find_module('_tbapi', [directory])
        except ImportError:
            # Not found in the bundled directory: fall back to a regular import.
            import _tbapi
            return _tbapi
        try:
            _mod = imp.load_module('_tbapi', fp, pathname, description)
        finally:
            # Close the file object returned by find_module, if there is one.
            if fp is not None:
                fp.close()
        return _mod
    _tbapi = swig_import_helper()
    del swig_import_helper
else:
    # NOTE(review): appears unreachable, because unsupported versions have
    # already raised in the version check above.
    import _tbapi
# Drop the temporary aliases; `platform` and `subdir` stay defined.
del _swig_python_version_info
del _swig_python_platform
from os import path
def version():
    """Return the package version recorded in ``project.properties``.

    The file is looked up in the directory of this module and read as
    simple ``key = value`` lines; whitespace around keys and values is
    ignored and only the first ``=`` on a line separates key from value.

    Returns:
        str: Value of the ``version`` key, or ``'UNKNOWN'`` when the file
        cannot be read or does not define that key.
    """
    # os.path.join copes with an empty dirname (module loaded from the
    # current directory); plain concatenation used to try opening ''.
    properties_file = path.join(path.dirname(__file__), 'project.properties')
    try:
        with open(properties_file) as file:
            for line in file:
                key, separator, value = line.partition('=')
                if separator and key.strip() == 'version':
                    return value.strip()
    except OSError:
        # A missing or unreadable properties file is not fatal: report the
        # same fallback as for a file without a version entry.
        return 'UNKNOWN'
    return 'UNKNOWN'
# Alias the built-in `property` under a private name; the proxy classes use
# this alias to declare their attributes.
try:
    _swig_property = property
except NameError:
    pass # Python < 2.2 doesn't have 'property'.
# Make the builtins module available as `__builtin__` regardless of whether it
# is importable as `builtins` or as `__builtin__`.
try:
    import builtins as __builtin__
except ImportError:
    import __builtin__
def _swig_setattr_nondynamic(self, class_type, name, value, static=1):
    """Assign ``name`` on a proxy object through the SWIG setter table.

    Args:
        self: Proxy instance being modified.
        class_type: Proxy class whose ``__swig_setmethods__`` is consulted.
        name: Attribute name.
        value: Value to assign.
        static: When truthy, names without a registered setter are rejected;
            when falsy, they are stored as ordinary instance attributes.

    Returns:
        Whatever the ownership call or the registered setter returns,
        otherwise ``None``.

    Raises:
        AttributeError: ``static`` is truthy and ``name`` has no setter.
    """
    # "thisown" is forwarded to the ownership flag of the wrapped handle.
    if name == "thisown":
        return self.this.own(value)
    # A SwigPyObject assigned to "this" is stored directly in the instance dict.
    if name == "this" and type(value).__name__ == 'SwigPyObject':
        self.__dict__[name] = value
        return None
    setter = class_type.__swig_setmethods__.get(name)
    if setter:
        return setter(self, value)
    if static:
        raise AttributeError("You cannot add attributes to %s" % self)
    if _newclass:
        object.__setattr__(self, name, value)
    else:
        self.__dict__[name] = value
def _swig_setattr(self, class_type, name, value):
    """Assign ``name`` on a proxy object, allowing new instance attributes.

    Delegates to ``_swig_setattr_nondynamic`` with ``static`` switched off, so
    names without a registered setter are stored on the instance.
    """
    allow_new_attributes = 0
    return _swig_setattr_nondynamic(self, class_type, name, value, allow_new_attributes)
def _swig_getattr(self, class_type, name):
    """Read ``name`` from a proxy object through the SWIG getter table.

    Args:
        self: Proxy instance being read.
        class_type: Proxy class whose ``__swig_getmethods__`` is consulted.
        name: Attribute name.

    Returns:
        The ownership flag for ``"thisown"``, otherwise the result of the
        registered getter.

    Raises:
        AttributeError: No getter is registered for ``name``.
    """
    if name == "thisown":
        return self.this.own()
    getter = class_type.__swig_getmethods__.get(name)
    if not getter:
        raise AttributeError("'%s' object has no attribute '%s'" % (class_type.__name__, name))
    return getter(self)
def _swig_repr(self):
    """Build the ``repr`` string shared by the proxy classes.

    The result has the form ``<module.Class; proxy of HANDLE >``; the
    ``proxy of ...`` part is left empty when the wrapped handle cannot be
    represented (for example when ``this`` is not set).
    """
    try:
        handle_text = "proxy of " + self.this.__repr__()
    except __builtin__.Exception:
        handle_text = ""
    proxy_class = self.__class__
    return "<%s.%s; %s >" % (proxy_class.__module__, proxy_class.__name__, handle_text)
# `_object` is the base class used by the proxy classes below; `_newclass`
# records whether the built-in `object` could be used for it (1) or a plain
# placeholder class had to be defined instead (0).
try:
    _object = object
    _newclass = 1
except __builtin__.Exception:
    class _object:
        pass
    _newclass = 0
# `weakref_proxy` is `weakref.proxy`, or the identity function if the
# `weakref` import fails.
try:
    import weakref
    weakref_proxy = weakref.proxy
except __builtin__.Exception:
    weakref_proxy = lambda x: x
class SwigPyIterator(_object):
    """Proxy class whose methods forward to ``_tbapi.SwigPyIterator_*``.

    Instances cannot be created from Python: ``__init__`` always raises
    ``AttributeError``.
    """
    # Attribute access is routed through the SWIG helpers, which consult the
    # per-class setter/getter tables below.
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, SwigPyIterator, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, SwigPyIterator, name)

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined - class is abstract")
    __repr__ = _swig_repr
    __swig_destroy__ = _tbapi.delete_SwigPyIterator
    # The Python-level finalizer is deliberately a no-op.
    __del__ = lambda self: None

    def value(self):
        return _tbapi.SwigPyIterator_value(self)

    def incr(self, n=1):
        return _tbapi.SwigPyIterator_incr(self, n)

    def decr(self, n=1):
        return _tbapi.SwigPyIterator_decr(self, n)

    def distance(self, x):
        return _tbapi.SwigPyIterator_distance(self, x)

    def equal(self, x):
        return _tbapi.SwigPyIterator_equal(self, x)

    def copy(self):
        return _tbapi.SwigPyIterator_copy(self)

    def next(self):
        return _tbapi.SwigPyIterator_next(self)

    def __next__(self):
        return _tbapi.SwigPyIterator___next__(self)

    def previous(self):
        return _tbapi.SwigPyIterator_previous(self)

    def advance(self, n):
        return _tbapi.SwigPyIterator_advance(self, n)

    def __eq__(self, x):
        return _tbapi.SwigPyIterator___eq__(self, x)

    def __ne__(self, x):
        return _tbapi.SwigPyIterator___ne__(self, x)

    def __iadd__(self, n):
        return _tbapi.SwigPyIterator___iadd__(self, n)

    def __isub__(self, n):
        return _tbapi.SwigPyIterator___isub__(self, n)

    def __add__(self, n):
        return _tbapi.SwigPyIterator___add__(self, n)

    def __sub__(self, *args):
        return _tbapi.SwigPyIterator___sub__(self, *args)

    def __iter__(self):
        # The object is its own iterator.
        return self
# Register the proxy class with the native module.
SwigPyIterator_swigregister = _tbapi.SwigPyIterator_swigregister
SwigPyIterator_swigregister(SwigPyIterator)
from contextlib import contextmanager
# Bounds of a signed 64-bit integer (-2**63 and 2**63 - 1), i.e. the range of
# a Java `long`.
JAVA_LONG_MIN_VALUE = -9223372036854775808
JAVA_LONG_MAX_VALUE = 9223372036854775807
class InstrumentMessage(object):
    """Plain attribute container whose text form lists its instance attributes."""

    def __str__(self):
        """Return the instance attribute dictionary rendered as a string."""
        attributes = vars(self)
        return "{}".format(attributes)
class StreamScope(_object):
    """
    Determines the scope of a stream's durability, if any.
    Example:
    ```
    scope = tbapi.StreamScope('TRANSIENT')
    ```
    Possible values:
    ```
    DURABLE,
    EXTERNAL_FILE,
    TRANSIENT,
    RUNTIME
    ```
    """
    # Attribute access is routed through the SWIG helpers, which consult the
    # per-class setter/getter tables below.
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, StreamScope, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, StreamScope, name)
    __repr__ = _swig_repr
    # Enumeration constants re-exported from the native module.
    DURABLE = _tbapi.StreamScope_DURABLE
    EXTERNAL_FILE = _tbapi.StreamScope_EXTERNAL_FILE
    TRANSIENT = _tbapi.StreamScope_TRANSIENT
    RUNTIME = _tbapi.StreamScope_RUNTIME

    def __init__(self, *args):
        this = _tbapi.new_StreamScope(*args)
        # Append to an existing `this` handle if there is one; otherwise
        # store the newly created handle.
        try:
            self.this.append(this)
        except __builtin__.Exception:
            self.this = this

    def __int__(self):
        return _tbapi.StreamScope___int__(self)

    def __str__(self):
        return _tbapi.StreamScope___str__(self)
    __swig_destroy__ = _tbapi.delete_StreamScope
    # The Python-level finalizer is deliberately a no-op.
    __del__ = lambda self: None
# Register the proxy class with the native module.
StreamScope_swigregister = _tbapi.StreamScope_swigregister
StreamScope_swigregister(StreamScope)
class WriteMode(_object):
    """
    APPEND: Adds only new data into a stream without truncations.
    REPLACE: Adds data into a stream and removes previous data older than the first message time
    [truncate(first message time + 1)].
    REWRITE: Default. Adds data into a stream and removes previous data by truncating using the first message time
    [truncate(first message time)].
    TRUNCATE: The stream is truncated every time the loader writes a message earlier than the last message time.
    Example:
    ```
    mode = tbapi.WriteMode('TRUNCATE')
    ```
    Possible values:
    ```
    APPEND,
    REPLACE,
    REWRITE,
    TRUNCATE
    ```
    """
    # Attribute access is routed through the SWIG helpers, which consult the
    # per-class setter/getter tables below.
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, WriteMode, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, WriteMode, name)
    __repr__ = _swig_repr
    # Enumeration constants re-exported from the native module.
    APPEND = _tbapi.WriteMode_APPEND
    REPLACE = _tbapi.WriteMode_REPLACE
    REWRITE = _tbapi.WriteMode_REWRITE
    TRUNCATE = _tbapi.WriteMode_TRUNCATE

    def __init__(self, *args):
        this = _tbapi.new_WriteMode(*args)
        # Append to an existing `this` handle if there is one; otherwise
        # store the newly created handle.
        try:
            self.this.append(this)
        except __builtin__.Exception:
            self.this = this

    def __int__(self):
        return _tbapi.WriteMode___int__(self)

    def __str__(self):
        return _tbapi.WriteMode___str__(self)
    __swig_destroy__ = _tbapi.delete_WriteMode
    # The Python-level finalizer is deliberately a no-op.
    __del__ = lambda self: None
# Register the proxy class with the native module.
WriteMode_swigregister = _tbapi.WriteMode_swigregister
WriteMode_swigregister(WriteMode)
class SelectionOptions(_object):
    """
    Options for selecting data from a stream.
    Example:
    ```
    so = tbapi.SelectionOptions()
    so._from = 0
    so.to = 100000
    so.useCompression = False
    ...
    ```
    Exposed attributes: `_from`, `to`, `useCompression`, `live`, `reverse`,
    `allowLateOutOfOrder`, `realTimeNotification`, `minLatency`.
    """
    # Attribute access is routed through the SWIG helpers, which consult the
    # per-class setter/getter tables below.
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, SelectionOptions, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, SelectionOptions, name)
    __repr__ = _swig_repr

    def __init__(self):
        this = _tbapi.new_SelectionOptions()
        # Append to an existing `this` handle if there is one; otherwise
        # store the newly created handle.
        try:
            self.this.append(this)
        except __builtin__.Exception:
            self.this = this
    # Each attribute below follows the same pattern: the native setter and
    # getter are registered in the lookup tables and, when `_newclass` is
    # set, also exposed as a property.
    __swig_setmethods__["_from"] = _tbapi.SelectionOptions__from_set
    __swig_getmethods__["_from"] = _tbapi.SelectionOptions__from_get
    if _newclass:
        _from = _swig_property(_tbapi.SelectionOptions__from_get, _tbapi.SelectionOptions__from_set)
    __swig_setmethods__["to"] = _tbapi.SelectionOptions_to_set
    __swig_getmethods__["to"] = _tbapi.SelectionOptions_to_get
    if _newclass:
        to = _swig_property(_tbapi.SelectionOptions_to_get, _tbapi.SelectionOptions_to_set)
    __swig_setmethods__["useCompression"] = _tbapi.SelectionOptions_useCompression_set
    __swig_getmethods__["useCompression"] = _tbapi.SelectionOptions_useCompression_get
    if _newclass:
        useCompression = _swig_property(_tbapi.SelectionOptions_useCompression_get, _tbapi.SelectionOptions_useCompression_set)
    __swig_setmethods__["live"] = _tbapi.SelectionOptions_live_set
    __swig_getmethods__["live"] = _tbapi.SelectionOptions_live_get
    if _newclass:
        live = _swig_property(_tbapi.SelectionOptions_live_get, _tbapi.SelectionOptions_live_set)
    __swig_setmethods__["reverse"] = _tbapi.SelectionOptions_reverse_set
    __swig_getmethods__["reverse"] = _tbapi.SelectionOptions_reverse_get
    if _newclass:
        reverse = _swig_property(_tbapi.SelectionOptions_reverse_get, _tbapi.SelectionOptions_reverse_set)
    __swig_setmethods__["allowLateOutOfOrder"] = _tbapi.SelectionOptions_allowLateOutOfOrder_set
    __swig_getmethods__["allowLateOutOfOrder"] = _tbapi.SelectionOptions_allowLateOutOfOrder_get
    if _newclass:
        allowLateOutOfOrder = _swig_property(_tbapi.SelectionOptions_allowLateOutOfOrder_get, _tbapi.SelectionOptions_allowLateOutOfOrder_set)
    __swig_setmethods__["realTimeNotification"] = _tbapi.SelectionOptions_realTimeNotification_set
    __swig_getmethods__["realTimeNotification"] = _tbapi.SelectionOptions_realTimeNotification_get
    if _newclass:
        realTimeNotification = _swig_property(_tbapi.SelectionOptions_realTimeNotification_get, _tbapi.SelectionOptions_realTimeNotification_set)
    __swig_setmethods__["minLatency"] = _tbapi.SelectionOptions_minLatency_set
    __swig_getmethods__["minLatency"] = _tbapi.SelectionOptions_minLatency_get
    if _newclass:
        minLatency = _swig_property(_tbapi.SelectionOptions_minLatency_get, _tbapi.SelectionOptions_minLatency_set)
    __swig_destroy__ = _tbapi.delete_SelectionOptions
    # The Python-level finalizer is deliberately a no-op.
    __del__ = lambda self: None
# Register the proxy class with the native module.
SelectionOptions_swigregister = _tbapi.SelectionOptions_swigregister
SelectionOptions_swigregister(SelectionOptions)
class LoadingOptions(_object):
    """
    Options for loading data into a stream.
    Example:
    ```
    lo = tbapi.LoadingOptions()
    lo.writeMode = tbapi.WriteMode('TRUNCATE')
    lo.space = 'myspace'
    ...
    ```
    Exposed attributes: `writeMode`, `minLatency`, `space`.
    """
    # Attribute access is routed through the SWIG helpers, which consult the
    # per-class setter/getter tables below.
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, LoadingOptions, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, LoadingOptions, name)
    __repr__ = _swig_repr
    # Each attribute below follows the same pattern: the native setter and
    # getter are registered in the lookup tables and, when `_newclass` is
    # set, also exposed as a property.
    __swig_setmethods__["writeMode"] = _tbapi.LoadingOptions_writeMode_set
    __swig_getmethods__["writeMode"] = _tbapi.LoadingOptions_writeMode_get
    if _newclass:
        writeMode = _swig_property(_tbapi.LoadingOptions_writeMode_get, _tbapi.LoadingOptions_writeMode_set)
    __swig_setmethods__["minLatency"] = _tbapi.LoadingOptions_minLatency_set
    __swig_getmethods__["minLatency"] = _tbapi.LoadingOptions_minLatency_get
    if _newclass:
        minLatency = _swig_property(_tbapi.LoadingOptions_minLatency_get, _tbapi.LoadingOptions_minLatency_set)
    __swig_setmethods__["space"] = _tbapi.LoadingOptions_space_set
    __swig_getmethods__["space"] = _tbapi.LoadingOptions_space_get
    if _newclass:
        space = _swig_property(_tbapi.LoadingOptions_space_get, _tbapi.LoadingOptions_space_set)

    def __init__(self):
        this = _tbapi.new_LoadingOptions()
        # Append to an existing `this` handle if there is one; otherwise
        # store the newly created handle.
        try:
            self.this.append(this)
        except __builtin__.Exception:
            self.this = this
    __swig_destroy__ = _tbapi.delete_LoadingOptions
    # The Python-level finalizer is deliberately a no-op.
    __del__ = lambda self: None
# Register the proxy class with the native module.
LoadingOptions_swigregister = _tbapi.LoadingOptions_swigregister
LoadingOptions_swigregister(LoadingOptions)
class StreamOptions(_object):
    """
    Stream definition attributes.

    String-valued options (`name`, `description`, `owner`, `location`,
    `distributionRuleName`, `metadata`) are accessor methods: call them
    without an argument to read the value and with an argument to set it.
    Assigning to them (`so.name = key`) only shadows the method on the
    instance and does not reach the native object. The remaining options
    (`scope`, `distributionFactor`, `duplicatesAllowed`, `highAvailability`,
    `unique`, `polymorphic`, `periodicity`) are plain attributes.

    Example:
    ```
    so = tbapi.StreamOptions()
    so.name(key)
    so.description(key)
    so.scope = tbapi.StreamScope('DURABLE')
    so.distributionFactor = 1
    so.highAvailability = False
    so.polymorphic = False
    so.metadata(schema)
    db.createStream(key, so)
    ```
    """
    # Attribute access is routed through the SWIG helpers, which consult the
    # per-class setter/getter tables below.
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, StreamOptions, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, StreamOptions, name)
    __repr__ = _swig_repr

    def name(self, name: str = None) -> str:
        '''Optional user-readable name.

        Args:
            name (str): New value; omit (or pass None) to read the current one.

        Returns:
            str: The current value when reading, the value just set otherwise.
        '''
        if name is None:
            return self.__getName()
        self.__setName(name)
        return name

    def description(self, description: str = None) -> str:
        '''Optional multi-line description.

        Args:
            description (str): New value; omit (or pass None) to read the current one.

        Returns:
            str: The current value when reading, the value just set otherwise.
        '''
        if description is None:
            return self.__getDescription()
        self.__setDescription(description)
        return description

    def owner(self, owner: str = None) -> str:
        '''Optional owner of stream.
        During stream creation it will be set
        equal to the authenticated user name.

        Args:
            owner (str): New value; omit (or pass None) to read the current one.

        Returns:
            str: The current value when reading, the value just set otherwise.
        '''
        if owner is None:
            return self.__getOwner()
        self.__setOwner(owner)
        return owner

    def location(self, location: str = None) -> str:
        '''Location of the stream (by default null). When defined this attribute provides alternative stream location (rather than default location under QuantServerHome)

        Args:
            location (str): New value; omit (or pass None) to read the current one.

        Returns:
            str: The current value when reading, the value just set otherwise.
        '''
        if location is None:
            return self.__getLocation()
        self.__setLocation(location)
        return location

    def distributionRuleName(self, distributionRuleName: str = None) -> str:
        '''Class name of the distribution rule

        Args:
            distributionRuleName (str): New value; omit (or pass None) to read the current one.

        Returns:
            str: The current value when reading, the value just set otherwise.
        '''
        if distributionRuleName is None:
            return self.__getDistributionRuleName()
        self.__setDistributionRuleName(distributionRuleName)
        return distributionRuleName

    def metadata(self, metadata: str = None) -> str:
        '''Stream metadata in XML format. To build metadata programmatically, use tbapi.SchemaDef class.

        Args:
            metadata (str): New value; omit (or pass None) to read the current one.

        Returns:
            str: The current value when reading, the value just set otherwise.
        '''
        if metadata is None:
            return self.__getMetadata()
        self.__setMetadata(metadata)
        return metadata

    # Each attribute below follows the same pattern: the native setter and
    # getter are registered in the lookup tables and, when `_newclass` is
    # set, also exposed as a property.
    __swig_setmethods__["scope"] = _tbapi.StreamOptions_scope_set
    __swig_getmethods__["scope"] = _tbapi.StreamOptions_scope_get
    if _newclass:
        scope = _swig_property(_tbapi.StreamOptions_scope_get, _tbapi.StreamOptions_scope_set)
    __swig_setmethods__["distributionFactor"] = _tbapi.StreamOptions_distributionFactor_set
    __swig_getmethods__["distributionFactor"] = _tbapi.StreamOptions_distributionFactor_get
    if _newclass:
        distributionFactor = _swig_property(_tbapi.StreamOptions_distributionFactor_get, _tbapi.StreamOptions_distributionFactor_set)
    __swig_setmethods__["duplicatesAllowed"] = _tbapi.StreamOptions_duplicatesAllowed_set
    __swig_getmethods__["duplicatesAllowed"] = _tbapi.StreamOptions_duplicatesAllowed_get
    if _newclass:
        duplicatesAllowed = _swig_property(_tbapi.StreamOptions_duplicatesAllowed_get, _tbapi.StreamOptions_duplicatesAllowed_set)
    __swig_setmethods__["highAvailability"] = _tbapi.StreamOptions_highAvailability_set
    __swig_getmethods__["highAvailability"] = _tbapi.StreamOptions_highAvailability_get
    if _newclass:
        highAvailability = _swig_property(_tbapi.StreamOptions_highAvailability_get, _tbapi.StreamOptions_highAvailability_set)
    __swig_setmethods__["unique"] = _tbapi.StreamOptions_unique_set
    __swig_getmethods__["unique"] = _tbapi.StreamOptions_unique_get
    if _newclass:
        unique = _swig_property(_tbapi.StreamOptions_unique_get, _tbapi.StreamOptions_unique_set)
    __swig_setmethods__["polymorphic"] = _tbapi.StreamOptions_polymorphic_set
    __swig_getmethods__["polymorphic"] = _tbapi.StreamOptions_polymorphic_get
    if _newclass:
        polymorphic = _swig_property(_tbapi.StreamOptions_polymorphic_get, _tbapi.StreamOptions_polymorphic_set)
    __swig_setmethods__["periodicity"] = _tbapi.StreamOptions_periodicity_set
    __swig_getmethods__["periodicity"] = _tbapi.StreamOptions_periodicity_get
    if _newclass:
        periodicity = _swig_property(_tbapi.StreamOptions_periodicity_get, _tbapi.StreamOptions_periodicity_set)

    def __eq__(self, value):
        # Equality is decided by the native implementation.
        return _tbapi.StreamOptions___eq__(self, value)

    def __init__(self):
        this = _tbapi.new_StreamOptions()
        # Append to an existing `this` handle if there is one; otherwise
        # store the newly created handle.
        try:
            self.this.append(this)
        except __builtin__.Exception:
            self.this = this

    # Private native accessors backing the public accessor methods above.
    def __getName(self):
        return _tbapi.StreamOptions___getName(self)

    def __setName(self, name):
        return _tbapi.StreamOptions___setName(self, name)

    def __getDescription(self):
        return _tbapi.StreamOptions___getDescription(self)

    def __setDescription(self, description):
        return _tbapi.StreamOptions___setDescription(self, description)

    def __getOwner(self):
        return _tbapi.StreamOptions___getOwner(self)

    def __setOwner(self, owner):
        return _tbapi.StreamOptions___setOwner(self, owner)

    def __getLocation(self):
        return _tbapi.StreamOptions___getLocation(self)

    def __setLocation(self, location):
        return _tbapi.StreamOptions___setLocation(self, location)

    def __getDistributionRuleName(self):
        return _tbapi.StreamOptions___getDistributionRuleName(self)

    def __setDistributionRuleName(self, distributionRuleName):
        return _tbapi.StreamOptions___setDistributionRuleName(self, distributionRuleName)

    def __getMetadata(self):
        return _tbapi.StreamOptions___getMetadata(self)

    def __setMetadata(self, metadata):
        return _tbapi.StreamOptions___setMetadata(self, metadata)
    __swig_destroy__ = _tbapi.delete_StreamOptions
    # The Python-level finalizer is deliberately a no-op.
    __del__ = lambda self: None
# Register the proxy class with the native module.
StreamOptions_swigregister = _tbapi.StreamOptions_swigregister
StreamOptions_swigregister(StreamOptions)
class QueryParameter(_object):
    """Input parameter definition for a prepared statement.

    Exposes the attributes `name` and `type` and the method `value`.
    """
    # Attribute access is routed through the SWIG helpers, which consult the
    # per-class setter/getter tables below.
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, QueryParameter, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, QueryParameter, name)
    __repr__ = _swig_repr
    # Each attribute below follows the same pattern: the native setter and
    # getter are registered in the lookup tables and, when `_newclass` is
    # set, also exposed as a property.
    __swig_setmethods__["name"] = _tbapi.QueryParameter_name_set
    __swig_getmethods__["name"] = _tbapi.QueryParameter_name_get
    if _newclass:
        name = _swig_property(_tbapi.QueryParameter_name_get, _tbapi.QueryParameter_name_set)
    __swig_setmethods__["type"] = _tbapi.QueryParameter_type_set
    __swig_getmethods__["type"] = _tbapi.QueryParameter_type_get
    if _newclass:
        type = _swig_property(_tbapi.QueryParameter_type_get, _tbapi.QueryParameter_type_set)

    def __init__(self, *args):
        this = _tbapi.new_QueryParameter(*args)
        # Append to an existing `this` handle if there is one; otherwise
        # store the newly created handle.
        try:
            self.this.append(this)
        except __builtin__.Exception:
            self.this = this

    def value(self, *args):
        # Forwards all arguments unchanged to the native implementation.
        return _tbapi.QueryParameter_value(self, *args)
    __swig_destroy__ = _tbapi.delete_QueryParameter
    # The Python-level finalizer is deliberately a no-op.
    __del__ = lambda self: None
# Register the proxy class with the native module.
QueryParameter_swigregister = _tbapi.QueryParameter_swigregister
QueryParameter_swigregister(QueryParameter)
class TickDb(_object):
    """
    The top-level implementation to the methods of the Deltix Tick
    Database engine. Instances of this class are created by static method
    createFromUrl:
    ```
    db = tbapi.TickDb.createFromUrl('dxtick://localhost:8011')
    ```
    or
    ```
    db = tbapi.TickDb.createFromUrl('dxtick://localhost:8011', 'user', 'password')
    ```
    """
    # Attribute access is routed through the SWIG helpers, which consult the
    # per-class setter/getter tables below.
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, TickDb, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, TickDb, name)

    def __init__(self, *args, **kwargs):
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr

    @staticmethod
    def createFromUrl(url: str, user: str = None, password: str = None) -> "TickDb":
        '''Creates a new database instance with the specified root folder, or URL.

        Args:
            url (str): Connection URL.
            user (str): User. When None, the connection is created without credentials.
            password (str): Password.

        Returns:
            TickDb: An un-opened TickDB instance.
        '''
        if user is None:
            return _tbapi.TickDb___createFromUrl(url)
        return _tbapi.TickDb___createFromUrlWithUser(url, user, password)

    @staticmethod
    @contextmanager
    def openFromUrl(url: str, readonly: bool, user: str = None, password: str = None):
        '''Creates a new database instance with the specified root folder, or URL, and opens it.
        Context manager: the database is closed on exit if it is still open.

        Args:
            url (str): Connection URL.
            readonly (bool): Open data store in read-only mode.
            user (str): User.
            password (str): Password.

        Yields:
            TickDb: An opened TickDB instance.
        '''
        db = TickDb.createFromUrl(url, user, password)
        try:
            db.open(readonly)
            yield db
        finally:
            if db.isOpen():
                db.close()

    def isReadOnly(self) -> bool:
        '''Determines whether the store is open as read-only.'''
        return self.__isReadOnly()

    def isOpen(self) -> bool:
        '''Determines whether the store is open.'''
        return self.__isOpen()

    def open(self, readOnlyMode: bool) -> bool:
        '''Open the data store.

        Args:
            readOnlyMode (bool): Open data store in read-only mode.
        '''
        return self.__open(readOnlyMode)

    def close(self) -> None:
        '''Closes data store.'''
        return self.__close()

    def format(self) -> bool:
        '''Create a new object on disk and format internally.
        The data store is left open for read-write at the end of this method.
        '''
        return self.__format()

    def listStreams(self) -> 'list[TickStream]':
        '''Enumerates existing streams.

        Returns:
            list[TickStream]: An array of existing stream objects.
        '''
        return self.__listStreams()

    def getStream(self, key: str) -> 'TickStream':
        '''Looks up an existing stream by key.

        Args:
            key (str): Identifies the stream.

        Returns:
            TickStream: A stream object, or None if the key was not found.
        '''
        return self.__getStream(key)

    def createStream(self, key: str, options: StreamOptions) -> 'TickStream':
        '''Creates a new stream within the database.

        Args:
            key (str): A required key later used to identify the stream.
            options (StreamOptions): Options for creating the stream.

        Returns:
            TickStream: A new instance of TickStream.
        '''
        return self.__createStream(key, options)

    def createFileStream(self, key: str, dataFile: str) -> 'TickStream':
        '''Creates a new stream mount to the given data file.

        Args:
            key (str): A required key later used to identify the stream.
            dataFile (str): Path to the data file (on server side).

        Returns:
            TickStream: A new instance of TickStream.
        '''
        # The native call takes both arguments; `dataFile` used to be dropped
        # here, which made every call fail with a TypeError.
        return self.__createFileStream(key, dataFile)

    def createCursor(self, stream: 'TickStream', options: SelectionOptions) -> 'TickCursor':
        '''Opens an initially empty cursor for reading data from multiple streams,
        according to the specified options. The messages
        are returned from the cursor strictly ordered by time. Within the same
        exact timestamp, the order of messages is undefined and may vary from
        call to call, i.e. it is non-deterministic.
        The cursor is returned initially empty and must be reset.
        The TickCursor class provides
        methods for dynamically re-configuring the subscription, or jumping to
        a different timestamp.

        Args:
            stream (TickStream): Stream from which data will be selected.
            options (SelectionOptions): Selection options.

        Returns:
            TickCursor: A cursor used to read messages.
        '''
        return self.__createCursor(stream, options)

    @contextmanager
    def tryCursor(self, stream: 'TickStream', options: SelectionOptions) -> 'TickCursor':
        '''contextmanager version of createCursor. Usage:
        ```
        with db.tryCursor(stream, options) as cursor:
            while cursor.next():
                message = cursor.getMessage()
        ```
        '''
        cursor = None
        try:
            cursor = self.__createCursor(stream, options)
            yield cursor
        finally:
            # Creation itself may have failed; do not mask that error with an
            # AttributeError on None.
            if cursor is not None:
                cursor.close()

    def select(self, timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor':
        '''Opens a cursor for reading data from multiple streams,
        according to the specified options. The messages
        are returned from the cursor strictly ordered by time. Within the same
        exact time stamp, the order of messages is undefined and may vary from
        call to call, i.e. it is non-deterministic.
        Note that the arguments of this method only determine the initial
        configuration of the cursor. The TickCursor class provides
        methods for dynamically re-configuring the subscription, or jumping to
        a different timestamp.

        Args:
            timestamp (int): The start timestamp in millis.
            streams (list[TickStream]): Streams from which data will be selected.
            options (SelectionOptions): Selection options.
            types (list[str]): Specified message types to be subscribed. If null, then all types will be subscribed.
            entities (list[str]): Specified entities to be subscribed. If null, then all entities will be subscribed.

        Returns:
            TickCursor: A cursor used to read messages.
        '''
        return self.__select(timestamp, streams, options, types, entities)

    @contextmanager
    def trySelect(self, timestamp: int, streams: 'list[TickStream]', options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor':
        '''Contextmanager version of select. Usage:
        ```
        with db.trySelect(timestamp, streams, options, types, entities) as cursor:
            while cursor.next():
                message = cursor.getMessage()
        ```
        '''
        cursor = None
        try:
            cursor = self.__select(timestamp, streams, options, types, entities)
            yield cursor
        finally:
            # Creation itself may have failed; do not mask that error with an
            # AttributeError on None.
            if cursor is not None:
                cursor.close()

    def createLoader(self, stream: 'TickStream', options: LoadingOptions) -> 'TickLoader':
        '''Creates a channel for loading data. The loader must be closed
        when the loading process is finished.

        Args:
            stream (TickStream): stream for loading data.
            options (LoadingOptions): Loading Options.

        Returns:
            TickLoader: created loader.
        '''
        return self.__createLoader(stream, options)

    @contextmanager
    def tryLoader(self, stream: 'TickStream', options: LoadingOptions) -> 'TickLoader':
        '''Contextmanager version of createLoader. Usage:
        ```
        with db.tryLoader(stream, options) as loader:
            loader.send(message)
        ```
        '''
        loader = None
        try:
            loader = self.__createLoader(stream, options)
            yield loader
        finally:
            # Creation itself may have failed; do not mask that error with an
            # AttributeError on None.
            if loader is not None:
                loader.close()

    def executeQuery(self, query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[str]' = None, params: 'list[QueryParameter]' = None) -> 'TickCursor':
        '''Execute Query and creates a message source for reading data from it,
        according to the specified options. The messages
        are returned from the cursor strictly ordered by time. Within the same
        exact time stamp, the order of messages is undefined and may vary from
        call to call, i.e. it is non-deterministic.

        Args:
            query (str): Query text element.
            options (SelectionOptions): Selection options. When None, only
                the query text is used and the remaining arguments are ignored.
            timestamp (int): The start timestamp in millis.
            entities (list[str]): Specified entities to be subscribed.
                If null, then all entities will be subscribed.
            params (list[QueryParameter]): The parameter values of the query.
                None is treated as an empty list.

        Returns:
            TickCursor: An iterable message source to read messages.
        '''
        if options is None:
            return self.__executeQuery(query)
        # A fresh list per call replaces the former shared mutable default.
        if params is None:
            params = []
        return self.__executeQueryFull(query, options, timestamp, entities, params)

    @contextmanager
    def tryExecuteQuery(self, query: str, options: SelectionOptions = None, timestamp: int = JAVA_LONG_MIN_VALUE, entities: 'list[str]' = None, params: 'list[QueryParameter]' = None) -> 'TickCursor':
        '''Contextmanager version of executeQuery. Usage:
        ```
        with db.tryExecuteQuery('select * from stream') as cursor:
            while cursor.next():
                message = cursor.getMessage()
        ```
        '''
        cursor = None
        try:
            # Delegate so that option and parameter handling lives in one place.
            cursor = self.executeQuery(query, options, timestamp, entities, params)
            yield cursor
        finally:
            # Creation itself may have failed; do not mask that error with an
            # AttributeError on None.
            if cursor is not None:
                cursor.close()

    # Private bindings to the native implementation used by the public
    # methods above.
    if _newclass:
        __createFromUrl = staticmethod(_tbapi.TickDb___createFromUrl)
    else:
        __createFromUrl = _tbapi.TickDb___createFromUrl
    if _newclass:
        __createFromUrlWithUser = staticmethod(_tbapi.TickDb___createFromUrlWithUser)
    else:
        __createFromUrlWithUser = _tbapi.TickDb___createFromUrlWithUser

    def __isReadOnly(self):
        return _tbapi.TickDb___isReadOnly(self)

    def __isOpen(self):
        return _tbapi.TickDb___isOpen(self)

    def __open(self, readOnlyMode):
        return _tbapi.TickDb___open(self, readOnlyMode)

    def __close(self):
        return _tbapi.TickDb___close(self)

    def __format(self):
        return _tbapi.TickDb___format(self)

    def __listStreams(self):
        return _tbapi.TickDb___listStreams(self)

    def __getStream(self, key):
        return _tbapi.TickDb___getStream(self, key)

    def __createStream(self, key, options):
        return _tbapi.TickDb___createStream(self, key, options)

    def __createFileStream(self, key, dataFile):
        return _tbapi.TickDb___createFileStream(self, key, dataFile)

    def __createCursor(self, stream, options):
        return _tbapi.TickDb___createCursor(self, stream, options)

    def __select(self, time, streams, options, types, entities):
        return _tbapi.TickDb___select(self, time, streams, options, types, entities)

    def __createLoader(self, stream, options):
        return _tbapi.TickDb___createLoader(self, stream, options)

    def __executeQueryFull(self, qql, options, time, instruments, params):
        return _tbapi.TickDb___executeQueryFull(self, qql, options, time, instruments, params)
    __swig_destroy__ = _tbapi.delete_TickDb
    # The Python-level finalizer is deliberately a no-op.
    __del__ = lambda self: None

    def __executeQuery(self, qql):
        return _tbapi.TickDb___executeQuery(self, qql)
# Register the proxy class with the native module.
TickDb_swigregister = _tbapi.TickDb_swigregister
TickDb_swigregister(TickDb)
# Module-level aliases of the native factory functions. Each `def` is
# immediately rebound to the native function itself, so the Python wrapper
# is never the object that callers end up with.
def TickDb___createFromUrl(url):
    return _tbapi.TickDb___createFromUrl(url)
TickDb___createFromUrl = _tbapi.TickDb___createFromUrl

def TickDb___createFromUrlWithUser(url, username, password):
    return _tbapi.TickDb___createFromUrlWithUser(url, username, password)
TickDb___createFromUrlWithUser = _tbapi.TickDb___createFromUrlWithUser
class TickStream(_object):
"""
The stream is a time series of messages for a number of
financial instruments ('entities'). Messages can be price bars, trade ticks,
bid/offer ticks, or any of the many more built-in and user-defined types.
In the simplest case, a database will have a single stream of data.
Multiple streams can be used to represent data of different frequencies, or completely
different factors. For instance, separate streams can represent
1-minute price bars and ticks for the same set of entities. Or,
you can have price bars and volatility bars in separate streams.
Get stream:
```
stream = tickdb.getStream('stream_key')
```
List stream:
```
streams = tickdb.listStreams()
```
"""
__swig_setmethods__ = {}
__setattr__ = lambda self, name, value: _swig_setattr(self, TickStream, name, value)
__swig_getmethods__ = {}
__getattr__ = lambda self, name: _swig_getattr(self, TickStream, name)
def __init__(self, *args, **kwargs):
raise AttributeError("No constructor defined")
__repr__ = _swig_repr
def key(self) -> str:
'''Returns the key, which uniquely identifies the stream within its database.'''
return self.__key()
def name(self) -> str:
'''Returns a user-readable short name.'''
return self.__name()
def distributionFactor(self) -> int:
'''Returns the target number of files to be used for storing data.'''
return self.__distributionFactor()
def description(self) -> str:
'''Returns a user-readable multi-line description.'''
return self.__description()
def owner(self) -> str:
'''Returns stream owner.'''
return self.__owner()
def location(self) -> str:
'''Returns stream location.'''
return self.__location()
def metadata(self) -> str:
'''Returns stream schema (in xml format).'''
return self.__metadata()
def scope(self) -> StreamScope:
'''Returns stream schema (in xml format).'''
return self.__scope()
def highAvailability(self) -> bool:
'''Returns stream memory caching parameter. High availability durable streams are cached on startup.'''
return self.__highAvailability()
def unique(self) -> bool:
'''Unique streams maintain in-memory cache of resent messages.
This concept assumes that stream messages will have some field(s) marked as primary key.
Primary key may be a simple field (e.g. symbol) or composite (e.g. symbol and portfolio ID).
For each key TimeBase runtime maintains a copy of the last message received for this key (cache).
Each new consumer will receive a snapshot of current cache at the beginning of live data subscription.
'''
return self.__unique()
def polymorphic(self) -> bool:
'''Returns whether the stream is configured as polymorphic.'''
return self.__polymorphic()
def periodicity(self) -> str:
'''Returns Stream periodicity, if known.'''
return self.__periodicity()
def options(self) -> StreamOptions:
    '''Return the options object of the stream.'''
    stream_options = self.__options()
    return stream_options
def describe(self) -> str:
    '''Return the DDL description of the stream.'''
    ddl = self.__describe()
    return ddl
def setSchema(self, options: StreamOptions) -> bool:
    '''Change the schema of the stream.

    Args:
        options (StreamOptions): Stream options that contain the new schema xml.

    Returns:
        bool: True if the schema was changed successfully.
    '''
    changed = self.__setSchema(options)
    return changed
def listEntities(self) -> 'list[str]':
    '''Return the entities of the stream.

    Returns:
        list[str]: selected entities.
    '''
    entities = self.__listEntities()
    return entities
def truncate(self, timestamp: int, entities: 'list[str]' = None) -> bool:
    '''Truncate stream data for the given entities starting from the given time.

    Args:
        timestamp (int): Timestamp in millis. If it is less than the stream start time,
            all stream data will be deleted.
        entities (list[str]): A list of entities. If None, all stream entities will be used.

    Returns:
        bool: True if the stream was truncated successfully.
    '''
    # Identity check: only an omitted/None argument selects the whole-stream
    # variant, no matter how the passed object implements ``==``.
    if entities is None:
        return self.__truncate(timestamp)
    return self.__truncateEntities(timestamp, entities)
def clear(self, entities: 'list[str]' = None) -> bool:
    '''Clear stream data for the given entities.

    Args:
        entities (list[str]): A list of entities. If None, all stream entities will be used.

    Returns:
        bool: The result reported by the underlying clear call.
    '''
    # Identity check: only an omitted/None argument selects the whole-stream
    # variant, no matter how the passed object implements ``==``.
    if entities is None:
        return self.__clear()
    return self.__clearEntities(entities)
def purge(self, timestamp: int) -> bool:
    '''Delete stream data that is older than the specified time.

    Args:
        timestamp (int): Purge time in milliseconds.

    Returns:
        bool: True if the stream was purged successfully.
    '''
    purged = self.__purge(timestamp)
    return purged
def deleteStream(self) -> bool:
    '''Delete this stream.

    Returns:
        bool: True if the stream was deleted successfully.
    '''
    deleted = self.__deleteStream()
    return deleted
def abortBackgroundProcess(self) -> bool:
    '''Abort the active background process, if any exists.'''
    aborted = self.__abortBackgroundProcess()
    return aborted
def select(self, timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor':
    '''Open a cursor for reading data from this stream according to the specified options.

    Messages are returned from the cursor strictly ordered by time. Within the same
    exact time stamp the order of messages is undefined and may vary from call to
    call, i.e. it is non-deterministic.

    The arguments of this method only determine the initial configuration of the
    cursor. The TickCursor interface provides methods for dynamically re-configuring
    the subscription, or jumping to a different timestamp.

    Args:
        timestamp (int): The start timestamp in millis.
        options (SelectionOptions): Selection options.
        types (list[str]): Message types to subscribe to. If None, all types will be subscribed.
        entities (list[str]): Entities to subscribe to. If None, all entities will be subscribed.

    Returns:
        TickCursor: A cursor used to read messages.
    '''
    cursor = self.__select(timestamp, options, types, entities)
    return cursor
@contextmanager
def trySelect(self, timestamp: int, options: SelectionOptions, types: 'list[str]', entities: 'list[str]') -> 'TickCursor':
    '''Context-manager version of select; the cursor is closed when the block exits.

    Usage:
    ```
    with stream.trySelect(timestamp, options, types, entities) as cursor:
        while cursor.next():
            message = cursor.getMessage()
    ```

    Args:
        timestamp (int): The start timestamp in millis.
        options (SelectionOptions): Selection options.
        types (list[str]): Message types to subscribe to.
        entities (list[str]): Entities to subscribe to.

    Yields:
        TickCursor: The cursor returned by the underlying select call.
    '''
    cursor = None
    try:
        cursor = self.__select(timestamp, options, types, entities)
        yield cursor
    finally:
        # If the select call itself failed there is nothing to close; calling
        # close() on None would replace the real error with an AttributeError.
        if cursor is not None:
            cursor.close()
def createCursor(self, options: SelectionOptions) -> 'TickCursor':
    '''Create a cursor for reading data from this stream, initially with a fully restricted filter.

    The cursor is created according to the specified options, but the user must call
    TickCursor.reset at least once in order to begin retrieving data.

    Args:
        options (SelectionOptions): Selection options.

    Returns:
        TickCursor: A cursor used to read messages. Never null.
    '''
    cursor = self.__createCursor(options)
    return cursor
@contextmanager
def tryCursor(self, options: SelectionOptions) -> 'TickCursor':
    '''Context-manager version of createCursor; the cursor is closed when the block exits.

    Usage:
    ```
    with stream.tryCursor(options) as cursor:
        while cursor.next():
            message = cursor.getMessage()
    ```

    Args:
        options (SelectionOptions): Selection options.

    Yields:
        TickCursor: The cursor returned by the underlying createCursor call.
    '''
    cursor = None
    try:
        cursor = self.__createCursor(options)
        yield cursor
    finally:
        # If cursor creation itself failed there is nothing to close; calling
        # close() on None would replace the real error with an AttributeError.
        if cursor is not None:
            cursor.close()
def createLoader(self, options: LoadingOptions) -> 'TickLoader':
    '''Create a channel for loading data into this stream.

    The loader must be closed when the loading process is finished.

    Args:
        options (LoadingOptions): Loading options.

    Returns:
        TickLoader: The created loader.
    '''
    loader = self.__createLoader(options)
    return loader
@contextmanager
def tryLoader(self, options: LoadingOptions) -> 'TickLoader':
    '''Context-manager version of createLoader; the loader is closed when the block exits.

    Usage:
    ```
    with stream.tryLoader(options) as loader:
        loader.send(message)
    ```

    Args:
        options (LoadingOptions): Loading options.

    Yields:
        TickLoader: The loader returned by the underlying createLoader call.
    '''
    loader = None
    try:
        loader = self.__createLoader(options)
        yield loader
    finally:
        # If loader creation itself failed there is nothing to close; calling
        # close() on None would replace the real error with an AttributeError.
        if loader is not None:
            loader.close()
def listSpaces(self) -> 'list[str]':
    '''Return all created "spaces" of the stream.

    The default space is returned as "" (empty string).
    If the backing stream does not support spaces, None is returned.
    '''
    spaces = self.__listSpaces()
    return spaces
def renameSpace(self, newName: str, oldName: str) -> None:
    '''Rename an existing space.

    Args:
        newName (str): new space name.
        oldName (str): name of the existing space to rename.
    '''
    outcome = self.__renameSpace(newName, oldName)
    return outcome
def deleteSpaces(self, spaces: 'list[str]') -> None:
    '''Remove the given spaces permanently.

    Args:
        spaces (list[str]): list of space names to delete.
    '''
    outcome = self.__deleteSpaces(spaces)
    return outcome
def getTimeRange(self, entities: 'list[str]' = None) -> 'list[int]':
    '''Return an inclusive range of times for which the specified entities
    have data in the database.

    Args:
        entities (list[str]): A list of entities. If None (the default), the
            range is requested for the stream as a whole.

    Returns:
        list[int]: The time range reported by the underlying call.
    '''
    # Identity check: only an omitted/None argument selects the whole-stream
    # variant, no matter how the passed object implements ``==``.
    if entities is None:
        return self.__getTimeRange()
    return self.__getTimeRangeEntities(entities)
def getSpaceTimeRange(self, space: str) -> 'list[int]':
    '''Return the time range of the given space.

    The result is an array of two long timestamps (from and to), or None if no data was found.

    Args:
        space (str): space name.
    '''
    time_range = self.__getSpaceTimeRange(space)
    return time_range
def __key(self):
    '''Call the native TickStream___key binding for this stream.'''
    result = _tbapi.TickStream___key(self)
    return result
def __distributionFactor(self):
    '''Call the native TickStream___distributionFactor binding for this stream.'''
    result = _tbapi.TickStream___distributionFactor(self)
    return result
def __name(self):
    '''Call the native TickStream___name binding for this stream.'''
    result = _tbapi.TickStream___name(self)
    return result
def __description(self):
    '''Call the native TickStream___description binding for this stream.'''
    result = _tbapi.TickStream___description(self)
    return result
def __owner(self):
    '''Call the native TickStream___owner binding for this stream.'''
    result = _tbapi.TickStream___owner(self)
    return result
def __location(self):
    '''Call the native TickStream___location binding for this stream.'''
    result = _tbapi.TickStream___location(self)
    return result
def __metadata(self):
    '''Call the native TickStream___metadata binding for this stream.'''
    result = _tbapi.TickStream___metadata(self)
    return result
def __scope(self):
    '''Call the native TickStream___scope binding for this stream.'''
    result = _tbapi.TickStream___scope(self)
    return result
def __highAvailability(self):
    '''Call the native TickStream___highAvailability binding for this stream.'''
    result = _tbapi.TickStream___highAvailability(self)
    return result
def __unique(self):
    '''Call the native TickStream___unique binding for this stream.'''
    result = _tbapi.TickStream___unique(self)
    return result
def __polymorphic(self):
    '''Call the native TickStream___polymorphic binding for this stream.'''
    result = _tbapi.TickStream___polymorphic(self)
    return result
def __periodicity(self):
    '''Call the native TickStream___periodicity binding for this stream.'''
    result = _tbapi.TickStream___periodicity(self)
    return result
def __options(self):
    '''Call the native TickStream___options binding for this stream.'''
    result = _tbapi.TickStream___options(self)
    return result
def __describe(self):
    '''Call the native TickStream___describe binding for this stream.'''
    result = _tbapi.TickStream___describe(self)
    return result
def __setSchema(self, options):
    '''Call the native TickStream___setSchema binding with the given options.'''
    result = _tbapi.TickStream___setSchema(self, options)
    return result
def __listEntities(self):
    '''Call the native TickStream___listEntities binding for this stream.'''
    result = _tbapi.TickStream___listEntities(self)
    return result
def __truncate(self, millisecondTime):
    '''Call the native TickStream___truncate binding with the given time.'''
    result = _tbapi.TickStream___truncate(self, millisecondTime)
    return result
def __truncateEntities(self, millisecondTime, entities):
    '''Call the native TickStream___truncateEntities binding with the given time and entities.'''
    result = _tbapi.TickStream___truncateEntities(self, millisecondTime, entities)
    return result
def __clear(self):
    '''Call the native TickStream___clear binding for this stream.'''
    result = _tbapi.TickStream___clear(self)
    return result
def __clearEntities(self, entities):
    '''Call the native TickStream___clearEntities binding with the given entities.'''
    result = _tbapi.TickStream___clearEntities(self, entities)
    return result
def __purge(self, millisecondTime):
    '''Call the native TickStream___purge binding with the given time.'''
    result = _tbapi.TickStream___purge(self, millisecondTime)
    return result
def __deleteStream(self):
    '''Call the native TickStream___deleteStream binding for this stream.'''
    result = _tbapi.TickStream___deleteStream(self)
    return result
def __abortBackgroundProcess(self):
    '''Call the native TickStream___abortBackgroundProcess binding for this stream.'''
    result = _tbapi.TickStream___abortBackgroundProcess(self)
    return result
def __select(self, millisecondTime, options, types, entities):
    '''Call the native TickStream___select binding with the given selection arguments.'''
    result = _tbapi.TickStream___select(self, millisecondTime, options, types, entities)
    return result
def __createCursor(self, options):
    '''Call the native TickStream___createCursor binding with the given options.'''
    result = _tbapi.TickStream___createCursor(self, options)
    return result
def __createLoader(self, options):
    '''Call the native TickStream___createLoader binding with the given options.'''
    result = _tbapi.TickStream___createLoader(self, options)
    return result
def __listSpaces(self):
    '''Call the native TickStream___listSpaces binding for this stream.'''
    result = _tbapi.TickStream___listSpaces(self)
    return result
def __renameSpace(self, newName, oldName):
    '''Call the native TickStream___renameSpace binding with the new and old space names.'''
    result = _tbapi.TickStream___renameSpace(self, newName, oldName)
    return result
def __deleteSpaces(self, spaces):
    '''Call the native TickStream___deleteSpaces binding with the given space names.'''
    result = _tbapi.TickStream___deleteSpaces(self, spaces)
    return result
# Native destructor entry point taken from the extension module.
__swig_destroy__ = _tbapi.delete_TickStream
# __del__ is a no-op: finalizing the Python wrapper runs no code here.
__del__ = lambda self: None
def __getTimeRange(self):
    '''Call the native TickStream___getTimeRange binding for this stream.'''
    result = _tbapi.TickStream___getTimeRange(self)
    return result
def __getTimeRangeEntities(self, entities):
    '''Call the native TickStream___getTimeRangeEntities binding with the given entities.'''
    result = _tbapi.TickStream___getTimeRangeEntities(self, entities)
    return result
def __getSpaceTimeRange(self, space):
    '''Call the native TickStream___getSpaceTimeRange binding with the given space name.'''
    result = _tbapi.TickStream___getSpaceTimeRange(self, space)
    return result
# Hand the Python TickStream class to the extension module's swigregister hook.
TickStream_swigregister = _tbapi.TickStream_swigregister
TickStream_swigregister(TickStream)
# Result codes re-exported from the extension module. TickCursor.nextIfAvailable()
# documents them as OK (0), END_OF_CURSOR (1) and UNAVAILABLE (2).
OK = _tbapi.OK
END_OF_CURSOR = _tbapi.END_OF_CURSOR
UNAVAILABLE = _tbapi.UNAVAILABLE
class TickCursor(_object):
    """
    A cursor (also known as iterator, or result set) for reading data from a
    stream. This class provides methods for dynamically reconfiguring the feed,
    as well as the reset method for essentially re-opening the cursor on a
    completely different timestamp.

    To get a cursor, use the select method of TickDb or TickStream objects,
    or call executeQuery to open a cursor over a QQL result set.

    A cursor can also be created with the createCursor method, but such a cursor is
    not initialized: it has to be configured with types, entities and read time by
    calling reset:
    ```
    options = tbapi.SelectionOptions()
    cursor = tickdb.createCursor(stream, options)
    cursor.subscribeToAllEntities()
    cursor.subscribeToAllTypes()
    cursor.reset(timestamp)
    ```
    """
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, TickCursor, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, TickCursor, name)

    def __init__(self, *args, **kwargs):
        # Cursors are obtained from TickDb/TickStream; direct construction is rejected.
        raise AttributeError("No constructor defined")
    __repr__ = _swig_repr
    __swig_destroy__ = _tbapi.delete_TickCursor
    __del__ = lambda self: None

    def next(self) -> bool:
        '''Move the cursor on to the next message.

        This method blocks until the next message becomes available, or until the
        cursor is determined to be at the end of the sequence.
        This method is illegal to call if isAtEnd() returns true.

        Returns:
            bool: false if at the end of the cursor.
        '''
        return self.__next()

    def getMessage(self) -> 'InstrumentMessage':
        '''Return the InstrumentMessage object the cursor points at.'''
        return self.__getMessage()

    def isAtEnd(self) -> bool:
        '''Return true if the last call to next() returned false.

        Returns false if next() has not been called yet.
        This method is legal to call any number of times at any point in the cursor's lifecycle.
        '''
        return self.__isAtEnd()

    def nextIfAvailable(self) -> int:
        '''Move the cursor on to the next message without blocking until one becomes available.

        Returns:
            NextResult: OK (0) if new message is available,
                END_OF_CURSOR (1) if cursor was closed,
                otherwise, UNAVAILABLE (2)
        '''
        return self.__nextIfAvailable()

    def isClosed(self) -> bool:
        '''Return true if the cursor was closed.'''
        return self.__isClosed()

    def close(self) -> None:
        '''Close the cursor.'''
        return self.__close()

    def getCurrentStreamKey(self) -> str:
        '''Return the key of the stream that is the source of the current message.'''
        return self.__getCurrentStreamKey()

    def reset(self, timestamp: int, entities: 'list[str]' = None) -> None:
        '''Reposition the message source to a new point in time, while
        preserving the current subscription.

        Args:
            timestamp (int): The new position in time in millis.
            entities (list[str]): list of entities to reset. If None, the
                reset is issued without an entity list.
        '''
        # Identity check: only an omitted/None argument selects the variant
        # without entities, no matter how the passed object implements ``==``.
        if entities is None:
            return self.__reset(timestamp)
        return self.__resetEntities(timestamp, entities)

    def subscribeToAllEntities(self) -> None:
        '''Subscribe to all available entities.'''
        return self.__subscribeToAllEntities()

    def clearAllEntities(self) -> None:
        '''Switch to selective subscription mode (if necessary) and clear the list.'''
        return self.__clearAllEntities()

    def addEntity(self, entity: str) -> None:
        '''Add the specified entity to the subscription. The type and symbol are copied
        from the incoming object, if necessary, so the argument can be re-used
        after the call.

        Special note about options:
        The following fragment will subscribe to the specific option contract "DAV   100417P00085000":
            cursor.addEntity('DAV   100417P00085000')
        While the following will subscribe to the option root (and you will get all instruments with this root):
            cursor.addEntity("DAV   ")
        '''
        return self.__addEntity(entity)

    def addEntities(self, entities: 'list[str]') -> None:
        '''Bulk add the specified entities to the subscription. The type and symbol are copied
        from the incoming objects, if necessary, so the arguments can be re-used
        after the call.
        '''
        return self.__addEntities(entities)

    def removeEntities(self, entities: 'list[str]') -> None:
        '''Remove the specified entities from the subscription. The type and symbol are copied
        from the incoming objects, if necessary, so the arguments can be re-used
        after the call.
        '''
        return self.__removeEntities(entities)

    def removeEntity(self, entity: str) -> None:
        '''Remove the specified entity from the subscription. The type and symbol are copied
        from the incoming object, if necessary, so the argument can be re-used
        after the call.
        '''
        return self.__removeEntity(entity)

    def subscribeToAllTypes(self) -> None:
        '''Subscribe to all available types (no filtering).'''
        return self.__subscribeToAllTypes()

    def addTypes(self, types: 'list[str]') -> None:
        '''Add the specified type names to the subscription.'''
        return self.__addTypes(types)

    def removeTypes(self, types: 'list[str]') -> None:
        '''Remove the specified types from the subscription.'''
        return self.__removeTypes(types)

    def setTypes(self, types: 'list[str]') -> None:
        '''Subscribe to the specified types.'''
        return self.__setTypes(types)

    def add(self, types: 'list[str]', entities: 'list[str]') -> None:
        '''Add the specified entities and types to the subscription. The type and symbol are copied
        from the incoming object, if necessary, so the argument can be re-used
        after the call.

        Args:
            types (list[str]): not-null array of type names to subscribe.
            entities (list[str]): not-null array of instruments to subscribe.
        '''
        return self.__add(types, entities)

    def remove(self, types: 'list[str]', entities: 'list[str]') -> None:
        '''Remove the specified entities and types from the subscription. The type and symbol are copied
        from the incoming objects, if necessary, so the arguments can be re-used
        after the call.

        Args:
            types (list[str]): not-null array of type names to unsubscribe.
            entities (list[str]): not-null array of instruments to unsubscribe.
        '''
        return self.__remove(types, entities)

    def addStreams(self, streams: 'list[TickStream]') -> None:
        '''Add streams to the subscription. Current time and filter are used to query
        data from the new sources.

        Args:
            streams (list[TickStream]): Streams to add.
        '''
        return self.__addStreams(streams)

    def removeStreams(self, streams: 'list[TickStream]') -> None:
        '''Remove streams from the subscription.

        Args:
            streams (list[TickStream]): Streams to remove.
        '''
        return self.__removeStreams(streams)

    def removeAllStreams(self) -> None:
        '''Remove all streams from the subscription.'''
        return self.__removeAllStreams()

    def setTimeForNewSubscriptions(self, timestamp: int) -> None:
        '''Set the start time for subsequent "add subscription" calls,
        such as, for instance, addEntity(). New subscriptions start at
        the specified time.

        Args:
            timestamp (int): The time to use.
        '''
        return self.__setTimeForNewSubscriptions(timestamp)

    # --- Thin delegates to the native extension module -----------------------

    def __next(self):
        return _tbapi.TickCursor___next(self)

    def __nextIfAvailable(self):
        return _tbapi.TickCursor___nextIfAvailable(self)

    def __getMessage(self):
        return _tbapi.TickCursor___getMessage(self)

    def __isAtEnd(self):
        return _tbapi.TickCursor___isAtEnd(self)

    def __isClosed(self):
        return _tbapi.TickCursor___isClosed(self)

    def __close(self):
        return _tbapi.TickCursor___close(self)

    def __getCurrentStreamKey(self):
        return _tbapi.TickCursor___getCurrentStreamKey(self)

    def __reset(self, dt):
        return _tbapi.TickCursor___reset(self, dt)

    def __resetEntities(self, dt, entities):
        return _tbapi.TickCursor___resetEntities(self, dt, entities)

    def __subscribeToAllEntities(self):
        return _tbapi.TickCursor___subscribeToAllEntities(self)

    def __clearAllEntities(self):
        return _tbapi.TickCursor___clearAllEntities(self)

    def __addEntities(self, entities):
        return _tbapi.TickCursor___addEntities(self, entities)

    def __addEntity(self, entity):
        return _tbapi.TickCursor___addEntity(self, entity)

    def __removeEntities(self, entities):
        return _tbapi.TickCursor___removeEntities(self, entities)

    def __removeEntity(self, entity):
        return _tbapi.TickCursor___removeEntity(self, entity)

    def __subscribeToAllTypes(self):
        return _tbapi.TickCursor___subscribeToAllTypes(self)

    def __addTypes(self, types):
        return _tbapi.TickCursor___addTypes(self, types)

    def __removeTypes(self, types):
        return _tbapi.TickCursor___removeTypes(self, types)

    def __setTypes(self, types):
        return _tbapi.TickCursor___setTypes(self, types)

    def __add(self, types, entities):
        return _tbapi.TickCursor___add(self, types, entities)

    def __remove(self, types, entities):
        return _tbapi.TickCursor___remove(self, types, entities)

    def __addStreams(self, streams):
        return _tbapi.TickCursor___addStreams(self, streams)

    def __removeStreams(self, streams):
        return _tbapi.TickCursor___removeStreams(self, streams)

    def __removeAllStreams(self):
        return _tbapi.TickCursor___removeAllStreams(self)

    def __setTimeForNewSubscriptions(self, dt):
        return _tbapi.TickCursor___setTimeForNewSubscriptions(self, dt)
# Hand the Python TickCursor class to the extension module's swigregister hook.
TickCursor_swigregister = _tbapi.TickCursor_swigregister
TickCursor_swigregister(TickCursor)
class TickLoader(_object):
    """
    Object which consumes messages.

    Create a loader from TickDb:
        options = tbapi.LoadingOptions()
        loader = tickdb.createLoader(stream, options)

    Create a loader from TickStream:
        options = tbapi.LoadingOptions()
        loader = stream.createLoader(options)
    """
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, TickLoader, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, TickLoader, name)
    __repr__ = _swig_repr

    def __init__(self, loader):
        # Build the native object, then attach it to ``self.this``: append when a
        # ``this`` attribute already exists, otherwise assign it.
        native_handle = _tbapi.new_TickLoader(loader)
        try:
            self.this.append(native_handle)
        except __builtin__.Exception:
            self.this = native_handle
    __swig_destroy__ = _tbapi.delete_TickLoader
    __del__ = lambda self: None

    def send(self, message: InstrumentMessage) -> None:
        '''Send a message to the loader.

        Args:
            message (InstrumentMessage): A temporary buffer with the message.
                By convention, the message is only valid for the duration of this call.
        '''
        outcome = self.__send(message)
        return outcome

    def flush(self) -> None:
        '''Flush all buffered messages by sending them to the server.

        Note that calling 'send' alone does not guarantee that all messages are
        delivered to and stored on the server.
        '''
        outcome = self.__flush()
        return outcome

    def close(self) -> None:
        '''Flush and close the loader.'''
        outcome = self.__close()
        return outcome

    def addListener(self, listener: 'ErrorListener') -> None:
        '''Register an error listener. All data-writing errors will be delivered to the listener.

        Args:
            listener (ErrorListener): error listener to register.
        '''
        outcome = self.__addListener(listener)
        return outcome

    def removeListener(self, listener: 'ErrorListener') -> None:
        '''Unsubscribe a registered error listener.

        Args:
            listener (ErrorListener): error listener to unsubscribe.
        '''
        outcome = self.__removeListener(listener)
        return outcome

    def nErrorListeners(self) -> int:
        '''Return the number of registered error listeners.'''
        listener_count = self.__nErrorListeners()
        return listener_count

    def registerType(self, type: str) -> int:
        '''Register the type of a message to be sent and get its type id.

        For performance reasons, you can specify the type id instead of the type name,
        for example:
        ```
        message = tbapi.InstrumentMessage()
        message.typeId = loader.registerType("deltix.timebase.api.messages.universal.PackageHeader")
        # as an alternative, you could write:
        # message.typeName = "deltix.timebase.api.messages.universal.PackageHeader"
        loader.send(message)
        ```

        Args:
            type (str): name of the type to register.

        Returns:
            int: id of the registered type.
        '''
        type_id = self.__registerType(type)
        return type_id

    def registerInstrument(self, symbol: str) -> int:
        '''Register the instrument of a message to be sent and get its instrument id.

        For performance reasons, you can specify the instrument id instead of the symbol
        and instrument type, for example:
        ```
        message = tbapi.InstrumentMessage()
        message.instrumentId = loader.registerInstrument('AAPL')
        # as an alternative, you could write:
        # message.symbol = 'AAPL'
        loader.send(message)
        ```

        Args:
            symbol (str): instrument ticker.

        Returns:
            int: id of the registered instrument.
        '''
        instrument_id = self.__registerInstrument(symbol)
        return instrument_id

    # --- Thin delegates to the native extension module -----------------------

    def __registerType(self, type_name):
        return _tbapi.TickLoader___registerType(self, type_name)

    def __registerInstrument(self, instrument):
        return _tbapi.TickLoader___registerInstrument(self, instrument)

    def __send(self, message):
        return _tbapi.TickLoader___send(self, message)

    def __flush(self):
        return _tbapi.TickLoader___flush(self)

    def __close(self):
        return _tbapi.TickLoader___close(self)

    def __addListener(self, listener):
        return _tbapi.TickLoader___addListener(self, listener)

    def __removeListener(self, listener):
        return _tbapi.TickLoader___removeListener(self, listener)

    def __nErrorListeners(self):
        return _tbapi.TickLoader___nErrorListeners(self)
# Hand the Python TickLoader class to the extension module's swigregister hook.
TickLoader_swigregister = _tbapi.TickLoader_swigregister
TickLoader_swigregister(TickLoader)
class ErrorListener(_object):
    """
    Listener for loader errors; instances are registered with TickLoader.addListener.

    When a subclass is instantiated, the Python object itself is handed to the
    native constructor; for the base class itself, None is passed instead.
    """
    __swig_setmethods__ = {}
    __setattr__ = lambda self, name, value: _swig_setattr(self, ErrorListener, name, value)
    __swig_getmethods__ = {}
    __getattr__ = lambda self, name: _swig_getattr(self, ErrorListener, name)
    __repr__ = _swig_repr

    def onError(self, errorClass: str, errorMsg: str) -> None:
        '''Handle an error by forwarding it to the native implementation.

        Args:
            errorClass (str): class of the error.
            errorMsg (str): error message.
        '''
        # Both arguments must be forwarded: the native delegate takes them as
        # required positional parameters.
        return self.__onError(errorClass, errorMsg)

    def __onError(self, errorClass, errorMsg):
        return _tbapi.ErrorListener___onError(self, errorClass, errorMsg)
    __swig_destroy__ = _tbapi.delete_ErrorListener
    __del__ = lambda self: None

    def __init__(self):
        # Only subclasses pass themselves to the native constructor.
        if self.__class__ == ErrorListener:
            _self = None
        else:
            _self = self
        this = _tbapi.new_ErrorListener(_self, )
        # Attach the native object to ``self.this``: append when a ``this``
        # attribute already exists, otherwise assign it.
        try:
            self.this.append(this)
        except __builtin__.Exception:
            self.this = this

    def __disown__(self):
        '''Disown the native object and return a weak proxy to this listener.'''
        self.this.disown()
        _tbapi.disown_ErrorListener(self)
        return weakref_proxy(self)
# Hand the Python ErrorListener class to the extension module's swigregister hook.
ErrorListener_swigregister = _tbapi.ErrorListener_swigregister
ErrorListener_swigregister(ErrorListener)
from collections import defaultdict
def stream_to_dict(db, stream, fields=None, ts_from=0, ts_to=JAVA_LONG_MAX_VALUE):
    """Read messages from ``stream`` into a column-oriented dictionary.

    Every message attribute that is kept becomes a key of the result; its value is
    a list with exactly one entry per message read, in read order. A message that
    lacks an attribute contributes ``None`` to that attribute's column.

    Args:
        db: Database object; only used to verify that it is open (``db.isOpen()``).
        stream: Stream passed to ``open_TickCursor``.
        fields: Optional container of attribute names to keep. When ``None``,
            every attribute found on the messages is kept.
        ts_from: Start timestamp passed to ``open_TickCursor``.
        ts_to: End timestamp assigned to ``SelectionOptions.to``; values above
            ``JAVA_LONG_MAX_VALUE`` are clamped to it.

    Returns:
        collections.defaultdict: Mapping of attribute name to list of values.

    Raises:
        Exception: If the database is not open.
    """
    if ts_to > JAVA_LONG_MAX_VALUE:
        ts_to = JAVA_LONG_MAX_VALUE
    if not db.isOpen():
        raise Exception('Database is not opened.')
    options = SelectionOptions()
    options.to = ts_to
    table = defaultdict(list)
    with open_TickCursor(stream, ts_from, options) as cursor:
        counter = 0  # number of messages already folded into ``table``
        while cursor.next():
            # vars() exposes the message's own attribute dict, so it is only
            # read here and never modified.
            message = vars(cursor.getMessage())
            # Extend every known column by one row for this message.
            for key, column in table.items():
                column.append(message.get(key))
            # Start a column for each attribute seen for the first time,
            # back-filling the rows of the earlier messages with None.
            for key, value in message.items():
                if key not in table and (fields is None or key in fields):
                    table[key] = [None] * counter + [value]
            counter += 1
    return table
import xml.etree.ElementTree as ETree
import re
class ClassType:
    """Class-descriptor kinds that SchemaParser recognises in the schema XML."""
    ENUM_CLASS, RECORD_CLASS = 'enumClass', 'recordClass'
class FieldType:
    """Field kinds that SchemaParser accepts for a record-class field."""
    NONSTATIC, STATIC = 'nonStaticDataField', 'staticDataField'
class SchemaDef:
    """Parsed schema: the record types plus the full list of class descriptors."""

    def __init__(self, types, all_types):
        # ``types`` holds the record types only; ``all_types`` holds every descriptor.
        self.types, self.all_types = types, all_types
class ClassDescDef:
    """Base for parsed class descriptors; carries only the descriptor name."""

    def __init__(self, name):
        descriptor_name = name
        self.name = descriptor_name
class TypeDef(ClassDescDef):
    """Record-class descriptor: a named type together with its fields."""

    def __init__(self, name, fields):
        # The base initializer stores the name; the field list is kept here.
        ClassDescDef.__init__(self, name=name)
        self.fields = fields
class EnumDef(ClassDescDef):
    """Enum-class descriptor: a named enumeration together with its values."""

    def __init__(self, name, values):
        # The base initializer stores the name; the value list is kept here.
        ClassDescDef.__init__(self, name=name)
        self.values = values
class FieldDef:
    """Parsed description of a single field of a record class."""

    def __init__(self, name, data_type, nullable, field_type):
        self.name, self.data_type = name, data_type
        self.nullable, self.field_type = nullable, field_type
class SchemaParser:
    """Parses a stream's XML schema into SchemaDef / TypeDef / EnumDef / FieldDef objects.

    All element lookups use the namespace of the root element, passed around as a
    ``{'prefix': <namespace uri>}`` mapping.
    """

    @staticmethod
    def parse_Schema(stream):
        """Parse the XML returned by ``stream.metadata()`` into a SchemaDef."""
        xml = stream.metadata()
        tree = ETree.fromstring(xml)
        return SchemaParser.parse_classSet(tree)

    @staticmethod
    def parse_value(value, prefix):
        """Return the text of the ``symbol`` child of an enum ``value`` element."""
        return value.find('prefix:symbol', prefix).text

    @staticmethod
    def _parse_bool(text):
        """Interpret the text of a boolean XML element.

        Only 'true' and '1' (case-insensitive, surrounding whitespace ignored)
        are truthy. ``bool(text)`` is not usable here because every non-empty
        string, including 'false', is truthy.
        """
        return text is not None and text.strip().lower() in ('true', '1')

    @staticmethod
    def parse_field(field, prefix):
        """Parse one ``field`` element into a FieldDef.

        The field kind is read from the first attribute of the element and the data
        type from the first attribute of its ``type`` child.

        Raises:
            Exception: If the field kind is neither static nor non-static.
        """
        name = field.find('prefix:name', prefix)
        field_type = SchemaParser.rm_brackets(list(field.attrib.values())[0])
        if field_type not in (FieldType.STATIC, FieldType.NONSTATIC):
            raise Exception('Unknown field type {}'.format(field_type))
        type_element = field.find('prefix:type', prefix)
        nullable = SchemaParser._parse_bool(type_element.find('prefix:nullable', prefix).text)
        data_type = list(type_element.attrib.values())[0]
        return FieldDef(name.text, data_type, nullable, field_type)

    @staticmethod
    def parse_classDescriptor(class_desc, prefix):
        """Parse one ``classDescriptor`` element into a TypeDef or an EnumDef.

        The descriptor kind is read from the first attribute of the element.

        Raises:
            Exception: If the descriptor kind is neither a record nor an enum class.
        """
        name = class_desc.find('prefix:name', prefix)
        type_ = list(class_desc.attrib.values())[0]
        if type_ == ClassType.RECORD_CLASS:
            fields = [SchemaParser.parse_field(field, prefix)
                      for field in class_desc.findall('prefix:field', prefix)]
            return TypeDef(name.text, fields)
        if type_ == ClassType.ENUM_CLASS:
            values = [SchemaParser.parse_value(value, prefix)
                      for value in class_desc.findall('prefix:value', prefix)]
            return EnumDef(name.text, values)
        raise Exception('Unknown class descriptor type {}'.format(type_))

    @staticmethod
    def parse_classSet(class_set):
        """Parse the root element into a SchemaDef.

        ``types`` receives the record classes only; ``all_types`` receives every
        parsed descriptor, in document order.
        """
        # The namespace URI is whatever sits between the braces of the root tag.
        prefix = {'prefix': re.search(r'(?<={).*(?=})', class_set.tag).group(0)}
        types = []
        all_types = []
        for element in class_set.findall('prefix:classDescriptor', prefix):
            class_desc = SchemaParser.parse_classDescriptor(element, prefix)
            all_types.append(class_desc)
            if isinstance(class_desc, TypeDef):
                types.append(class_desc)
        return SchemaDef(types, all_types)

    @staticmethod
    def rm_brackets(obj):
        """Strip a leading ``{...}`` qualifier from a string, or from every key of a dict.

        Returns:
            str or dict: Same kind as the input; dict values are left untouched.

        Raises:
            Exception: If ``obj`` is neither a str nor a dict.
        """
        if isinstance(obj, str):
            return re.sub(r'^{.*}', '', obj)
        if isinstance(obj, dict):
            # A dict comprehension (key: value) keeps the mapping; pairing key and
            # value in a tuple would build a set and fail on unhashable values.
            return {SchemaParser.rm_brackets(key): value for key, value in obj.items()}
        raise Exception('rm_brackets for {} is not implemented'.format(type(obj)))
def __getSchema(self):
    """Parse this stream's schema with SchemaParser and return the result."""
    schema = SchemaParser.parse_Schema(self)
    return schema
# Attach the schema-parsing helper to TickStream as its getSchema method.
TickStream.getSchema = __getSchema
# This file is compatible with both classic and new-style classes.