Skip to content

Commit

Permalink
HDDS-12115. Replica not found repro
Browse files Browse the repository at this point in the history
Change-Id: I015d02fdba783fc4cab85a31f53b477241c12a7a
  • Loading branch information
swamirishi committed Jan 22, 2025
1 parent 4df73d5 commit 325b5d6
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -656,4 +656,8 @@ public void compactDb() {
}
}

/**
 * Gives tests access to the handlers registered in this object, keyed by container type.
 *
 * @return the {@code handlers} map itself (no copy is made)
 */
@VisibleForTesting
public Map<ContainerType, Handler> getHandlers() {
  final Map<ContainerType, Handler> registry = handlers;
  return registry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,27 @@
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_EC_IMPL_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.ozone.container.TestHelper.waitForContainerClose;
import static org.apache.hadoop.ozone.container.TestHelper.waitForReplicaCount;
import static org.apache.ozone.test.GenericTestUtils.setLogLevel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.in;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -47,8 +56,10 @@
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.function.Supplier;

Expand All @@ -59,13 +70,22 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackAware;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
Expand All @@ -75,29 +95,35 @@
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;

import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.slf4j.event.Level;

/**
* Tests replication of Ozone containers.
*/
@Timeout(300)
@Timeout(3000)
class TestContainerReplication {

private static final String VOLUME = "vol1";
Expand Down Expand Up @@ -168,10 +194,124 @@ void testContainerReplication(
/**
 * Builds a {@link MiniOzoneCluster} from the given configuration with 10 datanodes.
 *
 * <p>The datanode count is set exactly once; an earlier, overridden
 * {@code setNumDatanodes(5)} call was dead code and has been dropped.
 *
 * @param conf configuration handed to the cluster builder
 * @return the cluster returned by the builder
 * @throws IOException if the builder fails to create the cluster
 */
private static MiniOzoneCluster newCluster(OzoneConfiguration conf)
    throws IOException {
  return MiniOzoneCluster.newBuilder(conf)
      .setNumDatanodes(10)
      .build();
}

/**
 * Replaces a field of {@code object} with a Mockito spy wrapping the field's current value.
 *
 * <p>The field is looked up by name on the runtime class of {@code object} and then on each
 * superclass in turn, so private fields declared higher up the hierarchy are found as well.
 *
 * @param fieldName name of the declared field to replace
 * @param value     expected type of the field's current value; used to cast it before spying
 * @param object    instance whose field is replaced
 * @param <T>       type of the spied value
 * @param <O>       type of the owning object
 * @return the spy that is now stored in the field
 * @throws NoSuchFieldException   if no class in the hierarchy of {@code object} declares
 *                                {@code fieldName}
 * @throws IllegalAccessException if the field cannot be read or written reflectively
 * @throws ClassCastException     if the current field value is not an instance of {@code value}
 */
private <T, O> T setSpiedField(String fieldName, Class<T> value, O object)
    throws NoSuchFieldException, IllegalAccessException {
  Field field = null;
  for (Class<?> clazz = object.getClass(); clazz != null && field == null; clazz = clazz.getSuperclass()) {
    try {
      field = clazz.getDeclaredField(fieldName);
    } catch (NoSuchFieldException ignored) {
      // Not declared at this level; continue with the superclass.
    }
  }
  if (field == null) {
    // Previously this case fell through to a NullPointerException on the exhausted class chain.
    throw new NoSuchFieldException(
        "Field '" + fieldName + "' not found in " + object.getClass().getName() + " or its superclasses");
  }
  field.setAccessible(true);
  // Class#cast replaces the former unchecked (T) cast and fails fast on a type mismatch.
  T spied = Mockito.spy(value.cast(field.get(object)));
  field.set(object, spied);
  return spied;
}

/**
 * Reads the given stream to its end as UTF-8 text and returns it as a single string.
 *
 * <p>Lines are joined with {@code "\n"} regardless of the line terminator used in the input,
 * and the result is passed through {@link String#trim()}, so leading and trailing whitespace
 * (including the final newline) is removed. The stream is closed before this method returns.
 *
 * @param inputStream stream to read; closed by this method
 * @return the decoded, trimmed content; empty if the stream has no content
 * @throws IOException if reading from or closing the stream fails
 */
public static String convertInputStreamToString(InputStream inputStream) throws IOException {
  StringBuilder result = new StringBuilder();
  // Decode explicitly as UTF-8 instead of relying on the platform default charset.
  try (BufferedReader reader = new BufferedReader(
      new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8))) {
    String line;
    while ((line = reader.readLine()) != null) {
      result.append(line).append('\n');
    }
  }
  return result.toString().trim();
}

/**
 * Reproduction scenario for HDDS-12115 ("Replica not found").
 *
 * <p>Outline of what the code below does:
 * <ol>
 *   <li>Starts a cluster with short ReplicationManager intervals and writes the test key.</li>
 *   <li>Shuts down one datanode holding a replica of the key's first container and waits until
 *       SCM again reports the original number of replicas, none of them on that datanode.</li>
 *   <li>Stops the ReplicationManager, restarts the datanode, and installs spies so that
 *       container removal on every datanode blocks until released.</li>
 *   <li>Restarts the ReplicationManager, waits until at least 4 datanodes have entered
 *       {@code removeContainer}, releases them, and then reads the key back.</li>
 * </ol>
 *
 * <p>NOTE(review): the key content is only printed; there is no assertion on it.
 */
@Test
public void testReplicaNotFound() throws Exception {
OzoneConfiguration conf = createConfiguration(false);
// Shorten the ReplicationManager timings for this test.
ReplicationManagerConfiguration repConf =
conf.getObject(ReplicationManagerConfiguration.class);
repConf.setInterval(Duration.ofSeconds(1));
repConf.setUnderReplicatedInterval(Duration.ofSeconds(1));
repConf.setOverReplicatedInterval(Duration.ofSeconds(1));
repConf.setEventTimeout(Duration.ofSeconds(10));
repConf.setDatanodeTimeoutOffset(Duration.ofSeconds(5).toMillis());
conf.setFromObject(repConf);
conf.set(OZONE_SCM_EXPIRED_CONTAINER_REPLICA_OP_SCRUB_INTERVAL, "15s");
conf.set(OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, SCMContainerPlacementRackAware.class.getCanonicalName());
// While true, every spied removeContainer call below keeps sleeping.
AtomicBoolean blockDeletes = new AtomicBoolean(true);
try (MiniOzoneCluster cluster = newCluster(conf)) {
cluster.waitForClusterToBeReady();
try (OzoneClient client = cluster.newClient()) {
createTestData(client);
}
// Use the container of the key's first location as the container under test.
List<OmKeyLocationInfo> keyLocations = lookupKey(cluster);
ContainerID containerId = new ContainerID(keyLocations.get(0).getContainerID());
Set<ContainerReplica> containerReplicas =
cluster.getStorageContainerManager().getContainerManager().getContainerReplicas(containerId);
// Remember the initial replica count; the wait below expects SCM to report this many again.
int rf = containerReplicas.size();
// Pick one datanode that currently holds a replica and shut it down.
DatanodeDetails datanodeDetails = containerReplicas.stream().findFirst().get().getDatanodeDetails();
cluster.shutdownHddsDatanode(datanodeDetails);
// Wait until SCM reports rf replicas again and none of them is on the stopped datanode.
// NOTE(review): the trailing arguments are presumably (poll interval ms, timeout ms) — confirm.
GenericTestUtils.waitFor(() -> {
try {
Set<ContainerReplica> replicas =
cluster.getStorageContainerManager().getContainerManager().getContainerReplicas(containerId);
return replicas.size() == rf && !replicas.stream().map(ContainerReplica::getDatanodeDetails).collect(
Collectors.toSet()).contains(datanodeDetails);
} catch (ContainerNotFoundException e) {
return false;
}
}, 1000, 100000);
// NOTE(review): this refreshed replica set is not read again in this method.
containerReplicas =
cluster.getStorageContainerManager().getContainerManager().getContainerReplicas(containerId);
// Stop the ReplicationManager while the datanode is restarted and the spies are installed.
cluster.getStorageContainerManager().getReplicationManager().stop();

cluster.restartHddsDatanode(datanodeDetails, true);
// UUIDs of datanodes whose spied removeContainer has been entered at least once.
Set<UUID> dnId = new ConcurrentHashSet<>();
// Swap the ReplicationManager's "nodeManager" field for a spy.
SCMNodeManager nodeManager = setSpiedField("nodeManager", SCMNodeManager.class,
cluster.getStorageContainerManager().getReplicationManager());
// For datanodes already recorded in dnId, report 10000 queued deleteContainerCommand entries;
// all other datanodes get the real count.
// NOTE(review): presumably the inflated count keeps the ReplicationManager from sending further
// deletes to those datanodes — confirm.
doAnswer(i -> {
DatanodeDetails dn = i.getArgument(0);
System.out.println("Checking Delete for DN: " + dn.getUuidString() + "\t" + dnId);
if (dnId.contains(dn.getUuid())) {
return 10000;
}
return i.callRealMethod();
}).when(nodeManager).getTotalDatanodeCommandCount(Mockito.any(DatanodeDetails.class), Mockito.eq(
StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand));
// On every datanode, spy each handler's "containerSet" so that removeContainer records the
// datanode in dnId and then sleeps in 1s steps until blockDeletes is cleared.
for (HddsDatanodeService datanodeService : cluster.getHddsDatanodes()) {
OzoneContainer ozoneContainer = datanodeService.getDatanodeStateMachine().getContainer();
for (Handler handler : ozoneContainer.getHandlers().values()) {
ContainerSet containerSet = setSpiedField("containerSet", ContainerSet.class, handler);
when(containerSet.removeContainer(anyLong())).thenAnswer(invocation -> {
dnId.add(datanodeService.getDatanodeDetails().getUuid());
while (blockDeletes.get()) {
Thread.sleep(1000);
}
return invocation.callRealMethod();
});
}
}
cluster.getStorageContainerManager().getReplicationManager().start();
// Poll until at least 4 distinct datanodes have entered removeContainer. There is no timeout in
// this loop itself.
while (dnId.size() < 4) {
System.out.println("DNs with Delete: " + dnId);
Thread.sleep(1000);
}
// Release all blocked removeContainer calls at once.
blockDeletes.set(false);
// NOTE(review): arguments are presumably (container ID, expected replica count, cluster), i.e. wait
// for container 1 to have 0 replicas — confirm against TestHelper.
waitForReplicaCount(1, 0, cluster);
// Read the key back through a fresh client and print its content.
try (OzoneClient client = cluster.newClient()) {

try (OzoneInputStream inputStream =
client.getObjectStore().getVolume(VOLUME).getBucket(BUCKET).getKey(KEY).getContent()) {
String content = convertInputStreamToString(inputStream);
System.out.println(content);
}
}
}

}

private static OzoneConfiguration createConfiguration(boolean enableLegacy) {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
Expand Down

0 comments on commit 325b5d6

Please sign in to comment.