Skip to content

Commit

Permalink
Implement basic voting protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
quackzar committed May 14, 2024
1 parent fd514d2 commit 46d6010
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 45 deletions.
46 changes: 23 additions & 23 deletions src/net/agency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use futures::Future;
use itertools::Itertools;

pub trait Broadcast {
type Error: Error + Send + 'static;
type BroadcastError: Error + Send + 'static;
// type Error: Error + 'static;

/// Broadcast a message to all other parties.
Expand All @@ -38,7 +38,7 @@ pub trait Broadcast {
fn broadcast(
&mut self,
msg: &(impl serde::Serialize + Sync),
) -> impl std::future::Future<Output = Result<(), Self::Error>>;
) -> impl std::future::Future<Output = Result<(), Self::BroadcastError>>;

/// Broadcast a message to all parties and await their messages
/// Messages are ordered by their index.
Expand All @@ -50,7 +50,7 @@ pub trait Broadcast {
fn symmetric_broadcast<T>(
&mut self,
msg: T,
) -> impl Future<Output = Result<Vec<T>, Self::Error>>
) -> impl Future<Output = Result<Vec<T>, Self::BroadcastError>>
where
T: serde::Serialize + serde::de::DeserializeOwned + Sync;

Expand All @@ -60,27 +60,27 @@ pub trait Broadcast {
fn recv_from<T: serde::de::DeserializeOwned>(
&mut self,
idx: usize,
) -> impl Future<Output = Result<T, Self::Error>>;
) -> impl Future<Output = Result<T, Self::BroadcastError>>;

/// Size of the broadcasting network including yourself,
/// as such there is n-1 outgoing connections
fn size(&self) -> usize;
}

impl<'a, B: Broadcast> Broadcast for &'a mut B {
type Error = B::Error;
type BroadcastError = B::BroadcastError;

fn broadcast(
&mut self,
msg: &(impl serde::Serialize + Sync),
) -> impl std::future::Future<Output = Result<(), Self::Error>> {
) -> impl std::future::Future<Output = Result<(), Self::BroadcastError>> {
(**self).broadcast(msg)
}

fn symmetric_broadcast<T>(
&mut self,
msg: T,
) -> impl Future<Output = Result<Vec<T>, Self::Error>>
) -> impl Future<Output = Result<Vec<T>, Self::BroadcastError>>
where
T: serde::Serialize + serde::de::DeserializeOwned + Sync {
(**self).symmetric_broadcast(msg)
Expand All @@ -89,7 +89,7 @@ impl<'a, B: Broadcast> Broadcast for &'a mut B {
fn recv_from<T: serde::de::DeserializeOwned>(
&mut self,
idx: usize,
) -> impl Future<Output = Result<T, Self::Error>> {
) -> impl Future<Output = Result<T, Self::BroadcastError>> {
(**self).recv_from(idx)
}

Expand All @@ -100,7 +100,7 @@ impl<'a, B: Broadcast> Broadcast for &'a mut B {

// TODO: Possible rename this trait as it's name is confusing.
pub trait Unicast {
type Error: Error + Send + 'static;
type UnicastError: Error + Send + 'static;

/// Unicast messages to each party
///
Expand All @@ -113,7 +113,7 @@ pub trait Unicast {
fn unicast(
&mut self,
msgs: &[impl serde::Serialize + Sync],
) -> impl std::future::Future<Output = Result<(), Self::Error>>;
) -> impl std::future::Future<Output = Result<(), Self::UnicastError>>;

/// Unicast a message to each party and await their messages
/// Messages are supposed to be in order, meaning message `i`
Expand All @@ -123,7 +123,7 @@ pub trait Unicast {
fn symmetric_unicast<T>(
&mut self,
msgs: Vec<T>,
) -> impl Future<Output = Result<Vec<T>, Self::Error>>
) -> impl Future<Output = Result<Vec<T>, Self::UnicastError>>
where
T: serde::Serialize + serde::de::DeserializeOwned + Sync;

Expand All @@ -134,37 +134,37 @@ pub trait Unicast {
/// Returns: A list sorted by the connections (skipping yourself)
fn receive_all<T: serde::de::DeserializeOwned>(
&mut self,
) -> impl Future<Output = Result<Vec<T>, Self::Error>>;
) -> impl Future<Output = Result<Vec<T>, Self::UnicastError>>;

/// Size of the unicasting network including yourself,
/// as such there is n-1 outgoing connections
fn size(&self) -> usize;
}

impl<'a, U: Unicast> Unicast for &'a mut U {
type Error = U::Error;
type UnicastError = U::UnicastError;

fn size(&self) -> usize {
(**self).size()
}

fn receive_all<T: serde::de::DeserializeOwned>(
&mut self,
) -> impl Future<Output = Result<Vec<T>, Self::Error>> {
) -> impl Future<Output = Result<Vec<T>, Self::UnicastError>> {
(**self).receive_all()
}

fn unicast(
&mut self,
msgs: &[impl serde::Serialize + Sync],
) -> impl std::future::Future<Output = Result<(), Self::Error>> {
) -> impl std::future::Future<Output = Result<(), Self::UnicastError>> {
(**self).unicast(msgs)
}

fn symmetric_unicast<T>(
&mut self,
msgs: Vec<T>,
) -> impl Future<Output = Result<Vec<T>, Self::Error>>
) -> impl Future<Output = Result<Vec<T>, Self::UnicastError>>
where
T: serde::Serialize + serde::de::DeserializeOwned + Sync {
(**self).symmetric_unicast(msgs)
Expand All @@ -189,7 +189,7 @@ impl<B: Broadcast, D: Digest> VerifiedBroadcast<B, D> {
pub async fn symmetric_broadcast<T>(
&mut self,
msg: T,
) -> Result<Vec<T>, BroadcastVerificationError<B::Error>>
) -> Result<Vec<T>, BroadcastVerificationError<B::BroadcastError>>
where
T: serde::Serialize + serde::de::DeserializeOwned,
{
Expand Down Expand Up @@ -261,7 +261,7 @@ impl<B: Broadcast, D: Digest> VerifiedBroadcast<B, D> {
pub async fn broadcast<T>(
&mut self,
msg: &T,
) -> Result<(), BroadcastVerificationError<B::Error>>
) -> Result<(), BroadcastVerificationError<B::BroadcastError>>
where
T: serde::Serialize,
{
Expand Down Expand Up @@ -291,7 +291,7 @@ impl<B: Broadcast, D: Digest> VerifiedBroadcast<B, D> {
pub async fn recv_from<T>(
&mut self,
party: usize,
) -> Result<T, BroadcastVerificationError<B::Error>>
) -> Result<T, BroadcastVerificationError<B::BroadcastError>>
where
T: serde::de::DeserializeOwned,
{
Expand Down Expand Up @@ -352,13 +352,13 @@ pub enum BroadcastVerificationError<E> {
}

impl<B: Broadcast, D: Digest> Broadcast for VerifiedBroadcast<B, D> {
type Error = BroadcastVerificationError<<B as Broadcast>::Error>;
type BroadcastError = BroadcastVerificationError<<B as Broadcast>::BroadcastError>;

async fn broadcast(&mut self, msg: &impl serde::Serialize) -> Result<(), Self::Error> {
async fn broadcast(&mut self, msg: &impl serde::Serialize) -> Result<(), Self::BroadcastError> {
self.broadcast(msg).await
}

async fn symmetric_broadcast<T>(&mut self, msg: T) -> Result<Vec<T>, Self::Error>
async fn symmetric_broadcast<T>(&mut self, msg: T) -> Result<Vec<T>, Self::BroadcastError>
where
T: serde::Serialize + serde::de::DeserializeOwned,
{
Expand All @@ -368,7 +368,7 @@ impl<B: Broadcast, D: Digest> Broadcast for VerifiedBroadcast<B, D> {
fn recv_from<T: serde::de::DeserializeOwned>(
&mut self,
idx: usize,
) -> impl Future<Output = Result<T, Self::Error>> {
) -> impl Future<Output = Result<T, Self::BroadcastError>> {
self.recv_from(idx)
}

Expand Down
12 changes: 6 additions & 6 deletions src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ impl<'a, C: Channel> Channel for &'a mut C {

/// Tune to a specific channel
pub trait Tuneable {
type Error: Error + Send + 'static;
type TuningError: Error + Send + 'static;

fn id(&self) -> usize;

fn recv_from<T: serde::de::DeserializeOwned>(
&mut self,
idx: usize,
) -> impl Future<Output = Result<T, Self::Error>>;
) -> impl Future<Output = Result<T, Self::TuningError>>;

fn send_to<T: serde::Serialize + Sync>(
&mut self,
idx: usize,
msg: &T,
) -> impl std::future::Future<Output = Result<(), Self::Error>>;
) -> impl std::future::Future<Output = Result<(), Self::TuningError>>;
}

impl<'a, R: Tuneable + ?Sized> Tuneable for &'a mut R {
type Error = R::Error;
type TuningError = R::TuningError;

fn id(&self) -> usize {
(**self).id()
Expand All @@ -98,15 +98,15 @@ impl<'a, R: Tuneable + ?Sized> Tuneable for &'a mut R {
fn recv_from<T: serde::de::DeserializeOwned>(
&mut self,
idx: usize,
) -> impl Future<Output = Result<T, Self::Error>> {
) -> impl Future<Output = Result<T, Self::TuningError>> {
(**self).recv_from(idx)
}

fn send_to<T: serde::Serialize + Sync>(
&mut self,
idx: usize,
msg: &T,
) -> impl std::future::Future<Output = Result<(), Self::Error>> {
) -> impl std::future::Future<Output = Result<(), Self::TuningError>> {
(**self).send_to(idx, msg)
}
}
Expand Down
57 changes: 46 additions & 11 deletions src/net/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,26 +298,61 @@ impl<C: SplitChannel> Network<C> {
let n = n + 1; // We need to count ourselves.
0..n
}

async fn drop_party(_id: usize) -> Result<(), ()> {
todo!("Initiate a drop vote");

}
}

// TODO: Implement a handler system, such we can handle ad-hoc requests from other parties,
// such as dropping/kicking other parties for cheating, being slow, etc.
//
// Outline:
// Currently we do not handle any unprepared protocols, but only expected 'happy path' behaviour.
// In case of protocols or communication failure we return an error, but we do not provide a solution.
// The current expection is for the downstream user to handle it themselves, instead of doing
// something automatic. However, we currently do not have any methods for removing parties,
// and if we had we still need all other parties to come to the same conclusion.
//
// First,
// we are in need of some voting protocols, such we can initiate a 'drop' vote.
// How this should be done is not clear-cut, but we can start with something simple.
//
// Second,
// We need the ability to handle these ad-hoc as these voting requests can come at any point in
// time, while we could check for votes manually each 'round' between each protocol, this would not
// probably not suffice.
//
// We can use asyncness to run these in the back, racing/selecting between the happy-path and
// incoming vote requests. A handler should be able to be set up so the policies/code for how to
// react on these requests should be handled.
//
// The issue here becomes that we need to process channels before deciding to relay them or handle
// it with the vote handler.
//
//
//


impl<C: SplitChannel> Unicast for Network<C> {
type Error = NetworkError<C::Error>;
type UnicastError = NetworkError<C::Error>;

#[tracing::instrument(skip_all)]
async fn unicast(&mut self, msgs: &[impl serde::Serialize + Sync]) -> Result<(), Self::Error> {
async fn unicast(&mut self, msgs: &[impl serde::Serialize + Sync]) -> Result<(), Self::UnicastError> {
self.unicast(msgs).await
}

#[tracing::instrument(skip_all)]
async fn symmetric_unicast<T>(&mut self, msgs: Vec<T>) -> Result<Vec<T>, Self::Error>
async fn symmetric_unicast<T>(&mut self, msgs: Vec<T>) -> Result<Vec<T>, Self::UnicastError>
where
T: serde::Serialize + serde::de::DeserializeOwned + Sync,
{
self.symmetric_unicast(msgs).await
}

#[tracing::instrument(skip_all)]
async fn receive_all<T: serde::de::DeserializeOwned>(&mut self) -> Result<Vec<T>, Self::Error> {
async fn receive_all<T: serde::de::DeserializeOwned>(&mut self) -> Result<Vec<T>, Self::UnicastError> {
self.receive_all().await
}

Expand All @@ -327,15 +362,15 @@ impl<C: SplitChannel> Unicast for Network<C> {
}

impl<C: SplitChannel> Broadcast for Network<C> {
type Error = NetworkError<C::Error>;
type BroadcastError = NetworkError<C::Error>;

#[tracing::instrument(skip_all)]
async fn broadcast(&mut self, msg: &(impl serde::Serialize + Sync)) -> Result<(), Self::Error> {
async fn broadcast(&mut self, msg: &(impl serde::Serialize + Sync)) -> Result<(), Self::BroadcastError> {
self.broadcast(msg).await
}

#[tracing::instrument(skip_all)]
async fn symmetric_broadcast<T>(&mut self, msg: T) -> Result<Vec<T>, Self::Error>
async fn symmetric_broadcast<T>(&mut self, msg: T) -> Result<Vec<T>, Self::BroadcastError>
where
T: serde::Serialize + serde::de::DeserializeOwned + Sync,
{
Expand All @@ -345,7 +380,7 @@ impl<C: SplitChannel> Broadcast for Network<C> {
fn recv_from<T: serde::de::DeserializeOwned>(
&mut self,
idx: usize,
) -> impl Future<Output = Result<T, Self::Error>> {
) -> impl Future<Output = Result<T, Self::BroadcastError>> {
Tuneable::recv_from(self, idx).map_err(move |e| NetworkError {
id: idx as u32,
source: e,
Expand All @@ -358,7 +393,7 @@ impl<C: SplitChannel> Broadcast for Network<C> {
}

impl<C: SplitChannel> Tuneable for Network<C> {
type Error = C::Error;
type TuningError = C::Error;

fn id(&self) -> usize {
self.index
Expand All @@ -367,7 +402,7 @@ impl<C: SplitChannel> Tuneable for Network<C> {
async fn recv_from<T: serde::de::DeserializeOwned>(
&mut self,
idx: usize,
) -> Result<T, Self::Error> {
) -> Result<T, Self::TuningError> {
let idx = self.id_to_index(idx);
self.connections[idx].recv().await
}
Expand All @@ -376,7 +411,7 @@ impl<C: SplitChannel> Tuneable for Network<C> {
&mut self,
idx: usize,
msg: &T,
) -> Result<(), Self::Error> {
) -> Result<(), Self::TuningError> {
let idx = self.id_to_index(idx);
self.connections[idx].send(msg).await
}
Expand Down
8 changes: 4 additions & 4 deletions src/protocols/cutnchoose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ mod test {
}

impl Broadcast for SingleBroadcast {
type Error = <DuplexConnection as Channel>::Error;
type BroadcastError = <DuplexConnection as Channel>::Error;

fn broadcast(
&mut self,
msg: &(impl serde::Serialize + Sync),
) -> impl std::future::Future<Output = Result<(), Self::Error>> {
) -> impl std::future::Future<Output = Result<(), Self::BroadcastError>> {
self.inner.send(msg)
}

async fn symmetric_broadcast<T>(&mut self, msg: T) -> Result<Vec<T>, Self::Error>
async fn symmetric_broadcast<T>(&mut self, msg: T) -> Result<Vec<T>, Self::BroadcastError>
where
T: serde::Serialize + serde::de::DeserializeOwned + Sync,
{
Expand All @@ -134,7 +134,7 @@ mod test {
fn recv_from<T: serde::de::DeserializeOwned>(
&mut self,
_idx: usize,
) -> impl futures::prelude::Future<Output = Result<T, Self::Error>> {
) -> impl futures::prelude::Future<Output = Result<T, Self::BroadcastError>> {
self.inner.recv()
}

Expand Down
1 change: 1 addition & 0 deletions src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod beaver;
pub mod cointoss;
mod cutnchoose;
mod rand;
pub mod voting;
Loading

0 comments on commit 46d6010

Please sign in to comment.