Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(net): improve overall multiplex performance for rlpx satellite stream #13861

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
81 changes: 72 additions & 9 deletions crates/net/eth-wire/src/multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,30 +581,93 @@ where
St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
Primary: Sink<T> + Unpin,
P2PStreamError: Into<<Primary as Sink<T>>::Error>,
io::Error: Into<<Primary as Sink<T>>::Error>,
{
type Error = <Primary as Sink<T>>::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.get_mut();
if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) {
return Poll::Ready(Err(err.into()))
}
if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) {
return Poll::Ready(Err(err))

// First try to drain any buffered satellite messages
loop {
match this.inner.conn.poll_ready_unpin(cx) {
hoank101 marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Ok(())) => {
if let Some(msg) = this.inner.out_buffer.pop_front() {
if let Err(err) = this.inner.conn.start_send_unpin(msg) {
return Poll::Ready(Err(err.into()))
}
continue;
}
break;
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => return Poll::Pending,
}
}
Poll::Ready(Ok(()))

// Only check primary sink readiness after buffer is drained
this.primary.st.poll_ready_unpin(cx)
}

/// Attempts to send a message. Returns an error if there are still buffered satellite messages
/// that need to be sent first.
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.get_mut().primary.st.start_send_unpin(item)
let this = self.get_mut();

// Ensure all buffered satellite messages are sent before accepting new messages
if !this.inner.out_buffer.is_empty() {
return Err(io::Error::new(
io::ErrorKind::WouldBlock,
"satellite message buffer not empty",
)
.into());
}
hoank101 marked this conversation as resolved.
Show resolved Hide resolved

this.primary.st.start_send_unpin(item)
}

/// Attempts to flush all pending messages by:
/// 1. Sending any buffered satellite messages
/// 2. Flushing the underlying connection
/// 3. Flushing the primary stream
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().inner.conn.poll_flush_unpin(cx).map_err(Into::into)
let this = self.get_mut();

// First try to send all buffered satellite messages
loop {
match this.inner.conn.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => {
if let Some(msg) = this.inner.out_buffer.pop_front() {
if let Err(err) = this.inner.conn.start_send_unpin(msg) {
return Poll::Ready(Err(err.into()))
}
continue;
}
break;
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => return Poll::Pending,
}
}

// Then flush both the connection and primary stream
ready!(this.inner.conn.poll_flush_unpin(cx)).map_err(Into::into)?;
this.primary.st.poll_flush_unpin(cx)
}

/// Closes the stream by:
/// 1. Flushing all pending messages
/// 2. Closing the underlying connection
/// 3. Closing the primary stream
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().inner.conn.poll_close_unpin(cx).map_err(Into::into)
let this = self.get_mut();

// First flush any remaining messages
ready!(Pin::new(&mut *this).poll_flush(cx))?;

// Then close both the connection and primary stream
ready!(this.inner.conn.poll_close_unpin(cx)).map_err(Into::into)?;
this.primary.st.poll_close_unpin(cx)
hoank101 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Loading