jupyter/demo.py (146 lines of code) (raw):
# deltix TimeBase client
import tbapi
from tbapi import InstrumentMessage
# FINOS Perspective
import perspective
from perspective import Table
# OrderBook
from orderbook import Book, to_dict
# Other libs
import threading
import asyncio
from datetime import datetime
import time
from datetime import datetime
from typing import List, Iterable
from sortedcollections import SortedDict, ItemSortedDict
import ipywidgets as widgets
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def current_milli_time():
return round(time.time() * 1000)
class Demo:
def __init__(self, tb_url: str, stream_key: str, symbol: str, record_type: str, time_widget: widgets.Text, booksize=20):
self.tb_url = tb_url
self.stream_key = stream_key
self.symbol = symbol
self.record_type = record_type
self.time_widget = time_widget
self.booksize = booksize
self.schema = {
'key': str,
'symbol': str,
'side': str,
'size': float,
'price': float,
'numberOfOrders': int
}
self.table = Table(self.schema, limit=1000, index='key')
self.book = Book(symbol)
self.last_updated = 0
self.stop_reading = False
self.init_book()
def process_entry_update(self, entry: InstrumentMessage) -> None:
if entry.action == 'DELETE':
self.book.remove(entry.side, entry.price)
t = time.time()
if t - self.last_updated >= 0.5:
self.last_updated = t
self.table.update(self.book.get_bids(size=self.booksize))
self.table.update(self.book.get_asks(size=self.booksize))
elif entry.action == 'UPDATE':
e = to_dict(self.symbol, entry)
self.book.update(e)
t = time.time()
if t - self.last_updated >= 0.5:
self.last_updated = t
self.table.update(self.book.get_bids(size=self.booksize))
self.table.update(self.book.get_asks(size=self.booksize))
else:
raise Exception(f'Unknown action type: {entry.action}')
def process_entry_new(self, entry: InstrumentMessage) -> None:
e = to_dict(self.symbol, entry)
self.book.update(e)
t = time.time()
if t - self.last_updated >= 0.5:
self.last_updated = t
self.table.update(self.book.get_bids(size=self.booksize))
self.table.update(self.book.get_asks(size=self.booksize))
def process_snapshot(self, entries) -> None:
self.book.clear()
self.book.update(*map(lambda e: to_dict(self.symbol, e), entries))
t = time.time()
if t - self.last_updated >= 0.5:
self.last_updated = t
self.table.update(self.book.get_bids(size=self.booksize))
self.table.update(self.book.get_asks(size=self.booksize))
def init_book(self):
db = tbapi.TickDb.createFromUrl(self.tb_url)
try:
db.open(True)
stream = db.getStream(self.stream_key)
options = tbapi.SelectionOptions()
try:
cursor = db.select(current_milli_time() - 10000, [stream], options,
[self.record_type],
[self.symbol])
while cursor.next():
msg = cursor.getMessage()
if msg.packageType == 'PERIODICAL_SNAPSHOT':
self.process_snapshot(msg.entries)
break
finally:
cursor.close()
finally:
db.close()
async def read_cursor(self):
db = tbapi.TickDb.createFromUrl(self.tb_url)
try:
db.open(True)
stream = db.getStream(self.stream_key)
options = tbapi.SelectionOptions()
options.live = True
try:
cursor = db.select(current_milli_time(), [stream], options,
[self.record_type],
[self.symbol])
initialized = False
while cursor.next() and not self.stop_reading and not initialized:
msg = cursor.getMessage()
if msg.packageType == 'PERIODICAL_SNAPSHOT' or msg.packageType == 'VENDOR_SNAPSHOT':
logging.info('received snapshot')
self.process_snapshot(msg.entries)
initialized = True
self.time_widget.value = str(datetime.fromtimestamp(msg.timestamp / 10 ** 9))
while cursor.next() and not self.stop_reading:
msg = cursor.getMessage()
if msg.packageType == 'INCREMENTAL_UPDATE':
for entry in msg.entries:
if entry.typeName.endswith('L2EntryUpdate'):
self.process_entry_update(entry)
elif entry.typeName.endswith('L2EntryNew'):
self.process_entry_new(entry)
elif msg.packageType == 'PERIODICAL_SNAPSHOT' or msg.packageType == 'VENDOR_SNAPSHOT':
self.process_snapshot(msg.entries)
self.time_widget.value = str(datetime.fromtimestamp(msg.timestamp / 10 ** 9))
finally:
cursor.close()
finally:
db.close()
def update_table(self):
logging.info('Started streaming!')
loop = asyncio.new_event_loop()
task = loop.create_task(self.read_cursor())
loop.call_later(60, task.cancel)
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
logging.info("Stopped streaming!")
pass
def start(self):
self.stop_reading = False
self.thread = threading.Thread(target=self.update_table)
self.thread.start()
def stop(self):
self.stop_reading = True
self.thread.join()
def clear(self):
self.table.clear()
self.book.clear()