[flink] FlinkRunner initializes the same split twice (#31313) #33606
Assigning reviewers. If you would like to opt out of this review, comment
R: @johnjcasey added as fallback since no labels match configuration
Available commands:
The PR bot will only process comments in the main thread (not review comments).
Branch updated from 142d3ba to e86f6fc.
R: @dmvk
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment
...java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSource.java
The fix makes sense, good catch 👍 My main question is whether not re-listing partitions breaks the contract or not 🤔
- this.splitsInitialized = false;
+ this.splitsInitialized = splitsInitialized;

LOG.info(
debug?
This happens only when the job starts; I think info is fine here.
.../beam/runners/flink/translation/wrappers/streaming/io/source/FlinkSourceSplitEnumerator.java
List<FlinkSourceSplit<T>> splitsForSubtask =
    pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
splitsForSubtask.addAll(splits);
// reshuffle splits, needed after rescaling
👍 Would it make sense to provide additional reasoning?
This change could be reverted. I kept it because it might make a difference in one improbable situation: a reader returns splits to the enumerator, and a rescale happens just after that. In all other cases, this is equivalent to the previous implementation.
Can you please revert it then?
I could, but why? It helps in rare circumstances and does no harm otherwise, so why revert?
That's where I was headed. If it fixes a corner case, it CAN'T be reverted, and we should document the reasoning about that corner case to give context to whoever touches the code next.
I see. The code was originally meant to solve the rescaling issue, but the whole process works a little differently, and the corner case it might address is highly speculative, so I reverted the code to the previous version.
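A minimal sketch of the pending-split bookkeeping that remains after the revert, assuming that splits returned by a reader are simply queued again for the same subtask, as in the `computeIfAbsent`/`addAll` lines quoted in the review. `PendingSplits`, `addSplitsBack` and `takeFor` are hypothetical names for illustration, not the Beam or Flink API, and `SplitT` stands in for `FlinkSourceSplit<T>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the enumerator's pending-split map.
// Not the real FlinkSourceSplitEnumerator; SplitT stands in for FlinkSourceSplit<T>.
class PendingSplits<SplitT> {
  private final Map<Integer, List<SplitT>> pendingSplits = new HashMap<>();

  // Splits handed back by a reader are queued again for the same subtask id
  // (no reshuffling across subtasks).
  void addSplitsBack(List<SplitT> splits, int subtaskId) {
    List<SplitT> splitsForSubtask =
        pendingSplits.computeIfAbsent(subtaskId, ignored -> new ArrayList<>());
    splitsForSubtask.addAll(splits);
  }

  // Returns and forgets everything pending for a subtask, e.g. when its reader registers.
  List<SplitT> takeFor(int subtaskId) {
    List<SplitT> splits = pendingSplits.remove(subtaskId);
    return splits == null ? new ArrayList<>() : splits;
  }
}
```

Keeping returned splits with the subtask that owned them is the simplest choice; the reverted reshuffle would only have mattered if a rescale happened right after splits were returned.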
public void start() {
  context.callAsync(
      () -> {
        try {
          LOG.info("Starting source {}", beamSource);
          List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
          Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
          int i = 0;
          for (Source<T> beamSplitSource : beamSplitSourceList) {
            int targetSubtask = i % context.currentParallelism();
            List<FlinkSourceSplit<T>> splitsForTask =
                flinkSourceSplitsList.computeIfAbsent(
                    targetSubtask, ignored -> new ArrayList<>());
            splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
            i++;
if (!splitsInitialized) {
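The loop in `start()` assigns the i-th Beam sub-source to subtask `i % parallelism`. A simplified, self-contained sketch of just that round-robin step, assuming plain strings in place of the Beam sub-sources and a fixed parallelism in place of `context.currentParallelism()`; `RoundRobinAssigner` is a hypothetical name, not code from the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the round-robin assignment in start():
// split i goes to subtask i % parallelism. Strings stand in for Beam sub-sources.
class RoundRobinAssigner {
  static Map<Integer, List<String>> assign(List<String> splitSources, int parallelism) {
    if (parallelism <= 0) {
      throw new IllegalArgumentException("parallelism must be positive");
    }
    Map<Integer, List<String>> splitsBySubtask = new HashMap<>();
    int i = 0;
    for (String source : splitSources) {
      int targetSubtask = i % parallelism;
      // Create the per-subtask list lazily, as the enumerator does with computeIfAbsent.
      splitsBySubtask.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>()).add(source);
      i++;
    }
    return splitsBySubtask;
  }
}
```

Because lists are created lazily, a subtask that receives no split (more subtasks than splits) has no entry in the map at all.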
WDYT about minimizing the changeset and reducing nesting with something like this?
@Override
public void start() {
  if (!splitsInitialized) {
    initializeSplits();
  }
}
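A minimal model of the guard suggested above, showing why the flag stops the source from being split a second time. Assumptions: the constructor argument plays the role of the `splitsInitialized` value that the enumerator receives when it is restored (as in `this.splitsInitialized = splitsInitialized;`), a `Supplier` stands in for `splitBeamSource()`, and splitting runs synchronously instead of through `context.callAsync`. `OnceInitializingEnumerator` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of "initialize splits only once"; not the real Flink enumerator.
class OnceInitializingEnumerator {
  private final Supplier<List<String>> splitter; // stands in for splitBeamSource()
  private final List<String> assignedSplits = new ArrayList<>();
  private boolean splitsInitialized;
  private int splitInvocations = 0;

  // splitsInitialized == true models an enumerator restored from a checkpoint
  // whose splits were already created before.
  OnceInitializingEnumerator(Supplier<List<String>> splitter, boolean splitsInitialized) {
    this.splitter = splitter;
    this.splitsInitialized = splitsInitialized;
  }

  void start() {
    if (!splitsInitialized) {
      initializeSplits();
    }
  }

  private void initializeSplits() {
    splitInvocations++;
    assignedSplits.addAll(splitter.get());
    splitsInitialized = true; // this value would be part of the enumerator's snapshot
  }

  boolean splitsInitialized() { return splitsInitialized; }
  int splitInvocations() { return splitInvocations; }
  List<String> assignedSplits() { return assignedSplits; }
}
```

In this model a restored enumerator creates no new splits; in the real runner the already-created splits would come back from checkpointed state rather than from a second call to the splitter.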
Hmm, is this correct? Are splits guaranteed to be stable over time? Is it possible that you would, for example, want to re-list Kafka partitions? How are new partitions handled in Beam?
This code path serves the deprecated Read transforms. Modern Beam IO uses Splittable DoFn (SDF), which can handle dynamic splitting. In Read, splitting is essentially static.
@@ -117,7 +103,8 @@ public Boundedness getBoundedness() {

  @Override
  public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
-     createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
+     createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) {
nit: unrelated change -> separate commit
Branch updated from 925b074 to 22db506.
No, under the legacy Source Read API, splitting the source is a one-shot operation.
Branch updated from 22db506 to d2e1af5.
Branch updated from d2e1af5 to 960fe79.
LGTM 👍
Codecov Report
All modified and coverable lines are covered by tests ✅

@@             Coverage Diff             @@
##             master   #33606    +/-  ##
=========================================
  Coverage     57.38%   57.38%
  Complexity     1474     1474
=========================================
  Files           973      973
  Lines        154624   154624
  Branches       1076     1076
=========================================
  Hits          88733    88733
  Misses        63689    63689
  Partials       2202     2202
Fixes #31313