Skip to content

Commit

Permalink
Minor updates to peek
Browse files Browse the repository at this point in the history
- Ensure both peek + poll_peek have docs, and those docs match the tokio
ones.

- Add a trace point for peek
  • Loading branch information
marcbowes committed Jan 28, 2025
1 parent 0aa4600 commit aa15f2f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
9 changes: 8 additions & 1 deletion src/net/tcp/split_owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ impl OwnedReadHalf {
reunite(self, other)
}

/// Attempt to receive data on the socket, without removing that data from the queue, registering the current task for wakeup if data is not yet available.
/// Attempts to receive data on the socket, without removing that data from
/// the queue, registering the current task for wakeup if data is not yet
/// available.
pub fn poll_peek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -46,6 +48,11 @@ impl OwnedReadHalf {
Pin::new(&mut self.inner).poll_peek(cx, buf)
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
///
/// Successive calls return the same data.
pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.peek(buf).await
}
Expand Down
36 changes: 23 additions & 13 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,18 @@ impl TcpStream {
Ok(())
}

/// Receives data on the socket from the remote address to which it is connected,
/// without removing that data from the queue. On success, returns the number of bytes peeked.
/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
///
/// Successive calls return the same data.
pub async fn peek(&mut self, buf: &mut [u8]) -> Result<usize> {
self.read_half.peek(buf).await
}

/// Attempts to receive data on the socket, without removing that data from
/// the queue, registering the current task for wakeup if data is not yet
/// available.
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut ReadBuf) -> Poll<Result<usize>> {
self.read_half.poll_peek(cx, buf)
}
Expand Down Expand Up @@ -262,19 +268,23 @@ impl ReadHalf {
}

match ready!(self.rx.recv.poll_recv(cx)) {
Some(seg) => match seg {
SequencedSegment::Data(bytes) => {
let len = std::cmp::min(bytes.len(), buf.remaining());
buf.put_slice(&bytes[..len]);
self.rx.buffer = Some(bytes);
Some(seg) => {
tracing::trace!(target: TRACING_TARGET, src = ?self.pair.remote, dst = ?self.pair.local, protocol = %seg, "Peek");

Poll::Ready(Ok(len))
}
SequencedSegment::Fin => {
self.is_closed = true;
Poll::Ready(Ok(0))
match seg {
SequencedSegment::Data(bytes) => {
let len = std::cmp::min(bytes.len(), buf.remaining());
buf.put_slice(&bytes[..len]);
self.rx.buffer = Some(bytes);

Poll::Ready(Ok(len))
}
SequencedSegment::Fin => {
self.is_closed = true;
Poll::Ready(Ok(0))
}
}
},
}
None => Poll::Ready(Err(io::Error::new(
io::ErrorKind::ConnectionReset,
"Connection reset",
Expand Down

0 comments on commit aa15f2f

Please sign in to comment.