in sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java [288:374]
/**
 * Converts a raw value into the Java representation expected for the given Beam {@code
 * fieldType}.
 *
 * <p>The incoming value is not assumed to already have the exact Java type matching the field
 * type: numeric values are narrowed or widened through {@link Number}, and {@code java.sql}
 * date/time objects are first reduced to their integer encodings via {@code SqlFunctions}.
 *
 * @param value the value to convert; {@code null} is passed through unchanged
 * @param fieldType the target Beam field type that selects the conversion
 * @param verifyValues forwarded unchanged to {@code toBeamList}, {@code toBeamMap} and {@code
 *     toBeamRow} for nested values; not consulted for scalar conversions in this method
 * @return the converted value, or {@code null} if {@code value} is {@code null}
 * @throws UnsupportedOperationException if the type name or logical type identifier has no
 *     conversion here
 * @throws ClassCastException if {@code value} has a runtime type that the selected conversion
 *     cannot cast
 */
static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValues) {
  if (value == null) {
    return null;
  }
  switch (fieldType.getTypeName()) {
      // BEAM-12176: Numbers aren't always the type we expect.
    case BYTE:
      return ((Number) value).byteValue();
    case INT16:
      return ((Number) value).shortValue();
    case INT32:
      return ((Number) value).intValue();
    case INT64:
      return ((Number) value).longValue();
    case FLOAT:
      return ((Number) value).floatValue();
    case DOUBLE:
      return ((Number) value).doubleValue();
    case DECIMAL:
      if (value instanceof BigDecimal) {
        return (BigDecimal) value;
      } else if (value instanceof Long) {
        return BigDecimal.valueOf((Long) value);
      } else if (value instanceof Integer) {
        return BigDecimal.valueOf((Integer) value);
      }
      // Any other Number goes through its decimal string form rather than a double.
      return new BigDecimal(((Number) value).toString());
    case STRING:
      return (String) value;
    case BOOLEAN:
      return (Boolean) value;
    case DATETIME:
      if (value instanceof Timestamp) {
        value = SqlFunctions.toLong((Timestamp) value);
      }
      // The numeric value is interpreted as milliseconds since the epoch.
      return Instant.ofEpochMilli(((Number) value).longValue());
    case BYTES:
      if (value instanceof byte[]) {
        return value;
      }
      return ((ByteString) value).getBytes();
    case ARRAY:
      return toBeamList((List<Object>) value, fieldType.getCollectionElementType(), verifyValues);
    case MAP:
      return toBeamMap(
          (Map<Object, Object>) value,
          fieldType.getMapKeyType(),
          fieldType.getMapValueType(),
          verifyValues);
    case ROW:
      // Rows may arrive either as an Object[] or as a List; normalize to a List.
      if (value instanceof Object[]) {
        value = Arrays.asList((Object[]) value);
      }
      return toBeamRow((List<Object>) value, fieldType.getRowSchema(), verifyValues);
    case LOGICAL_TYPE:
      String identifier = fieldType.getLogicalType().getIdentifier();
      if (CharType.IDENTIFIER.equals(identifier)) {
        return (String) value;
      } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
        return Instant.ofEpochMilli(((Number) value).longValue());
      } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
        if (value instanceof Date) {
          value = SqlFunctions.toInt((Date) value);
        }
        // BEAM-12175: value should always be Integer here, but it isn't.
        return LocalDate.ofEpochDay(((Number) value).longValue());
      } else if (SqlTypes.TIME.getIdentifier().equals(identifier)) {
        if (value instanceof Time) {
          value = SqlFunctions.toInt((Time) value);
        }
        // BEAM-12175: value should always be Integer here, but it isn't.
        return LocalTime.ofNanoOfDay(((Number) value).longValue() * NANOS_PER_MILLISECOND);
      } else if (SqlTypes.DATETIME.getIdentifier().equals(identifier)) {
        if (value instanceof Timestamp) {
          value = SqlFunctions.toLong((Timestamp) value);
        }
        long epochMillis = ((Number) value).longValue();
        // Split with floor semantics: truncating '/' and '%' give a negative millis-of-day
        // for instants before the epoch, which LocalTime.ofNanoOfDay rejects, and would also
        // select the following calendar day.
        long epochDay = Math.floorDiv(epochMillis, MILLIS_PER_DAY);
        long millisOfDay = Math.floorMod(epochMillis, MILLIS_PER_DAY);
        return LocalDateTime.of(
            LocalDate.ofEpochDay(epochDay),
            LocalTime.ofNanoOfDay(millisOfDay * NANOS_PER_MILLISECOND));
      } else {
        throw new UnsupportedOperationException("Unable to convert logical type " + identifier);
      }
    default:
      throw new UnsupportedOperationException("Unable to convert " + fieldType.getTypeName());
  }
}