From bcf3864d71aa6fcc79b36f328806289e95d43925 Mon Sep 17 00:00:00 2001 From: Chia-Chuan Yu Date: Sat, 18 Jan 2025 20:19:01 +0800 Subject: [PATCH] HDDS-12005. Refactor TestBlockDataStreamOutput --- .../client/rpc/TestBlockDataStreamOutput.java | 420 ++++++++++-------- 1 file changed, 232 insertions(+), 188 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index c1345207d99..e2f3ae97247 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -19,15 +19,18 @@ import org.apache.hadoop.hdds.DatanodeVersion; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig; import org.apache.hadoop.hdds.scm.OzoneClientConfig; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientMetrics; import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput; import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput; -import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.ClientConfigForTesting; import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; @@ -41,22 +44,28 @@ import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.TestHelper; -import org.apache.ozone.test.tag.Flaky; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_READ_NETTY_CHUNKED_NIO_FILE_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -64,36 +73,54 @@ /** * Tests BlockDataStreamOutput class. */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) @Timeout(300) public class TestBlockDataStreamOutput { - private static MiniOzoneCluster cluster; - private static OzoneConfiguration conf = new OzoneConfiguration(); - private static OzoneClient client; - private static ObjectStore objectStore; - private static int chunkSize; - private static int flushSize; - private static int maxFlushSize; - private static int blockSize; - private static String volumeName; - private static String bucketName; - private static String keyString; + private MiniOzoneCluster cluster; + private static int chunkSize = 100; + private static int flushSize = 2 * chunkSize; + private static int maxFlushSize = 2 * flushSize; + private static int blockSize = 2 * maxFlushSize; + private static String volumeName = "testblockoutputstream"; + private static String bucketName = volumeName; + private static String keyString = UUID.randomUUID().toString();; private static final DatanodeVersion DN_OLD_VERSION = DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE; - @BeforeAll - public static void init() throws Exception { - chunkSize = 100; - flushSize = 2 * chunkSize; - maxFlushSize = 2 * flushSize; - blockSize = 2 * maxFlushSize; - + static MiniOzoneCluster createCluster() throws IOException, + InterruptedException, TimeoutException { + OzoneConfiguration conf = new OzoneConfiguration(); OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class); + clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE); + clientConfig.setStreamBufferFlushDelay(false); + clientConfig.setEnablePutblockPiggybacking(true); conf.setFromObject(clientConfig); - conf.setBoolean(OZONE_CHUNK_READ_NETTY_CHUNKED_NIO_FILE_KEY, true); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS); conf.setQuietMode(false); - conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4, - StorageUnit.MB); + conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB); + conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 3); + + conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true); + conf.setBoolean("ozone.client.hbase.enhancements.allowed", true); + + DatanodeRatisServerConfig ratisServerConfig = + conf.getObject(DatanodeRatisServerConfig.class); + ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3)); + ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3)); + conf.setFromObject(ratisServerConfig); + + RatisClientConfig.RaftConfig raftClientConfig = + conf.getObject(RatisClientConfig.RaftConfig.class); + raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3)); + raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(5)); + conf.setFromObject(raftClientConfig); + + RatisClientConfig ratisClientConfig = + conf.getObject(RatisClientConfig.class); + ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30)); + ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30)); + conf.setFromObject(ratisClientConfig); ClientConfigForTesting.newBuilder(StorageUnit.BYTES) .setBlockSize(blockSize) @@ -105,90 +132,109 @@ public static void init() throws Exception { .setDataStreamWindowSize(5 * chunkSize) .applyTo(conf); - cluster = MiniOzoneCluster.newBuilder(conf) + MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) .setNumDatanodes(5) .setDatanodeFactory(UniformDatanodesFactory.newBuilder() .setCurrentVersion(DN_OLD_VERSION) .build()) .build(); + cluster.waitForClusterToBeReady(); - //the easiest way to create an open container is creating a key - client = OzoneClientFactory.getRpcClient(conf); - objectStore = client.getObjectStore(); - keyString = UUID.randomUUID().toString(); - volumeName = "testblockdatastreamoutput"; - bucketName = volumeName; - objectStore.createVolume(volumeName); - objectStore.getVolume(volumeName).createBucket(bucketName); + + try (OzoneClient client = cluster.newClient()) { + ObjectStore objectStore = client.getObjectStore(); + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + return cluster; } - static String getKeyName() { - return UUID.randomUUID().toString(); + private static Stream clientParameters() { + return Stream.of( + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false) + ); } - @AfterAll - public static void shutdown() { - IOUtils.closeQuietly(client); - if (cluster != null) { - cluster.shutdown(); - } + private static Stream dataLengthParameters() { + return Stream.of( + Arguments.of(chunkSize / 2), + Arguments.of(chunkSize), + Arguments.of(chunkSize + 50), + Arguments.of(blockSize + 50) + ); } - @Test - public void testHalfChunkWrite() throws Exception { - testWrite(chunkSize / 2); - testWriteWithFailure(chunkSize / 2); + static OzoneClientConfig newClientConfig(ConfigurationSource source, + boolean flushDelay, boolean enablePiggybacking) { + OzoneClientConfig clientConfig = source.getObject(OzoneClientConfig.class); + clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE); + clientConfig.setStreamBufferFlushDelay(flushDelay); + clientConfig.setEnablePutblockPiggybacking(enablePiggybacking); + return clientConfig; } - @Test - public void testSingleChunkWrite() throws Exception { - testWrite(chunkSize); - testWriteWithFailure(chunkSize); + static OzoneClient newClient(OzoneConfiguration conf, + OzoneClientConfig config) throws IOException { + OzoneConfiguration copy = new OzoneConfiguration(conf); + copy.setFromObject(config); + return OzoneClientFactory.getRpcClient(copy); + } + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + * @throws IOException + */ + @BeforeAll + public void init() throws Exception { + cluster = createCluster(); } - @Test - public void testMultiChunkWrite() throws Exception { - testWrite(chunkSize + 50); - testWriteWithFailure(chunkSize + 50); + static String getKeyName() { + return UUID.randomUUID().toString(); } - @Test - @Flaky("HDDS-12027") - public void testMultiBlockWrite() throws Exception { - testWrite(blockSize + 50); - testWriteWithFailure(blockSize + 50); + /** + * Shutdown MiniDFSCluster. + */ + @AfterAll + public void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } } - static void testWrite(int dataLength) throws Exception { - XceiverClientMetrics metrics = - XceiverClientManager.getXceiverClientMetrics(); - long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk); - long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock); + @ParameterizedTest + @MethodSource("dataLengthParameters") + public void testHalfChunkWrite(int dataLength) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), false, true); + try (OzoneClient client = newClient(cluster.getConf(), config)) { + testWrite(client, dataLength); + testWriteWithFailure(client, dataLength); + } + } + static void testWrite(OzoneClient client, int dataLength) throws Exception { String keyName = getKeyName(); OzoneDataStreamOutput key = createKey( - keyName, ReplicationType.RATIS, dataLength); + client, keyName, dataLength); final byte[] data = ContainerTestHelper.generateData(dataLength, false); key.write(ByteBuffer.wrap(data)); // now close the stream, It will update the key length. key.close(); - validateData(keyName, data); - - assertEquals(pendingPutBlockCount, - metrics.getPendingContainerOpCountMetrics(PutBlock)); - assertEquals(pendingWriteChunkCount, - metrics.getPendingContainerOpCountMetrics(WriteChunk)); + validateData(client, keyName, data); } - private void testWriteWithFailure(int dataLength) throws Exception { - XceiverClientMetrics metrics = - XceiverClientManager.getXceiverClientMetrics(); - long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk); - long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock); - + private void testWriteWithFailure(OzoneClient client, int dataLength) throws Exception { String keyName = getKeyName(); OzoneDataStreamOutput key = createKey( - keyName, ReplicationType.RATIS, dataLength); + client, keyName, dataLength); byte[] data = ContainerTestHelper.getFixedLengthString(keyString, dataLength) .getBytes(UTF_8); @@ -203,129 +249,127 @@ private void testWriteWithFailure(int dataLength) throws Exception { key.write(b); key.close(); String dataString = new String(data, UTF_8); - validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); - - assertEquals(pendingPutBlockCount, - metrics.getPendingContainerOpCountMetrics(PutBlock)); - assertEquals(pendingWriteChunkCount, - metrics.getPendingContainerOpCountMetrics(WriteChunk)); + validateData(client, keyName, dataString.concat(dataString).getBytes(UTF_8)); } - @Test - public void testPutBlockAtBoundary() throws Exception { - int dataLength = maxFlushSize + 100; - XceiverClientMetrics metrics = - XceiverClientManager.getXceiverClientMetrics(); - long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk); - long putBlockCount = metrics.getContainerOpCountMetrics(PutBlock); - long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk); - long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock); - long totalOpCount = metrics.getTotalOpCount(); - - String keyName = getKeyName(); - OzoneDataStreamOutput key = createKey( - keyName, ReplicationType.RATIS, 0); - byte[] data = - ContainerTestHelper.getFixedLengthString(keyString, dataLength) - .getBytes(UTF_8); - key.write(ByteBuffer.wrap(data)); - assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock)) - .isLessThanOrEqualTo(pendingPutBlockCount + 1); - assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk)) - .isLessThanOrEqualTo(pendingWriteChunkCount + 5); - key.close(); - // Since data length is 500 , first putBlock will be at 400(flush boundary) - // and the other at 500 - assertEquals(putBlockCount + 2, - metrics.getContainerOpCountMetrics(PutBlock)); - // Each chunk is 100 so there will be 500 / 100 = 5 chunks. - assertEquals(writeChunkCount + 5, - metrics.getContainerOpCountMetrics(WriteChunk)); - assertEquals(totalOpCount + 7, - metrics.getTotalOpCount()); - assertEquals(pendingPutBlockCount, - metrics.getPendingContainerOpCountMetrics(PutBlock)); - assertEquals(pendingWriteChunkCount, - metrics.getPendingContainerOpCountMetrics(WriteChunk)); - - validateData(keyName, data); + static OzoneDataStreamOutput createKey(OzoneClient client, String keyName) throws Exception { + return createKey(client, keyName, 0); } - - static OzoneDataStreamOutput createKey(String keyName, ReplicationType type, + static OzoneDataStreamOutput createKey(OzoneClient client, String keyName, long size) throws Exception { - return TestHelper.createStreamKey( - keyName, type, size, objectStore, volumeName, bucketName); + return TestHelper.createStreamKey(keyName, ReplicationType.RATIS, size, + client.getObjectStore(), volumeName, bucketName); } - static void validateData(String keyName, byte[] data) throws Exception { + + static void validateData(OzoneClient client, String keyName, byte[] data) throws Exception { TestHelper.validateData( - keyName, data, objectStore, volumeName, bucketName); + keyName, data, client.getObjectStore(), volumeName, bucketName); } - - @Test - public void testMinPacketSize() throws Exception { - String keyName = getKeyName(); - XceiverClientMetrics metrics = - XceiverClientManager.getXceiverClientMetrics(); - OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0); - long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk); - long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk); - byte[] data = - ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2) - .getBytes(UTF_8); - key.write(ByteBuffer.wrap(data)); - // minPacketSize= 100, so first write of 50 wont trigger a writeChunk - assertEquals(writeChunkCount, - metrics.getContainerOpCountMetrics(WriteChunk)); - key.write(ByteBuffer.wrap(data)); - assertEquals(writeChunkCount + 1, - metrics.getContainerOpCountMetrics(WriteChunk)); - // now close the stream, It will update the key length. - key.close(); - assertEquals(pendingWriteChunkCount, - metrics.getPendingContainerOpCountMetrics(WriteChunk)); - String dataString = new String(data, UTF_8); - validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); + @ParameterizedTest + @MethodSource("clientParameters") + public void testPutBlockAtBoundary(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); + try (OzoneClient client = newClient(cluster.getConf(), config)) { + int dataLength = 500; + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + long putBlockCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics( + ContainerProtos.Type.PutBlock); + String keyName = getKeyName(); + OzoneDataStreamOutput key = createKey( + client, keyName, 0); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + key.write(ByteBuffer.wrap(data)); + assertThat(metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock)) + .isLessThanOrEqualTo(pendingPutBlockCount + 1); + key.close(); + // Since data length is 500 , first putBlock will be at 400(flush boundary) + // and the other at 500 + assertEquals( + metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock), + putBlockCount + 2); + validateData(client, keyName, data); + } } - @Test - public void testTotalAckDataLength() throws Exception { - int dataLength = 400; - String keyName = getKeyName(); - OzoneDataStreamOutput key = createKey( - keyName, ReplicationType.RATIS, 0); - byte[] data = - ContainerTestHelper.getFixedLengthString(keyString, dataLength) - .getBytes(UTF_8); - KeyDataStreamOutput keyDataStreamOutput = - (KeyDataStreamOutput) key.getByteBufStreamOutput(); - BlockDataStreamOutputEntry stream = - keyDataStreamOutput.getStreamEntries().get(0); - key.write(ByteBuffer.wrap(data)); - key.close(); - assertEquals(dataLength, stream.getTotalAckDataLength()); + @ParameterizedTest + @MethodSource("clientParameters") + public void testMinPacketSize(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); + try (OzoneClient client = newClient(cluster.getConf(), config)) { + String keyName = getKeyName(); + XceiverClientMetrics metrics = + XceiverClientManager.getXceiverClientMetrics(); + OzoneDataStreamOutput key = createKey(client, keyName, 0); + long writeChunkCount = + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2) + .getBytes(UTF_8); + key.write(ByteBuffer.wrap(data)); + // minPacketSize= 100, so first write of 50 wont trigger a writeChunk + assertEquals(writeChunkCount, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + key.write(ByteBuffer.wrap(data)); + assertEquals(writeChunkCount + 1, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + // now close the stream, It will update the key length. + key.close(); + String dataString = new String(data, UTF_8); + validateData(client, keyName, dataString.concat(dataString).getBytes(UTF_8)); + } } - @Test - public void testDatanodeVersion() throws Exception { - // Verify all DNs internally have versions set correctly - List dns = cluster.getHddsDatanodes(); - for (HddsDatanodeService dn : dns) { - DatanodeDetails details = dn.getDatanodeDetails(); - assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion()); + @ParameterizedTest + @MethodSource("clientParameters") + public void testTotalAckDataLength(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); + try (OzoneClient client = newClient(cluster.getConf(), config)) { + int dataLength = 400; + String keyName = getKeyName(); + OzoneDataStreamOutput key = createKey( + client, keyName, 0); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + KeyDataStreamOutput keyDataStreamOutput = + (KeyDataStreamOutput) key.getByteBufStreamOutput(); + BlockDataStreamOutputEntry stream = + keyDataStreamOutput.getStreamEntries().get(0); + key.write(ByteBuffer.wrap(data)); + key.close(); + assertEquals(dataLength, stream.getTotalAckDataLength()); } + } - String keyName = getKeyName(); - OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0); - KeyDataStreamOutput keyDataStreamOutput = (KeyDataStreamOutput) key.getByteBufStreamOutput(); - BlockDataStreamOutputEntry stream = keyDataStreamOutput.getStreamEntries().get(0); - - // Now check 3 DNs in a random pipeline returns the correct DN versions - List streamDnDetails = stream.getPipeline().getNodes(); - for (DatanodeDetails details : streamDnDetails) { - assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion()); + @ParameterizedTest + @MethodSource("clientParameters") + public void testDatanodeVersion(boolean flushDelay, boolean enablePiggybacking) throws Exception { + OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking); + try (OzoneClient client = newClient(cluster.getConf(), config)) { + // Verify all DNs internally have versions set correctly + List dns = cluster.getHddsDatanodes(); + for (HddsDatanodeService dn : dns) { + DatanodeDetails details = dn.getDatanodeDetails(); + assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion()); + } + + String keyName = getKeyName(); + OzoneDataStreamOutput key = createKey(client, keyName, 0); + KeyDataStreamOutput keyDataStreamOutput = (KeyDataStreamOutput) key.getByteBufStreamOutput(); + BlockDataStreamOutputEntry stream = keyDataStreamOutput.getStreamEntries().get(0); + + // Now check 3 DNs in a random pipeline returns the correct DN versions + List streamDnDetails = stream.getPipeline().getNodes(); + for (DatanodeDetails details : streamDnDetails) { + assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion()); + } } } - }