Skip to content

Commit

Permalink
Coverage and documentation for AveragedValue (#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
sritchie authored Dec 2, 2016
1 parent 0ff8bfe commit 0841e92
Show file tree
Hide file tree
Showing 16 changed files with 603 additions and 121 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Add `metricsLaws[T]` to `BaseProperties` in `algebird-test`: https://github.com/twitter/algebird/pull/584
* Modify generated `Tuple2Monoid`, etc to extend `TupleNSemigroup`, giving subclasses access to efficient `sumOption`: https://github.com/twitter/algebird/pull/585
* optimize `Generated{Abstract,Product}Algebra.sumOption` with benchmarking https://github.com/twitter/algebird/pull/591
* Add an efficient `sumOption`, `+`, `-` and docs to `AveragedValue`: https://github.com/twitter/algebird/pull/589

### Version 0.12.2 ###

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.twitter.algebird
package benchmark

import scala.util.Random
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

import scala.math._

object AveragedValueBenchmark {
@State(Scope.Benchmark)
class AVState {
@Param(Array("10000"))
var numElements: Int = 0

var inputData: Seq[AveragedValue] = _

@Setup(Level.Trial)
def setup(): Unit = {
inputData = Seq.fill(numElements)(AveragedValue(Random.nextInt(1000).toLong))
}
}
}

class AveragedValueBenchmark {
import AveragedValueBenchmark._
import AveragedGroup.{ plus, sumOption }

@Benchmark
def timePlus(state: AVState, bh: Blackhole) =
bh.consume(state.inputData.reduce(plus(_, _)))

@Benchmark
def timeSumOption(state: AVState, bh: Blackhole) =
bh.consume(sumOption(state.inputData))
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.twitter.algebird.benchmark
package com.twitter.algebird
package benchmark

import com.twitter.algebird._
import scala.util.Random
import com.twitter.bijection._

import com.twitter.algebird.util._
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import org.openjdk.jmh.infra.Blackhole

import scala.math._

class OldMonoid(bits: Int) extends HyperLogLogMonoid(bits) {
import HyperLogLog._

Expand Down
174 changes: 138 additions & 36 deletions algebird-core/src/main/scala/com/twitter/algebird/AveragedValue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,162 @@ limitations under the License.

package com.twitter.algebird

/**
* Tracks the count and mean value of Doubles in a data stream.
*
* Adding two instances of [[AveragedValue]] with [[+]]
* is equivalent to taking an average of the two streams, with each
* stream weighted by its count.
*
* The mean calculation uses a numerically stable online algorithm
* suitable for large numbers of records, similar to Chan et. al.'s
* [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
* parallel variance algorithm on Wikipedia]]. As long as your count
* doesn't overflow a Long, the mean calculation won't overflow.
*
* @see [[MomentsGroup.getCombinedMean]] for implementation of [[+]]
* @param count the number of aggregated items
* @param value the average value of all aggregated items
*/
case class AveragedValue(count: Long, value: Double) {
/**
* Returns a copy of this instance with a negative value. Note that
*
* {{{
* a + -b == a - b
* }}}
*/
def unary_- : AveragedValue = copy(count = -count)

/**
* Averages this instance with the *opposite* of the supplied
* [[AveragedValue]] instance, effectively subtracting out that
* instance's contribution to the mean.
*
* @param r the instance to subtract
* @return an instance with `r`'s stream subtracted out
*/
def -(r: AveragedValue): AveragedValue = AveragedGroup.minus(this, r)

/**
* Averages this instance with another [[AveragedValue]] instance.
* @param r the other instance
* @return an instance representing the mean of this instance and `r`.
*/
def +(r: AveragedValue): AveragedValue = AveragedGroup.plus(this, r)

/**
* Returns a new instance that averages `that` into this instance.
*
* @param that value to average into this instance
* @return an instance representing the mean of this instance and `that`.
*/
def +(that: Double): AveragedValue =
AveragedValue(
count + 1L,
MomentsGroup.getCombinedMean(count, value, 1L, that))

/**
* Returns a new instance that averages `that` into this instance.
*
* @param that value to average into this instance
* @return an instance representing the mean of this instance and `that`.
*/
def +[N](that: N)(implicit num: Numeric[N]): AveragedValue =
this + num.toDouble(that)
}

/**
* Provides a set of operations needed to create and use
* [[AveragedValue]] instances.
*/
object AveragedValue {
/** implicit instance of [[Group]][AveragedValue] */
implicit val group = AveragedGroup

/**
* Returns an [[Aggregator]] that uses [[AveragedValue]] to
* calculate the mean of all `Double` values in the stream. Each
* Double value receives a count of 1 during aggregation.
*/
def aggregator: Aggregator[Double, AveragedValue, Double] = Averager

/**
* Returns an [[Aggregator]] that uses [[AveragedValue]] to
* calculate the mean of all values in the stream. Each numeric
* value receives a count of `1` during aggregation.
*
* @tparam N numeric type to convert into `Double`
*/
def numericAggregator[N](implicit num: Numeric[N]): MonoidAggregator[N, AveragedValue, Double] =
Aggregator.prepareMonoid { n: N => AveragedValue(num.toDouble(n)) }
.andThenPresent(_.value)

def apply[V <% Double](v: V) = new AveragedValue(1L, v)
def apply[V <% Double](c: Long, v: V) = new AveragedValue(c, v)
}

case class AveragedValue(count: Long, value: Double)
/**
* Creates [[AveragedValue]] with a value of `v` and a count of 1.
*
* @tparam V type with an implicit conversion to Double
*/
def apply[V <% Double](v: V): AveragedValue = apply(1L, v)

object AveragedGroup extends Group[AveragedValue] {
// When combining averages, if the counts sizes are too close we should use a different
// algorithm. This constant defines how close the ratio of the smaller to the total count
// can be:
private val STABILITY_CONSTANT = 0.1
/**
* Uses a more stable online algorithm which should
* be suitable for large numbers of records
* similar to:
* http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
* Creates an [[AveragedValue]] with a count of of `c` and a value
* of `v`.
*
* @tparam V type with an implicit conversion to Double
*/
def apply[V <% Double](c: Long, v: V): AveragedValue = new AveragedValue(c, v)
}

/**
* [[Group]] implementation for [[AveragedValue]].
*
* @define T `AveragedValue`
*/
object AveragedGroup extends Group[AveragedValue] {
import MomentsGroup.getCombinedMean

val zero = AveragedValue(0L, 0.0)

override def isNonZero(av: AveragedValue) = (av.count != 0L)

override def negate(av: AveragedValue) = AveragedValue(-av.count, av.value)

def plus(cntAve1: AveragedValue, cntAve2: AveragedValue): AveragedValue = {
val (big, small) = if (cntAve1.count >= cntAve2.count)
(cntAve1, cntAve2)
else
(cntAve2, cntAve1)
val n = big.count
val k = small.count
val newCnt = n + k
if (newCnt == n) {
// Handle zero without allocation
big
} else if (newCnt == 0L) {
zero
} else {
val an = big.value
val ak = small.value
val scaling = k.toDouble / newCnt
// a_n + (a_k - a_n)*(k/(n+k)) is only stable if n is not approximately k
val newAve = if (scaling < STABILITY_CONSTANT) (an + (ak - an) * scaling) else (n * an + k * ak) / newCnt
new AveragedValue(newCnt, newAve)
override def negate(av: AveragedValue) = -av

/**
* Optimized implementation of [[plus]]. Uses internal mutation to
* combine the supplied [[AveragedValue]] instances without creating
* intermediate objects.
*/
override def sumOption(iter: TraversableOnce[AveragedValue]): Option[AveragedValue] =
if (iter.isEmpty) None
else {
var count = 0L
var average = 0.0
iter.foreach {
case AveragedValue(c, v) =>
average = getCombinedMean(count, average, c, v)
count += c
}
Some(AveragedValue(count, average))
}

/**
* @inheritdoc
* @see [[AveragedValue.+]] for the implementation
*/
def plus(l: AveragedValue, r: AveragedValue): AveragedValue = {
val n = l.count
val k = r.count
val newAve = getCombinedMean(n, l.value, k, r.value)
AveragedValue(n + k, newAve)
}
}

/**
* [[Aggregator]] that uses [[AveragedValue]] to calculate the mean
* of all `Double` values in the stream. Each Double value receives a
* count of 1 during aggregation.
*/
object Averager extends MonoidAggregator[Double, AveragedValue, Double] {
val monoid = AveragedGroup
def prepare(value: Double) = AveragedValue(value)
Expand Down
49 changes: 40 additions & 9 deletions algebird-core/src/main/scala/com/twitter/algebird/First.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,63 @@ limitations under the License.
package com.twitter.algebird

/**
* First tracks the "most recent" item by the order in which items
* are seen.
* Tracks the "least recent", or earliest, wrapped instance of `T` by
* the order in which items are seen.
*
* @param get wrapped instance of `T`
*/
case class First[@specialized(Int, Long, Float, Double) +T](get: T) {
/**
* Returns this instance, always.
*
* @param r ignored instance of `First[U]`
*/
def +[U >: T](r: First[U]): First[U] = this
}

/**
* Provides a set of operations and typeclass instances needed to use
* [[First]] instances.
*/
object First extends FirstInstances {
/**
* Returns an [[Aggregator]] that selects the first instance of `T`
* in the aggregated stream.
*/
def aggregator[T]: FirstAggregator[T] = FirstAggregator()
}

private[algebird] sealed abstract class FirstInstances {
def firstSemigroup[T] = new Semigroup[T] {
def plus(l: T, r: T): T = l
/**
* Returns a [[Semigroup]] instance with a `plus` implementation
* that always returns the first (ie, the left) `T` argument.
*
* This semigroup's `sumOption` is efficient; it only selects the
* head of the `TraversableOnce` instance, leaving the rest
* untouched.
*/
def firstSemigroup[T]: Semigroup[T] =
new Semigroup[T] {
def plus(l: T, r: T): T = l

override def sumOption(iter: TraversableOnce[T]): Option[T] =
if (iter.isEmpty) None else Some(iter.toIterator.next)
}
override def sumOption(iter: TraversableOnce[T]): Option[T] =
if (iter.isEmpty) None else Some(iter.toIterator.next)
}

/**
* Returns a [[Semigroup]] instance for [[First]][T]. The `plus`
* implementation always returns the first (ie, the left) `First[T]`
* argument.
*/
implicit def semigroup[T]: Semigroup[First[T]] = firstSemigroup[First[T]]
}

/**
* [[Aggregator]] that selects the first instance of `T` in the
* aggregated stream.
*/
case class FirstAggregator[T]() extends Aggregator[T, T, T] {
def prepare(v: T) = v

val semigroup: Semigroup[T] = First.firstSemigroup[T]

def present(v: T) = v
}
38 changes: 34 additions & 4 deletions algebird-core/src/main/scala/com/twitter/algebird/Last.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,53 @@ limitations under the License.
package com.twitter.algebird

/**
* Last tracks the "most recent" item by the order in which items are
* seen.
* Tracks the "most recent", or last, wrapped instance of `T` by the
* order in which items are seen.
*
* @param get wrapped instance of `T`
*/
case class Last[@specialized(Int, Long, Float, Double) +T](get: T) {
/**
* Returns the argument `r`, always.
*
* @param r returned of `Last[U]`
*/
def +[U >: T](r: Last[U]): Last[U] = r
}

/**
* Provides a set of operations and typeclass instances needed to use
* [[Last]] instances.
*/
object Last extends LastInstances {
/**
* Returns an [[Aggregator]] that selects the last instance of `T`
* in the aggregated stream.
*/
def aggregator[T]: LastAggregator[T] = LastAggregator()
}

private[algebird] sealed abstract class LastInstances {
implicit def semigroup[T]: Semigroup[Last[T]] = Semigroup.from { (l, r) => r }
/**
* Returns a [[Semigroup]] instance with a `plus` implementation
* that always returns the last (ie, the right) `T` argument.
*/
def lastSemigroup[T]: Semigroup[T] = Semigroup.from { (l, r) => r }

/**
* Returns a [[Semigroup]] instance for [[Last]][T]. The `plus`
* implementation always returns the last (ie, the right) `Last[T]`
* argument.
*/
implicit def semigroup[T]: Semigroup[Last[T]] = lastSemigroup[Last[T]]
}

/**
* [[Aggregator]] that selects the last instance of `T` in the
* aggregated stream.
*/
case class LastAggregator[T]() extends Aggregator[T, T, T] {
def prepare(v: T) = v
val semigroup: Semigroup[T] = Semigroup.from { (l: T, r: T) => r }
val semigroup: Semigroup[T] = Last.lastSemigroup[T]
def present(v: T) = v
}
Loading

0 comments on commit 0841e92

Please sign in to comment.