From 132ba54b6ff7406204b866eb644594201d6be8d7 Mon Sep 17 00:00:00 2001 From: Mark Rousskov Date: Fri, 13 Sep 2024 15:25:46 -0400 Subject: [PATCH] dc: Prune peer map if we've pruned the id map (#2319) * Prune peer list if we've pruned the path secret This ensures that when we prune entries from the map, we bound the size of the peer set as well. This also updates our test coverage to include a small-size map, which causes us to need to tweak a few assertions to account for the now semi-random removals. I'm not very happy with the result, but I think it's OK for now. Mid-term it probably makes sense to figure out a better way to make sure our logic is sound (e.g., probability of removing a recently added peer should be near zero). * Bound requested_handshakes too --- dc/s2n-quic-dc/src/path/secret/map.rs | 69 ++++++++++++++++------ dc/s2n-quic-dc/src/path/secret/map/test.rs | 58 ++++++++++++++++-- 2 files changed, 105 insertions(+), 22 deletions(-) diff --git a/dc/s2n-quic-dc/src/path/secret/map.rs b/dc/s2n-quic-dc/src/path/secret/map.rs index 4bea24e8b..792bbacad 100644 --- a/dc/s2n-quic-dc/src/path/secret/map.rs +++ b/dc/s2n-quic-dc/src/path/secret/map.rs @@ -195,7 +195,7 @@ impl Cleaner { // handshake to happen. This handshake will happen on the next request for this // particular peer. if entry.rehandshake_time() <= now { - state.requested_handshakes.pin().insert(entry.peer); + state.request_handshake(entry.peer); } // Not retired. @@ -208,24 +208,47 @@ impl Cleaner { } } - if state.ids.len() <= (state.max_capacity * 95 / 100) { - return; - } - - let mut to_remove = std::cmp::max(state.ids.len() / 100, 1); - let guard = state.ids.guard(); - for (id, entry) in state.ids.iter(&guard) { - if to_remove > 0 { - // Only remove with the minimum epoch. This hopefully means that we will remove - // fairly stale entries. 
- if entry.used_at.load(Ordering::Relaxed) == minimum { - state.ids.remove(id, &guard); - to_remove -= 1; + if state.ids.len() > (state.max_capacity * 95 / 100) { + let mut to_remove = std::cmp::max(state.ids.len() / 100, 1); + let guard = state.ids.guard(); + for (id, entry) in state.ids.iter(&guard) { + if to_remove > 0 { + // Only remove with the minimum epoch. This hopefully means that we will remove + // fairly stale entries. + if entry.used_at.load(Ordering::Relaxed) == minimum { + state.ids.remove(id, &guard); + to_remove -= 1; + } + } else { + break; } - } else { - break; } } + + // Prune the peer list of any entries that no longer have a corresponding `id` entry. + // + // This ensures that the peer list is naturally bounded in size by the size of the `id` + // set, and relies on precisely the same mechanisms for eviction. + { + let ids = state.ids.pin(); + state + .peers + .pin() + .retain(|_, entry| ids.contains_key(entry.secret.id())); + } + + // Iteration order should be effectively random, so this effectively just prunes the list + // periodically. 5000 is chosen arbitrarily to make sure this isn't a memory leak. Note + // that peers the application is actively interested in will typically bypass this list, so + // this is mostly a risk of delaying regular re-handshaking with very large cardinalities. + // + // FIXME: Long or mid-term it likely makes sense to replace this data structure with a + // fuzzy set of some kind and/or just moving to immediate background handshake attempts. + let mut count = 0; + state.requested_handshakes.pin().retain(|_| { + count += 1; + count < 5000 + }); } fn epoch(&self) -> u64 { @@ -235,6 +258,16 @@ impl Cleaner { const EVICTION_CYCLES: u64 = if cfg!(test) { 0 } else { 10 }; +impl State { + fn request_handshake(&self, peer: SocketAddr) { + // The length is reset as part of cleanup to 5000. 
+ let handshakes = self.requested_handshakes.pin(); + if handshakes.len() <= 6000 { + handshakes.insert(peer); + } + } +} + impl Map { pub fn new(signer: stateless_reset::Signer) -> Self { // FIXME: Avoid unwrap and the whole socket. @@ -385,7 +418,7 @@ impl Map { // FIXME: More actively schedule a new handshake. // See comment on requested_handshakes for details. - self.state.requested_handshakes.pin().insert(state.peer); + self.state.request_handshake(state.peer); } pub fn handle_control_packet(&self, packet: &control::Packet) { @@ -433,7 +466,7 @@ impl Map { // // Handshaking will be rate limited per destination peer (and at least // de-duplicated). - self.state.requested_handshakes.pin().insert(state.peer); + self.state.request_handshake(state.peer); } control::Packet::UnknownPathSecret(_) => unreachable!(), } diff --git a/dc/s2n-quic-dc/src/path/secret/map/test.rs b/dc/s2n-quic-dc/src/path/secret/map/test.rs index 1f5e47fa5..5fda426f8 100644 --- a/dc/s2n-quic-dc/src/path/secret/map/test.rs +++ b/dc/s2n-quic-dc/src/path/secret/map/test.rs @@ -154,7 +154,12 @@ impl Model { let ids = state.state.ids.guard(); self.invariants.retain(|invariant| { if let Invariant::ContainsId(id) = invariant { - if state.state.ids.get(id, &ids).unwrap().retired.retired() { + if state + .state + .ids + .get(id, &ids) + .map_or(true, |v| v.retired.retired()) + { invalidated.push(*id); return false; } @@ -191,12 +196,18 @@ impl Model { let peers = state.peers.guard(); let ids = state.ids.guard(); for invariant in self.invariants.iter() { + // We avoid assertions for contains() if we're running the small capacity test, since + // they are likely broken -- we semi-randomly evict peers in that case. 
match invariant { Invariant::ContainsIp(ip) => { - assert!(state.peers.contains_key(ip, &peers), "{:?}", ip); + if state.max_capacity != 5 { + assert!(state.peers.contains_key(ip, &peers), "{:?}", ip); + } } Invariant::ContainsId(id) => { - assert!(state.ids.contains_key(id, &ids), "{:?}", id); + if state.max_capacity != 5 { + assert!(state.ids.contains_key(id, &ids), "{:?}", id); + } } Invariant::IdRemoved(id) => { assert!( @@ -207,6 +218,16 @@ impl Model { } } } + + // All entries in the peer set should also be in the `ids` set (which is actively garbage + // collected). + for (_, entry) in state.peers.iter(&peers) { + assert!( + state.ids.contains_key(entry.secret.id(), &ids), + "{:?} not present in IDs", + entry.secret.id() + ); + } } } @@ -236,7 +257,36 @@ fn has_duplicate_pids(ops: &[Operation]) -> bool { fn check_invariants() { bolero::check!() .with_type::<Vec<Operation>>() - .with_iterations(100_000) + .with_iterations(10_000) + .for_each(|input: &Vec<Operation>| { + if has_duplicate_pids(input) { + // Ignore this attempt. + return; + } + + let mut model = Model::default(); + let signer = stateless_reset::Signer::new(b"secret"); + let mut map = Map::new(signer); + + // Avoid background work interfering with testing. + map.state.cleaner.stop(); + + Arc::get_mut(&mut map.state).unwrap().max_capacity = 5; + + model.check_invariants(&map.state); + + for op in input { + model.perform(*op, &map); + model.check_invariants(&map.state); + } + }) +} + +#[test] +fn check_invariants_no_overflow() { + bolero::check!() + .with_type::<Vec<Operation>>() + .with_iterations(10_000) .for_each(|input: &Vec<Operation>| { if has_duplicate_pids(input) { // Ignore this attempt.