Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregator service #71

Merged
merged 15 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"tap_core"
"tap_core",
"tap_aggregator"
]

[workspace.package]
Expand Down
22 changes: 22 additions & 0 deletions tap_aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name="tap_aggregator"
version.workspace = true
edition.workspace = true
license.workspace = true

[[bin]]
name = "tap_aggregator"
path = "src/main.rs"

[dependencies]
anyhow = "1.0.70"
tokio = { version = "1.27.0", features = ["macros", "signal"] }
tap_core = { path = "../tap_core" }
jsonrpsee = { version = "0.18.0", features = ["server", "macros"] }
ethers-signers = "2.0.3"
clap = { version = "4.2.4", features = ["derive", "env"] }
ethers-core = "2.0.3"

[dev-dependencies]
jsonrpsee = { version = "0.18.0", features = ["http-client", "jsonrpsee-core"] }
rstest = "0.17.0"
316 changes: 316 additions & 0 deletions tap_aggregator/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::hash_set;

use anyhow::{Ok, Result};
use ethers_core::types::{Address, Signature};
use ethers_signers::{LocalWallet, Signer};

use tap_core::{
eip_712_signed_message::EIP712SignedMessage,
receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_receipt::Receipt,
};

pub async fn check_and_aggregate_receipts(
receipts: &[EIP712SignedMessage<Receipt>],
previous_rav: Option<EIP712SignedMessage<ReceiptAggregateVoucher>>,
wallet: LocalWallet,
) -> Result<EIP712SignedMessage<ReceiptAggregateVoucher>> {
// Check that the receipts are unique
check_signatures_unique(receipts)?;

// Check that the receipts are signed by ourselves
receipts
.iter()
.try_for_each(|receipt| receipt.verify(wallet.address()))?;

// Check that the previous rav is signed by ourselves
if let Some(previous_rav) = &previous_rav {
previous_rav.verify(wallet.address())?;
}

// Check that the receipts timestamp is greater than the previous rav
check_receipt_timestamps(receipts, previous_rav.as_ref())?;

// Get the allocation id from the first receipt, return error if there are no receipts
let allocation_id = match receipts.get(0) {
Some(receipt) => receipt.message.allocation_id,
None => {
return Err(tap_core::Error::InvalidCheckError {
check_string: "No receipts".into(),
}
.into())
}
};

// Check that the receipts all have the same allocation id
check_allocation_id(receipts, allocation_id)?;

// Check that the rav has the correct allocation id
if let Some(previous_rav) = &previous_rav {
if previous_rav.message.allocation_id != allocation_id {
return Err(tap_core::Error::InvalidCheckError {
check_string: "Previous rav allocation id does not match receipts".into(),
}
.into());
}
}

// Aggregate the receipts
let rav = ReceiptAggregateVoucher::aggregate_receipts(allocation_id, receipts, previous_rav)?;

// Sign the rav and return
Ok(EIP712SignedMessage::new(rav, &wallet).await?)
}

fn check_allocation_id(
receipts: &[EIP712SignedMessage<Receipt>],
allocation_id: Address,
) -> Result<()> {
for receipt in receipts.iter() {
let receipt = &receipt.message;
if receipt.allocation_id != allocation_id {
return Err(tap_core::Error::InvalidCheckError {
check_string: "Receipts allocation id is not uniform".into(),
}
.into());
}
}
Ok(())
}

fn check_signatures_unique(receipts: &[EIP712SignedMessage<Receipt>]) -> Result<()> {
let mut receipt_signatures: hash_set::HashSet<Signature> = hash_set::HashSet::new();
for receipt in receipts.iter() {
let signature = receipt.signature;
if receipt_signatures.contains(&signature) {
return Err(tap_core::Error::InvalidCheckError {
check_string: "Duplicate receipt signature".into(),
}
.into());
}
receipt_signatures.insert(signature);
}
Ok(())
}

fn check_receipt_timestamps(
receipts: &[EIP712SignedMessage<Receipt>],
previous_rav: Option<&EIP712SignedMessage<ReceiptAggregateVoucher>>,
) -> Result<()> {
if let Some(previous_rav) = &previous_rav {
for receipt in receipts.iter() {
let receipt = &receipt.message;
if previous_rav.message.timestamp_ns > receipt.timestamp_ns {
return Err(tap_core::Error::InvalidCheckError {
check_string: "Receipt timestamp is less or equal then previous rav timestamp"
.into(),
}
.into());
}
}
}

Ok(())
}

#[cfg(test)]
mod tests {
use std::{
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};

use ethers_core::types::Address;
use ethers_signers::{coins_bip39::English, LocalWallet, MnemonicBuilder, Signer};
use rstest::*;

use crate::aggregator;
use tap_core::{eip_712_signed_message::EIP712SignedMessage, tap_receipt::Receipt};
ColePBryan marked this conversation as resolved.
Show resolved Hide resolved

#[fixture]
fn keys() -> (LocalWallet, Address) {
let wallet: LocalWallet = MnemonicBuilder::<English>::default()
.phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about")
.build()
.unwrap();
let address = wallet.address();
(wallet, address)
}

#[fixture]
fn allocation_ids() -> Vec<Address> {
vec![
Address::from_str("0xabababababababababababababababababababab").unwrap(),
Address::from_str("0xdeaddeaddeaddeaddeaddeaddeaddeaddeaddead").unwrap(),
Address::from_str("0xbeefbeefbeefbeefbeefbeefbeefbeefbeefbeef").unwrap(),
Address::from_str("0x1234567890abcdef1234567890abcdef12345678").unwrap(),
]
}

#[rstest]
#[tokio::test]
async fn check_signatures_unique_fail(
keys: (LocalWallet, Address),
allocation_ids: Vec<Address>,
) {
// Create the same receipt twice (replay attack)
let mut receipts = Vec::new();
let receipt =
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 42).unwrap(), &keys.0)
.await
.unwrap();
receipts.push(receipt.clone());
receipts.push(receipt.clone());

let res = aggregator::check_signatures_unique(&receipts);
assert!(res.is_err());
}

#[rstest]
#[tokio::test]
async fn check_signatures_unique_ok(
keys: (LocalWallet, Address),
allocation_ids: Vec<Address>,
) {
// Create 2 different receipts
let mut receipts = Vec::new();
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 42).unwrap(), &keys.0)
.await
.unwrap(),
);
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 43).unwrap(), &keys.0)
.await
.unwrap(),
);

let res = aggregator::check_signatures_unique(&receipts);
assert!(res.is_ok());
}

#[rstest]
#[tokio::test]
/// Test that a receipt with a timestamp greater then the rav timestamp passes
async fn check_receipt_timestamps_ok(
keys: (LocalWallet, Address),
allocation_ids: Vec<Address>,
) {
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;

// Create rav
let rav = EIP712SignedMessage::new(
tap_core::receipt_aggregate_voucher::ReceiptAggregateVoucher {
allocation_id: allocation_ids[0],
timestamp_ns: time,
value_aggregate: 42,
},
&keys.0,
)
.await
.unwrap();

let mut receipts = Vec::new();
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 42).unwrap(), &keys.0)
.await
.unwrap(),
);

aggregator::check_receipt_timestamps(&receipts, Some(&rav)).unwrap();
}

#[rstest]
#[tokio::test]
/// Test that a receipt with a timestamp less then the rav timestamp fails
async fn check_receipt_timestamps_fail(
keys: (LocalWallet, Address),
allocation_ids: Vec<Address>,
) {
let mut receipts = Vec::new();
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 42).unwrap(), &keys.0)
.await
.unwrap(),
);

let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;

// Create rav
let rav = EIP712SignedMessage::new(
tap_core::receipt_aggregate_voucher::ReceiptAggregateVoucher {
allocation_id: allocation_ids[0],
timestamp_ns: time,
value_aggregate: 42,
},
&keys.0,
)
.await
.unwrap();

let res = aggregator::check_receipt_timestamps(&receipts, Some(&rav));

assert!(res.is_err());
}

#[rstest]
#[tokio::test]
/// Test check_allocation_id with 2 receipts that have the correct allocation id
/// and 1 receipt that has the wrong allocation id
async fn check_allocation_id_fail(keys: (LocalWallet, Address), allocation_ids: Vec<Address>) {
let mut receipts = Vec::new();
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 42).unwrap(), &keys.0)
.await
.unwrap(),
);
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 43).unwrap(), &keys.0)
.await
.unwrap(),
);
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[1], 44).unwrap(), &keys.0)
.await
.unwrap(),
);

let res = aggregator::check_allocation_id(&receipts, allocation_ids[0]);

assert!(res.is_err());
}

#[rstest]
#[tokio::test]
/// Test check_allocation_id with 3 receipts that have the correct allocation id
async fn check_allocation_id_ok(keys: (LocalWallet, Address), allocation_ids: Vec<Address>) {
let mut receipts = Vec::new();
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 42).unwrap(), &keys.0)
.await
.unwrap(),
);
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 43).unwrap(), &keys.0)
.await
.unwrap(),
);
receipts.push(
EIP712SignedMessage::new(Receipt::new(allocation_ids[0], 44).unwrap(), &keys.0)
.await
.unwrap(),
);

let res = aggregator::check_allocation_id(&receipts, allocation_ids[0]);

assert!(res.is_ok());
}
}
Loading