Skip to content

Commit

Permalink
Add StreamExtDebounced
Browse files Browse the repository at this point in the history
  • Loading branch information
bim9262 committed Feb 24, 2025
1 parent 51a8ec6 commit 3829b3f
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 18 deletions.
6 changes: 1 addition & 5 deletions src/blocks/bluetooth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,7 @@ impl DeviceMonitor {

loop {
select! {
_ = updates.next() => {
// avoid too frequent updates
let _ = tokio::time::timeout(Duration::from_millis(100), async {
loop { let _ = updates.next().await; }
}).await;
_ = updates.next_debounced() => {
debug!("Got update for device");
return Ok(());
}
Expand Down
7 changes: 1 addition & 6 deletions src/blocks/external_ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,7 @@ pub async fn run(config: &Config, api: &CommonApi) -> Result<()> {
select! {
_ = sleep(config.interval.0) => (),
_ = api.wait_for_update_request() => (),
_ = stream.next() => {
// avoid too frequent updates
let _ = tokio::time::timeout(Duration::from_millis(100), async {
loop { let _ = stream.next().await; }
}).await;
}
_ = stream.next_debounced() => ()
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/blocks/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ pub use futures::{Stream, StreamExt};
pub use smart_default::SmartDefault;

pub use async_trait::async_trait;

pub use crate::util::StreamExtDebounced as _;
8 changes: 1 addition & 7 deletions src/blocks/privacy/v4l.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,7 @@ impl PrivacyMonitor for Monitor<'_> {
break;
}
},
_ = self.stream.next() => {
// avoid too frequent updates
let _ = tokio::time::timeout(Duration::from_millis(100), async {
loop { let _ = self.stream.next().await; }
}).await;
break;
}
_ = self.stream.next_debounced() => break
}
}
Ok(())
Expand Down
17 changes: 17 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,23 @@ pub fn default<T: Default>() -> T {
Default::default()
}

pub trait StreamExtDebounced: futures::StreamExt {
fn next_debounced(&mut self) -> impl Future<Output = Option<Self::Item>>;
}

impl<T: futures::StreamExt + Unpin> StreamExtDebounced for T {
async fn next_debounced(&mut self) -> Option<Self::Item> {
let mut result = self.next().await?;
let mut noop_ctx = std::task::Context::from_waker(std::task::Waker::noop());
loop {
match self.poll_next_unpin(&mut noop_ctx) {
std::task::Poll::Ready(Some(x)) => result = x,
std::task::Poll::Ready(None) | std::task::Poll::Pending => return Some(result),
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 3829b3f

Please sign in to comment.