in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/services/charting/provider/TransformationServiceImpl.java [62:122]
/**
 * Assembles the chart lines for a book-symbol query.
 *
 * <p>The source input is cut off once a message timestamp exceeds the query's end time plus
 * {@code EXTEND_INTERVAL_MS}, passed through the feed transformations and shared. From the
 * shared feed the method registers one {@code BID[i]} and one {@code ASK[i]} line per
 * requested level, followed by a single {@code TRADES} line.
 *
 * <p>Only the observable pipeline is built here; nothing is subscribed by this method.
 *
 * @param source message source whose query must be a {@link BookSymbolQueryImpl}
 *               (any other query type fails the cast with a {@link ClassCastException})
 * @return query result named {@code stream[symbol]} holding the level lines and the trades line
 */
public LinesQueryResult build(MessageSource source) {
    final BookSymbolQueryImpl query = (BookSymbolQueryImpl) source.getQuery();

    final long intervalStartMs = query.getInterval().getStartTimeMilli();
    final long intervalEndMs = query.getInterval().getEndTimeMilli();
    final long aggregation = aggregationCalculator.getAggregation(query.getInterval());
    final long newWindowSize = aggregationCalculator.getNewWindowSize(query.getInterval());

    final LinesQueryResult chart = new LinesQueryResultImpl(
        query.getStream() + "[" + query.getSymbol() + "]", query.getInterval()
    );

    // Stop consuming once messages run past the (extended) end of the requested interval.
    final Observable<?> boundedFeed = source.getInput()
        .takeWhile(message -> message.getTimeStampMs() <= intervalEndMs + EXTEND_INTERVAL_MS);

    // Legacy feeds get one extra conversion step in front of the common pipeline.
    Observable<?> universalFeed = boundedFeed;
    if (legacy) {
        universalFeed = boundedFeed.lift(new LegacyToUniversalTransformation());
    }

    final Observable<?> statusAwareFeed = universalFeed.lift(new FeedStatusTransformation());
    final Observable<?> adaptedFeed = statusAwareFeed.lift(
        new AdaptPeriodicityTransformation(query.getLevelsCount(), aggregation)
    );
    // Shared because both the level lines and the trades line are derived from this feed.
    final Observable<?> sharedFeed = adaptedFeed.share();

    // Levels: aggregations below 60_000 use the plain L2 transformation, anything at or
    // above that threshold uses the snapshot-based one.
    final Observable<?> levelPoints;
    if (aggregation < 60_000) {
        levelPoints = sharedFeed.lift(
            new UniversalL2ToPointsTransformation(query.getSymbol(), query.getLevelsCount(), aggregation)
        );
    } else {
        levelPoints = sharedFeed.lift(
            new UniversalL2SnapshotsToPointsTransformation(query.getSymbol(), query.getLevelsCount(), aggregation)
        );
    }
    final Observable<?> sharedLevelPoints = levelPoints.share();

    // For every level the bid line is registered first, then the ask line.
    final boolean[] sides = {true, false};
    for (int level = 0; level < query.getLevelsCount(); ++level) {
        for (boolean bid : sides) {
            String lineName = (bid ? "BID" : "ASK") + "[" + level + "]";
            LevelPointToDtoTransformation toDto =
                new LevelPointToDtoTransformation(level, bid, intervalStartMs, intervalEndMs);
            chart.getLines().add(
                new LineResultImpl(lineName, sharedLevelPoints.lift(toDto), aggregation, newWindowSize)
            );
        }
    }

    // Trades: derived from the same shared feed as the levels.
    chart.getLines().add(
        new LineResultImpl(
            "TRADES",
            sharedFeed.lift(new UniversalToTradeTransformation())
                .lift(new TradeTransformation(aggregation, intervalStartMs)),
            aggregation,
            newWindowSize
        )
    );

    return chart;
}