From ed90974e66fe22166d9740308d52ec2341aab241 Mon Sep 17 00:00:00 2001 From: Philippe Delrieu Date: Wed, 20 Mar 2024 17:15:50 +0100 Subject: [PATCH] Change the way new advertised peer are connected. (#158) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * change connection process for advised peers * add some test * correct some comments * Apply suggestions from code review Co-authored-by: Tuomas Mäkinen <1947505+tuommaki@users.noreply.github.com> --------- Co-authored-by: Tuomas Mäkinen <1947505+tuommaki@users.noreply.github.com> --- crates/node/src/main.rs | 13 +- crates/node/src/networking/p2p/pea2pea.rs | 157 +++++++++++++--------- 2 files changed, 98 insertions(+), 72 deletions(-) diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 7e1453b7..72f87ae1 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -272,16 +272,11 @@ async fn run(config: Arc) -> Result<()> { match addr.to_socket_addrs() { Ok(mut socket_iter) => { if let Some(peer) = socket_iter.next() { - match p2p.node().connect(peer).await { - Ok(_) => { - connected_nodes += 1; - continue; - } - Err(err) => { - tracing::warn!("failed to connect to {}: {}", peer, err); - } + let (connected, fail) = p2p.connect(peer).await; + connected_nodes += connected.len(); + if !fail.is_empty() { + tracing::info!("Peer connection, fail to connect to these peers:{fail:?}"); } - break; } } Err(err) => { diff --git a/crates/node/src/networking/p2p/pea2pea.rs b/crates/node/src/networking/p2p/pea2pea.rs index 9aa3a1a3..b1ef4b0e 100644 --- a/crates/node/src/networking/p2p/pea2pea.rs +++ b/crates/node/src/networking/p2p/pea2pea.rs @@ -37,6 +37,7 @@ pub struct P2P { // This mapping is needed for proper cleanup on OnDisconnect. peer_addr_mapping: Arc>>, peer_list: Arc>>, + current_connecting_peer_list: Arc>>, // Contains corrected peers that are used for asset file download. pub peer_http_port_list: Arc>>>, connect_lock: Arc>, @@ -89,6 +90,7 @@ impl P2P { psk: psk.to_vec(), public_node_key, peer_list: Default::default(), + current_connecting_peer_list: Default::default(), peer_addr_mapping: Default::default(), peer_http_port_list, connect_lock: Arc::new(tokio::sync::Mutex::new(())), @@ -191,6 +193,36 @@ impl P2P { Ok(()) } + + // Connect to peer at `addr`. Subsequent connections to newly discovered nodes are done in sequence, one at a time. + // Peer can be fail because they was 2 simultaneous connection. One is fail and the orher is ok. + pub async fn connect(&self, addr: SocketAddr) -> (BTreeSet, BTreeSet) { + let mut connected_peers = BTreeSet::new(); + let mut failed_peers = BTreeSet::new(); + let mut peer_to_connect_list = vec![addr]; + while !peer_to_connect_list.is_empty() { + // Clear new peer list before connect + self.current_connecting_peer_list.write().await.clear(); + let addr = peer_to_connect_list.pop().unwrap(); //unwrap tested in the while. + match self.node.connect(addr).await { + Ok(_) => { + connected_peers.insert(addr); + { + let peers = self.current_connecting_peer_list.write().await.clone(); + peer_to_connect_list.extend(peers.iter()); + }; + + // Only add peer that are connected. + self.peer_list.write().await.insert(addr); + } + Err(err) => { + tracing::error!("An error occurs during peer:{addr} connection: {err}",); + failed_peers.insert(addr); + } + }; + } + (connected_peers, failed_peers) + } } #[async_trait::async_trait] @@ -359,27 +391,12 @@ impl Handshake for P2P { local_diff.remove(&local_p2p_addr); local_diff.remove(remote_peer_p2p_addr); - let node = self.node(); - // Connect to other not connected peers. - for addr in local_diff { - tokio::spawn({ - let node = node.clone(); - let peer_list = self.peer_list.clone(); - async move { - tracing::debug!("connect to {}", &addr); - - // XXX: If `node.connect(addr)` returns an error, it's omitted because: - // 1.) It's already logged. - // 2.) It often happens because there is already a connection between the 2 peers. - match node.connect(addr).await { - Ok(_) => { - peer_list.write().await.insert(addr); - tracing::debug!("connected to {}", &addr); - } - Err(err) => tracing::error!("failed to connect to {}: {}", &addr, err), - }; - } - }); + //add new peer to node list + { + self.current_connecting_peer_list + .write() + .await + .append(&mut local_diff); } self.peer_http_port_list @@ -572,27 +589,20 @@ mod tests { let (peer2, tx_sender2, mut tx_receiver2) = create_faulty_peer("peer2").await; let (peer3, tx_sender3, mut tx_receiver3) = create_peer("peer3").await; - tracing::debug!("start listening"); peer1.node().start_listening().await.expect("peer1 listen"); peer2.node().start_listening().await.expect("peer2 listen"); peer3.node().start_listening().await.expect("peer3 listen"); - tracing::debug!("connect peer2 to peer1"); - peer2 - .node() - .connect(peer1.node().listening_addr().unwrap()) - .await - .unwrap(); + let (new_peers, fail_peers) = peer2.connect(peer1.node().listening_addr().unwrap()).await; - assert_eq!(peer1.peer_http_port_list.read().await.len(), 1); - assert_eq!(peer2.peer_http_port_list.read().await.len(), 1); + assert_eq!(new_peers.len(), 1); + assert_eq!(fail_peers.len(), 0); - tracing::debug!("connect peer3 to peer1"); - peer3 - .node() - .connect(peer1.node().listening_addr().unwrap()) - .await - .unwrap(); + let (new_peers, fail_peers) = peer3.connect(peer1.node().listening_addr().unwrap()).await; + + assert_eq!(new_peers.len(), 1); + assert_eq!(fail_peers.len(), 1); + assert_eq!(fail_peers.first(), Some(&"128.0.0.1:0".parse().unwrap())); // Wait for the connection fail timeout. tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await; @@ -626,10 +636,14 @@ mod tests { } } - // 3 peers + // 5 peers // Peer2 connect to Peer1. // Peer3 connect to Peer1. // Peer3 automatically connect to Peer2. + // Peer4 connect to Peer3. + // Peer4 automatically connect to Peer1, Peer2. + // Peer5 connect to Peer4. + // Peer5 automatically connect to Peer1, Peer2, Peer3. #[tokio::test] async fn test_peer_list_inter_connection() { //start_logger(LevelFilter::ERROR); @@ -637,46 +651,66 @@ mod tests { let (peer1, tx_sender1, mut tx_receiver1) = create_peer("peer1").await; let (peer2, tx_sender2, mut tx_receiver2) = create_peer("peer2").await; let (peer3, tx_sender3, mut tx_receiver3) = create_peer("peer3").await; + let (peer4, tx_sender4, mut tx_receiver4) = create_peer("peer4").await; + let (peer5, tx_sender5, mut tx_receiver5) = create_peer("peer5").await; let bind_add = peer1.node().start_listening().await.expect("peer1 listen"); let bind_add = peer2.node().start_listening().await.expect("peer2 listen"); let bind_add = peer3.node().start_listening().await.expect("peer3 listen"); + let bind_add = peer4.node().start_listening().await.expect("peer4 listen"); + let bind_add = peer5.node().start_listening().await.expect("peer5 listen"); - peer2 - .node() - .connect(peer1.node().listening_addr().unwrap()) - .await - .unwrap(); + let (new_peers, fail_peers) = peer2.connect(peer1.node().listening_addr().unwrap()).await; + assert_eq!(new_peers.len(), 1); assert_eq!(peer1.peer_http_port_list.read().await.len(), 1); assert_eq!(peer2.peer_http_port_list.read().await.len(), 1); - peer3 - .node() - .connect(peer1.node().listening_addr().unwrap()) - .await - .unwrap(); + let (new_peers, fail_peers) = peer3.connect(peer1.node().listening_addr().unwrap()).await; + assert_eq!(new_peers.len(), 2); + + let (new_peers, fail_peers) = peer4.connect(peer3.node().listening_addr().unwrap()).await; + assert_eq!(new_peers.len(), 3); + + let (new_peers, fail_peers) = peer5.connect(peer4.node().listening_addr().unwrap()).await; + assert_eq!(new_peers.len(), 4); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - assert_eq!(peer1.peer_http_port_list.read().await.len(), 2); - assert_eq!(peer2.peer_http_port_list.read().await.len(), 2); - assert_eq!(peer3.peer_http_port_list.read().await.len(), 2); + assert_eq!(peer1.peer_http_port_list.read().await.len(), 4); + assert_eq!(peer2.peer_http_port_list.read().await.len(), 4); + assert_eq!(peer3.peer_http_port_list.read().await.len(), 4); + assert_eq!(peer4.peer_http_port_list.read().await.len(), 4); + assert_eq!(peer5.peer_http_port_list.read().await.len(), 4); // Verify connections by sending Tx to all peers. let tx = new_tx(); tx_sender2.send(tx.clone()).unwrap(); let recv_tx = tx_receiver1.recv().await.expect("peer1 recv"); - assert_eq!(into_receive(tx.clone()), recv_tx.0); + let recv_tx = tx_receiver3.recv().await.expect("peer3 recv"); - assert_eq!(into_receive(tx), recv_tx.0); + assert_eq!(into_receive(tx.clone()), recv_tx.0); + + let recv_tx = tx_receiver4.recv().await.expect("peer4 recv"); + assert_eq!(into_receive(tx.clone()), recv_tx.0); + + let recv_tx = tx_receiver5.recv().await.expect("peer5 recv"); + assert_eq!(into_receive(tx.clone()), recv_tx.0); let tx = new_tx(); - tx_sender3.send(tx.clone()).unwrap(); + tx_sender5.send(tx.clone()).unwrap(); + let recv_tx = tx_receiver1.recv().await.expect("peer1 recv"); assert_eq!(into_receive(tx.clone()), recv_tx.0); + let recv_tx = tx_receiver2.recv().await.expect("peer2 recv"); + assert_eq!(into_receive(tx.clone()), recv_tx.0); + + let recv_tx = tx_receiver3.recv().await.expect("peer3 recv"); + assert_eq!(into_receive(tx.clone()), recv_tx.0); + + let recv_tx = tx_receiver4.recv().await.expect("peer4 recv"); assert_eq!(into_receive(tx), recv_tx.0); } @@ -693,11 +727,10 @@ mod tests { let (peer2, tx_sender2, mut tx_receiver2) = create_peer("peer2").await; peer2.node().start_listening().await.expect("peer2 listen"); - peer1 - .node() - .connect(peer2.node().listening_addr().unwrap()) - .await - .unwrap(); + let (new_peers, fail_peers) = + peer1.connect(peer2.node().listening_addr().unwrap()).await; + assert_eq!(new_peers.len(), 1); + assert_eq!(fail_peers.len(), 0); assert_eq!(peer1.peer_http_port_list.read().await.len(), 1); assert_eq!(peer2.peer_http_port_list.read().await.len(), 1); @@ -741,11 +774,9 @@ mod tests { peer1.node().start_listening().await.expect("peer1 listen"); peer2.node().start_listening().await.expect("peer2 listen"); - peer2 - .node() - .connect(peer1.node().listening_addr().unwrap()) - .await - .unwrap(); + let (new_peers, fail_peers) = peer2.connect(peer1.node().listening_addr().unwrap()).await; + assert_eq!(new_peers.len(), 1); + assert_eq!(fail_peers.len(), 0); let tx = new_tx(); tx_sender1.send(tx.clone()).unwrap();