Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

P2p locking adjustments #157

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 121 additions & 7 deletions crates/node/src/networking/p2p/pea2pea.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub struct P2P {
peer_list: Arc<tokio::sync::RwLock<BTreeSet<SocketAddr>>>,
// Contains corrected peers that are used for asset file download.
pub peer_http_port_list: Arc<tokio::sync::RwLock<HashMap<SocketAddr, Option<u16>>>>,
connect_lock: Arc<tokio::sync::Mutex<()>>,

http_port: Option<u16>,
nat_listen_addr: Option<SocketAddr>,
Expand Down Expand Up @@ -91,7 +90,6 @@ impl P2P {
peer_list: Default::default(),
peer_addr_mapping: Default::default(),
peer_http_port_list,
connect_lock: Arc::new(tokio::sync::Mutex::new(())),
http_port,
nat_listen_addr,
tx_sender,
Expand Down Expand Up @@ -196,9 +194,6 @@ impl P2P {
#[async_trait::async_trait]
impl Handshake for P2P {
async fn perform_handshake(&self, mut conn: Connection) -> io::Result<Connection> {
// Sequentialize P2P connections to avoid deadlock.
let _lock = self.connect_lock.lock().await;

tracing::debug!("starting handshake");

// Create the noise objects.
Expand All @@ -212,8 +207,10 @@ impl Handshake for P2P {
let (noise_state, _) =
noise::handshake_xx(self, &mut conn, noise_builder, Bytes::new()).await?;

// Save the noise state to be reused by Reading and Writing.
self.noise_states.write().insert(conn.addr(), noise_state);
{
// Save the noise state to be reused by Reading and Writing.
self.noise_states.write().insert(conn.addr(), noise_state);
}

tracing::debug!("noise handshake finished. exchanging node information");

Expand Down Expand Up @@ -472,6 +469,7 @@ mod tests {
use gevulot_node::types::transaction::Payload;
use gevulot_node::types::transaction::Received;
use libsecp256k1::SecretKey;
use rand::Rng;
use rand::{rngs::StdRng, SeedableRng};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -758,6 +756,122 @@ mod tests {
assert_eq!(into_receive(tx), recv_tx.0);
}

// Test 20 peers that connect each other.
#[tokio::test]
async fn test_twenty_peers() {
start_logger(LevelFilter::DEBUG);

struct Peer {
p2p: P2P,
tx_sender: UnboundedSender<Transaction<Validated>>,
tx_receiver: UnboundedReceiver<(
Transaction<Received>,
Option<oneshot::Sender<Result<(), EventProcessError>>>,
)>,
}

impl Peer {
fn new(
tuple: (
P2P,
UnboundedSender<Transaction<Validated>>,
UnboundedReceiver<(
Transaction<Received>,
Option<oneshot::Sender<Result<(), EventProcessError>>>,
)>,
),
) -> Peer {
Peer {
p2p: tuple.0,
tx_sender: tuple.1,
tx_receiver: tuple.2,
}
}
}

tracing::debug!("creating P2P beacon node");
let p2p_beacon_node = Peer::new(create_peer(&format!("p2p-beacon")).await);

tracing::debug!("P2P beacon node starts listening");
p2p_beacon_node
.p2p
.node()
.start_listening()
.await
.expect("p2p_beacon_node start_listening()");
tracing::debug!("P2P beacon node is listening");

let num_nodes = 20;
let mut peers = Vec::with_capacity(num_nodes);

for i in 1..num_nodes {
tracing::debug!("creating peer {i}");
let peer = Peer::new(create_peer(&format!("peer{i}")).await);

tracing::debug!("peer{i} starts listening");
peer.p2p
.node()
.start_listening()
.await
.expect("peer{i] listen");
tracing::debug!("peer{i} is listening");

tracing::debug!("peer{i} connects to P2P beacon node");
peer.p2p
.node()
.connect(
p2p_beacon_node
.p2p
.node()
.listening_addr()
.expect("p2p_beacon_node listening_addr()"),
)
.await
.expect(&format!("peer{i} connect p2p_beacon_node"));
tracing::debug!("peer{i} is connected to P2P beacon node");

peers.push(peer);
}

// Let the dust settle down a bit.
//tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Comment on lines +836 to +837
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@musitdev in the current setting, if there's no wait / sleep between connects and transaction sends, this test locks up on my laptop. What happens is that those derivative new node connects are happening concurrently with the transaction sends and this causes the deadlock somehow.

If you uncomment this sleep and all the connects take place before the transaction sends, so far I couldn't reproduce the deadlock anymore.


tracing::info!("All {num_nodes} P2P nodes started");

// Broadcast 50 transactions into the network.
for i in 1..50 {
// Create transaction.
let tx = new_tx();

// Pick random node to submit it.
let node_idx = rand::thread_rng().gen_range(0..peers.len());

tracing::debug!("sending transaction {i} from peer {node_idx}");
peers[node_idx]
.tx_sender
.send(tx.clone())
.expect(&format!("peer{node_idx} send()"));
tracing::debug!("transaction {i} sent from peer {node_idx}");

for i in 1..peers.len() {
if i == node_idx {
continue;
}

tracing::debug!("receiving transaction {i} on peer {i}");
let recv_tx = peers[i]
.tx_receiver
.recv()
.await
.expect(&format!("peers[{i}] recv()"));
tracing::debug!("received transaction {i} on peer {i}");
assert_eq!(into_receive(tx.clone()), recv_tx.0);
}

//tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}

fn new_tx() -> Transaction<Validated> {
let rng = &mut StdRng::from_entropy();

Expand Down
Loading