def getExtractor[T]()

in scalding-db/src/main/scala/com/twitter/scalding/db/macros/impl/ColumnDefinitionProviderImpl.scala [238:349]


  /**
   * Macro implementation that generates a `ResultSetExtractor[T]` for the case class `T`.
   *
   * The generated extractor has two members:
   *  - `validate` checks JDBC `ResultSetMetaData` against the column formats derived from `T`:
   *    each column's reported type name must be compatible with the expected SQL type, and a
   *    field declared nullable in `T` must not map to a column the DB reports as
   *    `columnNoNulls`. The first violation is thrown as a `JdbcValidationException` and is
   *    captured by the returned `Try`.
   *  - `toCaseClass` reads each field from the current row of a `ResultSet` by field name,
   *    boxes primitive values (yielding `null` when `wasNull` reports SQL NULL), packs them
   *    into a cascading `Tuple` and hands it to the supplied `TupleConverter`.
   *
   * Every name referenced from the generated tree is `_root_`-qualified because the tree is
   * expanded at the user's call site, where `Option`, `List`, `java`, ... may be shadowed.
   *
   * @param c the macro context
   * @param T weak type tag of the case class being extracted
   * @return an expression constructing the `ResultSetExtractor[T]`
   */
  def getExtractor[T](c: Context)(implicit T: c.WeakTypeTag[T]): c.Expr[ResultSetExtractor[T]] = {
    import c.universe._

    val columnFormats = getColumnFormats[T](c)

    val rsmdTerm = newTermName(c.fresh("rsmd"))
    // We validate two things from ResultSetMetaData:
    // 1. the column types are compatible with the actual DB schema
    // 2. fields declared nullable in the case class are not declared NOT NULL
    //    (`columnNoNulls`) in the DB schema
    // Metadata is read by position (pos + 1), so validation assumes the result set's columns
    // appear in the same order as the case class fields.
    val checks = columnFormats.zipWithIndex.map { case (cf, pos) =>
      val fieldName = cf.fieldName.toStr
      val columnIndex = pos + 1
      val typeNameTerm = newTermName(c.fresh(s"colTypeName_$pos"))
      // MySQL uses names like `DATE`, `INTEGER` and `VARCHAR`;
      // Vertica uses names like `Date`, `Integer` and `Varchar`,
      // so the reported name is upper-cased before comparing.
      val typeName = q"""
        val $typeNameTerm =
          $rsmdTerm.getColumnTypeName($columnIndex).toUpperCase(_root_.java.util.Locale.US)
        """
      // Generated check: is the DB-reported type name one of `names`?
      def oneOf(names: String*): Tree = {
        val accepted = names.toList
        q"""_root_.scala.collection.immutable.List(..$accepted).contains($typeNameTerm)"""
      }
      // certain types have synonyms, so we group them together here
      // note: this is mysql specific
      // http://dev.mysql.com/doc/refman/5.0/en/numeric-type-overview.html
      val typeValidation = cf.fieldType match {
        case "VARCHAR"             => oneOf("VARCHAR", "CHAR")
        case "BOOLEAN" | "TINYINT" => oneOf("BOOLEAN", "BOOL", "TINYINT")
        case "INT"                 => oneOf("INTEGER", "INT")
        // In Vertica, `INTEGER`, `INT`, `BIGINT`, `INT8`, `SMALLINT`, and `TINYINT` are all 64 bits
        // https://www.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/SQLReferenceManual/DataTypes/Numeric/INTEGER.htm
        // In MySQL, `TINYINT`, `SMALLINT`, `MEDIUMINT`, `INT`, and `BIGINT` are all <= 64 bits
        // https://dev.mysql.com/doc/refman/5.7/en/integer-types.html
        // As the user has told us this field can store a `BIGINT`, we can safely accept any of these
        // types from the database.
        case "BIGINT" =>
          oneOf("INTEGER", "INT", "BIGINT", "INT8", "SMALLINT", "TINYINT", "MEDIUMINT")
        case "DATETIME" => oneOf("DATE", "DATETIME", "TIMESTAMP")
        case f          => q"""$f == $typeNameTerm"""
      }
      val typeAssert = q"""
        if (!$typeValidation) {
          throw new _root_.com.twitter.scalding.db.JdbcValidationException(
            "Mismatched type for column '" + $fieldName + "'. Expected " + ${cf.fieldType} +
              " but set to " + $typeNameTerm + " in DB.")
        }
        """
      val nullableTerm = newTermName(c.fresh(s"isNullable_$pos"))
      // Rejects a case-class field marked nullable when the DB column is declared NOT NULL.
      val nullableValidation = q"""
        val $nullableTerm = $rsmdTerm.isNullable($columnIndex)
        if ($nullableTerm == _root_.java.sql.ResultSetMetaData.columnNoNulls && ${cf.nullable}) {
          throw new _root_.com.twitter.scalding.db.JdbcValidationException(
            "Column '" + $fieldName + "' is not nullable in DB.")
        }
        """
      q"""
        $typeName
        $typeAssert
        $nullableValidation
        """
    }

    val rsTerm = newTermName(c.fresh("rs"))
    val formats = columnFormats.map { cf =>
      val fieldName = cf.fieldName.toStr
      // java boxed types needed below to populate cascading's Tuple;
      // `box` is None when the getter already returns a reference type.
      val (box, primitiveGetter): (Option[Tree], Tree) = cf.fieldType match {
        case "VARCHAR" | "TEXT" =>
          (None, q"""$rsTerm.getString($fieldName)""")
        case "BOOLEAN" =>
          (Some(q"""_root_.java.lang.Boolean.valueOf"""), q"""$rsTerm.getBoolean($fieldName)""")
        case "TINYINT" =>
          (Some(q"""_root_.java.lang.Byte.valueOf"""), q"""$rsTerm.getByte($fieldName)""")
        case "DATE" | "DATETIME" =>
          (
            None,
            q"""_root_.scala.Option($rsTerm.getTimestamp($fieldName))
                  .map { ts => new _root_.java.util.Date(ts.getTime) }
                  .orNull"""
          )
        // dates set to null are populated as None by tuple converter
        // if the corresponding case class field is an Option[Date]
        case "DOUBLE" =>
          (Some(q"""_root_.java.lang.Double.valueOf"""), q"""$rsTerm.getDouble($fieldName)""")
        case "BIGINT" =>
          (Some(q"""_root_.java.lang.Long.valueOf"""), q"""$rsTerm.getLong($fieldName)""")
        case "INT" | "SMALLINT" =>
          (Some(q"""_root_.java.lang.Integer.valueOf"""), q"""$rsTerm.getInt($fieldName)""")
        case "BLOB" =>
          (
            None,
            q"""_root_.scala.Option($rsTerm.getBlob($fieldName))
                  .map(blob => blob.getBytes(1, blob.length().toInt))
                  .orNull"""
          )
        // Unmapped column types do not fail compilation; the generated code errors at runtime.
        case f =>
          (None, q"""_root_.scala.sys.error("Invalid format " + $f + " for " + $fieldName)""")
      }
      // note: UNSIGNED BIGINT is currently unsupported
      val valueTerm = newTermName(c.fresh("colValue"))
      val boxed = box.map(b => q"""$b($valueTerm)""").getOrElse(q"""$valueTerm""")
      // primitiveGetter needs to be invoked before we can use wasNull
      // to check if the column value that was read is null or not
      q"""
        { val $valueTerm = $primitiveGetter; if ($rsTerm.wasNull) null else $boxed }
      """
    }

    val tcTerm = newTermName(c.fresh("conv"))
    // ResultSet -> TupleEntry -> case class
    val res = q"""
    new _root_.com.twitter.scalding.db.ResultSetExtractor[$T] {
      def validate($rsmdTerm: _root_.java.sql.ResultSetMetaData): _root_.scala.util.Try[_root_.scala.Unit] =
        _root_.scala.util.Try { ..$checks }
      def toCaseClass($rsTerm: _root_.java.sql.ResultSet, $tcTerm: _root_.com.twitter.scalding.TupleConverter[$T]): $T =
        $tcTerm(new _root_.cascading.tuple.TupleEntry(new _root_.cascading.tuple.Tuple(..$formats)))
    }
    """
    c.Expr[ResultSetExtractor[T]](res)
  }