From 270b71a8e450db998de87ff3bc90ef9afba94576 Mon Sep 17 00:00:00 2001
From: Shady Khalifa
Date: Mon, 13 Jun 2022 16:06:15 +0200
Subject: [PATCH] Add back P2P Reputation (#322)

* Minimal P2P Reputation
* keep track of message duplicate
* tests working

Co-authored-by: Shady Khalifa
---
 Cargo.lock                                    |   2 +-
 dkg-gadget/Cargo.toml                         |   2 +-
 .../dkg_gossip_engine/network.rs              | 118 +++++++++++++-----
 .../meta_async_rounds/misbehaviour_monitor.rs |   2 +-
 .../state_machine_wrapper.rs                  |   3 +-
 standalone/runtime/src/lib.rs                 |   1 +
 6 files changed, 89 insertions(+), 39 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d2d3ca79b..171bc17db 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1446,7 +1446,7 @@ dependencies = [
  "hex",
  "itertools 0.10.3",
  "libsecp256k1 0.3.5",
- "linked_hash_set",
+ "linked-hash-map",
  "log",
  "lru",
  "multi-party-ecdsa",
diff --git a/dkg-gadget/Cargo.toml b/dkg-gadget/Cargo.toml
index d8b66a801..3a892d60c 100644
--- a/dkg-gadget/Cargo.toml
+++ b/dkg-gadget/Cargo.toml
@@ -20,7 +20,7 @@
 sha3 = "0.9"
 hex = "0.4"
 rand = "0.8.4"
 strum = { version = "0.21", features = ["derive"] }
-linked_hash_set = "0.1.4"
+linked-hash-map = "0.5.4"
 lru = "0.7.0"
 curv = { package = "curv-kzen", version = "0.9", default-features = false }
diff --git a/dkg-gadget/src/meta_async_rounds/dkg_gossip_engine/network.rs b/dkg-gadget/src/meta_async_rounds/dkg_gossip_engine/network.rs
index 1c5602b25..f9f403930 100644
--- a/dkg-gadget/src/meta_async_rounds/dkg_gossip_engine/network.rs
+++ b/dkg-gadget/src/meta_async_rounds/dkg_gossip_engine/network.rs
@@ -43,7 +43,7 @@ use codec::{Decode, Encode};
 use dkg_primitives::types::{DKGError, SignedDKGMessage};
 use dkg_runtime_primitives::crypto::AuthorityId;
 use futures::{FutureExt, Stream, StreamExt};
-use linked_hash_set::LinkedHashSet;
+use linked_hash_map::LinkedHashMap;
 use log::{debug, warn};
 use parking_lot::RwLock;
 use sc_network::{config, error, multiaddr, Event, NetworkService, PeerId};
@@ -147,22 +147,20 @@ const MAX_MESSAGE_SIZE: u64 = 16 * 1024 * 1024;
 /// Maximum number of messages request we keep at any moment.
 const MAX_PENDING_MESSAGES: usize = 8192;
 
+/// Maximum number of duplicate messages that a single peer can send us.
+///
+/// This is to prevent a malicious peer from spamming us with messages.
+const MAX_DUPLICATED_MESSAGES_PER_PEER: usize = 5;
+
 #[allow(unused)]
 mod rep {
 	use sc_peerset::ReputationChange as Rep;
-	/// Reputation change when a peer sends us any message.
-	///
-	/// This forces node to verify it, thus the negative value here. Once message is verified,
-	/// reputation change should be refunded with `ANY_MESSAGE_REFUND`.
-	pub const ANY_MESSAGE: Rep = Rep::new(-(1 << 4), "Any message");
-	/// Reputation change when a peer sends us any message that is not invalid.
-	pub const ANY_MESSAGE_REFUND: Rep = Rep::new(1 << 4, "Any message (refund)");
 	/// Reputation change when a peer sends us a message that we didn't know about.
 	pub const GOOD_MESSAGE: Rep = Rep::new(1 << 7, "Good message");
-	/// Reputation change when a peer sends us a bad message.
-	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
 	/// We received an unexpected message packet.
 	pub const UNEXPECTED_MESSAGE: Rep = Rep::new_fatal("Unexpected message packet");
+	/// Reputation change when a peer sends us the same message over and over.
+	pub const DUPLICATE_MESSAGE: Rep = Rep::new(-(1 << 12), "Duplicate message");
 }
 
 /// Controls the behaviour of a [`GossipHandler`] it is connected to.
@@ -254,7 +252,7 @@ pub struct GossipHandler<B: Block + 'static> {
 	/// these peers using the message hash while the message is
 	/// received. This prevents that we receive the same message
 	/// multiple times concurrently.
-	pending_messages_peers: HashMap<B::Hash, Vec<PeerId>>,
+	pending_messages_peers: HashMap<B::Hash, HashSet<PeerId>>,
 	/// Network service to use to send messages and manage peers.
 	service: Arc<NetworkService<B, B::Hash>>,
 	/// Stream of networking events.
@@ -283,6 +281,13 @@ where
 struct Peer<B: Block> {
 	/// Holds a set of messages known to this peer.
 	known_messages: LruHashSet<B::Hash>,
+	/// A counter of the messages received from this peer.
+	///
+	/// Implemented as an `LruHashMap` with the message hash as the key;
+	/// it is used to track how often each message is received from this peer.
+	/// If the same message is received from this peer more than
+	/// `MAX_DUPLICATED_MESSAGES_PER_PEER` times, we flag this peer as malicious.
+	message_counter: LruHashMap<B::Hash, usize>,
 }
 
 impl<B: Block + 'static> GossipHandler<B> {
@@ -347,6 +352,9 @@ impl<B: Block + 'static> GossipHandler<B> {
 				known_messages: LruHashSet::new(
 					NonZeroUsize::new(MAX_KNOWN_MESSAGES).expect("Constant is nonzero"),
 				),
+				message_counter: LruHashMap::new(
+					NonZeroUsize::new(MAX_KNOWN_MESSAGES).expect("Constant is nonzero"),
+				),
 			},
 		);
 		debug_assert!(_was_in.is_none());
@@ -384,29 +392,41 @@ impl<B: Block + 'static> GossipHandler<B> {
 	/// Called when peer sends us new signed DKG message.
 	async fn on_signed_dkg_message(&mut self, who: PeerId, message: SignedDKGMessage<AuthorityId>) {
 		// Check behavior of the peer.
-		// TODO: Fill in with proper check of message
-		let some_check_here = false;
-		if some_check_here {
-			self.service.disconnect_peer(who, self.protocol_name.clone());
-			self.service.report_peer(who, rep::UNEXPECTED_MESSAGE);
-			return
-		}
-
 		let now = self.get_latest_block_number();
 		debug!(target: "dkg", "Received a signed DKG messages from {} @ {:?}", who, now);
 
 		if let Some(ref mut peer) = self.peers.get_mut(&who) {
 			peer.known_messages.insert(message.message_hash::<B>());
-			// self.service.report_peer(who, rep::ANY_MESSAGE);
-
 			match self.pending_messages_peers.entry(message.message_hash::<B>()) {
 				Entry::Vacant(entry) => {
 					let _ = self.controller_channel.send(message.clone());
-					entry.insert(vec![who]);
+					entry.insert(HashSet::from([who]));
+					// This is good: the peer sent us a message we didn't know about,
+					// so we should add some good reputation to them.
+					self.service.report_peer(who, rep::GOOD_MESSAGE);
 				},
 				Entry::Occupied(mut entry) => {
-					entry.get_mut().push(who);
+					// If we are here, this peer sent us a message we already know about.
+					let inserted = entry.get_mut().insert(who);
+					// If `inserted` is `false`, this peer was already in the set, hence
+					// this is not the first time we received this message from the exact
+					// same peer.
+					if !inserted {
+						// Increment the counter for this message.
+						let old = peer
+							.message_counter
+							.get(&message.message_hash::<B>())
+							.cloned()
+							.unwrap_or(0);
+						peer.message_counter.insert(message.message_hash::<B>(), old + 1);
+						// If we have received this message from the same peer more than
+						// `MAX_DUPLICATED_MESSAGES_PER_PEER` times, we report this peer
+						// as malicious.
+						if old >= MAX_DUPLICATED_MESSAGES_PER_PEER {
+							self.service.report_peer(who, rep::DUPLICATE_MESSAGE);
+						}
+					}
 				},
 			}
 		}
 	}
@@ -465,19 +485,55 @@ impl<B: Block + 'static> GossipHandler<B> {
 	}
 }
 
-/// Wrapper around `LinkedHashSet` with bounded growth.
+/// Wrapper around `LinkedHashMap` with bounded growth.
 ///
 /// In the limit, for each element inserted the oldest existing element will be removed.
 #[derive(Debug, Clone)]
-pub struct LruHashSet<T: Hash + Eq> {
-	set: LinkedHashSet<T>,
+pub struct LruHashMap<K: Hash + Eq, V> {
+	inner: LinkedHashMap<K, V>,
 	limit: NonZeroUsize,
 }
 
+impl<K: Hash + Eq, V> LruHashMap<K, V> {
+	/// Create a new `LruHashMap` with the given (exclusive) limit.
+	pub fn new(limit: NonZeroUsize) -> Self {
+		Self { inner: LinkedHashMap::new(), limit }
+	}
+
+	/// Insert element into the map.
+	///
+	/// Returns `true` if this is a new element to the map, `false` otherwise.
+	/// Maintains the limit of the map by removing the oldest entry if necessary.
+	/// Inserting the same element will update its LRU position.
+	pub fn insert(&mut self, k: K, v: V) -> bool {
+		if self.inner.insert(k, v).is_none() {
+			if self.inner.len() == usize::from(self.limit) {
+				self.inner.pop_front(); // remove oldest entry
+			}
+			return true
+		}
+		false
+	}
+
+	/// Get an element from the map.
+	/// Returns `None` if the element is not in the map.
+	pub fn get(&self, k: &K) -> Option<&V> {
+		self.inner.get(k)
+	}
+}
+
+/// Wrapper around `LruHashMap` with bounded growth.
+///
+/// In the limit, for each element inserted the oldest existing element will be removed.
+#[derive(Debug, Clone)]
+pub struct LruHashSet<T: Hash + Eq> {
+	set: LruHashMap<T, ()>,
+}
+
 impl<T: Hash + Eq> LruHashSet<T> {
 	/// Create a new `LruHashSet` with the given (exclusive) limit.
 	pub fn new(limit: NonZeroUsize) -> Self {
-		Self { set: LinkedHashSet::new(), limit }
+		Self { set: LruHashMap::new(limit) }
 	}
 
 	/// Insert element into the set.
@@ -486,12 +542,6 @@ impl<T: Hash + Eq> LruHashSet<T> {
 	/// Maintains the limit of the set by removing the oldest entry if necessary.
 	/// Inserting the same element will update its LRU position.
 	pub fn insert(&mut self, e: T) -> bool {
-		if self.set.insert(e) {
-			if self.set.len() == usize::from(self.limit) {
-				self.set.pop_front(); // remove oldest entry
-			}
-			return true
-		}
-		false
+		self.set.insert(e, ())
 	}
 }
diff --git a/dkg-gadget/src/meta_async_rounds/misbehaviour_monitor.rs b/dkg-gadget/src/meta_async_rounds/misbehaviour_monitor.rs
index 96f51a1ec..358bceb38 100644
--- a/dkg-gadget/src/meta_async_rounds/misbehaviour_monitor.rs
+++ b/dkg-gadget/src/meta_async_rounds/misbehaviour_monitor.rs
@@ -40,7 +40,7 @@ impl MisbehaviourMonitor {
 			tokio::time::interval(MISBEHAVIOUR_MONITOR_CHECK_INTERVAL),
 		);
 
-		while let Some(_) = ticker.next().await {
+		while let Some(_tick) = ticker.next().await {
 			log::trace!("[MisbehaviourMonitor] Performing periodic check ...");
 			match remote.get_status() {
 				MetaHandlerStatus::Keygen | MetaHandlerStatus::Complete => {
diff --git a/dkg-gadget/src/meta_async_rounds/state_machine_wrapper.rs b/dkg-gadget/src/meta_async_rounds/state_machine_wrapper.rs
index 64672d0e7..d730f637d 100644
--- a/dkg-gadget/src/meta_async_rounds/state_machine_wrapper.rs
+++ b/dkg-gadget/src/meta_async_rounds/state_machine_wrapper.rs
@@ -58,8 +58,7 @@ where
 	}
 
 	fn round_timeout_reached(&mut self) -> Self::Err {
-		let result = self.sm.round_timeout_reached();
-		result
+		self.sm.round_timeout_reached()
 	}
 
 	fn is_finished(&self) -> bool {
diff --git a/standalone/runtime/src/lib.rs b/standalone/runtime/src/lib.rs
index 7da1e04d7..5239bee8e 100644
--- a/standalone/runtime/src/lib.rs
+++ b/standalone/runtime/src/lib.rs
@@ -455,6 +455,7 @@ impl pallet_election_provider_multi_phase::MinerConfig for WebbMinerConfig {
 	type MaxVotesPerVoter = MaxNominations;
 	type Solution = NposSolution16;
 
+	#[allow(unused)]
 	fn solution_weight(v: u32, t: u32, a: u32, d: u32) -> Weight {
 		0
 	}
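
For reviewers who want to poke at the new accounting outside the gadget, below is a minimal standalone sketch of the bounded duplicate counter this patch introduces. It is not taken from the patch itself: `MessageHash`, `BoundedCounter`, and `bump` are illustrative stand-ins for `B::Hash`, `LruHashMap`, and the `Peer::message_counter` updates, and it is simplified to count every sighting of a hash rather than only repeats from the same peer after the first delivery.

use linked_hash_map::LinkedHashMap;
use std::num::NonZeroUsize;

// Illustrative stand-in for `B::Hash`; the real code keys on block-hash-sized digests.
type MessageHash = [u8; 32];

/// Bounded counter: once `limit` entries exist, the oldest one is evicted.
struct BoundedCounter {
	inner: LinkedHashMap<MessageHash, usize>,
	limit: NonZeroUsize,
}

impl BoundedCounter {
	fn new(limit: NonZeroUsize) -> Self {
		Self { inner: LinkedHashMap::new(), limit }
	}

	/// Record one more sighting of `hash` and return the updated count.
	fn bump(&mut self, hash: MessageHash) -> usize {
		let count = self.inner.get(&hash).copied().unwrap_or(0) + 1;
		// Only a brand-new key can grow the map, so eviction is checked on that path.
		if self.inner.insert(hash, count).is_none() && self.inner.len() == usize::from(self.limit) {
			self.inner.pop_front(); // drop the oldest hash to bound memory
		}
		count
	}
}

fn main() {
	const MAX_DUPLICATED_MESSAGES_PER_PEER: usize = 5;
	let mut counter = BoundedCounter::new(NonZeroUsize::new(1024).expect("nonzero"));
	let hash = [0u8; 32];

	// Simulate one peer re-sending the same signed DKG message.
	for _ in 0..8 {
		let seen = counter.bump(hash);
		if seen > MAX_DUPLICATED_MESSAGES_PER_PEER {
			// In the gadget this is the point where `rep::DUPLICATE_MESSAGE` would be reported.
			println!("peer would be reported (same message seen {} times)", seen);
		}
	}
}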