in dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java [117:144]
/**
 * Inspects the source database and wires the export transforms into {@code pipeline}.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>Calls {@code configureVersion()}.
 *   <li>Opens a JDBC connection via {@code jdbcExportArgs.createConnection()}, closed again by
 *       try-with-resources. The connection is used to:
 *       <ul>
 *         <li>build the Avro schema ({@code createSchema});
 *         <li>build the list of export queries;
 *         <li>pass the connection catalog and the configured table name to
 *             {@code JobNameConfiguration.configureJobName}.
 *       </ul>
 *   <li>Unless {@code dataOnly} is set, writes metadata under {@code output}:
 *       <ul>
 *         <li>the schema, rendered with {@code toString(true)}, to {@code /_AVRO_SCHEMA.avsc};
 *         <li>each query to {@code /_queries/query_<i>.sql}, where {@code i} is the
 *             zero-based query index.
 *       </ul>
 *   <li>Logs the queries. It then applies a {@code Create} of the queries followed by a
 *       {@code JdbcAvroIO} write of {@code .avro} files to {@code output}.
 * </ol>
 *
 * <p>This method only applies transforms; it does not call {@code run()} on the pipeline.
 *
 * @throws Exception propagated from connection setup, schema/query building, job-name
 *     configuration, or writing the metadata files
 */
public void prepareExport() throws Exception {
  configureVersion();

  final Schema avroSchema;
  final List<String> sqlQueries;
  // The connection is only needed for these lookups, so it is closed before any output is written.
  try (Connection conn = jdbcExportArgs.createConnection()) {
    avroSchema = createSchema(conn);
    sqlQueries = jdbcExportArgs.queryBuilderArgs().buildQueries(conn);
    final String table = pipelineOptions.as(DBeamPipelineOptions.class).getTable();
    JobNameConfiguration.configureJobName(pipeline.getOptions(), conn.getCatalog(), table);
  }

  if (!dataOnly) {
    BeamHelper.saveStringOnSubPath(output, "/_AVRO_SCHEMA.avsc", avroSchema.toString(true));
    int queryIndex = 0;
    for (final String sql : sqlQueries) {
      final String subPath = String.format("/_queries/query_%d.sql", queryIndex++);
      BeamHelper.saveStringOnSubPath(output, subPath, sql);
    }
  }

  LOGGER.info("Running queries: {}", sqlQueries.toString());

  pipeline
      .apply("JdbcQueries", Create.of(sqlQueries))
      .apply(
          "JdbcAvroSave",
          JdbcAvroIO.createWrite(output, ".avro", avroSchema, jdbcExportArgs.jdbcAvroOptions()));
}