src/tick_loader.cpp (172 lines of code) (raw):
#include "tick_loader.h"
#include "python_common.h"
#include "codecs/message_codec.h"
#include <thread>
#include <chrono>
namespace TbApiImpl {
namespace Python {
TickLoader::TickLoader(DxApi::TickLoader *loader) {
loader_ = std::unique_ptr<DxApi::TickLoader>(loader);
const DxApi::Nullable<std::string> &metadata = loader->stream()->metadata();
if (!metadata.has_value())
THROW("Empty stream schema.");
descriptors_ = Schema::TickDbClassDescriptor::parseDescriptors(metadata.get(), true);
}
TickLoader::~TickLoader() {
Py_DECREF(TYPE_ID_PROPERTY1);
Py_DECREF(TYPE_NAME_PROPERTY1);
Py_DECREF(INSTRUMENT_ID_PROPERTY1);
Py_DECREF(SYMBOL_PROPERTY1);
Py_DECREF(TIMESTAMP_PROPERTY1);
clearListeners();
freeListeners();
if (loader_ == nullptr)
return;
if (!loader_->isClosed())
loader_->close();
}
void TickLoader::clearListeners() {
MutexHolder mutexHolder(&errorListenerLock_);
for (auto it = errorListeners_.begin(); it != errorListeners_.end(); ++it) {
LoaderErrorListener *errorListener = it->second;
if (errorListener != 0) {
loader_->removeListener(errorListener);
removedListeners_.push_back(errorListener);
}
}
errorListeners_.clear();
}
void TickLoader::freeListeners() {
MutexHolder mutexHolder(&errorListenerLock_);
for (auto l : removedListeners_) {
delete l;
}
removedListeners_.clear();
}
uint32_t TickLoader::registerType(const std::string &type_name) {
auto it = type_to_id_.find(type_name);
if (it == type_to_id_.end()) {
int32_t descriptor_id = findDescriptor(type_name);
if (descriptor_id == INT32_MIN)
THROW_EXCEPTION("Type '%s' not found in stream schema.", type_name.c_str());
std::shared_ptr<MessageCodec> new_message_codec =
std::shared_ptr<MessageCodec>(new MessageCodec(descriptors_, descriptor_id));
while (message_codecs_.size() <= next_id_)
message_codecs_.push_back(NULL);
message_codecs_[next_id_] = new_message_codec;
type_to_id_[type_name] = next_id_;
loader_->registerMessageType(next_id_, type_name);
return next_id_++;
}
return it->second;
}
uint32_t TickLoader::registerInstrument(const std::string &instrument) {
return loader_->getInstrumentId(instrument);
}
void TickLoader::send(PyObject *message) {
int32_t type_id = getTypeId(message);
int32_t instrument_id = getInstrumentId(message);
DxApi::TimestampMs timestamp = getTimestamp(message);
DxApi::DataWriter &writer = loader_->beginMessage(type_id, instrument_id, timestamp);
message_codecs_[type_id]->encode(message, writer);
loader_->send();
}
void TickLoader::flush() {
loader_->flush();
}
void TickLoader::close() {
clearListeners();
Py_BEGIN_ALLOW_THREADS;
try {
loader_->close();
}
catch (...) {
std::cout << "Error occured while closing loader" << std::endl;
}
Py_END_ALLOW_THREADS;
}
void TickLoader::addListener(DxApi::TickLoader::ErrorListener * listener) {
MutexHolder mutexHolder(&errorListenerLock_);
LoaderErrorListener * loaderListener = new LoaderErrorListener(listener);
errorListeners_[listener] = loaderListener;
loader_->addListener(loaderListener);
}
void TickLoader::addListener(DxApi::TickLoader::SubscriptionListener *listener) {
loader_->addListener(listener);
}
void TickLoader::removeListener(DxApi::TickLoader::ErrorListener *listener) {
MutexHolder mutexHolder(&errorListenerLock_);
LoaderErrorListener * loaderListener = errorListeners_[listener];
if (loaderListener != 0) {
loader_->removeListener(loaderListener);
errorListeners_.erase(listener);
removedListeners_.push_back(loaderListener);
}
}
void TickLoader::removeListener(DxApi::TickLoader::SubscriptionListener *listener) {
loader_->removeListener(listener);
}
size_t TickLoader::nErrorListeners() {
return loader_->nErrorListeners();
}
size_t TickLoader::nSubscriptionListeners() {
return loader_->nSubscriptionListeners();
}
int32_t TickLoader::getTypeId(PyObject *message) {
int32_t type_id;
bool exists = getInt32Value(message, TYPE_ID_PROPERTY, TYPE_ID_PROPERTY1, type_id);
if (!exists) {
type_id = getStrTypeId(message);
if (type_id == INT32_MIN)
THROW_EXCEPTION("Unkown type of message. Specify '%s' attribute.", TYPE_NAME_PROPERTY.c_str());
}
return type_id;
}
int32_t TickLoader::getStrTypeId(PyObject *message) {
std::string type_name;
bool exists = getStringValue(message, TYPE_NAME_PROPERTY, TYPE_NAME_PROPERTY1, type_name);
if (!exists)
return INT32_MIN;
return registerType(type_name);
}
int32_t TickLoader::getInstrumentId(PyObject *message) {
int32_t instrument_id;
bool exists = getInt32Value(message, INSTRUMENT_ID_PROPERTY, INSTRUMENT_ID_PROPERTY1, instrument_id);
if (!exists) {
instrument_id = getStrInstrumentId(message);
if (instrument_id == INT32_MIN)
THROW_EXCEPTION("Unknown instrument id. Specify '%s' property.", INSTRUMENT_ID_PROPERTY.c_str());
}
return instrument_id;
}
int32_t TickLoader::getStrInstrumentId(PyObject *message) {
std::string symbol;
bool exists = getStringValue(message, SYMBOL_PROPERTY, SYMBOL_PROPERTY1, symbol);
if (!exists)
return INT32_MIN;
if (symbol.empty())
THROW_EXCEPTION("Symbol is empty. Specify '%s' attribute for message.", SYMBOL_PROPERTY.c_str());
auto it = symbol_to_id_.find(symbol);
if (it != symbol_to_id_.end())
return it->second;
int32_t id = registerInstrument(symbol);
symbol_to_id_[symbol] = id;
return id;
}
DxApi::TimestampMs TickLoader::getTimestamp(PyObject *message) {
int64_t ret_value;
bool ok = getInt64Value(message, TIMESTAMP_PROPERTY, TIMESTAMP_PROPERTY1, ret_value);
if (!ok)
return DxApi::TIMESTAMP_UNKNOWN;
return ret_value;
}
int32_t TickLoader::findDescriptor(const std::string &name) {
for (int i = 0; i < descriptors_.size(); ++i) {
if (descriptors_[i].className == name) {
return i;
}
}
return INT32_MIN;
}
} // namespace Python
} // namespace TbApiImpl