in algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/AsyncSummerBenchmark.scala [24:135]
/**
 * Builds an [[HLL]] from a single item.
 *
 * The item is first encoded to bytes with `inj`, and those bytes are handed to
 * `monoid.create`.
 *
 * @param t      the item to place in the sketch
 * @param monoid monoid used to create the sketch
 * @param inj    encoder from `T` to the byte array the monoid consumes
 * @return the sketch returned by `monoid.create`
 */
def hll[T](t: T)(implicit monoid: HyperLogLogMonoid, inj: Injection[T, Array[Byte]]): HLL = {
  val encoded: Array[Byte] = inj.apply(t)
  monoid.create(encoded)
}
/**
 * JMH benchmark state holding one instance of each [[AsyncSummer]] variant under test,
 * together with the pre-generated input batches fed to them.
 *
 * Everything is rebuilt once per trial in [[setup]]. The generated input is skewed:
 * - About 80% of items use one of a small set of "heavy hitter" keys.
 * - The rest use the remaining keys.
 * - Each key is paired with an HLL built from one random Long.
 */
@State(Scope.Benchmark)
class SummerState {
  var asyncNonCompactListSum: AsyncSummer[(Long, HLL), Map[Long, HLL]] = _
  var asyncCompactListSum: AsyncSummer[(Long, HLL), Map[Long, HLL]] = _
  var asyncMapSum: AsyncSummer[(Long, HLL), Map[Long, HLL]] = _
  var syncSummingQueue: AsyncSummer[(Long, HLL), Map[Long, HLL]] = _
  var nullSummer: AsyncSummer[(Long, HLL), Map[Long, HLL]] = _

  /** Number of distinct keys the input is drawn from (set by JMH). */
  @Param(Array("10", "100", "1000", "10000"))
  val numInputKeys: Int = 0

  /** Total number of (key, HLL) items generated per trial (set by JMH). */
  @Param(Array("10", "1000", "10000", "100000"))
  val numInputItems: Int = 0

  /** Buffer size given to every summer; input batches are a quarter of it (set by JMH). */
  @Param(Array("100", "200", "500", "1000"))
  val buffSizeInt: Int = 0

  /** Generated input, pre-split into batches of `bufferSize.v / 4` items (at least 1). */
  var inputItems: IndexedSeq[IndexedSeq[(Long, HLL)]] = _
  /** Number of batches in [[inputItems]]. */
  var batchCount: Int = _
  /** `buffSizeInt` wrapped as a [[BufferSize]]; assigned in [[setup]]. */
  var bufferSize: BufferSize = _

  /**
   * Chooses how many keys to treat as heavy hitters.
   *
   * The count is `(100 * n / (n + 50)).toInt`, capped at `n / 10` (integer division), so
   * fewer than 10 keys yields 0 heavy hitters. The chosen count is also printed.
   *
   * @param numInputKeys total number of distinct keys `n`
   * @return number of heavy-hitter keys
   */
  def calcNumHeavyKeys(numInputKeys: Int): Int = {
    val ratio = numInputKeys / (numInputKeys.toDouble + 50)
    val count = (ratio * 100).toInt
    val heavyHitterCount = min(count, numInputKeys / 10)
    println("Producing %d heavy hitter keys".format(heavyHitterCount))
    heavyHitterCount
  }

  /**
   * Builds all summers and generates the input batches for this trial.
   *
   * Heavy-hitter keys are `0 until calcNumHeavyKeys(numInputKeys)`. All randomness (keys and
   * HLL values) comes from one `Random` seeded with 3, so every trial sees the same input.
   */
  @Setup(Level.Trial)
  def setup(): Unit = {
    val heavyKeys = (0 until calcNumHeavyKeys(numInputKeys)).toSet
    val heavyKeysIndexedSeq = heavyKeys.toIndexedSeq
    // Single seeded source for both keys and HLL values, so the generated data is reproducible.
    val rnd = new Random(3)
    bufferSize = BufferSize(buffSizeInt)
    nullSummer = new NullSummer[Long, HLL](Counter("tuplesIn"), Counter("tuplesOut"))
    // List-based summer without compaction.
    asyncNonCompactListSum = new AsyncListSum[Long, HLL](
      bufferSize,
      flushFrequency,
      memoryFlushPercent,
      Counter("memory"),
      Counter("timeOut"),
      Counter("insertOp"),
      Counter("insertFails"),
      Counter("size"),
      Counter("tuplesIn"),
      Counter("tuplesOut"),
      workPool,
      Compact(false),
      CompactionSize(0)
    )
    // Same list-based summer, with compaction enabled (CompactionSize 500).
    asyncCompactListSum = new AsyncListSum[Long, HLL](
      bufferSize,
      flushFrequency,
      memoryFlushPercent,
      Counter("memory"),
      Counter("timeOut"),
      Counter("insertOp"),
      Counter("insertFails"),
      Counter("size"),
      Counter("tuplesIn"),
      Counter("tuplesOut"),
      workPool,
      Compact(true),
      CompactionSize(500)
    )
    asyncMapSum = new AsyncMapSum[Long, HLL](
      bufferSize,
      flushFrequency,
      memoryFlushPercent,
      Counter("memory"),
      Counter("timeOut"),
      Counter("insertOp"),
      Counter("tuplesOut"),
      Counter("size"),
      workPool
    )
    syncSummingQueue = new SyncSummingQueue[Long, HLL](
      bufferSize,
      flushFrequency,
      memoryFlushPercent,
      Counter("memory"),
      Counter("timeOut"),
      Counter("size"),
      Counter("puts"),
      Counter("tuplesIn"),
      Counter("tuplesOut")
    )
    val inputData: IndexedSeq[(Long, HLL)] = (0L until numInputItems).map { _ =>
      val pos = rnd.nextInt(10)
      // 80% chance of hitting a heavy hitter. With fewer than 10 keys there are no heavy
      // hitters, so always draw a light key instead of calling rnd.nextInt(0), which throws.
      val k = if (pos < 8 && heavyKeysIndexedSeq.nonEmpty) {
        heavyKeysIndexedSeq(rnd.nextInt(heavyKeysIndexedSeq.size))
      } else {
        // Rejection-sample a uniformly random non-heavy key.
        var kCandidate = rnd.nextInt(numInputKeys)
        while (heavyKeys.contains(kCandidate)) {
          kCandidate = rnd.nextInt(numInputKeys)
        }
        kCandidate
      }
      (k.toLong, hll(rnd.nextLong()))
    }.toIndexedSeq
    // grouped(0) throws, so keep at least one item per batch even for tiny buffer sizes.
    val batchSize = scala.math.max(1, bufferSize.v / 4)
    inputItems = inputData.grouped(batchSize).toIndexedSeq // materialize this
    batchCount = inputItems.size
  }
}