Skip to content

Commit

Permalink
Improve countersigning scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
ThetaSinner committed Sep 30, 2024
1 parent 84a5576 commit c5447f3
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 118 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bindings/trycp_runner/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ where
let agent_name = agent_name.clone();
async move {
let logs = client
.download_logs(agent_name, Some(Duration::from_secs(180)))
.download_logs(agent_name, Some(Duration::from_secs(10 * 60)))
.await
.context("Failed to download logs")?;
Ok(logs)
Expand Down
3 changes: 3 additions & 0 deletions conductor-config-ci.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
dpki:
dna_path: ~
no_dpki: true
network:
network_type: quic_bootstrap
transport_pool:
Expand Down
2 changes: 2 additions & 0 deletions scenarios/two_party_countersigning/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ log = { workspace = true }
rmp-serde = { workspace = true }

holochain_types = { workspace = true }
holo_hash = { workspace = true }
holochain_serialized_bytes = { workspace = true }
trycp_wind_tunnel_runner = { workspace = true }
countersigning_integrity = { workspace = true }

Expand Down
318 changes: 201 additions & 117 deletions scenarios/two_party_countersigning/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::Context;
use countersigning_integrity::Signals;
use holochain_types::prelude::{AgentPubKey, CellId, EntryHash, PreflightResponse};
use holochain_types::prelude::{
AgentPubKey, CellId, EntryHash, PreflightRequest, PreflightResponse,
};
use holochain_types::signal::{Signal, SystemSignal};
use rand::seq::SliceRandom;
use rand::thread_rng;
Expand Down Expand Up @@ -117,132 +119,109 @@ fn agent_behaviour_initiate(
let new_peers = ctx
.runner_context()
.executor()
.execute_in_place(
async move {
match initiate_with_peers {
None => {
// No more agents available to call, get a new list.
// This is also the initial condition.
let response = client
.call_zome(
app_port,
cell_id.clone(),
"countersigning",
"list_participants",
(),
None,
)
.await
.context("Failed to list participants")?;
let mut new_peer_list = response.decode::<Vec<AgentPubKey>>().map_err(|e| anyhow::anyhow!("Decoding failure: {:?}", e))?;
new_peer_list.shuffle(&mut thread_rng());

// Pause to let Holochain receive more agent links if none are found yet.
if new_peer_list.is_empty() {
tokio::time::sleep(Duration::from_millis(100)).await;
}

Ok(new_peer_list)
.execute_in_place(async move {
match initiate_with_peers {
None => {
// No more agents available to call, get a new list.
// This is also the initial condition.
let response = client
.call_zome(
app_port,
cell_id.clone(),
"countersigning",
"list_participants",
(),
None,
)
.await
.context("Failed to list participants")?;
let mut new_peer_list = response
.decode::<Vec<AgentPubKey>>()
.map_err(|e| anyhow::anyhow!("Decoding failure: {:?}", e))?;
new_peer_list.shuffle(&mut thread_rng());

// Pause to let Holochain receive more agent links if none are found yet.
if new_peer_list.is_empty() {
tokio::time::sleep(Duration::from_millis(100)).await;
}
Some(agent_pub_key) => {
log::debug!("Initiating a countersigning session with agent {:?}", agent_pub_key);

let start = Instant::now();
let initiated = initiated.fetch_add(1, std::sync::atomic::Ordering::Acquire);
reporter.add_custom(
ReportMetric::new("countersigning_session_initiated")
.with_tag("agent", agent_name.clone())
.with_field("value", initiated as u64),
);
Ok(new_peer_list)
}
Some(agent_pub_key) => {
log::debug!(
"Initiating a countersigning session with agent {:?}",
agent_pub_key
);

// Start a countersigning session with the next agent in the list.
let response = client
.call_zome(
app_port,
cell_id.clone(),
"countersigning",
"start_two_party",
agent_pub_key.clone(),
// This should be fairly quick, can increase this if it causes problems
None,
let start = Instant::now();
let initiated = initiated.fetch_add(1, std::sync::atomic::Ordering::Acquire);
reporter.add_custom(
ReportMetric::new("countersigning_session_initiated")
.with_tag("agent", agent_name.clone())
.with_field("value", initiated as u64),
);

// Start a countersigning session with the next agent in the list.
let response = client
.call_zome(
app_port,
cell_id.clone(),
"countersigning",
"start_two_party",
agent_pub_key.clone(),
// This should be fairly quick, can increase this if it causes problems
None,
)
.await
.with_context(|| {
format!(
"Failed to start a new countersigning session: {:?}",
agent_pub_key
)
.await
.with_context(|| {
format!("Failed to start a new countersigning session: {:?}", agent_pub_key)
})?;
})?;

let my_preflight_response: PreflightResponse = response.decode().map_err(|e| anyhow::anyhow!("Decoding failure: {:?}", e))?;
let my_preflight_response: PreflightResponse = response
.decode()
.map_err(|e| anyhow::anyhow!("Decoding failure: {:?}", e))?;

let session_times = my_preflight_response.request.session_times.clone();
let session_timeout = Instant::now().add(Duration::from_millis(
(session_times.end.as_millis() - session_times.start.as_millis()) as u64,
));
loop {
// Now listen for a signal from the remote with their acceptance
let signal = tokio::time::timeout_at(session_timeout, client.recv_signal()).await.with_context(|| format!("Agent [{agent_pub_key:?}] did not respond to the countersigning request in time, abandoning"))?;

match signal {
Some(signal) => {
let signal = match rmp_serde::decode::from_slice::<Signal>(&signal.data).map_err(|e| anyhow::anyhow!("Decoding failure, appears to not be a signal: {:?}", e))? {
Signal::App {
signal,
..
} => signal,
_ => {
log::debug!("Received a signal that is not an app signal, listening for other signals.");
continue;
}
};

let other_response = match signal.clone().into_inner().decode::<Signals>() {
Ok(Signals::Response(response)) if response.request.app_entry_hash == my_preflight_response.request.app_entry_hash => response,
Ok(_) => {
log::debug!("Received a signal that is not a response for this countersigning session, listening for other signals.");
continue;
}
Err(_) => {
// Must be resilient to unexpected signals, somebody else might try to initiate with us while we're already
// working with another peer.
log::debug!("Got an unexpected signal, will try again. {:?}", signal);
continue;
}
};

log::debug!("The other party [{:?}] has accepted the countersigning session.", agent_pub_key);

let retry_count = complete_session(client.clone(), app_port, cell_id.clone(), my_preflight_response, other_response, session_timeout).await.context("Initiator failed to complete session")?;

let elapsed = start.elapsed();

log::debug!("Completed the countersigning session with agent {:?}", agent_pub_key);

let initiated_success = initiated_success.fetch_add(1, std::sync::atomic::Ordering::Acquire);
reporter.add_custom(
ReportMetric::new("countersigning_session_initiated_success")
.with_tag("agent", agent_name.clone())
.with_field("retries", retry_count as u64)
.with_field("value", initiated_success as u64),
);
reporter.add_custom(
ReportMetric::new("countersigning_session_initiated_duration")
.with_tag("agent", agent_name)
.with_field("value", elapsed.as_secs_f64()),
);

break;
}
None => {
log::warn!("No signal received, problem with the remote? Will try again.");
}
}
}
let session_times = my_preflight_response.request.session_times.clone();
let session_timeout = Instant::now().add(Duration::from_millis(
(session_times.end.as_millis() - session_times.start.as_millis()) as u64,
));

// Add no new agents, that should only happen when we exhaust the list.
Ok(Vec::with_capacity(0))
match run_initiated_session(
client.clone(),
my_preflight_response,
session_timeout,
agent_pub_key,
agent_name.clone(),
cell_id.clone(),
app_port,
start,
initiated_success.clone(),
reporter,
)
.await
{
Ok(_) => {
// Completed successfully
}
Err(e) => {
log::warn!(
"Failed to run initiated session, waiting for it to time out: {:?}",
e
);

// TODO this can be replaced with listening for the abandoned signal
tokio::time::sleep_until(session_timeout).await;
}
}

// Add no new agents, that should only happen when we exhaust the list.
Ok(Vec::with_capacity(0))
}
}
)?;
})?;

ctx.get_mut()
.scenario_values
Expand All @@ -252,6 +231,102 @@ fn agent_behaviour_initiate(
Ok(())
}

#[allow(clippy::too_many_arguments)]
async fn run_initiated_session(
client: TryCPClient,
my_preflight_response: PreflightResponse,
session_timeout: Instant,
agent_pub_key: AgentPubKey,
agent_name: String,
cell_id: CellId,
app_port: u16,
start: Instant,
initiated_success: Arc<AtomicUsize>,
reporter: Arc<Reporter>,
) -> anyhow::Result<()> {
loop {
// Now listen for a signal from the remote with their acceptance
let signal = tokio::time::timeout_at(session_timeout, client.recv_signal()).await.with_context(|| format!("Agent [{agent_pub_key:?}] did not respond to the countersigning request in time, abandoning"))?;

match signal {
Some(signal) => {
let signal = match rmp_serde::decode::from_slice::<Signal>(&signal.data).map_err(
|e| anyhow::anyhow!("Decoding failure, appears to not be a signal: {:?}", e),
)? {
Signal::App { signal, .. } => signal,
_ => {
log::debug!("Received a signal that is not an app signal, listening for other signals.");
continue;
}
};

let other_response = match signal.clone().into_inner().decode::<Signals>() {
Ok(Signals::Response(response))
if session_fingerprint(response.request())?
== session_fingerprint(my_preflight_response.request())? =>
{
response
}
Ok(_) => {
log::debug!("Received a signal that is not a response for this countersigning session, listening for other signals.");
continue;
}
Err(_) => {
// Must be resilient to unexpected signals, somebody else might try to initiate with us while we're already
// working with another peer.
log::debug!("Got an unexpected signal, will try again. {:?}", signal);
continue;
}
};

log::debug!(
"The other party [{:?}] has accepted the countersigning session.",
agent_pub_key
);

let retry_count = complete_session(
client.clone(),
app_port,
cell_id.clone(),
my_preflight_response,
other_response,
session_timeout,
)
.await
.context("Initiator failed to complete session")?;

let elapsed = start.elapsed();

log::debug!(
"Completed the countersigning session with agent {:?}",
agent_pub_key
);

let initiated_success =
initiated_success.fetch_add(1, std::sync::atomic::Ordering::Acquire);
reporter.add_custom(
ReportMetric::new("countersigning_session_initiated_success")
.with_tag("agent", agent_name.clone())
.with_field("retries", retry_count as u64)
.with_field("value", initiated_success as u64),
);
reporter.add_custom(
ReportMetric::new("countersigning_session_initiated_duration")
.with_tag("agent", agent_name)
.with_field("value", elapsed.as_secs_f64()),
);

break;
}
None => {
log::warn!("No signal received, problem with the remote? Will try again.");
}
}
}

Ok(())
}

fn agent_behaviour_participate(
ctx: &mut AgentContext<TryCPRunnerContext, TryCPAgentContext<ScenarioValues>>,
) -> HookResult {
Expand Down Expand Up @@ -488,6 +563,15 @@ async fn complete_session(
Ok(retry_count)
}

fn session_fingerprint(preflight_request: &PreflightRequest) -> anyhow::Result<Vec<u8>> {
let hash = holo_hash::encode::blake2b_256(
&holochain_serialized_bytes::encode(preflight_request)
.context("Failed to serialize preflight request")?,
);

Ok(hash)
}

async fn await_countersigning_success(
client: TryCPClient,
session_entry_hash: EntryHash,
Expand Down

0 comments on commit c5447f3

Please sign in to comment.