in ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala [311:361]
/**
 * Build a [[BigDiffy]] from two collections that have already been loaded.
 *
 * @param lhs left-hand side records
 * @param rhs right-hand side records
 * @param d [[Diffy]] instance handed to the resulting [[BigDiffy]]
 * @param keyFn maps each record to its [[MultiKey]]
 * @param ignoreNan passed straight through to the [[BigDiffy]] constructor; defaults to `false`
 * @return a new [[BigDiffy]] built from the arguments above
 */
def diff[T: ClassTag: Coder](
  lhs: SCollection[T],
  rhs: SCollection[T],
  d: Diffy[T],
  keyFn: T => MultiKey,
  ignoreNan: Boolean = false
): BigDiffy[T] = {
  val bigDiffy = new BigDiffy[T](lhs, rhs, d, keyFn, ignoreNan)
  bigDiffy
}
/**
 * Diff two Avro data sets.
 *
 * Both inputs are read with `sc.avroFile[T]` and then handed to [[diff]].
 *
 * @param sc context used to read both inputs
 * @param lhs location of the left-hand side Avro input
 * @param rhs location of the right-hand side Avro input
 * @param keyFn maps each record to its [[MultiKey]]
 * @param diffy [[AvroDiffy]] instance forwarded to [[diff]]
 * @param ignoreNan forwarded unchanged to [[diff]]; defaults to `false`
 */
def diffAvro[T <: SpecificRecordBase: ClassTag: Coder](
  sc: ScioContext,
  lhs: String,
  rhs: String,
  keyFn: T => MultiKey,
  diffy: AvroDiffy[T],
  ignoreNan: Boolean = false
): BigDiffy[T] = {
  // Left is read before right, same as the argument evaluation order of a single call.
  val left = sc.avroFile[T](lhs)
  val right = sc.avroFile[T](rhs)
  diff(left, right, diffy, keyFn, ignoreNan)
}
/**
 * Diff two ProtoBuf data sets.
 *
 * Both inputs are read with `sc.protobufFile` and then handed to [[diff]].
 *
 * @param sc context used to read both inputs
 * @param lhs location of the left-hand side ProtoBuf input
 * @param rhs location of the right-hand side ProtoBuf input
 * @param keyFn maps each message to its [[MultiKey]]
 * @param diffy [[ProtoBufDiffy]] instance forwarded to [[diff]]
 * @param ignoreNan forwarded unchanged to [[diff]]; defaults to `false`, which is the value
 *                  this method always used before the parameter existed
 */
def diffProtoBuf[T <: AbstractMessage: ClassTag](
  sc: ScioContext,
  lhs: String,
  rhs: String,
  keyFn: T => MultiKey,
  diffy: ProtoBufDiffy[T],
  ignoreNan: Boolean = false
): BigDiffy[T] =
  // ignoreNan is passed explicitly so callers get the same control that diffAvro offers.
  diff(sc.protobufFile(lhs), sc.protobufFile(rhs), diffy, keyFn, ignoreNan)
/**
 * Diff two Parquet data sets. Note that both typed-parquet and avro-parquet inputs are supported.
 * However, in either case the diff will be written in Parquet format as Avro GenericRecords.
 *
 * A single schema is obtained for both inputs via `ParquetIO.getCompatibleSchemaForFiles` and is
 * used both to read the files and to build the `GenericRecord` coder.
 *
 * @param sc context used to read both inputs
 * @param lhs location of the left-hand side Parquet input
 * @param rhs location of the right-hand side Parquet input
 * @param keyFn maps each record to its [[MultiKey]]
 * @param diffy [[AvroDiffy]] instance forwarded to [[diff]]
 * @param ignoreNan forwarded unchanged to [[diff]]; defaults to `false`, which is the value
 *                  this method always used before the parameter existed
 */
def diffParquet(
  sc: ScioContext,
  lhs: String,
  rhs: String,
  keyFn: GenericRecord => MultiKey,
  diffy: AvroDiffy[GenericRecord],
  ignoreNan: Boolean = false
): BigDiffy[GenericRecord] = {
  val compatSchema = ParquetIO.getCompatibleSchemaForFiles(lhs, rhs)
  // Supplies the Coder[GenericRecord] context bound that `diff` requires.
  implicit val grCoder: Coder[GenericRecord] = avroGenericRecordCoder(compatSchema)
  // NOTE(review): `.map(identity)` presumably turns the reader's result into a plain SCollection
  // encoded with grCoder; confirm against the Scio version in use before removing it.
  diff(
    sc.parquetAvroFile[GenericRecord](lhs, compatSchema).map(identity),
    sc.parquetAvroFile[GenericRecord](rhs, compatSchema).map(identity),
    diffy,
    keyFn,
    ignoreNan
  )
}