in src/dxapi-tests.cpp [516:669]
/// Reads messages from the "l2msg2" stream through a cursor and decodes the
/// L2Message and ConnectionStatusChange payloads by hand with the cursor's DataReader.
/// Decoded L2 messages are printed through PRINT; reading stops when the cursor is
/// exhausted or the 5,000,000-message cap is reached.
///
/// @param db  Database the "l2msg2" stream is requested from.
/// @return    true once the read loop has finished; false when the stream or the
///            cursor could not be obtained.
bool test_l2msg_fmt1(TickDb& db) {
    // Local type ids. Each enumerator equals the index of the matching name in
    // messageNames, because the registration loop below uses the array index as the id.
    enum MsgType { UNKNOWN = 0, LEVEL2ACTION, CONNECTION_STATUS_CHANGE, L2MESSAGE, MF_RAW_MESSAGE, LEVEL2MESSAGE };
    const char *messageNames[] = { "Unknown", "deltix.qsrv.hf.pub.Level2Action", "deltix.qsrv.hf.spi.data.ConnectionStatusChangeMessage",
        "deltix.qsrv.hf.pub.L2Message", "deltix.qsrv.hf.plugins.data.mf.MarketFactoryRawMessage", "deltix.qsrv.hf.pub.Level2Message"
    };

    //TickStream stream = db->getStream("l2msg");
    unique_ptr<TickStream> pstream(db.getStream("l2msg2"));
    if (!pstream) {
        // Guard against a null result instead of dereferencing it below.
        printf("Stream 'l2msg2' is not available\n");
        return false;
    }
    TickStream &stream = *pstream;

    // One entry of the L2Message "actions" array.
    // Every member has an initializer so that an instance whose nullable fields were
    // not filled in by readFrom() never exposes indeterminate values to toString().
    class Level2Action {
        int level = 0;          // int16, non-nullable
        bool isAsk = false;     // boolean, non-nullable
        int action = 0;         // enum 0..2
        double price = 0;       // Float 64, nullable
        double size = 0;        // Float 64, nullable
        int numOfOrders = 0;    // int32, nullable
        bool has_action = false;
        bool has_price = false;
        bool has_size = false;
        bool has_numOfOrders = false;
    public:
        // Decodes the fields in declaration order from the reader.
        inline void readFrom(DataReader& reader)
        {
// The READ* helper macros (defined elsewhere) pick up the reader through READER.
#define READER reader
            READ(level, Int16);
            READ(isAsk, Boolean);
            READ_NULLABLE(action, Enum8); // TODO:
            READ_NULLABLE(price, Float64);
            READ_NULLABLE(size, Float64);
            READ_NULLABLE(numOfOrders, Int32);
            assert(level < 21);
            assert(action < 20);
        }

        // Formats the entry as: level, side (B/A), action glyph, price, size, order count
        // (-1 when the order count is null).
        std::string toString() const
        {
            // The glyph table is indexed by action + 1. The asserts in readFrom() allow
            // values far beyond the table (and disappear under NDEBUG), so the range is
            // checked here and anything outside the table is rendered as '?'.
            const char actionGlyphs[] = "_+*-???";
            const int lastAction = (int)(sizeof(actionGlyphs) - 1) - 2; // largest action with a glyph
            const char actionGlyph = (action >= -1 && action <= lastAction) ? actionGlyphs[action + 1] : '?';

            char tmp[0x1000];
            sprintf(tmp, "\n\t%1d %c %c %lf %lf %d", level, isAsk ? 'A' : 'B', actionGlyph, price, size, has_numOfOrders ? numOfOrders : -1);
            return std::string(tmp);
        }
    };

    // Connection status notification: a status enum followed by a UTF-8 cause string.
    class ConnectionStatusChange : public MarketMessage {
        int status = 0;
        std::string cause;
        bool has_cause = false;
        bool has_status = false;
    public:
        inline void readFrom(DataReader &reader)
        {
            READ_NULLABLE(status, Enum8);
            has_cause = reader.readUTF8(cause);
        }

        virtual std::string toString() const { return MarketMessage::toString().append(" cause:").append(cause); }
    };

    // L2 message: header fields, an array of Level2Action objects and trailing flags.
    class L2Message : public MarketMessage {
        vector<Level2Action> actions;
        std::string exchangeCode;
        bool isImplied = false;
        bool isSnapshot = false;
        int64_t sequenceId = 0;
        bool has_actions = false;
        bool has_exchangeCode = false;
        bool has_sequenceId = false;
    public:
        //void copyFrom(const MessageHeader &m) { *static_cast<MessageHeader *>(this) = m; }
        inline void readFrom(DataReader &reader)
        {
            READ_NULLABLE(originalTimestamp, Timestamp);
            READ_NULLABLE(currencyCode, Int16);
            READ_NULLABLE(sequenceNumber, Int64);

            // This object is reused for every message in the read loop, so the action
            // list of the previous message is dropped before decoding the new one.
            // Without this, a null array or a null array element would leave stale
            // entries behind for toString() to print.
            actions.clear();
            has_actions = false;

            //{ reader.skipArray(); return; }
            const intptr_t count = reader.readArrayStart();
            if (count >= 0) {
                has_actions = true;
                actions.resize((size_t)count);
                for (intptr_t i = 0; i < count; ++i) {
                    //reader.readInt16(); // Skip first 2 bytes for now
                    // A negative result is treated as "no object": the slot keeps its
                    // freshly default-constructed value.
                    if (reader.readObjectStart() >= 0) {
                        actions[(size_t)i].readFrom(reader);
                        reader.readObjectEnd();
                    }
                }
                // NOTE(review): the fields below are read before readArrayEnd() and only
                // when the array is non-null. The original order is kept as-is; confirm
                // against the stream schema whether they belong after the array scope.
                READ_NULLABLE2(exchangeCode, Alphanumeric, 10);
                READ(isImplied, Boolean);
                READ(isSnapshot, Boolean);
                reader.readArrayEnd();
            }
            //READ_NULLABLE(sequenceId, Int64); // TODO: Sorry, will get to it later
        }

        // Header text from MarketMessage, then the currency code, the action count and
        // one line per action.
        virtual std::string toString() const
        {
            char tmp[0x1000];
            sprintf(tmp, " currency: %d, actions(%d): ", (int)currencyCode, (int)actions.size());
            std::string text = MarketMessage::toString().append(tmp);
            for (size_t i = 0; i < actions.size(); ++i) {
                text.append(actions[i].toString());
            }
            return text;
        }
    };

    SelectionOptions opt;
    unique_ptr<TickCursor> pCursor(stream.select(TIMESTAMP_NULL, opt, NULL, NULL));
    if (!pCursor) {
        // Same guard as for the stream: never dereference a null cursor.
        printf("Unable to open a cursor on stream 'l2msg2'\n");
        return false;
    }
    TickCursor &cursor = *pCursor;

    // Register every known type name under its array index (== MsgType value).
    for (intptr_t i = 0; i < COUNTOF(messageNames); ++i)
        cursor.registerMessageType((unsigned)i, messageNames[i]);

    //MessageHeader msg;
    MarketMessage marketMessage; // We are using this as a default type
    L2Message l2Message;
    ConnectionStatusChange connectionStatusChange;
    marketMessage.cursor = &cursor;

    // START_TEST / END_READING are harness macros defined elsewhere in the file;
    // nMessagesRead is presumably declared by START_TEST - verify against the macro.
    START_TEST(L2Msg1);
    DataReader &reader = cursor.getReader();

    // Note: C++ API is not a priority, it will be made better-looking, but that code is not useful for Managed API, so it is not written yet.
    while (cursor.next(&marketMessage) && nMessagesRead < 5000000) {
        switch (marketMessage.typeIdLocal) {
        //case LEVEL2ACTION:
        case CONNECTION_STATUS_CHANGE:
            connectionStatusChange.copyHeaderFrom(marketMessage);
            connectionStatusChange.readFrom(reader);
            //puts(connectionStatusChange.toString().c_str());
            break;

        case L2MESSAGE:
            l2Message.copyHeaderFrom(marketMessage);
            l2Message.readFrom(reader);
            PRINT(l2Message.toString().c_str());
            break;

        case MF_RAW_MESSAGE:
        case LEVEL2MESSAGE:
            // Recognized, but the payload is not decoded.
            //puts(marketMessage.toString().append(*marketMessage.getTypeName()).c_str());
            break;

        default:
            printf("Unknown message <%s>. id: %d -> localId: %d", marketMessage.getTypeName()->c_str(), (int)marketMessage.typeId, (int)marketMessage.typeIdLocal);
            //puts(marketMessage.toString().c_str());
            break;
        }
        ++nMessagesRead;
    }
    END_READING(L2Msg1);
    return true;
}