From aebc9ca60d598e611b7f1342d69820fbf1033e42 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 15 May 2024 10:39:02 +0200 Subject: [PATCH] Reapply "perf(transport): remove Server::timers (#1784)" (#1800) (#1902) This reverts commit 342e4e785e9edbe0ec4d4c6c6f51716a293e27a6. With https://github.com/mozilla/neqo/pull/1878 merged and https://bugzilla.mozilla.org/show_bug.cgi?id=1895319 available, one can now reapply the patch removing `Server::timers`. More specifically, the actual bug fix on mozilla-central side: ``` rust let output = if self.response_to_send.is_empty() { output } else { // In case there are pending responses to send, make sure a reasonable // callback is returned. const MIN_INTERVAL: Duration = Duration::from_millis(100); match output { Output::None => Output::Callback(MIN_INTERVAL), o @ Output::Datagram(_) => o, Output::Callback(d) => Output::Callback(min(d, MIN_INTERVAL)), } }; ``` See https://phabricator.services.mozilla.com/D209574. --- neqo-common/Cargo.toml | 4 - neqo-common/benches/timer.rs | 39 ---- neqo-common/src/lib.rs | 1 - neqo-common/src/timer.rs | 420 ----------------------------------- neqo-transport/src/server.rs | 88 ++++---- 5 files changed, 42 insertions(+), 510 deletions(-) delete mode 100644 neqo-common/benches/timer.rs delete mode 100644 neqo-common/src/timer.rs diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index dc4997bcf1..bce2eab2b7 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -35,7 +35,3 @@ features = ["timeapi"] [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false - -[[bench]] -name = "timer" -harness = false diff --git a/neqo-common/benches/timer.rs b/neqo-common/benches/timer.rs deleted file mode 100644 index 5ac8019db4..0000000000 --- a/neqo-common/benches/timer.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use std::time::{Duration, Instant}; - -use criterion::{criterion_group, criterion_main, Criterion}; -use neqo_common::timer::Timer; -use test_fixture::now; - -fn benchmark_timer(c: &mut Criterion) { - c.bench_function("drain a timer quickly", |b| { - b.iter_batched_ref( - make_timer, - |(_now, timer)| { - while let Some(t) = timer.next_time() { - assert!(timer.take_next(t).is_some()); - } - }, - criterion::BatchSize::SmallInput, - ); - }); -} - -fn make_timer() -> (Instant, Timer<()>) { - const TIMES: &[u64] = &[1, 2, 3, 5, 8, 13, 21, 34]; - - let now = now(); - let mut timer = Timer::new(now, Duration::from_millis(777), 100); - for &t in TIMES { - timer.add(now + Duration::from_secs(t), ()); - } - (now, timer) -} - -criterion_group!(benches, benchmark_timer); -criterion_main!(benches); diff --git a/neqo-common/src/lib.rs b/neqo-common/src/lib.rs index 3a72425e44..3f2b5f17fa 100644 --- a/neqo-common/src/lib.rs +++ b/neqo-common/src/lib.rs @@ -16,7 +16,6 @@ pub mod hrtime; mod incrdecoder; pub mod log; pub mod qlog; -pub mod timer; pub mod tos; use std::fmt::Write; diff --git a/neqo-common/src/timer.rs b/neqo-common/src/timer.rs deleted file mode 100644 index 3feddb2226..0000000000 --- a/neqo-common/src/timer.rs +++ /dev/null @@ -1,420 +0,0 @@ -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use std::{ - collections::VecDeque, - mem, - time::{Duration, Instant}, -}; - -/// Internal structure for a timer item. -struct TimerItem { - time: Instant, - item: T, -} - -impl TimerItem { - fn time(ti: &Self) -> Instant { - ti.time - } -} - -/// A timer queue. -/// This uses a classic timer wheel arrangement, with some characteristics that might be considered -/// peculiar. Each slot in the wheel is sorted (complexity O(N) insertions, but O(logN) to find cut -/// points). Time is relative, the wheel has an origin time and it is unable to represent times that -/// are more than `granularity * capacity` past that time. -pub struct Timer { - items: Vec>>, - now: Instant, - granularity: Duration, - cursor: usize, -} - -impl Timer { - /// Construct a new wheel at the given granularity, starting at the given time. - /// - /// # Panics - /// - /// When `capacity` is too large to fit in `u32` or `granularity` is zero. - pub fn new(now: Instant, granularity: Duration, capacity: usize) -> Self { - assert!(u32::try_from(capacity).is_ok()); - assert!(granularity.as_nanos() > 0); - let mut items = Vec::with_capacity(capacity); - items.resize_with(capacity, Default::default); - Self { - items, - now, - granularity, - cursor: 0, - } - } - - /// Return a reference to the time of the next entry. - #[must_use] - pub fn next_time(&self) -> Option { - let idx = self.bucket(0); - for i in idx..self.items.len() { - if let Some(t) = self.items[i].front() { - return Some(t.time); - } - } - for i in 0..idx { - if let Some(t) = self.items[i].front() { - return Some(t.time); - } - } - None - } - - /// Get the full span of time that this can cover. - /// Two timers cannot be more than this far apart. - /// In practice, this value is less by one amount of the timer granularity. - #[inline] - #[allow(clippy::cast_possible_truncation)] // guarded by assertion - #[must_use] - pub fn span(&self) -> Duration { - self.granularity * (self.items.len() as u32) - } - - /// For the given `time`, get the number of whole buckets in the future that is. - #[inline] - #[allow(clippy::cast_possible_truncation)] // guarded by assertion - fn delta(&self, time: Instant) -> usize { - // This really should use Duration::div_duration_f??(), but it can't yet. - ((time - self.now).as_nanos() / self.granularity.as_nanos()) as usize - } - - #[inline] - fn time_bucket(&self, time: Instant) -> usize { - self.bucket(self.delta(time)) - } - - #[inline] - fn bucket(&self, delta: usize) -> usize { - debug_assert!(delta < self.items.len()); - (self.cursor + delta) % self.items.len() - } - - /// Slide forward in time by `n * self.granularity`. - #[allow(clippy::cast_possible_truncation, clippy::reversed_empty_ranges)] - // cast_possible_truncation is ok because we have an assertion guard. - // reversed_empty_ranges is to avoid different types on the if/else. - fn tick(&mut self, n: usize) { - let new = self.bucket(n); - let iter = if new < self.cursor { - (self.cursor..self.items.len()).chain(0..new) - } else { - (self.cursor..new).chain(0..0) - }; - for i in iter { - assert!(self.items[i].is_empty()); - } - self.now += self.granularity * (n as u32); - self.cursor = new; - } - - /// Asserts if the time given is in the past or too far in the future. - /// - /// # Panics - /// - /// When `time` is in the past relative to previous calls. - pub fn add(&mut self, time: Instant, item: T) { - assert!(time >= self.now); - // Skip forward quickly if there is too large a gap. - let short_span = self.span() - self.granularity; - if time >= (self.now + self.span() + short_span) { - // Assert that there aren't any items. - for i in &self.items { - debug_assert!(i.is_empty()); - } - self.now = time.checked_sub(short_span).unwrap(); - self.cursor = 0; - } - - // Adjust time forward the minimum amount necessary. - let mut d = self.delta(time); - if d >= self.items.len() { - self.tick(1 + d - self.items.len()); - d = self.items.len() - 1; - } - - let bucket = self.bucket(d); - let ins = match self.items[bucket].binary_search_by_key(&time, TimerItem::time) { - Ok(j) | Err(j) => j, - }; - self.items[bucket].insert(ins, TimerItem { time, item }); - } - - /// Given knowledge of the time an item was added, remove it. - /// This requires use of a predicate that identifies matching items. - /// - /// # Panics - /// Impossible, I think. - pub fn remove(&mut self, time: Instant, mut selector: F) -> Option - where - F: FnMut(&T) -> bool, - { - if time < self.now { - return None; - } - if time > self.now + self.span() { - return None; - } - let bucket = self.time_bucket(time); - let Ok(start_index) = self.items[bucket].binary_search_by_key(&time, TimerItem::time) - else { - return None; - }; - // start_index is just one of potentially many items with the same time. - // Search backwards for a match, ... - for i in (0..=start_index).rev() { - if self.items[bucket][i].time != time { - break; - } - if selector(&self.items[bucket][i].item) { - return Some(self.items[bucket].remove(i).unwrap().item); - } - } - // ... then forwards. - for i in (start_index + 1)..self.items[bucket].len() { - if self.items[bucket][i].time != time { - break; - } - if selector(&self.items[bucket][i].item) { - return Some(self.items[bucket].remove(i).unwrap().item); - } - } - None - } - - /// Take the next item, unless there are no items with - /// a timeout in the past relative to `until`. - pub fn take_next(&mut self, until: Instant) -> Option { - fn maybe_take(v: &mut VecDeque>, until: Instant) -> Option { - if !v.is_empty() && v[0].time <= until { - Some(v.pop_front().unwrap().item) - } else { - None - } - } - - let idx = self.bucket(0); - for i in idx..self.items.len() { - let res = maybe_take(&mut self.items[i], until); - if res.is_some() { - return res; - } - } - for i in 0..idx { - let res = maybe_take(&mut self.items[i], until); - if res.is_some() { - return res; - } - } - None - } - - /// Create an iterator that takes all items until the given time. - /// Note: Items might be removed even if the iterator is not fully exhausted. - pub fn take_until(&mut self, until: Instant) -> impl Iterator { - let get_item = move |x: TimerItem| x.item; - if until >= self.now + self.span() { - // Drain everything, so a clean sweep. - let mut empty_items = Vec::with_capacity(self.items.len()); - empty_items.resize_with(self.items.len(), VecDeque::default); - let mut items = mem::replace(&mut self.items, empty_items); - self.now = until; - self.cursor = 0; - - let tail = items.split_off(self.cursor); - return tail.into_iter().chain(items).flatten().map(get_item); - } - - // Only returning a partial span, so do it bucket at a time. - let delta = self.delta(until); - let mut buckets = Vec::with_capacity(delta + 1); - - // First, the whole buckets. - for i in 0..delta { - let idx = self.bucket(i); - buckets.push(mem::take(&mut self.items[idx])); - } - self.tick(delta); - - // Now we need to split the last bucket, because there might be - // some items with `item.time > until`. - let bucket = &mut self.items[self.cursor]; - let last_idx = match bucket.binary_search_by_key(&until, TimerItem::time) { - Ok(mut m) => { - // If there are multiple values, the search will hit any of them. - // Make sure to get them all. - while m < bucket.len() && bucket[m].time == until { - m += 1; - } - m - } - Err(ins) => ins, - }; - let tail = bucket.split_off(last_idx); - buckets.push(mem::replace(bucket, tail)); - // This tomfoolery with the empty vector ensures that - // the returned type here matches the one above precisely - // without having to invoke the `either` crate. - buckets.into_iter().chain(vec![]).flatten().map(get_item) - } -} - -#[cfg(test)] -mod test { - use std::sync::OnceLock; - - use super::{Duration, Instant, Timer}; - - fn now() -> Instant { - static NOW: OnceLock = OnceLock::new(); - *NOW.get_or_init(Instant::now) - } - - const GRANULARITY: Duration = Duration::from_millis(10); - const CAPACITY: usize = 10; - #[test] - fn create() { - let t: Timer<()> = Timer::new(now(), GRANULARITY, CAPACITY); - assert_eq!(t.span(), Duration::from_millis(100)); - assert_eq!(None, t.next_time()); - } - - #[test] - fn immediate_entry() { - let mut t = Timer::new(now(), GRANULARITY, CAPACITY); - t.add(now(), 12); - assert_eq!(now(), t.next_time().expect("should have an entry")); - let values: Vec<_> = t.take_until(now()).collect(); - assert_eq!(vec![12], values); - } - - #[test] - fn same_time() { - let mut t = Timer::new(now(), GRANULARITY, CAPACITY); - let v1 = 12; - let v2 = 13; - t.add(now(), v1); - t.add(now(), v2); - assert_eq!(now(), t.next_time().expect("should have an entry")); - let values: Vec<_> = t.take_until(now()).collect(); - assert!(values.contains(&v1)); - assert!(values.contains(&v2)); - } - - #[test] - fn add() { - let mut t = Timer::new(now(), GRANULARITY, CAPACITY); - let near_future = now() + Duration::from_millis(17); - let v = 9; - t.add(near_future, v); - assert_eq!(near_future, t.next_time().expect("should return a value")); - assert_eq!( - t.take_until(near_future.checked_sub(Duration::from_millis(1)).unwrap()) - .count(), - 0 - ); - assert!(t - .take_until(near_future + Duration::from_millis(1)) - .any(|x| x == v)); - } - - #[test] - fn add_future() { - let mut t = Timer::new(now(), GRANULARITY, CAPACITY); - let future = now() + Duration::from_millis(117); - let v = 9; - t.add(future, v); - assert_eq!(future, t.next_time().expect("should return a value")); - assert!(t.take_until(future).any(|x| x == v)); - } - - #[test] - fn add_far_future() { - let mut t = Timer::new(now(), GRANULARITY, CAPACITY); - let far_future = now() + Duration::from_millis(892); - let v = 9; - t.add(far_future, v); - assert_eq!(far_future, t.next_time().expect("should return a value")); - assert!(t.take_until(far_future).any(|x| x == v)); - } - - const TIMES: &[Duration] = &[ - Duration::from_millis(40), - Duration::from_millis(91), - Duration::from_millis(6), - Duration::from_millis(3), - Duration::from_millis(22), - Duration::from_millis(40), - ]; - - fn with_times() -> Timer { - let mut t = Timer::new(now(), GRANULARITY, CAPACITY); - for (i, time) in TIMES.iter().enumerate() { - t.add(now() + *time, i); - } - assert_eq!( - now() + *TIMES.iter().min().unwrap(), - t.next_time().expect("should have a time") - ); - t - } - - #[test] - #[allow(clippy::needless_collect)] // false positive - fn multiple_values() { - let mut t = with_times(); - let values: Vec<_> = t.take_until(now() + *TIMES.iter().max().unwrap()).collect(); - for i in 0..TIMES.len() { - assert!(values.contains(&i)); - } - } - - #[test] - #[allow(clippy::needless_collect)] // false positive - fn take_far_future() { - let mut t = with_times(); - let values: Vec<_> = t.take_until(now() + Duration::from_secs(100)).collect(); - for i in 0..TIMES.len() { - assert!(values.contains(&i)); - } - } - - #[test] - fn remove_each() { - let mut t = with_times(); - for (i, time) in TIMES.iter().enumerate() { - assert_eq!(Some(i), t.remove(now() + *time, |&x| x == i)); - } - assert_eq!(None, t.next_time()); - } - - #[test] - fn remove_future() { - let mut t = Timer::new(now(), GRANULARITY, CAPACITY); - let future = now() + Duration::from_millis(117); - let v = 9; - t.add(future, v); - - assert_eq!(Some(v), t.remove(future, |candidate| *candidate == v)); - } - - #[test] - fn remove_too_far_future() { - let mut t = Timer::new(now(), GRANULARITY, CAPACITY); - let future = now() + Duration::from_millis(117); - let too_far_future = now() + t.span() + Duration::from_millis(117); - let v = 9; - t.add(future, v); - - assert_eq!(None, t.remove(too_far_future, |candidate| *candidate == v)); - } -} diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index bce87beb4d..eabb10e25b 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -15,12 +15,12 @@ use std::{ ops::{Deref, DerefMut}, path::PathBuf, rc::{Rc, Weak}, - time::{Duration, Instant}, + time::Instant, }; use neqo_common::{ self as common, event::Provider, hex, qdebug, qerror, qinfo, qlog::NeqoQlog, qtrace, qwarn, - timer::Timer, Datagram, Decoder, Role, + Datagram, Decoder, Role, }; use neqo_crypto::{ encode_ech_config, AntiReplay, Cipher, PrivateKey, PublicKey, ZeroRttCheckResult, @@ -43,14 +43,6 @@ pub enum InitialResult { Retry(Vec), } -/// The size of timer buckets. This is higher than the actual timer granularity -/// as this depends on there being some distribution of events. -const TIMER_GRANULARITY: Duration = Duration::from_millis(4); -/// The number of buckets in the timer. As mentioned in the definition of `Timer`, -/// the granularity and capacity need to multiply to be larger than the largest -/// delay that might be used. That's the idle timeout (currently 30s). -const TIMER_CAPACITY: usize = 16384; - type StateRef = Rc>; type ConnectionTableRef = Rc>>; @@ -58,7 +50,21 @@ type ConnectionTableRef = Rc>>; pub struct ServerConnectionState { c: Connection, active_attempt: Option, - last_timer: Instant, + wake_at: Option, +} + +impl ServerConnectionState { + fn set_wake_at(&mut self, at: Instant) { + self.wake_at = Some(at); + } + + fn needs_waking(&self, now: Instant) -> bool { + self.wake_at.map_or(false, |t| t <= now) + } + + fn woken(&mut self) { + self.wake_at = None; + } } impl Deref for ServerConnectionState { @@ -171,8 +177,8 @@ pub struct Server { active: HashSet, /// The set of connections that need immediate processing. waiting: VecDeque, - /// Outstanding timers for connections. - timers: Timer, + /// The latest [`Output::Callback`] returned from [`Server::process`]. + wake_at: Option, /// Address validation logic, which determines whether we send a Retry. address_validation: Rc>, /// Directory to create qlog traces in @@ -216,10 +222,10 @@ impl Server { connections: Rc::default(), active: HashSet::default(), waiting: VecDeque::default(), - timers: Timer::new(now, TIMER_GRANULARITY, TIMER_CAPACITY), address_validation: Rc::new(RefCell::new(validation)), qlog_dir: None, ech_config: None, + wake_at: None, }) } @@ -257,11 +263,6 @@ impl Server { self.ech_config.as_ref().map_or(&[], |cfg| &cfg.encoded) } - fn remove_timer(&mut self, c: &StateRef) { - let last = c.borrow().last_timer; - self.timers.remove(last, |t| Rc::ptr_eq(t, c)); - } - fn process_connection( &mut self, c: &StateRef, @@ -277,16 +278,12 @@ impl Server { } Output::Callback(delay) => { let next = now + delay; - if next != c.borrow().last_timer { - qtrace!([self], "Change timer to {:?}", next); - self.remove_timer(c); - c.borrow_mut().last_timer = next; - self.timers.add(next, Rc::clone(c)); + c.borrow_mut().set_wake_at(next); + if self.wake_at.map_or(true, |c| c > next) { + self.wake_at = Some(next); } } - Output::None => { - self.remove_timer(c); - } + Output::None => {} } if c.borrow().has_events() { qtrace!([self], "Connection active: {:?}", c); @@ -504,7 +501,7 @@ impl Server { self.setup_connection(&mut c, &attempt_key, initial, orig_dcid); let c = Rc::new(RefCell::new(ServerConnectionState { c, - last_timer: now, + wake_at: None, active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); @@ -643,24 +640,28 @@ impl Server { return Some(d); } } - qtrace!([self], "No packet to send still, run timers"); - while let Some(c) = self.timers.take_next(now) { - if let Some(d) = self.process_connection(&c, None, now) { - return Some(d); + + qtrace!([self], "No packet to send still, check wake up times"); + loop { + let connection = self + .connections + .borrow() + .values() + .find(|c| c.borrow().needs_waking(now)) + .cloned()?; + let datagram = self.process_connection(&connection, None, now); + connection.borrow_mut().woken(); + if datagram.is_some() { + return datagram; } } - None } - fn next_time(&mut self, now: Instant) -> Option { - if self.waiting.is_empty() { - self.timers.next_time().map(|x| x - now) - } else { - Some(Duration::new(0, 0)) + pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + if self.wake_at.map_or(false, |c| c <= now) { + self.wake_at = None; } - } - pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { dgram .and_then(|d| self.process_input(d, now)) .or_else(|| self.process_next_output(now)) @@ -668,12 +669,7 @@ impl Server { qtrace!([self], "Send packet: {:?}", d); Output::Datagram(d) }) - .or_else(|| { - self.next_time(now).map(|delay| { - qtrace!([self], "Wait: {:?}", delay); - Output::Callback(delay) - }) - }) + .or_else(|| self.wake_at.take().map(|c| Output::Callback(c - now))) .unwrap_or_else(|| { qtrace!([self], "Go dormant"); Output::None