in sdk/src/main/java/com/google/cloud/dataflow/sdk/coders/CoderRegistry.java [544:617]
void verifyCompatible(CoderT coder, Type candidateType) throws IncompatibleCoderException {
  // Checks, as far as reflection permits, whether `coder` can be ruled out as a coder for
  // values of `candidateType`.
  //
  //  * Returns normally when no incompatibility is detected (including the case where
  //    `candidateType` is a bare type variable, about which nothing can be said).
  //  * Throws IncompatibleCoderException when the raw classes are not assignable, when the
  //    candidate's generic supertype has fewer type arguments than the coder has component
  //    coders, or when a component coder is incompatible with its matching type argument
  //    (the nested exception is attached as the cause).

  // Reflective views of the coder itself.
  @SuppressWarnings("unchecked")
  Class<CoderT> coderRawClass = (Class<CoderT>) coder.getClass();
  TypeDescriptor<CoderT> coderTypeDescriptor = TypeDescriptor.of(coderRawClass);

  // Reflective views of the type the coder declares it encodes.
  @SuppressWarnings("unchecked")
  TypeDescriptor<T> encodedDescriptor = CoderUtils.getCodedType(coderTypeDescriptor);
  @SuppressWarnings("unchecked")
  Class<T> encodedRawClass = (Class<T>) encodedDescriptor.getRawType();
  Type encodedType = encodedDescriptor.getType();

  // Reflective views of the type we are asked about.
  @SuppressWarnings("unchecked")
  TypeDescriptor<CandidateT> targetDescriptor =
      (TypeDescriptor<CandidateT>) TypeDescriptor.of(candidateType);
  @SuppressWarnings("unchecked")
  Class<CandidateT> targetRawClass = (Class<CandidateT>) targetDescriptor.getRawType();

  // A bare type variable carries no concrete information, so the coder cannot be rejected.
  if (candidateType instanceof TypeVariable) {
    return;
  }

  // Mismatched raw classes are a definite incompatibility.
  boolean rawTypesCompatible = encodedRawClass.isAssignableFrom(targetRawClass);
  if (!rawTypesCompatible) {
    String rawMismatchMessage =
        String.format("Cannot encode elements of type %s with coder %s because the"
            + " coded type %s is not assignable from %s",
            candidateType, coder, encodedRawClass, candidateType);
    throw new IncompatibleCoderException(rawMismatchMessage, coder, candidateType);
  }

  // Only a parameterized coded type with component coders needs the per-argument check.
  if (!(encodedType instanceof ParameterizedType) || isNullOrEmpty(coder.getCoderArguments())) {
    return;
  }

  // The raw-class check above makes it reasonable to view the candidate as a T; only this
  // one direction is being checked, even though coders themselves are invariant.
  @SuppressWarnings("unchecked")
  TypeDescriptor<T> upcastDescriptor = (TypeDescriptor<T>) targetDescriptor;

  // View the candidate through the coded class to line its type arguments up with the
  // coder's component coders.
  Type resolvedSupertype = upcastDescriptor.getSupertype(encodedRawClass).getType();
  ParameterizedType genericSupertype = (ParameterizedType) resolvedSupertype;
  Type[] actualArguments = genericSupertype.getActualTypeArguments();
  List<? extends Coder<?>> componentCoders = coder.getCoderArguments();

  if (actualArguments.length < componentCoders.size()) {
    String arityMessage =
        String.format("Cannot encode elements of type %s with coder %s:"
            + " the generic supertype %s has %s type parameters, which is less than the"
            + " number of coder arguments %s has (%s).",
            upcastDescriptor, coder,
            genericSupertype, actualArguments.length,
            coder, componentCoders.size());
    throw new IncompatibleCoderException(arityMessage, coder, upcastDescriptor.getType());
  }

  // Recurse pairwise: the i-th component coder must be compatible with the i-th type argument.
  for (int index = 0; index < componentCoders.size(); index++) {
    try {
      verifyCompatible(
          componentCoders.get(index),
          targetDescriptor.resolveType(actualArguments[index]).getType());
    } catch (IncompatibleCoderException componentFailure) {
      String componentMessage =
          String.format("Cannot encode elements of type %s with coder %s"
              + " because some component coder is incompatible",
              candidateType, coder);
      throw new IncompatibleCoderException(
          componentMessage, coder, candidateType, componentFailure);
    }
  }
}