Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel write semantic #12727

Merged
merged 64 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
ffd68fc
first attempt
gregw Jan 22, 2025
ba00bcf
Do not remove pooled buffers on write failure
gregw Jan 22, 2025
04d420c
Do not remove pooled buffers on write failure
gregw Jan 22, 2025
fcef2e5
Merge remote-tracking branch 'origin/jetty-12.1.x' into fix/jetty-12.…
gregw Jan 22, 2025
75dfeb3
HTTP2 WIP
gregw Jan 24, 2025
0b16d61
HTTP2 WIP
gregw Jan 24, 2025
737e9e5
HTTP2 use reset to cancel
gregw Jan 27, 2025
05a0fec
add h3 support
lorban Jan 28, 2025
0becca7
updates from review
gregw Jan 31, 2025
f538286
updates from review
gregw Jan 31, 2025
cf82fcc
removed CountingCallback usage
gregw Feb 1, 2025
50cc8cb
test tentative
lorban Feb 4, 2025
613a40c
updates from review
gregw Feb 4, 2025
338754d
Merge branch 'jetty-12.1.x' into fix/jetty-12.1.x/cancelWrite
gregw Feb 12, 2025
f02a8dd
make HttpStream.cancelSend() non-default
lorban Feb 12, 2025
a7b2014
make HttpStream.cancelSend() return Runnable
lorban Feb 12, 2025
808470b
FCGI stream cancel implementation
lorban Feb 12, 2025
60a1fc1
review H1 cancellation + make EndPoint.cancelWrite go to the failed s…
lorban Feb 13, 2025
87206d9
defer H3 to after #12742
lorban Feb 13, 2025
63ed8e2
Fix H2
lorban Feb 13, 2025
a2b610b
minor H1 improvement
lorban Feb 13, 2025
6e47a47
add todos
lorban Feb 13, 2025
4912d8f
align FCGI with H1
lorban Feb 14, 2025
5e67a6d
replace state CANCELLED with state FAILED
lorban Feb 14, 2025
628639b
add H1 write cancellation test
lorban Feb 14, 2025
6e84081
add H1 write cancellation test
lorban Feb 14, 2025
ae8c796
Merge remote-tracking branch 'origin/jetty-12.1.x' into fix/jetty-12.…
lorban Feb 14, 2025
23034cf
revert test prototype
lorban Feb 14, 2025
25297c8
rollback blank line
lorban Feb 14, 2025
9c20685
change mock impl
lorban Feb 14, 2025
4a87adc
fix incorrect fall through from previous branch of the switch statement
lorban Feb 14, 2025
aa19b92
add WriteFlusher.cancelWrite() test and javadoc
lorban Feb 14, 2025
7ef84b6
fix checkstyle
lorban Feb 14, 2025
cc1e13d
add H2 test
lorban Feb 17, 2025
48c90f4
improve tests
lorban Feb 17, 2025
55afda1
improve tests
lorban Feb 17, 2025
282407c
improve tests
lorban Feb 17, 2025
60c100e
fix javadoc
lorban Feb 17, 2025
b4407d5
fix log
lorban Feb 20, 2025
81927fe
fix Http2Stream.reset() failing to send a reset frame when the remote…
lorban Feb 20, 2025
61a7118
improve javadoc
lorban Feb 21, 2025
4849197
improve javadoc
lorban Feb 21, 2025
219dec8
fix test
lorban Feb 21, 2025
cc5596b
fix H2 stream endpoint
lorban Feb 21, 2025
f68ed8b
remove all traces of releaseAndRemove
lorban Feb 21, 2025
b53966d
Merge remote-tracking branch 'origin/jetty-12.1.x' into fix/jetty-12.…
lorban Feb 21, 2025
3424438
Merge remote-tracking branch 'origin/jetty-12.1.x' into fix/jetty-12.…
lorban Feb 21, 2025
6f744df
Merge remote-tracking branch 'origin/jetty-12.1.x' into fix/jetty-12.…
lorban Feb 21, 2025
4066804
remove todos
lorban Feb 21, 2025
387c5b7
add new H2 session flushing facility
lorban Feb 21, 2025
0bfa7a6
add and fix test
lorban Feb 24, 2025
7693987
fix checkstyle
lorban Feb 24, 2025
e9490c5
fix flush
lorban Feb 24, 2025
b2c8863
Javadoc
gregw Feb 24, 2025
7a9559e
updates from review
gregw Feb 24, 2025
77f5594
updates from review
gregw Feb 24, 2025
37f0480
Simplified implementation of HTTP2StreamEndPoint.
sbordet Feb 25, 2025
f9c5051
Callback.collection
gregw Feb 25, 2025
07415bd
updates from review
gregw Feb 25, 2025
afce1e7
updates from review
gregw Feb 26, 2025
640e3d5
updates from review
gregw Feb 26, 2025
e578e1d
updates from review
gregw Feb 26, 2025
ad4c28a
Added tests for legacy URI compliance
gregw Feb 26, 2025
fdd44bc
updates from review
gregw Feb 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@

package org.eclipse.jetty.fcgi.generator;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
Expand All @@ -33,14 +37,39 @@ public class Flusher

private final AutoLock lock = new AutoLock();
private final Queue<Entry> queue = new ArrayDeque<>();
private final IteratingCallback flushCallback = new FlushCallback();
private final FlushCallback flushCallback = new FlushCallback();
private final EndPoint endPoint;

public Flusher(EndPoint endPoint)
{
this.endPoint = endPoint;
}

public Callback cancel(Throwable cause)
{
// Cancel the IteratingCallback and take the nest Callback
CancelSendException cancelSendException = new CancelSendException(cause);
if (!flushCallback.abort(cancelSendException))
return Callback.NOOP;

// We now know that we aborted this ICB with the CSE above, so onAbort will eventually be called
// in a serialized context and the callback will be set on the CSE.

// If a write operation has been scheduled cancel it and fail its callback, otherwise complete ourselves
Callback writeCallback = endPoint.cancelWrite(cause);

if (writeCallback == null)
// There was no write in operation, so we can complete the CSE ourselves
cancelSendException.complete();
else
// The write was cancelled and the callback is this ICB, so failing it will call onCompleted
writeCallback.failed(cause);

// wait for the cancellation to be complete and the callback to be set by onAbort.
// This should never block indefinitely, as onAborted only waits for active states like PROCESSING to complete.
return cancelSendException.join();
}

public void flush(ByteBufferPool.Accumulator accumulator, Callback callback)
{
offer(new Entry(accumulator, callback));
Expand Down Expand Up @@ -73,6 +102,44 @@ public void shutdown()
}));
}

private static class CancelSendException extends IOException
{
private final CountDownLatch _complete = new CountDownLatch(2);
private Callback _callback;

public CancelSendException(Throwable cause)
{
super(cause);
}

public void complete()
{
_complete.countDown();
}

public Callback join()
{
try
{
_complete.await();
}
catch (InterruptedException x)
{
Throwable cause = getCause();
ExceptionUtil.addSuppressedIfNotAssociated(cause, x);
throw new RuntimeIOException(cause);
}

return _callback;
}

public void setCallback(Callback callback)
{
_callback = callback;
_complete.countDown();
}
}

private class FlushCallback extends IteratingCallback
{
private Entry active;
Expand All @@ -94,6 +161,28 @@ protected Action process() throws Exception
return Action.SCHEDULED;
}

@Override
protected void onAborted(Throwable cause)
{
if (cause instanceof CancelSendException cancelSend)
cancelSend.setCallback(resetCallback());
}

private Callback resetCallback()
{
Flusher.Entry entry = active;
active = null;
return Callback.from(entry.callback, entry::release);
}

@Override
protected void onCompleted(Throwable causeOrNull)
{
if (causeOrNull instanceof CancelSendException cancelSendException)
cancelSendException.complete();
super.onCompleted(causeOrNull);
}

@Override
protected void onCompleteSuccess()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ public void send(MetaData.Request request, MetaData.Response response, boolean l
}
}

@Override
public Runnable cancelSend(Throwable cause, Callback appCallback)
{
return () -> Callback.combine(_connection.getFlusher().cancel(cause), appCallback).failed(cause);
}

private void commit(MetaData.Response info, boolean head, boolean last, ByteBuffer content, Callback callback)
{
if (LOG.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,25 @@ void reset(HTTP2Stream stream, ResetFrame frame, Callback callback)
}, callback), frame);
}

public void flush(Callback callback)
{
Entry entry = new Entry(new FlushFrame(), null, callback)
{
@Override
public int getFrameBytesGenerated()
{
return 0;
}

@Override
public boolean generate(RetainableByteBuffer.Mutable accumulator)
{
return true;
}
};
frame(entry, true);
}

/**
* <p>Invoked internally and by applications to send a GO_AWAY frame to the other peer.</p>
*
Expand Down Expand Up @@ -1336,6 +1355,8 @@ public boolean shouldBeDropped()
case WINDOW_UPDATE:
case PREFACE:
case DISCONNECT:
case FAILURE:
case FLUSH:
return false;
// Frames of this type follow the logic below.
case DATA:
Expand Down Expand Up @@ -2518,4 +2539,12 @@ protected boolean onExpired(HTTP2Stream stream)
return false;
}
}

private static class FlushFrame extends Frame
{
public FlushFrame()
{
super(FrameType.FLUSH);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void reset(ResetFrame frame, Callback callback)
Throwable resetFailure = null;
try (AutoLock ignored = lock.lock())
{
if (isReset())
if (localReset)
{
resetFailure = failure;
}
Expand All @@ -180,6 +180,8 @@ public void reset(ResetFrame frame, Callback callback)
session.dataConsumed(this, flowControlLength);
if (resetFailure != null)
{
if (LOG.isDebugEnabled())
LOG.debug("failing callback immediately as stream {} already is locally reset", this, resetFailure);
close();
session.removeStream(this);
callback.failed(resetFailure);
Expand Down
Loading