diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
index 37d9ad86e16d..a31da36ec33b 100644
--- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java
+++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java
@@ -97,6 +97,10 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
     Set<String> manifestListsToDelete = Sets.newHashSet();
     Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
+    Set<String> metadataToDelete = Sets.newHashSet();
+    Set<StatisticsFile> statisticsToDelete = Sets.newHashSet();
+    Set<PartitionStatisticsFile> partitionStatsToDelete = Sets.newHashSet();
+
     for (Snapshot snapshot : metadata.snapshots()) {
       // add all manifests to the delete set because both data and delete files should be removed
       Iterables.addAll(manifestsToDelete, snapshot.allManifests(io));
@@ -108,6 +112,31 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
 
     LOG.info("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
 
+    // Collect all metadata files and extract historical statistics files
+    for (TableMetadata.MetadataLogEntry previousFile : metadata.previousFiles()) {
+      metadataToDelete.add(previousFile.file());
+
+      // Skip missing metadata files
+      if (!io.newInputFile(previousFile.file()).exists()) {
+        LOG.warn("Skipping missing metadata file: {}", previousFile.file());
+        continue;
+      }
+
+      TableMetadata previousMetadata = TableMetadataParser.read(io, previousFile.file());
+      statisticsToDelete.addAll(previousMetadata.statisticsFiles());
+      partitionStatsToDelete.addAll(previousMetadata.partitionStatisticsFiles());
+    }
+
+    // Process the latest metadata file
+    metadataToDelete.add(metadata.metadataFileLocation());
+    if (io.newInputFile(metadata.metadataFileLocation()).exists()) {
+      TableMetadata latestMetadata = TableMetadataParser.read(io, metadata.metadataFileLocation());
+      statisticsToDelete.addAll(latestMetadata.statisticsFiles());
+      partitionStatsToDelete.addAll(latestMetadata.partitionStatisticsFiles());
+    } else {
+      LOG.warn("Skipping missing latest metadata file: {}", metadata.metadataFileLocation());
+    }
+
     // run all of the deletes
     boolean gcEnabled =
@@ -120,22 +149,14 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {
     deleteFiles(io, Iterables.transform(manifestsToDelete, ManifestFile::path), "manifest", true);
     deleteFiles(io, manifestListsToDelete, "manifest list", true);
+    deleteFiles(io, metadataToDelete, "metadata", true);
     deleteFiles(
-        io,
-        Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file),
-        "previous metadata",
-        true);
-    deleteFiles(
-        io,
-        Iterables.transform(metadata.statisticsFiles(), StatisticsFile::path),
-        "statistics",
-        true);
+        io, Iterables.transform(statisticsToDelete, StatisticsFile::path), "statistics", true);
     deleteFiles(
         io,
-        Iterables.transform(metadata.partitionStatisticsFiles(), PartitionStatisticsFile::path),
+        Iterables.transform(partitionStatsToDelete, PartitionStatisticsFile::path),
         "partition statistics",
         true);
-    deleteFile(io, metadata.metadataFileLocation(), "metadata");
   }
 
   /**
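The core change above: dropTableData previously deleted only the statistics and partition statistics files referenced by the current metadata, so Puffin files referenced solely by older metadata.json versions were orphaned when a table was dropped. The new code walks metadata.previousFiles(), re-reads each reachable metadata file with TableMetadataParser.read, and accumulates every StatisticsFile and PartitionStatisticsFile it finds. The existence check tolerates metadata log entries whose files were already removed (for example by metadata retention cleanup), trading extra metadata reads at drop time for complete cleanup. A minimal standalone sketch of that traversal, shown for the statistics side only (class and method names here are illustrative, not part of the patch):

    import java.util.Set;
    import org.apache.iceberg.StatisticsFile;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableMetadataParser;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;

    class StatsCollectionSketch {
      // Gather statistics files from the current metadata plus every version
      // in the metadata log, skipping log entries whose files no longer exist.
      static Set<StatisticsFile> collectAllStatsFiles(FileIO io, TableMetadata current) {
        Set<StatisticsFile> result = Sets.newHashSet(current.statisticsFiles());
        for (TableMetadata.MetadataLogEntry entry : current.previousFiles()) {
          if (io.newInputFile(entry.file()).exists()) {
            result.addAll(TableMetadataParser.read(io, entry.file()).statisticsFiles());
          }
        }
        return result;
      }
    }
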
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
index 6765b23d8ab8..e2c482fe899e 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestCatalogUtilDropTable.java
@@ -41,6 +41,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.PositionOutputStream;
@@ -82,8 +83,8 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
     Set<String> manifestLocations = manifestLocations(snapshotSet, table.io());
     Set<String> dataLocations = dataLocations(snapshotSet, table.io());
     Set<String> metadataLocations = metadataLocations(tableMetadata);
-    Set<String> statsLocations = statsLocations(tableMetadata);
-    Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata);
+    Set<String> statsLocations = statsLocations(tableMetadata, table.io());
+    Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata, table.io());
 
     assertThat(manifestListLocations).as("should have 2 manifest lists").hasSize(2);
     assertThat(metadataLocations).as("should have 5 metadata locations").hasSize(5);
@@ -129,6 +130,81 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
         .containsAll(partitionStatsLocations);
   }
 
+  @Test
+  public void dropTableDataDeletesAllPuffinFiles() throws IOException {
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    StatisticsFile oldStatisticsFile =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
+            table.io());
+    table
+        .updateStatistics()
+        .setStatistics(oldStatisticsFile.snapshotId(), oldStatisticsFile)
+        .commit();
+
+    StatisticsFile newStatisticsFile =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
+            table.io());
+    table
+        .updateStatistics()
+        .setStatistics(newStatisticsFile.snapshotId(), newStatisticsFile)
+        .commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    PartitionStatisticsFile oldPartitionStatisticsFile =
+        writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(),
+            tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
+            table.io());
+    table.updatePartitionStatistics().setPartitionStatistics(oldPartitionStatisticsFile).commit();
+
+    PartitionStatisticsFile newPartitionStatisticsFile =
+        writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(),
+            tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
+            table.io());
+    table.updatePartitionStatistics().setPartitionStatistics(newPartitionStatisticsFile).commit();
+
+    TableMetadata tableMetadata = readMetadataVersion(7);
+    Set<Snapshot> snapshotSet = Sets.newHashSet(table.snapshots());
+
+    Set<String> statsLocations = statsLocations(tableMetadata, table.io());
+    Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata, table.io());
+
+    assertThat(statsLocations).as("should have 2 stats files").hasSize(2);
+    assertThat(partitionStatsLocations).as("should have 2 partition stats files").hasSize(2);
+
+    FileIO fileIO = createMockFileIO(table.io());
+    CatalogUtil.dropTableData(fileIO, tableMetadata);
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+
+    Mockito.verify(
+            fileIO,
+            Mockito.times(
+                manifestListLocations(snapshotSet).size()
+                    + manifestLocations(snapshotSet, fileIO).size()
+                    + dataLocations(snapshotSet, table.io()).size()
+                    + metadataLocations(tableMetadata).size()
+                    + statsLocations.size()
+                    + partitionStatsLocations.size()))
+        .deleteFile(argumentCaptor.capture());
+    List<String> deletedPaths = argumentCaptor.getAllValues();
+
+    assertThat(deletedPaths)
+        .as("should contain all created statistics")
+        .containsAll(statsLocations);
+    assertThat(deletedPaths)
+        .as("should contain all created partition stats files")
+        .containsAll(partitionStatsLocations);
+  }
+
   @Test
   public void dropTableDataDoNotThrowWhenDeletesFail() {
     table.newFastAppend().appendFile(FILE_A).commit();
@@ -234,10 +310,24 @@ private static Set<String> metadataLocations(TableMetadata tableMetadata) {
     return metadataLocations;
   }
 
-  private static Set<String> statsLocations(TableMetadata tableMetadata) {
-    return tableMetadata.statisticsFiles().stream()
-        .map(StatisticsFile::path)
-        .collect(Collectors.toSet());
+  private static Set<String> statsLocations(TableMetadata tableMetadata, FileIO fileIO) {
+    Set<String> statsLocations =
+        tableMetadata.statisticsFiles().stream()
+            .map(StatisticsFile::path)
+            .collect(Collectors.toSet());
+
+    tableMetadata
+        .previousFiles()
+        .forEach(
+            previousMetadata -> {
+              TableMetadata oldMetadata = TableMetadataParser.read(fileIO, previousMetadata.file());
+              statsLocations.addAll(
+                  oldMetadata.statisticsFiles().stream()
+                      .map(StatisticsFile::path)
+                      .collect(Collectors.toSet()));
+            });
+
+    return statsLocations;
   }
 
   private static StatisticsFile writeStatsFile(
@@ -281,9 +371,23 @@ private static PartitionStatisticsFile writePartitionStatsFile(
         .build();
   }
 
-  private static Set<String> partitionStatsLocations(TableMetadata tableMetadata) {
-    return tableMetadata.partitionStatisticsFiles().stream()
-        .map(PartitionStatisticsFile::path)
-        .collect(Collectors.toSet());
+  private static Set<String> partitionStatsLocations(TableMetadata tableMetadata, FileIO fileIO) {
+    Set<String> partitionStatsLocations =
+        tableMetadata.partitionStatisticsFiles().stream()
+            .map(PartitionStatisticsFile::path)
+            .collect(Collectors.toSet());
+
+    tableMetadata
+        .previousFiles()
+        .forEach(
+            previousMetadata -> {
+              TableMetadata oldMetadata = TableMetadataParser.read(fileIO, previousMetadata.file());
+              partitionStatsLocations.addAll(
+                  oldMetadata.partitionStatisticsFiles().stream()
+                      .map(PartitionStatisticsFile::path)
+                      .collect(Collectors.toSet()));
+            });
+
+    return partitionStatsLocations;
   }
 }
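One piece the new test relies on but the diff does not show is the createMockFileIO helper that already exists in TestCatalogUtilDropTable. The test needs a FileIO whose deleteFile calls Mockito can count and capture while reads still hit the real file system. A plausible minimal version, assuming a Mockito spy is acceptable (this is a sketch under that assumption, not the helper's actual body):

    import org.apache.iceberg.io.FileIO;
    import org.mockito.Mockito;

    class MockFileIOSketch {
      // A spy delegates newInputFile/deleteFile to the wrapped FileIO while
      // letting Mockito.verify(...) count and capture deleteFile invocations.
      static FileIO createMockFileIO(FileIO io) {
        return Mockito.spy(io);
      }
    }

With that in place, the Mockito.times(...) expression in the test asserts that the total number of deleteFile calls equals manifest lists + manifests + data files + every metadata version + all statistics files + all partition statistics files, so the verification fails if any historical Puffin file is missed.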