diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java index c5d80d3ebe8a..5e5aa38df743 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java @@ -57,14 +57,11 @@ DatanodeDeletedBlockTransactions getTransactions( * considered to be failed if it has been sent more than MAX_RETRY limit * and its count is reset to -1. * - * @param batchSize Number of failed transactions to be processed in a batch. + * @param count Number of failed transactions to be returned. * @param startTxId The least transaction id to start with. * @return a list of failed deleted block transactions. * @throws IOException */ - List getFailedTransactionsBatch(int batchSize, - long startTxId) throws IOException; - List getFailedTransactions(int count, long startTxId) throws IOException; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java index 2ab0c4005145..a6045a34fd7b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java @@ -23,6 +23,7 @@ import java.util.UUID; import java.util.Set; import java.util.Map; +import java.util.HashSet; import java.util.ArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -121,58 +122,22 @@ public DeletedBlockLogImpl(ConfigurationSource conf, } @Override - public List getFailedTransactionsBatch( - int batchSize, long startTxId) throws IOException { - List failedTXs = new ArrayList<>(); - + public List getFailedTransactions(int count, + long startTxId) throws IOException { lock.lock(); try { + final List failedTXs = Lists.newArrayList(); try ( TableIterator> iter = deletedBlockLogStateManager.getReadOnlyIterator()) { - iter.seek(startTxId); - - while (iter.hasNext() && failedTXs.size() < batchSize) { + while (iter.hasNext() && failedTXs.size() < count) { DeletedBlocksTransaction delTX = iter.next().getValue(); - if (delTX.getCount() == -1) { + if (delTX.getCount() == -1 && delTX.getTxID() >= startTxId) { failedTXs.add(delTX); } } } - } finally { - lock.unlock(); - } - - return failedTXs; - } - - @Override - public List getFailedTransactions(int count, - long startTxId) throws IOException { - lock.lock(); - try { - final List failedTXs = Lists.newArrayList(); - try (TableIterator> iter = - deletedBlockLogStateManager.getReadOnlyIterator()) { - if (count == LIST_ALL_FAILED_TRANSACTIONS) { - while (iter.hasNext()) { - DeletedBlocksTransaction delTX = iter.next().getValue(); - if (delTX.getCount() == -1) { - failedTXs.add(delTX); - } - } - } else { - iter.seek(startTxId); - while (iter.hasNext() && failedTXs.size() < count) { - DeletedBlocksTransaction delTX = iter.next().getValue(); - if (delTX.getCount() == -1 && delTX.getTxID() >= startTxId) { - failedTXs.add(delTX); - } - } - } - } return failedTXs; } finally { lock.unlock(); @@ -212,7 +177,7 @@ public int resetCount(List txIDs) throws IOException { List batch; do { // Fetch the batch of failed transactions - batch = getFailedTransactionsBatch(batchSize, startTxId); + batch = getFailedTransactions(batchSize, startTxId); // If the batch is empty, skip further processing if (!batch.isEmpty()) { @@ -234,42 +199,23 @@ public int resetCount(List txIDs) throws IOException { // Update startTxId to continue from the last processed transaction in the next iteration startTxId = batch.get(batch.size() - 1).getTxID() + 1; } - } while (!batch.isEmpty()); } else { - // Process txIDs provided by the user in batches - for (List batch : partitionList(txIDs, batchSize)) { - lock.lock(); - try { - transactionStatusManager.resetRetryCount(batch); - int batchProcessed = deletedBlockLogStateManager.resetRetryCountOfTransactionInDB( - new ArrayList<>(batch)); - - totalProcessed += batchProcessed; - } finally { - lock.unlock(); - } + lock.lock(); + try { + transactionStatusManager.resetRetryCount(txIDs); + return deletedBlockLogStateManager.resetRetryCountOfTransactionInDB( + new ArrayList<>(new HashSet<>(txIDs))); + } finally { + lock.unlock(); } } - return totalProcessed; } catch (Exception e) { throw new IOException("Error during transaction reset", e); } } - - /** - * Helper method to partition a list into smaller batches. - */ - private List> partitionList(List list, int batchSize) { - List> partitions = new ArrayList<>(); - for (int i = 0; i < list.size(); i += batchSize) { - partitions.add(list.subList(i, Math.min(i + batchSize, list.size()))); - } - return partitions; - } - private DeletedBlocksTransaction constructNewTransaction( long txID, long containerID, List blocks) { return DeletedBlocksTransaction.newBuilder() diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 77cb1a263491..fde6ec07d9c8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo; import org.apache.hadoop.hdds.protocol.proto.ReconfigureProtocolProtos.ReconfigureProtocolService; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoResponseProto; import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto; @@ -904,18 +905,19 @@ public List getFailedDeletedBlockTxn(int count, auditMap.put("count", String.valueOf(count)); auditMap.put("startTxId", String.valueOf(startTxId)); try { - result = scm.getScmBlockManager().getDeletedBlockLog() - .getFailedTransactions(count, startTxId).stream() - .map(DeletedBlocksTransactionInfoWrapper::fromTxn) - .collect(Collectors.toList()); + int transactionCount = (count == -1) ? Integer.MAX_VALUE : count; + List + failedTXs = scm.getScmBlockManager().getDeletedBlockLog() + .getFailedTransactions(transactionCount, startTxId); + result = + failedTXs.stream().map(DeletedBlocksTransactionInfoWrapper::fromTxn) + .collect(Collectors.toList()); AUDIT.logWriteSuccess(buildAuditMessageForSuccess( SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap)); return result; } catch (IOException ex) { - AUDIT.logReadFailure( - buildAuditMessageForFailure( - SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap, ex) - ); + AUDIT.logReadFailure(buildAuditMessageForFailure( + SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap, ex)); throw ex; } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java index d8c6e8f0cbab..6842811207aa 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java @@ -344,7 +344,7 @@ public void testBlockDeletionTransactions() throws Exception { cluster.getStorageContainerManager().getScmHAManager() .asSCMHADBTransactionBuffer().flush(); } - return delLog.getFailedTransactionsBatch(2, 0).size() == 0; + return delLog.getFailedTransactions(Integer.MAX_VALUE, 0).size() == 0; } catch (IOException e) { return false; }