Always create different cascading pipes out of forked cross typed pipe #1908
Conversation
I've tried to write an optimization rule to ensure the same, but: …
I thought a rule like:
…
might be all that is needed: we always push `Fork` above a cross. Will that not work? Note: we do apply optimization rules to `toPipe`: …
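For illustration only, here is a toy sketch of such a "push Fork above Cross" rewrite. The case classes are made up for the example, not scalding's real `TypedPipe` internals:

```scala
// Toy sketch only: these case classes are illustrative, not scalding's
// real TypedPipe internals.
object PushForkAboveCross {
  sealed trait Pipe
  case class Source(name: String) extends Pipe
  case class Cross(left: Pipe, right: Pipe) extends Pipe
  case class Fork(input: Pipe) extends Pipe

  // Rewrite a fork of a cross into a cross of forked inputs, so the
  // (potentially huge) cross output itself is never forked/materialized.
  def apply(p: Pipe): Pipe = p match {
    case Fork(Cross(l, r)) => Cross(Fork(apply(l)), Fork(apply(r)))
    case Fork(in)          => Fork(apply(in))
    case Cross(l, r)       => Cross(apply(l), apply(r))
    case s: Source         => s
  }
}
```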
Yes, we do, but this is not going to help in this case. The issue arises because a) we cache …
I might be misunderstanding this, but I don't see how it helps. If you do this, subsequent …
And I have a related question: is there any way right now to force scalding to compute the same typed pipe twice, or will it always be squashed? I.e. is it true that:
…
always ends up with a fork after …
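For context, a minimal sketch of the kind of reuse being asked about, with hypothetical paths and an assumed expensive function: a `TypedPipe` consumed by two sinks is planned once and its output is forked, rather than recomputed.

```scala
import com.twitter.scalding._

// Hypothetical job: `expensive` feeds two sinks. Scalding plans it once and
// forks the output; there is no switch to force it to be computed twice.
class ReuseExample(args: Args) extends Job(args) {
  def costly(i: Int): Int = i * i // stand-in for an expensive transformation

  val expensive: TypedPipe[Int] =
    TypedPipe.from(TypedTsv[Int]("input.tsv")).map(costly)

  expensive.filter(_ > 0).write(TypedTsv[Int]("out1.tsv"))
  expensive.map(_ * 2).write(TypedTsv[Int]("out2.tsv"))
}
```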
Currently no, there is no way to rerun. In this case you probably don’t want to materialize. We could choose to not materialize pure map operations: if they could be helped by moving the fork above the map operations, then map fusion should do the thing we want.
`flatMap` is more dubious since it is often used for filtering as well.
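To make the map-fusion point concrete, a toy illustration in plain Scala (not scalding's rewrite machinery): once the fork sits above the maps, each branch only pays one fused map instead of reading a materialized intermediate.

```scala
object MapFusionDemo extends App {
  // Two adjacent maps compose into one: p.map(f).map(g) == p.map(f andThen g).
  // With the fork pushed above the maps, each branch runs a single fused map
  // over its input instead of reading a materialized intermediate result.
  val f: Int => Int = _ + 1
  val g: Int => String = i => s"value-$i"
  val fused: Int => String = f andThen g

  val data = List(1, 2, 3)
  assert(data.map(f).map(g) == data.map(fused))
}
```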
I see. I think in the future we might want to have …
@johnynek tests are passing now, so I think it's good to go, wdyt? As a side note, I found that some counters (like …
```scala
}
}

private val transform: RecursiveK[TypedPipe, CascadingPipe] = new RecursiveK[TypedPipe, CascadingPipe] {
```
I’m worried this is causing nothing below here to be cached. Note that the use of `rec` here used to cache; now it has been disabled for everything in here.
I think this is a pretty large difference from the current behavior, and basically only the outermost pipe will be cached. I don’t think that’s what we want.
I guess the code isn't 100% clear, but everything apart from the `Cross` implementation (the cross itself, not the `left` and `right` arguments to it), `Fork`, and `WithDescriptionTypedPipe` will be cached.

`transform` here is a pure implementation of the cascading backend transformation without any caching logic applied. On top of that, caching logic is applied (in `cached`), which is:
- don't cache `Fork` and `WithDescriptionTypedPipe`
- don't cache `Cross`, but cache the `left` and `right` branches of it
- cache everything else

A rough sketch of this policy is shown below.
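To make the two layers concrete, here is a toy sketch of that caching policy. All names here (the node classes, `compile`, `plan`) are made up for illustration; this is not the actual CascadingBackend code.

```scala
// Toy plan nodes; illustrative only, not scalding's real TypedPipe classes.
sealed trait Node
case class CrossNode(left: Node, right: Node) extends Node
case class ForkNode(input: Node) extends Node
case class DescribedNode(input: Node, description: String) extends Node
case class OtherNode(id: Int, inputs: List[Node]) extends Node

class TinyPlanner {
  private val cache = scala.collection.mutable.Map.empty[Node, String]

  // Pure transformation without any caching decisions (stands in for `transform`).
  private def compile(n: Node): String = n match {
    case CrossNode(l, r)      => s"cross(${plan(l)}, ${plan(r)})" // branches go through `plan`, so they may be cached
    case ForkNode(in)         => plan(in)
    case DescribedNode(in, d) => s"${plan(in)} /* $d */"
    case OtherNode(id, ins)   => s"node$id(${ins.map(plan).mkString(", ")})"
  }

  // Caching policy layered on top of `compile` (stands in for `cached`):
  //  - never cache ForkNode, DescribedNode, or CrossNode themselves
  //  - cache everything else
  def plan(n: Node): String = n match {
    case _: ForkNode | _: DescribedNode | _: CrossNode => compile(n)
    case other => cache.getOrElseUpdate(other, compile(other))
  }
}
```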
Yeah, I think adding comments will help. There are two layers of caching here, which is confusing:
- the first cache is the explicit `HCache`;
- the second cache is the `RecursiveK`, which caches each call.

So I think you are right that the transform is caching, but I think a nested `Cross` will be cached by `transform`. So `a.cross(b).cross(c)` will, I think, hit the inner cross inside `transform`, which will be cached.
Do you think that's not right?
I might know where your confusion comes from: before, this code was using `Memoize` over `RecursiveK`. Now this `transform` function isn't caching at all, and I basically reimplemented its caching ability explicitly with a slightly different policy. And since the core transforming function is still a `RecursiveK` in essence, I reused it from `Memoize`.

In the case you wrote, the first cross will not be cached; then it will use `notCached(Set(a.cross(b)), c)`, which leads to `cached(a.cross(b))`, which leads to not caching the inner cross as well. I guess I should rename `.cached` to `.withCachePolicy` or something like this.
Okay, yeah. I was getting confused by `new RecursiveK`, thinking it was caching. It isn't. I see it is not controlling how the recursion is done. Instead, you call it with `transform(tp, this)`, which recurses using the entire outer strategy.
Yeah, I now think I understand and think it is correct, but it is a bit subtle. Can we add a comment that describes and summarizes the points of confusion and clears them up? We are going to be confused later:
- Why are we doing this? We are avoiding caching certain node types because they make plans that are usually worse.
- How are we achieving this? We are wiring up this kind of co-recursive check so that the recursion has a branch and is sure not to cache certain types of nodes...

Something like this.
Removed the `RecursiveK` usage. I think it's more straightforward and simpler this way. Also added a comment explaining the motivation for this logic.
@johnynek do you mind taking a look at the updated version?
Sorry for the latency. The new version is clearer to me now, thanks.
It would really be ideal to have a better algorithm for estimating whether or not we should materialize a particular pipe.
@johnynek I have a feeling that at least all …
I don’t think it is clear. HashJoins can often be used to filter, and I think we generally would want to materialize filters that fan out. I think we certainly want to materialize joins that fan out, no? It would be crazy to recompute joins, wouldn’t it?
Maybe the rule is as simple as: never materialize map-only operations.
As described in #1905, scalding might currently create forks in the cascading graph after `cross` nodes. This is almost never wanted and broke one of the production jobs at Twitter: the job started to produce petabytes of intermediate data.

In this PR:
- added tests: one using the `.toPipe` api and another one with two different `map`s after `cross` (sketched below);
- reworked caching in `CascadingBackend` in a way that it never caches a cascading pipe created out of a `cross` operation.
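For illustration, roughly the shape of the second scenario described above: a crossed pipe consumed by two different maps. The sources, sinks, and map functions are hypothetical; this is not the actual test added in the PR.

```scala
import com.twitter.scalding._

// Hypothetical job: `crossed` is consumed by two different maps. Before this PR,
// the planner could fork the (potentially huge) output of the cross itself; with
// the reworked caching, each consumer gets its own cascading pipe for the cross.
class TwoMapsAfterCross(args: Args) extends Job(args) {
  val left: TypedPipe[Int] = TypedPipe.from(TypedTsv[Int]("left.tsv"))
  val right: TypedPipe[String] = TypedPipe.from(TypedTsv[String]("right.tsv"))

  val crossed: TypedPipe[(Int, String)] = left.cross(right)

  crossed.map { case (i, s) => s"$s-$i" }.write(TypedTsv[String]("out1.tsv"))
  crossed.map { case (i, s) => i + s.length }.write(TypedTsv[Int]("out2.tsv"))
}
```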