in tweetypie/server/src/main/scala/com/twitter/tweetypie/config/Caches.scala [64:280]
def apply(
settings: TweetServiceSettings,
stats: StatsReceiver,
timer: Timer,
clients: BackendClients,
tweetKeyFactory: TweetKeyFactory,
deciderGates: TweetypieDeciderGates,
clientIdHelper: ClientIdHelper,
): Caches = {
val cachesStats = stats.scope("caches")
val cachesInprocessStats = cachesStats.scope("inprocess")
val cachesMemcacheStats = cachesStats.scope("memcache")
val cachesMemcacheObserver = new StatsReceiverCacheObserver(cachesStats, 10000, "memcache")
val cachesMemcacheTweetStats = cachesMemcacheStats.scope("tweet")
val cachesInprocessDeviceSourceStats = cachesInprocessStats.scope("device_source")
val cachesMemcacheCountStats = cachesMemcacheStats.scope("count")
val cachesMemcacheTweetCreateStats = cachesMemcacheStats.scope("tweet_create")
val cachesMemcacheGeoScrubStats = cachesMemcacheStats.scope("geo_scrub")
val memcacheClient = clients.memcacheClient
val caffieneMemcachedClient = settings.inProcessCacheConfigOpt match {
case Some(inProcessCacheConfig) =>
new CaffeineMemcacheClient(
proxyClient = memcacheClient,
inProcessCacheConfig.maximumSize,
inProcessCacheConfig.ttl,
cachesMemcacheStats.scope("caffeine")
)
case None =>
memcacheClient
}
val observedMemcacheWithCaffeineClient =
new ObservableMemcache(
new FinagleMemcache(
caffieneMemcachedClient
),
cachesMemcacheObserver
)
def observeCache[K, V](
cache: Cache[K, V],
stats: StatsReceiver,
logName: String,
windowSize: Int = 10000
) =
ObservableCache(
cache,
stats,
windowSize,
// Need to use an old-school c.t.logging.Logger because that's what servo needs
com.twitter.logging.Logger(s"com.twitter.tweetypie.cache.$logName")
)
def mkCache[K, V](
ttl: Duration,
serializer: CacheSerializer[V],
perCacheStats: StatsReceiver,
logName: String,
windowSize: Int = 10000
): Cache[K, V] = {
observeCache(
new MemcacheCache[K, V](
observedMemcacheWithCaffeineClient,
ttl,
serializer
),
perCacheStats,
logName,
windowSize
)
}
def toLockingCache[K, V](
cache: Cache[K, V],
stats: StatsReceiver,
backoffs: Stream[Duration] = settings.lockingCacheBackoffs
): LockingCache[K, V] =
new OptimisticLockingCache(
underlyingCache = cache,
backoffs = Backoff.fromStream(backoffs),
observer = new OptimisticLockingCacheObserver(stats),
timer = timer
)
def mkLockingCache[K, V](
ttl: Duration,
serializer: CacheSerializer[V],
stats: StatsReceiver,
logName: String,
windowSize: Int = 10000,
backoffs: Stream[Duration] = settings.lockingCacheBackoffs
): LockingCache[K, V] =
toLockingCache(
mkCache(ttl, serializer, stats, logName, windowSize),
stats,
backoffs
)
def trackTimeInCache[K, V](
cache: Cache[K, Cached[V]],
stats: StatsReceiver
): Cache[K, Cached[V]] =
new CacheWrapper[K, Cached[V]] {
val ageStat: Stat = stats.stat("time_in_cache_ms")
val underlyingCache: Cache[K, Cached[V]] = cache
override def get(keys: Seq[K]): Future[KeyValueResult[K, Cached[V]]] =
underlyingCache.get(keys).onSuccess(record)
private def record(res: KeyValueResult[K, Cached[V]]): Unit = {
val now = Time.now
for (c <- res.found.values) {
ageStat.add(c.cachedAt.until(now).inMilliseconds)
}
}
}
new Caches {
override val memcachedClientWithInProcessCaching: memcached.Client = caffieneMemcachedClient
private val observingTweetCache: Cache[TweetKey, Cached[CachedTweet]] =
trackTimeInCache(
mkCache(
ttl = settings.tweetMemcacheTtl,
serializer = Serializer.CachedTweet.CachedCompact,
perCacheStats = cachesMemcacheTweetStats,
logName = "MemcacheTweetCache"
),
cachesMemcacheTweetStats
)
// Wrap the tweet cache with a wrapper that will scribe the cache writes
// that happen to a fraction of tweets. This was added as part of the
// investigation into missing place ids and cache inconsistencies that
// were discovered by the additional fields hydrator.
private[this] val writeLoggingTweetCache =
new ScribeTweetCacheWrites(
underlyingCache = observingTweetCache,
logYoungTweetCacheWrites = deciderGates.logYoungTweetCacheWrites,
logTweetCacheWrites = deciderGates.logTweetCacheWrites
)
val tweetCache: LockingCache[TweetKey, Cached[CachedTweet]] =
toLockingCache(
cache = writeLoggingTweetCache,
stats = cachesMemcacheTweetStats
)
val tweetDataCache: LockingCache[TweetId, Cached[TweetData]] =
toLockingCache(
cache = TweetDataCache(tweetCache, tweetKeyFactory.fromId),
stats = cachesMemcacheTweetStats
)
val tweetResultCache: LockingCache[TweetId, Cached[TweetResult]] =
toLockingCache(
cache = TweetResultCache(tweetDataCache),
stats = cachesMemcacheTweetStats
)
val tweetCountsCache: LockingCache[TweetCountKey, Cached[Count]] =
mkLockingCache(
ttl = settings.tweetCountsMemcacheTtl,
serializer = Serializers.CachedLong.Compact,
stats = cachesMemcacheCountStats,
logName = "MemcacheTweetCountCache",
windowSize = 1000,
backoffs = Backoff.linear(0.millis, 2.millis).take(2).toStream
)
val tweetCreateLockerCache: Cache[TweetCreationLock.Key, TweetCreationLock.State] =
observeCache(
new TtlCacheToCache(
underlyingCache = new KeyValueTransformingTtlCache(
underlyingCache = observedMemcacheWithCaffeineClient,
transformer = TweetCreationLock.State.Serializer,
underlyingKey = (_: TweetCreationLock.Key).toString
),
ttl = CacheBasedTweetCreationLock.ttlChooser(
shortTtl = settings.tweetCreateLockingMemcacheTtl,
longTtl = settings.tweetCreateLockingMemcacheLongTtl
)
),
stats = cachesMemcacheTweetCreateStats,
logName = "MemcacheTweetCreateLockingCache",
windowSize = 1000
)
val deviceSourceInProcessCache: LockingCache[String, Cached[DeviceSource]] =
toLockingCache(
observeCache(
new ExpiringLruCache(
ttl = settings.deviceSourceInProcessTtl,
maximumSize = settings.deviceSourceInProcessCacheMaxSize
),
stats = cachesInprocessDeviceSourceStats,
logName = "InprocessDeviceSourceCache"
),
stats = cachesInprocessDeviceSourceStats
)
val geoScrubCache: LockingCache[UserId, Cached[Time]] =
toLockingCache[UserId, Cached[Time]](
new KeyTransformingCache(
mkCache[GeoScrubTimestampKey, Cached[Time]](
ttl = settings.geoScrubMemcacheTtl,
serializer = Serializer.toCached(CacheSerializer.Time),
perCacheStats = cachesMemcacheGeoScrubStats,
logName = "MemcacheGeoScrubCache"
),
(userId: UserId) => GeoScrubTimestampKey(userId)
),
cachesMemcacheGeoScrubStats
)
}
}