in core/src/main/java/com/epam/cme/mdp3/core/channel/tcp/MdpTCPMessageRequester.java [59:107]
/**
 * Requests the packets with sequence numbers in {@code [beginSeqNo, endSeqNo]} over the TCP
 * channel and hands every received packet that falls inside that range to the listener.
 *
 * <p>Sequence of steps: connect, send logon, wait for the logon reply, send the request, read
 * packets until one with a sequence number of at least {@code endSeqNo} arrives, then read one
 * more packet which is expected to be a logout. Whatever the outcome, once the connection was
 * established the method sends a logout, disconnects the channel and calls
 * {@code onFeedStopped()} before returning.
 *
 * <p>The timeout is evaluated only between two reads; NOTE(review): if {@code receivePacket()}
 * blocks indefinitely the timeout cannot fire while it is blocked - confirm the channel has its
 * own read timeout.
 *
 * @param beginSeqNo        first sequence number of the requested range (inclusive)
 * @param endSeqNo          last sequence number of the requested range (inclusive)
 * @param tcpPacketListener receives each packet whose sequence number lies in the range
 * @return {@code true} only when the packet following the last requested one is a logout;
 *         {@code false} when the connection fails, the logon is not confirmed, the request
 *         times out, or any exception is raised while processing
 */
public synchronized boolean askForLostMessages(long beginSeqNo, long endSeqNo, TCPPacketListener tcpPacketListener) {
    // Built before connecting on purpose: the request counter advances even if connect() fails,
    // exactly as callers of the previous implementation observed.
    ByteBuffer request = preparePacketWithRequest(channelId, beginSeqNo, endSeqNo, ++requestCounter);
    if (!tcpChannel.connect()) {
        return false;
    }
    try {
        onFeedStarted();
        sendPacket(logon);
        MdpPacket receivedPacket = receivePacket();
        if (!isPacketWithLogon(receivedPacket)) {
            // Anything other than a logon ends the attempt; log what arrived instead.
            MdpMessage firstMessage = receivedPacket.iterator().next();
            int schemaId = firstMessage.getSchemaId();
            firstMessage.setMessageType(mdpMessageTypes.getMessageType(schemaId));
            logger.warn("The message {} has been received instead of logon", firstMessage);
            if (schemaId == LOGOUT_TEMPLATE_ID) {
                firstMessage.getString(LOGOUT_TEXT_FIELD_ID, sbeString);
                logger.warn("Logout has been received with reason '{}'", sbeString.getString());
            }
            return false;
        }

        tcpChannel.send(request);
        // nanoTime() is monotonic, so the elapsed-time check is immune to wall-clock adjustments.
        final long startRequestNanos = System.nanoTime();
        long packetSeqNum;
        do {
            long elapsedMsec = (System.nanoTime() - startRequestNanos) / 1000000L;
            if (elapsedMsec > REQUEST_TIMEOUT_IN_MSEC) {
                logger.warn("Request has not been processed for {} msec", REQUEST_TIMEOUT_IN_MSEC);
                return false;
            }
            receivedPacket = receivePacket();
            packetSeqNum = receivedPacket.getMsgSeqNum();
            // Packets outside the requested range are read but not forwarded.
            if (packetSeqNum >= beginSeqNo && packetSeqNum <= endSeqNo) {
                tcpPacketListener.onPacket(tcpChannel.getFeedContext(), receivedPacket);
            }
        } while (packetSeqNum < endSeqNo);

        // The packet right after the last requested one decides the result.
        receivedPacket = receivePacket();
        return isPacketWithLogout(receivedPacket);
    } catch (Exception e) {
        // mdpPacket is declared outside this method; it is logged as the last packet seen.
        logger.error("Last packet [{}] - {}", mdpPacket, e.getMessage(), e);
        return false;
    } finally {
        try {
            sendPacket(logout);
        } catch (IOException e) {
            logger.error("Error has occurred during logout", e);
        } finally {
            // Nested so that an unchecked failure while sending the logout (or while
            // disconnecting) can no longer skip the remaining cleanup steps.
            try {
                tcpChannel.disconnect();
            } finally {
                onFeedStopped();
            }
        }
    }
}