src/dxapi/native/tickdb/data_writer_impl.h (169 lines of code) (raw):
/*
* Copyright 2021 EPAM Systems, Inc
*
* See the NOTICE file distributed with this work for additional information
* regarding copyright ownership. Licensed under the Apache License,
* Version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
#pragma once
#include "tickdb/common.h"
#include <dxapi/data_writer.h>
#define LOADER_MAX_NESTED_OBJECTS 16
namespace DxApi {
class TickLoader;
class DataWriter;
}
namespace DxApiImpl {
class TickLoaderImpl;
class DataWriterInternal : public DxApi::DataWriter {
friend class DxApi::DataWriter;
friend class DxApi::TickLoader;
friend class DxApiImpl::TickLoaderImpl;
public:
//
struct ExpansionPoint {
uint32_t blockOffset; // offset of the start of the data block to be moved to the right
uint32_t headerLength; // contents of the 3-byte length field to be inserted at offset - 1, top byte contains additional length = [1..2]
INLINE ExpansionPoint(uint32_t o, uint32_t l) : blockOffset(o), headerLength(l) {}
};
INLINE size_t remaining() const
{
return dataEnd - dataPtr;
}
INLINE byte* data()
{
return dataPtr;
}
INLINE const byte* dataStart() const
{
return bufferStart;
}
INLINE size_t dataSize() const
{
return dataPtr - bufferStart;
}
INLINE size_t nestingDepth() const
{
return depth;
}
INLINE byte* resetPointer()
{
return (dataPtr = bufferStart);
}
// Reserve header field and save pointer to the byte following it, also reset other data structures
INLINE byte* startMessage();
// Low level writing access
INLINE void advance(uintptr_t amount)
{
dataPtr += amount;
if (dataPtr > dataEnd) {
onOverflow();
}
}
template <typename T> static INLINE void storeBE(uint8_t *p, const T &value)
{
_storeBE(p, value);
}
void putInterval(uint32_t value);
// Write nanoseconds (only)
void putCompressedNanosecondTime(int64_t value);
// Write milliseconds
void putCompressedMillisecondTime(int64_t value);
// Write PUINT61 or PUINT30
template <typename T> void writePackedUInt(T value);
void putDecimal(double value, unsigned precision);
void putDecimal(double value);
void putAlphanumericNull(uint32_t size);
template <typename T, typename P> void putAlphanumeric(uint32 fieldSize, const P *str, size_t stringLength);
template <typename T> void putAlphanumericInt(uint32_t fieldSize, T value);
//template<int n> void writeAlphanumeric(const char *str, uintptr stringLength, uintptr fieldSize);
//void writeAlphanumeric32(const char *s, uintptr dataSize, uintptr fieldSize);
//void writeAlphanumeric(const char *s, uintptr dataSize, uintptr fieldSize);
void depth0ExpectedError();
void expandedTooBigError(size_t size, size_t remaining);
void writeArrayStart(size_t nItems);
void writeArrayEnd();
void writeObjectStart(uint8_t typeIndex);
void writeObjectEnd();
// Write binary array header, get write pointer, for given size
byte* writeBinaryArrayEx(size_t size);
byte* finishMessage(const byte *messageStart);
byte* finishMessageCommon();
void setBuffer(byte *allocPtr, byte *startPtr, byte *endPtr);
size_t headerSize() { return headerSize_; }
void setHeaderSize(size_t size) { headerSize_ = size; }
DataWriterInternal();
protected:
bool writeSpecialDecimal(double value);
// Write uint, returning the actual number of nonzero bytes used for storing the value.
// Note: if the value is 0, the number of written bytes is also 0, if this behavior isn't desired, use an extra check
template <typename T> uintptr writeVariableLengthInt(byte *&headerPtr, T value);
void writePackedUInt22(unsigned value);
void reserveLengthField();
void openContainer(bool isArray);
void closeContainer(bool isArray);
static void expandHeaders(uint8_t * const pBufferStart, uint8_t *pEnd, intptr_t offset,
const ExpansionPoint points[], size_t n_points, const uint8_t * const pMessageStart);
template <typename T> INLINE void putLE(const T value, size_t size)
{
uint8_t *p;
assert(size <= 8);
_storeLE(p = dataPtr, value);
if ((dataPtr = p + size) > dataEnd) {
onOverflow();
}
}
byte* onOverflow();
protected:
byte *bufferAllocPtr;
byte *bufferStart;
size_t headerSize_;
/**
* Complex types support (Arrays/Objects)
*/
size_t depth;
byte *containerHeader[LOADER_MAX_NESTED_OBJECTS];
byte containerIsArray[LOADER_MAX_NESTED_OBJECTS];
uint32_t containerContentsExtraSize[LOADER_MAX_NESTED_OBJECTS + 1]; // Last level is written, never read
#if (LOADER_OBJ_IMPL == 2)
// offsets are relative to bufferStart
std::vector<ExpansionPoint> expansionPoints;
#endif
DISALLOW_COPY_AND_ASSIGN(DataWriterInternal);
};
DEFINE_IMPL_CLASS(DxApi::DataWriter, DataWriterInternal);
INLINE void DataWriterInternal::putAlphanumericNull(uint32 fieldSize)
{
// alphanumeric null is encoded by storing size + 1 in high bits
uintptr nSizeBits = _bsr(++fieldSize);
putBE<uint32>(fieldSize << (32 - nSizeBits), (nSizeBits + 7) >> 3);
}
INLINE byte* DataWriterInternal::finishMessageCommon()
{
if (0 != depth) {
depth0ExpectedError();
}
#if 1 == LOADER_TRIM_NULL_FIELDS
return min(dataPtr, dataEndNotNull);
#else
return dataPtr;
#endif
}
#if LOADER_OBJ_IMPL==1
INLINE byte * DataWriterInternal::finishMessage(const byte * messageStart)
{
return finishMessageCommon();
}
#elif LOADER_OBJ_IMPL==2
INLINE byte* DataWriterInternal::startMessage()
{
uint8_t *p;
// Write placeholder for length
_storeLE(p = dataPtr, 0);
if ((dataPtr = p += 4) > dataEnd) {
onOverflow();
}
dataEndNotNull = p + headerSize_; /* initialize with full header size including length */
expansionPoints.clear();
containerContentsExtraSize[0] = 0;
depth = 0;
return p;
}
INLINE byte* DataWriterInternal::finishMessage(const byte *messageStart)
{
byte *dataPtr = finishMessageCommon();
unsigned extraSize = containerContentsExtraSize[0];
assert((extraSize != 0) == (expansionPoints.size() != 0));
if (0 == extraSize) {
return dataPtr;
}
// This part only executes when there's at least one container object with content size >= 0x80,
// So the performance cost of the following code is amortized
if (dataPtr + extraSize > dataEnd) {
expandedTooBigError(dataPtr + extraSize - messageStart, dataEnd - messageStart);
}
assert(expansionPoints.data() != NULL);
expandHeaders(bufferStart, dataPtr, extraSize, expansionPoints.data(), expansionPoints.size(), messageStart);
return (this->dataPtr = dataPtr + extraSize);
}
#else
#error LOADER_OBJ_IMPL not defined
#endif
}
namespace DxApi {
INLINE DataWriter::DataWriter()
{}
}