diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java index 891788aa5c8d..3bbcc7aa2ca8 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/Flusher.java @@ -13,15 +13,19 @@ package org.eclipse.jetty.fcgi.generator; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.concurrent.CountDownLatch; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.thread.AutoLock; import org.slf4j.Logger; @@ -33,7 +37,7 @@ public class Flusher private final AutoLock lock = new AutoLock(); private final Queue queue = new ArrayDeque<>(); - private final IteratingCallback flushCallback = new FlushCallback(); + private final FlushCallback flushCallback = new FlushCallback(); private final EndPoint endPoint; public Flusher(EndPoint endPoint) @@ -41,6 +45,31 @@ public Flusher(EndPoint endPoint) this.endPoint = endPoint; } + public Callback cancel(Throwable cause) + { + // Cancel the IteratingCallback and take the nest Callback + CancelSendException cancelSendException = new CancelSendException(cause); + if (!flushCallback.abort(cancelSendException)) + return Callback.NOOP; + + // We now know that we aborted this ICB with the CSE above, so onAbort will eventually be called + // in a serialized context and the callback will be set on the CSE. + + // If a write operation has been scheduled cancel it and fail its callback, otherwise complete ourselves + Callback writeCallback = endPoint.cancelWrite(cause); + + if (writeCallback == null) + // There was no write in operation, so we can complete the CSE ourselves + cancelSendException.complete(); + else + // The write was cancelled and the callback is this ICB, so failing it will call onCompleted + writeCallback.failed(cause); + + // wait for the cancellation to be complete and the callback to be set by onAbort. + // This should never block indefinitely, as onAborted only waits for active states like PROCESSING to complete. + return cancelSendException.join(); + } + public void flush(ByteBufferPool.Accumulator accumulator, Callback callback) { offer(new Entry(accumulator, callback)); @@ -73,6 +102,44 @@ public void shutdown() })); } + private static class CancelSendException extends IOException + { + private final CountDownLatch _complete = new CountDownLatch(2); + private Callback _callback; + + public CancelSendException(Throwable cause) + { + super(cause); + } + + public void complete() + { + _complete.countDown(); + } + + public Callback join() + { + try + { + _complete.await(); + } + catch (InterruptedException x) + { + Throwable cause = getCause(); + ExceptionUtil.addSuppressedIfNotAssociated(cause, x); + throw new RuntimeIOException(cause); + } + + return _callback; + } + + public void setCallback(Callback callback) + { + _callback = callback; + _complete.countDown(); + } + } + private class FlushCallback extends IteratingCallback { private Entry active; @@ -94,6 +161,28 @@ protected Action process() throws Exception return Action.SCHEDULED; } + @Override + protected void onAborted(Throwable cause) + { + if (cause instanceof CancelSendException cancelSend) + cancelSend.setCallback(resetCallback()); + } + + private Callback resetCallback() + { + Flusher.Entry entry = active; + active = null; + return Callback.from(entry.callback, entry::release); + } + + @Override + protected void onCompleted(Throwable causeOrNull) + { + if (causeOrNull instanceof CancelSendException cancelSendException) + cancelSendException.complete(); + super.onCompleted(causeOrNull); + } + @Override protected void onCompleteSuccess() { diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java index 1e85cadfc01c..9747b60529f9 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/HttpStreamOverFCGI.java @@ -250,6 +250,12 @@ public void send(MetaData.Request request, MetaData.Response response, boolean l } } + @Override + public Runnable cancelSend(Throwable cause, Callback appCallback) + { + return () -> Callback.combine(_connection.getFlusher().cancel(cause), appCallback).failed(cause); + } + private void commit(MetaData.Response info, boolean head, boolean last, ByteBuffer content, Callback callback) { if (LOG.isDebugEnabled()) diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index ad33d95ec1dc..8ebb5b08e67a 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -736,6 +736,25 @@ void reset(HTTP2Stream stream, ResetFrame frame, Callback callback) }, callback), frame); } + public void flush(Callback callback) + { + Entry entry = new Entry(new FlushFrame(), null, callback) + { + @Override + public int getFrameBytesGenerated() + { + return 0; + } + + @Override + public boolean generate(RetainableByteBuffer.Mutable accumulator) + { + return true; + } + }; + frame(entry, true); + } + /** *

Invoked internally and by applications to send a GO_AWAY frame to the other peer.

* @@ -1336,6 +1355,8 @@ public boolean shouldBeDropped() case WINDOW_UPDATE: case PREFACE: case DISCONNECT: + case FAILURE: + case FLUSH: return false; // Frames of this type follow the logic below. case DATA: @@ -2518,4 +2539,12 @@ protected boolean onExpired(HTTP2Stream stream) return false; } } + + private static class FlushFrame extends Frame + { + public FlushFrame() + { + super(FrameType.FLUSH); + } + } } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 0e057809c4f8..41ea13992c7d 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -166,7 +166,7 @@ public void reset(ResetFrame frame, Callback callback) Throwable resetFailure = null; try (AutoLock ignored = lock.lock()) { - if (isReset()) + if (localReset) { resetFailure = failure; } @@ -180,6 +180,8 @@ public void reset(ResetFrame frame, Callback callback) session.dataConsumed(this, flowControlLength); if (resetFailure != null) { + if (LOG.isDebugEnabled()) + LOG.debug("failing callback immediately as stream {} already is locally reset", this, resetFailure); close(); session.removeStream(this); callback.failed(resetFailure); diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java index 98c5e63b54d0..442b5ebdcd1e 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java @@ -24,11 +24,13 @@ import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; @@ -86,16 +88,17 @@ public void shutdownOutput() switch (current.state) { case IDLE: - case OSHUTTING: if (!writeState.compareAndSet(current, WriteState.OSHUT)) break; Callback oshutCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::oshutSuccess, this::oshutFailure); stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), oshutCallback); return; case PENDING: - if (!writeState.compareAndSet(current, WriteState.OSHUTTING)) + Callback callback = ((WriteState.Pending)current).callback; + if (!writeState.compareAndSet(current, new WriteState.Pending(WriteState.State.PENDING_OSHUT, this, callback))) break; return; + case PENDING_OSHUT: case OSHUT: case FAILED: return; @@ -110,7 +113,7 @@ private void oshutSuccess() { case IDLE: case PENDING: - case OSHUTTING: + case PENDING_OSHUT: throw new IllegalStateException(); case OSHUT: case FAILED: @@ -123,19 +126,10 @@ private void oshutFailure(Throwable failure) while (true) { WriteState current = writeState.get(); - switch (current.state) - { - case IDLE: - case PENDING: - case OSHUTTING: - throw new IllegalStateException(); - case OSHUT: - if (!writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure))) - break; - return; - case FAILED: - return; - } + if (current.state != WriteState.State.OSHUT) + return; + if (writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure))) + return; } } @@ -143,7 +137,7 @@ private void oshutFailure(Throwable failure) public boolean isOutputShutdown() { WriteState.State state = writeState.get().state; - return state == WriteState.State.OSHUTTING || state == WriteState.State.OSHUT; + return state == WriteState.State.PENDING_OSHUT || state == WriteState.State.OSHUT || state == WriteState.State.FAILED; } @Override @@ -244,22 +238,12 @@ public boolean flush(ByteBuffer... buffers) throws IOException // stream.data() would remain as a pending operation. WriteState current = writeState.get(); - switch (current.state) + return switch (current.state) { - case IDLE, PENDING -> - { - return false; - } - case OSHUTTING, OSHUT -> throw new EofException("Output shutdown"); - case FAILED -> - { - Throwable failure = current.failure; - if (failure instanceof IOException) - throw (IOException)failure; - throw new IOException(failure); - } - default -> throw new IllegalStateException("Unexpected state: " + current.state); - } + case IDLE, PENDING -> false; + case PENDING_OSHUT, OSHUT -> throw new EofException("Output shutdown"); + case FAILED -> throw IO.rethrow(current.failure); + }; } @Override @@ -325,15 +309,15 @@ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingE { case IDLE -> { - if (!writeState.compareAndSet(current, WriteState.PENDING)) + WriteState.Pending pending = new WriteState.Pending(WriteState.State.PENDING, this, callback); + if (!writeState.compareAndSet(current, pending)) continue; // TODO: we really need a Stream primitive to write multiple frames. ByteBuffer result = coalesce(buffers); - Callback dataCallback = Callback.from(Invocable.getInvocationType(callback), () -> writeSuccess(callback), x -> writeFailure(x, callback)); - stream.data(new DataFrame(stream.getId(), result, false), dataCallback); + stream.data(new DataFrame(stream.getId(), result, false), pending); } case PENDING -> callback.failed(new WritePendingException()); - case OSHUTTING, OSHUT -> callback.failed(new EofException("Output shutdown")); + case PENDING_OSHUT, OSHUT -> callback.failed(new EofException("Output shutdown")); case FAILED -> callback.failed(current.failure); default -> callback.failed(new IllegalStateException("Unexpected state: " + current.state)); } @@ -342,51 +326,90 @@ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingE } } - private void writeSuccess(Callback callback) + @Override + public Callback cancelWrite(Throwable cause) + { + while (true) + { + WriteState current = writeState.get(); + switch (current.state) + { + case IDLE -> + { + if (writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, cause))) + { + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + return null; + } + } + case PENDING, PENDING_OSHUT -> + { + if (writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, cause))) + { + WriteState.Pending pending = (WriteState.Pending)current; + // Initiate a reset() and a flush(). + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code)); + + try (Callback.Combination callbacks = new Callback.Combination(pending.callback, cause)) + { + stream.getSession().flush(callbacks.newCallback()); + return callbacks.newCallback(); + } + } + } + case FAILED -> + { + ExceptionUtil.addSuppressedIfNotAssociated(current.failure, cause); + return null; + } + default -> + { + if (writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, cause))) + return null; + } + } + } + } + + private void writeSuccess() { while (true) { WriteState current = writeState.get(); switch (current.state) { - case IDLE, OSHUT -> callback.failed(new IllegalStateException()); case PENDING -> { if (!writeState.compareAndSet(current, WriteState.IDLE)) continue; - callback.succeeded(); + ((WriteState.Pending)current).callback.succeeded(); } - case OSHUTTING -> + case PENDING_OSHUT -> { - callback.succeeded(); - shutdownOutput(); + if (!writeState.compareAndSet(current, WriteState.OSHUT)) + continue; + ((WriteState.Pending)current).callback.succeeded(); + // Complete the shutdown of the output. + stream.data(new DataFrame(stream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); } - case FAILED -> callback.failed(current.failure); - default -> callback.failed(new IllegalStateException("Unexpected state: " + current.state)); } return; } } - private void writeFailure(Throwable failure, Callback callback) + private void writeFailure(Throwable failure) { while (true) { WriteState current = writeState.get(); switch (current.state) { - case IDLE, OSHUT -> callback.failed(new IllegalStateException(failure)); - case PENDING, OSHUTTING -> + case PENDING, PENDING_OSHUT -> { if (!writeState.compareAndSet(current, new WriteState(WriteState.State.FAILED, failure))) continue; - callback.failed(failure); - } - case FAILED -> - { - // Already failed. + ((WriteState.Pending)current).callback.failed(failure); } - default -> callback.failed(new IllegalStateException("Unexpected state: " + current.state)); } return; } @@ -498,8 +521,6 @@ public String toString() private static class WriteState { public static final WriteState IDLE = new WriteState(State.IDLE); - public static final WriteState PENDING = new WriteState(State.PENDING); - public static final WriteState OSHUTTING = new WriteState(State.OSHUTTING); public static final WriteState OSHUT = new WriteState(State.OSHUT); private final State state; @@ -524,7 +545,38 @@ public String toString() private enum State { - IDLE, PENDING, OSHUTTING, OSHUT, FAILED + IDLE, PENDING, PENDING_OSHUT, OSHUT, FAILED + } + + private static class Pending extends WriteState implements Callback + { + private final HTTP2StreamEndPoint endpoint; + private final Callback callback; + + private Pending(State state, HTTP2StreamEndPoint endPoint, Callback callback) + { + super(state); + this.endpoint = endPoint; + this.callback = callback; + } + + @Override + public void succeeded() + { + endpoint.writeSuccess(); + } + + @Override + public void failed(Throwable x) + { + endpoint.writeFailure(x); + } + + @Override + public InvocationType getInvocationType() + { + return callback.getInvocationType(); + } } } } diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/frames/DisconnectFrame.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/frames/DisconnectFrame.java index 1d332304cdfb..bba64baf267f 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/frames/DisconnectFrame.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/frames/DisconnectFrame.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.http2.frames; +@Deprecated (forRemoval = true, since = "12.1.0") public class DisconnectFrame extends Frame { public DisconnectFrame() diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/frames/FrameType.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/frames/FrameType.java index 28326781f902..8bd855c25941 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/frames/FrameType.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/frames/FrameType.java @@ -31,7 +31,8 @@ public enum FrameType // Synthetic frames only needed by the implementation. PREFACE(10), DISCONNECT(11), - FAILURE(12); + FAILURE(12), + FLUSH(13); public static FrameType from(int type) { diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java index eca1caa9e514..d82859d5dcac 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/internal/HTTP2Flusher.java @@ -175,24 +175,24 @@ protected Action process() throws Throwable { boolean rethrow = true; if (terminated instanceof HpackException.SessionException) - { - HTTP2Session.Entry entry = entries.peek(); - if (entry != null) - { - FrameType frameType = entry.frame().getType(); - if (frameType == FrameType.RST_STREAM || frameType == FrameType.GO_AWAY) - { - rethrow = false; - if (frameType == FrameType.GO_AWAY) - { - // Allow a SessionException to be processed once to send a GOAWAY. - terminated = new ClosedChannelException().initCause(terminated); - } - } - } - } - if (rethrow) - throw terminated; + { + HTTP2Session.Entry entry = entries.peek(); + if (entry != null) + { + FrameType frameType = entry.frame().getType(); + if (frameType == FrameType.RST_STREAM || frameType == FrameType.GO_AWAY) + { + rethrow = false; + if (frameType == FrameType.GO_AWAY) + { + // Allow a SessionException to be processed once to send a GOAWAY. + terminated = new ClosedChannelException().initCause(terminated); + } + } + } + } + if (rethrow) + throw terminated; } WindowEntry windowEntry; diff --git a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java index 10fc0827b57f..1b639cbd5c4d 100644 --- a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java @@ -437,6 +437,24 @@ private void sendContent(MetaData.Request request, ByteBuffer content, boolean l } } + @Override + public Runnable cancelSend(Throwable cause, Callback appCallback) + { + // Append a ResetFrame into the H2 flusher and complete the appCallback once all the frames + // up to and including the reset one have been flushed; it is needed to wait to make sure + // the completion listeners aren't called while a write is still pending. + return () -> + { + _stream.reset(new ResetFrame(_stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + _stream.getSession().flush(Callback.from(() -> + { + if (LOG.isDebugEnabled()) + LOG.debug("cancelSend reset and flushed"); + appCallback.failed(cause); + })); + }; + } + private HttpFields retrieveTrailers() { Supplier supplier = _responseMetaData.getTrailersSupplier(); diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/CancelWriteTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/CancelWriteTest.java new file mode 100644 index 000000000000..ccd784e7fe89 --- /dev/null +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/CancelWriteTest.java @@ -0,0 +1,134 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.tests; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.io.AbstractEndPoint; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.WriteFlusher; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpStream; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; +import org.junit.jupiter.api.Test; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CancelWriteTest extends AbstractTest +{ + @Test + public void testCancelCongestedWrite() throws Exception + { + CountDownLatch serverFlusherPendingLatch = new CountDownLatch(1); + CountDownLatch serverWriteSuccessLatch = new CountDownLatch(1); + CountDownLatch serverWriteFailureLatch = new CountDownLatch(1); + + start(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + AtomicBoolean httpStreamCancelled = new AtomicBoolean(); + request.addHttpStreamWrapper(w -> new HttpStream.Wrapper(w) + { + @Override + public Runnable cancelSend(Throwable cause, Callback appCallback) + { + return super.cancelSend(cause, Callback.from(appCallback::succeeded, x -> + { + // Make sure this callback gets called before the Response.write() one. + httpStreamCancelled.set(true); + appCallback.failed(x); + })); + } + }); + + RetainableByteBuffer.Mutable buffer = server.getByteBufferPool().acquire(128 * 1024 * 1024, true); + ByteBuffer byteBuffer = buffer.getByteBuffer(); + byteBuffer.clear(); + response.write(true, byteBuffer, Callback.from(() -> + { + serverWriteSuccessLatch.countDown(); + + // Release the buffer. + buffer.release(); + + // Complete the Handler callback. + callback.succeeded(); + }, x -> + { + assertTrue(httpStreamCancelled.get()); + serverWriteFailureLatch.countDown(); + + // Release the buffer. + buffer.release(); + + // Complete the Handler callback. + callback.failed(x); + })); + + // Wait until TCP congestion. + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + AbstractEndPoint endPoint = (AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint(); + WriteFlusher flusher = endPoint.getWriteFlusher(); + return flusher.isPending(); + }); + serverFlusherPendingLatch.countDown(); + + return true; + } + }); + + // Set the HTTP/2 flow control windows very large so we can + // cause TCP congestion, not HTTP/2 flow control congestion. + http2Client.setInitialSessionRecvWindow(512 * 1024 * 1024); + http2Client.setInitialStreamRecvWindow(512 * 1024 * 1024); + Session session = newClientSession(new Session.Listener() {}); + session.newStream(new HeadersFrame(newRequest("GET", HttpFields.EMPTY), null, true), new Stream.Listener() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame, Callback callback) + { + try + { + // Block to cause TCP congestion. + assertTrue(serverFlusherPendingLatch.await(5, TimeUnit.SECONDS)); + + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), callback); + } + catch (InterruptedException e) + { + callback.failed(e); + } + } + }); + + assertTrue(serverWriteFailureLatch.await(5, TimeUnit.SECONDS)); + assertFalse(serverWriteSuccessLatch.await(1, TimeUnit.SECONDS)); + } +} diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamResetTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamResetTest.java index 611b3e06109a..e6234229517a 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamResetTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamResetTest.java @@ -29,10 +29,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Supplier; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; @@ -60,11 +60,10 @@ import org.eclipse.jetty.http2.internal.HTTP2Flusher; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; -import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.RetainableByteBuffer; -import org.eclipse.jetty.io.WriteFlusher; +import org.eclipse.jetty.io.SelectableChannelEndPoint; import org.eclipse.jetty.logging.StacklessLogging; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; @@ -76,7 +75,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FuturePromise; -import org.eclipse.jetty.util.NanoTime; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; @@ -817,10 +815,9 @@ public boolean handle(Request request, Response response, Callback callback) thr } @Test - public void testResetAfterTCPCongestedWrite() throws Exception + public void testResetAfterTCPCongestedWriteThenResolveCongestionToUnblockWrite() throws Exception { - AtomicReference flusherRef = new AtomicReference<>(); - CountDownLatch flusherLatch = new CountDownLatch(1); + AtomicReference serverEndPointRef = new AtomicReference<>(); CountDownLatch writeLatch1 = new CountDownLatch(1); CountDownLatch writeLatch2 = new CountDownLatch(1); start(new Handler.Abstract() @@ -828,8 +825,7 @@ public void testResetAfterTCPCongestedWrite() throws Exception @Override public boolean handle(Request request, Response response, Callback callback) { - flusherRef.set(((AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint()).getWriteFlusher()); - flusherLatch.countDown(); + serverEndPointRef.set((SelectableChannelEndPoint)request.getConnectionMetaData().getConnection().getEndPoint()); try { @@ -880,15 +876,103 @@ public boolean handle(Request request, Response response, Callback callback) accumulator.writeTo(Content.Sink.from(socket), false); // Wait until the server is TCP congested. - assertTrue(flusherLatch.await(5, TimeUnit.SECONDS)); - WriteFlusher flusher = flusherRef.get(); - waitUntilTCPCongested(flusher); + waitUntilTCPCongested(serverEndPointRef::get); accumulator.clear(); generator.control(accumulator, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); accumulator.writeTo(Content.Sink.from(socket), false); accumulator.release(); + // Resolve TCP congestion to allow the server to send its reset frame initiated by + // the cancellation of the pending write. + socket.configureBlocking(false); + ByteBuffer buffer = ByteBuffer.allocate(8192); + while (true) + { + int read = socket.read(buffer); + if (read < 1) + break; + buffer.clear(); + } + + assertTrue(writeLatch1.await(5, TimeUnit.SECONDS)); + assertTrue(writeLatch2.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testResetAfterTCPCongestedWriteThenWaitForIdleTimeoutToUnblockWrite() throws Exception + { + int serverIdleTimeout = 1000; + AtomicReference serverEndPointRef = new AtomicReference<>(); + CountDownLatch writeLatch1 = new CountDownLatch(1); + CountDownLatch writeLatch2 = new CountDownLatch(1); + start(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + serverEndPointRef.set((SelectableChannelEndPoint)request.getConnectionMetaData().getConnection().getEndPoint()); + + try + { + // Large write, it blocks due to TCP congestion. + byte[] data = new byte[128 * 1024 * 1024]; + Content.Sink.write(response, false, ByteBuffer.wrap(data)); + } + catch (IOException x) + { + writeLatch1.countDown(); + try + { + // Try to write again, must fail immediately. + Content.Sink.write(response, true, ByteBuffer.wrap(new byte[]{1})); + } + catch (IOException e) + { + writeLatch2.countDown(); + } + } + return true; + } + }); + connector.setIdleTimeout(serverIdleTimeout); + + ByteBufferPool bufferPool = http2Client.getByteBufferPool(); + try (SocketChannel socket = SocketChannel.open()) + { + String host = "localhost"; + int port = connector.getLocalPort(); + socket.connect(new InetSocketAddress(host, port)); + + Generator generator = new Generator(bufferPool); + RetainableByteBuffer.Mutable accumulator = new RetainableByteBuffer.DynamicCapacity(); + generator.control(accumulator, new PrefaceFrame()); + Map clientSettings = new HashMap<>(); + // Max stream HTTP/2 flow control window. + clientSettings.put(SettingsFrame.INITIAL_WINDOW_SIZE, Integer.MAX_VALUE); + generator.control(accumulator, new SettingsFrame(clientSettings, false)); + // Max session HTTP/2 flow control window. + generator.control(accumulator, new WindowUpdateFrame(0, Integer.MAX_VALUE - FlowControlStrategy.DEFAULT_WINDOW_SIZE)); + + HttpURI uri = HttpURI.from("http", host, port, "/"); + MetaData.Request request = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_2, HttpFields.EMPTY); + int streamId = 3; + HeadersFrame headersFrame = new HeadersFrame(streamId, request, null, true); + generator.control(accumulator, headersFrame); + + accumulator.writeTo(Content.Sink.from(socket), false); + + // Wait until the server is TCP congested. + waitUntilTCPCongested(serverEndPointRef::get); + + accumulator.clear(); + generator.control(accumulator, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); + accumulator.writeTo(Content.Sink.from(socket), false); + accumulator.release(); + + // Idle timeout should unblock the server's write. + assertTrue(writeLatch1.await(5, TimeUnit.SECONDS)); assertTrue(writeLatch2.await(5, TimeUnit.SECONDS)); } @@ -897,7 +981,7 @@ public boolean handle(Request request, Response response, Callback callback) @Test public void testResetSecondRequestAfterTCPCongestedWriteBeforeWrite() throws Exception { - Exchanger exchanger = new Exchanger<>(); + Exchanger exchanger = new Exchanger<>(); CountDownLatch requestLatch1 = new CountDownLatch(1); CountDownLatch requestLatch2 = new CountDownLatch(1); CountDownLatch writeLatch1 = new CountDownLatch(1); @@ -918,7 +1002,7 @@ else if (target.equals("/2")) private void service1(Request request, Response response, Callback callback) throws Exception { - exchanger.exchange(((AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint()).getWriteFlusher()); + exchanger.exchange((SelectableChannelEndPoint)request.getConnectionMetaData().getConnection().getEndPoint()); // Large write, it blocks due to TCP congestion. response.write(true, ByteBuffer.wrap(new byte[128 * 1024 * 1024]), callback); } @@ -968,7 +1052,8 @@ private void service2(Response response, Callback callback) throws Exception accumulator.writeTo(Content.Sink.from(socket), false); - waitUntilTCPCongested(exchanger.exchange(null)); + SelectableChannelEndPoint endPoint = exchanger.exchange(null); + waitUntilTCPCongested(() -> endPoint); // Send a second request. uri = HttpURI.from("http", host, port, "/2"); @@ -1119,16 +1204,16 @@ public void onReset(Stream stream, ResetFrame frame, Callback callback) await().atMost(5, TimeUnit.SECONDS).until(() -> ((HTTP2Session)stream.getSession()).updateSendWindow(0), greaterThan(0)); } - private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException + private void waitUntilTCPCongested(Supplier selectableChannelEndPointRef) { - long start = NanoTime.now(); - while (!flusher.isPending()) + await().atMost(5, TimeUnit.SECONDS).until(() -> { - if (NanoTime.secondsSince(start) > 15) - throw new TimeoutException(); - Thread.sleep(100); - } - // Wait for the selector to update the SelectionKey to OP_WRITE. - Thread.sleep(1000); + SelectableChannelEndPoint endPoint = selectableChannelEndPointRef.get(); + if (endPoint == null) + return false; + boolean pending = endPoint.getWriteFlusher().isPending(); + boolean writeInterested = endPoint.isWriteInterested(); + return pending && writeInterested; + }); } } diff --git a/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java index 33e9828772d8..8783d7088a60 100644 --- a/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java @@ -263,6 +263,13 @@ public void send(MetaData.Request request, MetaData.Response response, boolean l sendContent(request, content, last, callback); } + @Override + public Runnable cancelSend(Throwable cause, Callback appCallback) + { + // TODO Implement after #12742 is merged + return () -> appCallback.failed(new UnsupportedOperationException("Implement after #12742 is merged")); + } + private void sendHeaders(MetaData.Request request, MetaData.Response response, ByteBuffer content, boolean lastContent, Callback callback) { this.responseMetaData = response; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java index eccdca817878..626c46a8dd5e 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractEndPoint.java @@ -349,6 +349,12 @@ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingE _writeFlusher.write(callback, buffers); } + @Override + public Callback cancelWrite(Throwable cause) + { + return _writeFlusher.cancelWrite(cause); + } + protected abstract void onIncompleteFlush(); protected abstract void needsFillInterest() throws IOException; diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index e72e49908a6c..55bc65fece89 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -262,30 +262,6 @@ private void recordNoBucketAcquire(int size, boolean direct) } } - @Override - public boolean releaseAndRemove(RetainableByteBuffer buffer) - { - RetainableByteBuffer actual = buffer; - while (actual instanceof RetainableByteBuffer.Wrapper wrapper) - actual = wrapper.getWrapped(); - - if (actual instanceof ReservedBuffer reservedBuffer) - { - // remove the actual reserved buffer, but release the wrapped buffer - reservedBuffer.remove(); - return buffer.release(); - } - - if (actual instanceof PooledBuffer poolBuffer) - { - // remove the actual pool buffer, but release the wrapped buffer - poolBuffer.remove(); - return buffer.release(); - } - - return ByteBufferPool.super.releaseAndRemove(buffer); - } - private void reserve(RetainedBucket bucket, ByteBuffer byteBuffer) { _reserved.addAndGet(-byteBuffer.capacity()); diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 72df4e946428..727162e73353 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -59,18 +59,6 @@ public interface ByteBufferPool */ RetainableByteBuffer.Mutable acquire(int size, boolean direct); - /** - * {@link RetainableByteBuffer#release() Release} the buffer in a way that will remove it from any pool that it may be in. - * If the buffer is not in a pool, calling this method is equivalent to calling {@link RetainableByteBuffer#release()}. - * Calling this method satisfies any contract that requires a call to {@link RetainableByteBuffer#release()}. - * @return {@code true} if a call to {@link RetainableByteBuffer#release()} would have returned {@code true}. - * @see RetainableByteBuffer#releaseAndRemove() - */ - default boolean releaseAndRemove(RetainableByteBuffer buffer) - { - return buffer != null && buffer.release(); - } - /** *

Removes all {@link RetainableByteBuffer#isRetained() non-retained} * pooled instances from this pool.

@@ -94,12 +82,6 @@ public ByteBufferPool getWrapped() return wrapped; } - @Override - public boolean releaseAndRemove(RetainableByteBuffer buffer) - { - return getWrapped().releaseAndRemove(buffer); - } - @Override public RetainableByteBuffer.Mutable acquire(int size, boolean direct) { diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java index 758802a05ba1..e582b79f1e7d 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java @@ -333,6 +333,17 @@ default void write(boolean last, ByteBuffer byteBuffer, Callback callback) } } + /** + * Cancel any current {@link #write(Callback, ByteBuffer...)} operation + * in progress. Calling this method with cause future calls to {@link #write(Callback, ByteBuffer...)} + * and its variants, to fail the passed {@link Callback}. + * + * @param cause the cause + * @return The callback passed to a pending/in progress {@link #write(Callback, ByteBuffer...) write} + * or {@code null} if there was none. + */ + Callback cancelWrite(Throwable cause); + /** * @return the {@link Connection} associated with this EndPoint * @see #setConnection(Connection) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java index acf69973f256..b21a6cd5a430 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java @@ -164,19 +164,6 @@ default Mutable asMutable() throws ReadOnlyBufferException throw new ReadOnlyBufferException(); } - /** - * {@link #release() Releases} the buffer in a way that ensures it will not be recycled in a buffer pool. - * This method should be used in cases where it is unclear if operations on the buffer have completed - * (for example, when a write operation has been aborted asynchronously or timed out, but the write - * operation may still be pending). - * @return whether if the buffer was released. - * @see ByteBufferPool#releaseAndRemove(RetainableByteBuffer) - */ - default boolean releaseAndRemove() - { - return release(); - } - /** * Appends and consumes the contents of this buffer to the passed buffer, limited by the capacity of the target buffer. * @param buffer The buffer to append bytes to, whose limit will be updated. @@ -671,12 +658,6 @@ public RetainableByteBuffer getWrapped() return (RetainableByteBuffer)super.getWrapped(); } - @Override - public boolean releaseAndRemove() - { - return getWrapped().releaseAndRemove(); - } - @Override public boolean isRetained() { @@ -1321,12 +1302,6 @@ protected Pooled(ByteBufferPool pool, ByteBuffer byteBuffer, Retainable retainab _pool = pool; } - @Override - public boolean releaseAndRemove() - { - return _pool.releaseAndRemove(this); - } - @Override public RetainableByteBuffer slice(long length) { @@ -1985,22 +1960,6 @@ public boolean release() return false; } - @Override - public boolean releaseAndRemove() - { - if (LOG.isDebugEnabled()) - LOG.debug("release {}", this); - if (super.release()) - { - for (RetainableByteBuffer buffer : _buffers) - buffer.releaseAndRemove(); - _buffers.clear(); - _aggregate = null; - return true; - } - return false; - } - @Override public void clear() { diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java index 467fa70a1221..1eb0a151d40d 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java @@ -21,6 +21,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.Invocable; @@ -174,16 +175,37 @@ public void onClose(Throwable cause) @Override protected void needsFillInterest() { - changeInterests(SelectionKey.OP_READ); + addInterests(SelectionKey.OP_READ); } @Override protected void onIncompleteFlush() { - changeInterests(SelectionKey.OP_WRITE); + addInterests(SelectionKey.OP_WRITE); } - private void changeInterests(int operation) + @Override + public Callback cancelWrite(Throwable cause) + { + Callback callback = super.cancelWrite(cause); + // This is somewhat racy, but any onWritable notification that happens after cancellation but + // before removal of interest will be ignored. This just ensure that interest will not be left hanging. + if (callback != null) + removeInterests(SelectionKey.OP_WRITE); + return callback; + } + + private void addInterests(int operation) + { + updateInterests(operation, true); + } + + private void removeInterests(int operation) + { + updateInterests(operation, false); + } + + private void updateInterests(int operation, boolean add) { // This method runs from any thread, possibly // concurrently with updateKey() and onSelected(). @@ -195,13 +217,13 @@ private void changeInterests(int operation) { pending = _updatePending; oldInterestOps = _desiredInterestOps; - newInterestOps = oldInterestOps | operation; + newInterestOps = add ? oldInterestOps | operation : oldInterestOps & ~operation; if (newInterestOps != oldInterestOps) _desiredInterestOps = newInterestOps; } if (LOG.isDebugEnabled()) - LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); + LOG.debug("updateInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this); if (!pending && _selector != null) _selector.submit(_updateKeyAction); @@ -288,6 +310,11 @@ public void updateKey() } } + public boolean isWriteInterested() + { + return (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE; + } + @Override public void replaceKey(SelectionKey newKey) { diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 1478a86ddad0..fad279531e9e 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -46,35 +46,43 @@ public abstract class WriteFlusher private static final ByteBuffer[] EMPTY_BUFFERS = new ByteBuffer[]{BufferUtil.EMPTY_BUFFER}; private static final EnumMap> __stateTransitions = new EnumMap<>(StateType.class); private static final State __IDLE = new IdleState(); - private static final State __WRITING = new WritingState(); + private static final State __FLUSHING = new FlushingState(); private static final State __COMPLETING = new CompletingState(); + private static final State __CANCEL = new State(StateType.CANCEL); private final EndPoint _endPoint; private final AtomicReference _state = new AtomicReference<>(); static { - // fill the state machine - __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.WRITING)); - __stateTransitions.put(StateType.WRITING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); + // A write operation may either complete immediately: + // IDLE-->FLUSHING-->IDLE + // Or it may not completely flush and go via the state + // IDLE-->FLUSHING-->PENDING-->COMPLETING-->IDLE + // Or it may take several cycles to complete + // IDLE-->FLUSHING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE + // + // If a failure happens while in IDLE, the state goes to FAILED even if there is no operation to tell of the failure. + // IDLE--(fail)-->FAILED + // + // If a cancel happens then: + // PENDING -> FAILED + // COMPLETING/FLUSHING -> CANCEL + // CANCEL -> CANCELLING + // CANCELLING -> FAILED + // + // From any other state than IDLE a failure will result in an FAILED state which is a terminal state, and + // the callback is failed with the Throwable which caused the failure. + // IDLE-->FLUSHING--(fail)-->FAILED + + __stateTransitions.put(StateType.IDLE, EnumSet.of(StateType.FLUSHING, StateType.FAILED)); + __stateTransitions.put(StateType.FLUSHING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.CANCEL, StateType.FAILED)); __stateTransitions.put(StateType.PENDING, EnumSet.of(StateType.COMPLETING, StateType.IDLE, StateType.FAILED)); - __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.FAILED)); + __stateTransitions.put(StateType.COMPLETING, EnumSet.of(StateType.IDLE, StateType.PENDING, StateType.CANCEL, StateType.FAILED)); + __stateTransitions.put(StateType.CANCEL, EnumSet.of(StateType.CANCELLING)); + __stateTransitions.put(StateType.CANCELLING, EnumSet.of(StateType.FAILED)); __stateTransitions.put(StateType.FAILED, EnumSet.noneOf(StateType.class)); } - // A write operation may either complete immediately: - // IDLE-->WRITING-->IDLE - // Or it may not completely flush and go via the PENDING state - // IDLE-->WRITING-->PENDING-->COMPLETING-->IDLE - // Or it may take several cycles to complete - // IDLE-->WRITING-->PENDING-->COMPLETING-->PENDING-->COMPLETING-->IDLE - // - // If a failure happens while in IDLE, it is a noop since there is no operation to tell of the failure. - // IDLE--(fail)-->IDLE - // - // From any other state than IDLE a failure will result in an FAILED state which is a terminal state, and - // the callback is failed with the Throwable which caused the failure. - // IDLE-->WRITING--(fail)-->FAILED - protected WriteFlusher(EndPoint endPoint) { _state.set(__IDLE); @@ -83,10 +91,28 @@ protected WriteFlusher(EndPoint endPoint) private enum StateType { + /** No write is in progress */ IDLE, - WRITING, + + /** A flush is currently being attempted to progress the write */ + FLUSHING, + + /** The write was not able to be completed by a previous flush and {@link #onIncompleteFlush()} is waiting + * for {@link #completeWrite()} to be called */ PENDING, + + /** The {@link #completeWrite()} method has been called and the write will be progressed */ COMPLETING, + + /** The {@link WriteFlusher#cancelWrite(Throwable)} method was called whilst in {@link StateType#FLUSHING} or {@link StateType#COMPLETING}, + * so that when those operations complete, the next state will be {@link StateType#CANCELLING}*/ + CANCEL, + + /** A flush operation has completed and seen the {@link StateType#CANCEL} state. Entering this state indicates that + * the thread calling {@link WriteFlusher#cancelWrite(Throwable)} can continue to progress to the {@link StateType#FAILED} state. */ + CANCELLING, + + /** The write failed due to a failure from flushing, or cancellation is done. */ FAILED } @@ -101,7 +127,7 @@ private enum StateType private boolean updateState(State previous, State next) { if (!isTransitionAllowed(previous, next)) - throw new IllegalStateException(); + throw new IllegalArgumentException("Bad transition %s -> %s".formatted(previous, next)); boolean updated = _state.compareAndSet(previous, next); if (DEBUG) @@ -158,11 +184,11 @@ private IdleState() /** * In WritingState WriteFlusher is currently writing. */ - private static class WritingState extends State + private static class FlushingState extends State { - private WritingState() + private FlushingState() { - super(StateType.WRITING); + super(StateType.FLUSHING); } } @@ -222,6 +248,17 @@ InvocationType getCallbackInvocationType() } } + private class CancellingState extends State + { + private final Callback _callback; + + private CancellingState(Callback callback) + { + super(StateType.CANCELLING); + _callback = callback; + } + } + public InvocationType getCallbackInvocationType() { State s = _state.get(); @@ -237,7 +274,7 @@ public InvocationType getCallbackInvocationType() protected abstract void onIncompleteFlush(); /** - * Tries to switch state to WRITING. If successful it writes the given buffers to the EndPoint. If state transition + * Tries to switch state to FLUSHING. If successful it writes the given buffers to the EndPoint. If state transition * fails it will fail the callback and leave the WriteFlusher in a terminal FAILED state. * * If not all buffers can be written in one go it creates a new {@code PendingState} object to preserve the state @@ -267,7 +304,7 @@ public void write(Callback callback, SocketAddress address, ByteBuffer... buffer if (DEBUG) LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers)); - if (!updateState(__IDLE, __WRITING)) + if (!updateState(__IDLE, __FLUSHING)) throw new WritePendingException(); try @@ -279,7 +316,7 @@ public void write(Callback callback, SocketAddress address, ByteBuffer... buffer if (DEBUG) LOG.debug("flush incomplete {}", this); PendingState pending = new PendingState(callback, address, buffers); - if (updateState(__WRITING, pending)) + if (updateState(__FLUSHING, pending)) onIncompleteFlush(); else fail(callback); @@ -287,7 +324,7 @@ public void write(Callback callback, SocketAddress address, ByteBuffer... buffer return; } - if (updateState(__WRITING, __IDLE)) + if (updateState(__FLUSHING, __IDLE)) callback.succeeded(); else fail(callback); @@ -296,7 +333,7 @@ public void write(Callback callback, SocketAddress address, ByteBuffer... buffer { if (DEBUG) LOG.debug("write exception", e); - if (updateState(__WRITING, new FailedState(e))) + if (updateState(__FLUSHING, new FailedState(e))) callback.failed(e); else fail(callback, e); @@ -313,6 +350,14 @@ private void fail(Callback callback, Throwable... suppressed) switch (state.getType()) { + case CANCEL: + { + CancellingState cancellingState = new CancellingState(callback); + if (_state.compareAndSet(state, cancellingState)) + return; // Let the cancel method return the callback + break; + } + case FAILED: { FailedState failed = (FailedState)state; @@ -321,6 +366,7 @@ private void fail(Callback callback, Throwable... suppressed) } case IDLE: + case CANCELLING: for (Throwable t : suppressed) { LOG.warn("Failed Write Cause", t); @@ -349,7 +395,7 @@ private void fail(Callback callback, Throwable... suppressed) /** * Complete a write that has not completed and that called {@link #onIncompleteFlush()} to request a call to this * method when a call to {@link EndPoint#flush(ByteBuffer...)} is likely to be able to progress. - * + *

* It tries to switch from PENDING to COMPLETING. If state transition fails, then it does nothing as the callback * should have been already failed. That's because the only way to switch from PENDING outside this method is * {@link #onFail(Throwable)} or {@link #onClose()} @@ -478,6 +524,8 @@ public boolean onFail(Throwable cause) switch (current.getType()) { case IDLE: + case CANCEL: + case CANCELLING: case FAILED: if (DEBUG) { @@ -498,7 +546,7 @@ public boolean onFail(Throwable cause) } break; - case WRITING: + case FLUSHING: case COMPLETING: if (DEBUG) LOG.debug("failed: {}", this, cause); @@ -512,6 +560,58 @@ public boolean onFail(Throwable cause) } } + /** + * Abort any write the flusher may have in progress or pending, then prevent any further write. + * + * @param cause the cause + * @return the callback of the write in progress or pending, null if the flusher was idle + */ + public Callback cancelWrite(Throwable cause) + { + // Keep trying to handle the failure until we get to IDLE or FAILED state + while (true) + { + State current = _state.get(); + switch (current.getType()) + { + case IDLE: + if (updateState(current, new FailedState(cause))) + return null; + break; + + case FAILED: + return null; + + case PENDING: + PendingState pending = (PendingState)current; + if (updateState(current, new FailedState(cause))) + return pending._callback; + break; + + case COMPLETING: + case FLUSHING: + updateState(current, __CANCEL); + break; + + case CANCEL: + // A concurrent thread is racing to move from COMPLETING state and it will + // soon discover the CANCEL state and instead move to CANCELLING. + // This thread can stay in this method until that other thread leaves CANCEL. + Thread.onSpinWait(); + break; + + case CANCELLING: + CancellingState cancelling = (CancellingState)current; + if (updateState(current, new FailedState(cause))) + return cancelling._callback; + break; + + default: + throw new IllegalStateException(); + } + } + } + public void onClose() { switch (_state.get().getType()) @@ -525,7 +625,7 @@ public void onClose() } } - boolean isFailed() + public boolean isFailed() { return isState(StateType.FAILED); } @@ -547,21 +647,16 @@ private boolean isState(StateType type) public String toStateString() { - switch (_state.get().getType()) + return switch (_state.get().getType()) { - case WRITING: - return "W"; - case PENDING: - return "P"; - case COMPLETING: - return "C"; - case IDLE: - return "-"; - case FAILED: - return "F"; - default: - return "?"; - } + case IDLE -> "-"; + case FLUSHING -> "F"; + case PENDING -> "P"; + case COMPLETING -> "C"; + case CANCEL -> "l"; + case CANCELLING -> "L"; + case FAILED -> "F"; + }; } @Override diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java index 7b899edf01d8..fdba39beb553 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java @@ -569,43 +569,4 @@ public void testReleaseExcessMemory() assertThat(compoundPool.getPrimaryPool().size(), is(ConcurrentPool.OPTIMAL_MAX_SIZE)); assertThat(compoundPool.getSecondaryPool().size(), is(0)); } - - @Test - public void testReleaseAndRemove() - { - ArrayByteBufferPool pool = new ArrayByteBufferPool(); - - RetainableByteBuffer reserved0 = pool.acquire(1024, false); - RetainableByteBuffer reserved1 = pool.acquire(1024, false); - - RetainableByteBuffer acquired0 = pool.acquire(1024, false); - acquired0.release(); - acquired0 = pool.acquire(1024, false); - RetainableByteBuffer acquired1 = pool.acquire(1024, false); - acquired1.release(); - acquired1 = pool.acquire(1024, false); - - RetainableByteBuffer retained0 = pool.acquire(1024, false); - retained0.release(); - retained0 = pool.acquire(1024, false); - retained0.retain(); - RetainableByteBuffer retained1 = pool.acquire(1024, false); - retained1.release(); - retained1 = pool.acquire(1024, false); - retained1.retain(); - - assertTrue(reserved1.releaseAndRemove()); - assertTrue(acquired1.releaseAndRemove()); - assertFalse(retained1.releaseAndRemove()); - assertTrue(retained1.release()); - - assertThat(pool.getHeapByteBufferCount(), is(2L)); - assertTrue(reserved0.release()); - assertThat(pool.getHeapByteBufferCount(), is(3L)); - assertTrue(acquired0.release()); - assertThat(pool.getHeapByteBufferCount(), is(3L)); - assertFalse(retained0.release()); - assertTrue(retained0.release()); - assertThat(pool.getHeapByteBufferCount(), is(3L)); - } } diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java index 1f9596f64b8a..592a9ceb7150 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/WriteFlusherTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.logging.StacklessLogging; import org.eclipse.jetty.util.BufferUtil; @@ -38,13 +39,57 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class WriteFlusherTest { + @Test + public void testCancelWriteBeforeWrite() + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16, false); + WriteFlusher flusher = endPoint.getWriteFlusher(); + + assertThat(flusher.isFailed(), is(false)); + Callback callback = flusher.cancelWrite(new ArithmeticException()); + assertNull(callback); + assertThat(flusher.isFailed(), is(true)); + + AtomicReference failureRef = new AtomicReference<>(); + endPoint.write(Callback.from(() -> failureRef.set(new AssertionError("expected callback to be failed")), failureRef::set), ByteBuffer.allocate(32)); + assertThat(failureRef.get(), instanceOf(ArithmeticException.class)); + } + + @Test + public void testCancelWriteDuringPendingWrite() + { + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16, false); + WriteFlusher flusher = endPoint.getWriteFlusher(); + + ByteBuffer buffer = ByteBuffer.allocate(32); + AtomicReference failureRef = new AtomicReference<>(); + Callback writeCallback = Callback.from(() -> failureRef.set(new AssertionError("expected callback to be failed")), failureRef::set); + endPoint.write(writeCallback, buffer); + assertThat(failureRef.get(), nullValue()); + assertThat(buffer.remaining(), is(16)); + assertThat(flusher.isPending(), is(true)); + + Callback cancelCallback = flusher.cancelWrite(new ArithmeticException()); + assertThat(failureRef.get(), nullValue()); + assertThat(flusher.isFailed(), is(true)); + cancelCallback.failed(new IllegalCallerException()); + assertThat(failureRef.get(), instanceOf(IllegalCallerException.class)); + + failureRef.set(null); + endPoint.write(writeCallback, buffer); + assertThat(failureRef.get(), instanceOf(ArithmeticException.class)); + } + @Test public void testCompleteNoBlocking() throws Exception { diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/HttpStream.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/HttpStream.java index c6b488107a78..ed0287c6786b 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/HttpStream.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/HttpStream.java @@ -82,6 +82,18 @@ public interface HttpStream extends Callback */ void send(MetaData.Request request, MetaData.Response response, boolean last, ByteBuffer content, Callback callback); + /** + * Cancel any {@link #send(MetaData.Request, MetaData.Response, boolean, ByteBuffer, Callback)} call in progress. + * + * @param cause The cause of the cancellation + * @param appCallback The callback to ultimately {@link Callback#failed(Throwable) fail} after the cancellation + * @return A {@link Runnable} that will be {@link Runnable#run() run} to complete the + * cancellation and will, in turn, ultimately fail both the passed {@link Callback} and any {@link Callback} + * that was passed to the {@link #send(MetaData.Request, MetaData.Response, boolean, ByteBuffer, Callback)} + * method. + */ + Runnable cancelSend(Throwable cause, Callback appCallback); + /** *

Pushes the given {@code resource} to the client.

* @@ -185,6 +197,12 @@ public void send(MetaData.Request request, MetaData.Response response, boolean l getWrapped().send(request, response, last, content, callback); } + @Override + public Runnable cancelSend(Throwable cause, Callback appCallback) + { + return getWrapped().cancelSend(cause, appCallback); + } + @Override public void push(MetaData.Request resource) { diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java index cd2dec0b0d8b..cf981985ead8 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/ProxyConnectionFactory.java @@ -803,6 +803,12 @@ public ProxyEndPoint(EndPoint endPoint, SocketAddress local, SocketAddress remot _sslSessionData = sslSessionData; } + @Override + public Callback cancelWrite(Throwable cause) + { + return _endPoint.cancelWrite(cause); + } + @Override public SslSessionData getSslSessionData() { diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java index f6a59f7f1200..9521429349d5 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ErrorHandler.java @@ -271,7 +271,7 @@ else if (charsets.contains(StandardCharsets.ISO_8859_1)) catch (Throwable x) { if (buffer != null) - buffer.releaseAndRemove(); + buffer.release(); throw x; } } @@ -677,9 +677,9 @@ public void failed(Throwable x) { Callback callback = _callback.getAndSet(null); if (callback == null) - _buffer.releaseAndRemove(); + _buffer.release(); else - ExceptionUtil.callAndThen(x, t -> _buffer.releaseAndRemove(), callback::failed); + ExceptionUtil.callAndThen(x, t -> _buffer.release(), callback::failed); } } } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index a2ff06eb82b2..ef2a4f3b817f 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -1175,11 +1175,14 @@ private Runnable lockedFailWrite(Throwable x) { assert _request._lock.isHeldByCurrentThread(); Callback writeCallback = _writeCallback; - _writeCallback = null; if (writeCallback == null) return null; + _writeCallback = null; + + Runnable cancellation = _request.getHttpStream().cancelSend(x, writeCallback); + _writeFailure = ExceptionUtil.combine(_writeFailure, x); - return () -> HttpChannelState.failed(writeCallback, x); + return cancellation; } public long getContentBytesWritten() diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 28c4c617f361..001e965e2eab 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; @@ -65,6 +66,7 @@ import org.eclipse.jetty.server.TunnelSupport; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.ExceptionUtil; import org.eclipse.jetty.util.HostPort; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.StringUtil; @@ -734,6 +736,59 @@ private SendCallback() super(true); } + /** + * Cancel any send in progress by aborting this {@link IteratingCallback} and take any send {@link Callback}. + * @param cause the cause of the cancellation + * @return A {@link Callback} passed to + * {@link #reset(MetaData.Request, MetaData.Response, ByteBuffer, boolean, Callback)} if it has not yet + * been invoked, else {@code null} + */ + public Callback cancel(Throwable cause) + { + // wrap the cause in a CSE so that onAborted knows it is cancelling and can provide the reset callback. + CancelSendException cancelSendException = new CancelSendException(cause); + + // Try to abort the IteratingCallback and if unable to, then return NOOP. + if (!abort(cancelSendException)) + return Callback.NOOP; + + // We now know that we aborted this ICB with the CSE above, so onAbort will eventually be called + // in a serialized context and the callback will be set on the CSE. + // Whilst waiting for that to happen... + + // If a write operation has been scheduled cancel it and take its callback + Callback senderCallback = getEndPoint().cancelWrite(cause); + + if (senderCallback == null) + // There was no write in operation, so we must complete the CSE ourselves + cancelSendException.complete(); + else + // The write was cancelled and we have the callback (probably to this ICB or another callback + // wrapping this ICB). So failing the taken callback will call onCompleted and allow onAborted to be called. + senderCallback.failed(cause); + + // wait for the cancellation to be complete and the callback to be set by onAbort. + // This should never block indefinitely, as onAborted only waits for active states like PROCESSING to complete. + return cancelSendException.join(); + } + + @Override + protected void onAborted(Throwable cause) + { + // If the cause is a CSE, then take the callback and give it to the CSE to be called once cancellation is complete. + if (cause instanceof CancelSendException cancelSend) + cancelSend.setCallback(takeCallbackAndReset()); + } + + @Override + protected void onCompleted(Throwable causeOrNull) + { + // If the cause is a CSE, then signal to it that the ICB is complete and any join call can return. + if (causeOrNull instanceof CancelSendException cancelSendException) + cancelSendException.complete(); + super.onCompleted(causeOrNull); + } + @Override public InvocationType getInvocationType() { @@ -909,17 +964,19 @@ public Action process() throws Exception } } - private Callback resetCallback() + private Callback takeCallbackAndReset() { - Callback complete = _callback; + Callback callback = _callback; _callback = null; _info = null; _content = null; - return complete; + return callback; } private void release() { + if (_callback != null) + throw new IllegalStateException("callback not invoked"); releaseHeader(); releaseChunk(); } @@ -948,16 +1005,18 @@ protected void onCompleteSuccess() // cannot be delayed by any further server handling before the stream callback is completed. getEndPoint().shutdownOutput(); } - Callback callback = resetCallback(); + Callback callback = takeCallbackAndReset(); release(); - callback.succeeded(); + if (callback != null) + callback.succeeded(); } @Override public void onFailure(final Throwable x) { - Callback callback = resetCallback(); - callback.failed(x); + Callback callback = takeCallbackAndReset(); + if (callback != null) + callback.failed(x); } @Override @@ -971,6 +1030,44 @@ public String toString() { return String.format("%s[i=%s,cb=%s]", super.toString(), _info, _callback); } + + private static class CancelSendException extends IOException + { + private final CountDownLatch _complete = new CountDownLatch(2); + private Callback _callback; + + public CancelSendException(Throwable cause) + { + super(cause); + } + + public void complete() + { + _complete.countDown(); + } + + public Callback join() + { + try + { + _complete.await(); + } + catch (InterruptedException x) + { + Throwable cause = getCause(); + ExceptionUtil.addSuppressedIfNotAssociated(cause, x); + throw new RuntimeIOException(cause); + } + + return _callback; + } + + public void setCallback(Callback callback) + { + _callback = callback; + _complete.countDown(); + } + } } protected class RequestHandler implements HttpParser.RequestHandler @@ -1470,6 +1567,16 @@ else if (_generator.isCommitted()) _sendCallback.iterate(); } + @Override + public Runnable cancelSend(Throwable cause, Callback appCallback) + { + // We know that the SendCallback#cancel call will never block on external events, + // so we can just return a Runnable that fails the combination of the cancellation of any + // send in progress with the passed in appCallback. At worst, we may be deferred whilst another thread finishes + // processing a send/write before it notices the cancel. It never blocks on IO itself + return () -> Callback.combine(_sendCallback.cancel(cause), appCallback).failed(cause); + } + @Override public long getIdleTimeout() { diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/CancelWriteTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/CancelWriteTest.java new file mode 100644 index 000000000000..8dcd4eb4f278 --- /dev/null +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/CancelWriteTest.java @@ -0,0 +1,314 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.channels.WritePendingException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.http.HttpTester; +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.ManagedSelector; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.SocketChannelEndPoint; +import org.eclipse.jetty.server.internal.HttpConnection; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.LifeCycle; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class CancelWriteTest +{ + private Server server; + private ArrayByteBufferPool.Tracking bufferPool; + + @BeforeEach + public void setUp() + { + bufferPool = new ArrayByteBufferPool.Tracking(); + server = new Server(null, null, bufferPool); + } + + @AfterEach + public void tearDown() + { + try + { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Server leaks: " + bufferPool.dumpLeaks(), bufferPool.getLeaks().size(), is(0))); + } + finally + { + LifeCycle.stop(server); + } + } + + @Test + public void testCancelCongestedWrite() throws Exception + { + long idleTimeout = 1000; + CountDownLatch serverWriteFailureLatch = new CountDownLatch(1); + + ServerConnector connector = new ServerConnector(server, 1, 1); + connector.setIdleTimeout(idleTimeout); + server.addConnector(connector); + server.setHandler(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception + { + SocketChannelEndPoint serverEndPoint = (SocketChannelEndPoint)request.getConnectionMetaData().getConnection().getEndPoint(); + + // Large write, it blocks due to TCP congestion. + RetainableByteBuffer.Mutable buffer = server.getByteBufferPool().acquire(128 * 1024 * 1024, true); + ByteBuffer byteBuffer = buffer.getByteBuffer(); + byteBuffer.clear(); + response.write(true, byteBuffer, Callback.from(callback::succeeded, x -> + { + // Check that the WriteFlusher won't access the + // buffer anymore, so that it can be released. + if (serverEndPoint.getWriteFlusher().isFailed()) + serverWriteFailureLatch.countDown(); + + // Release the buffer. + buffer.release(); + + // Complete the Handler callback. + callback.failed(x); + })); + return true; + } + }); + server.start(); + + try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + HttpTester.Request request = new HttpTester.Request(); + request.setMethod("GET"); + request.setHeader("Host", "localhost"); + request.setURI("/"); + ByteBuffer buffer = request.generate(); + + client.write(buffer); + + assertTrue(serverWriteFailureLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + + // Verify that the server eventually closes the connection. + long totalRead = 0L; + client.socket().setSoTimeout(1000); + ByteBuffer byteBuffer = ByteBuffer.allocate(8192); + while (true) + { + byteBuffer.clear(); + int read = client.read(byteBuffer); + if (read < 0) + break; + totalRead += read; + } + assertThat(totalRead, greaterThan(0L)); + } + } + + @Test + public void testCancelBeforeWrite() throws Exception + { + CountDownLatch serverWriteFailureLatch = new CountDownLatch(1); + CountDownLatch serverEndPointWriteFailureLatch = new CountDownLatch(1); + ServerConnector connector = new ServerConnector(server, 1, 1) + { + @Override + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException + { + SocketChannelEndPoint endpoint = new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) + { + @Override + public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException + { + HttpConnection connection = (HttpConnection)getConnection(); + Runnable runnable = connection.getHttpChannel().onFailure(new ArithmeticException()); + Thread thread = new Thread(runnable); + thread.start(); + + // Wait until the thread running the failure runnable cancelled the write on the endpoint. + await().atMost(5, TimeUnit.SECONDS).until(() -> getWriteFlusher().isFailed()); + + super.write(Callback.from(callback::succeeded, x -> + { + if (serverWriteFailureLatch.getCount() == 1L) + serverEndPointWriteFailureLatch.countDown(); + + // Complete the send callback from HttpConnection. + callback.failed(x); + }), buffers); + } + }; + endpoint.setIdleTimeout(getIdleTimeout()); + return endpoint; + } + }; + server.addConnector(connector); + server.setHandler(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception + { + RetainableByteBuffer.Mutable buffer = server.getByteBufferPool().acquire(1024, true); + ByteBuffer byteBuffer = buffer.getByteBuffer(); + byteBuffer.clear(); + response.write(true, byteBuffer, Callback.from(callback::succeeded, x -> + { + if (serverEndPointWriteFailureLatch.getCount() == 0L) + serverWriteFailureLatch.countDown(); + + // Release the buffer. + buffer.release(); + + // Complete the Handler callback. + callback.failed(x); + })); + return true; + } + }); + server.start(); + + try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + HttpTester.Request request = new HttpTester.Request(); + request.setMethod("GET"); + request.setHeader("Host", "localhost"); + request.setURI("/"); + ByteBuffer buffer = request.generate(); + + client.write(buffer); + + assertTrue(serverWriteFailureLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverEndPointWriteFailureLatch.await(5, TimeUnit.SECONDS)); + + // Verify that the server eventually closes the connection after writing 0 byte. + long totalRead = 0L; + client.socket().setSoTimeout(1000); + ByteBuffer byteBuffer = ByteBuffer.allocate(8192); + while (true) + { + byteBuffer.clear(); + int read = client.read(byteBuffer); + if (read < 0) + break; + totalRead += read; + } + assertThat(totalRead, is(0L)); + } + } + + @Test + public void testCancelAfterWrite() throws Exception + { + CountDownLatch serverEndPointWriteSuccessLatch = new CountDownLatch(1); + CountDownLatch serverWriteFailureLatch = new CountDownLatch(1); + CountDownLatch serverWriteSuccessLatch = new CountDownLatch(1); + ServerConnector connector = new ServerConnector(server, 1, 1) + { + @Override + protected SocketChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) + { + SocketChannelEndPoint endpoint = new SocketChannelEndPoint(channel, selectSet, key, getScheduler()) + { + @Override + public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException + { + super.write(Callback.from(() -> + { + if (serverWriteFailureLatch.getCount() == 1L) + serverEndPointWriteSuccessLatch.countDown(); + + HttpConnection connection = (HttpConnection)getConnection(); + Runnable runnable = connection.getHttpChannel().onFailure(new ArithmeticException()); + Thread thread = new Thread(runnable); + thread.start(); + + // Wait until the thread running the failure runnable cancelled the write on the endpoint. + await().atMost(5, TimeUnit.SECONDS).until(() -> getWriteFlusher().isFailed()); + + callback.succeeded(); + }, callback::failed), buffers); + } + }; + endpoint.setIdleTimeout(getIdleTimeout()); + return endpoint; + } + }; + server.addConnector(connector); + server.setHandler(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) + { + RetainableByteBuffer.Mutable buffer = server.getByteBufferPool().acquire(1024, true); + ByteBuffer byteBuffer = buffer.getByteBuffer(); + byteBuffer.clear(); + response.write(true, byteBuffer, Callback.from(() -> + { + serverWriteSuccessLatch.countDown(); + + // Release the buffer. + buffer.release(); + + // Complete the Handler callback. + callback.succeeded(); + }, x -> + { + if (serverEndPointWriteSuccessLatch.getCount() == 0L) + serverWriteFailureLatch.countDown(); + + // Release the buffer. + buffer.release(); + + // Complete the Handler callback. + callback.failed(x); + })); + return true; + } + }); + server.start(); + + try (SocketChannel client = SocketChannel.open(new InetSocketAddress("localhost", connector.getLocalPort()))) + { + HttpTester.Request request = new HttpTester.Request(); + request.setMethod("GET"); + request.setHeader("Host", "localhost"); + request.setURI("/"); + ByteBuffer buffer = request.generate(); + + client.write(buffer); + + assertTrue(serverEndPointWriteSuccessLatch.await(5, TimeUnit.SECONDS)); + assertFalse(serverWriteSuccessLatch.await(1, TimeUnit.SECONDS)); + + HttpTester.Response response = HttpTester.parseResponse(HttpTester.from(client)); + assertThat(response.getStatus(), is(200)); + } + } +} diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpStreamTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpStreamTest.java index 8048be766d8c..62d2e9a5c4bd 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpStreamTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpStreamTest.java @@ -137,6 +137,12 @@ public void send(MetaData.Request request, MetaData.Response response, boolean l throw new UnsupportedOperationException(); } + @Override + public Runnable cancelSend(Throwable cause, Callback appCallback) + { + throw new UnsupportedOperationException(); + } + @Override public long getIdleTimeout() { diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/MockHttpStream.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/MockHttpStream.java index c5dff059d395..199f6e49e534 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/MockHttpStream.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/MockHttpStream.java @@ -204,6 +204,12 @@ public void send(MetaData.Request request, MetaData.Response response, boolean l callback.succeeded(); } + @Override + public Runnable cancelSend(Throwable cause, Callback appCallback) + { + throw new UnsupportedOperationException(); + } + @Override public long getIdleTimeout() { diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index f28a22f6f2dd..18c7661ed3d3 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -15,6 +15,8 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicStampedReference; import java.util.function.Consumer; import org.eclipse.jetty.util.thread.Invocable; @@ -389,6 +391,158 @@ static Callback from(Callback callback1, Callback callback2) return combine(callback1, callback2); } + /** + * A combination of multiple Callbacks, that must all be completed before a specific callback is completed. + * For example:
{@code
+     *   void sendToAll(String message, Collection channels, Callback callback)
+     *   {
+     *       try (Callback.Combination combination = new Callback.Combination(callback))
+     *       {
+     *           for (Channel channel : channels)
+     *               channel.send(message, combination.newCallback());
+     *       }
+     *   }
+     * }
+ */ + class Combination implements AutoCloseable + { + private final Callback andThen; + private final AtomicStampedReference state; + + /** + * Create a new empty combined callback. + * @param andThen The {@code Callback} to complete once all {@code Callbacks} in the combination are complete + * and this combination is {@link #close() closed}. + */ + public Combination(Callback andThen) + { + this(andThen, null); + } + + /** + * Create a new empty combined callback with a forced failure and invocation type. + * + * @param andThen The {@code Callback} to complete once all {@code Callbacks} in the combination are complete + * and this combination is {@link #close() closed}. + * @param failure If not {@code null}, force a failure, so that the {@link Callback#failed(Throwable)} method + * will always be called on the {@code Callback} passed to the constructor, once all the + * combined callbacks are completed and the {@code Combination} is {@link #close() closed}. + */ + public Combination(Callback andThen, Throwable failure) + { + this.andThen = Objects.requireNonNull(andThen); + // initial stamp is -1 to indicate not closed and requiring 1 completion from close. + this.state = new AtomicStampedReference<>(failure, -1); + } + + /** + * Create a new {@code Callback} as part of this combination. + * + * @return A {@code Callback} that must be completed before the callback passed to + * the constructor is completed. + * @throws IllegalStateException if the combination has already been completed. + */ + public Callback newCallback() + { + int[] h = new int[1]; + while (true) + { + Throwable failure = state.get(h); + int s = h[0]; + + if (s >= 0) + throw new IllegalStateException("closed"); + // we can only create new callbacks when not closed, so we decrement to make a more negative stamp. + if (!state.compareAndSet(failure, failure, s, s - 1)) + continue; + + return new Callback() + { + private final AtomicBoolean completed = new AtomicBoolean(false); + + @Override + public void failed(Throwable x) + { + if (completed.compareAndSet(false, true)) + complete(x); + } + + @Override + public void succeeded() + { + if (completed.compareAndSet(false, true)) + complete(null); + } + + @Override + public InvocationType getInvocationType() + { + return andThen.getInvocationType(); + } + }; + } + } + + /** + * Called to indicate that no more calls to {@link #newCallback()} will happen. + * If the combination is already complete, then it will be completed in the scope of this call. + * + * @throws IllegalStateException if this method has already been called. + */ + @Override + public void close() throws IllegalStateException + { + int[] h = new int[1]; + while (true) + { + Throwable failure = state.get(h); + int s = h[0]; + + if (s >= 0) + throw new IllegalStateException("closed"); + + // make the stamp positive to indicate that the combination is closed + if (!state.compareAndSet(failure, failure, s, -s)) + continue; + complete(null); + return; + } + } + + private void complete(Throwable failed) + { + Throwable failure; + int[] h = new int[1]; + while (true) + { + failure = state.get(h); + int s = h[0]; + + // combine failures + Throwable combined = failure; + if (combined == null) + combined = failed; + else if (failed != null) + ExceptionUtil.addSuppressedIfNotAssociated(failure, failed); + + // If the stamp is < 0, the combination has not been closed, so we increment, else we decrement. + int n = s < 0 ? s + 1 : s - 1; + if (!state.compareAndSet(failure, combined, s, n)) + continue; + + if (n == 0) + { + if (failure == null) + andThen.succeeded(); + else + andThen.failed(failure); + } + + return; + } + } + } + /** *

A Callback implementation that calls the {@link #completed()} method when it either succeeds or fails.

*/ diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java index ec8bc9859e3a..2f088b721b7d 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/FutureCallback.java @@ -108,6 +108,11 @@ public boolean isDone() return _done.get() && _latch.getCount() == 0; } + public boolean isFailed() + { + return isDone() && _cause != COMPLETED; + } + @Override public Void get() throws InterruptedException, ExecutionException { diff --git a/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/CallbackTest.java b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/CallbackTest.java new file mode 100644 index 000000000000..93f74aee2e07 --- /dev/null +++ b/jetty-core/jetty-util/src/test/java/org/eclipse/jetty/util/CallbackTest.java @@ -0,0 +1,290 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.jetty.util.thread.Invocable; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class CallbackTest +{ + // TODO Better coverage of Callback + + @Test + void testNoOpCallback() + { + Callback callback = Callback.NOOP; + assertEquals(Invocable.InvocationType.NON_BLOCKING, callback.getInvocationType()); + assertEquals("Callback.NOOP", callback.toString()); + + assertDoesNotThrow(callback::succeeded); + assertDoesNotThrow(() -> callback.failed(new Exception("Test"))); + } + + @Test + void testCompleteWithSuccess() + { + CompletableFuture future = new CompletableFuture<>(); + FutureCallback callback = new FutureCallback(); + + callback.completeWith(future); + future.complete(null); + + assertTrue(callback.isDone()); + assertFalse(callback.isFailed()); + } + + @Test + void testCompleteWithFailure() + { + CompletableFuture future = new CompletableFuture<>(); + FutureCallback callback = new FutureCallback(); + + callback.completeWith(future); + Exception failure = new Exception("Failure"); + future.completeExceptionally(failure); + + assertTrue(callback.isDone()); + assertTrue(callback.isFailed()); + } + + @Test + void testCallbackFromCompletableFuture() + { + CompletableFuture future = new CompletableFuture<>(); + Callback callback = Callback.from(future); + + callback.succeeded(); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + } + + @Test + void testCallbackFromCompletableFutureFailure() + { + CompletableFuture future = new CompletableFuture<>(); + Callback callback = Callback.from(future); + + Exception failure = new Exception("Failure"); + callback.failed(failure); + assertTrue(future.isCompletedExceptionally()); + } + + @Test + void testCallbackFromSuccessAndFailureHandlers() + { + AtomicBoolean successCalled = new AtomicBoolean(false); + AtomicBoolean failureCalled = new AtomicBoolean(false); + + Callback callback = Callback.from( + () -> successCalled.set(true), + throwable -> failureCalled.set(true) + ); + + callback.succeeded(); + assertTrue(successCalled.get()); + assertFalse(failureCalled.get()); + + successCalled.set(false); + callback = Callback.from( + () -> successCalled.set(true), + throwable -> failureCalled.set(true) + ); + callback.failed(new Exception("Test")); + assertFalse(successCalled.get()); + assertTrue(failureCalled.get()); + } + + @Test + void testNestedCallback() + { + AtomicBoolean innerCalled = new AtomicBoolean(false); + AtomicBoolean outerCalled = new AtomicBoolean(false); + + Callback inner = Callback.from(() -> innerCalled.set(true)); + Callback nested = Callback.from(inner, () -> outerCalled.set(true)); + + nested.succeeded(); + assertTrue(innerCalled.get()); + assertTrue(outerCalled.get()); + } + + @Test + void testCombinedCallback() + { + AtomicBoolean firstCalled = new AtomicBoolean(false); + AtomicBoolean secondCalled = new AtomicBoolean(false); + + Callback first = Callback.from(() -> firstCalled.set(true)); + Callback second = Callback.from(() -> secondCalled.set(true)); + + Callback combined = Callback.from(first, second); + combined.succeeded(); + + assertTrue(firstCalled.get()); + assertTrue(secondCalled.get()); + } + + @Test + void testCallbackCollectionAllSucceededBefore() + { + FutureCallback mainCallback = new FutureCallback(); + try (Callback.Combination combination = new Callback.Combination(mainCallback)) + { + for (int i = 0; i < 10; i++) + combination.newCallback().succeeded(); + } + assertTrue(mainCallback.isDone()); + assertFalse(mainCallback.isFailed()); + } + + @Test + void testCallbackCollectionOneFailBefore() throws Exception + { + FutureCallback mainCallback = new FutureCallback(); + Exception failure = new Exception("Test"); + try (Callback.Combination combination = new Callback.Combination(mainCallback)) + { + for (int i = 0; i < 5; i++) + combination.newCallback().succeeded(); + combination.newCallback().failed(failure); + for (int i = 6; i < 10; i++) + combination.newCallback().succeeded(); + } + + assertTrue(mainCallback.isDone()); + assertTrue(mainCallback.isFailed()); + try + { + mainCallback.get(); + fail(); + } + catch (ExecutionException e) + { + assertThat(e.getCause(), Matchers.sameInstance(failure)); + } + } + + @Test + void testCallbackCollectionAllSucceededAfter() + { + FutureCallback mainCallback = new FutureCallback(); + Callback[] callbacks = new Callback[10]; + try (Callback.Combination combination = new Callback.Combination(mainCallback)) + { + for (int i = 0; i < callbacks.length; i++) + callbacks[i] = combination.newCallback(); + } + assertFalse(mainCallback.isDone()); + + for (Callback callback : callbacks) + callback.succeeded(); + assertTrue(mainCallback.isDone()); + assertFalse(mainCallback.isFailed()); + } + + @Test + void testCallbackCollectionOneFailAfter() throws InterruptedException + { + FutureCallback mainCallback = new FutureCallback(); + Callback[] callbacks = new Callback[10]; + try (Callback.Combination combination = new Callback.Combination(mainCallback)) + { + for (int i = 0; i < callbacks.length; i++) + callbacks[i] = combination.newCallback(); + } + assertFalse(mainCallback.isDone()); + + for (int i = 0; i < 5; i++) + callbacks[i].succeeded(); + Exception failure = new Exception("Test"); + callbacks[5].failed(failure); + for (int i = 6; i < 10; i++) + callbacks[i].succeeded(); + + assertTrue(mainCallback.isDone()); + assertTrue(mainCallback.isFailed()); + try + { + mainCallback.get(); + fail(); + } + catch (ExecutionException e) + { + assertThat(e.getCause(), Matchers.sameInstance(failure)); + } + } + + @Test + void testCallbackCollectionForceFail() throws InterruptedException + { + FutureCallback mainCallback = new FutureCallback(); + Exception failure = new Exception("Test"); + try (Callback.Combination combination = new Callback.Combination(mainCallback, failure)) + { + for (int i = 0; i < 10; i++) + combination.newCallback().succeeded(); + } + assertTrue(mainCallback.isDone()); + assertTrue(mainCallback.isFailed()); + try + { + mainCallback.get(); + fail(); + } + catch (ExecutionException e) + { + assertThat(e.getCause(), Matchers.sameInstance(failure)); + } + } + + @Test + void testCallbackCollectionForceFailAssociated() throws InterruptedException + { + FutureCallback mainCallback = new FutureCallback(); + Exception failure = new Exception("Test"); + Exception associated = new Exception("Test"); + try (Callback.Combination combination = new Callback.Combination(mainCallback, failure)) + { + for (int i = 0; i < 5; i++) + combination.newCallback().succeeded(); + combination.newCallback().failed(associated); + for (int i = 6; i < 10; i++) + combination.newCallback().succeeded(); + } + assertTrue(mainCallback.isDone()); + assertTrue(mainCallback.isFailed()); + try + { + mainCallback.get(); + fail(); + } + catch (ExecutionException e) + { + assertThat(e.getCause(), Matchers.sameInstance(failure)); + assertTrue(ExceptionUtil.areAssociated(failure, associated)); + } + } +} diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/MockEndpoint.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/MockEndpoint.java index 617341f7d6f4..ea5c949f260f 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/MockEndpoint.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/internal/MockEndpoint.java @@ -135,6 +135,12 @@ public void write(Callback callback, ByteBuffer... buffers) throws WritePendingE throw new UnsupportedOperationException(NOT_SUPPORTED); } + @Override + public Callback cancelWrite(Throwable cause) + { + throw new UnsupportedOperationException(NOT_SUPPORTED); + } + @Override public Connection getConnection() {