in mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java [91:156]
public void handleSnapshotPacket(MdpFeedContext feedContext, MdpPacket mdpPacket) {
final long pkgSequence = mdpPacket.getMsgSeqNum();
if(log.isTraceEnabled()) {
log.trace("Feed {}:{} | handleSnapshotPacket: previous processed sequence '{}', current packet's sequence '{}'",
feedContext.getFeedType(), feedContext.getFeed(), lastProcessedSeqNum, pkgSequence);
}
try {
lock.lock();
if(mdpPacket.getMsgSeqNum() == 1) {
if(receivingCycle) {
smallestSnapshotSequence = mboCycleHandler.getSmallestSnapshotSequence();
highestSnapshotSequence = mboCycleHandler.getHighestSnapshotSequence();
if (smallestSnapshotSequence != SnapshotCycleHandler.SNAPSHOT_SEQUENCE_UNDEFINED
&& highestSnapshotSequence != SnapshotCycleHandler.SNAPSHOT_SEQUENCE_UNDEFINED
&& mbpCycleHandler.getSmallestSnapshotSequence() != SnapshotCycleHandler.SNAPSHOT_SEQUENCE_UNDEFINED
&& mbpCycleHandler.getHighestSnapshotSequence() != SnapshotCycleHandler.SNAPSHOT_SEQUENCE_UNDEFINED) {
if(mbpCycleHandler.getSmallestSnapshotSequence() != smallestSnapshotSequence
|| mbpCycleHandler.getHighestSnapshotSequence() != highestSnapshotSequence) {
log.error("MBP(Highest '{}', Smallest '{}') and MBO(Highest '{}', Smallest '{}') snapshots are not synchronized",
mbpCycleHandler.getHighestSnapshotSequence(), mbpCycleHandler.getSmallestSnapshotSequence(),
mboCycleHandler.getHighestSnapshotSequence(), mboCycleHandler.getSmallestSnapshotSequence());
}
lastProcessedSeqNum = highestSnapshotSequence;
snapshotRecoveryManager.stopRecovery();
switchState(ChannelState.SYNC);
if (log.isInfoEnabled()) {
log.info("{} Packets added to buffer during initial or outofsync event", packetsInBufferDuringInitialOrOutOfSync);
}
packetsInBufferDuringInitialOrOutOfSync = 0;
processMessagesFromBuffer(feedContext);
receivingCycle = false;
numberOfTCPAttempts = 0;
}
} else {
mboCycleHandler.reset();
mbpCycleHandler.reset();
receivingCycle = true;
}
}
switch (currentState) {
case INITIAL:
case OUTOFSYNC:
if(receivingCycle) {
for (MdpMessage mdpMessage : mdpPacket) {
updateSemanticMsgType(mdpMessageTypes, mdpMessage);
long lastMsgSeqNumProcessed = mdpMessage.getUInt32(MdConstants.LAST_MSG_SEQ_NUM_PROCESSED);
int securityId = mdpMessage.getInt32(MdConstants.SECURITY_ID);
long totNumReports = mdpMessage.getUInt32(MdConstants.TOT_NUM_REPORTS);
if (isMBOSnapshot(mdpMessage)) {
long noChunks = mdpMessage.getUInt32(MdConstants.NO_CHUNKS);
long currentChunk = mdpMessage.getUInt32(MdConstants.CURRENT_CHUNK);
mboCycleHandler.update(totNumReports, lastMsgSeqNumProcessed, securityId, noChunks, currentChunk);
} else {
mbpCycleHandler.update(totNumReports, lastMsgSeqNumProcessed, securityId, 1, 1);
}
}
target.handleSnapshotPacket(feedContext, mdpPacket);
}
break;
default:
break;
}
} finally {
lock.unlock();
}
}