in mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java [159:221]
/**
 * Handles one incremental packet according to the current channel state.
 *
 * <ul>
 *   <li>{@code SYNC}: a packet whose sequence equals {@code lastProcessedSeqNum + 1} is passed to
 *       {@code target} and buffered packets are then processed. A packet with a higher sequence
 *       is buffered; when it is more than {@code gapThreshold} ahead of the expected sequence the
 *       channel is switched to {@code OUTOFSYNC} and either a TCP replay task is submitted to
 *       {@code executor} or snapshot recovery is started. A packet with a lower sequence is only
 *       trace-logged and otherwise ignored.</li>
 *   <li>{@code INITIAL} / {@code OUTOFSYNC}: the packet is buffered and
 *       {@code packetsInBufferDuringInitialOrOutOfSync} is incremented.</li>
 *   <li>Any other state: the packet is ignored.</li>
 * </ul>
 *
 * All state handling runs while holding {@code lock}.
 *
 * @param feedContext context of the feed the packet arrived on; used for logging and passed on
 *                    to {@code target} and {@code processMessagesFromBuffer}
 * @param mdpPacket   the incremental packet; its {@code getMsgSeqNum()} drives the sequencing
 */
public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPacket) {
    final long pkgSequence = mdpPacket.getMsgSeqNum();
    if (log.isTraceEnabled()) {
        // NOTE(review): lastProcessedSeqNum is read here before the lock is taken, so the logged
        // value may be stale if another thread updates it concurrently - confirm this is acceptable.
        log.trace("Feed {}:{} | handleIncrementalPacket: previous processed sequence '{}', current packet's sequence '{}'",
                feedContext.getFeedType(), feedContext.getFeed(), lastProcessedSeqNum, pkgSequence);
    }
    // Acquire the lock BEFORE entering try: if lock() itself fails, the finally block must not
    // call unlock() on a lock this thread never obtained.
    lock.lock();
    try {
        switch (currentState) {
            case SYNC: {
                final long expectedSequence = lastProcessedSeqNum + 1;
                if (pkgSequence == expectedSequence) {
                    target.handleIncrementalPacket(feedContext, mdpPacket);
                    if (wasChannelResetInPrcdPacket) {
                        // The flag is consumed here and lastProcessedSeqNum is deliberately left untouched.
                        // NOTE(review): presumably the reset handling already set the sequence - confirm.
                        wasChannelResetInPrcdPacket = false;
                    } else {
                        lastProcessedSeqNum = pkgSequence;
                    }
                    processMessagesFromBuffer(feedContext);
                } else if (pkgSequence > expectedSequence) {
                    // Packet from the future: keep it so it can be replayed once the gap is filled.
                    buffer.add(pkgSequence, mdpPacket);
                    if (pkgSequence > (expectedSequence + gapThreshold)) {
                        // NOTE(review): the replay range requested below is
                        // [expectedSequence, pkgSequence - 1], which spans
                        // (pkgSequence - expectedSequence) messages, i.e. one more than this value.
                        // Kept as-is because the limit semantics of MAX_AVAILABLE_MESSAGES are not
                        // visible here - confirm whether this is intentional.
                        final long amountOfLostMessages = (pkgSequence - 1) - expectedSequence;
                        if (log.isInfoEnabled()) {
                            log.info("Past gap of {} expected {} current {}, lost count {}", gapThreshold, expectedSequence, pkgSequence, amountOfLostMessages);
                        }
                        switchState(ChannelState.OUTOFSYNC);
                        final boolean tcpReplayPossible = numberOfTCPAttempts < maxNumberOfTCPAttempts
                                && amountOfLostMessages < TCPMessageRequester.MAX_AVAILABLE_MESSAGES
                                && tcpRecoveryProcessor != null
                                && executor != null;
                        if (tcpReplayPossible) {
                            if (log.isTraceEnabled()) {
                                log.trace("TCP Replay request gap {}:{} TCP Attempts: {}", expectedSequence, (pkgSequence-1), numberOfTCPAttempts);
                            }
                            tcpRecoveryProcessor.setBeginSeqNo(expectedSequence);
                            tcpRecoveryProcessor.setEndSeqNo(pkgSequence - 1);
                            executor.execute(tcpRecoveryProcessor);
                            numberOfTCPAttempts++;
                        } else {
                            // TCP replay is not configured, exhausted, or the gap is too large.
                            snapshotRecoveryManager.startRecovery();
                        }
                    }
                } else {
                    // Sequence is below the expected one: nothing is forwarded or buffered.
                    if (log.isTraceEnabled()) {
                        log.trace("Feed {}:{} | handleIncrementalPacket: packet that has sequence '{}' has been skipped. Expected sequence '{}'",
                                feedContext.getFeedType(), feedContext.getFeed(), pkgSequence, expectedSequence);
                    }
                }
                break;
            }
            case INITIAL:
            case OUTOFSYNC:
                buffer.add(pkgSequence, mdpPacket);
                packetsInBufferDuringInitialOrOutOfSync++;
                if (log.isTraceEnabled()) {
                    log.trace("Feed {}:{} | handleIncrementalPacket: current state is '{}', so the packet with sequence '{}' has been put into buffer",
                            feedContext.getFeedType(), feedContext.getFeed(), currentState, pkgSequence);
                }
                break;
            default:
                break;
        }
    } finally {
        lock.unlock();
    }
}