From ca764c3538f4d62cf41f08d1a8b03d64ad3034a2 Mon Sep 17 00:00:00 2001 From: Brandon Pike Date: Fri, 12 Jul 2024 11:40:12 -0700 Subject: [PATCH 1/5] Add try_write to TcpStream Signed-off-by: Brandon Pike --- src/net/tcp/stream.rs | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index bf7eb5b..321061f 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -93,6 +93,21 @@ impl TcpStream { Ok(TcpStream::new(pair, rx)) } + /// Try to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream has been shut down + /// `Err(io::ErrorKind::BrokenPipe)` is returned. + pub fn try_write(&self, buf: &[u8]) -> Result { + self.write_half.try_write(buf) + } + /// Returns the local address that this stream is bound to. pub fn local_addr(&self) -> Result { Ok(self.read_half.pair.local) @@ -214,19 +229,16 @@ pub(crate) struct WriteHalf { } impl WriteHalf { - fn poll_write_priv(&self, _cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + fn try_write(&self, buf: &[u8]) -> Result { if buf.remaining() == 0 { - return Poll::Ready(Ok(0)); + return Ok(0); } if self.is_shutdown { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::BrokenPipe, - "Broken pipe", - ))); + return Err(io::Error::new(io::ErrorKind::BrokenPipe, "Broken pipe")); } - let res = World::current(|world| { + World::current(|world| { let bytes = Bytes::copy_from_slice(buf); let len = bytes.len(); @@ -234,9 +246,11 @@ impl WriteHalf { self.send(world, Segment::Data(seq, bytes))?; Ok(len) - }); + }) + } - Poll::Ready(res) + fn poll_write_priv(&self, _cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + Poll::Ready(self.try_write(buf)) } fn poll_shutdown_priv(&mut self) -> Poll> { From ca29965bd27d3239207ccf9174d482b40c2b03ee Mon Sep 17 00:00:00 2001 From: Brandon Pike Date: Fri, 12 Jul 2024 13:31:37 -0700 Subject: [PATCH 2/5] Add try_write test & allow test deadcode Signed-off-by: Brandon Pike --- tests/async_send_sync.rs | 3 +++ tests/tcp.rs | 24 ++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs index d3a11e1..c075a18 100644 --- a/tests/async_send_sync.rs +++ b/tests/async_send_sync.rs @@ -11,18 +11,21 @@ fn require_unpin(_t: &T) {} #[allow(dead_code)] struct Invalid; +#[allow(dead_code)] trait AmbiguousIfSend { fn some_item(&self) {} } impl AmbiguousIfSend<()> for T {} impl AmbiguousIfSend for T {} +#[allow(dead_code)] trait AmbiguousIfSync { fn some_item(&self) {} } impl AmbiguousIfSync<()> for T {} impl AmbiguousIfSync for T {} +#[allow(dead_code)] trait AmbiguousIfUnpin { fn some_item(&self) {} } diff --git a/tests/tcp.rs b/tests/tcp.rs index d96cc8c..42f8c5b 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -1162,3 +1162,27 @@ fn exhaust_ephemeral_ports() { _ = sim.run() } + +#[test] +fn try_write() -> Result { + let mut sim = Builder::new().build(); + sim.client("client", async move { + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 1234)).await?; + + tokio::spawn(async move { + let (socket, _) = listener.accept().await.unwrap(); + + let written = socket.try_write(b"hello!").unwrap(); + assert_eq!(written, 6); + }); + + let mut socket = TcpStream::connect((Ipv4Addr::LOCALHOST, 1234)).await?; + let mut buf: [u8; 6] = [0; 6]; + socket.read_exact(&mut buf).await?; + assert_eq!(&buf, b"hello!"); + + Ok(()) + }); + + sim.run() +} From 03bbf191290272c272016b9b35123c41b4f82b81 Mon Sep 17 00:00:00 2001 From: Brandon Pike Date: Fri, 12 Jul 2024 13:54:20 -0700 Subject: [PATCH 3/5] Allow deadcode in tests::async_send_sync Signed-off-by: Brandon Pike --- tests/async_send_sync.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/tests/async_send_sync.rs b/tests/async_send_sync.rs index c075a18..c37259f 100644 --- a/tests/async_send_sync.rs +++ b/tests/async_send_sync.rs @@ -1,31 +1,25 @@ //! Copied over from: //! https://github.com/tokio-rs/tokio/blob/master/tokio/tests/async_send_sync.rs +#![allow(dead_code)] -#[allow(dead_code)] fn require_send(_t: &T) {} -#[allow(dead_code)] fn require_sync(_t: &T) {} -#[allow(dead_code)] fn require_unpin(_t: &T) {} -#[allow(dead_code)] struct Invalid; -#[allow(dead_code)] trait AmbiguousIfSend { fn some_item(&self) {} } impl AmbiguousIfSend<()> for T {} impl AmbiguousIfSend for T {} -#[allow(dead_code)] trait AmbiguousIfSync { fn some_item(&self) {} } impl AmbiguousIfSync<()> for T {} impl AmbiguousIfSync for T {} -#[allow(dead_code)] trait AmbiguousIfUnpin { fn some_item(&self) {} } From f9099b3e526fb8c68c81ac8b03675d733b1b85bc Mon Sep 17 00:00:00 2001 From: Brandon Pike Date: Fri, 12 Jul 2024 14:36:40 -0700 Subject: [PATCH 4/5] Add TcpStream::writeable() + docs Signed-off-by: Brandon Pike --- src/net/tcp/stream.rs | 91 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 321061f..70c5ca6 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -99,11 +99,48 @@ impl TcpStream { /// The function will attempt to write the entire contents of `buf`, but /// only part of the buffer may be written. /// + /// This function is usually paired with `writable()`. + /// /// # Return /// /// If data is successfully written, `Ok(n)` is returned, where `n` is the - /// number of bytes written. If the stream has been shut down - /// `Err(io::ErrorKind::BrokenPipe)` is returned. + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use turmoil::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` pub fn try_write(&self, buf: &[u8]) -> Result { self.write_half.try_write(buf) } @@ -125,6 +162,56 @@ impl TcpStream { } } + /// Waits for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually + /// paired with `try_write()`. + /// + /// # Cancel safety + /// + /// This method is cancel safe. Once a readiness event occurs, the method + /// will continue to return immediately until the readiness event is + /// consumed by an attempt to write that fails with `WouldBlock` or + /// `Poll::Pending`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` + pub async fn writable(&self) -> Result<()> { + Ok(()) + } + /// Splits a `TcpStream` into a read half and a write half, which can be used /// to read and write the stream concurrently. /// From 707ac0be8363bafad1123556d77d495c53513080 Mon Sep 17 00:00:00 2001 From: Brett McChesney <39924297+mcches@users.noreply.github.com> Date: Mon, 15 Jul 2024 10:38:25 -0600 Subject: [PATCH 5/5] Update stream.rs docs --- src/net/tcp/stream.rs | 70 ------------------------------------------- 1 file changed, 70 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 70c5ca6..67c4db8 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -106,41 +106,6 @@ impl TcpStream { /// If data is successfully written, `Ok(n)` is returned, where `n` is the /// number of bytes written. If the stream is not ready to write data, /// `Err(io::ErrorKind::WouldBlock)` is returned. - /// - /// # Examples - /// - /// ```no_run - /// use turmoil::net::TcpStream; - /// use std::error::Error; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// // Connect to a peer - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// loop { - /// // Wait for the socket to be writable - /// stream.writable().await?; - /// - /// // Try to write data, this may still fail with `WouldBlock` - /// // if the readiness event is a false positive. - /// match stream.try_write(b"hello world") { - /// Ok(n) => { - /// break; - /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// continue; - /// } - /// Err(e) => { - /// return Err(e.into()); - /// } - /// } - /// } - /// - /// Ok(()) - /// } - /// ``` pub fn try_write(&self, buf: &[u8]) -> Result { self.write_half.try_write(buf) } @@ -173,41 +138,6 @@ impl TcpStream { /// will continue to return immediately until the readiness event is /// consumed by an attempt to write that fails with `WouldBlock` or /// `Poll::Pending`. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::net::TcpStream; - /// use std::error::Error; - /// use std::io; - /// - /// #[tokio::main] - /// async fn main() -> Result<(), Box> { - /// // Connect to a peer - /// let stream = TcpStream::connect("127.0.0.1:8080").await?; - /// - /// loop { - /// // Wait for the socket to be writable - /// stream.writable().await?; - /// - /// // Try to write data, this may still fail with `WouldBlock` - /// // if the readiness event is a false positive. - /// match stream.try_write(b"hello world") { - /// Ok(n) => { - /// break; - /// } - /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - /// continue; - /// } - /// Err(e) => { - /// return Err(e.into()); - /// } - /// } - /// } - /// - /// Ok(()) - /// } - /// ``` pub async fn writable(&self) -> Result<()> { Ok(()) }