Skip to content

Commit

Permalink
[HUDI-8590] FIx wrong file path for consistent-bucket-commit-marker-f…
Browse files Browse the repository at this point in the history
…ile (apache#12344)

* fix: wrong file path for consistent-bucket-commit-marker-file

1. wrong file path for consistent-bucket-commit-marker-file
2. return the max committed metadata file when no hash metadata file has been successfully fixed

Signed-off-by: TheR1sing3un <[email protected]>

* refactor: return the `maxCommittedMetadataFileOpt` directly instead of adding it to the fixed list.

1. return the `maxCommittedMetadataFileOpt` directly instead of adding it to the fixed list.

Signed-off-by: TheR1sing3un <[email protected]>

---------

Signed-off-by: TheR1sing3un <[email protected]>
  • Loading branch information
TheR1sing3un authored Dec 18, 2024
1 parent acc310e commit a1411a6
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t
public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable table, String partition) {
HoodieTableMetaClient metaClient = table.getMetaClient();
StoragePath metadataPath = FSUtils.constructAbsolutePath(metaClient.getHashingMetadataPath(), partition);
StoragePath partitionPath = FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition);
try {
Predicate<StoragePathInfo> hashingMetaCommitFilePredicate = pathInfo -> {
String filename = pathInfo.getPath().getName();
Expand Down Expand Up @@ -141,6 +140,15 @@ public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable t

// fix the in-consistency between un-committed and committed hashing metadata files.
List<StoragePathInfo> fixed = new ArrayList<>();
Option<StoragePathInfo> maxCommittedMetadataFileOpt = Option.empty();
if (maxCommitMetaFileTs != null) {
maxCommittedMetadataFileOpt = Option.fromJavaOptional(hashingMetaFiles.stream().filter(hashingMetaFile -> {
String timestamp = getTimestampFromFile(hashingMetaFile.getPath().getName());
return maxCommitMetaFileTs.equals(timestamp);
}).findFirst());
ValidationUtils.checkState(maxCommittedMetadataFileOpt.isPresent(),
"Failed to find max committed metadata file but commit marker file exist with instant: " + maxCommittedMetadataFileOpt);
}
hashingMetaFiles.forEach(hashingMetaFile -> {
StoragePath path = hashingMetaFile.getPath();
String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
Expand All @@ -152,7 +160,7 @@ public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable t
if (isRehashingCommitted) {
if (!commitMetaTss.contains(timestamp)) {
try {
createCommitMarker(table, path, partitionPath);
createCommitMarker(table, path, metadataPath);
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file " + path.getName() + " for partition " + partition, e);
}
Expand All @@ -162,8 +170,11 @@ public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable t
fixed.add(hashingMetaFile);
}
});

return fixed.isEmpty() ? Option.empty() : loadMetadataFromGivenFile(table, fixed.get(fixed.size() - 1));
if (!fixed.isEmpty()) {
return loadMetadataFromGivenFile(table, fixed.get(fixed.size() - 1));
}
// we should return max committed metadata file if there is not any metadata file can be successfully fixed.
return maxCommittedMetadataFileOpt.isPresent() ? loadMetadataFromGivenFile(table, maxCommittedMetadataFileOpt.get()) : Option.empty();
} catch (FileNotFoundException e) {
return Option.empty();
} catch (IOException e) {
Expand Down Expand Up @@ -211,12 +222,12 @@ public static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMet
*
* @param table Hoodie table
* @param path File for which commit marker should be created
* @param partitionPath Partition path the file belongs to
* @param metadataPath Consistent-Bucket metadata path the file belongs to
* @throws IOException
*/
private static void createCommitMarker(HoodieTable table, StoragePath path, StoragePath partitionPath) throws IOException {
private static void createCommitMarker(HoodieTable table, StoragePath path, StoragePath metadataPath) throws IOException {
HoodieStorage storage = table.getStorage();
StoragePath fullPath = new StoragePath(partitionPath,
StoragePath fullPath = new StoragePath(metadataPath,
getTimestampFromFile(path.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
if (storage.exists(fullPath)) {
return;
Expand Down Expand Up @@ -272,7 +283,7 @@ private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile
* @return true if hashing metadata file is latest else false
*/
private static boolean recommitMetadataFile(HoodieTable table, StoragePathInfo metaFile, String partition) {
StoragePath partitionPath = FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition);
StoragePath metadataPath = FSUtils.constructAbsolutePath(table.getMetaClient().getHashingMetadataPath(), partition);
String timestamp = getTimestampFromFile(metaFile.getPath().getName());
if (table.getPendingCommitsTimeline().containsInstant(timestamp)) {
return false;
Expand All @@ -290,7 +301,7 @@ private static boolean recommitMetadataFile(HoodieTable table, StoragePathInfo m
if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
.map(fileIdPrefix -> FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate)) {
try {
createCommitMarker(table, metaFile.getPath(), partitionPath);
createCommitMarker(table, metaFile.getPath(), metadataPath);
return true;
} catch (IOException e) {
throw new HoodieIOException("Exception while creating marker file " + metaFile.getPath().getName() + " for partition " + partition, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@ public static void checkState(final boolean expression, String errorMessage) {
throw new IllegalStateException(errorMessage);
}
}

/**
* Ensures the truth of an expression, throwing the custom errorMessage otherwise.
*/
public static void checkState(final boolean expression, final Supplier<String> errorMessageSupplier) {
if (!expression) {
throw new IllegalArgumentException(errorMessageSupplier.get());
}
}
}

0 comments on commit a1411a6

Please sign in to comment.