include/native/dxapi/data_reader.h (381 lines of code) (raw):
/*
* Copyright 2021 EPAM Systems, Inc
*
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
#pragma once
#include "dxcommon.h"
#include "dxconstants.h"
// PUBLIC API
// Version of implementation. Temporary defines for internal use
// SR_IMPL selects which DataReader::bytes() implementation is compiled further down in this header;
// only the value 1 has a body, values 2 and 3 stop the build with "#error Removed".
#define SR_IMPL 1
// When FAKESKIP is defined to a non-zero value, DataReader::skip(size_t) consumes the bytes one at a time
// through getByte() instead of advancing the read pointer directly.
//#define FAKESKIP 1
// SR_VERIFY 1 turns on schema verification
// (that branch currently contains an #error directive, so it does not compile as-is)
//#define SR_VERIFY 1
// SR_VERIFY 2 turns on read method logging, only compiles in debug mode
#ifdef _DEBUG
//#define SR_VERIFY 2
#endif
namespace DxApi {
// Abstract reader that decodes typed field values from a binary message buffer.
// Fixed-width values are fetched through loadBE()/bytes(); whenever a read or skip would move the
// read pointer past dataEnd, control is handed to onReadBarrier()/onSkipBarrier(). The pure virtual
// get()/skip() pair at the bottom must be supplied by a subclass.
// NOTE(review): the class declares pure virtual members but no destructor is declared in this header —
// confirm that instances are never deleted through a DataReader pointer.
class DataReader {
public:
/**********************************************************************
* Uncompressed integer types
*/
// Integers
// The readers of this group that are defined inline in this header return the matching *_NULL constant
// when endOfContent() is already true, except readUInt40() and readBoolean(), which perform no such check.
uint8_t readUInt8();
int8_t readInt8();
int16_t readInt16();
int32_t readInt32();
uint64_t readUInt40();
int64_t readInt48();
int64_t readInt64();
// The inline skipXX() methods do nothing when endOfContent() is already true.
void skipInt8();
void skipInt16();
void skipInt32();
void skipInt48();
void skipInt64();
// Compressed integers, all are nullable
// (the read methods of this group are not defined in this header)
uint32_t readPUInt30();
uint64_t readPUInt61();
uint32_t readInterval();
int64_t readDateTimeMs();
int64_t readDateTimeNs();
void skipPUInt30();
void skipPUInt61();
void skipInterval();
void skipDateTime();
// Floating point types
double readFloat64();
float readFloat32();
// Compressed floating point
double readDecimal();
void skipDecimal();
// Timestamp (uncompressed int64)
int64_t readTimestamp();
// TimeOfDay
int32_t readTimeOfDay();
// Enum types
int readEnum8();
int readEnum16();
int readEnum32();
int64_t readEnum64();
// Other types
// readBoolean() yields true only when the byte read equals 1.
bool readBoolean();
int8_t readNullableBooleanInt8();
wchar_t readWChar();
/**********************************************************************
* Nullable types, return false if null
*/
// Each overload stores the decoded value into 'value'. The inline implementations return false
// without touching 'value' when endOfContent() is already true.
// Integers
bool readInt8(int8_t &value);
bool readInt16(int16_t &value);
bool readInt32(int32_t &value);
bool readInt48(int64_t &value);
bool readInt64(int64_t &value);
// Compressed unsigned integers
bool readPUInt30(uint32_t &value);
bool readPUInt61(uint64_t &value);
bool readPUInt30(int32_t &value);
bool readPUInt61(int64_t &value);
bool readInterval(int32_t &value);
bool readInterval(uint32_t &value);
// Floating point types
bool readFloat64(double &value);
bool readFloat32(float &value);
// Compressed floating point
bool readDecimal(double &value);
// Timestamp
bool readTimestamp(int64_t &value);
// TimeOfDay
//INLINE bool getTimeOfDay(int32_t &value);
// Enum types
bool readEnum8(int &value);
bool readEnum16(int &value);
bool readEnum32(int &value);
bool readEnum64(int64_t &value);
// Other types
bool readNullableBoolean(bool &value);
bool readWChar(wchar_t &value);
public:
// Strings and raw data
template <typename Q> bool readAlphanumeric(std::basic_string<Q, std::char_traits<Q>, std::allocator<Q> > &value, uint32_t maxFieldLength);
bool readAlphanumeric(char value[], uint32_t maxFieldLength);
template <typename T> T readAlphanumericAs(uint32_t maxFieldLength);
void skipAlphanumeric(uint32_t maxFieldLength);
// returns true if not NULL
bool readUTF8(std::string &value);
void skipUTF8();
bool readAscii(std::string &value);
void skipAscii();
bool readBinary(std::vector<uint8_t> &value);
void skipBinary();
// Array
// Returns array length(number of items, starting from 0). Returns INT32_NULL if NULL
int32_t readArrayStart();
void readArrayEnd();
void skipArray();
// Reads object header. Returns object type index (0 .. 0xFF) Returns INT32_NULL if NULL
int32_t readObjectStart();
void readObjectEnd();
void skipObject();
// true if read pointer is at the end of the message
// (implemented as dataPtr == contentEnd)
bool endOfContent() const;
// Skip the specified amount of compressed data bytes.
void skip(size_t numBytes);
protected:
// Fixed-width loads implemented on top of loadBE<T>(); each advances the read pointer by sizeof(T).
uint8_t getByte();
uint16_t getUInt16();
uint32_t getUInt32();
uint64_t getUInt64();
// copy bytes from string into buffer, must hold at least dataLength + 1 bytes
void getUTF8chars(uint8_t *dstBuffer, size_t size);
void getUTF8chars(char *dstBuffer, size_t size);
// Currently conversion is done elsewhere, so this version is not supported.
// TODO: cleanup
// void getUTF8chars(wchar_t *dstBuffer, size_t size);
// Stubs that redirects to virtual member call, preferably without inlining.
// onReadBarrier()/onSkipBarrier() are invoked by bytes()/skip(size_t) after the read pointer has
// already been advanced past dataEnd; 'from' is the read pointer value before the advance.
const uint8_t* onReadBarrier(const uint8_t *from, size_t size);
void onSkipBarrier(const uint8_t *from, size_t size);
// Invoked by the SR_VERIFY == 1 macros when the schema entry does not match the requested field type.
void onReadTypeMismatch(uint8_t t);
void onSkipTypeMismatch(uint8_t t);
// Get constant pointer to the specified number of bytes from the input stream and advance the read pointer by that amount
// bytes are guaranteed to be available for reading until the next getXX/bytes() call
const uint8_t* bytes(size_t size);
// Should not be called directly by any code visible through this header file
virtual const uint8_t* get(const uint8_t * from, size_t size) = 0;
virtual void skip(const uint8_t * from, size_t size) = 0;
// Low-level data access functions
// as<P>(x): forwards to DxApiImpl::_as to view x as a value of type P.
template <typename P, typename Q> static INLINE const P& as(const Q &x) { return DxApiImpl::_as<P, Q>(x); }
// loadBE<T>(): consumes sizeof(T) bytes and decodes them with DxApiImpl::_loadBE<T>.
template <typename T> INLINE T loadBE() { return DxApiImpl::_loadBE<T>(bytes(sizeof(T))); }
// loadBE<T, NBITS>(): consumes ceil(NBITS / 8) bytes and shifts the decoded T right so that only NBITS bits remain.
// NOTE(review): only (NBITS + 7) >> 3 bytes are requested from bytes(), yet the pointer is passed to _loadBE<T>;
// if _loadBE<T> reads sizeof(T) bytes this touches memory beyond the requested range — confirm against DxApiImpl.
template <typename T, int NBITS> INLINE T loadBE() { return DxApiImpl::_loadBE<T>(bytes((NBITS + 7) >> 3)) >> ((8 * sizeof(T)) - NBITS); }
// Construction is restricted to subclasses; copying is disabled by the macro below.
DataReader();
DISALLOW_COPY_AND_ASSIGN(DataReader);
protected:
const uint8_t *dataPtr; // Read pointer
const uint8_t *dataEnd; // Valid data end barrier
const uint8_t *contentEnd; // End of the current object/array/message
const uint32_t *schemaPtr; // Schema cursor; in this header it is only advanced by the SR_VERIFY == 1 macros
};
// TODO: All this debug/verification code should be moved into separate header that is set to empty for release build
// The SR_READ / SR_READ_NULLABLE / SR_SKIP macros emit the header of a DataReader method INCLUDING the
// opening brace; the code that follows a macro invocation is the method body and must supply the closing '}'.
//   SR_READ(OUT, Name, Enum)          -> OUT  DataReader::readName() {
//   SR_READ_NULLABLE(OUT, Name, Enum) -> bool DataReader::readName(OUT & value) {
//   SR_SKIP(Name, Enum)               -> void DataReader::skipName() { if (endOfContent()) return;
// ENUM_TYPE_NAME is only used by the SR_VERIFY == 1 variant.

// Variant 1: verification and logging disabled (default).
#if !defined(SR_VERIFY) || SR_VERIFY == 0
#define SR_LOG_READ(OUT_TYPE, TYPE_NAME)
#define SR_LOG_SKIP(TYPE_NAME)
// SR_LOG(...) becomes a void-cast of the parenthesized argument list, i.e. the arguments are discarded.
#define SR_LOG (void)
#define SR_READ(OUT_TYPE, TYPE_NAME, ENUM_TYPE_NAME) OUT_TYPE DataReader::read##TYPE_NAME() {
#define SR_READ_NULLABLE(OUT_TYPE, TYPE_NAME, ENUM_TYPE_NAME) bool DataReader::read##TYPE_NAME(OUT_TYPE & value) {
#define SR_SKIP(TYPE_NAME, ENUM_TYPE_NAME) void DataReader::skip##TYPE_NAME() { if (endOfContent()) return;
// Variant 2: schema verification. Every generated method first compares the low 5 bits of the byte at
// schemaPtr (which is then advanced) with the expected FieldType and reports a mismatch.
// The #error below currently prevents this variant from compiling.
#elif SR_VERIFY == 1
#define SR_LOG_READ(OUT_TYPE, TYPE_NAME)
#define SR_LOG_SKIP(TYPE_NAME)
#define SR_LOG (void)
#error Update verification code to use nullable flag and sizes
#define SR_READ(OUT_TYPE, TYPE_NAME, ENUM_TYPE_NAME) OUT_TYPE DataReader::read##TYPE_NAME() { \
if ((*(uint8_t *)(schemaPtr++) & 0x1F) != (uint8_t)FieldType::ENUM_TYPE_NAME) onReadTypeMismatch((uint8_t)FieldType::ENUM_TYPE_NAME);
#define SR_READ_NULLABLE(OUT_TYPE, TYPE_NAME, ENUM_TYPE_NAME) bool DataReader::read##TYPE_NAME(OUT_TYPE & value) { \
if ((*(uint8_t *)(schemaPtr++) & 0x1F) != (uint8_t)FieldType::ENUM_TYPE_NAME) onReadTypeMismatch((uint8_t)FieldType::ENUM_TYPE_NAME);
#define SR_SKIP(TYPE_NAME, ENUM_TYPE_NAME) void DataReader::skip##TYPE_NAME() { \
if ((*(uint8_t *)(schemaPtr++) & 0x1F) != (uint8_t)FieldType::ENUM_TYPE_NAME) onSkipTypeMismatch((uint8_t)FieldType::ENUM_TYPE_NAME); \
if (endOfContent()) return;
// Variant 3: any other SR_VERIFY value. Every generated method starts by logging its own name through DBGLOG.
#else
#define SR_LOG_READ(OUT_TYPE, TYPE_NAME) \
DBGLOG(" " #OUT_TYPE " read" #TYPE_NAME "()");
#define SR_LOG_SKIP(TYPE_NAME) \
DBGLOG(" skip" #TYPE_NAME "()");
#define SR_LOG DBGLOG
#define SR_READ(OUT_TYPE, TYPE_NAME, ENUM_TYPE_NAME) OUT_TYPE DataReader::read##TYPE_NAME() { \
SR_LOG_READ(OUT_TYPE, TYPE_NAME);
#define SR_READ_NULLABLE(OUT_TYPE, TYPE_NAME, ENUM_TYPE_NAME) bool DataReader::read##TYPE_NAME(OUT_TYPE & value) { \
SR_LOG_READ(OUT_TYPE, TYPE_NAME);
#define SR_SKIP(TYPE_NAME, ENUM_TYPE_NAME) void DataReader::skip##TYPE_NAME() { \
SR_LOG_SKIP(TYPE_NAME); \
if (endOfContent()) return;
#endif
/*************************************************************************
* DataReader public inline methods implementation
*/
// Reports whether the read pointer coincides with the end of the current object/array/message.
INLINE bool DataReader::endOfContent() const
{
    const bool exhausted = (contentEnd == dataPtr);
    return exhausted;
}
INLINE SR_READ(uint8_t, UInt8, INT8)
return endOfContent() ? INT8_NULL : getByte();
}
INLINE SR_READ(int8_t, Int8, INT8)
return endOfContent() ? INT8_NULL : (int8_t)getByte();
}
INLINE SR_READ(int16_t, Int16, INT16)
return endOfContent() ? INT16_NULL : (int16_t)getUInt16();
}
INLINE SR_READ(int32_t, Int32, INT32)
return endOfContent() ? INT32_NULL : (int32_t)getUInt32();
}
// I dont have null value for this type.
INLINE SR_READ(uint64_t, UInt40, UNKNOWN)
return loadBE<uint64_t, 40>();
//return endOfContent() ? INT40_NULL : loadBE<uint64_t, 40>();
}
INLINE SR_READ(int64_t, Int48, INT48)
return endOfContent() ? INT48_NULL : loadBE<int64_t, 48>();
}
INLINE SR_READ(int64_t, Int64, INT64)
return endOfContent() ? INT64_NULL : getUInt64();
}
INLINE SR_READ(double, Float64, FLOAT64)
return endOfContent() ? FLOAT64_NULL : loadBE<double>();
}
INLINE SR_READ(float, Float32, FLOAT32)
return endOfContent() ? FLOAT32_NULL : loadBE<float>();
}
INLINE SR_READ(int64_t, Timestamp, INT64)
return endOfContent() ? TIMESTAMP_NULL : getUInt64();
}
INLINE SR_READ(int32_t, TimeOfDay, TIME_OF_DAY)
return endOfContent() ? TIMEOFDAY_NULL : getUInt32();
}
// DataReader::readEnum8(): enum stored as a signed 8-bit value; ENUM_NULL when no content is left.
INLINE SR_READ(int, Enum8, ENUM8)
    if (!endOfContent()) {
        return loadBE<int8_t>();
    }
    return ENUM_NULL;
}
// DataReader::readEnum16(): enum stored as a signed 16-bit value; ENUM_NULL when no content is left.
INLINE SR_READ(int, Enum16, ENUM16)
    if (!endOfContent()) {
        return loadBE<int16_t>();
    }
    return ENUM_NULL;
}
// DataReader::readEnum32(): enum stored as a signed 32-bit value; ENUM_NULL when no content is left.
INLINE SR_READ(int, Enum32, ENUM32)
    if (!endOfContent()) {
        return loadBE<int32_t>();
    }
    return ENUM_NULL;
}
// DataReader::readEnum64(): enum stored as a signed 64-bit value; ENUM_NULL when no content is left.
INLINE SR_READ(int64_t, Enum64, ENUM64)
    if (!endOfContent()) {
        return loadBE<int64_t>();
    }
    return ENUM_NULL;
}
INLINE SR_READ(bool, Boolean, BOOL)
return 1 == getByte();
}
INLINE SR_READ(int8_t, NullableBooleanInt8, BOOL)
return endOfContent() ? (int8_t)BOOL_NULL : (int8_t)getByte();
}
INLINE SR_READ(wchar_t, WChar, CHAR)
return endOfContent() ? CHAR_NULL : getUInt16();
}
// DataReader::skipInt8(): SR_SKIP returns early at end of content; otherwise one byte is skipped.
INLINE SR_SKIP(Int8, INT8)
    skip(sizeof(int8_t));
}
// DataReader::skipInt16(): SR_SKIP returns early at end of content; otherwise two bytes are skipped.
INLINE SR_SKIP(Int16, INT16)
    skip(sizeof(int16_t));
}
// DataReader::skipInt32(): SR_SKIP returns early at end of content; otherwise four bytes are skipped.
INLINE SR_SKIP(Int32, INT32)
    skip(sizeof(int32_t));
}
INLINE SR_SKIP(Int48, INT48)
skip(6);
}
// DataReader::skipInt64(): SR_SKIP returns early at end of content; otherwise eight bytes are skipped.
INLINE SR_SKIP(Int64, INT64)
    skip(sizeof(int64_t));
}
INLINE SR_SKIP(PUInt61, PUINT61)
skip(getByte() >> 5);
}
INLINE SR_SKIP(PUInt30, PUINT30)
skip(getByte() >> 6);
}
INLINE SR_SKIP(Interval, PINTERVAL)
skip(0xC0 == (0xC0 & getByte()) ? 4 : 0);
}
INLINE SR_SKIP(UTF8, STRING)
unsigned l = getUInt16();
skip(0xFFFF == l ? 0 : l);
}
INLINE SR_SKIP(Ascii, STRING)
unsigned l = getUInt16();
skip(0xFFFF == l ? 0 : l);
}
/**********************************************************************
* Nullable types, return false if null.
*/
// INLINE bool DataReader::readInt8(int8_t &value)
INLINE SR_READ_NULLABLE(int8_t, Int8, INT8)
return !endOfContent() && INT8_NULL != (value = getByte());
}
//INLINE bool DataReader::readInt16(int16_t &value)
INLINE SR_READ_NULLABLE(int16_t, Int16, INT16)
if (endOfContent()) return false;
int16_t x = loadBE<int16_t>();
return INT16_NULL != (value = x);
}
//INLINE bool DataReader::readInt32(int32_t &value)
INLINE SR_READ_NULLABLE(int32_t, Int32, INT32)
if (endOfContent()) return false;
int32_t x = loadBE<int32_t>();
return INT32_NULL != (value = x);
}
//INLINE bool DataReader::readInt48(int64_t &value)
INLINE SR_READ_NULLABLE(int64_t, Int48, INT48)
if (endOfContent()) return false;
int64_t x = loadBE<int64_t, 48>();
return INT48_NULL != (value = x);
}
//INLINE bool DataReader::readInt64(int64_t &value)
INLINE SR_READ_NULLABLE(int64_t, Int64, INT64)
if (endOfContent()) return false;
int64_t x = loadBE<int64_t>();
return INT64_NULL != (value = x);
}
//INLINE bool DataReader::readUInt30(uint32_t &value)
INLINE SR_READ_NULLABLE(uint32_t, PUInt30, PUINT30)
if (endOfContent()) return false;
uint32_t x = readPUInt30();
return UINT30_NULL != (value = x);
}
//INLINE bool DataReader::readUInt30(int32_t &value)
INLINE SR_READ_NULLABLE(int32_t, PUInt30, PUINT30)
if (endOfContent()) return false;
uint32_t x = readPUInt30();
return UINT30_NULL != (value = x);
}
//INLINE bool DataReader::readUInt61(uint64_t &value)
// TODO: Double check here etc.
INLINE SR_READ_NULLABLE(uint64_t, PUInt61, PUINT61)
if (endOfContent()) return false;
uint64_t x = readPUInt61();
return UINT61_NULL != (value = x);
}
//INLINE bool DataReader::readUInt61(int64_t &value)
INLINE SR_READ_NULLABLE(int64_t, PUInt61, PUINT61)
if (endOfContent()) return false;
uint64_t x = readPUInt61();
return UINT61_NULL != (value = x);
}
//INLINE bool DataReader::readInterval(uint32_t &value)
INLINE SR_READ_NULLABLE(uint32_t, Interval, PINTERVAL)
if (endOfContent()) return false;
uint32_t x = readInterval();
return INTERVAL_NULL != (value = x);
}
//INLINE bool DataReader::readInterval(int32_t &value)
INLINE SR_READ_NULLABLE(int32_t, Interval, PINTERVAL)
if (endOfContent()) return false;
uint32_t x = readInterval();
return INTERVAL_NULL != (value = x);
}
// Floating point types
// bool DataReader::readFloat64(double &value)
// Reads a 64-bit floating point field into 'value'.
// Returns false when no content is left (value untouched) or when the raw bits read equal the
// NULL bit pattern float64null.u; returns true for every other value.
INLINE SR_READ_NULLABLE(double, Float64, FLOAT64)
    if (endOfContent()) return false;
    const uint64_t bits = getUInt64();
    value = as<double>(bits);
    // The comparison is done on the raw bit pattern rather than on the double itself.
    // Fix: this used to be '==', which reported NULL as present and every real value as NULL,
    // contradicting the "return false if null" contract shared by all nullable readers.
    return float64null.u != bits;
}
// bool DataReader::readFloat32(float &value)
// Reads a 32-bit floating point field into 'value'.
// Returns false when no content is left (value untouched) or when the raw bits read equal the
// NULL bit pattern float32null.u; returns true for every other value.
INLINE SR_READ_NULLABLE(float, Float32, FLOAT32)
    if (endOfContent()) return false;
    const uint32_t bits = getUInt32();
    value = as<float>(bits);
    // The comparison is done on the raw bit pattern rather than on the float itself.
    // Fix: this used to be '==', which reported NULL as present and every real value as NULL,
    // contradicting the "return false if null" contract shared by all nullable readers.
    return float32null.u != bits;
}
/*INLINE bool getDecimal(double &value) {
x = getDecimal();
return DECIMAL_NULL == (value = x);
}*/
// Enum types
//INLINE bool DataReader::readEnum8(int &value)
INLINE SR_READ_NULLABLE(int, Enum8, ENUM8)
if (endOfContent()) return false;
return (value = loadBE<int8_t>()) != ENUM_NULL;
}
//INLINE bool DataReader::readEnum16(int &value)
INLINE SR_READ_NULLABLE(int, Enum16, ENUM16)
if (endOfContent()) return false;
return (value = loadBE<int16_t>()) != ENUM_NULL;
}
//INLINE bool DataReader::readEnum32(int &value)
INLINE SR_READ_NULLABLE(int, Enum32, ENUM32)
if (endOfContent()) return false;
return (value = loadBE<int32_t>()) != ENUM_NULL;
}
//INLINE bool DataReader::readEnum64(int64_t &value)
INLINE SR_READ_NULLABLE(int64_t, Enum64, ENUM64)
if (endOfContent()) return false;
return (value = loadBE<int64_t>()) != ENUM_NULL;
}
//INLINE bool DataReader::readTimestamp(int64_t &value)
INLINE SR_READ_NULLABLE(int64_t, Timestamp, INT64)
if (endOfContent()) return false;
int64_t x = readInt64();
return TIMESTAMP_NULL != (value = x);
}
// bool DataReader::readNullableBoolean(bool &value)
// Stores true into 'value' only when the byte read equals 1. Returns false at end of content
// (value untouched) or when the byte equals BOOL_NULL.
INLINE SR_READ_NULLABLE(bool, NullableBoolean, BOOL)
    if (endOfContent()) {
        return false;
    }
    const uint8_t raw = getByte();
    value = (raw == 1);
    return raw != BOOL_NULL;
}
//INLINE bool DataReader::readWChar(wchar_t &value)
INLINE SR_READ_NULLABLE(wchar_t, WChar, CHAR)
if (endOfContent()) return false;
return CHAR_NULL != (value = getUInt16());
}
/*********************************************************************
* Low-level accessors (protected, except the public skip(size_t))
*/
// Consumes one byte from the stream via loadBE and returns it.
INLINE uint8_t DataReader::getByte()
{
    const uint8_t raw = loadBE<uint8_t>();
    return raw;
}
// Consumes two bytes from the stream via loadBE and returns them as an unsigned 16-bit value.
INLINE uint16_t DataReader::getUInt16()
{
    const uint16_t raw = loadBE<uint16_t>();
    return raw;
}
// Consumes four bytes from the stream via loadBE and returns them as an unsigned 32-bit value.
INLINE uint32_t DataReader::getUInt32()
{
    const uint32_t raw = loadBE<uint32_t>();
    return raw;
}
// Consumes eight bytes from the stream via loadBE and returns them as an unsigned 64-bit value.
INLINE uint64_t DataReader::getUInt64()
{
    const uint64_t raw = loadBE<uint64_t>();
    return raw;
}
// Moves the read pointer forward by 'size' bytes.
// With FAKESKIP enabled the bytes are consumed one by one through getByte(); otherwise the pointer
// is advanced directly and onSkipBarrier() is notified when it ends up beyond dataEnd.
INLINE void DataReader::skip(size_t size)
{
#if (FAKESKIP)
    for (; size != 0; --size) {
        getByte();
    }
#else
    const uint8_t * const start = dataPtr;
    dataPtr = start + size;
    if (dataPtr > dataEnd) {
        onSkipBarrier(start, size);
    }
#endif
}
// Stub that redirects to virtual member call, preferably without inlining. See fetchData definition.
#if 1 == SR_IMPL
// Returns a pointer to the next 'size' bytes and advances the read pointer past them.
// When the advanced pointer lies beyond dataEnd, the pointer returned by onReadBarrier() is used instead.
INLINE const uint8_t* DataReader::bytes(size_t size)
{
    const uint8_t * const start = dataPtr;
    dataPtr = start + size;
    if (dataPtr > dataEnd) {
        return onReadBarrier(start, size);
    }
    return start;
}
#elif 2==SR_IMPL
#error Removed
#elif 3==SR_IMPL
#error Removed
#endif
} // namespace DxApi