in scalding-db/src/main/scala/com/twitter/scalding/db/macros/impl/ColumnDefinitionProviderImpl.scala [238:349]
/**
 * Macro implementation that builds a `ResultSetExtractor[T]` from the column formats of `T`.
 *
 * The generated extractor has two members:
 *  - `validate(rsmd)`: for every column, in declaration order, compares the upper-cased type
 *    name reported by `ResultSetMetaData` against the expected field type (accepting a fixed
 *    set of synonyms) and checks nullability. Any `JdbcValidationException` thrown by these
 *    checks is captured in the returned `Try`.
 *  - `toCaseClass(rs, conv)`: reads every column from the `ResultSet` by field name, boxes
 *    primitive values, packs them into a cascading `Tuple` wrapped in a `TupleEntry` and hands
 *    that to the supplied `TupleConverter`.
 *
 * Every library reference inside the generated trees is `_root_`-qualified so that the
 * expansion does not depend on (and cannot be broken by) names in scope at the call site.
 *
 * @param c the macro context
 * @tparam T the type whose column formats drive the generated code
 * @return an expression that instantiates the extractor for `T`
 */
def getExtractor[T](c: Context)(implicit T: c.WeakTypeTag[T]): c.Expr[ResultSetExtractor[T]] = {
  import c.universe._

  val columnFormats = getColumnFormats[T](c)

  val rsmdTerm = newTermName(c.fresh("rsmd"))
  // we validate two things from ResultSetMetadata
  // 1. the column types match with actual DB schema
  // 2. a field whose column format is flagged nullable is not backed by a column
  //    that the DB reports as `columnNoNulls`
  // NOTE(review): the opposite mismatch (non-nullable field, nullable DB column) is not
  // rejected here -- confirm that this is the intended direction of the check.
  val checks = columnFormats.zipWithIndex.map { case (cf: ColumnFormat[_], pos: Int) =>
    val fieldName = cf.fieldName.toStr
    // JDBC column indices are 1-based
    val columnIndex = pos + 1
    val typeNameTerm = newTermName(c.fresh(s"colTypeName_$pos"))
    // MySQL uses names like `DATE`, `INTEGER` and `VARCHAR`;
    // Vertica uses names like `Date`, `Integer` and `Varchar`
    // so the reported name is normalised to upper case with a fixed locale.
    val typeName = q"""
      val $typeNameTerm = $rsmdTerm.getColumnTypeName($columnIndex).toUpperCase(_root_.java.util.Locale.US)
    """
    // certain types have synonyms, so we group them together here
    // note: this is mysql specific
    // http://dev.mysql.com/doc/refman/5.0/en/numeric-type-overview.html
    val typeValidation = cf.fieldType match {
      case "VARCHAR" =>
        q"""_root_.scala.List("VARCHAR", "CHAR").contains($typeNameTerm)"""
      case "BOOLEAN" | "TINYINT" =>
        q"""_root_.scala.List("BOOLEAN", "BOOL", "TINYINT").contains($typeNameTerm)"""
      case "INT" =>
        q"""_root_.scala.List("INTEGER", "INT").contains($typeNameTerm)"""
      // In Vertica, `INTEGER`, `INT`, `BIGINT`, `INT8`, `SMALLINT`, and `TINYINT` are all 64 bits
      // https://www.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/SQLReferenceManual/DataTypes/Numeric/INTEGER.htm
      // In MySQL, `TINYINT`, `SMALLINT`, `MEDIUMINT`, `INT`, and `BIGINT` are all <= 64 bits
      // https://dev.mysql.com/doc/refman/5.7/en/integer-types.html
      // As the user has told us this field can store a `BIGINT`, we can safely accept any of these
      // types from the database.
      case "BIGINT" =>
        q"""_root_.scala.List("INTEGER", "INT", "BIGINT", "INT8", "SMALLINT",
              "TINYINT", "MEDIUMINT").contains($typeNameTerm)"""
      case "DATETIME" =>
        q"""_root_.scala.List("DATE","DATETIME","TIMESTAMP").contains($typeNameTerm)"""
      // any other type has to match the reported name exactly
      case f =>
        q"""$f == $typeNameTerm"""
    }
    val typeAssert = q"""
      if (!$typeValidation) {
        throw new _root_.com.twitter.scalding.db.JdbcValidationException(
          "Mismatched type for column '" + $fieldName + "'. Expected " + ${cf.fieldType} +
            " but set to " + $typeNameTerm + " in DB.")
      }
    """
    val nullableTerm = newTermName(c.fresh(s"isNullable_$pos"))
    val nullableValidation = q"""
      val $nullableTerm = $rsmdTerm.isNullable($columnIndex)
      if ($nullableTerm == _root_.java.sql.ResultSetMetaData.columnNoNulls && ${cf.nullable}) {
        throw new _root_.com.twitter.scalding.db.JdbcValidationException(
          "Column '" + $fieldName + "' is not nullable in DB.")
      }
    """
    q"""
      $typeName
      $typeAssert
      $nullableValidation
    """
  }

  val rsTerm = newTermName(c.fresh("rs"))
  val formats = columnFormats.map {
    case cf: ColumnFormat[_] => {
      val fieldName = cf.fieldName.toStr
      // java boxed types needed below to populate cascading's Tuple
      val (box: Option[Tree], primitiveGetter: Tree) = cf.fieldType match {
        case "VARCHAR" | "TEXT" =>
          (None, q"""$rsTerm.getString($fieldName)""")
        case "BOOLEAN" =>
          (Some(q"""_root_.java.lang.Boolean.valueOf"""), q"""$rsTerm.getBoolean($fieldName)""")
        case "TINYINT" =>
          (Some(q"""_root_.java.lang.Byte.valueOf"""), q"""$rsTerm.getByte($fieldName)""")
        case "DATE" | "DATETIME" =>
          (
            None,
            q"""_root_.scala.Option($rsTerm.getTimestamp($fieldName)).map { ts => new _root_.java.util.Date(ts.getTime) }.orNull"""
          )
        // dates set to null are populated as None by tuple converter
        // if the corresponding case class field is an Option[Date]
        case "DOUBLE" =>
          (Some(q"""_root_.java.lang.Double.valueOf"""), q"""$rsTerm.getDouble($fieldName)""")
        case "BIGINT" =>
          (Some(q"""_root_.java.lang.Long.valueOf"""), q"""$rsTerm.getLong($fieldName)""")
        case "INT" | "SMALLINT" =>
          (Some(q"""_root_.java.lang.Integer.valueOf"""), q"""$rsTerm.getInt($fieldName)""")
        case "BLOB" =>
          (
            None,
            q"""_root_.scala.Option($rsTerm.getBlob($fieldName)).map ( blob => blob.getBytes(1,blob.length().toInt)).orNull """
          )
        // unsupported field types only fail once the generated getter is evaluated
        case f =>
          (None, q"""_root_.scala.sys.error("Invalid format " + $f + " for " + $fieldName)""")
      }
      // note: UNSIGNED BIGINT is currently unsupported
      val valueTerm = newTermName(c.fresh("colValue"))
      val boxed = box.map(b => q"""$b($valueTerm)""").getOrElse(q"""$valueTerm""")
      // primitiveGetter needs to be invoked before we can use wasNull
      // to check if the column value that was read is null or not
      q"""
        { val $valueTerm = $primitiveGetter; if ($rsTerm.wasNull) null else $boxed }
      """
    }
  }

  val tcTerm = newTermName(c.fresh("conv"))
  val res = q"""
    new _root_.com.twitter.scalding.db.ResultSetExtractor[$T] {
      def validate($rsmdTerm: _root_.java.sql.ResultSetMetaData): _root_.scala.util.Try[Unit] = _root_.scala.util.Try { ..$checks }
      def toCaseClass($rsTerm: _root_.java.sql.ResultSet, $tcTerm: _root_.com.twitter.scalding.TupleConverter[$T]): $T =
        $tcTerm(new _root_.cascading.tuple.TupleEntry(new _root_.cascading.tuple.Tuple(..$formats)))
    }
  """
  // ResultSet -> TupleEntry -> case class
  c.Expr[ResultSetExtractor[T]](res)
}