public void handleIncrementalPacket()

in mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java [159:221]


    public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPacket) {
        final long pkgSequence = mdpPacket.getMsgSeqNum();
        if(log.isTraceEnabled()) {
            log.trace("Feed {}:{} | handleIncrementalPacket: previous processed sequence '{}', current packet's sequence '{}'",
                    feedContext.getFeedType(), feedContext.getFeed(), lastProcessedSeqNum, pkgSequence);
        }
        try {
            lock.lock();
            switch (currentState) {
                case SYNC:
                    long expectedSequence = lastProcessedSeqNum + 1;
                    if(pkgSequence == expectedSequence) {
                        target.handleIncrementalPacket(feedContext, mdpPacket);
                        if (wasChannelResetInPrcdPacket) {
                            wasChannelResetInPrcdPacket = false;
                        } else {
                            lastProcessedSeqNum = pkgSequence;
                        }
                        processMessagesFromBuffer(feedContext);
                    } else if(pkgSequence > expectedSequence) {
                        buffer.add(pkgSequence, mdpPacket);
                        if(pkgSequence > (expectedSequence + gapThreshold)) {
                            if(log.isInfoEnabled()) {
                                log.info("Past gap of {} expected {} current {}, lost count {}", gapThreshold, expectedSequence, pkgSequence, (pkgSequence - 1) - expectedSequence);
                            }
                            switchState(ChannelState.OUTOFSYNC);
                            long amountOfLostMessages = (pkgSequence - 1) - expectedSequence;
                            if(numberOfTCPAttempts < maxNumberOfTCPAttempts && amountOfLostMessages < TCPMessageRequester.MAX_AVAILABLE_MESSAGES
                                    && tcpRecoveryProcessor != null && executor != null) {
                                if(log.isTraceEnabled()) {
                                    log.trace("TCP Replay request gap {}:{} TCP Attempts: {}", expectedSequence, (pkgSequence-1), numberOfTCPAttempts);
                                }
                                tcpRecoveryProcessor.setBeginSeqNo(expectedSequence);
                                tcpRecoveryProcessor.setEndSeqNo(pkgSequence - 1);
                                executor.execute(tcpRecoveryProcessor);
                                numberOfTCPAttempts++;
                            } else {
                                snapshotRecoveryManager.startRecovery();
                            }
                        }
                    } else {
                        if(log.isTraceEnabled()) {
                            log.trace("Feed {}:{} | handleIncrementalPacket: packet that has sequence '{}' has been skipped. Expected sequence '{}'",
                                    feedContext.getFeedType(), feedContext.getFeed(), pkgSequence, expectedSequence);
                        }
                    }
                    break;
                case INITIAL:
                case OUTOFSYNC:
                    buffer.add(pkgSequence, mdpPacket);
                    packetsInBufferDuringInitialOrOutOfSync++;
                    if(log.isTraceEnabled()) {
                        log.trace("Feed {}:{} | handleIncrementalPacket: current state is '{}', so the packet with sequence '{}' has been put into buffer",
                                feedContext.getFeedType(), feedContext.getFeed(), currentState, pkgSequence);
                    }
                    break;
                default:
                    break;
            }
        } finally {
            lock.unlock();
        }
    }