in timescaledb-connector/src/main/java/com/epam/deltix/timebase/connector/ApplicationEventHandler.java [74:100]
public void handleApplicationStartUpEvent(ApplicationReadyEvent event) {
LOG.info().append("Start handling timebase streams: ").append(streams).commit();
migrationMetadataService.createMigrationTable();
streamWildcards = streams.stream()
.filter(DiscoveryUtils::isExpression)
.map(DiscoveryUtils::generateRegExp)
.collect(Collectors.toList());
Set<String> initStreams = streams.stream()
.filter(stream -> !DiscoveryUtils.isExpression(stream))
.collect(Collectors.toSet());
List<String> discoveredStreams = streamWildcards.stream()
.map(discoveryService::discover)
.flatMap(List::stream)
.distinct()
.collect(Collectors.toList());
initStreams.addAll(discoveredStreams);
initStreams.forEach(stream -> {
cacheService.add(stream, new StreamMetaData(StreamMetaData.Status.RUNNING));
executor.execute(() -> replicationService.replicate(stream));
});
}