in sdk/src/main/java/com/google/cloud/dataflow/sdk/values/TypedPValue.java [129:192]
/**
 * Returns the {@link Coder} to use for this value, trying each available source in order.
 *
 * <p>The sources consulted are, in order: the coder already set on this value, the default coder
 * that the pipeline's {@link CoderRegistry} provides for this value's {@link TypeDescriptor}
 * (skipped when the descriptor is {@code null}), and finally the default output coder reported by
 * the producing {@link PTransform}.
 *
 * @return the first coder found
 * @throws IllegalStateException if every source fails; the message lists each failure, and the
 *     underlying {@link CannotProvideCoderException}s are attached as suppressed exceptions so
 *     that their stack traces are not lost
 */
private Coder<T> inferCoderOrFail() {
  // First option for a coder: use the Coder set on this PValue.
  if (coder != null) {
    return coder;
  }

  AppliedPTransform<?, ?, ?> application = getProducingTransformInternal();

  // Second option for a coder: look in the coder registry.
  CoderRegistry registry = getPipeline().getCoderRegistry();
  TypeDescriptor<T> token = getTypeDescriptor();
  // The original registry exception is kept for diagnostics; the text reported to the user is
  // tracked separately because it may be extended with a ParDo-specific hint below.
  CannotProvideCoderException registryFailure = null;
  String registryFailureMessage = null;
  if (token != null) {
    try {
      return registry.getDefaultCoder(token);
    } catch (CannotProvideCoderException exc) {
      registryFailure = exc;
      registryFailureMessage = exc.getMessage();

      // Attempt to detect when the token came from a TupleTag used for a ParDo side output,
      // and provide a better error message if so. Unfortunately, this information is not
      // directly available from the TypeDescriptor, so infer based on the type of the
      // PTransform and the reason reported by the exception.
      if (application.getTransform() instanceof ParDo.BoundMulti
          && exc.getReason() == ReasonCode.TYPE_ERASURE) {
        registryFailureMessage = exc.getMessage()
            + " If this error occurs for a side output of the producing ParDo, verify that the "
            + "TupleTag for this output is constructed with proper type information (see "
            + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.";
      }
    }
  }

  // Third option for a coder: use the default Coder from the producing PTransform.
  CannotProvideCoderException transformFailure;
  try {
    return ((PTransform) application.getTransform()).getDefaultOutputCoder(
        application.getInput(), this);
  } catch (CannotProvideCoderException exc) {
    transformFailure = exc;
  }

  // Every option failed. Build up the error message, one line per root cause.
  StringBuilder messageBuilder = new StringBuilder()
      .append("Unable to return a default Coder for ").append(this)
      .append(". Correct one of the following root causes:");

  // No exception for this one, but tell the user that .setCoder() has not been called.
  messageBuilder.append("\n No Coder has been manually specified; ")
      .append(" you may do so using .setCoder().");

  // Only present when a TypeDescriptor was available and the registry lookup was attempted.
  if (registryFailure != null) {
    messageBuilder
        .append("\n Inferring a Coder from the CoderRegistry failed: ")
        .append(registryFailureMessage);
  }

  // Always present here: the try block above either returned or recorded this failure.
  messageBuilder
      .append("\n Using the default output Coder from the producing PTransform failed: ")
      .append(transformFailure.getMessage());

  // Build the exception and attach the underlying failures so their stack traces survive.
  IllegalStateException failure = new IllegalStateException(messageBuilder.toString());
  if (registryFailure != null) {
    failure.addSuppressed(registryFailure);
  }
  failure.addSuppressed(transformFailure);
  throw failure;
}