From 4818613844ad3bdfe8389407e13f3d68440f3cc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 17 Oct 2022 19:35:37 +0100 Subject: [PATCH] general: feature gate every platform under tokio and smol --- CHANGELOG.md | 8 ++-- Cargo.toml | 10 +++-- examples/if_watch.rs | 4 +- src/apple.rs | 93 +++++++++++++++++++++++++++++++++++++++-- src/fallback.rs | 39 +++++++++++++++++- src/lib.rs | 98 +++++++++++++++++++------------------------- src/win.rs | 39 +++++++++++++++++- 7 files changed, 221 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1766f59..744b70b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,10 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [2.1.0] - [unreleased] +## [3.0.0] - [unreleased] -### Added -- Allow opting for Tokio instead of smol for the `AsyncSocket`. See [PR 27](https://github.com/mxinden/if-watch/pull/27). +### Changed +- Feature gate async runtime, allowing opting between Tokio or smol. For every OS each `IfWatcher` is + under the `tokio` or `smol` module. This makes it a breaking change as there + is no more a default implementation. See [PR 27](https://github.com/mxinden/if-watch/pull/27). ## [2.0.0] diff --git a/Cargo.toml b/Cargo.toml index 46c3c6c..5369884 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "if-watch" -version = "2.0.0" +version = "3.0.0" authors = ["David Craven ", "Parity Technologies Limited "] edition = "2021" keywords = ["asynchronous", "routing"] @@ -10,8 +10,8 @@ repository = "https://github.com/mxinden/if-watch" [features] default = ["smol"] -tokio = ["rtnetlink/tokio_socket"] -smol = ["rtnetlink/smol_socket"] +tokio = ["dep:tokio", "rtnetlink/tokio_socket"] +smol = ["dep:smol", "rtnetlink/smol_socket"] [dependencies] fnv = "1.0.7" @@ -26,6 +26,8 @@ rtnetlink = { version = "0.10.0", default-features = false } core-foundation = "0.9.2" if-addrs = "0.7.0" system-configuration = "0.5.0" +tokio = { version = "1.21.2", features = ["rt"], optional = true } +smol = { version = "1.2.5", optional = true } [target.'cfg(target_os = "windows")'.dependencies] if-addrs = "0.7.0" @@ -37,3 +39,5 @@ if-addrs = "0.7.0" [dev-dependencies] env_logger = "0.9.0" +smol = "1.2.5" +tokio = { version = "1.21.2", features = ["rt", "macros"] } diff --git a/examples/if_watch.rs b/examples/if_watch.rs index 2f8721a..012eed1 100644 --- a/examples/if_watch.rs +++ b/examples/if_watch.rs @@ -1,9 +1,9 @@ use futures::StreamExt; -use if_watch::IfWatcher; +use if_watch::smol::IfWatcher; fn main() { env_logger::init(); - futures::executor::block_on(async { + smol::block_on(async { let mut set = IfWatcher::new().unwrap(); loop { let event = set.select_next_some().await; diff --git a/src/apple.rs b/src/apple.rs index f775058..b76b6c2 100644 --- a/src/apple.rs +++ b/src/apple.rs @@ -4,31 +4,95 @@ use core_foundation::runloop::{kCFRunLoopCommonModes, CFRunLoop}; use core_foundation::string::CFString; use fnv::FnvHashSet; use futures::channel::mpsc; -use futures::stream::Stream; +use futures::stream::{FusedStream, Stream}; +use futures::Future; use if_addrs::IfAddr; use std::collections::VecDeque; use std::io::Result; +use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use system_configuration::dynamic_store::{ SCDynamicStore, SCDynamicStoreBuilder, SCDynamicStoreCallBackContext, }; +#[cfg(feature = "tokio")] +pub mod tokio { + //! An interface watcher that uses the `tokio` runtime. + use futures::Future; + + #[doc(hidden)] + pub struct TokioRuntime; + + impl super::Runtime for TokioRuntime { + fn spawn(f: F) + where + F: Future, + F: Send + 'static, + ::Output: Send + 'static, + { + tokio::spawn(f); + } + } + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + +#[cfg(feature = "smol")] +pub mod smol { + //! An interface watcher that uses the `smol` runtime. + + use futures::Future; + + #[doc(hidden)] + pub struct SmolRuntime; + + impl super::Runtime for SmolRuntime { + fn spawn(f: F) + where + F: Future, + F: Send + 'static, + ::Output: Send + 'static, + { + smol::spawn(f).detach(); + } + } + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + #[derive(Debug)] -pub struct IfWatcher { +pub struct IfWatcher { addrs: FnvHashSet, queue: VecDeque, rx: mpsc::Receiver<()>, + runtime: PhantomData, } -impl IfWatcher { +#[doc(hidden)] +pub trait Runtime { + fn spawn(f: F) + where + F: Future, + F: Send + 'static, + ::Output: Send + 'static; +} + +impl IfWatcher +where + T: Runtime, +{ + /// Create a watcher. pub fn new() -> Result { let (tx, rx) = mpsc::channel(1); - std::thread::spawn(|| background_task(tx)); + T::spawn(async { background_task(tx) }); let mut watcher = Self { addrs: Default::default(), queue: Default::default(), rx, + runtime: PhantomData, }; watcher.resync()?; Ok(watcher) @@ -55,10 +119,12 @@ impl IfWatcher { Ok(()) } + /// Iterate over current networks. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } + /// Poll for an address change event. pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { loop { if let Some(event) = self.queue.pop_front() { @@ -74,6 +140,25 @@ impl IfWatcher { } } +impl Stream for IfWatcher +where + T: Runtime + Unpin, +{ + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).poll_if_event(cx).map(Some) + } +} + +impl FusedStream for IfWatcher +where + T: Runtime + Unpin, +{ + fn is_terminated(&self) -> bool { + false + } +} + fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet { match addr { IfAddr::V4(ip) => { diff --git a/src/fallback.rs b/src/fallback.rs index c05ccdf..a5ec9e7 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -1,6 +1,6 @@ use crate::IfEvent; use async_io::Timer; -use futures::stream::Stream; +use futures::stream::{FusedStream, Stream}; use if_addrs::IfAddr; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use std::collections::{HashSet, VecDeque}; @@ -10,6 +10,26 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +#[cfg(feature = "tokio")] +pub mod tokio { + //! An interface watcher. + //! **On this platform there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + +#[cfg(feature = "smol")] +pub mod smol { + //! An interface watcher. + //! **On this platform there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + /// An address set/watcher #[derive(Debug)] pub struct IfWatcher { @@ -19,7 +39,7 @@ pub struct IfWatcher { } impl IfWatcher { - /// Create a watcher + /// Create a watcher. pub fn new() -> Result { Ok(Self { addrs: Default::default(), @@ -45,10 +65,12 @@ impl IfWatcher { Ok(()) } + /// Iterate over current networks. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } + /// Poll for an address change event. pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { loop { if let Some(event) = self.queue.pop_front() { @@ -64,6 +86,19 @@ impl IfWatcher { } } +impl Stream for IfWatcher { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).poll_if_event(cx).map(Some) + } +} + +impl FusedStream for IfWatcher { + fn is_terminated(&self) -> bool { + false + } +} + fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet { match addr { IfAddr::V4(ip) => { diff --git a/src/lib.rs b/src/lib.rs index fd79b30..096f129 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,15 +2,7 @@ #![deny(missing_docs)] #![deny(warnings)] -#[cfg(not(target_os = "linux"))] -use futures::stream::{FusedStream, Stream}; pub use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -#[cfg(not(target_os = "linux"))] -use std::{ - io::Result, - pin::Pin, - task::{Context, Poll}, -}; #[cfg(target_os = "macos")] mod apple; @@ -28,8 +20,14 @@ mod linux; #[cfg(target_os = "windows")] mod win; -#[cfg(target_os = "macos")] -use apple as platform_impl; +#[cfg(any(target_os = "macos", target_os = "ios"))] +#[cfg(feature = "tokio")] +pub use apple::tokio; + +#[cfg(any(target_os = "macos", target_os = "ios"))] +#[cfg(feature = "smol")] +pub use apple::smol; + #[cfg(target_os = "ios")] use apple as platform_impl; #[cfg(not(any( @@ -39,8 +37,14 @@ use apple as platform_impl; target_os = "windows", )))] use fallback as platform_impl; + #[cfg(target_os = "windows")] -use win as platform_impl; +#[cfg(feature = "tokio")] +pub use win::tokio; + +#[cfg(target_os = "windows")] +#[cfg(feature = "smol")] +pub use win::smol; #[cfg(target_os = "linux")] #[cfg(feature = "tokio")] @@ -48,7 +52,7 @@ pub use linux::tokio; #[cfg(target_os = "linux")] #[cfg(feature = "smol")] -pub use linux::smol::IfWatcher; +pub use linux::smol; /// An address change event. #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] @@ -59,66 +63,50 @@ pub enum IfEvent { Down(IpNet), } -/// Watches for interface changes. -#[cfg(not(target_os = "linux"))] -#[derive(Debug)] -pub struct IfWatcher(platform_impl::IfWatcher); - -#[cfg(not(target_os = "linux"))] -impl IfWatcher { - /// Create a watcher. - pub fn new() -> Result { - platform_impl::IfWatcher::new().map(Self) - } - - /// Iterate over current networks. - pub fn iter(&self) -> impl Iterator { - self.0.iter() - } - - /// Poll for an address change event. - pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { - self.0.poll_if_event(cx) - } -} - -#[cfg(not(target_os = "linux"))] -impl Stream for IfWatcher { - type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::into_inner(self).poll_if_event(cx).map(Some) - } -} - -#[cfg(not(target_os = "linux"))] -impl FusedStream for IfWatcher { - fn is_terminated(&self) -> bool { - false - } -} - #[cfg(test)] mod tests { - use super::IfWatcher; use futures::StreamExt; use std::pin::Pin; #[test] - fn test_ip_watch() { - futures::executor::block_on(async { + fn test_smol_ip_watch() { + use super::smol::IfWatcher; + + smol::block_on(async { let mut set = IfWatcher::new().unwrap(); let event = set.select_next_some().await.unwrap(); println!("Got event {:?}", event); }); } + #[tokio::test] + async fn test_tokio_ip_watch() { + use super::tokio::IfWatcher; + + let mut set = IfWatcher::new().unwrap(); + let event = set.select_next_some().await.unwrap(); + println!("Got event {:?}", event); + } + #[test] - fn test_is_send() { - futures::executor::block_on(async { + fn test_smol_is_send() { + use super::smol::IfWatcher; + + smol::block_on(async { fn is_send(_: T) {} is_send(IfWatcher::new()); is_send(IfWatcher::new().unwrap()); is_send(Pin::new(&mut IfWatcher::new().unwrap())); }); } + + #[tokio::test] + async fn test_tokio_is_send() { + use super::tokio::IfWatcher; + + fn is_send(_: T) {} + is_send(IfWatcher::new()); + is_send(IfWatcher::new().unwrap()); + is_send(Pin::new(&mut IfWatcher::new().unwrap())); + } } diff --git a/src/win.rs b/src/win.rs index d73f32b..57e2790 100644 --- a/src/win.rs +++ b/src/win.rs @@ -1,10 +1,12 @@ use crate::{IfEvent, IpNet, Ipv4Net, Ipv6Net}; use fnv::FnvHashSet; +use futures::stream::{FusedStream, Stream}; use futures::task::AtomicWaker; use if_addrs::IfAddr; use std::collections::VecDeque; use std::ffi::c_void; use std::io::{Error, ErrorKind, Result}; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; @@ -14,6 +16,26 @@ use windows::Win32::NetworkManagement::IpHelper::{ MIB_NOTIFICATION_TYPE, }; +#[cfg(feature = "tokio")] +pub mod tokio { + //! An interface watcher. + //! **On Windows there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + +#[cfg(feature = "smol")] +pub mod smol { + //! An interface watcher. + //! **On Windows there is no difference between `tokio` and `smol` features,** + //! **this was done to maintain the api compatible with other platforms**. + + /// Watches for interface changes. + pub type IfWatcher = super::IfWatcher; +} + /// An address set/watcher #[derive(Debug)] pub struct IfWatcher { @@ -26,7 +48,7 @@ pub struct IfWatcher { } impl IfWatcher { - /// Create a watcher + /// Create a watcher. pub fn new() -> Result { let resync = Arc::new(AtomicBool::new(true)); let waker = Arc::new(AtomicWaker::new()); @@ -63,10 +85,12 @@ impl IfWatcher { Ok(()) } + /// Iterate over current networks. pub fn iter(&self) -> impl Iterator { self.addrs.iter() } + /// Poll for an address change event. pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll> { loop { if let Some(event) = self.queue.pop_front() { @@ -83,6 +107,19 @@ impl IfWatcher { } } +impl Stream for IfWatcher { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::into_inner(self).poll_if_event(cx).map(Some) + } +} + +impl FusedStream for IfWatcher { + fn is_terminated(&self) -> bool { + false + } +} + fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet { match addr { IfAddr::V4(ip) => {