diff --git a/Cargo.lock b/Cargo.lock index 228c736a..8f8deee7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4918,6 +4918,8 @@ dependencies = [ "anyhow", "countersigning_integrity", "happ_builder", + "holo_hash", + "holochain_serialized_bytes", "holochain_types", "log", "rand 0.8.5", diff --git a/bindings/trycp_runner/src/common.rs b/bindings/trycp_runner/src/common.rs index 4bdd8102..682f87b4 100644 --- a/bindings/trycp_runner/src/common.rs +++ b/bindings/trycp_runner/src/common.rs @@ -315,7 +315,7 @@ where let agent_name = agent_name.clone(); async move { let logs = client - .download_logs(agent_name, Some(Duration::from_secs(180))) + .download_logs(agent_name, Some(Duration::from_secs(10 * 60))) .await .context("Failed to download logs")?; Ok(logs) diff --git a/conductor-config-ci.yaml b/conductor-config-ci.yaml index 213fd207..bdf2e045 100644 --- a/conductor-config-ci.yaml +++ b/conductor-config-ci.yaml @@ -1,3 +1,6 @@ +dpki: + dna_path: ~ + no_dpki: true network: network_type: quic_bootstrap transport_pool: diff --git a/scenarios/two_party_countersigning/Cargo.toml b/scenarios/two_party_countersigning/Cargo.toml index a1d8624c..403f4765 100644 --- a/scenarios/two_party_countersigning/Cargo.toml +++ b/scenarios/two_party_countersigning/Cargo.toml @@ -12,6 +12,8 @@ log = { workspace = true } rmp-serde = { workspace = true } holochain_types = { workspace = true } +holo_hash = { workspace = true } +holochain_serialized_bytes = { workspace = true } trycp_wind_tunnel_runner = { workspace = true } countersigning_integrity = { workspace = true } diff --git a/scenarios/two_party_countersigning/src/main.rs b/scenarios/two_party_countersigning/src/main.rs index 2d83884a..b4f95860 100644 --- a/scenarios/two_party_countersigning/src/main.rs +++ b/scenarios/two_party_countersigning/src/main.rs @@ -1,6 +1,8 @@ use anyhow::Context; use countersigning_integrity::Signals; -use holochain_types::prelude::{AgentPubKey, CellId, EntryHash, PreflightResponse}; +use holochain_types::prelude::{ + AgentPubKey, CellId, EntryHash, PreflightRequest, PreflightResponse, +}; use holochain_types::signal::{Signal, SystemSignal}; use rand::seq::SliceRandom; use rand::thread_rng; @@ -117,132 +119,109 @@ fn agent_behaviour_initiate( let new_peers = ctx .runner_context() .executor() - .execute_in_place( - async move { - match initiate_with_peers { - None => { - // No more agents available to call, get a new list. - // This is also the initial condition. - let response = client - .call_zome( - app_port, - cell_id.clone(), - "countersigning", - "list_participants", - (), - None, - ) - .await - .context("Failed to list participants")?; - let mut new_peer_list = response.decode::>().map_err(|e| anyhow::anyhow!("Decoding failure: {:?}", e))?; - new_peer_list.shuffle(&mut thread_rng()); - - // Pause to let Holochain receive more agent links if none are found yet. - if new_peer_list.is_empty() { - tokio::time::sleep(Duration::from_millis(100)).await; - } - - Ok(new_peer_list) + .execute_in_place(async move { + match initiate_with_peers { + None => { + // No more agents available to call, get a new list. + // This is also the initial condition. + let response = client + .call_zome( + app_port, + cell_id.clone(), + "countersigning", + "list_participants", + (), + None, + ) + .await + .context("Failed to list participants")?; + let mut new_peer_list = response + .decode::>() + .map_err(|e| anyhow::anyhow!("Decoding failure: {:?}", e))?; + new_peer_list.shuffle(&mut thread_rng()); + + // Pause to let Holochain receive more agent links if none are found yet. + if new_peer_list.is_empty() { + tokio::time::sleep(Duration::from_millis(100)).await; } - Some(agent_pub_key) => { - log::debug!("Initiating a countersigning session with agent {:?}", agent_pub_key); - let start = Instant::now(); - let initiated = initiated.fetch_add(1, std::sync::atomic::Ordering::Acquire); - reporter.add_custom( - ReportMetric::new("countersigning_session_initiated") - .with_tag("agent", agent_name.clone()) - .with_field("value", initiated as u64), - ); + Ok(new_peer_list) + } + Some(agent_pub_key) => { + log::debug!( + "Initiating a countersigning session with agent {:?}", + agent_pub_key + ); - // Start a countersigning session with the next agent in the list. - let response = client - .call_zome( - app_port, - cell_id.clone(), - "countersigning", - "start_two_party", - agent_pub_key.clone(), - // This should be fairly quick, can increase this if it causes problems - None, + let start = Instant::now(); + let initiated = initiated.fetch_add(1, std::sync::atomic::Ordering::Acquire); + reporter.add_custom( + ReportMetric::new("countersigning_session_initiated") + .with_tag("agent", agent_name.clone()) + .with_field("value", initiated as u64), + ); + + // Start a countersigning session with the next agent in the list. + let response = client + .call_zome( + app_port, + cell_id.clone(), + "countersigning", + "start_two_party", + agent_pub_key.clone(), + // This should be fairly quick, can increase this if it causes problems + None, + ) + .await + .with_context(|| { + format!( + "Failed to start a new countersigning session: {:?}", + agent_pub_key ) - .await - .with_context(|| { - format!("Failed to start a new countersigning session: {:?}", agent_pub_key) - })?; + })?; - let my_preflight_response: PreflightResponse = response.decode().map_err(|e| anyhow::anyhow!("Decoding failure: {:?}", e))?; + let my_preflight_response: PreflightResponse = response + .decode() + .map_err(|e| anyhow::anyhow!("Decoding failure: {:?}", e))?; - let session_times = my_preflight_response.request.session_times.clone(); - let session_timeout = Instant::now().add(Duration::from_millis( - (session_times.end.as_millis() - session_times.start.as_millis()) as u64, - )); - loop { - // Now listen for a signal from the remote with their acceptance - let signal = tokio::time::timeout_at(session_timeout, client.recv_signal()).await.with_context(|| format!("Agent [{agent_pub_key:?}] did not respond to the countersigning request in time, abandoning"))?; - - match signal { - Some(signal) => { - let signal = match rmp_serde::decode::from_slice::(&signal.data).map_err(|e| anyhow::anyhow!("Decoding failure, appears to not be a signal: {:?}", e))? { - Signal::App { - signal, - .. - } => signal, - _ => { - log::debug!("Received a signal that is not an app signal, listening for other signals."); - continue; - } - }; - - let other_response = match signal.clone().into_inner().decode::() { - Ok(Signals::Response(response)) if response.request.app_entry_hash == my_preflight_response.request.app_entry_hash => response, - Ok(_) => { - log::debug!("Received a signal that is not a response for this countersigning session, listening for other signals."); - continue; - } - Err(_) => { - // Must be resilient to unexpected signals, somebody else might try to initiate with us while we're already - // working with another peer. - log::debug!("Got an unexpected signal, will try again. {:?}", signal); - continue; - } - }; - - log::debug!("The other party [{:?}] has accepted the countersigning session.", agent_pub_key); - - let retry_count = complete_session(client.clone(), app_port, cell_id.clone(), my_preflight_response, other_response, session_timeout).await.context("Initiator failed to complete session")?; - - let elapsed = start.elapsed(); - - log::debug!("Completed the countersigning session with agent {:?}", agent_pub_key); - - let initiated_success = initiated_success.fetch_add(1, std::sync::atomic::Ordering::Acquire); - reporter.add_custom( - ReportMetric::new("countersigning_session_initiated_success") - .with_tag("agent", agent_name.clone()) - .with_field("retries", retry_count as u64) - .with_field("value", initiated_success as u64), - ); - reporter.add_custom( - ReportMetric::new("countersigning_session_initiated_duration") - .with_tag("agent", agent_name) - .with_field("value", elapsed.as_secs_f64()), - ); - - break; - } - None => { - log::warn!("No signal received, problem with the remote? Will try again."); - } - } - } + let session_times = my_preflight_response.request.session_times.clone(); + let session_timeout = Instant::now().add(Duration::from_millis( + (session_times.end.as_millis() - session_times.start.as_millis()) as u64, + )); - // Add no new agents, that should only happen when we exhaust the list. - Ok(Vec::with_capacity(0)) + match run_initiated_session( + client.clone(), + my_preflight_response, + session_timeout, + agent_pub_key, + agent_name.clone(), + cell_id.clone(), + app_port, + start, + initiated_success.clone(), + reporter, + ) + .await + { + Ok(_) => { + // Completed successfully + } + Err(e) => { + log::warn!( + "Failed to run initiated session, waiting for it to time out: {:?}", + e + ); + + // TODO this can be replaced with listening for the abandoned signal + tokio::time::sleep_until(session_timeout).await; + } } + + // Add no new agents, that should only happen when we exhaust the list. + Ok(Vec::with_capacity(0)) } } - )?; + })?; ctx.get_mut() .scenario_values @@ -252,6 +231,102 @@ fn agent_behaviour_initiate( Ok(()) } +#[allow(clippy::too_many_arguments)] +async fn run_initiated_session( + client: TryCPClient, + my_preflight_response: PreflightResponse, + session_timeout: Instant, + agent_pub_key: AgentPubKey, + agent_name: String, + cell_id: CellId, + app_port: u16, + start: Instant, + initiated_success: Arc, + reporter: Arc, +) -> anyhow::Result<()> { + loop { + // Now listen for a signal from the remote with their acceptance + let signal = tokio::time::timeout_at(session_timeout, client.recv_signal()).await.with_context(|| format!("Agent [{agent_pub_key:?}] did not respond to the countersigning request in time, abandoning"))?; + + match signal { + Some(signal) => { + let signal = match rmp_serde::decode::from_slice::(&signal.data).map_err( + |e| anyhow::anyhow!("Decoding failure, appears to not be a signal: {:?}", e), + )? { + Signal::App { signal, .. } => signal, + _ => { + log::debug!("Received a signal that is not an app signal, listening for other signals."); + continue; + } + }; + + let other_response = match signal.clone().into_inner().decode::() { + Ok(Signals::Response(response)) + if session_fingerprint(response.request())? + == session_fingerprint(my_preflight_response.request())? => + { + response + } + Ok(_) => { + log::debug!("Received a signal that is not a response for this countersigning session, listening for other signals."); + continue; + } + Err(_) => { + // Must be resilient to unexpected signals, somebody else might try to initiate with us while we're already + // working with another peer. + log::debug!("Got an unexpected signal, will try again. {:?}", signal); + continue; + } + }; + + log::debug!( + "The other party [{:?}] has accepted the countersigning session.", + agent_pub_key + ); + + let retry_count = complete_session( + client.clone(), + app_port, + cell_id.clone(), + my_preflight_response, + other_response, + session_timeout, + ) + .await + .context("Initiator failed to complete session")?; + + let elapsed = start.elapsed(); + + log::debug!( + "Completed the countersigning session with agent {:?}", + agent_pub_key + ); + + let initiated_success = + initiated_success.fetch_add(1, std::sync::atomic::Ordering::Acquire); + reporter.add_custom( + ReportMetric::new("countersigning_session_initiated_success") + .with_tag("agent", agent_name.clone()) + .with_field("retries", retry_count as u64) + .with_field("value", initiated_success as u64), + ); + reporter.add_custom( + ReportMetric::new("countersigning_session_initiated_duration") + .with_tag("agent", agent_name) + .with_field("value", elapsed.as_secs_f64()), + ); + + break; + } + None => { + log::warn!("No signal received, problem with the remote? Will try again."); + } + } + } + + Ok(()) +} + fn agent_behaviour_participate( ctx: &mut AgentContext>, ) -> HookResult { @@ -488,6 +563,15 @@ async fn complete_session( Ok(retry_count) } +fn session_fingerprint(preflight_request: &PreflightRequest) -> anyhow::Result> { + let hash = holo_hash::encode::blake2b_256( + &holochain_serialized_bytes::encode(preflight_request) + .context("Failed to serialize preflight request")?, + ); + + Ok(hash) +} + async fn await_countersigning_success( client: TryCPClient, session_entry_hash: EntryHash,