Aggregation using Algebird Aggregators
# Aggregators
Aggregators enable the creation of reusable and composable aggregation functions. The Aggregator trait has three main functions.
trait Aggregator[-A, B, +C] {
/**
* Transform the input before the reduction.
*/
def prepare(input: A): B
/**
* Combine two values to produce a new value.
*/
def reduce(l: B, r: B): B
/**
* Transform the output of the reduction.
*/
def present(reduction: B): C
}
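To make the flow through the three functions concrete, here is a minimal sketch in plain Scala that does not depend on Algebird. `SimpleAggregator`, its `run` helper, and `MeanOfLongs` are hypothetical names introduced only for illustration; the sketch assumes a non-empty input and mirrors the shape of the trait above rather than the library's actual implementation.

```scala
// Local stand-in with the same three functions as the trait above.
// This is NOT the Algebird class; the name is hypothetical.
trait SimpleAggregator[-A, B, +C] {
  def prepare(input: A): B
  def reduce(l: B, r: B): B
  def present(reduction: B): C

  // Hypothetical helper: prepare every input, reduce pairwise, then present.
  // Assumes the input is non-empty.
  def run(inputs: Iterable[A]): C =
    present(inputs.iterator.map(a => prepare(a)).reduce((l, r) => reduce(l, r)))
}

// Mean of Long values: the intermediate type B is a (sum, count) pair.
object MeanOfLongs extends SimpleAggregator[Long, (Long, Long), Double] {
  def prepare(input: Long): (Long, Long) = (input, 1L)
  def reduce(l: (Long, Long), r: (Long, Long)): (Long, Long) =
    (l._1 + r._1, l._2 + r._2)
  def present(reduction: (Long, Long)): Double =
    reduction._1.toDouble / reduction._2
}

// Three sample quantities (2, 2 and 4).
val sampleQuantities = List(2L, 2L, 4L)
val sampleMean = MeanOfLongs.run(sampleQuantities)
```

Because `reduce` only ever sees the intermediate type `B`, partial results can be combined in any grouping, which is what lets an aggregation be split across mappers and reducers.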
## Examples
In this section we will use the data below to show SQL aggregate functions and how to build similar aggregate functions in Scalding.
OrderID | OrderDate | OrderPrice | OrderQuantity | CustomerName |
---|---|---|---|---|
1 | 12/22/2005 | 160 | 2 | Smith |
2 | 08/10/2005 | 190 | 2 | Johnson |
3 | 07/13/2005 | 500 | 5 | Baldwin |
4 | 07/15/2005 | 420 | 2 | Smith |
5 | 12/22/2005 | 1000 | 4 | Wood |
6 | 10/2/2005 | 820 | 4 | Smith |
7 | 11/03/2005 | 2000 | 2 | Baldwin |
case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long,
customerName: String)
val orders = List(
Order(1, "12/22/2005", 160, 2, "Smith"),
Order(2, "08/10/2005", 190, 2, "Johnson"),
Order(3, "07/13/2005", 500, 5, "Baldwin"),
Order(4, "07/15/2005", 420, 2, "Smith"),
Order(5, "12/22/2005", 1000, 4, "Wood"),
Order(6, "10/2/2005", 820, 4, "Smith"),
Order(7, "11/03/2005", 2000, 2, "Baldwin"))
The SQL COUNT function returns the number of rows in a table satisfying the criteria specified in the WHERE clause.
SQL:
SELECT COUNT(*) FROM Orders
WHERE CustomerName = 'Smith'
//Scalding:
import com.twitter.algebird.Aggregator.count
TypedPipe.from(orders)
.aggregate(count(_.customerName == "Smith"))
If you don't specify a WHERE clause when using COUNT, the statement simply returns the total number of rows in the table.
SQL:
SELECT COUNT(*) FROM Orders
//Scalding:
import com.twitter.algebird.Aggregator.size
TypedPipe.from(orders)
.aggregate(size)
You can also use aggregate functions with GROUP BY.
SQL:
SELECT CustomerName, COUNT(CustomerName)
FROM Orders
GROUP BY CustomerName
//Scalding:
import com.twitter.algebird.Aggregator.size
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(size)
Output:
(Baldwin,2)
(Johnson,1)
(Smith,3)
(Wood,1)
The SQL SUM function returns the sum of an expression in a SELECT statement.
SQL:
SELECT CustomerName, SUM(OrderQuantity)
FROM Orders
GROUP BY CustomerName
//Scalding:
import com.twitter.algebird.Aggregator.sumBy
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(sumBy(_.orderQuantity))
Output:
(Baldwin,7)
(Johnson,2)
(Smith,8)
(Wood,4)
The SQL MAX function retrieves the maximum numeric value from a column.
SQL:
SELECT CustomerName, MAX(OrderQuantity)
FROM Orders
GROUP BY CustomerName
//Scalding:
import com.twitter.algebird.Aggregator.maxBy
val maxOp = maxBy[Order, Long](_.orderQuantity)
.andThenPresent(_.orderQuantity)
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(maxOp)
Output:
(Baldwin,5)
(Johnson,2)
(Smith,4)
(Wood,4)
The SQL MIN function selects the smallest number from a column.
SQL:
SELECT CustomerName, MIN(OrderQuantity)
FROM Orders
GROUP BY CustomerName
//Scalding:
import com.twitter.algebird.Aggregator.minBy
val minOp = minBy[Order, Long](_.orderQuantity)
.andThenPresent(_.orderQuantity)
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(minOp)
The SQL AVG function calculates the average value of a numeric column.
SQL:
SELECT CustomerName, AVG(OrderQuantity)
FROM Orders
GROUP BY CustomerName
//Scalding:
import com.twitter.algebird._
val avg = AveragedValue.aggregator.composePrepare[Order](_.orderQuantity)
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(avg)
Output:
(Baldwin,3.5)
(Johnson,2.0)
(Smith,2.66)
(Wood,4.0)
The SQL DISTINCT keyword selects distinct values from a column, and COUNT(DISTINCT ...) counts them. In Scalding, a probabilistic data structure called HyperLogLog can be used to estimate the number of distinct values.
SQL:
SELECT COUNT(DISTINCT CustomerName)
FROM Orders
//Scalding:
import com.twitter.algebird.HyperLogLogAggregator
val unique = HyperLogLogAggregator
//HLL error is about 1.04/sqrt(2^{bits}), so 12 bits gives roughly 1.6% error
// and each HLL instance holds 2^{12} = 4096 registers, about 4 KB.
.sizeAggregator(bits = 12)
//Convert customer names to UTF-8 encoded bytes, as HyperLogLog expects a byte array.
.composePrepare[Order](_.customerName.getBytes("UTF-8"))
TypedPipe.from(orders)
.aggregate(unique)
Output:
4.0
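The error formula quoted in the code comment above can be checked with a few lines of arithmetic. The sketch below is plain Scala with no Algebird dependency; `hllStandardError` and `hllRegisters` are hypothetical helper names, and the formula is the 1.04/sqrt(2^bits) approximation from the comment, not a call into the library.

```scala
// Standard-error approximation from the comment above: 1.04 / sqrt(2^bits).
// Hypothetical helper, not part of Algebird.
def hllStandardError(bits: Int): Double =
  1.04 / math.sqrt(math.pow(2.0, bits))

// Number of registers a sketch with the given bit count keeps: 2^bits.
def hllRegisters(bits: Int): Int = 1 << bits

val errorAt12 = hllStandardError(12)   // 1.04 / 64, about 1.6%
val errorAt14 = hllStandardError(14)   // 1.04 / 128, about 0.8%
val registersAt12 = hllRegisters(12)   // 4096 registers
```

Each extra bit doubles the register count while shrinking the error only by a factor of sqrt(2), so getting under 1% error takes 14 bits under this approximation.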
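PriorityQueueToListAggregator keeps a bounded number of values per group. In the example below it retains at most two order quantities per customer; as the output shows, with the default ordering these are the two smallest.
//Scalding:
import com.twitter.algebird.mutable.PriorityQueueToListAggregator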
val topK = new PriorityQueueToListAggregator[Long](2)
.composePrepare[Order](_.orderQuantity)
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(topK)
Output:
(Baldwin,List(2, 5))
(Johnson,List(2))
(Smith,List(2, 2))
(Wood,List(4))
Aggregators can be composed to perform multiple aggregations in one pass.
import com.twitter.algebird.Aggregator._
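// maxBy and minBy return the whole Order, so project out the field of interest
val maxOp = maxBy[Order, Long](_.orderQuantity).andThenPresent(_.orderQuantity)
val minOp = minBy[Order, Long](_.orderPrice).andThenPresent(_.orderPrice)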
val combinedMetric = maxOp.join(minOp)
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(combinedMetric)
Output:
(Baldwin,(5,500))
(Johnson,(2,190))
(Smith,(4,160))
(Wood,(4,1000))
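One way to picture what a join does is that both aggregators see each input once and their intermediate values travel together as a tuple. The following is a minimal sketch in plain Scala without Algebird; `SimpleAggregator`, its `join`, and the trimmed `SimpleOrder` case class are hypothetical stand-ins for illustration and may differ from how the library implements it.

```scala
// Local stand-in for the Aggregator trait; NOT the Algebird class.
trait SimpleAggregator[-A, B, +C] { self =>
  def prepare(input: A): B
  def reduce(l: B, r: B): B
  def present(reduction: B): C

  // Hypothetical join: pair up the intermediate values of both aggregators.
  def join[A2 <: A, B2, C2](
      that: SimpleAggregator[A2, B2, C2]): SimpleAggregator[A2, (B, B2), (C, C2)] =
    new SimpleAggregator[A2, (B, B2), (C, C2)] {
      def prepare(input: A2): (B, B2) = (self.prepare(input), that.prepare(input))
      def reduce(l: (B, B2), r: (B, B2)): (B, B2) =
        (self.reduce(l._1, r._1), that.reduce(l._2, r._2))
      def present(reduction: (B, B2)): (C, C2) =
        (self.present(reduction._1), that.present(reduction._2))
    }
}

// Trimmed-down order record, for this sketch only.
case class SimpleOrder(customerName: String, orderPrice: Long, orderQuantity: Long)

val maxQuantity = new SimpleAggregator[SimpleOrder, Long, Long] {
  def prepare(input: SimpleOrder): Long = input.orderQuantity
  def reduce(l: Long, r: Long): Long = math.max(l, r)
  def present(reduction: Long): Long = reduction
}

val minPrice = new SimpleAggregator[SimpleOrder, Long, Long] {
  def prepare(input: SimpleOrder): Long = input.orderPrice
  def reduce(l: Long, r: Long): Long = math.min(l, r)
  def present(reduction: Long): Long = reduction
}

// Smith's three orders from the table above.
val smithOrders = List(
  SimpleOrder("Smith", 160L, 2L),
  SimpleOrder("Smith", 420L, 2L),
  SimpleOrder("Smith", 820L, 4L))

val joined = maxQuantity.join(minPrice)
val smithResult = joined.present(
  smithOrders.map(o => joined.prepare(o)).reduce((l, r) => joined.reduce(l, r)))
```

Here `smithResult` is the pair of the maximum quantity and the minimum price, computed in a single traversal of the orders.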
Composition can also be used to combine two or more aggregators to derive a new aggregate function.
import com.twitter.algebird.Aggregator._
val sumAggregator = sumBy[Order, Long](_.orderQuantity)
val sizeAggregator = size
val avg = sumAggregator.join(sizeAggregator)
.andThenPresent{ case (sum, count) => sum.toDouble / count.toDouble }
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(avg)
Output:
(Baldwin,3.5)
(Johnson,2.0)
(Smith,2.66)
(Wood,4.0)
You can join up to 22 aggregators by using GeneratedTupleAggregator. The example below calculates Max, Min, Sum, Count, Mean and Standard Deviation in one pass by joining different aggregators.
import com.twitter.algebird.Aggregator._
import com.twitter.algebird.{GeneratedTupleAggregator, MomentsAggregator, Moments }
val maxOp = maxBy[Order, Long](_.orderPrice)
val minOp = minBy[Order, Long](_.orderPrice)
val sum = sumBy[Order, Long](_.orderPrice)
val moments = Moments.aggregator.composePrepare[Order](_.orderPrice.toDouble)
val multiAggregator = GeneratedTupleAggregator
.from4(maxOp, minOp, sum, moments)
.andThenPresent {
case (mmax, mmin, ssum, moment) =>
(mmax.orderPrice, mmin.orderPrice, ssum, moment.count, moment.mean, moment.stddev)
}
TypedPipe.from(orders)
.groupBy(_.customerName)
.aggregate(multiAggregator)
Output:
(Baldwin,(2000,500,2500,2,1250.0,750.0))
(Johnson,(190,190,190,1,190.0,0.0))
(Smith,(820,160,1400,3,466.66,271.46))
(Wood,(1000,1000,1000,1,1000.0,0.0))
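The standard deviation values in this output are consistent with the population formula (dividing by n rather than n - 1). As a rough check, the sketch below recomputes Smith's mean and standard deviation by hand in plain Scala; it is only a worked calculation over the three Smith prices from the table, not a description of how Moments computes them internally.

```scala
// Smith's order prices from the table above.
val smithPrices = List(160.0, 420.0, 820.0)

val priceCount = smithPrices.size
val priceMean = smithPrices.sum / priceCount

// Population variance: average squared deviation from the mean (divide by n).
val populationVariance =
  smithPrices.map(p => math.pow(p - priceMean, 2)).sum / priceCount

val populationStddev = math.sqrt(populationVariance)
```

With these inputs the mean comes to about 466.67 and the standard deviation to about 271.46, matching the Smith row.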