diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 1e84f65cdd842..3095139ca4685 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -174,7 +174,7 @@ public void testClientConnectionCloseMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotNull(handler.stream.buf())); + assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.streamClosed); @@ -187,7 +187,7 @@ public void testClientConnectionCloseMidStream() throws Exception { // wait for resources to be released assertBusy(() -> { - assertNull(handler.stream.buf()); + assertEquals(0, handler.stream.bufSize()); assertTrue(handler.streamClosed); }); } @@ -204,15 +204,13 @@ public void testServerCloseConnectionMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotNull(handler.stream.buf())); + assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.streamClosed); // terminate connection on server and wait resources are released handler.channel.request().getHttpChannel().close(); assertBusy(() -> { - // Cannot be simplified to assertNull. - // assertNull requires object to not fail on toString() method, but closing buffer can - assertTrue(handler.stream.buf() == null); + assertEquals(0, handler.stream.bufSize()); assertTrue(handler.streamClosed); }); } @@ -228,14 +226,14 @@ public void testServerExceptionMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertNotNull(handler.stream.buf())); + assertBusy(() -> assertNotEquals(0, handler.stream.bufSize())); assertFalse(handler.streamClosed); handler.shouldThrowInsideHandleChunk = true; handler.stream.next(); assertBusy(() -> { - assertNull(handler.stream.buf()); + assertEquals(0, handler.stream.bufSize()); assertTrue(handler.streamClosed); }); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 238faa7a9237e..ac3e3aecf97b9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -37,12 +37,15 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final List tracingHandlers = new ArrayList<>(4); private final ThreadContext threadContext; private ByteBuf buf; - private boolean hasLast = false; private boolean requested = false; private boolean closing = false; private HttpBody.ChunkHandler handler; private ThreadContext.StoredContext requestContext; + // used in tests + private volatile int bufSize = 0; + private volatile boolean hasLast = false; + public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) { this.channel = channel; this.threadContext = threadContext; @@ -112,11 +115,12 @@ private void addChunk(ByteBuf chunk) { comp.addComponent(true, chunk); buf = comp; } + bufSize = buf.readableBytes(); } // visible for test - ByteBuf buf() { - return buf; + int bufSize() { + return bufSize; } // visible for test @@ -130,6 +134,7 @@ private void send() { var bytesRef = Netty4Utils.toReleasableBytesReference(buf); requested = false; buf = null; + bufSize = 0; try (var ignored = threadContext.restoreExistingContext(requestContext)) { for (var tracer : tracingHandlers) { tracer.onNext(bytesRef, hasLast); @@ -164,6 +169,7 @@ private void doClose() { if (buf != null) { buf.release(); buf = null; + bufSize = 0; } channel.config().setAutoRead(true); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index 5ff5a27e2d551..d456bbecfbd20 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -67,7 +67,7 @@ public void testEnqueueChunksBeforeRequest() { for (int i = 0; i < totalChunks; i++) { channel.writeInbound(randomContent(1024)); } - assertEquals(totalChunks * 1024, stream.buf().readableBytes()); + assertEquals(totalChunks * 1024, stream.bufSize()); } // ensures all received chunks can be flushed downstream @@ -119,7 +119,7 @@ public void testReadFromChannel() { channel.writeInbound(randomLastContent(chunkSize)); for (int i = 0; i < totalChunks; i++) { - assertNull("should not enqueue chunks", stream.buf()); + assertEquals("should not enqueue chunks", 0, stream.bufSize()); stream.next(); channel.runPendingTasks(); assertEquals("each next() should produce single chunk", i + 1, gotChunks.size()); diff --git a/muted-tests.yml b/muted-tests.yml index ed52d0b6ab1a7..fb61bb7a02f88 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -223,12 +223,6 @@ tests: - class: org.elasticsearch.repositories.s3.RepositoryS3RestIT method: testReloadCredentialsFromKeystore issue: https://github.com/elastic/elasticsearch/issues/116811 -- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT - method: testClientConnectionCloseMidStream - issue: https://github.com/elastic/elasticsearch/issues/116815 -- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT - method: testServerExceptionMidStream - issue: https://github.com/elastic/elasticsearch/issues/116838 - class: org.elasticsearch.xpack.searchablesnapshots.hdfs.SecureHdfsSearchableSnapshotsIT issue: https://github.com/elastic/elasticsearch/issues/116851 - class: org.elasticsearch.xpack.esql.analysis.VerifierTests