in dbeam-core/src/main/java/com/spotify/dbeam/avro/BeamJdbcAvroSchema.java [56:79]
/**
 * Generates the Avro schema for an export and records how long the generation took.
 *
 * <p>The elapsed time is logged (in seconds) and also attached to the pipeline: a one-element
 * branch is added whose only job is to increment the {@code schemaElapsedTimeMs} counter by the
 * measured number of milliseconds when that element is processed.
 *
 * @param pipeline pipeline to which the counter-exposing transforms are applied
 * @param args export arguments forwarded to {@code generateAvroSchema}
 * @param connection JDBC connection forwarded to {@code generateAvroSchema}
 * @return the schema produced by {@code generateAvroSchema}
 * @throws Exception anything raised while generating the schema propagates unchanged
 */
public static Schema createSchema(
    final Pipeline pipeline, final JdbcExportArgs args, final Connection connection)
    throws Exception {
  // Time only the schema generation itself.
  final long startedAtNanos = System.nanoTime();
  final Schema schema = generateAvroSchema(args, connection);
  final long elapsedNanos = System.nanoTime() - startedAtNanos;
  final long elapsedMillis = elapsedNanos / 1_000_000L;

  LOGGER.info("Elapsed time to schema {} seconds", elapsedMillis / 1000.0);

  final Counter schemaTimeCounter =
      Metrics.counter(BeamJdbcAvroSchema.class.getCanonicalName(), "schemaElapsedTimeMs");

  // The counter is incremented from inside a transform rather than here; presumably this is
  // what makes the value show up among the pipeline's metrics -- confirm against the runner.
  // The resulting collection is intentionally not used any further.
  pipeline
      .apply(
          "ExposeSchemaCountersSeed",
          Create.of(Collections.singletonList(0)).withType(TypeDescriptors.integers()))
      .apply(
          "ExposeSchemaCounters",
          MapElements.into(TypeDescriptors.integers())
              .via(
                  seed -> {
                    schemaTimeCounter.inc(elapsedMillis);
                    return seed;
                  }));

  return schema;
}