From 795e0a503e805f95e078b79bdee25ab4b8171984 Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Sat, 12 Oct 2024 19:30:15 +0300 Subject: [PATCH 1/9] fix loom script --- bin/loom | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/bin/loom b/bin/loom index b5f5dc8..3ca3ac3 100755 --- a/bin/loom +++ b/bin/loom @@ -17,8 +17,8 @@ cd "$(dirname "$0")"/.. TESTCMD=(test) # if cargo nextest is installed, use it instead! -if cargo list | grep -q "nextest"; then - TESTCMD=(nextest run --cargo-profile loom) +if cargo --list | grep -q "nextest"; then + TESTCMD=(nextest run --cargo-profile loom) fi TESTCMD+=(--profile loom --lib) @@ -26,6 +26,7 @@ TESTCMD+=(--profile loom --lib) TESTCMD+=(${@}) RUSTFLAGS="--cfg loom ${RUSTFLAGS:-}" \ - LOOM_LOG="${LOOM_LOG:-info}" \ - LOOM_MAX_PREEMPTIONS="${LOOM_MAX_PREEMPTIONS:-2}" \ - cargo ${TESTCMD[@]} \ No newline at end of file + LOOM_LOG="${LOOM_LOG:-info}" \ + LOOM_MAX_PREEMPTIONS="${LOOM_MAX_PREEMPTIONS:-2}" \ + cargo ${TESTCMD[@]} + From d9c9c82bda65568d12bdd46b674a252f1fb517c2 Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Sun, 13 Oct 2024 01:14:32 +0300 Subject: [PATCH 2/9] ingore cfg lints ignore warn level lints about: - cfg(loom) - cfg(thingbuf_trace) --- Cargo.toml | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 426f967..dad6861 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,5 @@ [workspace] -members = [ - ".", - "bench" -] +members = [".", "bench"] [package] name = "thingbuf" @@ -31,13 +28,21 @@ pin-project = "1" parking_lot = { version = "0.12", optional = true, default-features = false } [dev-dependencies] -tokio = { version = "1.14.0", features = ["rt", "rt-multi-thread", "macros", "sync"] } +tokio = { version = "1.14.0", features = [ + "rt", + "rt-multi-thread", + "macros", + "sync", +] } # So that we can use `poll_fn` in tests. futures-util = { version = "0.3", default-features = false } [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.5.6", features = ["checkpoint", "futures"] } -tracing-subscriber = { version = "0.3", default-features = false, features = ["std", "fmt"] } +tracing-subscriber = { version = "0.3", default-features = false, features = [ + "std", + "fmt", +] } tracing = { version = "0.1", default-features = false, features = ["std"] } # Custom profile for Loom tests: enable release optimizations so that the loom @@ -53,3 +58,9 @@ loom = { git = "https://github.com/tokio-rs/loom", rev = "a93bf2390e0fcfdb7c5899 [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = [ + 'cfg(loom)', + 'cfg(thingbuf_trace)', +] } From e4cdeea99375fd8a824d61f4c667ab57ffc7d1be Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Sun, 13 Oct 2024 01:17:00 +0300 Subject: [PATCH 3/9] impl ThreadWaker --- src/lib.rs | 1 + src/loom.rs | 4 +-- src/thread_waker.rs | 78 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 src/thread_waker.rs diff --git a/src/lib.rs b/src/lib.rs index c6c0869..2f6f5cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ mod macros; mod loom; pub mod mpsc; pub mod recycling; +mod thread_waker; mod util; mod wait; diff --git a/src/loom.rs b/src/loom.rs index 4c0d147..30d8184 100644 --- a/src/loom.rs +++ b/src/loom.rs @@ -8,7 +8,7 @@ mod inner { pub use std::sync::atomic::Ordering; } - pub(crate) use loom::{cell, future, hint, sync, thread}; + pub(crate) use loom::{cell, future, hint, sync, thread, thread_local}; use std::{cell::RefCell, fmt::Write}; pub(crate) mod model { @@ -214,7 +214,7 @@ mod inner { pub(crate) use core::sync::atomic; #[cfg(feature = "std")] - pub use std::thread; + pub use std::{thread, thread_local}; pub(crate) mod hint { #[inline(always)] diff --git a/src/thread_waker.rs b/src/thread_waker.rs new file mode 100644 index 0000000..32aafbe --- /dev/null +++ b/src/thread_waker.rs @@ -0,0 +1,78 @@ +feature! { + #![feature = "std"] + + use core::task::{RawWaker, RawWakerVTable, Waker}; + + use crate::loom::sync::Arc; + use crate::loom::thread::{self, Thread}; + use crate::loom::thread_local; + + thread_local!{ + static CURRENT: Waker= ThreadWaker::new().into_waker(); + } + + pub(crate) fn current() -> Waker { + CURRENT.with(|waker| waker.clone()) + } + + struct ThreadWaker(Arc); + + impl ThreadWaker { + fn new() -> Self { + Self(Arc::new(Inner::new(thread::current()))) + } + + fn into_waker(self) -> Waker { + unsafe { + let raw = thread_waker_to_raw_waker(self.0); + Waker::from_raw(raw) + } + } + } + + struct Inner(Thread); + + impl Inner { + fn new(thread: Thread) -> Self { + Self(thread) + } + } + + impl Inner { + #[allow(clippy::wrong_self_convention)] + fn into_raw(this: Arc) -> *const () { + Arc::into_raw(this) as *const () + } + + unsafe fn from_raw(ptr: *const ()) -> Arc { + Arc::from_raw(ptr as *const Inner) + } + } + + unsafe fn thread_waker_to_raw_waker(thread_waker: Arc) -> RawWaker { + RawWaker::new( + Inner::into_raw(thread_waker), + &RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker), + ) + } + + unsafe fn clone(raw: *const ()) -> RawWaker { + Arc::increment_strong_count(raw as *const Inner); + thread_waker_to_raw_waker(Inner::from_raw(raw)) + } + + unsafe fn drop_waker(raw: *const ()) { + drop(Inner::from_raw(raw)); + } + + unsafe fn wake(raw: *const ()) { + let unparker = Inner::from_raw(raw); + unparker.0.unpark(); + } + + unsafe fn wake_by_ref(raw: *const ()) { + let raw = raw as *const Inner; + (*raw).0.unpark(); + } + +} From 2c757eda8ee49bd09a96e1cc33fb9ed220b713d4 Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Mon, 14 Oct 2024 04:55:25 +0300 Subject: [PATCH 4/9] add ci_skip_slow_models to ignored lints --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index dad6861..9d47dfa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,4 +63,5 @@ rustdoc-args = ["--cfg", "docsrs"] unexpected_cfgs = { level = "warn", check-cfg = [ 'cfg(loom)', 'cfg(thingbuf_trace)', + 'cfg(ci_skip_slow_models)', ] } From c4e90411e1a54fe6669a0254c3c76d893c5671e1 Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Mon, 14 Oct 2024 05:13:30 +0300 Subject: [PATCH 5/9] add feature gate to ThreadWaker mod --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 2f6f5cc..721e9b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ mod macros; mod loom; pub mod mpsc; pub mod recycling; +#[cfg(feature = "std")] mod thread_waker; mod util; mod wait; From 89a64f1bfd6ca78ca43957af81a850be6828378c Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Tue, 22 Oct 2024 03:24:59 +0300 Subject: [PATCH 6/9] add time feat to tokio dev dep for timeout tests --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 9d47dfa..323ef49 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ tokio = { version = "1.14.0", features = [ "rt-multi-thread", "macros", "sync", + "time", ] } # So that we can use `poll_fn` in tests. futures-util = { version = "0.3", default-features = false } From 8de9f8ff3851f7f79c767800c78bb030a515ea16 Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Tue, 22 Oct 2024 03:25:39 +0300 Subject: [PATCH 7/9] impl bridge fns for sender --- src/mpsc/async_impl.rs | 422 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 422 insertions(+) diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index 9f410d2..72df48d 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -144,6 +144,126 @@ feature! { .await } + /// Reserves a slot in the channel to mutate in place, blocking until + /// there is a free slot to write to. + /// + /// This is similar to the [`blocking_send`] method, but, rather than taking a + /// message by value to write to the channel, this method reserves a + /// writable slot in the channel, and returns a [`SendRef`] that allows + /// mutating the slot in place. If the [`Receiver`] end of the channel + /// uses the [`Receiver::recv_ref`], [`Receiver::blocking_recv_ref`] + /// or [`Receiver::poll_recv_ref`] method for receiving from the channel, + /// this allows allocations for channel messages to be reused in place. + /// + /// # Errors + /// + /// If the [`Receiver`] end of the channel has been dropped, this + /// returns a [`Closed`] error. + /// + /// # Examples + /// + /// Sending formatted strings by writing them directly to channel slots, + /// in place: + /// ``` + /// use thingbuf::mpsc; + /// use std::{fmt::Write, thread}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel::(8); + /// + /// // Spawn a thread that writes formatted messages to the channel until it closes. + /// thread::spawn( move || { + /// let mut count = 1; + /// while let Ok(mut value) = tx.blocking_send_ref() { + /// // Writing to the `SendRef` will reuse the *existing* string + /// // allocation in place. + /// write!(value, "hello from message {}", count) + /// .expect("writing to a `String` should never fail"); + /// count += 1; + /// } + /// }); + /// + /// // Print each message received from the channel: + /// for _ in 0..10 { + /// let msg = rx.recv_ref().await.unwrap(); + /// println!("{}", msg); + /// } + /// } + /// ``` + /// [`blocking_send`]: Self::blocking_send + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] + pub fn blocking_send_ref(&self) -> Result, Closed> { + blocking_send_ref(&self.inner.core, self.inner.slots.as_ref(), &self.inner.recycle) + } + + + /// Reserves a slot in the channel to mutate in place, blocking until + /// there is a free slot to write to, waiting for at most `timeout`. + /// + /// This is similar to the [`blocking_send_timeout`] method, but, rather than taking a + /// message by value to write to the channel, this method reserves a + /// writable slot in the channel, and returns a [`SendRef`] that allows + /// mutating the slot in place. If the [`Receiver`] end of the channel + /// uses the [`Receiver::recv_ref`] method for receiving from the channel, + /// this allows allocations for channel messages to be reused in place. + /// + /// # Errors + /// + /// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed. + /// + /// # Examples + /// + /// Sending formatted strings by writing them directly to channel slots, + /// in place: + /// + /// ``` + /// use thingbuf::mpsc::{self, errors::SendTimeoutError}; + /// use std::{fmt::Write, time::Duration, thread}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel::(1); + /// + /// tokio::spawn(async move { + /// tokio::time::sleep(Duration::from_millis(500)); + /// let msg = rx.recv_ref().await.unwrap(); + /// println!("{}", msg); + /// tokio::time::sleep(Duration::from_millis(500)); + /// }); + /// + /// thread::spawn(move || { + /// let mut value = tx.blocking_send_ref_timeout(Duration::from_millis(200)).unwrap(); + /// write!(value, "hello").expect("writing to a `String` should never fail"); + /// thread::sleep(Duration::from_millis(400)); + /// + /// let mut value = tx.blocking_send_ref_timeout(Duration::from_millis(200)).unwrap(); + /// write!(value, "world").expect("writing to a `String` should never fail"); + /// thread::sleep(Duration::from_millis(400)); + /// + /// assert_eq!( + /// Err(&SendTimeoutError::Timeout(())), + /// tx.blocking_send_ref_timeout(Duration::from_millis(200)).as_deref().map(String::as_str) + /// ); + /// }); + /// } + /// ``` + /// + /// [`blocking_send_timeout`]: Self::blocking_send_timeout + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] + #[cfg(not(all(test, loom)))] + pub fn blocking_send_ref_timeout(&self, timeout: Duration) -> Result, SendTimeoutError> { + blocking_send_ref_timeout( + &self.inner.core, + self.inner.slots.as_ref(), + &self.inner.recycle, + timeout, + ) + } + /// Sends a message by value, waiting until there is a free slot to /// write to. /// @@ -198,6 +318,129 @@ feature! { } } + /// Sends a message by value, blocking until there is a free slot to + /// write to. + /// + /// This method takes the message by value, and replaces any previous + /// value in the slot. This means that the channel will *not* function + /// as an object pool while sending messages with `blocking_send`. This method is + /// most appropriate when messages don't own reusable heap allocations, + /// or when the [`Receiver`] end of the channel must receive messages by + /// moving them out of the channel by value (using the [`Receiver::recv`] + /// or [`Receiver::blocking_recv`] method). When messages in the channel own + /// reusable heap allocations (such as `String`s or `Vec`s), and the + /// [`Receiver`] doesn't need to receive them by value, consider using + /// [`send_ref`][Self::send_ref] or [`blocking_send_ref`] instead, to enable allocation reuse. + /// + /// # Errors + /// + /// If the [`Receiver`] end of the channel has been dropped, this + /// returns a [`Closed`] error containing the sent value. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc; + /// use std::thread; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel(8); + /// + /// // Spawn a thread that writes the current iteration to the channel until it closes. + /// thread::spawn( move || { + /// let mut count = 1; + /// while tx.blocking_send(count).is_ok() { + /// count += 1; + /// } + /// }); + /// + /// // Print each message received from the channel: + /// for _ in 0..10 { + /// let msg = rx.recv().await.unwrap(); + /// println!("received message {}", msg); + /// } + /// } + /// ``` + /// [`blocking_send_ref`]: Self::blocking_send_ref + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] + pub fn blocking_send(&self, val: T) -> Result<(), Closed> { + match self.blocking_send_ref() { + Err(Closed(())) => Err(Closed(val)), + Ok(mut slot) => { + *slot = val; + Ok(()) + } + } + } + + /// Sends a message by value, blocking until there is a free slot to + /// write to, for at most `timeout`. + /// + /// This method takes the message by value, and replaces any previous + /// value in the slot. This means that the channel will *not* function + /// as an object pool while sending messages with `send_timeout`. This method is + /// most appropriate when messages don't own reusable heap allocations, + /// or when the [`Receiver`] end of the channel must receive messages by + /// moving them out of the channel by value (using the + /// [`Receiver::recv`] method). When messages in the channel own + /// reusable heap allocations (such as `String`s or `Vec`s), and the + /// [`Receiver`] doesn't need to receive them by value, consider using + /// [`blocking_send_ref_timeout`] instead, to enable allocation reuse. + /// + /// + /// # Errors + /// + /// - [`Err`]`(`[`SendTimeoutError::Timeout`]`)` if the timeout has elapsed. + /// - [`Err`]`(`[`SendTimeoutError::Closed`]`)` if the channel has closed. + /// + /// # Examples + /// + /// ``` + /// use thingbuf::mpsc::{self, errors::SendTimeoutError}; + /// use std::{time::Duration, thread}; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel(1); + /// + /// tokio::spawn(async move { + /// tokio::time::sleep(Duration::from_millis(500)); + /// let msg = rx.recv().await.unwrap(); + /// println!("{}", msg); + /// tokio::time::sleep(Duration::from_millis(500)); + /// }); + /// + /// thread::spawn(move || { + /// tx.blocking_send_timeout(1, Duration::from_millis(200)).unwrap(); + /// thread::sleep(Duration::from_millis(400)); + /// + /// tx.blocking_send_timeout(2, Duration::from_millis(200)).unwrap(); + /// thread::sleep(Duration::from_millis(400)); + /// + /// assert_eq!( + /// Err(SendTimeoutError::Timeout(3)), + /// tx.blocking_send_timeout(3, Duration::from_millis(200)) + /// ); + /// }); + /// } + /// ``` + /// + /// [`blocking_send_ref_timeout`]: Self::blocking_send_ref_timeout + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] + #[cfg(not(all(test, loom)))] + pub fn blocking_send_timeout(&self, val: T, timeout: Duration) -> Result<(), SendTimeoutError> { + match self.blocking_send_ref_timeout(timeout) { + Err(e) => Err(e.with_value(val)), + Ok(mut slot) => { + *slot = val; + Ok(()) + } + } + } + /// Attempts to reserve a slot in the channel to mutate in place, /// without waiting for capacity. /// @@ -471,6 +714,12 @@ feature! { } } + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] + pub fn blocking_recv_ref(&self) -> Option> { + blocking_recv_ref(&self.inner.core, self.inner.slots.as_ref()) + } + /// Receives the next message for this receiver, **by value**. /// /// This method returns `None` if the channel has been closed and there are @@ -540,6 +789,16 @@ feature! { } } + #[cfg(feature = "std")] + #[cfg_attr(docsrs, doc(cfg(feature = "std")))] + pub fn blocking_recv(&self) -> Option + where + R: Recycle, + { + let mut val = self.blocking_recv_ref()?; + Some(recycling::take(&mut *val, &self.inner.recycle)) + } + /// Attempts to receive the next message for this receiver by reference /// without waiting for a new message when the channel is empty. /// @@ -1979,6 +2238,169 @@ impl PinnedDrop for SendRefFuture<'_, T, R> { } } +// === impl blocking Fns === + +feature! { + #![feature = "std"] + + use crate::thread_waker; + use crate::util::Backoff; + use crate::loom::thread; + use std::time::{Duration, Instant}; + + #[inline] + fn blocking_send_ref<'a, T, R: Recycle>( + core: &'a ChannelCore, + slots: &'a [Slot], + recycle: &'a R, + ) -> Result, Closed<()>> { + match core.try_send_ref(slots, recycle) { + Ok(slot) => return Ok(SendRef(slot)), + Err(TrySendError::Closed(_)) => return Err(Closed(())), + _ => {} + } + let mut waiter = queue::Waiter::new(); + let mut unqueued = true; + let waker = thread_waker::current(); + let mut boff = Backoff::new(); + loop { + let node = unsafe { Pin::new_unchecked(&mut waiter) }; + let wait = if unqueued { + test_dbg!(core.tx_wait.start_wait(node, &waker)) + } else { + test_dbg!(core.tx_wait.continue_wait(node, &waker)) + }; + match wait { + WaitResult::Closed => return Err(Closed(())), + WaitResult::Notified => { + boff.spin_yield(); + match core.try_send_ref(slots.as_ref(), recycle) { + Ok(slot) => return Ok(SendRef(slot)), + Err(TrySendError::Closed(_)) => return Err(Closed(())), + _ => {} + } + } + WaitResult::Wait => { + unqueued = false; + thread::park(); + } + } + } + } + + #[cfg(not(all(test, loom)))] + #[inline] + fn blocking_send_ref_timeout<'a, T, R: Recycle>( + core: &'a ChannelCore, + slots: &'a [Slot], + recycle: &'a R, + timeout: Duration, + ) -> Result, SendTimeoutError> { + // fast path: avoid getting the thread and constructing the node if the + // slot is immediately ready. + match core.try_send_ref(slots, recycle) { + Ok(slot) => return Ok(SendRef(slot)), + Err(TrySendError::Closed(_)) => return Err(SendTimeoutError::Closed(())), + _ => {} + } + + let mut waiter = queue::Waiter::new(); + let mut unqueued = true; + let thread = thread_waker::current(); + let mut boff = Backoff::new(); + let beginning_park = Instant::now(); + loop { + let node = unsafe { + // Safety: in this case, it's totally safe to pin the waiter, as + // it is owned uniquely by this function, and it cannot possibly + // be moved while this thread is parked. + Pin::new_unchecked(&mut waiter) + }; + + let wait = if unqueued { + test_dbg!(core.tx_wait.start_wait(node, &thread)) + } else { + test_dbg!(core.tx_wait.continue_wait(node, &thread)) + }; + + match wait { + WaitResult::Closed => return Err(SendTimeoutError::Closed(())), + WaitResult::Notified => { + boff.spin_yield(); + match core.try_send_ref(slots.as_ref(), recycle) { + Ok(slot) => return Ok(SendRef(slot)), + Err(TrySendError::Closed(_)) => return Err(SendTimeoutError::Closed(())), + _ => {} + } + } + WaitResult::Wait => { + unqueued = false; + thread::park_timeout(timeout); + let elapsed = beginning_park.elapsed(); + if elapsed >= timeout { + return Err(SendTimeoutError::Timeout(())); + } + } + } + } + } + + #[inline] + fn blocking_recv_ref<'a, T>( + core: &'a ChannelCore, + slots: &'a [Slot], + ) -> Option> { + loop { + match core.poll_recv_ref(slots, crate::thread_waker::current) { + Poll::Ready(r) => { + return r.map(|slot| { + RecvRef(RecvRefInner { + _notify: super::NotifyTx(&core.tx_wait), + slot, + }) + }) + } + Poll::Pending => { + test_println!("parking ({:?})", crate::loom::thread::current()); + crate::loom::thread::park(); + } + } + } + } + + #[cfg(not(all(test, loom)))] + #[inline] + fn blocking_recv_ref_timeout<'a, T>( + core: &'a ChannelCore, + slots: &'a [Slot], + timeout: Duration, + ) -> Result, RecvTimeoutError> { + let beginning_park = Instant::now(); + loop { + match core.poll_recv_ref(slots, thread_waker::current) { + Poll::Ready(r) => { + return r + .map(|slot| { + RecvRef(RecvRefInner { + _notify: super::NotifyTx(&core.tx_wait), + slot, + }) + }) + .ok_or(RecvTimeoutError::Closed); + } + Poll::Pending => { + test_println!("park_timeout ({:?})", thread::current()); + thread::park_timeout(timeout); + let elapsed = beginning_park.elapsed(); + if elapsed >= timeout { + return Err(RecvTimeoutError::Timeout); + } + } + } + } + } +} + #[cfg(feature = "alloc")] #[cfg(test)] mod tests { From 20063bde036764955821efa825ee77c684b3ba07 Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Tue, 22 Oct 2024 03:25:54 +0300 Subject: [PATCH 8/9] impl tests for bridge fns --- src/mpsc/tests.rs | 2 + src/mpsc/tests/mpsc_bridge.rs | 489 ++++++++++++++++++++++++++++++++++ tests/mpsc_async.rs | 144 ++++++++++ 3 files changed, 635 insertions(+) create mode 100644 src/mpsc/tests/mpsc_bridge.rs diff --git a/src/mpsc/tests.rs b/src/mpsc/tests.rs index 1a0f1d9..80948a2 100644 --- a/src/mpsc/tests.rs +++ b/src/mpsc/tests.rs @@ -3,3 +3,5 @@ use super::*; mod mpsc_async; #[cfg(feature = "std")] mod mpsc_blocking; +#[cfg(feature = "std")] +mod mpsc_bridge; diff --git a/src/mpsc/tests/mpsc_bridge.rs b/src/mpsc/tests/mpsc_bridge.rs new file mode 100644 index 0000000..f990548 --- /dev/null +++ b/src/mpsc/tests/mpsc_bridge.rs @@ -0,0 +1,489 @@ +use super::*; +use crate::loom::{self, alloc::Track, thread}; +use crate::mpsc::errors; +use crate::mpsc::RecvRef; + +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_try_send_recv() { + loom::model(|| { + let (tx, rx) = channel(3); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + *tx.try_send_ref().unwrap() = 1; + }) + }; + let p2 = thread::spawn(move || { + *tx.try_send_ref().unwrap() = 2; + *tx.try_send_ref().unwrap() = 3; + }); + + let mut vals = Vec::new(); + + while let Some(val) = rx.blocking_recv_ref() { + vals.push(*val); + } + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_try_recv_ref() { + loom::model(|| { + let (tx, rx) = channel(2); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + *tx.blocking_send_ref().unwrap() = 1; + thread::yield_now(); + *tx.blocking_send_ref().unwrap() = 2; + }) + }; + let p2 = thread::spawn(move || { + *tx.blocking_send_ref().unwrap() = 3; + thread::yield_now(); + *tx.blocking_send_ref().unwrap() = 4; + }); + + let mut vals = Vec::new(); + + while vals.len() < 4 { + match rx.try_recv_ref() { + Ok(val) => vals.push(*val), + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Closed) => panic!("channel closed"), + } + thread::yield_now(); + } + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_test_skip_slot() { + // This test emulates a situation where we might need to skip a slot. The setup includes two writing + // threads that write elements to the channel and one reading thread that maintains a RecvRef to the + // first element until the end of the test, necessitating the skip: + // Given that the channel capacity is 2, here's the sequence of operations: + // Thread 1 writes: 1, 2 + // Thread 2 writes: 3, 4 + // The main thread reads from slots in this order: 0 (holds ref), 1, 1, 1. + // As a result, the first slot is skipped during this process. + loom::model(|| { + let (tx, rx) = channel(3); + + let p1 = { + let tx = tx.clone(); + thread::spawn(move || { + *tx.blocking_send_ref().unwrap() = 1; + thread::yield_now(); + *tx.blocking_send_ref().unwrap() = 2; + }) + }; + + let p2 = { + thread::spawn(move || { + *tx.blocking_send_ref().unwrap() = 3; + thread::yield_now(); + *tx.blocking_send_ref().unwrap() = 4; + }) + }; + + let mut vals = Vec::new(); + let mut hold: Vec> = Vec::new(); + + while vals.len() + hold.len() < 4 { + match rx.try_recv_ref() { + Ok(val) => { + if vals.is_empty() && hold.is_empty() { + hold.push(val); + } else { + vals.push(*val); + } + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Closed) => { + panic!("channel closed"); + } + } + thread::yield_now(); + } + vals.push(*hold.pop().unwrap()); + + vals.sort_unstable(); + assert_eq_dbg!(vals, vec![1, 2, 3, 4]); + + p1.join().unwrap(); + p2.join().unwrap(); + }) +} + +#[test] +fn rx_closes() { + const ITERATIONS: usize = 6; + loom::model(|| { + let (tx, rx) = channel(ITERATIONS / 2); + + let producer = thread::spawn(move || { + 'iters: for i in 0..=ITERATIONS { + test_println!("sending {}", i); + 'send: loop { + match tx.try_send_ref() { + Ok(mut slot) => { + *slot = i; + break 'send; + } + Err(TrySendError::Full(_)) => thread::yield_now(), + Err(TrySendError::Closed(_)) => break 'iters, + } + } + test_println!("sent {}\n", i); + } + }); + + for i in 0..ITERATIONS - 1 { + let n = rx.blocking_recv(); + + test_println!("recv {:?}\n", n); + assert_eq_dbg!(n, Some(i)); + } + drop(rx); + + producer.join().unwrap(); + }) +} + +#[test] +fn rx_close_unconsumed_spsc() { + // Tests that messages that have not been consumed by the receiver are + // dropped when dropping the channel. + const MESSAGES: usize = 4; + + loom::model(|| { + let (tx, rx) = channel(MESSAGES); + + let consumer = thread::spawn(move || { + // recieve one message + let msg = rx.blocking_recv(); + test_println!("recv {:?}", msg); + assert_dbg!(msg.is_some()); + // drop the receiver... + }); + + let mut i = 1; + while let Ok(mut slot) = tx.blocking_send_ref() { + test_println!("producer sending {}...", i); + *slot = Track::new(i); + i += 1; + } + + consumer.join().unwrap(); + drop(tx); + }) +} + +#[test] +#[ignore] // This is marked as `ignore` because it takes over an hour to run. +fn rx_close_unconsumed_mpsc() { + const MESSAGES: usize = 2; + + fn do_producer(tx: Sender>, n: usize) -> impl FnOnce() + Send + Sync { + move || { + let mut i = 1; + while let Ok(mut slot) = tx.blocking_send_ref() { + test_println!("producer {} sending {}...", n, i); + *slot = Track::new(i); + i += 1; + } + } + } + + loom::model(|| { + let (tx, rx) = channel(MESSAGES); + + let consumer = thread::spawn(move || { + // recieve one message + let msg = rx.blocking_recv(); + test_println!("recv {:?}", msg); + assert_dbg!(msg.is_some()); + // drop the receiver... + }); + + let producer = thread::spawn(do_producer(tx.clone(), 1)); + + do_producer(tx, 2)(); + + producer.join().unwrap(); + consumer.join().unwrap(); + }) +} + +#[test] +fn spsc_recv_then_try_send() { + loom::model(|| { + let (tx, rx) = channel::(4); + let consumer = thread::spawn(move || { + assert_eq_dbg!(rx.blocking_recv().unwrap(), 10); + }); + + tx.try_send(10).unwrap(); + consumer.join().unwrap(); + }) +} + +#[test] +fn spsc_recv_then_close() { + loom::model(|| { + let (tx, rx) = channel::(4); + let producer = thread::spawn(move || { + drop(tx); + }); + + assert_eq_dbg!(rx.blocking_recv(), None); + + producer.join().unwrap(); + }); +} + +#[test] +fn spsc_recv_then_try_send_then_close() { + loom::model(|| { + let (tx, rx) = channel::(2); + let consumer = thread::spawn(move || { + assert_eq_dbg!(rx.blocking_recv().unwrap(), 10); + assert_eq_dbg!(rx.blocking_recv().unwrap(), 20); + assert_eq_dbg!(rx.blocking_recv(), None); + }); + + tx.try_send(10).unwrap(); + tx.try_send(20).unwrap(); + drop(tx); + consumer.join().unwrap(); + }) +} + +#[test] +#[cfg_attr(ci_skip_slow_models, ignore)] +fn mpsc_send_recv_wrap() { + loom::model(|| { + let (tx, rx) = channel::(1); + let producer1 = do_producer(tx.clone(), 10); + let producer2 = do_producer(tx, 20); + + let mut results = Vec::new(); + while let Some(val) = rx.blocking_recv() { + test_println!("RECEIVED {:?}", val); + results.push(val); + } + + producer1.join().expect("producer 1 panicked"); + producer2.join().expect("producer 2 panicked"); + + assert_eq_dbg!(results.len(), 2); + assert_dbg!( + results.contains(&10), + "missing value from producer 1; results={:?}", + results + ); + + assert_dbg!( + results.contains(&20), + "missing value from producer 2; results={:?}", + results + ); + }) +} + +#[test] +fn mpsc_send_recv_no_wrap() { + loom::model(|| { + let (tx, rx) = channel::(2); + let producer1 = do_producer(tx.clone(), 10); + let producer2 = do_producer(tx, 20); + + let mut results = Vec::new(); + while let Some(val) = rx.blocking_recv() { + test_println!("RECEIVED {:?}", val); + results.push(val); + } + + producer1.join().expect("producer 1 panicked"); + producer2.join().expect("producer 2 panicked"); + + assert_eq_dbg!(results.len(), 2); + assert_dbg!( + results.contains(&10), + "missing value from producer 1; results={:?}", + results + ); + + assert_dbg!( + results.contains(&20), + "missing value from producer 2; results={:?}", + results + ); + }) +} + +fn do_producer(tx: Sender, tag: usize) -> thread::JoinHandle<()> { + thread::spawn(move || { + test_println!("SENDING {:?}", tag); + tx.blocking_send(tag).unwrap(); + test_println!("SENT {:?}", tag); + }) +} + +#[test] +fn spsc_send_recv_in_order_no_wrap() { + const N_SENDS: usize = 4; + loom::model(|| { + let (tx, rx) = channel::(N_SENDS); + let consumer = thread::spawn(move || { + for i in 1..=N_SENDS { + assert_eq_dbg!(rx.blocking_recv(), Some(i)); + } + assert_eq_dbg!(rx.blocking_recv(), None); + }); + + for i in 1..=N_SENDS { + tx.blocking_send(i).unwrap() + } + drop(tx); + consumer.join().unwrap(); + }) +} + +#[test] +fn spsc_send_recv_in_order_wrap() { + const N_SENDS: usize = 2; + loom::model(|| { + let (tx, rx) = channel::(N_SENDS / 2); + let consumer = thread::spawn(move || { + for i in 1..=N_SENDS { + assert_eq_dbg!(rx.blocking_recv(), Some(i)); + } + assert_eq_dbg!(rx.blocking_recv(), None); + }); + + for i in 1..=N_SENDS { + tx.blocking_send(i).unwrap() + } + drop(tx); + consumer.join().unwrap(); + }) +} + +#[test] +fn spsc_send_recv_in_order_skip_wrap() { + const N_SENDS: usize = 5; + loom::model(|| { + let (tx, rx) = channel::((N_SENDS + 1) / 2); + let consumer = thread::spawn(move || { + let mut hold = Vec::new(); + assert_eq_dbg!(rx.blocking_recv(), Some(1)); + loop { + match rx.try_recv_ref() { + Ok(val) => { + assert_eq_dbg!(*val, 2); + hold.push(val); + break; + } + Err(TryRecvError::Empty) => { + loom::thread::yield_now(); + } + Err(TryRecvError::Closed) => panic!("channel closed"), + } + } + for i in 3..=N_SENDS { + assert_eq_dbg!(rx.blocking_recv(), Some(i)); + } + assert_eq_dbg!(rx.blocking_recv(), None); + }); + for i in 1..=N_SENDS { + tx.blocking_send(i).unwrap(); + } + drop(tx); + consumer.join().unwrap(); + }); +} + +#[test] +fn tx_close_wakes() { + loom::model(|| { + let (tx, rx) = channel::(2); + let consumer = thread::spawn(move || { + assert_eq_dbg!(rx.blocking_recv(), None); + }); + drop(tx); + consumer.join().unwrap(); + }); +} + +#[test] +fn tx_close_drains_queue() { + const LEN: usize = 4; + loom::model(|| { + let (tx, rx) = channel(LEN); + let producer = thread::spawn(move || { + for i in 0..LEN { + tx.blocking_send(i).unwrap(); + } + }); + + for i in 0..LEN { + assert_eq_dbg!(rx.blocking_recv(), Some(i)) + } + + producer.join().unwrap(); + }); +} + +// Reproduces https://github.com/hawkw/thingbuf/issues/83 +// Pushing to a thingbuf should not hang when the buffer is full. +#[test] +fn test_full() { + loom::model(|| { + let (tx, rx) = channel(4); + let p1 = { + let tx = tx.clone(); + thread::spawn(move || loop { + match tx.try_send(1) { + Ok(_) => {} + Err(errors::TrySendError::Full(_)) => break, + Err(err) => assert!(false, "unexpected error: {:?}", err), + } + thread::yield_now(); + }) + }; + + let p2 = { + let tx = tx.clone(); + thread::spawn(move || loop { + match tx.try_send(2) { + Ok(_) => {} + Err(errors::TrySendError::Full(_)) => break, + Err(err) => assert!(false, "unexpected error: {:?}", err), + } + thread::yield_now(); + }) + }; + + p1.join().unwrap(); + p2.join().unwrap(); + }); +} diff --git a/tests/mpsc_async.rs b/tests/mpsc_async.rs index 82267cd..65abf47 100644 --- a/tests/mpsc_async.rs +++ b/tests/mpsc_async.rs @@ -43,3 +43,147 @@ async fn basically_works() { } } } + +#[tokio::test(flavor = "multi_thread")] +async fn sync_to_async() { + use std::collections::HashSet; + use std::thread; + + const N_SENDS: usize = 10; + const N_PRODUCERS: usize = 10; + + fn start_producer(tx: mpsc::Sender, n: usize) -> thread::JoinHandle<()> { + let tag = n * N_SENDS; + thread::Builder::new() + .name(format!("producer {}", n)) + .spawn(move || { + for i in 0..N_SENDS { + let msg = i + tag; + println!("[producer {}] sending {}...", n, msg); + tx.blocking_send(msg).unwrap(); + println!("[producer {}] sent {}!", n, msg); + } + println!("[producer {}] DONE!", n); + }) + .expect("spawning threads should succeed") + } + + let (tx, rx) = mpsc::channel(N_SENDS / 2); + for n in 0..N_PRODUCERS { + start_producer(tx.clone(), n); + } + drop(tx); + + let mut results = HashSet::new(); + while let Some(val) = { + println!("receiving..."); + rx.recv().await + } { + println!("received {}!", val); + results.insert(val); + } + + let results = dbg!(results); + + for n in 0..N_PRODUCERS { + let tag = n * N_SENDS; + for i in 0..N_SENDS { + let msg = i + tag; + assert!(results.contains(&msg), "missing message {:?}", msg); + } + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn async_to_sync() { + use std::collections::HashSet; + + const N_SENDS: usize = 10; + const N_PRODUCERS: usize = 10; + + async fn do_producer(tx: mpsc::Sender, n: usize) { + let tag = n * N_SENDS; + for i in 0..N_SENDS { + let msg = i + tag; + println!("sending {}...", msg); + tx.send(msg).await.unwrap(); + println!("sent {}!", msg); + } + println!("PRODUCER {} DONE!", n); + } + + let (tx, rx) = mpsc::channel(N_SENDS / 2); + for n in 0..N_PRODUCERS { + tokio::spawn(do_producer(tx.clone(), n)); + } + drop(tx); + + let mut results = HashSet::new(); + while let Some(val) = { + println!("receiving..."); + rx.blocking_recv() + } { + println!("received {}!", val); + results.insert(val); + } + + let results = dbg!(results); + + for n in 0..N_PRODUCERS { + let tag = n * N_SENDS; + for i in 0..N_SENDS { + let msg = i + tag; + assert!(results.contains(&msg), "missing message {:?}", msg); + } + } +} + +#[test] +fn sync_to_sync() { + use std::collections::HashSet; + use std::thread; + + const N_SENDS: usize = 10; + const N_PRODUCERS: usize = 10; + + fn start_producer(tx: mpsc::Sender, n: usize) -> thread::JoinHandle<()> { + let tag = n * N_SENDS; + thread::Builder::new() + .name(format!("producer {}", n)) + .spawn(move || { + for i in 0..N_SENDS { + let msg = i + tag; + println!("[producer {}] sending {}...", n, msg); + tx.blocking_send(msg).unwrap(); + println!("[producer {}] sent {}!", n, msg); + } + println!("[producer {}] DONE!", n); + }) + .expect("spawning threads should succeed") + } + + let (tx, rx) = mpsc::channel(N_SENDS / 2); + for n in 0..N_PRODUCERS { + start_producer(tx.clone(), n); + } + drop(tx); + + let mut results = HashSet::new(); + while let Some(val) = { + println!("receiving..."); + rx.blocking_recv() + } { + println!("received {}!", val); + results.insert(val); + } + + let results = dbg!(results); + + for n in 0..N_PRODUCERS { + let tag = n * N_SENDS; + for i in 0..N_SENDS { + let msg = i + tag; + assert!(results.contains(&msg), "missing message {:?}", msg); + } + } +} From 62bdf1198f8ede650a9be5ef8bfd18ee61f88685 Mon Sep 17 00:00:00 2001 From: Mike <42689366+MikeIvanichev@users.noreply.github.com> Date: Tue, 22 Oct 2024 03:49:49 +0300 Subject: [PATCH 9/9] add tmp doc to allow docs to build --- src/mpsc/async_impl.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mpsc/async_impl.rs b/src/mpsc/async_impl.rs index 72df48d..9cd3c7f 100644 --- a/src/mpsc/async_impl.rs +++ b/src/mpsc/async_impl.rs @@ -714,6 +714,7 @@ feature! { } } + /// Tmp docs to pass build #[cfg(feature = "std")] #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub fn blocking_recv_ref(&self) -> Option> { @@ -789,6 +790,7 @@ feature! { } } + /// Tmp docs to pass build #[cfg(feature = "std")] #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub fn blocking_recv(&self) -> Option