in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/services/timebase/export/imp/ImportFileTask.java [155:256]
/**
 * Loads the messages supplied by {@code reader} into the stream obtained from
 * {@code getOrCreateStream}, converting every raw message from the file schema to the
 * stream schema before it is sent.
 *
 * <p>The file is skipped silently when {@code symbol} is set and is not contained in the
 * {@code symbols} filter; in that case the stream is not looked up or created. When the file
 * schema is rejected by {@code isSchemaValid}, a warning is reported and nothing is loaded.
 *
 * <p>While reading, messages are dropped when their timestamp is before the configured start
 * time, when their symbol is not in the {@code symbols} filter, when they are not
 * {@link RawMessage}s, when their type cannot be found among the stream types, or when the
 * converter returns {@code null}. Reading stops at the first message whose timestamp is past
 * the configured end time, or as soon as {@code cancelled} is observed.
 *
 * @param settings import settings; supplies the write mode (defaults to {@code REWRITE} when
 *                 unset) and is forwarded to {@code getOrCreateStream}
 * @param fileName name of the imported file, used only in log and report messages
 * @param fileSize size passed to {@code getCurrentProgress} together with the reader progress
 * @param reader   source of messages and of the file's record class descriptors
 * @param space    value assigned to the loader's {@code space} option; may be {@code null}
 * @param symbol   symbol associated with the file, checked against the {@code symbols} filter;
 *                 may be {@code null}
 */
private void importQsmsg(ImportSettings settings, String fileName, long fileSize, MessageReader2 reader,
                         String space, String symbol) {
    // Evaluate the file-level symbol filter first, so that a skipped file has no side effects
    // on the target stream.
    if (symbol != null && symbols != null && !symbols.containsCharSequence(symbol)) {
        return;
    }

    RecordClassDescriptor[] inTypes = reader.getTypes();
    DXTickStream stream = getOrCreateStream(timebaseService, settings, inTypes);
    if (!isSchemaValid(stream, inTypes)) {
        String message = "Schema of " + fileName + " is incompatible with stream schema.";
        LOGGER.error().append(message).commit();
        report.sendWarning(message);
        updateStatus(Collections.emptyList(), Collections.singletonList(message), Collections.emptyList(), ImportState.STARTED);
        return;
    }

    RecordClassDescriptor[] outTypes = stream.isFixedType() ?
            new RecordClassDescriptor[]{stream.getFixedType()} :
            stream.getPolymorphicDescriptors();
    SchemaAnalyzer analyzer = Utils.createSchemaAnalyzer(inTypes, outTypes);
    // One converter per source descriptor, built lazily on the first message of that type.
    HashMap<RecordClassDescriptor, Function<RawMessage, RawMessage>> converters = new HashMap<>();

    LoadingOptions.WriteMode writeMode = settings.getWriteMode() != null ? settings.getWriteMode() : LoadingOptions.WriteMode.REWRITE;
    String importTarget = stream.getKey() + (space != null ? ("[space: " + space + "]") : "") + (symbol != null ? ("[symbol: " + symbol + "]") : "");
    String importMessage = "Importing " + fileName + " into " + importTarget + " (Write Mode: " + writeMode + ")";
    LOGGER.info().append(importMessage).commit();
    report.sendInfo(importMessage);
    updateStatus(Collections.singletonList(importMessage), Collections.emptyList(), Collections.emptyList(), ImportState.STARTED);

    LoadingOptions options = new LoadingOptions();
    options.raw = true;
    options.channelQOS = ChannelQualityOfService.MAX_THROUGHPUT;
    options.space = space;
    options.writeMode = writeMode;

    final int updateEveryMessages = 10000;
    final long progressIntervalMs = 500;

    int importedMessages = 0;
    long readMessages = 0;
    long lastSendProgressMs = 0;
    boolean stoppedByCancel = false;

    importProcess.update();
    try (TickLoader loader = stream.createLoader(options)) {
        loader.addEventListener(report::newWarning);
        while (reader.next()) {
            if (cancelled) {
                // Remember why the loop ended so the final report does not claim success.
                stoppedByCancel = true;
                break;
            }

            // Refresh the import process on every N-th message *read*, independent of how many
            // were actually imported; otherwise long runs of filtered messages never refresh it
            // and a stalled imported-count refreshes it on every iteration.
            // NOTE(review): update() is presumably a keep-alive for the import process - confirm.
            if (++readMessages % updateEveryMessages == 0) {
                importProcess.update();
            }

            InstrumentMessage msg = reader.getMessage();
            if (msg.getTimeStampMs() > importProcess.importSettings().getEndTime()) {
                break;
            }
            if (msg.getTimeStampMs() < importProcess.importSettings().getStartTime()) {
                continue;
            }
            if (symbols != null && !symbols.containsCharSequence(msg.getSymbol())) {
                continue;
            }
            if (!(msg instanceof RawMessage)) {
                continue;
            }

            RawMessage message = (RawMessage) msg;
            Function<RawMessage, RawMessage> converter = converters.get(message.type);
            if (converter == null) {
                RecordClassDescriptor descriptor = Utils.findType(stream.getTypes(), message.type);
                if (descriptor == null) {
                    // The stream has no matching type for this message; drop it.
                    continue;
                }
                MetaDataChange change = analyzer.getChanges(
                        new RecordClassSet(new RecordClassDescriptor[] { message.type }),
                        MetaDataChange.ContentType.Fixed,
                        new RecordClassSet(new RecordClassDescriptor[] { descriptor }),
                        MetaDataChange.ContentType.Fixed
                );
                SchemaConverter schemaConverter = new SchemaConverter(change);
                converter = schemaConverter::convert;
                converters.put(message.type, converter);
            }

            RawMessage converted = converter.apply(message);
            if (converted == null) {
                continue;
            }

            loader.send(converted);
            importedMessages++;

            // Throttle progress/warning pushes to the client.
            if (System.currentTimeMillis() - lastSendProgressMs > progressIntervalMs) {
                report.sendProgress(getCurrentProgress(reader.getProgress(), fileSize));
                report.sendWarnings();
                lastSendProgressMs = System.currentTimeMillis();
            }
        }
    }

    String resultMessage = stoppedByCancel
            ? "Import of " + fileName + " into " + importTarget + " was cancelled. Imported messages: " + importedMessages
            : "Successfully imported " + fileName + " into " + importTarget + ". Imported messages: " + importedMessages;
    LOGGER.info().append(resultMessage).commit();
    report.sendInfo(resultMessage);
    updateStatus(Collections.singletonList(resultMessage), Collections.emptyList(), Collections.emptyList(), ImportState.STARTED);
}