Aggregation using Algebird Aggregators
# Aggregators
Aggregators let you create reusable and composable aggregation functions. The `Aggregator` trait has three main functions:
```scala
// Simplified view of Algebird's Aggregator trait (the real trait has more members)
trait Aggregator[-A, B, +C] extends java.io.Serializable {
  /**
   * Transform the input before the reduction.
   */
  def prepare(input: A): B
  /**
   * Combine two values to produce a new value.
   */
  def reduce(l: B, r: B): B
  /**
   * Transform the output of the reduction.
   */
  def present(reduction: B): C
}
```
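To make the contract concrete, here is a minimal, dependency-free sketch in plain Scala. `SimpleAggregator` and the `longest` example are hypothetical illustrations, not Algebird's API. The sketch also shows why `reduce` should be associative. A distributed job may reduce each partition separately and then reduce the partial results, and that has to give the same answer as a single reduction over all the input.

```scala
// Hypothetical stand-in for Algebird's Aggregator (same three methods, no dependency).
trait SimpleAggregator[-A, B, +C] {
  def prepare(input: A): B
  def reduce(l: B, r: B): B
  def present(reduction: B): C

  // Prepare every input, combine the prepared values pairwise, then present.
  // Assumes a non-empty input, since there is no zero value.
  def apply(inputs: Seq[A]): C =
    present(inputs.map(a => prepare(a)).reduce((l, r) => this.reduce(l, r)))
}

// Longest name: prepare pairs each name with its length, reduce keeps the
// longer pair (the left one on ties), present formats the winner.
val longest = new SimpleAggregator[String, (Int, String), String] {
  def prepare(s: String): (Int, String) = (s.length, s)
  def reduce(l: (Int, String), r: (Int, String)): (Int, String) =
    if (l._1 >= r._1) l else r
  def present(b: (Int, String)): String = s"${b._2} (${b._1} chars)"
}

val names = Seq("Smith", "Johnson", "Baldwin", "Smith", "Wood", "Smith", "Baldwin")
val whole = longest(names)

// Simulate two partitions: reduce each half separately, then reduce the partials.
val (left, right) = names.splitAt(3)
def partial(xs: Seq[String]): (Int, String) =
  xs.map(s => longest.prepare(s)).reduce((l, r) => longest.reduce(l, r))
val split = longest.present(longest.reduce(partial(left), partial(right)))
```

Both `whole` and `split` evaluate to `"Johnson (7 chars)"`, because the reduction is associative.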
## Examples
In this section we will use the data below to show SQL aggregate functions and how to build similar aggregate functions in Scalding.
OrderID | OrderDate | OrderPrice | OrderQuantity | CustomerName |
---|---|---|---|---|
1 | 12/22/2005 | 160 | 2 | Smith |
2 | 08/10/2005 | 190 | 2 | Johnson |
3 | 07/13/2005 | 500 | 5 | Baldwin |
4 | 07/15/2005 | 420 | 2 | Smith |
5 | 12/22/2005 | 1000 | 4 | Wood |
6 | 10/2/2005 | 820 | 4 | Smith |
7 | 11/03/2005 | 2000 | 2 | Baldwin |
```scala
import com.twitter.algebird._
import com.twitter.algebird.Aggregator._ // count, size, sumBy, maxBy, minBy
import com.twitter.scalding._

case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long,
  customerName: String)

val orders = List(
  Order(1, "12/22/2005", 160, 2, "Smith"),
  Order(2, "08/10/2005", 190, 2, "Johnson"),
  Order(3, "07/13/2005", 500, 5, "Baldwin"),
  Order(4, "07/15/2005", 420, 2, "Smith"),
  Order(5, "12/22/2005", 1000, 4, "Wood"),
  Order(6, "10/2/2005", 820, 4, "Smith"),
  Order(7, "11/03/2005", 2000, 2, "Baldwin"))
```
The SQL COUNT function returns the number of rows in a table satisfying the criteria specified in the WHERE clause.
SQL:
```sql
SELECT COUNT(*) FROM Orders
WHERE CustomerName = 'Smith'
```
Scalding:
```scala
TypedPipe.from(orders)
  .aggregate(count[Order](_.customerName == "Smith"))
```
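A predicate count fits the prepare/reduce/present shape: each row is prepared to 1 or 0, and the values are added. The plain-Scala sketch below uses a hypothetical `CountAgg` class, which is not Algebird's implementation. It also gives the aggregator a zero, so an input with no matching rows counts as 0. Algebird's `count` is likewise a `MonoidAggregator`, which carries a zero.

```scala
case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long,
  customerName: String)

val orders = List(
  Order(1, "12/22/2005", 160, 2, "Smith"),
  Order(2, "08/10/2005", 190, 2, "Johnson"),
  Order(3, "07/13/2005", 500, 5, "Baldwin"),
  Order(4, "07/15/2005", 420, 2, "Smith"),
  Order(5, "12/22/2005", 1000, 4, "Wood"),
  Order(6, "10/2/2005", 820, 4, "Smith"),
  Order(7, "11/03/2005", 2000, 2, "Baldwin"))

// Hypothetical count aggregator with a zero (monoid-style), in plain Scala.
final class CountAgg[A](pred: A => Boolean) {
  val zero: Long = 0L
  def prepare(a: A): Long = if (pred(a)) 1L else 0L
  def reduce(l: Long, r: Long): Long = l + r
  def present(total: Long): Long = total
  // Folding from zero handles empty inputs, unlike a bare reduce.
  def apply(rows: Seq[A]): Long =
    present(rows.map(a => prepare(a)).foldLeft(zero)((l, r) => reduce(l, r)))
}

val isSmith = new CountAgg[Order](_.customerName == "Smith")
val everyRow = new CountAgg[Order](_ => true) // behaves like `size`

val smithCount = isSmith(orders) // 3
val totalCount = everyRow(orders) // 7
val emptyCount = isSmith(Nil) // 0
```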
If you don’t specify a WHERE clause when using COUNT, the statement simply returns the total number of rows in the table.
SQL:
```sql
SELECT COUNT(*) FROM Orders
```
Scalding:
```scala
TypedPipe.from(orders)
  .aggregate(size)
```
You can also use aggregate functions with GROUP BY.
SQL:
```sql
SELECT CustomerName, COUNT(CustomerName)
FROM Orders
GROUP BY CustomerName
```
Scalding:
```scala
TypedPipe.from(orders)
  .groupBy(_.customerName)
  .aggregate(size)
```
Output:
```
(Baldwin,2)
(Johnson,1)
(Smith,3)
(Wood,1)
```
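Conceptually, `groupBy(...).aggregate(...)` applies the aggregator once per key. The sketch below models this in plain Scala with a hypothetical `aggregateByKey` helper. Each row is prepared, and prepared values that share a key are merged with `reduce` as they arrive. It illustrates the idea only; it is not how Scalding executes the job.

```scala
case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long,
  customerName: String)

val orders = List(
  Order(1, "12/22/2005", 160, 2, "Smith"),
  Order(2, "08/10/2005", 190, 2, "Johnson"),
  Order(3, "07/13/2005", 500, 5, "Baldwin"),
  Order(4, "07/15/2005", 420, 2, "Smith"),
  Order(5, "12/22/2005", 1000, 4, "Wood"),
  Order(6, "10/2/2005", 820, 4, "Smith"),
  Order(7, "11/03/2005", 2000, 2, "Baldwin"))

// Hypothetical helper: prepare each row and merge values with the same key using reduce.
def aggregateByKey[A, K, B](rows: Seq[A])(key: A => K)(prepare: A => B)(
    reduce: (B, B) => B): Map[K, B] =
  rows.foldLeft(Map.empty[K, B]) { (acc, row) =>
    val k = key(row)
    val b = prepare(row)
    // The first value for a key is stored as-is; later values are reduced into it.
    acc.updated(k, acc.get(k).map(prev => reduce(prev, b)).getOrElse(b))
  }

// size per customer: prepare every row to 1L and add, then sort by name.
val sizes = aggregateByKey(orders)(_.customerName)(_ => 1L)(_ + _).toList.sortBy(_._1)
```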
The SQL SUM function returns the sum of a numeric expression. With GROUP BY, it returns one sum per group.
SQL:
```sql
SELECT CustomerName, SUM(OrderQuantity)
FROM Orders
GROUP BY CustomerName
```
Scalding:
```scala
TypedPipe.from(orders)
  .groupBy(_.customerName)
  .aggregate(sumBy[Order, Long](_.orderQuantity))
```
Output:
```
(Baldwin,7)
(Johnson,2)
(Smith,8)
(Wood,4)
```
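`sumBy` pairs a prepare function (here `_.orderQuantity`) with the addition of a `Monoid`. A minimal plain-Scala sketch of that idea follows. The `Monoid` trait and the `sumBy` helper are hypothetical simplifications defined here, not Algebird's own definitions.

```scala
case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long,
  customerName: String)

val orders = List(
  Order(1, "12/22/2005", 160, 2, "Smith"),
  Order(2, "08/10/2005", 190, 2, "Johnson"),
  Order(3, "07/13/2005", 500, 5, "Baldwin"),
  Order(4, "07/15/2005", 420, 2, "Smith"),
  Order(5, "12/22/2005", 1000, 4, "Wood"),
  Order(6, "10/2/2005", 820, 4, "Smith"),
  Order(7, "11/03/2005", 2000, 2, "Baldwin"))

// Hypothetical minimal Monoid: an identity element and an associative plus.
trait Monoid[V] {
  def zero: V
  def plus(l: V, r: V): V
}

implicit val longMonoid: Monoid[Long] = new Monoid[Long] {
  def zero: Long = 0L
  def plus(l: Long, r: Long): Long = l + r
}

// sumBy-style helper: prepare each row with fn, then fold with the monoid.
def sumBy[T, V](rows: Seq[T])(fn: T => V)(implicit m: Monoid[V]): V =
  rows.foldLeft(m.zero)((acc, t) => m.plus(acc, fn(t)))

val quantities = orders.groupBy(_.customerName).toList
  .map { case (name, rows) => name -> sumBy(rows)(_.orderQuantity) }
  .sortBy(_._1)
```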
The SQL MAX function retrieves the maximum numeric value from a column.
SQL:
```sql
SELECT CustomerName, MAX(OrderQuantity)
FROM Orders
GROUP BY CustomerName
```
Scalding:
```scala
// maxBy returns the whole Order with the largest quantity;
// andThenPresent extracts the quantity from it
val max = maxBy[Order, Long](_.orderQuantity)
  .andThenPresent(_.orderQuantity)
TypedPipe.from(orders)
  .groupBy(_.customerName)
  .aggregate(max)
```
Output:
```
(Baldwin,5)
(Johnson,2)
(Smith,4)
(Wood,4)
```
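`maxBy` keeps the whole `Order` with the largest key. `andThenPresent` then maps the presented value to the field we want. The sketch below models both combinators, plus `composePrepare`, with a hypothetical plain-Scala `Agg` case class that stores the three functions. It mirrors the idea, not Algebird's actual classes.

```scala
case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long,
  customerName: String)

val orders = List(
  Order(1, "12/22/2005", 160, 2, "Smith"),
  Order(2, "08/10/2005", 190, 2, "Johnson"),
  Order(3, "07/13/2005", 500, 5, "Baldwin"),
  Order(4, "07/15/2005", 420, 2, "Smith"),
  Order(5, "12/22/2005", 1000, 4, "Wood"),
  Order(6, "10/2/2005", 820, 4, "Smith"),
  Order(7, "11/03/2005", 2000, 2, "Baldwin"))

// Hypothetical aggregator that stores prepare, reduce and present as functions.
final case class Agg[A, B, C](prepare: A => B, reduce: (B, B) => B, present: B => C) {
  // Post-process the result: apply f after present.
  def andThenPresent[D](f: C => D): Agg[A, B, D] = Agg(prepare, reduce, present.andThen(f))
  // Pre-process the input: apply f before prepare.
  def composePrepare[A0](f: A0 => A): Agg[A0, B, C] = Agg(f.andThen(prepare), reduce, present)
  // Assumes a non-empty input.
  def apply(rows: Seq[A]): C = present(rows.map(prepare).reduce((l, r) => reduce(l, r)))
}

// maxBy-style aggregator: keeps the whole record whose key is largest.
def maxBy[T, K](key: T => K)(implicit ord: Ordering[K]): Agg[T, T, T] =
  Agg[T, T, T](t => t, (l, r) => if (ord.gteq(key(l), key(r))) l else r, t => t)

val maxQuantity = maxBy[Order, Long](_.orderQuantity).andThenPresent(_.orderQuantity)
val result = orders.groupBy(_.customerName).toList
  .map { case (name, rows) => name -> maxQuantity(rows) }
  .sortBy(_._1)

// composePrepare: a max over plain Longs, fed the price extracted from each Order.
val maxOfLongs = Agg[Long, Long, Long](x => x, (l, r) => math.max(l, r), x => x)
val maxPrice = maxOfLongs.composePrepare[Order](_.orderPrice)
```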
The SQL MIN function selects the smallest value from a column.
SQL:
```sql
SELECT CustomerName, MIN(OrderQuantity)
FROM Orders
GROUP BY CustomerName
```
Scalding:
```scala
// minBy returns the whole Order with the smallest quantity;
// andThenPresent extracts the quantity from it
val min = minBy[Order, Long](_.orderQuantity)
  .andThenPresent(_.orderQuantity)
TypedPipe.from(orders)
  .groupBy(_.customerName)
  .aggregate(min)
```
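SQL's DISTINCT keyword selects the distinct values in a column, and COUNT(DISTINCT ...) counts them. In Scalding, `HyperLogLogAggregator` estimates the number of distinct values approximately in a single pass. The argument to `sizeAggregator` is the number of bits, which controls accuracy. With only 4 bits, the estimate shown below (3.0) falls short of the exact answer of 4 distinct customers.
SQL:
```sql
SELECT COUNT(DISTINCT CustomerName)
FROM Orders
```
Scalding:
```scala
// HyperLogLog hashes byte arrays, so each name is encoded as UTF-8 first
val unique = HyperLogLogAggregator
  .sizeAggregator(4)
  .composePrepare[Order](_.customerName.getBytes("UTF-8"))
TypedPipe.from(orders)
  .aggregate(unique)
```
Output:
```
3.0
```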
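For comparison, an exact distinct count also fits the aggregator shape. Prepare wraps each value in a one-element `Set`, reduce is set union, and present takes the size. This plain-Scala sketch is exact, but its state grows with the number of distinct values. HyperLogLog avoids that memory cost with a fixed-size sketch, at the price of approximation.

```scala
// CustomerName column from the sample table.
val customerNames = List("Smith", "Johnson", "Baldwin", "Smith", "Wood", "Smith", "Baldwin")

// prepare: one-element Set; reduce: union; present: size.
val distinctNames: Set[String] = customerNames.map(name => Set(name)).reduce(_ ++ _)
val exactDistinct: Int = distinctNames.size // 4
```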
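Aggregators can be composed with `join` to perform multiple aggregations in a single pass over the data.
```scala
// Present plain values rather than whole Orders so the output is a pair of numbers
val max = maxBy[Order, Long](_.orderQuantity).andThenPresent(_.orderQuantity)
val min = minBy[Order, Long](_.orderPrice).andThenPresent(_.orderPrice)
val combinedMetric = max.join(min)
TypedPipe.from(orders)
  .groupBy(_.customerName)
  .aggregate(combinedMetric)
```
Output:
```
(Baldwin,(5,500))
(Johnson,(2,190))
(Smith,(4,160))
(Wood,(4,1000))
```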
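`join` runs two aggregators side by side. The prepared value is a pair, `reduce` combines each half with its own aggregator, and `present` presents each half, so both results come from one pass over the rows. The following plain-Scala sketch uses a hypothetical function-based `Agg` class, not Algebird's implementation.

```scala
case class Order(orderId: Int, orderDate: String, orderPrice: Long, orderQuantity: Long,
  customerName: String)

val orders = List(
  Order(1, "12/22/2005", 160, 2, "Smith"),
  Order(2, "08/10/2005", 190, 2, "Johnson"),
  Order(3, "07/13/2005", 500, 5, "Baldwin"),
  Order(4, "07/15/2005", 420, 2, "Smith"),
  Order(5, "12/22/2005", 1000, 4, "Wood"),
  Order(6, "10/2/2005", 820, 4, "Smith"),
  Order(7, "11/03/2005", 2000, 2, "Baldwin"))

// Hypothetical aggregator that stores prepare, reduce and present as functions.
final case class Agg[A, B, C](prepare: A => B, reduce: (B, B) => B, present: B => C) {
  // Pair this aggregator with another over the same input type.
  def join[B2, C2](that: Agg[A, B2, C2]): Agg[A, (B, B2), (C, C2)] =
    Agg[A, (B, B2), (C, C2)](
      a => (prepare(a), that.prepare(a)),
      (l, r) => (reduce(l._1, r._1), that.reduce(l._2, r._2)),
      b => (present(b._1), that.present(b._2)))
  // Assumes a non-empty input.
  def apply(rows: Seq[A]): C = present(rows.map(prepare).reduce((l, r) => reduce(l, r)))
}

val maxQuantity = Agg[Order, Long, Long](_.orderQuantity, (l, r) => math.max(l, r), q => q)
val minPrice = Agg[Order, Long, Long](_.orderPrice, (l, r) => math.min(l, r), p => p)
val combined = maxQuantity.join(minPrice)

val result = orders.groupBy(_.customerName).toList
  .map { case (name, rows) => name -> combined(rows) }
  .sortBy(_._1)
```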
Composition can also combine two or more aggregators into a new aggregate function. For example, an average is a sum divided by a count:
```scala
val sumAggregator = sumBy[Order, Long](_.orderQuantity)
val sizeAggregator = size
val avg = sumAggregator.join(sizeAggregator)
  .andThenPresent { case (sum, count) => sum.toDouble / count.toDouble }
TypedPipe.from(orders)
  .groupBy(_.customerName)
  .aggregate(avg)
```
Output:
```
(Baldwin,3.5)
(Johnson,2.0)
(Smith,2.66)
(Wood,4.0)
```
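An average cannot be reduced directly, because the average of partial averages is generally wrong. The (sum, count) pair can be reduced, though, because adding pairs component-wise is associative. This plain-Scala sketch uses a hypothetical `SumCount` class and Smith's quantities from the table. It compares merging partial states with averaging partial averages.

```scala
// State for an average: a running sum and count. Adding states component-wise is associative.
final case class SumCount(sum: Long, count: Long) {
  def +(that: SumCount): SumCount = SumCount(sum + that.sum, count + that.count)
  def mean: Double = sum.toDouble / count.toDouble
}

def prepare(quantity: Long): SumCount = SumCount(quantity, 1L)

// Smith's OrderQuantity values from the sample table.
val smith = List(2L, 2L, 4L)
val whole = smith.map(q => prepare(q)).reduce(_ + _).mean

// Split into two partitions, reduce each, then merge the partial states.
val left = List(2L, 2L).map(q => prepare(q)).reduce(_ + _)
val right = List(4L).map(q => prepare(q)).reduce(_ + _)
val merged = (left + right).mean

// Averaging the partial averages instead gives (2.0 + 4.0) / 2 = 3.0, which is wrong.
val averageOfAverages = (left.mean + right.mean) / 2
```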
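To combine more than two aggregators, `GeneratedTupleAggregator` builds one aggregator from several of them (four here), so all of them are computed in a single pass:
```scala
val max = maxBy[Order, Long](_.orderPrice)
val min = minBy[Order, Long](_.orderPrice)
val sum = sumBy[Order, Long](_.orderPrice)
// Moments works on Doubles, so each price is converted first
val moments = Moments.aggregator.composePrepare[Order](_.orderPrice.toDouble)
val multiAggregator = GeneratedTupleAggregator
  .from4(max, min, sum, moments)
  .andThenPresent {
    case (mmax, mmin, ssum, moment) =>
      // maxBy and minBy present whole Orders, so extract the prices here
      (mmax.orderPrice, mmin.orderPrice, ssum, moment.count, moment.mean, moment.stddev)
  }
TypedPipe.from(orders)
  .groupBy(_.customerName)
  .aggregate(multiAggregator)
```
Each output tuple is (max price, min price, total price, count, mean, standard deviation).

Output:
```
(Baldwin,(2000,500,2500,2,1250.0,750.0))
(Johnson,(190,190,190,1,190.0,0.0))
(Smith,(820,160,1400,3,466.66,271.46))
(Wood,(1000,1000,1000,1,1000.0,0.0))
```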
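The count, mean and standard deviation above all come from one mergeable state. The hypothetical `Stats` class below is a rough plain-Scala sketch of that idea. It keeps a count, a sum and a sum of squares, which can be added component-wise. This simpler textbook technique is swapped in for illustration and is not how Algebird's `Moments` is implemented. It is also less numerically stable for large values. It computes the population standard deviation, which matches the 750.0 reported for Baldwin.

```scala
// Hypothetical mergeable state: count, sum and sum of squares.
final case class Stats(count: Long, sum: Double, sumSq: Double) {
  def +(that: Stats): Stats = Stats(count + that.count, sum + that.sum, sumSq + that.sumSq)
  def mean: Double = sum / count
  // Population standard deviation: sqrt(E[x^2] - mean^2), clamped at 0 for rounding error.
  def stddev: Double = math.sqrt(math.max(0.0, sumSq / count - mean * mean))
}

// prepare: a single value becomes a state with count 1.
def stat(x: Double): Stats = Stats(1L, x, x * x)

// OrderPrice values per customer from the sample table.
val baldwin = List(500.0, 2000.0).map(x => stat(x)).reduce(_ + _)
val smith = List(160.0, 420.0, 820.0).map(x => stat(x)).reduce(_ + _)
```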
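`PriorityQueueToListAggregator` keeps a bounded number of values per group (2 here) and presents them as a list in ascending order. It keeps the smallest values under the implicit `Ordering`. That is why Smith's result is `List(2, 2)` even though one of Smith's orders has a quantity of 4.
```scala
val topK = new PriorityQueueToListAggregator[Long](2)
  .composePrepare[Order](_.orderQuantity)
TypedPipe.from(orders)
  .groupBy(_.customerName)
  .aggregate(topK)
```
Output:
```
(Baldwin,List(2, 5))
(Johnson,List(2))
(Smith,List(2, 2))
(Wood,List(4))
```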
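Here is a list-based plain-Scala sketch of the same bounded idea. The state is a sorted list of at most `k` values, and `reduce` merges two states and keeps the first `k` under the given `Ordering`. The `boundedK` helper is hypothetical and stands in for the priority queue that `PriorityQueueToListAggregator` is named after. Passing a reversed `Ordering` keeps the largest values instead.

```scala
// Hypothetical bounded aggregator: prepare wraps each value in a one-element list,
// reduce merges two sorted lists and keeps the first k under ord.
def boundedK[A](k: Int)(xs: Seq[A])(implicit ord: Ordering[A]): List[A] =
  xs.map(x => List(x)).reduce((l, r) => (l ++ r).sorted(ord).take(k))

// OrderQuantity values per customer from the sample table.
val smith = List(2L, 2L, 4L)
val baldwin = List(5L, 2L)

// Keeps the two smallest values.
val smallestSmith = boundedK(2)(smith)
// Keeps the two largest values.
val largestSmith = boundedK(2)(smith)(Ordering[Long].reverse)
val smallestBaldwin = boundedK(2)(baldwin)
```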