Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

linux: Feature gate async runtime, allowing opting between Tokio or smol #27

Merged
merged 10 commits into from
Nov 4, 2022
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [3.0.0] - [unreleased]
mxinden marked this conversation as resolved.
Show resolved Hide resolved

### Changed
- Feature gate async runtime, allowing opting between Tokio or smol. For every OS each `IfWatcher` is
under the `tokio` or `smol` module. This makes it a breaking change as there
is no more a default implementation. See [PR 27](https://github.com/mxinden/if-watch/pull/27).

## [2.0.0]

### Changed
Expand Down
13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
[package]
name = "if-watch"
version = "2.0.0"
version = "3.0.0"
authors = ["David Craven <[email protected]>", "Parity Technologies Limited <[email protected]>"]
edition = "2021"
keywords = ["asynchronous", "routing"]
license = "MIT OR Apache-2.0"
description = "crossplatform asynchronous network watcher"
repository = "https://github.com/mxinden/if-watch"

[features]
default = ["smol"]
mxinden marked this conversation as resolved.
Show resolved Hide resolved
tokio = ["dep:tokio", "rtnetlink/tokio_socket"]
smol = ["dep:smol", "rtnetlink/smol_socket"]

[dependencies]
fnv = "1.0.7"
futures = "0.3.19"
ipnet = "2.3.1"
log = "0.4.14"

[target.'cfg(target_os = "linux")'.dependencies]
rtnetlink = { version = "0.10.0", default-features = false, features = ["smol_socket"] }
rtnetlink = { version = "0.10.0", default-features = false }

[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
core-foundation = "0.9.2"
if-addrs = "0.7.0"
system-configuration = "0.5.0"
tokio = { version = "1.21.2", features = ["rt"], optional = true }
smol = { version = "1.2.5", optional = true }

[target.'cfg(target_os = "windows")'.dependencies]
if-addrs = "0.7.0"
Expand All @@ -32,3 +39,5 @@ if-addrs = "0.7.0"

[dev-dependencies]
env_logger = "0.9.0"
smol = "1.2.5"
tokio = { version = "1.21.2", features = ["rt", "macros"] }
4 changes: 2 additions & 2 deletions examples/if_watch.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use futures::StreamExt;
use if_watch::IfWatcher;
use if_watch::smol::IfWatcher;
mxinden marked this conversation as resolved.
Show resolved Hide resolved

fn main() {
env_logger::init();
futures::executor::block_on(async {
smol::block_on(async {
let mut set = IfWatcher::new().unwrap();
loop {
let event = set.select_next_some().await;
Expand Down
93 changes: 89 additions & 4 deletions src/apple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,95 @@ use core_foundation::runloop::{kCFRunLoopCommonModes, CFRunLoop};
use core_foundation::string::CFString;
use fnv::FnvHashSet;
use futures::channel::mpsc;
use futures::stream::Stream;
use futures::stream::{FusedStream, Stream};
use futures::Future;
use if_addrs::IfAddr;
use std::collections::VecDeque;
use std::io::Result;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use system_configuration::dynamic_store::{
SCDynamicStore, SCDynamicStoreBuilder, SCDynamicStoreCallBackContext,
};

#[cfg(feature = "tokio")]
pub mod tokio {
//! An interface watcher that uses the `tokio` runtime.
use futures::Future;

#[doc(hidden)]
pub struct TokioRuntime;

impl super::Runtime for TokioRuntime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static,
{
tokio::spawn(f);
}
}

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher<TokioRuntime>;
}

#[cfg(feature = "smol")]
pub mod smol {
//! An interface watcher that uses the `smol` runtime.

use futures::Future;

#[doc(hidden)]
pub struct SmolRuntime;

impl super::Runtime for SmolRuntime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static,
{
smol::spawn(f).detach();
}
}

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher<SmolRuntime>;
}

#[derive(Debug)]
pub struct IfWatcher {
pub struct IfWatcher<T> {
addrs: FnvHashSet<IpNet>,
queue: VecDeque<IfEvent>,
rx: mpsc::Receiver<()>,
runtime: PhantomData<T>,
}

impl IfWatcher {
#[doc(hidden)]
pub trait Runtime {
fn spawn<F>(f: F)
where
F: Future,
F: Send + 'static,
<F as Future>::Output: Send + 'static;
}

impl<T> IfWatcher<T>
where
T: Runtime,
{
/// Create a watcher.
pub fn new() -> Result<Self> {
let (tx, rx) = mpsc::channel(1);
std::thread::spawn(|| background_task(tx));
T::spawn(async { background_task(tx) });

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spawning a non-async function is a bit odd. Can we make it async?

I think this will block the executor thread otherwise.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will block the executor thread otherwise.

interesting, why do you say this?

btw was able to test on a mac and confirm background_task's CFRunLoop::run_current() seems to run forever, and per tokio spawn_blocking doc for example:

This function is intended for non-async operations that eventually finish on their own. If you want to spawn an ordinary thread, you should use thread::spawn instead.

so we should probably maintain the plain old std::thread::spawn, wdyt?

Copy link

@thomaseizinger thomaseizinger Oct 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will block the executor thread otherwise.

interesting, why do you say this?

Unless interrupted otherwise, a task will run on an executor thread until it yields, i.e. returns Poll::Pending.

async_std has a mechanism to detect blocked executor threads and spawn new ones: https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime/

Tokio doesn't though and it is a conscious design decision AFAIK.

btw was able to test on a mac and confirm background_task's CFRunLoop::run_current() seems to run forever, and per tokio spawn_blocking doc for example:

This function is intended for non-async operations that eventually finish on their own. If you want to spawn an ordinary thread, you should use thread::spawn instead.

But you are not using spawn_blocking right? So how is that relevant?

so we should probably maintain the plain old std::thread::spawn, wdyt?

Can we refactor this to be similar to the linux implementation where we register a waker and call it in a callback?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless interrupted otherwise, a task will run on an executor thread until it yields, i.e. returns Poll::Pending.

async_std has a mechanism to detect blocked executor threads and spawn new ones: https://async.rs/blog/stop-worrying->about-blocking-the-new-async-std-runtime/

Tokio doesn't though and it is a conscious design decision AFAIK.

Right yeah, but a function just by not being async doesn't mean it will block the thread.

But you are not using spawn_blocking right? So how is that relevant?

because background_task's CFRunLoop::run_current() seems to run forever, therefore blocking the thread.

Can we refactor this to be similar to the linux implementation where we register a waker and call it in a callback?

Yeah I think so, but there still still need for a blocking thread as CFRunLoop::run_current() is blocking, while the Windows NotifyIpInterfaceChange isn't.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless interrupted otherwise, a task will run on an executor thread until it yields, i.e. returns Poll::Pending.
async_std has a mechanism to detect blocked executor threads and spawn new ones: https://async.rs/blog/stop-worrying->about-blocking-the-new-async-std-runtime/
Tokio doesn't though and it is a conscious design decision AFAIK.

Right yeah, but a function just by not being async doesn't mean it will block the thread.

That is correct. I think we are saying the same thing, it is CFRunLoop::run_current() that is the problem.

But you are not using spawn_blocking right? So how is that relevant?

because background_task's CFRunLoop::run_current() seems to run forever, therefore blocking the thread.

Can we refactor this to be similar to the linux implementation where we register a waker and call it in a callback?

Yeah I think so, but there still still need for a blocking thread as CFRunLoop::run_current() is blocking, while the Windows NotifyIpInterfaceChange isn't.

That really is unfortunate. I guess we will have to stick with a regular thread then for MacOS.

Copy link
Member Author

@jxs jxs Oct 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That really is unfortunate. I guess we will have to stick with a regular thread then for MacOS.

yeah 😞 Reverted to an implementation similar to Windows ptal Thomas. Still, I think this approach makes sense and is more idiomatic the inicial one of overriding features

let mut watcher = Self {
addrs: Default::default(),
queue: Default::default(),
rx,
runtime: PhantomData,
};
watcher.resync()?;
Ok(watcher)
Expand All @@ -55,10 +119,12 @@ impl IfWatcher {
Ok(())
}

/// Iterate over current networks.
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}

/// Poll for an address change event.
pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
Expand All @@ -74,6 +140,25 @@ impl IfWatcher {
}
}

impl<T> Stream for IfWatcher<T>
where
T: Runtime + Unpin,
{
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::into_inner(self).poll_if_event(cx).map(Some)
}
}

impl<T> FusedStream for IfWatcher<T>
where
T: Runtime + Unpin,
{
fn is_terminated(&self) -> bool {
false
}
}

fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet {
match addr {
IfAddr::V4(ip) => {
Expand Down
39 changes: 37 additions & 2 deletions src/fallback.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::IfEvent;
use async_io::Timer;
use futures::stream::Stream;
use futures::stream::{FusedStream, Stream};
use if_addrs::IfAddr;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use std::collections::{HashSet, VecDeque};
Expand All @@ -10,6 +10,26 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

#[cfg(feature = "tokio")]
pub mod tokio {
//! An interface watcher.
//! **On this platform there is no difference between `tokio` and `smol` features,**
//! **this was done to maintain the api compatible with other platforms**.

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher;
}

#[cfg(feature = "smol")]
pub mod smol {
//! An interface watcher.
//! **On this platform there is no difference between `tokio` and `smol` features,**
//! **this was done to maintain the api compatible with other platforms**.

/// Watches for interface changes.
pub type IfWatcher = super::IfWatcher;
}

/// An address set/watcher
#[derive(Debug)]
pub struct IfWatcher {
Expand All @@ -19,7 +39,7 @@ pub struct IfWatcher {
}

impl IfWatcher {
/// Create a watcher
/// Create a watcher.
pub fn new() -> Result<Self> {
Ok(Self {
addrs: Default::default(),
Expand All @@ -45,10 +65,12 @@ impl IfWatcher {
Ok(())
}

/// Iterate over current networks.
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}

/// Poll for an address change event.
pub fn poll_if_event(&mut self, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
Expand All @@ -64,6 +86,19 @@ impl IfWatcher {
}
}

impl Stream for IfWatcher {
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::into_inner(self).poll_if_event(cx).map(Some)
}
}

impl FusedStream for IfWatcher {
fn is_terminated(&self) -> bool {
false
}
}

fn ifaddr_to_ipnet(addr: IfAddr) -> IpNet {
match addr {
IfAddr::V4(ip) => {
Expand Down
Loading