Skip to content

Commit

Permalink
fix trappy http stream tests (#116829)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhl-b authored Nov 15, 2024
1 parent 13c8aae commit 9489726
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void testClientConnectionCloseMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotNull(handler.stream.buf()));
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));

assertFalse(handler.streamClosed);

Expand All @@ -187,7 +187,7 @@ public void testClientConnectionCloseMidStream() throws Exception {

// wait for resources to be released
assertBusy(() -> {
assertNull(handler.stream.buf());
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
Expand All @@ -204,15 +204,13 @@ public void testServerCloseConnectionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotNull(handler.stream.buf()));
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);

// terminate connection on server and wait resources are released
handler.channel.request().getHttpChannel().close();
assertBusy(() -> {
// Cannot be simplified to assertNull.
// assertNull requires object to not fail on toString() method, but closing buffer can
assertTrue(handler.stream.buf() == null);
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
Expand All @@ -228,14 +226,14 @@ public void testServerExceptionMidStream() throws Exception {

// await stream handler is ready and request full content
var handler = ctx.awaitRestChannelAccepted(opaqueId);
assertBusy(() -> assertNotNull(handler.stream.buf()));
assertBusy(() -> assertNotEquals(0, handler.stream.bufSize()));
assertFalse(handler.streamClosed);

handler.shouldThrowInsideHandleChunk = true;
handler.stream.next();

assertBusy(() -> {
assertNull(handler.stream.buf());
assertEquals(0, handler.stream.bufSize());
assertTrue(handler.streamClosed);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
private final ThreadContext threadContext;
private ByteBuf buf;
private boolean hasLast = false;
private boolean requested = false;
private boolean closing = false;
private HttpBody.ChunkHandler handler;
private ThreadContext.StoredContext requestContext;

// used in tests
private volatile int bufSize = 0;
private volatile boolean hasLast = false;

public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) {
this.channel = channel;
this.threadContext = threadContext;
Expand Down Expand Up @@ -112,11 +115,12 @@ private void addChunk(ByteBuf chunk) {
comp.addComponent(true, chunk);
buf = comp;
}
bufSize = buf.readableBytes();
}

// visible for test
ByteBuf buf() {
return buf;
int bufSize() {
return bufSize;
}

// visible for test
Expand All @@ -130,6 +134,7 @@ private void send() {
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
requested = false;
buf = null;
bufSize = 0;
try (var ignored = threadContext.restoreExistingContext(requestContext)) {
for (var tracer : tracingHandlers) {
tracer.onNext(bytesRef, hasLast);
Expand Down Expand Up @@ -164,6 +169,7 @@ private void doClose() {
if (buf != null) {
buf.release();
buf = null;
bufSize = 0;
}
channel.config().setAutoRead(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testEnqueueChunksBeforeRequest() {
for (int i = 0; i < totalChunks; i++) {
channel.writeInbound(randomContent(1024));
}
assertEquals(totalChunks * 1024, stream.buf().readableBytes());
assertEquals(totalChunks * 1024, stream.bufSize());
}

// ensures all received chunks can be flushed downstream
Expand Down Expand Up @@ -119,7 +119,7 @@ public void testReadFromChannel() {
channel.writeInbound(randomLastContent(chunkSize));

for (int i = 0; i < totalChunks; i++) {
assertNull("should not enqueue chunks", stream.buf());
assertEquals("should not enqueue chunks", 0, stream.bufSize());
stream.next();
channel.runPendingTasks();
assertEquals("each next() should produce single chunk", i + 1, gotChunks.size());
Expand Down
6 changes: 0 additions & 6 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,6 @@ tests:
- class: org.elasticsearch.repositories.s3.RepositoryS3RestIT
method: testReloadCredentialsFromKeystore
issue: https://github.com/elastic/elasticsearch/issues/116811
- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT
method: testClientConnectionCloseMidStream
issue: https://github.com/elastic/elasticsearch/issues/116815
- class: org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT
method: testServerExceptionMidStream
issue: https://github.com/elastic/elasticsearch/issues/116838
- class: org.elasticsearch.xpack.searchablesnapshots.hdfs.SecureHdfsSearchableSnapshotsIT
issue: https://github.com/elastic/elasticsearch/issues/116851
- class: org.elasticsearch.xpack.esql.analysis.VerifierTests
Expand Down

0 comments on commit 9489726

Please sign in to comment.