Core: Fix cleanup of orphaned statistics files in dropTableData #12132

Open · wants to merge 6 commits into main
43 changes: 32 additions & 11 deletions core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -97,6 +97,10 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

Set<String> manifestListsToDelete = Sets.newHashSet();
Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
Set<String> metadataToDelete = Sets.newHashSet();
Set<StatisticsFile> statisticsToDelete = Sets.newHashSet();
Set<PartitionStatisticsFile> partitionStatsToDelete = Sets.newHashSet();

for (Snapshot snapshot : metadata.snapshots()) {
// add all manifests to the delete set because both data and delete files should be removed
Iterables.addAll(manifestsToDelete, snapshot.allManifests(io));
@@ -108,6 +112,31 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

LOG.info("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));

// Collect all metadata files and extract historical statistics files
Collaborator: I found a ReachableFileUtil.metadataFileLocations() function that has a recursive mode: it not only iterates previousFiles() but also looks into those metadata files and their previousFiles() in turn. I wonder in what cases the recursive run would give different results compared to the non-recursive one, but I guess it was implemented for some reason :) I think we should check whether this iteration is enough here, or whether we should also do a recursive one so we don't miss any metadata locations.
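For context, a minimal sketch of what a recursive walk over the metadata log could look like. It reuses the method's io, metadata, and delete sets, would need java.util.ArrayDeque/Deque imports, and omits the missing-file handling added below, so treat it as an illustration rather than the PR's code:

```java
// Sketch: visit previousFiles() transitively so metadata files referenced only by
// older metadata (and no longer in the latest log) are also covered.
Deque<String> toVisit = new ArrayDeque<>();
Set<String> visited = Sets.newHashSet();
metadata.previousFiles().forEach(entry -> toVisit.push(entry.file()));

while (!toVisit.isEmpty()) {
  String location = toVisit.pop();
  if (!visited.add(location)) {
    continue; // already processed this metadata file
  }

  metadataToDelete.add(location);
  TableMetadata previous = TableMetadataParser.read(io, location);
  statisticsToDelete.addAll(previous.statisticsFiles());
  partitionStatsToDelete.addAll(previous.partitionStatisticsFiles());
  previous.previousFiles().forEach(entry -> toVisit.push(entry.file()));
}
```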

for (TableMetadata.MetadataLogEntry previousFile : metadata.previousFiles()) {
Collaborator: With this for loop we could be doing a lot of storage operations sequentially. We could use Tasks.foreach() to do it in parallel.
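A hedged sketch of what the parallel variant could look like with Iceberg's Tasks utility; the executor choice, error handling, and the switch to concurrent sets are assumptions, not the PR's code:

```java
// Sketch: read previous metadata files in parallel and collect their stats files.
// Concurrent sets are used because the lambda runs on multiple worker threads.
Set<StatisticsFile> statisticsToDelete = Sets.newConcurrentHashSet();
Set<PartitionStatisticsFile> partitionStatsToDelete = Sets.newConcurrentHashSet();

Tasks.foreach(metadata.previousFiles())
    .executeWith(ThreadPools.getWorkerPool()) // assumed executor; any ExecutorService works
    .suppressFailureWhenFinished()
    .onFailure((entry, exc) -> LOG.warn("Failed to read metadata file {}", entry.file(), exc))
    .run(
        entry -> {
          TableMetadata previous = TableMetadataParser.read(io, entry.file());
          statisticsToDelete.addAll(previous.statisticsFiles());
          partitionStatsToDelete.addAll(previous.partitionStatisticsFiles());
        });
```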

metadataToDelete.add(previousFile.file());

// Skip missing metadata files
if (!io.newInputFile(previousFile.file()).exists()) {
Collaborator: If the file doesn't exist, there is no need to add it to metadataToDelete on L117. I'd move that line after this check.

LOG.warn("Skipping missing metadata file: {}", previousFile.file());
continue;
}

TableMetadata previousMetadata = TableMetadataParser.read(io, previousFile.file());
Collaborator: I wonder if we need the existence check on L120 at all. We could just call this read() and handle the exception if the file doesn't exist; that would reduce the number of operations made against the storage.
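A sketch of the exception-based variant; which exception a missing file surfaces as depends on the FileIO implementation, so the catch clause below is an assumption rather than a guarantee:

```java
// Sketch: skip the exists() round trip and treat a failed read as a missing file.
try {
  TableMetadata previousMetadata = TableMetadataParser.read(io, previousFile.file());
  metadataToDelete.add(previousFile.file()); // only collect files we could actually read
  statisticsToDelete.addAll(previousMetadata.statisticsFiles());
  partitionStatsToDelete.addAll(previousMetadata.partitionStatisticsFiles());
} catch (NotFoundException | RuntimeIOException e) {
  // assumed exception types; some FileIO implementations may wrap the error differently
  LOG.warn("Skipping unreadable metadata file: {}", previousFile.file(), e);
}
```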

statisticsToDelete.addAll(previousMetadata.statisticsFiles());
partitionStatsToDelete.addAll(previousMetadata.partitionStatisticsFiles());
}

// Process the latest metadata file
Collaborator: Instead of having dedicated code to take care of the latest metadata file, can't we simply initialize metadataToDelete (and likewise the stats sets) with these values rather than having this code here? (The existence check and IO read probably aren't needed anyway; see my comment below and the sketch after it.)

metadataToDelete.add(metadata.metadataFileLocation());
if (io.newInputFile(metadata.metadataFileLocation()).exists()) {
TableMetadata latestMetadata = TableMetadataParser.read(io, metadata.metadataFileLocation());
Collaborator: Is an IO read needed here to load latestMetadata? We already have it loaded in metadata.
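A sketch of the consolidated approach from the two comments above: seed the delete sets from the already-loaded metadata (replacing the declarations at the top of the method), so no existence check or extra read of the latest metadata file is needed. This is an illustration, not the PR's code:

```java
// Sketch: initialize the sets from the in-memory metadata instead of re-reading it from storage.
Set<String> metadataToDelete = Sets.newHashSet(metadata.metadataFileLocation());
Set<StatisticsFile> statisticsToDelete = Sets.newHashSet(metadata.statisticsFiles());
Set<PartitionStatisticsFile> partitionStatsToDelete =
    Sets.newHashSet(metadata.partitionStatisticsFiles());
```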

statisticsToDelete.addAll(latestMetadata.statisticsFiles());
partitionStatsToDelete.addAll(latestMetadata.partitionStatisticsFiles());
} else {
LOG.warn("Skipping missing latest metadata file: {}", metadata.metadataFileLocation());
}

// run all of the deletes

boolean gcEnabled =
@@ -120,22 +149,14 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

deleteFiles(io, Iterables.transform(manifestsToDelete, ManifestFile::path), "manifest", true);
deleteFiles(io, manifestListsToDelete, "manifest list", true);
+ deleteFiles(io, metadataToDelete, "metadata", true);
- deleteFiles(
- io,
- Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file),
- "previous metadata",
- true);
deleteFiles(
- io,
- Iterables.transform(metadata.statisticsFiles(), StatisticsFile::path),
- "statistics",
- true);
+ io, Iterables.transform(statisticsToDelete, StatisticsFile::path), "statistics", true);
deleteFiles(
io,
- Iterables.transform(metadata.partitionStatisticsFiles(), PartitionStatisticsFile::path),
+ Iterables.transform(partitionStatsToDelete, PartitionStatisticsFile::path),
"partition statistics",
true);
- deleteFile(io, metadata.metadataFileLocation(), "metadata");
}

/**
TestCatalogUtilDropTable.java (org.apache.iceberg.hadoop)
@@ -41,6 +41,7 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;
@@ -82,8 +83,8 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
Set<String> manifestLocations = manifestLocations(snapshotSet, table.io());
Set<String> dataLocations = dataLocations(snapshotSet, table.io());
Set<String> metadataLocations = metadataLocations(tableMetadata);
- Set<String> statsLocations = statsLocations(tableMetadata);
- Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata);
+ Set<String> statsLocations = statsLocations(tableMetadata, table.io());
+ Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata, table.io());

assertThat(manifestListLocations).as("should have 2 manifest lists").hasSize(2);
assertThat(metadataLocations).as("should have 5 metadata locations").hasSize(5);
@@ -129,6 +130,81 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
.containsAll(partitionStatsLocations);
}

@Test
Contributor (@ebyhr, Jan 30, 2025): TestCatalogUtilDropTable exists under the hadoop package. Can we move it to CatalogTests or somewhere similar so we can test more catalogs?

Contributor (Author): Thanks for the review! Regarding this comment, I'm not sure moving to CatalogTests is really necessary, since there isn't any catalog-specific logic in dropTableData.

Collaborator: I might be wrong here, but I don't think all the catalog implementations use this function to purge files; I haven't found any usage in RESTCatalog, for instance. Adding this to CatalogTests would also mean running it for unaffected catalogs, IMO.

public void dropTableDataDeletesAllPuffinFiles() throws IOException {
Contributor (@ebyhr, Jan 30, 2025): Puffin is used for deletion vectors in the V3 spec. It would be nice to rename the test to ...StatsFiles if it doesn't cover DVs.

Contributor (Author): Yes, I'll rename the test instead.

Collaborator: Maybe dropTableDataDeletesStatsFromAllMetadataFiles?

table.newFastAppend().appendFile(FILE_A).commit();

StatisticsFile oldStatisticsFile =
writeStatsFile(
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
table.io());
table
.updateStatistics()
.setStatistics(oldStatisticsFile.snapshotId(), oldStatisticsFile)
Collaborator: This setStatistics overload is deprecated; the one without the snapshotId parameter should be used. This goes for the other occurrences below too.

.commit();
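For reference, a hedged sketch of the non-deprecated form suggested above, assuming the single-argument overload (which takes the snapshot id from the StatisticsFile itself) is available in this Iceberg version:

```java
// Sketch: non-deprecated overload; the snapshot id comes from the StatisticsFile itself.
table.updateStatistics().setStatistics(oldStatisticsFile).commit();
```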

StatisticsFile newStatisticsFile =
writeStatsFile(
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
table.io());
table
.updateStatistics()
.setStatistics(newStatisticsFile.snapshotId(), newStatisticsFile)
.commit();

table.newAppend().appendFile(FILE_B).commit();

PartitionStatisticsFile oldPartitionStatisticsFile =
writePartitionStatsFile(
table.currentSnapshot().snapshotId(),
tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
table.io());
table.updatePartitionStatistics().setPartitionStatistics(oldPartitionStatisticsFile).commit();

PartitionStatisticsFile newPartitionStatisticsFile =
writePartitionStatsFile(
table.currentSnapshot().snapshotId(),
tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
table.io());
table.updatePartitionStatistics().setPartitionStatistics(newPartitionStatisticsFile).commit();

TableMetadata tableMetadata = readMetadataVersion(7);
Set<Snapshot> snapshotSet = Sets.newHashSet(table.snapshots());

Set<String> statsLocations = statsLocations(tableMetadata, table.io());
Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata, table.io());

assertThat(statsLocations).as("should have 2 stats file").hasSize(2);
assertThat(partitionStatsLocations).as("should have 2 partition stats file").hasSize(2);

FileIO fileIO = createMockFileIO(table.io());
CatalogUtil.dropTableData(fileIO, tableMetadata);
ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);

Mockito.verify(
fileIO,
Mockito.times(
manifestListLocations(snapshotSet).size()
+ manifestLocations(snapshotSet, fileIO).size()
+ dataLocations(snapshotSet, table.io()).size()
+ metadataLocations(tableMetadata).size()
+ statsLocations.size()
+ partitionStatsLocations.size()))
.deleteFile(argumentCaptor.capture());
List<String> deletedPaths = argumentCaptor.getAllValues();

assertThat(deletedPaths)
.as("should contain all created statistics")
.containsAll(statsLocations);
assertThat(deletedPaths)
.as("should contain all created partition stats files")
.containsAll(partitionStatsLocations);
}

@Test
public void dropTableDataDoNotThrowWhenDeletesFail() {
table.newFastAppend().appendFile(FILE_A).commit();
@@ -234,10 +310,24 @@ private static Set<String> metadataLocations(TableMetadata tableMetadata) {
return metadataLocations;
}

- private static Set<String> statsLocations(TableMetadata tableMetadata) {
- return tableMetadata.statisticsFiles().stream()
- .map(StatisticsFile::path)
- .collect(Collectors.toSet());
+ private static Set<String> statsLocations(TableMetadata tableMetadata, FileIO fileIO) {
+ Set<String> statsLocations =
+ tableMetadata.statisticsFiles().stream()
+ .map(StatisticsFile::path)
+ .collect(Collectors.toSet());
+
+ tableMetadata
+ .previousFiles()
+ .forEach(
+ previousMetadata -> {
+ TableMetadata oldMetadata = TableMetadataParser.read(fileIO, previousMetadata.file());
+ statsLocations.addAll(
+ oldMetadata.statisticsFiles().stream()
+ .map(StatisticsFile::path)
+ .collect(Collectors.toSet()));
+ });
+
+ return statsLocations;
}

private static StatisticsFile writeStatsFile(
@@ -281,9 +371,23 @@ private static PartitionStatisticsFile writePartitionStatsFile(
.build();
}

- private static Set<String> partitionStatsLocations(TableMetadata tableMetadata) {
- return tableMetadata.partitionStatisticsFiles().stream()
- .map(PartitionStatisticsFile::path)
- .collect(Collectors.toSet());
+ private static Set<String> partitionStatsLocations(TableMetadata tableMetadata, FileIO fileIO) {
+ Set<String> partitionStatsLocations =
+ tableMetadata.partitionStatisticsFiles().stream()
+ .map(PartitionStatisticsFile::path)
+ .collect(Collectors.toSet());
+
+ tableMetadata
+ .previousFiles()
+ .forEach(
+ previousMetadata -> {
+ TableMetadata oldMetadata = TableMetadataParser.read(fileIO, previousMetadata.file());
+ partitionStatsLocations.addAll(
+ oldMetadata.partitionStatisticsFiles().stream()
+ .map(PartitionStatisticsFile::path)
+ .collect(Collectors.toSet()));
+ });
+
+ return partitionStatsLocations;
}
}