Core: Fix cleanup of orphaned statistics files in dropTableData #12132
base: main
Changes from all commits
c29bd75
c2f0855
fee3cc1
7a32fea
521f612
fdbbcaf
@@ -97,6 +97,10 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

     Set<String> manifestListsToDelete = Sets.newHashSet();
     Set<ManifestFile> manifestsToDelete = Sets.newHashSet();
+    Set<String> metadataToDelete = Sets.newHashSet();
+    Set<StatisticsFile> statisticsToDelete = Sets.newHashSet();
+    Set<PartitionStatisticsFile> partitionStatsToDelete = Sets.newHashSet();

     for (Snapshot snapshot : metadata.snapshots()) {
       // add all manifests to the delete set because both data and delete files should be removed
       Iterables.addAll(manifestsToDelete, snapshot.allManifests(io));
@@ -108,6 +112,31 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

     LOG.info("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));

+    // Collect all metadata files and extract historical statistics files
+    for (TableMetadata.MetadataLogEntry previousFile : metadata.previousFiles()) {
Review comment: We could be doing a lot of operations on the storage with this for loop in a sequential manner. We could use a Tasks.foreach() to do it in parallel.
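A rough sketch of that Tasks.foreach() idea (a sketch only, not part of this PR; the executor setup, pool size, and the switch to concurrent sets are assumptions):

// Requires java.util.concurrent.Executors and org.apache.iceberg.util.Tasks.
// The delete sets must be thread-safe once the loop runs in parallel.
ExecutorService pool = Executors.newFixedThreadPool(8);
Set<String> metadataToDelete = Sets.newConcurrentHashSet();
Set<StatisticsFile> statisticsToDelete = Sets.newConcurrentHashSet();
Set<PartitionStatisticsFile> partitionStatsToDelete = Sets.newConcurrentHashSet();

Tasks.foreach(metadata.previousFiles())
    .executeWith(pool)
    .suppressFailureWhenFinished()
    .onFailure(
        (entry, exc) -> LOG.warn("Failed to process metadata file {}", entry.file(), exc))
    .run(
        entry -> {
          metadataToDelete.add(entry.file());
          if (io.newInputFile(entry.file()).exists()) {
            TableMetadata previous = TableMetadataParser.read(io, entry.file());
            statisticsToDelete.addAll(previous.statisticsFiles());
            partitionStatsToDelete.addAll(previous.partitionStatisticsFiles());
          }
        });
pool.shutdown();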
+      metadataToDelete.add(previousFile.file());
+
+      // Skip missing metadata files
+      if (!io.newInputFile(previousFile.file()).exists()) {
Review comment: If it doesn't exist, no need to add to metadataToDelete.
+        LOG.warn("Skipping missing metadata file: {}", previousFile.file());
+        continue;
+      }
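If the intent is that a missing file should never be tracked at all, the reordering that comment implies might look like this (sketch, not part of the PR):

// Sketch: probe existence first so missing files never enter metadataToDelete.
if (!io.newInputFile(previousFile.file()).exists()) {
  LOG.warn("Skipping missing metadata file: {}", previousFile.file());
  continue;
}
metadataToDelete.add(previousFile.file());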
+
+      TableMetadata previousMetadata = TableMetadataParser.read(io, previousFile.file());
Review comment: I wonder if we need the existence check on L120 at all. We could just call this read() and handle the exception if the file doesn't exist. This could reduce the number of operations made on the storage.
+      statisticsToDelete.addAll(previousMetadata.statisticsFiles());
+      partitionStatsToDelete.addAll(previousMetadata.partitionStatisticsFiles());
+    }
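A sketch of that read-and-handle-the-exception approach; the exact exception a missing file produces depends on the FileIO implementation, so NotFoundException here is an assumption:

// Sketch: drop the exists() probe and let the read itself surface missing files.
TableMetadata previousMetadata;
try {
  previousMetadata = TableMetadataParser.read(io, previousFile.file());
} catch (NotFoundException e) { // org.apache.iceberg.exceptions.NotFoundException, assumed
  LOG.warn("Skipping missing metadata file: {}", previousFile.file());
  continue;
}
statisticsToDelete.addAll(previousMetadata.statisticsFiles());
partitionStatsToDelete.addAll(previousMetadata.partitionStatisticsFiles());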
+
+    // Process the latest metadata file
Review comment: I think instead of having dedicated code to take care of the latest metadata file, can't we simply initialize …
+    metadataToDelete.add(metadata.metadataFileLocation());
+    if (io.newInputFile(metadata.metadataFileLocation()).exists()) {
+      TableMetadata latestMetadata = TableMetadataParser.read(io, metadata.metadataFileLocation());
Review comment: Is it needed to do an IO here to read the latestMetadata? We already have it loaded in metadata.
+      statisticsToDelete.addAll(latestMetadata.statisticsFiles());
+      partitionStatsToDelete.addAll(latestMetadata.partitionStatisticsFiles());
+    } else {
+      LOG.warn("Skipping missing latest metadata file: {}", metadata.metadataFileLocation());
+    }
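Following the two comments above, the dedicated read of the latest metadata file could likely be dropped, since dropTableData already holds that metadata; a minimal sketch:

// Sketch: the latest metadata is the `metadata` argument itself,
// so its statistics files can be collected without extra storage I/O.
metadataToDelete.add(metadata.metadataFileLocation());
statisticsToDelete.addAll(metadata.statisticsFiles());
partitionStatsToDelete.addAll(metadata.partitionStatisticsFiles());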

     // run all of the deletes

     boolean gcEnabled =
@@ -120,22 +149,14 @@ public static void dropTableData(FileIO io, TableMetadata metadata) {

     deleteFiles(io, Iterables.transform(manifestsToDelete, ManifestFile::path), "manifest", true);
     deleteFiles(io, manifestListsToDelete, "manifest list", true);
+    deleteFiles(io, metadataToDelete, "metadata", true);
     deleteFiles(
-        io,
-        Iterables.transform(metadata.previousFiles(), TableMetadata.MetadataLogEntry::file),
-        "previous metadata",
-        true);
-    deleteFiles(
-        io,
-        Iterables.transform(metadata.statisticsFiles(), StatisticsFile::path),
-        "statistics",
-        true);
+        io, Iterables.transform(statisticsToDelete, StatisticsFile::path), "statistics", true);
     deleteFiles(
         io,
-        Iterables.transform(metadata.partitionStatisticsFiles(), PartitionStatisticsFile::path),
+        Iterables.transform(partitionStatsToDelete, PartitionStatisticsFile::path),
         "partition statistics",
         true);
-    deleteFile(io, metadata.metadataFileLocation(), "metadata");
   }

   /**
@@ -41,6 +41,7 @@
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StatisticsFile;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.PositionOutputStream;
@@ -82,8 +83,8 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
     Set<String> manifestLocations = manifestLocations(snapshotSet, table.io());
     Set<String> dataLocations = dataLocations(snapshotSet, table.io());
     Set<String> metadataLocations = metadataLocations(tableMetadata);
-    Set<String> statsLocations = statsLocations(tableMetadata);
-    Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata);
+    Set<String> statsLocations = statsLocations(tableMetadata, table.io());
+    Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata, table.io());

     assertThat(manifestListLocations).as("should have 2 manifest lists").hasSize(2);
     assertThat(metadataLocations).as("should have 5 metadata locations").hasSize(5);
@@ -129,6 +130,81 @@ public void dropTableDataDeletesExpectedFiles() throws IOException {
         .containsAll(partitionStatsLocations);
   }

+  @Test
Review comment: Thanks for the review!

Review comment: I might be wrong here, but I don't think all the catalog implementations use this function to purge files; I haven't found any usage for RESTCatalog, for instance. Adding this to CatalogTests would also result in running it for non-affected catalogs, IMO.
+  public void dropTableDataDeletesAllPuffinFiles() throws IOException {
Review comment: Puffin is used for deletion vectors in the V3 spec. It would be nice to rename the test to …

Review comment: Yes, I'll rename the test instead.

Review comment: Maybe …
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    StatisticsFile oldStatisticsFile =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
+            table.io());
+    table
+        .updateStatistics()
+        .setStatistics(oldStatisticsFile.snapshotId(), oldStatisticsFile)
Review comment: This function is deprecated. The other one without the snapshotId param should be used. This goes for the other occurrences below too.
+        .commit();
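A sketch of the non-deprecated call the comment refers to, assuming the single-argument overload it describes:

// Sketch: the StatisticsFile already carries its snapshot id,
// so the separate snapshotId parameter is redundant.
table.updateStatistics().setStatistics(oldStatisticsFile).commit();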
+
+    StatisticsFile newStatisticsFile =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
+            table.io());
+    table
+        .updateStatistics()
+        .setStatistics(newStatisticsFile.snapshotId(), newStatisticsFile)
+        .commit();
+
+    table.newAppend().appendFile(FILE_B).commit();
+
+    PartitionStatisticsFile oldPartitionStatisticsFile =
+        writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(),
+            tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
+            table.io());
+    table.updatePartitionStatistics().setPartitionStatistics(oldPartitionStatisticsFile).commit();
+
+    PartitionStatisticsFile newPartitionStatisticsFile =
+        writePartitionStatsFile(
+            table.currentSnapshot().snapshotId(),
+            tableLocation + "/metadata/" + UUID.randomUUID() + ".stats",
+            table.io());
+    table.updatePartitionStatistics().setPartitionStatistics(newPartitionStatisticsFile).commit();
+
+    TableMetadata tableMetadata = readMetadataVersion(7);
+    Set<Snapshot> snapshotSet = Sets.newHashSet(table.snapshots());
+
+    Set<String> statsLocations = statsLocations(tableMetadata, table.io());
+    Set<String> partitionStatsLocations = partitionStatsLocations(tableMetadata, table.io());
+
+    assertThat(statsLocations).as("should have 2 stats files").hasSize(2);
+    assertThat(partitionStatsLocations).as("should have 2 partition stats files").hasSize(2);
+
+    FileIO fileIO = createMockFileIO(table.io());
+    CatalogUtil.dropTableData(fileIO, tableMetadata);
+    ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
+
+    Mockito.verify(
+            fileIO,
+            Mockito.times(
+                manifestListLocations(snapshotSet).size()
+                    + manifestLocations(snapshotSet, fileIO).size()
+                    + dataLocations(snapshotSet, table.io()).size()
+                    + metadataLocations(tableMetadata).size()
+                    + statsLocations.size()
+                    + partitionStatsLocations.size()))
+        .deleteFile(argumentCaptor.capture());
+    List<String> deletedPaths = argumentCaptor.getAllValues();
+
+    assertThat(deletedPaths)
+        .as("should contain all created statistics")
+        .containsAll(statsLocations);
+    assertThat(deletedPaths)
+        .as("should contain all created partition stats files")
+        .containsAll(partitionStatsLocations);
+  }

   @Test
   public void dropTableDataDoNotThrowWhenDeletesFail() {
     table.newFastAppend().appendFile(FILE_A).commit();
@@ -234,10 +310,24 @@ private static Set<String> metadataLocations(TableMetadata tableMetadata) {
     return metadataLocations;
   }
-  private static Set<String> statsLocations(TableMetadata tableMetadata) {
-    return tableMetadata.statisticsFiles().stream()
-        .map(StatisticsFile::path)
-        .collect(Collectors.toSet());
+  private static Set<String> statsLocations(TableMetadata tableMetadata, FileIO fileIO) {
+    Set<String> statsLocations =
+        tableMetadata.statisticsFiles().stream()
+            .map(StatisticsFile::path)
+            .collect(Collectors.toSet());
+
+    tableMetadata
+        .previousFiles()
+        .forEach(
+            previousMetadata -> {
+              TableMetadata oldMetadata = TableMetadataParser.read(fileIO, previousMetadata.file());
+              statsLocations.addAll(
+                  oldMetadata.statisticsFiles().stream()
+                      .map(StatisticsFile::path)
+                      .collect(Collectors.toSet()));
+            });
+
+    return statsLocations;
   }

   private static StatisticsFile writeStatsFile(
@@ -281,9 +371,23 @@ private static PartitionStatisticsFile writePartitionStatsFile(
         .build();
   }
-  private static Set<String> partitionStatsLocations(TableMetadata tableMetadata) {
-    return tableMetadata.partitionStatisticsFiles().stream()
-        .map(PartitionStatisticsFile::path)
-        .collect(Collectors.toSet());
+  private static Set<String> partitionStatsLocations(TableMetadata tableMetadata, FileIO fileIO) {
+    Set<String> partitionStatsLocations =
+        tableMetadata.partitionStatisticsFiles().stream()
+            .map(PartitionStatisticsFile::path)
+            .collect(Collectors.toSet());
+
+    tableMetadata
+        .previousFiles()
+        .forEach(
+            previousMetadata -> {
+              TableMetadata oldMetadata = TableMetadataParser.read(fileIO, previousMetadata.file());
+              partitionStatsLocations.addAll(
+                  oldMetadata.partitionStatisticsFiles().stream()
+                      .map(PartitionStatisticsFile::path)
+                      .collect(Collectors.toSet()));
+            });
+
+    return partitionStatsLocations;
   }
 }
Review comment: I found a ReachableFileUtil.metadataFileLocations() function that has a recursive mode that not only iterates the previousFiles() but also looks into those metadata files and their previousFiles() too. I wonder in what cases it would give different results for the recursive run compared to the non-recursive one, but I guess it was implemented for some reason :) I think we should check whether this iteration is enough here, or whether we should also do a recursive one to not miss any metadata locations.
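For reference, a sketch of the recursive lookup that comment mentions; note it takes a Table handle rather than the TableMetadata that dropTableData receives, so using it there would need some adaptation:

// Sketch: with recursive=true, ReachableFileUtil follows each previous
// metadata file's own metadata log instead of only the latest one.
Set<String> allMetadataLocations = ReachableFileUtil.metadataFileLocations(table, true);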