in scio-google-cloud-platform/src/main/scala/com/spotify/scio/pubsub/PubsubIO.scala [84:146]
def string(
name: String,
idAttribute: String = null,
timestampAttribute: String = null
): PubsubIO[String] =
StringPubsubIOWithoutAttributes(name, idAttribute, timestampAttribute)
def avro[T <: SpecificRecord: ClassTag](
name: String,
idAttribute: String = null,
timestampAttribute: String = null
): PubsubIO[T] =
AvroPubsubIOWithoutAttributes[T](name, idAttribute, timestampAttribute)
def proto[T <: Message: ClassTag](
name: String,
idAttribute: String = null,
timestampAttribute: String = null
): PubsubIO[T] =
MessagePubsubIOWithoutAttributes[T](name, idAttribute, timestampAttribute)
def pubsub[T <: beam.PubsubMessage](
name: String,
idAttribute: String = null,
timestampAttribute: String = null
): PubsubIO[T] =
PubSubMessagePubsubIOWithoutAttributes[T](name, idAttribute, timestampAttribute)
def coder[T: Coder](
name: String,
idAttribute: String = null,
timestampAttribute: String = null
): PubsubIO[T] =
FallbackPubsubIOWithoutAttributes[T](name, idAttribute, timestampAttribute)
def withAttributes[T: Coder](
name: String,
idAttribute: String = null,
timestampAttribute: String = null
): PubsubIO[(T, Map[String, String])] =
PubsubIOWithAttributes[T](name, idAttribute, timestampAttribute)
private[pubsub] def configureRead[T](
r: beam.PubsubIO.Read[T]
)(
name: String,
params: ReadParam,
idAttribute: String,
timestampAttribute: String
): beam.PubsubIO.Read[T] = {
var read =
params.readType match {
case Subscription => r.fromSubscription(name)
case Topic => r.fromTopic(name)
}
read = params.clientFactory.fold(read)(read.withClientFactory)
read = params.deadLetterTopic.fold(read)(read.withDeadLetterTopic)
read = Option(idAttribute).fold(read)(read.withIdAttribute)
read = Option(timestampAttribute).fold(read)(read.withTimestampAttribute)
read
}