From 25e405f7ae211489ca6baadb8479689a56be024a Mon Sep 17 00:00:00 2001 From: Thomas Segismont Date: Thu, 16 Jan 2025 15:17:59 +0100 Subject: [PATCH] WebClient failure when sending multipart formdata from a virtual thread MultipartFormUpload is created on the Vert.x context that is bound to the request promise. We should create the queue using the context executor for both the producer and consumer sides. Otherwise, if there is a large upload and the queue needs to be paused/resumed, the pump method will be invoked on the EventLoop executor bound to this context. And then the IllegalArgumentException is thrown. Signed-off-by: Thomas Segismont --- .../web/client/impl/MultipartFormUpload.java | 7 ++- .../client/tests/MultipartFormUploadTest.java | 54 +++++++++++++------ 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/MultipartFormUpload.java b/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/MultipartFormUpload.java index 5c11d30833..0c404d627d 100644 --- a/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/MultipartFormUpload.java +++ b/vertx-web-client/src/main/java/io/vertx/ext/web/client/impl/MultipartFormUpload.java @@ -19,7 +19,6 @@ import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.multipart.*; -import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; @@ -56,13 +55,13 @@ public class MultipartFormUpload implements ReadStream { private boolean ended; private final ContextInternal context; - public MultipartFormUpload(Context context, + public MultipartFormUpload(ContextInternal context, MultipartForm parts, boolean multipart, HttpPostRequestEncoder.EncoderMode encoderMode) throws Exception { - this.context = (ContextInternal) context; + this.context = context; this.writable = true; - this.pending = new InboundMessageQueue<>(((ContextInternal) context).eventLoop(), ((ContextInternal) context).executor()) { + this.pending = new InboundMessageQueue<>(context.executor(), context.executor()) { @Override protected void handleResume() { writable = true; diff --git a/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/MultipartFormUploadTest.java b/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/MultipartFormUploadTest.java index 3016a6e6d0..7be10548d6 100644 --- a/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/MultipartFormUploadTest.java +++ b/vertx-web-client/src/test/java/io/vertx/ext/web/client/tests/MultipartFormUploadTest.java @@ -16,19 +16,17 @@ package io.vertx.ext.web.client.tests; import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; -import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; +import io.vertx.core.internal.ContextInternal; +import io.vertx.core.internal.VertxInternal; import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; import io.vertx.ext.web.client.impl.MultipartFormUpload; import io.vertx.ext.web.multipart.MultipartForm; import io.vertx.test.core.TestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @@ -40,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; @RunWith(VertxUnitRunner.class) public class MultipartFormUploadTest { @@ -47,11 +46,11 @@ public class MultipartFormUploadTest { @ClassRule public static TemporaryFolder testFolder = new TemporaryFolder(); - private Vertx vertx; + private VertxInternal vertx; @Before public void setUp() throws Exception { - vertx = Vertx.vertx(); + vertx = (VertxInternal) Vertx.vertx(); } @After @@ -63,7 +62,7 @@ public void tearDown(TestContext ctx) { public void testSimpleAttribute(TestContext ctx) throws Exception { Async async = ctx.async(); Buffer result = Buffer.buffer(); - Context context = vertx.getOrCreateContext(); + ContextInternal context = vertx.getOrCreateContext(); MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().attribute("foo", "bar"), false, HttpPostRequestEncoder.EncoderMode.RFC1738); upload.endHandler(v -> { assertEquals("foo=bar", result.toString()); @@ -75,24 +74,45 @@ public void testSimpleAttribute(TestContext ctx) throws Exception { } @Test - public void testFileUpload(TestContext ctx) throws Exception { - testFileUpload(ctx, false); + public void testFileUploadEventLoopContext(TestContext ctx) throws Exception { + testFileUpload(ctx, vertx.createEventLoopContext(), false); } @Test - public void testFileUploadPaused(TestContext ctx) throws Exception { - testFileUpload(ctx, true); + public void testFileUploadWorkerContext(TestContext ctx) throws Exception { + testFileUpload(ctx, vertx.createWorkerContext(), false); } - private void testFileUpload(TestContext ctx, boolean paused) throws Exception { + @Test + public void testFileUploadVirtualThreadContext(TestContext ctx) throws Exception { + assumeTrue(vertx.isVirtualThreadAvailable()); + testFileUpload(ctx, vertx.createVirtualThreadContext(), false); + } + + @Test + public void testFileUploadPausedEventLoopContext(TestContext ctx) throws Exception { + testFileUpload(ctx, vertx.createEventLoopContext(), true); + } + + @Test + public void testFileUploadPausedWorkerContext(TestContext ctx) throws Exception { + testFileUpload(ctx, vertx.createWorkerContext(), true); + } + + @Test + public void testFileUploadPausedVirtualThreadContext(TestContext ctx) throws Exception { + assumeTrue(vertx.isVirtualThreadAvailable()); + testFileUpload(ctx, vertx.createVirtualThreadContext(), true); + } + + private void testFileUpload(TestContext testContext, ContextInternal context, boolean paused) throws Exception { File file = testFolder.newFile(); Files.write(file.toPath(), TestUtils.randomByteArray(32 * 1024)); String filename = file.getName(); String pathname = file.getAbsolutePath(); - Async async = ctx.async(); - Context context = vertx.getOrCreateContext(); + Async async = testContext.async(); context.runOnContext(v1 -> { try { MultipartFormUpload upload = new MultipartFormUpload(context, MultipartForm.create().textFileUpload( @@ -104,7 +124,7 @@ private void testFileUpload(TestContext ctx, boolean paused) throws Exception { AtomicInteger end = new AtomicInteger(); upload.endHandler(v2 -> { assertEquals(0, end.getAndIncrement()); - ctx.assertTrue(buffers.size() > 0); + testContext.assertFalse(buffers.isEmpty()); async.complete(); }); upload.handler(buffer -> { @@ -119,7 +139,7 @@ private void testFileUpload(TestContext ctx, boolean paused) throws Exception { context.runOnContext(v3 -> upload.resume()); } } catch (Exception e) { - ctx.fail(e); + testContext.fail(e); throw new AssertionError(e); } });