Skip to content

Commit

Permalink
[fix](hive)Clear processed tasks (#45309)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Problem Summary:

Clear processed tasks, or it will be executed twice.
  • Loading branch information
wuwenchi authored and Your Name committed Dec 12, 2024
1 parent 8217371 commit b02e525
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ public List<HivePartitionWithStatistics> getPartitions() {
return partitions;
}

public void clear() {
partitions.clear();
createdPartitionValues.clear();
}

public void addPartition(HivePartitionWithStatistics partition) {
partitions.add(partition);
}
Expand Down Expand Up @@ -1136,6 +1141,7 @@ private void undoUpdateStatisticsTasks() {
for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) {
MoreFutures.getFutureValue(undoUpdateFuture);
}
updateStatisticsTasks.clear();
}

private void undoAddPartitionsTask() {
Expand All @@ -1150,6 +1156,7 @@ private void undoAddPartitionsTask() {
LOG.warn("Failed to rollback: add_partition for partition values {}.{}",
tableInfo, rollbackFailedPartitions);
}
addPartitionsTask.clear();
}

private void waitForAsyncFileSystemTaskSuppressThrowable() {
Expand All @@ -1162,6 +1169,7 @@ private void waitForAsyncFileSystemTaskSuppressThrowable() {
// ignore
}
}
asyncFileSystemTaskFutures.clear();
}

public void prepareInsertExistingTable(SimpleTableInfo tableInfo, TableAndMore tableAndMore) {
Expand Down Expand Up @@ -1312,6 +1320,7 @@ private void runDirectoryClearUpTasksForAbort() {
for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) {
recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir(), false);
}
directoryCleanUpTasksForAbort.clear();
}

private void runRenameDirTasksForAbort() {
Expand All @@ -1327,6 +1336,7 @@ private void runRenameDirTasksForAbort() {
}
}
}
renameDirectoryTasksForAbort.clear();
}

private void runClearPathsForFinish() {
Expand Down Expand Up @@ -1479,6 +1489,7 @@ private void abortMultiUploads() {
.build());
}, fileSystemExecutor));
}
uncompletedMpuPendingUploads.clear();
}

public void doNothing() {
Expand Down Expand Up @@ -1513,6 +1524,7 @@ public void rollback() {
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
MoreFutures.getFutureValue(future, RuntimeException.class);
}
asyncFileSystemTaskFutures.clear();
}

public void shutdownExecutorService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,5 +675,43 @@ public void testRollbackNewPartitionForPartitionedTableWithNewAppendPartition()
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
assertNumRows(3, pa);
}
}

@Test
public void testCommitWithRollback() {
genQueryID();
List<THivePartitionUpdate> pus = new ArrayList<>();
try {
pus.add(createRandomAppend(null));
pus.add(createRandomAppend(null));
pus.add(createRandomAppend(null));
} catch (Throwable t) {
Assert.fail();
}

mockDoOther(() -> {
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);
});

HMSTransaction hmsTransaction = new HMSTransaction(hmsOps, fileSystemProvider, fileSystemExecutor);
try {
hmsTransaction.setHivePartitionUpdates(pus);
HiveInsertCommandContext ctx = new HiveInsertCommandContext();
String queryId = DebugUtil.printId(ConnectContext.get().queryId());
ctx.setQueryId(queryId);
ctx.setWritePath(getWritePath());
hmsTransaction.beginInsertTable(ctx);
hmsTransaction.finishInsertTable(new SimpleTableInfo(dbName, tbWithoutPartition));
hmsTransaction.commit();
Assert.fail();
} catch (Throwable t) {
Assert.assertTrue(t.getMessage().contains("failed to do nothing"));
}

try {
hmsTransaction.rollback();
} catch (Throwable t) {
Assert.fail();
}
}
}

0 comments on commit b02e525

Please sign in to comment.