Always create different cascading pipes out of forked cross typed pipe #1908

Merged
merged 6 commits on Apr 17, 2019
@@ -4,7 +4,7 @@ import cascading.flow.FlowDef
import cascading.operation.Debug
import cascading.pipe.{ CoGroup, Each, Pipe, HashJoin }
import cascading.tuple.{ Fields, Tuple => CTuple }
import com.stripe.dagon.{ FunctionK, Id, Memoize, Rule, Dag }
import com.stripe.dagon.{ FunctionK, HCache, Id, Rule, Dag }
import com.twitter.scalding.TupleConverter.{ singleConverter, tuple2Converter }
import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter }
import com.twitter.scalding.{
@@ -182,165 +182,220 @@ object CascadingBackend {
}
private[this] val cache = new CompilerCache

private def compile[T](mode: Mode): FunctionK[TypedPipe, CascadingPipe] =
Memoize.functionK[TypedPipe, CascadingPipe](
new Memoize.RecursiveK[TypedPipe, CascadingPipe] {
def toFunction[T] = {
case (cp@CounterPipe(_), rec) =>
def go[A](cp: CounterPipe[A]): CascadingPipe[A] = {
val CascadingPipe(pipe0, initF, fd, conv) = rec(cp.pipe)
val cpipe = RichPipe(pipe0)
.eachTo(initF -> f0)(new IncrementCounters[A](_, TupleConverter.asSuperConverter(conv)))
CascadingPipe.single[A](cpipe, fd)
}
go(cp)
case (cp@CrossPipe(_, _), rec) =>
rec(cp.viaHashJoin)
case (cv@CrossValue(_, _), rec) =>
rec(cv.viaHashJoin)
case (DebugPipe(p), rec) =>
val inner = rec(p)
inner.copy(pipe = new Each(inner.pipe, new Debug))
case (EmptyTypedPipe, rec) =>
// just use an empty iterable pipe.
rec(IterablePipe(List.empty[T]))
case (fk@FilterKeys(_, _), rec) =>
def go[K, V](node: FilterKeys[K, V]): CascadingPipe[(K, V)] = {
val rewrite = Filter[(K, V)](node.input, FilterKeysToFilter(node.fn))
rec(rewrite)
}
go(fk)
case (f@Filter(_, _), rec) =>
// hand holding for type inference
def go[T1 <: T](f: Filter[T1]): CascadingPipe[T] = {
val Filter(input, fn) = f
val CascadingPipe(pipe, initF, fd, conv) = rec(input)
// This does not need a setter, which is nice.
val fpipe = RichPipe(pipe).filter[T1](initF)(fn)(TupleConverter.asSuperConverter(conv))
CascadingPipe[T](fpipe, initF, fd, conv)
}
/**
* Method to compile scalding's `TypedPipe`s to cascading's `Pipe`s.
*
* Since equal `TypedPipe`s define the same computation, we would like to compile them into referentially the same cascading `Pipe` instance.
* This logic breaks down if one typed pipe is very large and forks into two different computations, each of which significantly decreases the size of the data.
* If we cache the common part of these two computations in the same cascading `Pipe` instance, we end up materializing that common part.
* Therefore, for some kinds of `TypedPipe` we want to avoid caching.
*
* A `.cross` `TypedPipe` is one example of a `TypedPipe` we never want to materialize and, therefore, never want to cache.
*
* The `compile` logic is separated into the following functions:
* - `transform`, which defines the main transformation logic without any caching applied.
*   This method accepts a `rec` parameter which is called to transform child pipes.
* - `withCachePolicy`, which defines the transformation logic with caching applied.
* - `notCached`, which supports the `.cross` use case, where the pipe itself shouldn't be cached but its `left` and `right` sides should be.
*/
private def compile(mode: Mode): FunctionK[TypedPipe, CascadingPipe] =
new FunctionK[TypedPipe, CascadingPipe] {

private val cache = HCache.empty[TypedPipe, CascadingPipe]

override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = withCachePolicy

private def withCachePolicy[U]: TypedPipe[U] => CascadingPipe[U] = {
// Don't cache `CrossPipe`, but cache its `left` and `right` sides
case cp@CrossPipe(left, right) =>
notCached(excludes = Set(left, right))(cp)
// Don't cache `Fork` and `WithDescriptionTypedPipe`,
// since if we cached them, a wrapped `CrossPipe` would end up cached as well
case tp@Fork(_) =>
transform(tp, this)
case tp@WithDescriptionTypedPipe(_, _) =>
transform(tp, this)
// Cache all other typed pipes
case tp =>
cache.getOrElseUpdate(tp, transform(tp, this))
}
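
For intuition, here is a minimal sketch of the shape this policy targets (a hypothetical job, not part of this diff): a large pipe is forked and each branch crosses it with a tiny pipe before shrinking the data. Caching the common forked part into one cascading `Pipe` would force it to be materialized; under this policy each `CrossPipe` is planned independently, while other nodes still share compiled pipes.

import com.twitter.scalding.typed.TypedPipe

// Hypothetical pipeline, for illustration only.
val big: TypedPipe[Int] = TypedPipe.from(0 until 1000000)
val forked = big.fork

// Two branches of the same fork, each crossed with a small pipe and then
// filtered down; neither cross should be cached (and thus materialized).
val branchA = forked.cross(TypedPipe.from(List("a"))).filter { case (i, _) => i % 1000 == 0 }
val branchB = forked.cross(TypedPipe.from(List("b"))).filter { case (i, _) => i % 997 == 0 }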

go(f)
case (f@FlatMapValues(_, _), rec) =>
def go[K, V, U](node: FlatMapValues[K, V, U]): CascadingPipe[T] =
rec(FlatMapped[(K, V), (K, U)](node.input, FlatMapValuesToFlatMap(node.fn)))

go(f)
case (fm@FlatMapped(_, _), rec) =>
// TODO we can optimize a flatmapped input directly and skip some tupleconverters
def go[A, B <: T](fm: FlatMapped[A, B]): CascadingPipe[T] = {
val CascadingPipe(pipe, initF, fd, conv) = rec(fm.input)
val fmpipe = RichPipe(pipe).flatMapTo[A, T](initF -> f0)(fm.fn)(TupleConverter.asSuperConverter(conv), singleSetter)
CascadingPipe.single[B](fmpipe, fd)
}
private def notCached(excludes: Set[TypedPipe[_]]): FunctionK[TypedPipe, CascadingPipe] =
new FunctionK[TypedPipe, CascadingPipe] {
override def toFunction[U]: TypedPipe[U] => CascadingPipe[U] = { tp =>
if (excludes.contains(tp)) withCachePolicy(tp) else transform(tp, this)
}
}
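
Concretely, here is an illustrative trace (not generated output) of how a description-wrapped cross compiles under this policy:

// withCachePolicy(WithDescriptionTypedPipe(CrossPipe(l, r), descs))
//   -> WithDescriptionTypedPipe is not cached; transform recurses into CrossPipe(l, r)
//   -> CrossPipe(l, r) is planned via notCached(excludes = Set(l, r)), so the
//      cross (and its viaHashJoin rewrite) gets a fresh cascading Pipe on every use
//   -> l and r are in `excludes`, so they re-enter withCachePolicy and ARE
//      cached, sharing one compiled Pipe across both sides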

go(fm)
case (ForceToDisk(input), rec) =>
val cp = rec(input)
cp.copy(pipe = RichPipe(cp.pipe).forceToDisk)
case (Fork(input), rec) =>
// fork doesn't mean anything here since we are already planning each TypedPipe to
// something in cascading. Fork is an optimizer level operation
rec(input)
case (IterablePipe(iter), _) =>
val fd = new FlowDef
val pipe = IterableSource[T](iter, f0)(singleSetter, singleConverter).read(fd, mode)
CascadingPipe.single[T](pipe, fd)
case (f@MapValues(_, _), rec) =>
def go[K, A, B](fn: MapValues[K, A, B]): CascadingPipe[_ <: (K, B)] =
rec(Mapped[(K, A), (K, B)](fn.input, MapValuesToMap(fn.fn)))

go(f)
case (m@Mapped(_, _), rec) =>
def go[A, B <: T](m: Mapped[A, B]): CascadingPipe[T] = {
val Mapped(input, fn) = m
val CascadingPipe(pipe, initF, fd, conv) = rec(input)
val fmpipe = RichPipe(pipe).mapTo[A, T](initF -> f0)(fn)(TupleConverter.asSuperConverter(conv), singleSetter)
CascadingPipe.single[B](fmpipe, fd)
}
private def transform[T](
pipe: TypedPipe[T],
rec: FunctionK[TypedPipe, CascadingPipe]
): CascadingPipe[T] = pipe match {
case cp@CounterPipe(_) =>
def go[A](cp: CounterPipe[A]): CascadingPipe[A] = {
val CascadingPipe(pipe0, initF, fd, conv) = rec(cp.pipe)
val cpipe = RichPipe(pipe0)
.eachTo(initF -> f0)(new IncrementCounters[A](_, TupleConverter
.asSuperConverter(conv)))
CascadingPipe.single[A](cpipe, fd)
}

go(m)

case (m@MergedTypedPipe(_, _), rec) =>
OptimizationRules.unrollMerge(m) match {
case Nil => rec(EmptyTypedPipe)
case h :: Nil => rec(h)
case nonEmpty =>
// TODO: a better optimization is to not materialize this
// node at all if there is no fan out since groupBy and cogroupby
// can accept multiple inputs

val flowDef = new FlowDef
// if all of the converters are the same, we could skip some work
// here, but need to be able to see that correctly
val pipes = nonEmpty.map { p => rec(p).toPipe(f0, flowDef, singleSetter) }
val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*)
CascadingPipe.single[T](merged, flowDef)
}
case (SourcePipe(typedSrc), _) =>
val fd = new FlowDef
val pipe = typedSrc.read(fd, mode)
CascadingPipe[T](pipe, typedSrc.sourceFields, fd, typedSrc.converter[T])
case (sblk@SumByLocalKeys(_, _), rec) =>
def go[K, V](sblk: SumByLocalKeys[K, V]): CascadingPipe[(K, V)] = {
val cp = rec(sblk.input)
val localFD = new FlowDef
val cpKV: Pipe = cp.toPipe(kvFields, localFD, tup2Setter)
val msr = new MapsideReduce(sblk.semigroup, new Fields("key"), valueField, None)(singleConverter[V], singleSetter[V])
val kvpipe = RichPipe(cpKV).eachTo(kvFields -> kvFields) { _ => msr }
CascadingPipe(kvpipe, kvFields, localFD, tuple2Converter[K, V])
go(cp)
case cp@CrossPipe(_, _) =>
rec(cp.viaHashJoin)
case cv@CrossValue(_, _) =>
rec(cv.viaHashJoin)
case DebugPipe(p) =>
val inner = rec(p)
inner.copy(pipe = new Each(inner.pipe, new Debug))
case EmptyTypedPipe =>
// just use an empty iterable pipe.
rec(IterablePipe(List.empty[T]))
case fk@FilterKeys(_, _) =>
def go[K, V](node: FilterKeys[K, V]): CascadingPipe[(K, V)] = {
val rewrite = Filter[(K, V)](node.input, FilterKeysToFilter(node.fn))
rec(rewrite)
}

go(fk)
case f@Filter(_, _) =>
// hand holding for type inference
def go[T1 <: T](f: Filter[T1]): CascadingPipe[T] = {
val Filter(input, fn) = f
val CascadingPipe(pipe, initF, fd, conv) = rec(input)
// This does not need a setter, which is nice.
val fpipe = RichPipe(pipe).filter[T1](initF)(fn)(TupleConverter.asSuperConverter(conv))
CascadingPipe[T](fpipe, initF, fd, conv)
}

go(f)
case f@FlatMapValues(_, _) =>
def go[K, V, U](node: FlatMapValues[K, V, U]): CascadingPipe[T] =
rec(FlatMapped[(K, V), (K, U)](node.input, FlatMapValuesToFlatMap(node.fn)))

go(f)
case fm@FlatMapped(_, _) =>
// TODO we can optimize a flatmapped input directly and skip some tupleconverters
def go[A, B <: T](fm: FlatMapped[A, B]): CascadingPipe[T] = {
val CascadingPipe(pipe, initF, fd, conv) = rec(fm.input)
val fmpipe = RichPipe(pipe).flatMapTo[A, T](initF -> f0)(fm.fn)(TupleConverter
.asSuperConverter(conv), singleSetter)
CascadingPipe.single[B](fmpipe, fd)
}

go(fm)
case ForceToDisk(input) =>
val cp = rec(input)
cp.copy(pipe = RichPipe(cp.pipe).forceToDisk)
case Fork(input) =>
// fork doesn't mean anything here since we are already planning each TypedPipe to
// something in cascading. Fork is an optimizer level operation
rec(input)
case IterablePipe(iter) =>
val fd = new FlowDef
val pipe = IterableSource[T](iter, f0)(singleSetter, singleConverter).read(fd, mode)
CascadingPipe.single[T](pipe, fd)
case f@MapValues(_, _) =>
def go[K, A, B](fn: MapValues[K, A, B]): CascadingPipe[_ <: (K, B)] =
rec(Mapped[(K, A), (K, B)](fn.input, MapValuesToMap(fn.fn)))

go(f)
case m@Mapped(_, _) =>
def go[A, B <: T](m: Mapped[A, B]): CascadingPipe[T] = {
val Mapped(input, fn) = m
val CascadingPipe(pipe, initF, fd, conv) = rec(input)
val fmpipe = RichPipe(pipe).mapTo[A, T](initF -> f0)(fn)(TupleConverter
.asSuperConverter(conv), singleSetter)
CascadingPipe.single[B](fmpipe, fd)
}

go(m)

case m@MergedTypedPipe(_, _) =>
OptimizationRules.unrollMerge(m) match {
case Nil => rec(EmptyTypedPipe)
case h :: Nil => rec(h)
case nonEmpty =>
// TODO: a better optimization is to not materialize this
// node at all if there is no fan out since groupBy and cogroupby
// can accept multiple inputs

val flowDef = new FlowDef
// if all of the converters are the same, we could skip some work
// here, but need to be able to see that correctly
val pipes = nonEmpty.map { p => rec(p).toPipe(f0, flowDef, singleSetter) }
val merged = new cascading.pipe.Merge(pipes.map(RichPipe.assignName): _*)
CascadingPipe.single[T](merged, flowDef)
}
case SourcePipe(typedSrc) =>
val fd = new FlowDef
val pipe = typedSrc.read(fd, mode)
CascadingPipe[T](pipe, typedSrc.sourceFields, fd, typedSrc.converter[T])
case sblk@SumByLocalKeys(_, _) =>
def go[K, V](sblk: SumByLocalKeys[K, V]): CascadingPipe[(K, V)] = {
val cp = rec(sblk.input)
val localFD = new FlowDef
val cpKV: Pipe = cp.toPipe(kvFields, localFD, tup2Setter)
val msr = new MapsideReduce(sblk
.semigroup, new Fields("key"), valueField, None)(singleConverter[V], singleSetter[V])
val kvpipe = RichPipe(cpKV).eachTo(kvFields -> kvFields) { _ => msr }
CascadingPipe(kvpipe, kvFields, localFD, tuple2Converter[K, V])
}

go(sblk)
case trapped: TrappedPipe[u] =>
val cp = rec(trapped.input)
import trapped._
// TODO: with diamonds in the graph, this might not be correct
// it seems cascading puts the immediate tuple that
// caused the exception into the trap, so if you addTrap( ).map(f).map(g)
// and f changes the tuple structure, then unless we collapse the
// maps into 1 operation, cascading can write two different
// schemas into the trap, making it unreadable.
// this basically means there can only be one operation in between
// a trap and a forceToDisk or a groupBy/cogroupBy (any barrier).
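// A hypothetical illustration of the constraint above:
//   pipe.addTrap(trap).map(f).map(g)
// a failure in f traps f's input (the original schema), while a failure
// in g traps g's input (f's output schema); if f changed the tuple arity,
// the same trap file would mix two incompatible schemas.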
val fd = new FlowDef
val pp: Pipe = cp.toPipe[u](sink.sinkFields, fd, TupleSetter.asSubSetter(sink.setter))
val pipe = RichPipe.assignName(pp)
fd.addTrap(pipe, sink.createTap(Write)(mode))
CascadingPipe[u](pipe, sink.sinkFields, fd, conv)
case WithDescriptionTypedPipe(input, descs) =>

@annotation.tailrec
def loop[A](
t: TypedPipe[A],
acc: List[(String, Boolean)]
): (TypedPipe[A], List[(String, Boolean)]) =
t match {
case WithDescriptionTypedPipe(i, descs) =>
loop(i, descs ::: acc)
case notDescr => (notDescr, acc)
}
go(sblk)
case (trapped: TrappedPipe[u], rec) =>
val cp = rec(trapped.input)
import trapped._
// TODO: with diamonds in the graph, this might not be correct
// it seems cascading puts the immediate tuple that
// caused the exception into the trap, so if you addTrap( ).map(f).map(g)
// and f changes the tuple structure, then unless we collapse the
// maps into 1 operation, cascading can write two different
// schemas into the trap, making it unreadable.
// this basically means there can only be one operation in between
// a trap and a forceToDisk or a groupBy/cogroupBy (any barrier).
val fd = new FlowDef
val pp: Pipe = cp.toPipe[u](sink.sinkFields, fd, TupleSetter.asSubSetter(sink.setter))
val pipe = RichPipe.assignName(pp)
fd.addTrap(pipe, sink.createTap(Write)(mode))
CascadingPipe[u](pipe, sink.sinkFields, fd, conv)
case (WithDescriptionTypedPipe(input, descs), rec) =>

@annotation.tailrec
def loop[A](t: TypedPipe[A], acc: List[(String, Boolean)]): (TypedPipe[A], List[(String, Boolean)]) =
t match {
case WithDescriptionTypedPipe(i, descs) =>
loop(i, descs ::: acc)
case notDescr => (notDescr, acc)
}

val (root, allDesc) = loop(input, descs)
val cp = rec(root)
cp.copy(pipe = applyDescriptions(cp.pipe, allDesc))
val (root, allDesc) = loop(input, descs)
val cp = rec(root)
cp.copy(pipe = applyDescriptions(cp.pipe, allDesc))

case (WithOnComplete(input, fn), rec) =>
val cp = rec(input)
val next = new Each(cp.pipe, Fields.ALL, new CleanupIdentityFunction(fn))
cp.copy(pipe = next)
case WithOnComplete(input, fn) =>
val cp = rec(input)
val next = new Each(cp.pipe, Fields.ALL, new CleanupIdentityFunction(fn))
cp.copy(pipe = next)

case (hcg@HashCoGroup(_, _, _), rec) =>
def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]): CascadingPipe[(K, R)] =
planHashJoin(hcg.left,
hcg.right,
hcg.joiner,
rec)
case hcg@HashCoGroup(_, _, _) =>
def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]): CascadingPipe[(K, R)] =
planHashJoin(hcg.left,
hcg.right,
hcg.joiner,
rec)

go(hcg)
case (ReduceStepPipe(rs), rec) =>
planReduceStep(rs, rec)
go(hcg)
case ReduceStepPipe(rs) =>
planReduceStep(rs, rec)

case (CoGroupedPipe(cg), rec) =>
planCoGroup(cg, rec)
}
})
case CoGroupedPipe(cg) =>
planCoGroup(cg, rec)
}
}
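
As a usage sketch (the call site below is hypothetical, though the `rec(...)` calls throughout this file already rely on dagon's `FunctionK.apply` delegating to `toFunction`):

// Compile once per mode, then plan any TypedPipe[T] into a CascadingPipe[T]:
val toCascading: FunctionK[TypedPipe, CascadingPipe] = compile(mode)
val planned: CascadingPipe[(String, Int)] = toCascading(wordCounts) // wordCounts: TypedPipe[(String, Int)]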

private def applyDescriptions(p: Pipe, descriptions: List[(String, Boolean)]): Pipe = {
val ordered = descriptions.collect { case (d, false) => d }.reverse