in algebird-core/src/main/scala/com/twitter/algebird/matrix/AdaptiveMatrix.scala [39:131]
/**
 * Builds a `rows` x `cols` matrix in which every cell holds the zero of the implicit `Monoid[V]`.
 *
 * This simply delegates to `fill`, passing the monoid's zero as the fill value.
 *
 * @param rows number of rows handed to `fill`
 * @param cols number of columns handed to `fill`
 * @tparam V   cell type; must have a `Monoid` instance in scope
 * @return     the matrix produced by `fill(rows, cols)(zero of V)`
 */
def zero[V: Monoid](rows: Int, cols: Int): AdaptiveMatrix[V] = {
  val identityValue: V = implicitly[Monoid[V]].zero
  fill(rows, cols)(identityValue)
}
/**
 * Builds a matrix of `rows` row vectors, each created by `AdaptiveVector.fill[V](cols)(fill)`,
 * and wraps them in a `SparseColumnMatrix`.
 *
 * @param rows number of row vectors to create
 * @param cols size passed to `AdaptiveVector.fill` for every row
 * @param fill value passed to `AdaptiveVector.fill` for every row
 * @tparam V   cell type; must have a `Monoid` instance in scope
 */
def fill[V: Monoid](rows: Int, cols: Int)(fill: V): AdaptiveMatrix[V] = {
  // Vector.fill takes its element by name, so a fresh AdaptiveVector is built for each row.
  val rowVectors = Vector.fill(rows)(AdaptiveVector.fill[V](cols)(fill))
  SparseColumnMatrix(rowVectors)
}
/**
 * Builds a matrix backed by a `SparseColumnMatrix` that holds no row vectors at all.
 *
 * @tparam V cell type; must have a `Monoid` instance in scope
 */
def empty[V: Monoid](): AdaptiveMatrix[V] = {
  val noRows = IndexedSeq.empty[AdaptiveVector[V]]
  SparseColumnMatrix[V](noRows)
}
/**
 * Monoid for [[AdaptiveMatrix]] that sums matrices cell by cell using the implicit `Monoid[V]`.
 *
 * While summing, the accumulator starts out as per-row mutable maps (sparse). It switches to a
 * flat buffer wrapped in a `DenseMatrix` as soon as a `DenseMatrix` input is encountered, or once
 * the first row of the sparse accumulator holds more than a quarter of the columns.
 *
 * @tparam V cell type; must have a `Monoid` instance in scope
 */
implicit def monoid[V: Monoid]: Monoid[AdaptiveMatrix[V]] =
  new Monoid[AdaptiveMatrix[V]] {
    // Zero of the cell monoid, used to pre-fill dense buffers and as the default for absent cells.
    private[this] final val innerZero = implicitly[Monoid[V]].zero

    /** The zero matrix: a `SparseColumnMatrix` holding no row vectors. */
    override def zero: AdaptiveMatrix[V] =
      SparseColumnMatrix[V](IndexedSeq[AdaptiveVector[V]]())

    /** Sums two matrices by delegating to `sumOption`. */
    override def plus(a: AdaptiveMatrix[V], b: AdaptiveMatrix[V]): AdaptiveMatrix[V] =
      // sumOption only yields None for an empty input, and this list always has two elements.
      sumOption(List(a, b)).get

    /**
     * Applies `updateInto(buff)` for every matrix left in `remainder`, then wraps the buffer in a
     * `DenseMatrix` of the given dimensions.
     */
    private def denseInsert(
        rows: Int,
        cols: Int,
        buff: ArrayBuffer[V],
        remainder: Iterator[AdaptiveMatrix[V]]
    ): Option[AdaptiveMatrix[V]] = {
      remainder.foreach(_.updateInto(buff))
      Some(DenseMatrix(rows, cols, buff.toIndexedSeq))
    }

    /**
     * Starts a dense accumulation from `current`: allocates a `rows * cols` buffer filled with the
     * cell zero, applies `current.updateInto` to it, then folds in the rest via `denseInsert`.
     */
    private def denseUpdate(
        current: AdaptiveMatrix[V],
        remainder: Iterator[AdaptiveMatrix[V]]
    ): Option[AdaptiveMatrix[V]] = {
      val rows = current.rows
      val cols = current.cols
      val buffer = ArrayBuffer.fill(rows * cols)(innerZero)
      current.updateInto(buffer)
      denseInsert(rows, cols, buffer, remainder)
    }

    /**
     * Adds every cell of `other` into the per-row mutable maps in `storage`.
     *
     * `storage(i)` is the accumulator for row `i`; it is indexed by the position of each vector in
     * `other.rowsByColumns`.
     */
    private def sparseUpdate(storage: IndexedSeq[MMap[Int, V]], other: SparseColumnMatrix[V]): Unit =
      other.rowsByColumns.zipWithIndex.foreach { case (contents, rowIndex) =>
        val rowCells: MMap[Int, V] = storage(rowIndex)
        AdaptiveVector.toMap(contents).foreach { case (col, value) =>
          // The accumulated cell comes from earlier matrices, so it must be the LEFT operand;
          // otherwise sums over a non-commutative Monoid[V] come out in reverse order.
          val accumulated = rowCells.getOrElse(col, innerZero)
          rowCells.update(col, Monoid.plus(accumulated, value))
        }
      }

    /**
     * Converts the sparse accumulator into a flat buffer (cell `(row, col)` lives at index
     * `row * cols + col`) and continues the sum densely through `denseInsert`.
     */
    private def goDense(
        rows: Int,
        cols: Int,
        storage: IndexedSeq[MMap[Int, V]],
        remainder: Iterator[AdaptiveMatrix[V]]
    ): Option[AdaptiveMatrix[V]] = {
      val buffer = ArrayBuffer.fill(rows * cols)(innerZero)
      var row = 0
      val iter = storage.iterator
      while (iter.hasNext) {
        val curRow = iter.next()
        curRow.foreach { case (col, value) =>
          buffer(row * cols + col) = value
        }
        row += 1
      }
      denseInsert(rows, cols, buffer, remainder)
    }

    /**
     * Sums all `items` in iteration order.
     *
     * @return `None` when `items` is empty, otherwise `Some` of the summed matrix. The result is a
     *         `DenseMatrix` if the accumulation switched to dense storage, and otherwise whatever
     *         `SparseColumnMatrix.fromSeqMap` builds from the sparse accumulator.
     */
    override def sumOption(items: TraversableOnce[AdaptiveMatrix[V]]): Option[AdaptiveMatrix[V]] =
      if (items.isEmpty) {
        None
      } else {
        val iter = items.toIterator.buffered
        // Dimensions are taken from the first matrix only; later matrices are not checked here.
        val rows = iter.head.rows
        val cols = iter.head.cols
        val sparseStorage = (0 until rows).map(_ => MMap[Int, V]()).toIndexedSeq

        while (iter.hasNext) {
          val current = iter.next()
          current match {
            // NOTE(review): denseUpdate starts from a zero-filled buffer, so anything already in
            // sparseStorage is not carried over on this path — confirm that is intended.
            case d @ DenseMatrix(_, _, _) => return denseUpdate(d, iter)
            case s @ SparseColumnMatrix(_) =>
              sparseUpdate(sparseStorage, s)
              // Density heuristic: only the first row's population is inspected.
              if (sparseStorage(0).size > current.cols / 4) {
                return goDense(rows, cols, sparseStorage, iter)
              }
          }
        }

        // Need to still be sparse to reach here, so must unpack the MMap to be used again.
        Some(SparseColumnMatrix.fromSeqMap(cols, sparseStorage))
      }
  }