in sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/AvroCoder.java [546:622]
/**
 * Recursively inspects {@code schema}, as it would be represented by generic/specific
 * (indexed) records, and calls {@code reportError} for every construct that is not known to
 * encode deterministically.
 *
 * <p>The schema is kept in {@code activeSchemas} for the duration of its own visit so that a
 * schema reached again through itself is reported as recursive instead of being walked
 * forever. The entry is always removed again on the way out, even if the visit terminates
 * with an exception.
 *
 * @param context label prepended to any error reported for this schema (for record fields,
 *     {@code <record name>.<field name>})
 * @param schema the Avro schema to inspect
 * @param specificClassStr value of the {@code SpecificData.CLASS_PROP} property of the field
 *     this schema belongs to, or {@code null} if there is none; only consulted for
 *     {@code STRING} schemas
 */
private void checkIndexedRecord(String context, Schema schema,
    @Nullable String specificClassStr) {
  // add() returns false when the schema is already being visited further up the call stack.
  // Nothing was added in that case, so there is nothing to undo before returning.
  if (!activeSchemas.add(schema)) {
    reportError(context, "%s appears recursively", schema.getName());
    return;
  }

  try {
    switch (schema.getType()) {
      case ARRAY:
        // Generic Records use GenericData.Array to implement arrays, which is
        // essentially an ArrayList, and therefore ordering is deterministic.
        // The array is thus deterministic if the elements are deterministic.
        checkIndexedRecord(context, schema.getElementType(), null);
        break;

      case ENUM:
        // Enums are deterministic because they encode as a single integer.
        break;

      case FIXED:
        // In the case of GenericRecords, FIXED is deterministic because it
        // encodes/decodes as a byte array.
        break;

      case MAP:
        reportError(context,
            "GenericRecord and SpecificRecords use a HashMap to represent MAPs,"
            + " so it is non-deterministic");
        break;

      case RECORD:
        // Each field is checked under the context "<record name>.<field name>" and carries
        // its own CLASS_PROP (if any) down to the STRING case.
        for (org.apache.avro.Schema.Field field : schema.getFields()) {
          checkIndexedRecord(
              schema.getName() + "." + field.name(),
              field.schema(),
              field.getProp(SpecificData.CLASS_PROP));
        }
        break;

      case STRING:
        // GenericDatumWriter#findStringClass will use a CharSequence or a String
        // for each string, so it is deterministic.
        // SpecificCompiler#getStringType will use java.lang.String, org.apache.avro.util.Utf8,
        // or java.lang.CharSequence, unless SpecificData.CLASS_PROP overrides that.
        if (specificClassStr != null) {
          boolean knownDeterministic;
          try {
            Class<?> specificClass = ClassUtils.forName(specificClassStr);
            knownDeterministic = DETERMINISTIC_STRINGABLE_CLASSES.contains(specificClass);
          } catch (ClassNotFoundException e) {
            // A class that cannot be loaded cannot be vouched for either.
            knownDeterministic = false;
          }
          if (!knownDeterministic) {
            reportError(context, "Specific class %s is not known to be deterministic",
                specificClassStr);
          }
        }
        break;

      case UNION:
        // NOTE(review): each branch is checked with its own name as the context, so the
        // enclosing context is not part of errors reported from inside a union branch.
        // Confirm this is intended before changing it; reported messages would change.
        for (org.apache.avro.Schema subschema : schema.getTypes()) {
          checkIndexedRecord(subschema.getName(), subschema, null);
        }
        break;

      case BOOLEAN:
      case BYTES:
      case DOUBLE:
      case INT:
      case FLOAT:
      case LONG:
      case NULL:
        // For types that Avro encodes using one of the above primitives, we assume they are
        // deterministic.
        break;

      default:
        reportError(context, "Unknown schema type %s may be non-deterministic", schema.getType());
        break;
    }
  } finally {
    // Always pop the schema pushed above so an exception during the visit cannot leave a
    // stale entry behind that would later be misreported as recursion.
    activeSchemas.remove(schema);
  }
}