
[HUDI-8780][RFC-83] Incremental Table Service #12601

Open
wants to merge 20 commits into base: master

Conversation

zhangyue19921010
Contributor

Change Logs

In Hudi, when scheduling Compaction and Clustering, the default behavior is to scan all partitions under the current table. When there are many historical partitions, such as 640,000 in our production environment, this scanning and planning operation becomes very inefficient. For Flink, it often leads to checkpoint timeouts, resulting in data delays.
As for cleaning, we already have the ability to clean incremental partitions.

This RFC will draw on the design of Incremental Clean to generalize the capability of processing incremental partitions to all table services, such as Clustering and Compaction.
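
For illustration, a minimal sketch of how a writer could toggle this behavior, assuming the config key the review below converges on ("hoodie.table.services.incremental.enabled", default true); the class name is hypothetical:

import java.util.Properties;

public class IncrementalTableServiceToggle {
    public static void main(String[] args) {
        Properties writerProps = new Properties();
        // Default is true; setting false forces full-partition scanning
        // when scheduling compaction and clustering.
        writerProps.put("hoodie.table.services.incremental.enabled", "false");
        System.out.println(writerProps);
    }
}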

Impact

compaction and clustering

Risk level (write none, low, medium or high below)

medium

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Jan 8, 2025
@TheR1sing3un
Member

@zhangyue19921010 Hi, judging from the RFC content, the goal this time is incremental compaction and clustering. Should we consider incorporating the incremental processing logic for clean as well? By the way, could we solve this problem as well: #11647? It seems we could address it during implementation.

@zhangyue19921010
Contributor Author

Hi @TheR1sing3un, thanks for your attention.

@zhangyue19921010 Hi, judging from the RFC content, the goal this time is incremental compaction and clustering. Should we consider incorporating the incremental processing logic for clean as well?

Sure, we can build a unified incremental policy. But as we know, the clean action is not implemented in a strategy-pattern style, even though we have many different cleaning strategies (clean by commits or clean by versions). So we would need to refactor the clean plan phase and abstract it into separate strategy objects. We can do that in the next PR.

By the way, could we solve this problem as well: #11647? It seems we could address it during implementation.

Unfortunately, this PR doesn't solve that problem. This PR addresses incremental processing, that is, how to process only the incremental partitions the next time a table service runs after the last one completes. #11647 focuses on how to trigger the first action in an elegant way.

.defaultValue(true)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("Whether to enable incremental table service.");
Contributor

Can we elaborate on the scope of the table services that are affected by this option?

enable -> enabled.

Contributor Author

changed.

// get incremental partitions.
LOG.info("Start to fetch incremental partitions for " + type);
Set<String> incrementalPartitions = getIncrementalPartitions(type);
if (!incrementalPartitions.isEmpty()) {
Contributor

Why fall back to full partitions if the incremental partition list is empty?

Contributor Author
zhangyue19921010 commented Jan 15, 2025

changed.
For now, there are two situations where we fall back to fetching all partitions:

  1. Fetching incremental partitions throws an exception.
  2. The last completed table service instant is archived.

Additionally, when the incremental partition list is empty, we skip the current schedule.
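
A self-contained sketch of this decision flow, using plain java.util.Optional in place of Hudi's Option; the method names are illustrative stand-ins, not the exact ones in this PR:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class IncrementalScheduleSketch {
    // Stand-ins for the real timeline/partition lookups.
    static Optional<String> lastCompletedServiceInstant() { return Optional.of("20250115103000"); }
    static Set<String> incrementalPartitionsSince(String instant) { return new HashSet<>(); }
    static Set<String> allPartitions() { return new HashSet<>(Arrays.asList("2025/01/14", "2025/01/15")); }

    // An empty Optional means: skip this scheduling round entirely.
    static Optional<Set<String>> partitionsToSchedule() {
        try {
            Optional<String> last = lastCompletedServiceInstant();
            if (!last.isPresent()) {
                // Fallback 2: the last completed table service instant is archived.
                return Optional.of(allPartitions());
            }
            Set<String> incremental = incrementalPartitionsSince(last.get());
            if (incremental.isEmpty()) {
                return Optional.empty(); // nothing new since the last service: skip
            }
            return Optional.of(incremental);
        } catch (Exception e) {
            // Fallback 1: fetching incremental partitions threw an exception.
            return Optional.of(allPartitions());
        }
    }
}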

Pair<Option<HoodieInstant>, List<String>> missingPair = fetchMissingPartitions(type);
if (!missingPair.getLeft().isPresent()) {
// Last complete table service commit maybe archived.
return Collections.emptySet();
Contributor

How do we handle the case where the writer just commits empty commits continuously? I think we should still trigger incremental partition fetching. If the incremental partition list is empty we can just skip the scheduling?

Contributor Author

Makes sense. Changed.
For now, there are two situations where we fall back to fetching all partitions:

  1. Fetching incremental partitions throws an exception.
  2. The last completed table service instant is archived.

Additionally, when the incremental partition list is empty, we skip the current schedule.

import java.util.List;

/**
* Marking strategy interface.
Contributor

Marking interface for table service strategy that utilizes incremental partitions.

Contributor Author

changed.

* Marking strategy interface.
*
* Any Strategy implement this `IncrementalPartitionAwareStrategy` could have the ability to perform incremental partitions processing.
* At this time, Incremental partitions should be passed to the current strategy.
Contributor

<p> Any strategy class that implements this `IncrementalPartitionAwareStrategy` could have the ability to perform incremental partition processing.
Currently, incremental partitions will be passed to the strategy instance on a best-effort basis. In the following cases, the partitions would fall back to the full partition list:

<ul>
  <li></li>
  ...
</ul>

Contributor Author
zhangyue19921010 commented Jan 15, 2025

changed.

/**
 * Marking interface for table service strategy that utilizes incremental partitions.
 *
 * <p> Any strategy class that implements this `IncrementalPartitionAwareStrategy` could have the ability to perform incremental partition processing.
 * Currently, incremental partitions will be passed to the strategy instance on a best-effort basis. In the following cases, the partitions would fall back to the full partition list:
 *
 * <ul>
 *   <li> Executing Table Service for the first time. </li>
 *   <li> The last completed table service instant is archived. </li>
 *   <li> Any exception thrown during retrieval of incremental partitions. </li>
 * </ul>
 */

/**
* Filter the given incremental partitions.
* @param writeConfig
* @param incrementalPartitions
Contributor

partitions

Contributor Author

changed.

@@ -42,13 +42,13 @@
*/
public class ClusteringPlanPartitionFilter {

public static List<String> filter(List<String> partitions, HoodieWriteConfig config) {
public static List<String> filter(List<String> partitions, HoodieWriteConfig config, ArrayList<String> missingPartitions) {
Contributor

Maybe we return a Pair for both missing partitions and filtered partitions.

Contributor Author

Makes sense. Changed.

@@ -128,7 +129,7 @@ protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String
.build();
}).collect(Collectors.toList()));
}
return ret.stream();
return Pair.of(ret.stream(), true);
Contributor

In which case is the flag false?

Contributor Author

This boolean value indicates whether all candidate file slices under the current partition have been processed. The caller uses it to decide whether the current partition should be added to missingPartitions.

For BaseConsistentHashingBucketClusteringPlanStrategy it always returns true.

For other PartitionAwareClusteringPlanStrategy implementations, such as SparkSizeBasedClusteringPlanStrategy, it may return false. For example, if 100 fileSlices are passed in but only 10 of them are processed due to the limitation of writeConfig.getClusteringMaxNumGroups(), false should be returned.
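
A self-contained stand-in sketch of this bookkeeping, with plain strings in place of FileSlice; the flag here is named partialScheduled and is true when eligible slices are left behind (the exact naming and polarity are settled further down in this review):

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PartialScheduleSketch {
    // Groups at most maxNumGroups slices and reports whether any eligible
    // slices were left behind, so the caller can record the partition in
    // missingPartitions for the next incremental round.
    static Map.Entry<List<String>, Boolean> groupSlices(List<String> fileSlices, int maxNumGroups) {
        List<String> scheduled = new ArrayList<>();
        boolean partialScheduled = false;
        for (String slice : fileSlices) {
            if (scheduled.size() >= maxNumGroups) {
                partialScheduled = true; // hit the group cap: remaining slices wait
                break;
            }
            scheduled.add(slice);
        }
        return new AbstractMap.SimpleEntry<>(scheduled, partialScheduled);
    }
}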

@@ -55,7 +59,7 @@ public PartitionAwareClusteringPlanStrategy(HoodieTable table, HoodieEngineConte
/**
* Create Clustering group based on files eligible for clustering in the partition.
*/
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
protected Pair<Stream<HoodieClusteringGroup>, Boolean> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
Contributor

Can we add some doc on the boolean flag.

Contributor Author

added.

  /**
   * Create Clustering group based on files eligible for clustering in the partition.
   * Returns a stream of HoodieClusteringGroup plus a boolean partialScheduled flag indicating whether all given fileSlices in the current partition have been processed.
   * For example, if some file slices will not be processed due to writeConfig.getClusteringMaxNumGroups(), then false is returned.
   */

@@ -68,6 +72,7 @@ protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String
- (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));

long totalSizeSoFar = 0;
boolean isAllSlicesIncluded = true;
Contributor

isAllSlicesIncluded -> partialScheduled?

Contributor Author

changed.

protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations);
protected abstract List<String> getPartitions();

protected abstract HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient metaClient, List<HoodieCompactionOperation> operations, Pair<List<String>,List<String>> partitionPair);
Contributor

Do we need the missing partitions to generate the plan?

Contributor Author

Because we need to record these missing partitions in the plan (via .setMissingSchedulePartitions(res)) inside the getCompactionPlan function.
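
A sketch with stub types of why the plan builder needs the partition pair; only the setMissingSchedulePartitions idea comes from this PR, the surrounding types and names are illustrative:

import java.util.List;

public class CompactionPlanSketch {
    // Stand-in for the Avro-generated HoodieCompactionPlan.
    static class Plan {
        List<String> operations;
        List<String> missingSchedulePartitions; // the field this PR records
    }

    static Plan getCompactionPlan(List<String> operations, List<String> missingPartitions) {
        Plan plan = new Plan();
        plan.operations = operations;
        // Persist the unprocessed partitions so the next schedule can pick
        // them up without a full partition scan.
        plan.missingSchedulePartitions = missingPartitions;
        return plan;
    }
}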

@@ -46,7 +47,7 @@ public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeCon
targetIORemaining -= opIo;
finalOperations.add(op);
if (targetIORemaining <= 0) {
return finalOperations;
missingPartitions.add(op.getPartitionPath());
Contributor

Usually we do not modify the passed-in collections.

Contributor Author

changed.

Contributor

Can we add some doc around the return values of #orderAndFilter

Contributor Author

added

// Strategy implementation can overload this method to set specific compactor-id
Set<String> missingPartitions = new HashSet<>(partitionPair.getRight());
List<HoodieCompactionOperation> operationsToProcess = orderAndFilter(writeConfig, operations, pendingCompactionPlans, missingPartitions);
List<String> res = writeConfig.isIncrementalTableServiceEnable() ? new ArrayList<>(missingPartitions) : new ArrayList<>();
Contributor

Maybe null as the default value to avoid deserialization.

Contributor Author

changed.

@github-actions github-actions bot added size:XL PR with lines of changes > 1000 and removed size:L PR with lines of changes in (300, 1000] labels Jan 13, 2025
@zhangyue19921010 zhangyue19921010 changed the title [HUDI-8780][RFC-83][WIP] Incremental Table Service [HUDI-8780][RFC-83] Incremental Table Service Jan 15, 2025
@zhangyue19921010
Contributor Author

zhangyue19921010 commented Jan 15, 2025

Hi @danny0405, thanks for your review. All comments are addressed.
Also added several related UTs. PTAL.
Removed WIP.

@@ -808,6 +808,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Avro schema of the partial updates. This is automatically set by the "
+ "Hudi write client and user is not expected to manually change the value.");

public static final ConfigProperty<Boolean> INCREMENTAL_TABLE_SERVICE_ENABLE = ConfigProperty
.key("hoodie.incremental.tableservice.enabled")
Contributor

Does this option exist for some corner cases? I think we should always try incremental scheduling.

Contributor Author

Incremental processing is used everywhere, including in UTs. There is a switch here (default true) just in case: in some scenarios users need full processing, and they can manually turn off incremental processing (similar to the switch between MDT and clean).

if (completionTime.compareTo(leftBoundary) >= 0 && completionTime.compareTo(rightBoundary) < 0) {
HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(instant, activeTimeline);
// ignore all the clustering operation for both mor and cow table
if (!metadata.getOperationType().equals(WriteOperationType.CLUSTER)) {
Contributor
danny0405 commented Jan 15, 2025

Why ignore cluster? Maybe we should ignore based on the passed-in table service type.

Contributor Author
zhangyue19921010 commented Jan 16, 2025

No need to ignore actually. Maybe filterCommitByTableType is good enough.


public Pair<Option<HoodieInstant>, List<String>> fetchMissingPartitions(TableServiceType tableServiceType) {
if (!config.isIncrementalTableServiceEnable()) {
return Pair.of(Option.empty(), new ArrayList<>());
Contributor

Collections.emptyList?

Contributor Author

changed

@@ -68,6 +75,7 @@ protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String
- (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));

long totalSizeSoFar = 0;
boolean partialScheduled = true;
Contributor

The default value should be false I guess.

Contributor Author

Exactly, this was missed here. Changed.

List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
Pair<Stream<HoodieClusteringGroup>, Boolean> groupPair = buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible);
List<HoodieClusteringGroup> clusteringGroupsPartition = groupPair.getLeft().collect(Collectors.toList());
Boolean allProcessed = groupPair.getRight();
Contributor

partialScheduled?

Contributor Author

changed. Sorry for missing this.

@@ -808,6 +808,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Avro schema of the partial updates. This is automatically set by the "
+ "Hudi write client and user is not expected to manually change the value.");

public static final ConfigProperty<Boolean> INCREMENTAL_TABLE_SERVICE_ENABLE = ConfigProperty
.key("hoodie.incremental.tableservice.enabled")
Contributor

tableservice -> table.service to keep consistent with other table service conf?

Contributor Author

changed to hoodie.table.services.incremental.enabled

switch (table.getMetaClient().getTableType()) {
case MERGE_ON_READ: {
// for mor only take cares of delta commit and replace commit
Set<String> operations = CollectionUtils.createSet(DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
Contributor

Better not to create the constant set per instant.

Contributor Author

changed. Thanks
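
For reference, a minimal sketch of the fix asked for above: hoist the set into a constant so it is not rebuilt for every instant (string literals stand in for the Hudi action constants):

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class MorIncrementalActions {
    // Built once; for MOR only delta commits and replace commits matter.
    private static final Set<String> MOR_ACTIONS = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList("deltacommit", "replacecommit")));
}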

List<String> affectedPartitions2 = compactionPlan2.getOperations().stream()
.map(HoodieCompactionOperation::getPartitionPath).collect(Collectors.toList());
// compaction including 20250115 (fetched from recorded missing partitions)
assertTrue(affectedPartitions2.contains(partitions[0]));
Contributor

Do we test the fallback for archived commits?

}

@Test
public void testGetPartitionsFallbackToFullScan() throws Exception {
Contributor Author

@danny0405 This UT tests the fallback to a full scan when the table service instant is archived.

}

@Test
public void testContinuousEmptyCommits() throws Exception {
Contributor Author

testContinuousEmptyCommits

@@ -116,7 +116,7 @@ public Pair<Option<HoodieInstant>, Set<String>> getIncrementalPartitions(TableSe
String rightBoundary = instantTime;
// compute [leftBoundary, rightBoundary) as time window
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
Set<String> partitionsInCommitMeta = table.getActiveTimeline().filterCompletedInstants().getCommitsTimeline().getInstantsAsStream()
Set<String> partitionsInCommitMeta = table.getActiveTimeline().findInstantsAfter(leftBoundary).filterCompletedInstants().getCommitsTimeline().getInstantsAsStream()
Contributor

Why filter the instant times by leftBoundary?

Contributor Author
zhangyue19921010 commented Jan 20, 2025

@danny0405 Nice catch, Danny.
At first I just wanted to filter out the left boundary instant itself, but I obviously called the findInstantsAfter API by mistake. Here we should take the completed instants of the entire active timeline and filter them by completion time.
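
A self-contained sketch of the intended [leftBoundary, rightBoundary) completion-time window over all completed instants, with plain strings standing in for instant completion times:

import java.util.List;
import java.util.stream.Collectors;

public class CompletionTimeWindowSketch {
    // Left-inclusive, right-exclusive, matching the comment in the diff above.
    static List<String> inWindow(List<String> completionTimes, String leftBoundary, String rightBoundary) {
        return completionTimes.stream()
            .filter(t -> t.compareTo(leftBoundary) >= 0 && t.compareTo(rightBoundary) < 0)
            .collect(Collectors.toList());
    }
}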
