private void importQsmsg()

in java/ws-server/src/main/java/com/epam/deltix/tbwg/webapp/services/timebase/export/imp/ImportFileTask.java [155:256]


    private void importQsmsg(ImportSettings settings, String fileName, long fileSize, MessageReader2 reader,
                             String space, String symbol) {
        DXTickStream stream = getOrCreateStream(timebaseService, settings, reader.getTypes());

        if (symbol != null && symbols != null && !symbols.containsCharSequence(symbol)) {
            return;
        }

        if (!isSchemaValid(stream, reader.getTypes())) {
            String message = "Schema of " + fileName + " is incompatible with stream schema.";
            LOGGER.error().append(message).commit();
            report.sendWarning(message);
            updateStatus(Collections.emptyList(), Collections.singletonList(message), Collections.emptyList(), ImportState.STARTED);
            return;
        }

        RecordClassDescriptor[] inTypes = reader.getTypes();
        RecordClassDescriptor[] outTypes = stream.isFixedType() ?
                new RecordClassDescriptor[]{stream.getFixedType()} :
                stream.getPolymorphicDescriptors();

        SchemaAnalyzer analyzer = Utils.createSchemaAnalyzer(inTypes, outTypes);
        HashMap<RecordClassDescriptor, Function<RawMessage, RawMessage>> converters = new HashMap<>();

        LoadingOptions.WriteMode writeMode = settings.getWriteMode() != null ? settings.getWriteMode() : LoadingOptions.WriteMode.REWRITE;
        String importTarget = stream.getKey() + (space != null ? ("[space: " + space + "]") : "") + (symbol != null ? ("[symbol: " + symbol + "]") : "");
        String importMessage = "Importing " + fileName + " into " + importTarget + " (Write Mode: " + writeMode + ")";
        LOGGER.info().append(importMessage).commit();
        report.sendInfo(importMessage);
        updateStatus(Collections.singletonList(importMessage), Collections.emptyList(), Collections.emptyList(), ImportState.STARTED);

        int importedMessages = 0;
        LoadingOptions options = new LoadingOptions();
        options.raw = true;
        options.channelQOS = ChannelQualityOfService.MAX_THROUGHPUT;
        options.space = space;
        options.writeMode = writeMode;

        long lastSendProgressMs = 0;

        importProcess.update();
        try (TickLoader loader = stream.createLoader(options)) {
            loader.addEventListener(report::newWarning);

            while (reader.next() && !cancelled) {
                InstrumentMessage msg = reader.getMessage();
                if (msg.getTimeStampMs() > importProcess.importSettings().getEndTime()) {
                    break;
                }
                if (msg.getTimeStampMs() < importProcess.importSettings().getStartTime()) {
                    continue;
                }

                if (symbols != null && !symbols.containsCharSequence(msg.getSymbol())) {
                    continue;
                }

                if (msg instanceof RawMessage) {
                    RawMessage message = (RawMessage) msg;
                    Function<RawMessage, RawMessage> converter = converters.get(message.type);
                    if (converter == null) {
                        RecordClassDescriptor descriptor = Utils.findType(stream.getTypes(), message.type);
                        if (descriptor == null) {
                            continue;
                        }

                        MetaDataChange change = analyzer.getChanges(
                            new RecordClassSet(new RecordClassDescriptor[] { message.type }),
                            MetaDataChange.ContentType.Fixed,
                            new RecordClassSet(new RecordClassDescriptor[] { descriptor }),
                            MetaDataChange.ContentType.Fixed
                        );

                        SchemaConverter finalConverter = new SchemaConverter(change);
                        converters.put(message.type, converter = finalConverter::convert);
                    }

                    RawMessage converted = converter.apply(message);
                    if (converted != null) {
                        loader.send(converted);
                        importedMessages++;

                        if (System.currentTimeMillis() - lastSendProgressMs > 500) {
                            report.sendProgress(getCurrentProgress(reader.getProgress(), fileSize));
                            report.sendWarnings();
                            lastSendProgressMs = System.currentTimeMillis();
                        }
                    }
                }

                if (importedMessages % 10000 == 0) {
                    importProcess.update();
                }
            }
        }

        String successfulMessage = "Successfully imported " + fileName + " into " +
            importTarget + ". Imported messages: " + importedMessages;
        LOGGER.info().append(successfulMessage).commit();
        report.sendInfo(successfulMessage);
        updateStatus(Collections.singletonList(successfulMessage), Collections.emptyList(), Collections.emptyList(), ImportState.STARTED);
    }