Skip to content

Commit

Permalink
Merge branch 'async' of https://github.com/webb-tools/dkg-substrate i…
Browse files Browse the repository at this point in the history
…nto async
  • Loading branch information
drewstone committed Jun 15, 2022
2 parents d331dba + 50197bd commit 5c7de60
Show file tree
Hide file tree
Showing 37 changed files with 1,938 additions and 1,672 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
// limitations under the License.

use crate::{
messages::{dkg_message::sign_and_send_messages, public_key_gossip::gossip_public_key},
meta_async_rounds::{dkg_gossip_engine::GossipEngineIface, BatchKey},
async_protocols::BatchKey,
gossip_engine::GossipEngineIface,
gossip_messages::{dkg_message::sign_and_send_messages, public_key_gossip::gossip_public_key},
persistence::store_localkey,
proposal::{get_signed_proposal, make_signed_proposal},
proposal::get_signed_proposal,
storage::proposals::save_signed_proposals_in_storage,
worker::{DKGWorker, HasLatestHeader, KeystoreExt},
Client, DKGApi, DKGKeystore,
Expand All @@ -31,7 +32,7 @@ use dkg_primitives::{
};
use dkg_runtime_primitives::{
crypto::{AuthorityId, Public},
AggregatedPublicKeys, AuthoritySet, Proposal, ProposalKind, UnsignedProposal,
AggregatedPublicKeys, AuthoritySet, Proposal, UnsignedProposal,
};
use multi_party_ecdsa::protocols::multi_party_ecdsa::gg_2020::{
party_i::SignatureRecid, state_machine::keygen::LocalKey,
Expand All @@ -40,15 +41,12 @@ use parking_lot::{Mutex, RwLock};
use sc_client_api::Backend;
use sc_keystore::LocalKeystore;
use sp_arithmetic::traits::AtLeast32BitUnsigned;
use sp_runtime::{
generic::BlockId,
traits::{Block, Header, NumberFor},
};
use std::{collections::HashMap, marker::PhantomData, path::PathBuf, sync::Arc};
use sp_runtime::traits::{Block, NumberFor};
use std::{collections::HashMap, fmt::Debug, marker::PhantomData, path::PathBuf, sync::Arc};

#[auto_impl::auto_impl(Arc,&,&mut)]
pub trait BlockChainIface: Send + Sync {
type Clock: AtLeast32BitUnsigned + Copy + Send + Sync;
pub trait BlockchainInterface: Send + Sync {
type Clock: Debug + AtLeast32BitUnsigned + Copy + Send + Sync;
type GossipEngine: GossipEngineIface;

fn verify_signature_against_authorities(
Expand All @@ -67,38 +65,13 @@ pub trait BlockChainIface: Send + Sync {
fn gossip_public_key(&self, key: DKGPublicKeyMessage) -> Result<(), DKGError>;
fn store_public_key(&self, key: LocalKey<Secp256k1>, round_id: RoundId)
-> Result<(), DKGError>;
fn get_jailed_signers_inner(&self) -> Result<Vec<Public>, DKGError>;
fn get_authority_set(&self) -> &Vec<Public>;
/// Get the unjailed signers
fn get_unjailed_signers(&self) -> Result<Vec<u16>, DKGError> {
let jailed_signers = self.get_jailed_signers_inner()?;
Ok(self
.get_authority_set()
.iter()
.enumerate()
.filter(|(_, key)| !jailed_signers.contains(key))
.map(|(i, _)| u16::try_from(i + 1).unwrap_or_default())
.collect())
}

/// Get the jailed signers
fn get_jailed_signers(&self) -> Result<Vec<u16>, DKGError> {
let jailed_signers = self.get_jailed_signers_inner()?;
Ok(self
.get_authority_set()
.iter()
.enumerate()
.filter(|(_, key)| jailed_signers.contains(key))
.map(|(i, _)| u16::try_from(i + 1).unwrap_or_default())
.collect())
}

fn get_gossip_engine(&self) -> Option<&Self::GossipEngine>;
/// Returns the present time
fn now(&self) -> Self::Clock;
}

pub struct DKGIface<B: Block, BE, C, GE> {
pub struct DKGProtocolEngine<B: Block, BE, C, GE> {
pub backend: Arc<BE>,
pub latest_header: Arc<RwLock<Option<B::Header>>>,
pub client: Arc<C>,
Expand All @@ -115,7 +88,25 @@ pub struct DKGIface<B: Block, BE, C, GE> {
pub local_key_path: Option<PathBuf>,
}

impl<B, BE, C, GE> BlockChainIface for DKGIface<B, BE, C, GE>
impl<B: Block, BE, C, GE> KeystoreExt for DKGProtocolEngine<B, BE, C, GE> {
fn get_keystore(&self) -> &DKGKeystore {
&self.keystore
}
}

impl<B, BE, C, GE> HasLatestHeader<B> for DKGProtocolEngine<B, BE, C, GE>
where
B: Block,
BE: Backend<B>,
GE: GossipEngineIface,
C: Client<B, BE>,
{
fn get_latest_header(&self) -> &Arc<RwLock<Option<B::Header>>> {
&self.latest_header
}
}

impl<B, BE, C, GE> BlockchainInterface for DKGProtocolEngine<B, BE, C, GE>
where
B: Block,
C: Client<B, BE> + 'static,
Expand Down Expand Up @@ -216,18 +207,6 @@ where
Ok(())
}

fn get_jailed_signers_inner(&self) -> Result<Vec<Public>, DKGError> {
let now = self.latest_header.read().clone().ok_or_else(|| DKGError::CriticalError {
reason: "latest header does not exist!".to_string(),
})?;
let at: BlockId<B> = BlockId::hash(now.hash());
Ok(self
.client
.runtime_api()
.get_signing_jailed(&at, (&*self.best_authorities).clone())
.unwrap_or_default())
}

fn get_authority_set(&self) -> &Vec<Public> {
&*self.best_authorities
}
Expand All @@ -240,92 +219,3 @@ where
self.get_latest_block_number()
}
}

pub(crate) type VoteResults =
Arc<Mutex<HashMap<BatchKey, Vec<(Proposal, SignatureRecid, BigInt)>>>>;

#[derive(Clone)]
pub struct TestDummyIface {
pub sender: tokio::sync::mpsc::UnboundedSender<SignedDKGMessage<Public>>,
pub best_authorities: Arc<Vec<Public>>,
pub authority_public_key: Arc<Public>,
// key is party_index, hash of data. Needed especially for local unit tests
pub vote_results: VoteResults,
pub keygen_key: Arc<Mutex<Option<LocalKey<Secp256k1>>>>,
}

impl BlockChainIface for TestDummyIface {
type Clock = u32;
type GossipEngine = ();

fn verify_signature_against_authorities(
&self,
message: Arc<SignedDKGMessage<Public>>,
) -> Result<DKGMessage<Public>, DKGError> {
Ok(message.msg.clone())
}

fn sign_and_send_msg(&self, unsigned_msg: DKGMessage<Public>) -> Result<(), DKGError> {
log::info!(
"Sending message through iface id={}",
unsigned_msg.payload.async_proto_only_get_sender_id().unwrap()
);
let faux_signed_message = SignedDKGMessage { msg: unsigned_msg, signature: None };
self.sender
.send(faux_signed_message)
.map_err(|err| DKGError::GenericError { reason: err.to_string() })?;
Ok(())
}

fn process_vote_result(
&self,
signature_rec: SignatureRecid,
unsigned_proposal: UnsignedProposal,
round_id: RoundId,
batch_key: BatchKey,
message: BigInt,
) -> Result<(), DKGError> {
let mut lock = self.vote_results.lock();
let _payload_key = unsigned_proposal.key;
let signature = convert_signature(&signature_rec).ok_or_else(|| {
DKGError::CriticalError { reason: "Unable to serialize signature".to_string() }
})?;

let finished_round = DKGSignedPayload {
key: round_id.encode(),
payload: "Webb".encode(),
signature: signature.encode(),
};

let prop = make_signed_proposal(ProposalKind::EVM, finished_round).unwrap();
lock.entry(batch_key).or_default().push((prop, signature_rec, message));

Ok(())
}

fn gossip_public_key(&self, _key: DKGPublicKeyMessage) -> Result<(), DKGError> {
// we do not gossip the public key in the test interface
Ok(())
}

fn store_public_key(&self, key: LocalKey<Secp256k1>, _: RoundId) -> Result<(), DKGError> {
*self.keygen_key.lock() = Some(key);
Ok(())
}

fn get_jailed_signers_inner(&self) -> Result<Vec<Public>, DKGError> {
Ok(vec![])
}

fn get_authority_set(&self) -> &Vec<Public> {
&*self.best_authorities
}

fn get_gossip_engine(&self) -> Option<&Self::GossipEngine> {
None
}

fn now(&self) -> Self::Clock {
0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::meta_async_rounds::{
blockchain_interface::BlockChainIface, meta_handler::AsyncProtocolParameters, ProtocolType,
};
use dkg_primitives::types::{DKGError, DKGMessage, DKGMsgPayload, RoundId, SignedDKGMessage};
use dkg_runtime_primitives::crypto::Public;
use futures::Stream;
Expand All @@ -26,34 +23,36 @@ use std::{
};
use tokio_stream::wrappers::BroadcastStream;

use super::{blockchain_interface::BlockchainInterface, AsyncProtocolParameters, ProtocolType};

/// Used to filter and transform incoming messages from the DKG worker
pub struct IncomingAsyncProtocolWrapper<T, B> {
pub struct IncomingAsyncProtocolWrapper<T, BI> {
pub receiver: BroadcastStream<T>,
round_id: RoundId,
bc_iface: Arc<B>,
engine: Arc<BI>,
ty: ProtocolType,
}

impl<T: TransformIncoming, B: BlockChainIface> IncomingAsyncProtocolWrapper<T, B> {
impl<T: TransformIncoming, BI: BlockchainInterface> IncomingAsyncProtocolWrapper<T, BI> {
pub fn new(
receiver: tokio::sync::broadcast::Receiver<T>,
ty: ProtocolType,
params: &AsyncProtocolParameters<B>,
params: &AsyncProtocolParameters<BI>,
) -> Self {
Self {
receiver: BroadcastStream::new(receiver),
round_id: params.round_id,
bc_iface: params.blockchain_iface.clone(),
engine: params.engine.clone(),
ty,
}
}
}

pub trait TransformIncoming: Clone + Send + 'static {
type IncomingMapped;
fn transform<B: BlockChainIface>(
fn transform<BI: BlockchainInterface>(
self,
verify: &B,
verify: &BI,
stream_type: &ProtocolType,
this_round_id: RoundId,
) -> Result<Option<Msg<Self::IncomingMapped>>, DKGError>
Expand All @@ -63,9 +62,9 @@ pub trait TransformIncoming: Clone + Send + 'static {

impl TransformIncoming for Arc<SignedDKGMessage<Public>> {
type IncomingMapped = DKGMessage<Public>;
fn transform<B: BlockChainIface>(
fn transform<BI: BlockchainInterface>(
self,
verify: &B,
verify: &BI,
stream_type: &ProtocolType,
this_round_id: RoundId,
) -> Result<Option<Msg<Self::IncomingMapped>>, DKGError>
Expand Down Expand Up @@ -101,20 +100,20 @@ impl TransformIncoming for Arc<SignedDKGMessage<Public>> {
}
}

impl<T, B> Stream for IncomingAsyncProtocolWrapper<T, B>
impl<T, BI> Stream for IncomingAsyncProtocolWrapper<T, BI>
where
T: TransformIncoming,
B: BlockChainIface,
BI: BlockchainInterface,
{
type Item = Msg<T::IncomingMapped>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self { receiver, ty, bc_iface, round_id } = &mut *self;
let Self { receiver, ty, engine, round_id } = &mut *self;
let mut receiver = Pin::new(receiver);

loop {
match futures::ready!(receiver.as_mut().poll_next(cx)) {
Some(Ok(msg)) => match msg.transform(&**bc_iface, &*ty, *round_id) {
Some(Ok(msg)) => match msg.transform(&**engine, &*ty, *round_id) {
Ok(Some(msg)) => return Poll::Ready(Some(msg)),

Ok(None) => continue,
Expand Down
Loading

0 comments on commit 5c7de60

Please sign in to comment.