in mbp-only/src/main/java/com/epam/cme/mdp3/core/control/InstrumentController.java [192:230]
void onSnapshotFullRefresh(final MdpFeedContext feedContext, final MdpMessage fullRefreshMsg) {
if (fullRefreshMsg.hasField(RPT_SEQ_NUM)) {
final InstrumentState currentState = this.state;
final long snptSeqNum = fullRefreshMsg.getUInt32(RPT_SEQ_NUM);
if (currentState == InstrumentState.INITIAL) {
/*logger.debug("Feed {}{} | #{} | Instrument: '{}'. State: {}. Got initial Snapshot. Initial prcdSeqNum is {}",
feedContext.getFeedType(), feedContext.getFeed(), snptSeqNum, this.getSecurityId(), this.state, snptSeqNum + 1);*/
this.processedRptSeqNum = snptSeqNum;
switchState(currentState, InstrumentState.SYNC);
handleSnapshotFullRefreshEntries(feedContext, fullRefreshMsg);
handleIncrementalQueue(feedContext, snptSeqNum);
/*logger.info("Feed {}{} | Instrument: '{}-({})'. Got initial Snapshot. Initial prcdSeqNum: {}. Last rptSeqNum in queue: {}",
feedContext.getFeedType(), feedContext.getFeed(), this.getSecurityId(), this.secDesc,
snptSeqNum, this.incrRefreshQueue.getLastRptSeqNum());*/
} else if (currentState == InstrumentState.OUTOFSYNC) {
if (snptSeqNum > this.processedRptSeqNum) {
/*logger.trace("Feed {}{} | #{} | Instrument: '{}'. State: {}. Got Snapshot to restore. Fast forward from {} to {}",
feedContext.getFeedType(), feedContext.getFeed(), snptSeqNum, this.getSecurityId(), this.state, this.processedRptSeqNum, snptSeqNum + 1);*/
this.processedRptSeqNum = snptSeqNum;
this.mdHandler.reset();
switchState(currentState, InstrumentState.SYNC);
handleSnapshotFullRefreshEntries(feedContext, fullRefreshMsg);
handleIncrementalQueue(feedContext, snptSeqNum);
/*logger.info("Feed {}{} | Instrument: '{}-({})'. Recovered from Snapshot. Snapshot prcdSeqNum: {}. Last rptSeqNum in queue: {}",
feedContext.getFeedType(), feedContext.getFeed(), this.getSecurityId(), this.secDesc,
snptSeqNum, this.incrRefreshQueue.getLastRptSeqNum());*/
}
} else if (currentState == InstrumentState.SYNC && snptSeqNum > this.processedRptSeqNum) {
if(logger.isTraceEnabled()) {
logger.trace("Feed {}:{} | #{} | Instrument: '{}'. State: {}. Snapshot with high sequence comes faster then Increments. Fast forward from {} to {}",
feedContext.getFeedType(), feedContext.getFeed(), snptSeqNum, this.getSecurityId(), this.state, processedRptSeqNum, snptSeqNum + 1);
}
this.processedRptSeqNum = snptSeqNum;
this.mdHandler.reset();
handleSnapshotFullRefreshEntries(feedContext, fullRefreshMsg);
handleIncrementalQueue(feedContext, snptSeqNum);
}
}
}