From 1c888caabcd8ef87ccaccd2b35443fad2d2526b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Wed, 20 Mar 2024 07:52:48 +0200 Subject: [PATCH 1/3] Revert "Introduce dummy lock to sequentialize P2P.connect() (#155)" This reverts commit 92ade6d7e84a1513432fa081f4767b81f2a1d8ef. --- crates/node/src/networking/p2p/pea2pea.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/crates/node/src/networking/p2p/pea2pea.rs b/crates/node/src/networking/p2p/pea2pea.rs index 9aa3a1a3..89a097e7 100644 --- a/crates/node/src/networking/p2p/pea2pea.rs +++ b/crates/node/src/networking/p2p/pea2pea.rs @@ -39,7 +39,6 @@ pub struct P2P { peer_list: Arc>>, // Contains corrected peers that are used for asset file download. pub peer_http_port_list: Arc>>>, - connect_lock: Arc>, http_port: Option, nat_listen_addr: Option, @@ -91,7 +90,6 @@ impl P2P { peer_list: Default::default(), peer_addr_mapping: Default::default(), peer_http_port_list, - connect_lock: Arc::new(tokio::sync::Mutex::new(())), http_port, nat_listen_addr, tx_sender, @@ -196,9 +194,6 @@ impl P2P { #[async_trait::async_trait] impl Handshake for P2P { async fn perform_handshake(&self, mut conn: Connection) -> io::Result { - // Sequentialize P2P connections to avoid deadlock. - let _lock = self.connect_lock.lock().await; - tracing::debug!("starting handshake"); // Create the noise objects. From 99eaa503620aabb2a88024156f3000af5ca7634c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Wed, 20 Mar 2024 08:15:34 +0200 Subject: [PATCH 2/3] Try to isolate the noise_states.write() lock in handshake --- crates/node/src/networking/p2p/pea2pea.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/node/src/networking/p2p/pea2pea.rs b/crates/node/src/networking/p2p/pea2pea.rs index 89a097e7..f5657f0e 100644 --- a/crates/node/src/networking/p2p/pea2pea.rs +++ b/crates/node/src/networking/p2p/pea2pea.rs @@ -207,8 +207,10 @@ impl Handshake for P2P { let (noise_state, _) = noise::handshake_xx(self, &mut conn, noise_builder, Bytes::new()).await?; - // Save the noise state to be reused by Reading and Writing. - self.noise_states.write().insert(conn.addr(), noise_state); + { + // Save the noise state to be reused by Reading and Writing. + self.noise_states.write().insert(conn.addr(), noise_state); + } tracing::debug!("noise handshake finished. exchanging node information"); From 8d16a6c6adeec02f3bdc5a00b858e616871bc1b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tuomas=20M=C3=A4kinen?= Date: Wed, 20 Mar 2024 08:33:52 +0200 Subject: [PATCH 3/3] Add test case that locks up --- crates/node/src/networking/p2p/pea2pea.rs | 117 ++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/crates/node/src/networking/p2p/pea2pea.rs b/crates/node/src/networking/p2p/pea2pea.rs index f5657f0e..ea13b626 100644 --- a/crates/node/src/networking/p2p/pea2pea.rs +++ b/crates/node/src/networking/p2p/pea2pea.rs @@ -469,6 +469,7 @@ mod tests { use gevulot_node::types::transaction::Payload; use gevulot_node::types::transaction::Received; use libsecp256k1::SecretKey; + use rand::Rng; use rand::{rngs::StdRng, SeedableRng}; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; @@ -755,6 +756,122 @@ mod tests { assert_eq!(into_receive(tx), recv_tx.0); } + // Test 20 peers that connect each other. + #[tokio::test] + async fn test_twenty_peers() { + start_logger(LevelFilter::DEBUG); + + struct Peer { + p2p: P2P, + tx_sender: UnboundedSender>, + tx_receiver: UnboundedReceiver<( + Transaction, + Option>>, + )>, + } + + impl Peer { + fn new( + tuple: ( + P2P, + UnboundedSender>, + UnboundedReceiver<( + Transaction, + Option>>, + )>, + ), + ) -> Peer { + Peer { + p2p: tuple.0, + tx_sender: tuple.1, + tx_receiver: tuple.2, + } + } + } + + tracing::debug!("creating P2P beacon node"); + let p2p_beacon_node = Peer::new(create_peer(&format!("p2p-beacon")).await); + + tracing::debug!("P2P beacon node starts listening"); + p2p_beacon_node + .p2p + .node() + .start_listening() + .await + .expect("p2p_beacon_node start_listening()"); + tracing::debug!("P2P beacon node is listening"); + + let num_nodes = 20; + let mut peers = Vec::with_capacity(num_nodes); + + for i in 1..num_nodes { + tracing::debug!("creating peer {i}"); + let peer = Peer::new(create_peer(&format!("peer{i}")).await); + + tracing::debug!("peer{i} starts listening"); + peer.p2p + .node() + .start_listening() + .await + .expect("peer{i] listen"); + tracing::debug!("peer{i} is listening"); + + tracing::debug!("peer{i} connects to P2P beacon node"); + peer.p2p + .node() + .connect( + p2p_beacon_node + .p2p + .node() + .listening_addr() + .expect("p2p_beacon_node listening_addr()"), + ) + .await + .expect(&format!("peer{i} connect p2p_beacon_node")); + tracing::debug!("peer{i} is connected to P2P beacon node"); + + peers.push(peer); + } + + // Let the dust settle down a bit. + //tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + tracing::info!("All {num_nodes} P2P nodes started"); + + // Broadcast 50 transactions into the network. + for i in 1..50 { + // Create transaction. + let tx = new_tx(); + + // Pick random node to submit it. + let node_idx = rand::thread_rng().gen_range(0..peers.len()); + + tracing::debug!("sending transaction {i} from peer {node_idx}"); + peers[node_idx] + .tx_sender + .send(tx.clone()) + .expect(&format!("peer{node_idx} send()")); + tracing::debug!("transaction {i} sent from peer {node_idx}"); + + for i in 1..peers.len() { + if i == node_idx { + continue; + } + + tracing::debug!("receiving transaction {i} on peer {i}"); + let recv_tx = peers[i] + .tx_receiver + .recv() + .await + .expect(&format!("peers[{i}] recv()")); + tracing::debug!("received transaction {i} on peer {i}"); + assert_eq!(into_receive(tx.clone()), recv_tx.0); + } + + //tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } + fn new_tx() -> Transaction { let rng = &mut StdRng::from_entropy();