Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-11714. resetDeletedBlockRetryCount with --all may fail and can cause long db lock in large cluster #7665

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ DatanodeDeletedBlockTransactions getTransactions(
throws IOException;

/**
* Return the failed transactions in the log. A transaction is
* Return a batch of failed transactions from the log. A transaction is
* considered to be failed if it has been sent more than MAX_RETRY limit
* and its count is reset to -1.
*
* @param count Maximum num of returned transactions, if < 0. return all.
* @param count Maximum number of failed transactions to return.
* @param startTxId The smallest transaction ID to start from (inclusive).
* @return a list of failed deleted block transactions.
* @throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@

import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.Set;
import java.util.Map;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
Expand All @@ -53,7 +54,6 @@
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;

import com.google.common.collect.Lists;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY_DEFAULT;
import static org.apache.hadoop.hdds.scm.block.SCMDeletedBlockTransactionStatusManager.SCMDeleteBlocksCommandStatusManager.CmdStatus;
Expand Down Expand Up @@ -127,23 +127,14 @@ public List<DeletedBlocksTransaction> getFailedTransactions(int count,
lock.lock();
try {
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
if (count == LIST_ALL_FAILED_TRANSACTIONS) {
while (iter.hasNext()) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1) {
failedTXs.add(delTX);
}
}
} else {
iter.seek(startTxId);
while (iter.hasNext() && failedTXs.size() < count) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1 && delTX.getTxID() >= startTxId) {
failedTXs.add(delTX);
}
try (
TableIterator<Long, ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
iter.seek(startTxId);
while (iter.hasNext() && failedTXs.size() < count) {
DeletedBlocksTransaction delTX = iter.next().getValue();
if (delTX.getCount() == -1 && delTX.getTxID() >= startTxId) {
failedTXs.add(delTX);
}
}
}
Expand Down Expand Up @@ -176,18 +167,52 @@ public void incrementCount(List<Long> txIDs)
*/
@Override
public int resetCount(List<Long> txIDs) throws IOException {
lock.lock();
final int batchSize = 100;
int totalProcessed = 0;
long startTxId = 0;

try {
// If txIDs are not provided, fetch all failed transactions in batches
if (txIDs == null || txIDs.isEmpty()) {
txIDs = getFailedTransactions(LIST_ALL_FAILED_TRANSACTIONS, 0).stream()
.map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toList());
List<DeletedBlocksTransaction> batch;
do {
// Fetch the batch of failed transactions
batch = getFailedTransactions(batchSize, startTxId);

// If the batch is empty, skip further processing
if (!batch.isEmpty()) {
List<Long> batchTxIDs = batch.stream()
.map(DeletedBlocksTransaction::getTxID)
.collect(Collectors.toList());

lock.lock();
try {
transactionStatusManager.resetRetryCount(batchTxIDs);
int batchProcessed = deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(batchTxIDs));

totalProcessed += batchProcessed;
} finally {
lock.unlock();
}

// Update startTxId to continue from the last processed transaction in the next iteration
startTxId = batch.get(batch.size() - 1).getTxID() + 1;
}
} while (!batch.isEmpty());
} else {
lock.lock();
try {
transactionStatusManager.resetRetryCount(txIDs);
return deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(new HashSet<>(txIDs)));
} finally {
lock.unlock();
}
}
transactionStatusManager.resetRetryCount(txIDs);
return deletedBlockLogStateManager.resetRetryCountOfTransactionInDB(
new ArrayList<>(new HashSet<>(txIDs)));
} finally {
lock.unlock();
return totalProcessed;
} catch (Exception e) {
throw new IOException("Error during transaction reset", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionInfo;
import org.apache.hadoop.hdds.protocol.proto.ReconfigureProtocolProtos.ReconfigureProtocolService;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerBalancerStatusInfoResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DecommissionScmResponseProto;
Expand Down Expand Up @@ -904,18 +905,19 @@ public List<DeletedBlocksTransactionInfo> getFailedDeletedBlockTxn(int count,
auditMap.put("count", String.valueOf(count));
auditMap.put("startTxId", String.valueOf(startTxId));
try {
result = scm.getScmBlockManager().getDeletedBlockLog()
.getFailedTransactions(count, startTxId).stream()
.map(DeletedBlocksTransactionInfoWrapper::fromTxn)
.collect(Collectors.toList());
int transactionCount = (count == -1) ? Integer.MAX_VALUE : count;
List<StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction>
failedTXs = scm.getScmBlockManager().getDeletedBlockLog()
.getFailedTransactions(transactionCount, startTxId);
result =
failedTXs.stream().map(DeletedBlocksTransactionInfoWrapper::fromTxn)
.collect(Collectors.toList());
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap));
return result;
} catch (IOException ex) {
AUDIT.logReadFailure(
buildAuditMessageForFailure(
SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap, ex)
);
AUDIT.logReadFailure(buildAuditMessageForFailure(
SCMAction.GET_FAILED_DELETED_BLOCKS_TRANSACTION, auditMap, ex));
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void testBlockDeletionTransactions() throws Exception {
cluster.getStorageContainerManager().getScmHAManager()
.asSCMHADBTransactionBuffer().flush();
}
return delLog.getFailedTransactions(-1, 0).size() == 0;
return delLog.getFailedTransactions(Integer.MAX_VALUE, 0).size() == 0;
} catch (IOException e) {
return false;
}
Expand Down
Loading