Skip to content
This repository has been archived by the owner on Dec 17, 2024. It is now read-only.

Merge ACL whitelist #123

Merged
merged 5 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async fn main() -> Result<()> {
PeerCommand::Whitelist { db_url } => {
let db = storage::Database::new(&db_url).await?;
let key = entity::PublicKey::try_from(peer.as_str())?;
db.acl_whitelist(&key).await
db.acl_whitelist(key).await
}
PeerCommand::Deny { db_url } => {
let db = storage::Database::new(&db_url).await?;
Expand Down
234 changes: 223 additions & 11 deletions crates/node/src/networking/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
pub mod p2p;

use async_trait::async_trait;
use bytes::Buf;
use eyre::{eyre, Result};
use futures_util::TryStreamExt;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::io::AsyncBufReadExt;
use tokio_stream::Stream;
use tokio_util::io::StreamReader;

pub use p2p::P2P;

use crate::storage::Database;

#[async_trait]
trait MergeStoredAcl {
async fn get_acl_whitelist(&self) -> Result<Vec<crate::entity::PublicKey>>;
async fn acl_whitelist(&self, key: crate::entity::PublicKey) -> Result<()>;
async fn acl_deny(&self, key: &crate::entity::PublicKey) -> Result<()>;
}

pub struct WhitelistSyncer {
url: String,
database: Arc<Database>,
Expand All @@ -31,17 +42,7 @@ impl WhitelistSyncer {
resp.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
);

let mut lines = reader.lines();
let mut key_count = 0;
while let Ok(Some(line)) = lines.next_line().await {
let public_key = crate::entity::PublicKey::try_from(line.as_str())?;
self.database.acl_whitelist(&public_key).await?;
key_count += 1;
}

tracing::info!("{} keys whitelisted", key_count);
Ok(())
merge_acl_white_list(reader, &self).await
} else {
Err(eyre!(
"failed to download file from {}: response status: {}",
Expand All @@ -51,3 +52,214 @@ impl WhitelistSyncer {
}
}
}

#[async_trait::async_trait]
impl<'a> MergeStoredAcl for &'a WhitelistSyncer {
async fn get_acl_whitelist(&self) -> Result<Vec<crate::entity::PublicKey>> {
self.database.get_acl_whitelist().await
}
async fn acl_whitelist(&self, key: crate::entity::PublicKey) -> Result<()> {
self.database.acl_whitelist(key).await
}
async fn acl_deny(&self, key: &crate::entity::PublicKey) -> Result<()> {
self.database.acl_deny(key).await
}
}

async fn merge_acl_white_list<S, B>(
reader: StreamReader<S, B>,
database: &impl MergeStoredAcl,
) -> Result<()>
where
S: Stream<Item = Result<B, std::io::Error>> + Unpin,
B: Buf,
{
let mut lines = reader.lines();
let db_keys = database.get_acl_whitelist().await?;
let mut db_keys: HashSet<crate::entity::PublicKey> = HashSet::from_iter(db_keys.into_iter());

let mut new_keys = vec![];
// Received list shouldn't be empty.
let mut non_empry_list = false;
musitdev marked this conversation as resolved.
Show resolved Hide resolved
// If an error occurs during fetch cancel merge to avoid removing all db keys.
while let Some(line) = lines.next_line().await? {
non_empry_list = true;
musitdev marked this conversation as resolved.
Show resolved Hide resolved
match crate::entity::PublicKey::try_from(line.as_str()) {
Ok(public_key) => {
if !db_keys.contains(&public_key) {
new_keys.push(public_key);
} else {
db_keys.remove(&public_key);
}
}
Err(err) => tracing::error!(
"Error during acl white list sync. Key not a public key:{}",
line
),
}
}

let key_count = new_keys.len();
for public_key in new_keys {
database.acl_whitelist(public_key).await?;
}
let removed_key_count = db_keys.len();

// Do not remove all list if received list is empty.
if non_empry_list {
musitdev marked this conversation as resolved.
Show resolved Hide resolved
for public_key in db_keys {
database.acl_deny(&public_key).await?;
}
}

tracing::info!("{key_count} new keys whitelisted and {removed_key_count} keys removed",);
Ok(())
}

#[cfg(test)]
mod tests {

use super::*;
use tokio::sync::Mutex;

struct TestDb(Mutex<HashSet<String>>);

#[async_trait::async_trait]
impl MergeStoredAcl for TestDb {
async fn get_acl_whitelist(&self) -> Result<Vec<crate::entity::PublicKey>> {
self.0
.lock()
.await
.iter()
.map(|s| crate::entity::PublicKey::try_from(s.as_str()))
.collect::<Result<Vec<_>, _>>()
.map_err(|err| err.into())
}
async fn acl_whitelist(&self, key: crate::entity::PublicKey) -> Result<()> {
self.0.lock().await.insert(key.to_string());
Ok(())
}
async fn acl_deny(&self, key: &crate::entity::PublicKey) -> Result<()> {
let str_key = key.to_string();
self.0.lock().await.remove(&str_key);
Ok(())
}
}

#[tokio::test]
async fn test_acl_merge_multiple() {
let db = TestDb(Mutex::new(HashSet::new()));
{
let mut set = db.0.lock().await;
set.insert("0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".to_string());
set.insert("040f28123df7a638647d867dfe186395999a276048c76c1bd56d4b792eca56449751944d6cd0b208f44b9aec332b4b6d0630406ebb07ccfb17e7c505a07156a792".to_string());
set.insert("04485539698460eb21864f22fdc2ff595980f67d6b43c90b35e022ea40a7465806144bdfe79e571ee196ce52f7d0c88da915733e838420b347f8e6f7920c2c91e7".to_string());
}

let to_merge_acl = vec![
Ok("043a1d3d3bfa8b91b18be20009f58616683695765c90c37a404af7635a3e047de50ee62b5da0c02a1ba54489294974aa6a3733ec82e34c8f6a26c29d6aa000e88d\n".as_bytes()),
Ok("040f28123df7a638647d867dfe186395999a276048c76c1bd56d4b792eca56449751944d6cd0b208f44b9aec332b4b6d0630406ebb07ccfb17e7c505a07156a792\n".as_bytes()),
];
let reader = StreamReader::new(tokio_stream::iter(to_merge_acl));
let res = merge_acl_white_list(reader, &db).await;

assert!(res.is_ok());
{
let set = db.0.lock().await;
assert_eq!(2, set.len());
assert!(set.get("043a1d3d3bfa8b91b18be20009f58616683695765c90c37a404af7635a3e047de50ee62b5da0c02a1ba54489294974aa6a3733ec82e34c8f6a26c29d6aa000e88d").is_some());
assert!(set.get("040f28123df7a638647d867dfe186395999a276048c76c1bd56d4b792eca56449751944d6cd0b208f44b9aec332b4b6d0630406ebb07ccfb17e7c505a07156a792").is_some());
assert!(set.get("0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8").is_none());
assert!(set.get("04485539698460eb21864f22fdc2ff595980f67d6b43c90b35e022ea40a7465806144bdfe79e571ee196ce52f7d0c88da915733e838420b347f8e6f7920c2c91e7").is_none());
}
}

#[tokio::test]
async fn test_acl_merge_from_empty() {
let db = TestDb(Mutex::new(HashSet::new()));

let to_merge_acl = vec![
Ok("043a1d3d3bfa8b91b18be20009f58616683695765c90c37a404af7635a3e047de50ee62b5da0c02a1ba54489294974aa6a3733ec82e34c8f6a26c29d6aa000e88d\n".as_bytes()),
Ok("040f28123df7a638647d867dfe186395999a276048c76c1bd56d4b792eca56449751944d6cd0b208f44b9aec332b4b6d0630406ebb07ccfb17e7c505a07156a792\n".as_bytes()),
];
let reader = StreamReader::new(tokio_stream::iter(to_merge_acl));
let res = merge_acl_white_list(reader, &db).await;

assert!(res.is_ok());
{
let set = db.0.lock().await;
assert_eq!(2, set.len());
assert!(set.get("043a1d3d3bfa8b91b18be20009f58616683695765c90c37a404af7635a3e047de50ee62b5da0c02a1ba54489294974aa6a3733ec82e34c8f6a26c29d6aa000e88d").is_some());
assert!(set.get("040f28123df7a638647d867dfe186395999a276048c76c1bd56d4b792eca56449751944d6cd0b208f44b9aec332b4b6d0630406ebb07ccfb17e7c505a07156a792").is_some());
}
}

#[tokio::test]
async fn test_acl_merge_with_empty() {
let db = TestDb(Mutex::new(HashSet::new()));
{
let mut set = db.0.lock().await;
set.insert("0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".to_string());
}

let to_merge_acl: Vec<Result<&[u8], std::io::Error>> = vec![];
let reader = StreamReader::new(tokio_stream::iter(to_merge_acl));
let res = merge_acl_white_list(reader, &db).await;

assert!(res.is_ok());
{
let set = db.0.lock().await;
assert_eq!(1, set.len());
assert!(set.get("0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8").is_some());
}
}

#[tokio::test]
async fn test_acl_merge_with_error() {
let db = TestDb(Mutex::new(HashSet::new()));
{
let mut set = db.0.lock().await;
set.insert("0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".to_string());
}

let to_merge_acl = vec![
Ok("043a1d3d3bfa8b91b18be20009f58616683695765c90c37a404af7635a3e047de50ee62b5da0c02a1ba54489294974aa6a3733ec82e34c8f6a26c29d6aa000e88d\n".as_bytes()),
Err(std::io::Error::new(std::io::ErrorKind::Other, "a test")),
Ok("040f28123df7a638647d867dfe186395999a276048c76c1bd56d4b792eca56449751944d6cd0b208f44b9aec332b4b6d0630406ebb07ccfb17e7c505a07156a792\n".as_bytes()),
];
let reader = StreamReader::new(tokio_stream::iter(to_merge_acl));
let res = merge_acl_white_list(reader, &db).await;

assert!(res.is_err());
{
let set = db.0.lock().await;
assert_eq!(1, set.len());
assert!(set.get("0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8").is_some());
}
}

#[tokio::test]
async fn test_acl_merge_with_keyparseerror() {
let db = TestDb(Mutex::new(HashSet::new()));
{
let mut set = db.0.lock().await;
set.insert("0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8".to_string());
}

let to_merge_acl = vec![
Ok("043a1d3d3bfa8b91b18be20009f58616683695765c90c37a404af7635a3e047de50ee62b5da0c02a1ba54489294974aa6a3733ec82e34c8f6a26c29d6aa000e88d\n".as_bytes()),
Ok("0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000\n".as_bytes()),
Ok("040f28123df7a638647d867dfe186395999a276048c76c1bd56d4b792eca56449751944d6cd0b208f44b9aec332b4b6d0630406ebb07ccfb17e7c505a07156a792\n".as_bytes()),
];
let reader = StreamReader::new(tokio_stream::iter(to_merge_acl));
let res = merge_acl_white_list(reader, &db).await;

assert!(res.is_ok());
{
let set = db.0.lock().await;
assert_eq!(2, set.len());
assert!(set.get("043a1d3d3bfa8b91b18be20009f58616683695765c90c37a404af7635a3e047de50ee62b5da0c02a1ba54489294974aa6a3733ec82e34c8f6a26c29d6aa000e88d").is_some());
assert!(set.get("040f28123df7a638647d867dfe186395999a276048c76c1bd56d4b792eca56449751944d6cd0b208f44b9aec332b4b6d0630406ebb07ccfb17e7c505a07156a792").is_some());
}
}
}
10 changes: 9 additions & 1 deletion crates/node/src/storage/database/entity/public_key.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::fmt;
use std::hash::Hash;
use std::hash::Hasher;

use serde::{Deserialize, Serialize};
use sqlx::{Decode, Encode, Postgres, Type};
Expand All @@ -11,7 +13,7 @@ pub enum PublicKeyError {
ParseError(String),
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct PublicKey(pub libsecp256k1::PublicKey);

impl Default for PublicKey {
Expand All @@ -22,6 +24,12 @@ impl Default for PublicKey {
}
}

impl Hash for PublicKey {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.serialize().hash(state);
}
}

impl fmt::Display for PublicKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", hex::encode(self.0.serialize()))
Expand Down
10 changes: 9 additions & 1 deletion crates/node/src/storage/database/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,14 @@ impl Database {
Ok(())
}

pub async fn get_acl_whitelist(&self) -> Result<Vec<entity::PublicKey>> {
let dbkey_set: Vec<entity::PublicKey> = sqlx::query("SELECT key FROM acl_whitelist")
.map(|row: sqlx::postgres::PgRow| row.get(0))
.fetch_all(&self.pool)
.await?;
Ok(dbkey_set)
}

pub async fn acl_whitelist_has(&self, key: &entity::PublicKey) -> Result<bool> {
let res: Option<i32> = sqlx::query("SELECT 1 FROM acl_whitelist WHERE key = $1")
.bind(key)
Expand All @@ -545,7 +553,7 @@ impl Database {
Ok(res.is_some())
}

pub async fn acl_whitelist(&self, key: &entity::PublicKey) -> Result<()> {
pub async fn acl_whitelist(&self, key: entity::PublicKey) -> Result<()> {
sqlx::query("INSERT INTO acl_whitelist ( key ) VALUES ( $1 ) ON CONFLICT (key) DO NOTHING")
.bind(key)
.execute(&self.pool)
Expand Down
Loading