in mbp-only/src/main/java/com/epam/cme/mdp3/core/control/InstrumentController.java [232:279]
/**
 * Applies one incremental-refresh group entry to this instrument, driven by the
 * entry's report sequence number ({@code RPT_SEQ_NUM}) and the instrument state
 * observed when the method is entered.
 *
 * <p>Entries that do not carry {@code RPT_SEQ_NUM} are ignored. Otherwise, with
 * {@code next = processedRptSeqNum + 1}:
 * <ul>
 *   <li><b>SYNC</b>: an entry equal to {@code next} is recorded as processed, handled,
 *       and then {@code handleIncrementalQueue} is invoked. An entry beyond {@code next}
 *       is queued; if it is also beyond {@code next + gapThreshold}, the handler is
 *       reset and the instrument is switched to OUTOFSYNC. Entries below {@code next}
 *       are dropped.</li>
 *   <li><b>OUTOFSYNC</b>: an entry equal to {@code next} is recorded as processed, the
 *       instrument is switched back to SYNC, the entry is handled and
 *       {@code handleIncrementalQueue} is invoked. An entry beyond {@code next} is
 *       queued. Entries below {@code next} are dropped.</li>
 *   <li><b>INITIAL</b>: if nothing has been processed yet ({@code processedRptSeqNum == 0})
 *       and the entry carries sequence number 1, it is recorded, the instrument is
 *       switched to SYNC and the entry is handled. Every other entry is queued.</li>
 *   <li>Any other state: nothing is done.</li>
 * </ul>
 *
 * @param feedContext         context of the feed the entry arrived on (used for logging and
 *                            passed to {@code handleIncrementalQueue})
 * @param msgSeqNum           sequence number of the message containing the entry
 * @param matchEventIndicator match event indicator of the containing message
 * @param incrRefreshEntry    the incremental-refresh group entry to process
 */
void onIncrementalRefresh(final MdpFeedContext feedContext, final long msgSeqNum, final short matchEventIndicator, final MdpGroupEntry incrRefreshEntry) {
    // Snapshot the state first; all branching below uses this snapshot.
    final InstrumentState stateOnEntry = this.state;

    if (!incrRefreshEntry.hasField(RPT_SEQ_NUM)) {
        // Without a report sequence number there is nothing to order against.
        return;
    }

    final long rptSeqNum = incrRefreshEntry.getUInt32(RPT_SEQ_NUM);
    if (logger.isDebugEnabled()) {
        logger.debug("Feed {}:{} | #{} | RPT#{} | SecurityId={}, state={}, prcd={}",
                feedContext.getFeedType(), feedContext.getFeed(), msgSeqNum, rptSeqNum,
                this.getSecurityId(), this.state, this.processedRptSeqNum);
    }

    final long nextRptSeqNum = this.processedRptSeqNum + 1;
    final boolean isNextInSequence = (rptSeqNum == nextRptSeqNum);
    final boolean isAheadOfSequence = (rptSeqNum > nextRptSeqNum);

    if (stateOnEntry == InstrumentState.SYNC) {
        if (isNextInSequence) {
            this.processedRptSeqNum = rptSeqNum;
            handleIncrementalRefreshEntry(msgSeqNum, matchEventIndicator, incrRefreshEntry);
            // NOTE(review): presumably replays queued entries that follow rptSeqNum — confirm in helper.
            handleIncrementalQueue(feedContext, rptSeqNum);
        } else if (isAheadOfSequence) {
            // The entry is queued first; only afterwards is the size of the gap evaluated.
            pushIncrementalRefreshEntryInQueue(msgSeqNum, matchEventIndicator, rptSeqNum, incrRefreshEntry);
            if (rptSeqNum > (nextRptSeqNum + gapThreshold)) {
                // Gap exceeds the configured threshold: reset the handler and leave SYNC.
                this.mdHandler.reset();
                switchState(InstrumentState.SYNC, InstrumentState.OUTOFSYNC);
            }
        }
        return;
    }

    if (stateOnEntry == InstrumentState.OUTOFSYNC) {
        if (isNextInSequence) {
            if (logger.isTraceEnabled()) {
                logger.trace("Feed {}{} | #{} | Instrument: '{}'. State: {}. Got expected increment to restore: #{}",
                        feedContext.getFeedType(), feedContext.getFeed(), msgSeqNum,
                        this.getSecurityId(), this.state, nextRptSeqNum);
            }
            // Record progress and return to SYNC before the entry itself is applied.
            this.processedRptSeqNum = rptSeqNum;
            switchState(stateOnEntry, InstrumentState.SYNC);
            handleIncrementalRefreshEntry(msgSeqNum, matchEventIndicator, incrRefreshEntry);
            handleIncrementalQueue(feedContext, rptSeqNum);
        } else if (isAheadOfSequence) {
            pushIncrementalRefreshEntryInQueue(msgSeqNum, matchEventIndicator, rptSeqNum, incrRefreshEntry);
        }
        return;
    }

    if (stateOnEntry == InstrumentState.INITIAL) {
        final boolean isVeryFirstEntry = (processedRptSeqNum == 0 && rptSeqNum == 1);
        if (isVeryFirstEntry) {
            this.processedRptSeqNum = rptSeqNum;
            switchState(stateOnEntry, InstrumentState.SYNC);
            handleIncrementalRefreshEntry(msgSeqNum, matchEventIndicator, incrRefreshEntry);
        } else {
            // Anything other than the very first report is held in the queue while in INITIAL.
            pushIncrementalRefreshEntryInQueue(msgSeqNum, matchEventIndicator, rptSeqNum, incrRefreshEntry);
        }
    }
}