[HUDI-8780][RFC-83] Incremental Table Service #12601
base: master
Conversation
@zhangyue19921010 Hi, judging from the rfc content, the goal this time is to use incremental
Hi @TheR1sing3un , Thanks for your attention.
Sure, we can build a unified incremental policy. But as we know, the clean action is not implemented in the strategy-object style, even though we have many different cleaning strategies (clean by commits or clean by versions). So we may need to restructure the clean planning phase and abstract it into different strategy objects. We can do that in the next PR.
Unfortunately, this PR isn't meant to solve that problem. This PR addresses incremental processing, that is, how to process only incremental partitions the next time after the last table service completes. #11647 focuses on how to trigger the first action in an elegant way.
.defaultValue(true)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Whether to enable incremental table service.");
Can we elaborate on which kinds of table services are affected by this option?
enable -> enabled.
changed.
// get incremental partitions.
LOG.info("Start to fetch incremental partitions for " + type);
Set<String> incrementalPartitions = getIncrementalPartitions(type);
if (!incrementalPartitions.isEmpty()) {
Why fall back to full partitions if the incremental partition list is empty?
changed.
For now, there are two situations that fall back to fetching all partitions:
- fetching incremental partitions throws an exception;
- the last completed table service instant is archived.

Additionally, when the incremental partition list is empty, we skip the current schedule.
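The fallback rules above can be sketched as follows. This is a minimal illustration with hypothetical names (`resolvePartitions`, `incrementalFetchFailed`), not the actual Hudi API:

```java
import java.util.Optional;
import java.util.Set;

public class IncrementalPartitionFallback {

  // Sketch of the scheduling rules described above; all names are illustrative.
  // Returns the partitions to schedule: the full list when we must fall back,
  // the (possibly empty) incremental list otherwise. An empty result means the
  // current schedule round is skipped.
  public static Set<String> resolvePartitions(boolean incrementalFetchFailed,
                                              Optional<String> lastCompletedServiceInstant, // empty => archived
                                              Set<String> incrementalPartitions,
                                              Set<String> allPartitions) {
    if (incrementalFetchFailed || !lastCompletedServiceInstant.isPresent()) {
      return allPartitions; // fall back to a full partition scan
    }
    return incrementalPartitions; // empty => skip this schedule
  }
}
```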
Pair<Option<HoodieInstant>, List<String>> missingPair = fetchMissingPartitions(type);
if (!missingPair.getLeft().isPresent()) {
  // Last complete table service commit maybe archived.
  return Collections.emptySet();
How do we handle the case where the writer just commits empty commits continuously? I think we should still trigger incremental partition fetching. If the incremental partition list is empty we can just skip the scheduling?
Makes sense. Changed.
For now, there are two situations that fall back to fetching all partitions:
- fetching incremental partitions throws an exception;
- the last completed table service instant is archived.

Additionally, when the incremental partition list is empty, we skip the current schedule.
import java.util.List;

/**
 * Marking strategy interface.
Marking interface for table service strategies that utilize incremental partitions.
changed.
 * Marking strategy interface.
 *
 * Any Strategy implement this `IncrementalPartitionAwareStrategy` could have the ability to perform incremental partitions processing.
 * At this time, Incremental partitions should be passed to the current strategy.
<p> Any strategy class that implements this `IncrementalPartitionAwareStrategy` could have the ability to perform incremental partitions processing.
Currently, Incremental partitions will be passed to the strategy instance as a best-effort. In the following cases, the partitions would fallback to full partition list:
<ul>
<li></li>
...
</ul>
changed.
/**
* Marking interface for table service strategy that utilize incremental partitions.
*
* <p> Any strategy class that implements this `IncrementalPartitionAwareStrategy` could have the ability to perform incremental partitions processing.
 * Currently, incremental partitions will be passed to the strategy instance on a best-effort basis. In the following cases, the partitions fall back to the full partition list:
*
* <ul>
* <li> Executing Table Service for the first time. </li>
* <li> The last completed table service instant is archived. </li>
* <li> Any exception thrown during retrieval of incremental partitions. </li>
* </ul>
*/
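A minimal sketch of how such a marker interface could look. The signatures here are illustrative stand-ins (the real interface in Hudi also receives the write config):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative marker interface: strategies implementing it accept
// incremental partitions and filter them down to what they will process.
interface IncrementalPartitionAwareStrategy {
  List<String> filter(List<String> incrementalPartitions);
}

// A trivial implementation that keeps only non-empty partition paths.
class PassThroughStrategy implements IncrementalPartitionAwareStrategy {
  @Override
  public List<String> filter(List<String> incrementalPartitions) {
    List<String> out = new ArrayList<>();
    for (String p : incrementalPartitions) {
      if (p != null && !p.isEmpty()) {
        out.add(p);
      }
    }
    return out;
  }
}
```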
/**
 * Filter the given incremental partitions.
 * @param writeConfig
 * @param incrementalPartitions
partitions
changed.
@@ -42,13 +42,13 @@
  */
 public class ClusteringPlanPartitionFilter {

-  public static List<String> filter(List<String> partitions, HoodieWriteConfig config) {
+  public static List<String> filter(List<String> partitions, HoodieWriteConfig config, ArrayList<String> missingPartitions) {
Maybe we return a Pair for both missing partitions and filtered partitions.
Makes sense. Changed.
@@ -128,7 +129,7 @@ protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String
         .build();
       }).collect(Collectors.toList()));
     }
-    return ret.stream();
+    return Pair.of(ret.stream(), true);
In which case is the flag false?
This boolean value indicates whether all candidate file slices under the current partition have been processed. The caller uses this to decide whether the current partition should be added to missingPartitions.

For BaseConsistentHashingBucketClusteringPlanStrategy, it always returns true.

For other PartitionAwareClusteringPlanStrategy implementations, like SparkSizeBasedClusteringPlanStrategy, it may return false. For example, if 100 file slices are passed in but, due to the limitation of writeConfig.getClusteringMaxNumGroups(), only 10 of them are processed, then false should be returned.
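The flag semantics can be sketched like this — illustrative code only, with a plain list of slices and a max-groups cap standing in for writeConfig.getClusteringMaxNumGroups():

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ClusteringGroupSketch {

  // Returns (selected slices, allProcessed). allProcessed is false when the
  // max-groups cap cut off some candidate slices, so the partition should be
  // recorded in missingPartitions and revisited on the next schedule.
  public static Map.Entry<List<String>, Boolean> buildGroups(List<String> fileSlices, int maxGroups) {
    int take = Math.min(maxGroups, fileSlices.size());
    List<String> selected = new ArrayList<>(fileSlices.subList(0, take));
    boolean allProcessed = take == fileSlices.size();
    return new AbstractMap.SimpleEntry<>(selected, allProcessed);
  }
}
```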
@@ -55,7 +59,7 @@ public PartitionAwareClusteringPlanStrategy(HoodieTable table, HoodieEngineConte
   /**
    * Create Clustering group based on files eligible for clustering in the partition.
    */
-  protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
+  protected Pair<Stream<HoodieClusteringGroup>, Boolean> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
Can we add some doc on the boolean flag.
added.
/**
 * Create Clustering group based on files eligible for clustering in the partition.
 * Returns a stream of HoodieClusteringGroup plus a boolean indicating whether all given fileSlices in the current partition have been processed.
 * For example, if some file slices will not be processed due to writeConfig.getClusteringMaxNumGroups(), false is returned.
 */
@@ -68,6 +72,7 @@ protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String
         - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));

     long totalSizeSoFar = 0;
+    boolean isAllSlicesIncluded = true;
isAllSlicesIncluded -> partialScheduled?
changed.
-  protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations);
-  protected abstract List<String> getPartitions();
+  protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations, Pair<List<String>,List<String>> partitionPair);
Do we need the missing partitions to generate the plan?
Because we need to record these missing partitions in the plan (via .setMissingSchedulePartitions(res)) during the getCompactionPlan function.
@@ -46,7 +47,7 @@ public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeCon
       targetIORemaining -= opIo;
       finalOperations.add(op);
       if (targetIORemaining <= 0) {
-        return finalOperations;
+        missingPartitions.add(op.getPartitionPath());
Usually we do not modify the passed-in collections.
changed.
Can we add some doc around the return values of #orderAndFilter
added
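The IO-bounded selection under discussion can be sketched as follows. This is an illustrative version (names like `IoBoundedSelection` and the map-based input are assumptions): once the IO budget is exhausted, the remaining operations' partitions are returned as "missing" rather than written into a caller-owned collection.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class IoBoundedSelection {

  // Returns (selected partitions, missing partitions). Operations that do not
  // fit within the IO budget are reported as missing so they can be retried
  // on the next schedule, instead of being silently dropped.
  public static Map.Entry<List<String>, List<String>> orderAndFilter(
      Map<String, Long> partitionToIoCost, long targetIoRemaining) {
    List<String> selected = new ArrayList<>();
    List<String> missing = new ArrayList<>();
    for (Map.Entry<String, Long> op : partitionToIoCost.entrySet()) {
      if (targetIoRemaining <= 0) {
        missing.add(op.getKey()); // budget spent: defer this partition
        continue;
      }
      targetIoRemaining -= op.getValue();
      selected.add(op.getKey());
    }
    return new AbstractMap.SimpleEntry<>(selected, missing);
  }
}
```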
// Strategy implementation can overload this method to set specific compactor-id
Set<String> missingPartitions = new HashSet<>(partitionPair.getRight());
List<HoodieCompactionOperation> operationsToProcess = orderAndFilter(writeConfig, operations, pendingCompactionPlans, missingPartitions);
List<String> res = writeConfig.isIncrementalTableServiceEnable() ? new ArrayList<>(missingPartitions) : new ArrayList<>();
Maybe null as the default value to avoid deserialization.
Changed.
Hi @danny0405, thanks for your review. All comments are addressed.
@@ -808,6 +808,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Avro schema of the partial updates. This is automatically set by the "
           + "Hudi write client and user is not expected to manually change the value.");

+  public static final ConfigProperty<Boolean> INCREMENTAL_TABLE_SERVICE_ENABLE = ConfigProperty
+      .key("hoodie.incremental.tableservice.enabled")
Does this option exist for some corner cases, we should always try incremental scheduling I think.
Incremental processing is used everywhere, including UTs. The switch here (default true) exists just in case: in some scenarios users need full processing, and they can manually turn off incremental processing (similar to the switch between MDT and clean).
if (completionTime.compareTo(leftBoundary) >= 0 && completionTime.compareTo(rightBoundary) < 0) {
  HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(instant, activeTimeline);
  // ignore all the clustering operation for both mor and cow table
  if (!metadata.getOperationType().equals(WriteOperationType.CLUSTER)) {
Why ignore cluster? Maybe we should ignore based on the passed-in table service type.
No need to ignore, actually. Maybe filterCommitByTableType is good enough.
public Pair<Option<HoodieInstant>, List<String>> fetchMissingPartitions(TableServiceType tableServiceType) {
  if (!config.isIncrementalTableServiceEnable()) {
    return Pair.of(Option.empty(), new ArrayList<>());
Collections.emptyList?
changed
@@ -68,6 +75,7 @@ protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String
         - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));

     long totalSizeSoFar = 0;
+    boolean partialScheduled = true;
The default value should be false I guess.
Exactly, this was missed here. Changed.
List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
Pair<Stream<HoodieClusteringGroup>, Boolean> groupPair = buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible);
List<HoodieClusteringGroup> clusteringGroupsPartition = groupPair.getLeft().collect(Collectors.toList());
Boolean allProcessed = groupPair.getRight();
partialScheduled?
changed. Sorry for missing this.
@@ -808,6 +808,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Avro schema of the partial updates. This is automatically set by the "
           + "Hudi write client and user is not expected to manually change the value.");

+  public static final ConfigProperty<Boolean> INCREMENTAL_TABLE_SERVICE_ENABLE = ConfigProperty
+      .key("hoodie.incremental.tableservice.enabled")
tableservice -> table.service to keep consistent with other table service conf?
changed to hoodie.table.services.incremental.enabled
switch (table.getMetaClient().getTableType()) {
  case MERGE_ON_READ: {
    // for mor only take cares of delta commit and replace commit
    Set<String> operations = CollectionUtils.createSet(DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
Better not to create the constant set per instant.
changed. Thanks
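One way to address the comment above is hoisting the set into a static constant so it is built once rather than per instant. A sketch (the action strings mirror Hudi's timeline action names but are hard-coded here for illustration):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class MorRelevantActions {

  // Built once and shared across all instants, instead of allocating a new
  // set on every timeline instant that is inspected.
  private static final Set<String> MOR_ACTIONS = Collections.unmodifiableSet(
      new HashSet<>(Arrays.asList("deltacommit", "replacecommit")));

  public static boolean isRelevant(String action) {
    return MOR_ACTIONS.contains(action);
  }
}
```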
List<String> affectedPartitions2 = compactionPlan2.getOperations().stream()
    .map(HoodieCompactionOperation::getPartitionPath).collect(Collectors.toList());
// compaction including 20250115 (fetched from recorded missing partitions)
assertTrue(affectedPartitions2.contains(partitions[0]));
do we test the fallback for archived commits?
}

@Test
public void testGetPartitionsFallbackToFullScan() throws Exception {
@danny0405 This UT tests the fallback to a full scan when the table service instant is archived.
}

@Test
public void testContinuousEmptyCommits() throws Exception {
testContinuousEmptyCommits
@@ -116,7 +116,7 @@ public Pair<Option<HoodieInstant>, Set<String>> getIncrementalPartitions(TableSe
     String rightBoundary = instantTime;
     // compute [leftBoundary, rightBoundary) as time window
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
-    Set<String> partitionsInCommitMeta = table.getActiveTimeline().filterCompletedInstants().getCommitsTimeline().getInstantsAsStream()
+    Set<String> partitionsInCommitMeta = table.getActiveTimeline().findInstantsAfter(leftBoundary).filterCompletedInstants().getCommitsTimeline().getInstantsAsStream()
Why filter the instant time by leftBoundary?
@danny0405 Nice catch, Danny. At first I just wanted to filter out the left-boundary instant itself, but I obviously called the API findInstantsAfter by mistake. Here we should fetch and filter the entire active timeline's completed instants.
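The intended [leftBoundary, rightBoundary) completion-time window can be sketched as follows. This is illustrative only: completion times are modeled as plain sortable strings, and the whole set of completed instants is scanned with no pre-filtering by instant (start) time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CompletionTimeWindow {

  // Keep instants whose completion time lies in [left, right). Filtering is
  // done on completion time, not on the instant's start time, which is why
  // pre-trimming the timeline by start time would be incorrect.
  public static List<String> instantsInWindow(Map<String, String> instantToCompletionTime,
                                              String left, String right) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, String> e : instantToCompletionTime.entrySet()) {
      String completionTime = e.getValue();
      if (completionTime.compareTo(left) >= 0 && completionTime.compareTo(right) < 0) {
        out.add(e.getKey());
      }
    }
    return out;
  }
}
```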
Change Logs
In Hudi, when scheduling Compaction and Clustering, the default behavior is to scan all partitions under the current table. When there are many historical partitions, such as 640,000 in our production environment, this scanning and planning operation becomes very inefficient. For Flink, it often leads to checkpoint timeouts, resulting in data delays.
As for cleaning, we already have the ability to do cleaning for incremental partitions.
This RFC will draw on the design of Incremental Clean to generalize the capability of processing incremental partitions to all table services, such as Clustering and Compaction.
Impact
compaction and clustering
Risk level (write none, low medium or high below)
medium
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".
Contributor's checklist