in dbeam-core/src/main/java/com/spotify/dbeam/options/JdbcExportArgsFactory.java [84:130]
public static QueryBuilderArgs createQueryArgs(final JdbcExportPipelineOptions options)
throws IOException {
final TemporalAmount partitionPeriod =
partitionPeriodTemporalAmount(options.getPartitionPeriod());
final Optional<Instant> partition =
Optional.ofNullable(options.getPartition()).map(JdbcExportArgsFactory::parseInstant);
final Optional<String> partitionColumn = Optional.ofNullable(options.getPartitionColumn());
checkArgument(
!partitionColumn.isPresent() || partition.isPresent(),
"To use --partitionColumn the --partition parameter must also be configured");
if (!(options.isSkipPartitionCheck() || partitionColumn.isPresent())) {
Instant minPartitionDateTime =
Optional.ofNullable(options.getMinPartitionPeriod())
.map(JdbcExportArgsFactory::parseInstant)
// given Instant does not support operations with ChronoUnit.MONTHS
.orElse(
Instant.now()
.atOffset(ZoneOffset.UTC)
.minus(partitionPeriod) // subtracts two partitions
.minus(partitionPeriod)
.toInstant());
partition.map(p -> validatePartition(p, minPartitionDateTime));
}
final Optional<String> splitColumn = Optional.ofNullable(options.getSplitColumn());
final Optional<Integer> queryParallelism = Optional.ofNullable(options.getQueryParallelism());
checkArgument(
queryParallelism.isPresent() == splitColumn.isPresent(),
"Either both --queryParallelism and --splitColumn must be present or " + "none of them");
queryParallelism.ifPresent(
p ->
checkArgument(
p > 0,
"Query Parallelism must be a positive number. Specified queryParallelism was %s",
p));
return createQueryBuilderArgs(options)
.builder()
.setLimit(Optional.ofNullable(options.getLimit()))
.setPartitionColumn(partitionColumn)
.setPartition(partition)
.setPartitionPeriod(partitionPeriod)
.setSplitColumn(splitColumn)
.setQueryParallelism(queryParallelism)
.build();
}