Skip to content

Commit

Permalink
Addressed Comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
Aryan Gupta committed Jan 10, 2025
1 parent 840a7d8 commit e2df43e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,11 @@ DatanodeDeletedBlockTransactions getTransactions(
* considered to be failed if it has been sent more than MAX_RETRY limit
* and its count is reset to -1.
*
* @param batchSize Number of failed transactions to be processed in a batch.
* @param count Number of failed transactions to be returned.
* @param startTxId The least transaction id to start with.
* @return a list of failed deleted block transactions.
* @throws IOException
*/
List<DeletedBlocksTransaction> getFailedTransactionsBatch(int batchSize,
long startTxId) throws IOException;

List<DeletedBlocksTransaction> getFailedTransactions(int count,
long startTxId) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.UUID;
import java.util.Set;
import java.util.Map;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -121,58 +122,22 @@ public DeletedBlockLogImpl(ConfigurationSource conf,
}

@Override
public List<DeletedBlocksTransaction> getFailedTransactionsBatch(
int batchSize, long startTxId) throws IOException {
List<DeletedBlocksTransaction> failedTXs = new ArrayList<>();

public List<DeletedBlocksTransaction> getFailedTransactions(int count,
long startTxId) throws IOException {
lock.lock();
try {
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
try (
TableIterator<Long, ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {

iter.seek(startTxId);

while (iter.hasNext() && failedTXs.size() < batchSize) {
while (iter.hasNext() && failedTXs.size() < count) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1) {
if (delTX.getCount() == -1 && delTX.getTxID() >= startTxId) {
failedTXs.add(delTX);
}
}
}
} finally {
lock.unlock();
}

return failedTXs;
}

@Override
public List<DeletedBlocksTransaction> getFailedTransactions(int count,
long startTxId) throws IOException {
lock.lock();
try {
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
if (count == LIST_ALL_FAILED_TRANSACTIONS) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1) {
failedTXs.add(delTX);
}
}
} else {
iter.seek(startTxId);
while (iter.hasNext() && failedTXs.size() < count) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1 && delTX.getTxID() >= startTxId) {
failedTXs.add(delTX);
}
}
}
}
return failedTXs;
} finally {
lock.unlock();
Expand Down Expand Up @@ -212,7 +177,7 @@ public int resetCount(List<Long> txIDs) throws IOException {
List<DeletedBlocksTransaction> batch;
do {
// Fetch the batch of failed transactions
batch = getFailedTransactionsBatch(batchSize, startTxId);
batch = getFailedTransactions(batchSize, startTxId);

// If the batch is empty, skip further processing
if (!batch.isEmpty()) {
Expand All @@ -234,42 +199,23 @@ public int resetCount(List<Long> txIDs) throws IOException {
// Update startTxId to continue from the last processed transaction in the next iteration
startTxId = batch.get(batch.size() - 1).getTxID() + 1;
}

} while (!batch.isEmpty());
} else {
// Process txIDs provided by the user in batches
for (List<Long> batch : partitionList(txIDs, batchSize)) {
lock.lock();
try {
transactionStatusManager.resetRetryCount(batch);
int batchProcessed = deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(batch));

totalProcessed += batchProcessed;
} finally {
lock.unlock();
}
lock.lock();
try {
transactionStatusManager.resetRetryCount(txIDs);
return deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(new HashSet<>(txIDs)));
} finally {
lock.unlock();
}
}

return totalProcessed;
} catch (Exception e) {
throw new IOException("Error during transaction reset", e);
}
}


/**
 * Splits a list into consecutive batches of at most {@code batchSize}
 * elements, preserving the order of the source list.
 *
 * <p>Each batch is a {@link List#subList(int, int)} view of {@code list},
 * not a copy.
 *
 * @param list the list to partition
 * @param batchSize maximum number of elements per batch; must be positive
 *     when {@code list} is non-empty
 * @param <T> element type
 * @return the batches in source order; empty when {@code list} is empty
 * @throws IllegalArgumentException if {@code list} is non-empty and
 *     {@code batchSize} is zero or negative
 */
private <T> List<List<T>> partitionList(List<T> list, int batchSize) {
  final int size = list.size();
  // A step of zero would never advance the loop, and a negative step would
  // produce an invalid sub-range, so reject both up front.
  if (size > 0 && batchSize <= 0) {
    throw new IllegalArgumentException(
        "batchSize must be positive, got " + batchSize);
  }
  List<List<T>> partitions = new ArrayList<>();
  int start = 0;
  while (start < size) {
    // Clamp against the remaining length so the end index cannot overflow.
    int end = start + Math.min(batchSize, size - start);
    partitions.add(list.subList(start, end));
    start = end;
  }
  return partitions;
}

private DeletedBlocksTransaction constructNewTransaction(
long txID, long containerID, List<Long> blocks) {
return DeletedBlocksTransaction.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.ReconfigureProtocolProtos.ReconfigureProtocolService;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
Expand Down Expand Up @@ -904,18 +905,19 @@ public List<DeletedBlocksTransactionInfo> getFailedDeletedBlockTxn(int count,
auditMap.put("count", String.valueOf(count));
auditMap.put("startTxId", String.valueOf(startTxId));
try {
result = scm.getScmBlockManager().getDeletedBlockLog()
.getFailedTransactions(count, startTxId).stream()
.map(DeletedBlocksTransactionInfoWrapper::fromTxn)
.collect(Collectors.toList());
int transactionCount = (count == -1) ? Integer.MAX_VALUE : count;
List<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>
failedTXs = scm.getScmBlockManager().getDeletedBlockLog()
.getFailedTransactions(transactionCount, startTxId);
result =
failedTXs.stream().map(DeletedBlocksTransactionInfoWrapper::fromTxn)
.collect(Collectors.toList());
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap));
return result;
} catch (IOException ex) {
AUDIT.logReadFailure(
buildAuditMessageForFailure(
SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap, ex)
);
AUDIT.logReadFailure(buildAuditMessageForFailure(
SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap, ex));
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void testBlockDeletionTransactions() throws Exception {
cluster.getStorageContainerManager().getScmHAManager()
.asSCMHADBTransactionBuffer().flush();
}
return delLog.getFailedTransactionsBatch(2, 0).size() == 0;
return delLog.getFailedTransactions(Integer.MAX_VALUE, 0).size() == 0;
} catch (IOException e) {
return false;
}
Expand Down

0 comments on commit e2df43e

Please sign in to comment.