From 4efc241d7a89f7736b79ef0d16deab796e9e05f5 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 8 Jan 2025 15:41:34 +0800 Subject: [PATCH] [MINOR] Avoid wrong way to get the latest completed instant (#12590) 1. avoid wrong way to get the latest completed instant Signed-off-by: TheR1sing3un --- .../apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java | 6 +----- .../apache/hudi/common/table/timeline/HoodieTimeline.java | 2 +- .../spark/sql/hudi/streaming/HoodieStreamSourceV1.scala | 6 +----- .../org/apache/hudi/sync/datahub/DataHubSyncClient.java | 8 +------- .../java/org/apache/hudi/hive/HoodieHiveSyncClient.java | 7 +------ .../test/java/org/apache/hudi/hive/TestHiveSyncTool.java | 4 +--- 6 files changed, 6 insertions(+), 27 deletions(-) diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java index 506023b22a7d3..829242c72db7e 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -896,11 +896,7 @@ public void updateLastCommitTimeSynced(String tableName) { HoodieTimeline activeTimeline = getActiveTimeline(); Option lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::requestedTime); Option lastCommitCompletionSynced = activeTimeline - .getInstantsOrderedByCompletionTime() - .skip(activeTimeline.countInstants() - 1) - .findFirst() - .map(i -> Option.of(i.getCompletionTime())) - .orElse(Option.empty()); + .getLatestCompletionTime(); if (lastCommitSynced.isPresent()) { try { HashMap propertyMap = new HashMap<>(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 33ea149e14857..cadf45574d48c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -405,7 +405,7 @@ public interface HoodieTimeline extends Serializable { Option getLatestCompletionTime(); /** - * Get the stream of instants in order by state transition timestamp of actions. + * Get the stream of instants in order by completion timestamp of actions. */ Stream getInstantsOrderedByCompletionTime(); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala index a43b77551a8d8..82a7e80465620 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV1.scala @@ -107,11 +107,7 @@ class HoodieStreamSourceV1(sqlContext: SQLContext, filteredTimeline match { case activeInstants if !activeInstants.empty() => val timestamp = if (hollowCommitHandling == USE_TRANSITION_TIME) { - activeInstants.getInstantsOrderedByCompletionTime - .skip(activeInstants.countInstants() - 1) - .findFirst() - .get() - .getCompletionTime + activeInstants.getLatestCompletionTime.get() } else { activeInstants.lastInstant().get().requestedTime() } diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java index 44ce04f59e68f..f9bb2ea271084 100644 --- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java +++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java @@ -99,13 +99,7 @@ protected Option getLastCommitTime() { } protected Option getLastCommitCompletionTime() { - int countInstants = getActiveTimeline().countInstants(); - return getActiveTimeline() - .getInstantsOrderedByCompletionTime() - .skip(countInstants - 1) - .findFirst() - .map(HoodieInstant::getCompletionTime) - .map(Option::of).orElseGet(Option::empty); + return getActiveTimeline().getLatestCompletionTime(); } @Override diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index bd00773e9138a..797b20aea6904 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -404,12 +404,7 @@ public void updateLastCommitTimeSynced(String tableName) { // Set the last commit time and commit completion from the TBLproperties HoodieTimeline activeTimeline = getActiveTimeline(); Option lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::requestedTime); - Option lastCommitCompletionSynced = activeTimeline - .getInstantsOrderedByCompletionTime() - .skip(activeTimeline.countInstants() - 1) - .findFirst() - .map(i -> Option.of(i.getCompletionTime())) - .orElse(Option.empty()); + Option lastCommitCompletionSynced = activeTimeline.getLatestCompletionTime(); if (lastCommitSynced.isPresent()) { try { Table table = client.getTable(databaseName, tableName); diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 9e420683e0cd7..11671b1e02b11 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -1979,9 +1979,7 @@ private void reSyncHiveTable() { } private String getLastCommitCompletionTimeSynced() { - return hiveClient.getActiveTimeline() - .getInstantsOrderedByCompletionTime() - .skip(hiveClient.getActiveTimeline().countInstants() - 1).findFirst().get().getCompletionTime(); + return hiveClient.getActiveTimeline().getLatestCompletionTime().get(); } private void reInitHiveSyncClient() {