in tormenta-kafka/src/main/scala/com/twitter/tormenta/spout/KafkaSpout.scala [30:41]
override def getSpout[R](transformer: Scheme[T] => Scheme[R], callOnOpen: => TopologyContext => Unit) = {
// Spout ID needs to be unique per spout, so create that string by taking the topic and appID.
val spoutId = topic + appID
val spoutConfig = new SpoutConfig(new ZkHosts(zkHost, brokerZkPath), topic, zkRoot, spoutId)
spoutConfig.scheme = transformer(scheme)
if (forceStartOffsetTime != -1) {
spoutConfig.startOffsetTime = forceStartOffsetTime
}
new RichStormSpout(new StormKafkaSpout(spoutConfig), callOnOpen)
}