add non-http stress test
14s writes on sqlite compared to 4s writes on postgresql

sub-100ms reads on sqlite compared to sub-200ms reads on postgresql

using pool size = 20

50 users each sending 100 appointments

(pool size = 1 on sqlite results in 11s writes and sub-200ms reads)
mariocynicys committed Oct 16, 2023
1 parent 691f002 commit c955cda
Showing 5 changed files with 478 additions and 10 deletions.
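The commit message quotes a pool size of 20, but the diff below never shows where that pool is configured. Assuming the DBM wraps an sqlx connection pool (the sqlite/postgres feature split suggests this), the setting would look roughly like the sketch below; the function name and wiring are illustrative, not part of this commit.

use sqlx::sqlite::SqlitePoolOptions;
use sqlx::{Pool, Sqlite};

// Hypothetical sketch: open a SQLite pool capped at 20 connections, matching the
// "pool size = 20" figure from the commit message.
async fn open_sqlite_pool(db_url: &str) -> Result<Pool<Sqlite>, sqlx::Error> {
    SqlitePoolOptions::new()
        .max_connections(20)
        // e.g. "sqlite://<datadir>/teos_db.sql3?mode=rwc", as built in stress.rs below
        .connect(db_url)
        .await
}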
4 changes: 4 additions & 0 deletions teos/Cargo.toml
@@ -14,6 +14,10 @@ path = "src/cli.rs"
name = "teosd"
path = "src/main.rs"

[[bin]]
name = "stress"
path = "src/stress.rs"

[features]
# By default, enable both SQLite and PostgreSQL in the output binary.
default = ["sqlite", "postgres"]
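With the new [[bin]] stanza, the stress tool builds like any other target in the package, e.g. cargo run --release --bin stress; since it reuses config::Opt, it accepts the same data-dir and bitcoind options as teosd. Building with --no-default-features --features sqlite (or postgres) limits it to a single backend. The pub(crate) → pub changes in the files below exist for the same reason: the stress binary consumes teos as a library crate, so the methods and error types it touches (Watcher::register, Watcher::add_appointment, NotEnoughSlots, TransactionTracker, and so on) must be publicly visible.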
4 changes: 2 additions & 2 deletions teos/src/gatekeeper.rs
@@ -44,13 +44,13 @@ pub(crate) struct AuthenticationFailure<'a>(&'a str);

/// Error raised if the user subscription does not have enough slots to fit a new appointment.
#[derive(Debug, PartialEq)]
pub(crate) struct NotEnoughSlots;
pub struct NotEnoughSlots;

/// Error raised if the user subscription slots limit has been reached.
///
/// This is currently set to [u32::MAX].
#[derive(Debug, PartialEq)]
pub(crate) struct MaxSlotsReached;
pub struct MaxSlotsReached;

/// Component in charge of managing access to the tower resources.
///
2 changes: 1 addition & 1 deletion teos/src/responder.rs
@@ -67,7 +67,7 @@ impl ConfirmationStatus {
///
/// It is analogous to [ExtendedAppointment](crate::extended_appointment::ExtendedAppointment) for the [`Watcher`](crate::watcher::Watcher).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TransactionTracker {
pub struct TransactionTracker {
/// Matches the corresponding [Breach] `dispute_tx` field.
pub dispute_tx: Transaction,
/// Matches the corresponding [Breach] penalty_tx field.
464 changes: 464 additions & 0 deletions teos/src/stress.rs
@@ -0,0 +1,464 @@
use std::collections::HashMap;
use std::fs;
use std::io::ErrorKind;
use std::ops::{Deref, DerefMut};
use std::str::FromStr;
use std::sync::{Arc, Condvar, Mutex};

use teos::async_listener::AsyncBlockListener;
use teos::bitcoin_cli::BitcoindClient;
use teos::carrier::Carrier;
use teos::chain_monitor::ChainMonitor;
use teos::config::{self, Config, Opt};
use teos::dbm::DBM;
use teos::gatekeeper::Gatekeeper;
use teos::responder::Responder;
use teos::watcher::Watcher;

use teos_common::constants::IRREVOCABLY_RESOLVED;
use teos_common::cryptography::{get_random_keypair, sign};
use teos_common::test_utils::*;
use teos_common::TowerId;
use teos_common::UserId;

use log::LevelFilter;
use simple_logger::SimpleLogger;
use structopt::StructOpt;

use bitcoin::network::constants::Network;
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};

use bitcoincore_rpc::{Auth, Client, RpcApi};

use lightning_block_sync::init::validate_best_block_header;
use lightning_block_sync::poll::{
ChainPoller, Poll, Validate, ValidatedBlock, ValidatedBlockHeader,
};
use lightning_block_sync::{BlockSource, BlockSourceError, SpvClient, UnboundedCache};

async fn get_last_n_blocks<B, T>(
poller: &mut ChainPoller<B, T>,
mut last_known_block: ValidatedBlockHeader,
n: usize,
) -> Result<Vec<ValidatedBlock>, BlockSourceError>
where
B: DerefMut<Target = T> + Sized + Send + Sync,
T: BlockSource,
{
let mut last_n_blocks = Vec::with_capacity(n);
for _ in 0..n {
log::debug!("Fetching block #{}", last_known_block.height);
let block = poller.fetch_block(&last_known_block).await?;
last_known_block = poller.look_up_previous_header(&last_known_block).await?;
last_n_blocks.push(block);
}

Ok(last_n_blocks)
}
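// Note: blocks are collected newest-first, i.e. last_n_blocks[0] is the current tip,
// which is what `Watcher::new(&last_n_blocks[0..6], ...)` in `main` below relies on.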

async fn create_new_tower_keypair(dbm: &DBM) -> (SecretKey, PublicKey) {
let sk =
SecretKey::from_str("646133513d0d57bb01269ff517b46fb4c65137741c03e535d80a606116155dd0")
.unwrap();
let pk = PublicKey::from_secret_key(&Secp256k1::new(), &sk);
dbm.store_tower_key(&sk).await.unwrap();
(sk, pk)
}
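// Note: the tower secret key above is a fixed hex constant rather than a random key,
// so repeated runs against the same database reuse one tower identity. That is fine
// for a throwaway stress setup, but not something to copy into a real tower.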

async fn stress(watcher: Arc<Watcher>) {
let (n_users, n_apps) = (10, 100);

// Send appointments.
let start = tokio::time::Instant::now();
let mut tasks = Vec::new();
let appointments_sent = Arc::new(Mutex::new(HashMap::new()));
let appointment_send_times = Arc::new(Mutex::new(Vec::new()));
for _ in 0..n_users {
let (sk, pk) = get_random_keypair();
let user_id = UserId(pk);

// Shared data.
let appointments_sent = appointments_sent.clone();
let appointment_send_times = appointment_send_times.clone();
let watcher = watcher.clone();

tasks.push(tokio::task::spawn(async move {
let mut times = Vec::new();
let mut locators = Vec::new();
for j in 0..n_apps {
// Re-register every once in a while.
if j % 300 == 0 {
watcher.register(user_id).await.unwrap();
}
// Generate and send an appointment.
let appointment = generate_random_appointment(None);
let signature = sign(&appointment.to_vec(), &sk).unwrap();
let start = tokio::time::Instant::now();
locators.push(appointment.locator);
watcher
.add_appointment(appointment, signature)
.await
.unwrap();
// Store the time it took us to successfully send the appointment and the appointment locator.
times.push(tokio::time::Instant::now() - start);
}
appointments_sent.lock().unwrap().insert(sk, locators);
appointment_send_times.lock().unwrap().extend(times);
}));
}

// Wait for all the tasks to finish.
while !tasks.iter().all(|t| t.is_finished()) {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}

// Log the results.
let appointment_send_times = Arc::try_unwrap(appointment_send_times)
.unwrap()
.into_inner()
.unwrap();
let appointments_sent = Arc::try_unwrap(appointments_sent)
.unwrap()
.into_inner()
.unwrap();
println!(
"Took {:?} to send {} appointments.",
tokio::time::Instant::now() - start,
n_users * n_apps
);
println!(
"Min = {:?}, Max = {:?}, Avg = {:?}",
appointment_send_times.iter().min().unwrap(),
appointment_send_times.iter().max().unwrap(),
appointment_send_times.iter().sum::<tokio::time::Duration>()
/ appointment_send_times.len() as u32
);

// Now retrieve all the appointments we just sent.
let start = tokio::time::Instant::now();
let mut tasks = Vec::new();
let appointment_send_times = Arc::new(Mutex::new(Vec::new()));
for (sk, locators) in appointments_sent {
// Shared data.
let appointment_send_times = appointment_send_times.clone();
let watcher = watcher.clone();

tasks.push(tokio::task::spawn(async move {
let mut times = Vec::new();
for locator in locators {
let signature = sign(format!("get appointment {locator}").as_bytes(), &sk).unwrap();
let start = tokio::time::Instant::now();
watcher.get_appointment(locator, &signature).await.unwrap();
times.push(tokio::time::Instant::now() - start);
}
appointment_send_times.lock().unwrap().extend(times);
}));
}

// Wait for all the tasks to finish.
while !tasks.iter().all(|t| t.is_finished()) {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}

// Log the results.
let appointment_send_times = Arc::try_unwrap(appointment_send_times)
.unwrap()
.into_inner()
.unwrap();
println!(
"Took {:?} to retrieve {} appointments.",
tokio::time::Instant::now() - start,
n_users * n_apps
);
println!(
"Min = {:?}, Max = {:?}, Avg = {:?}",
appointment_send_times.iter().min().unwrap(),
appointment_send_times.iter().max().unwrap(),
appointment_send_times.iter().sum::<tokio::time::Duration>()
/ appointment_send_times.len() as u32
);
}
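// Both loops above call the Watcher directly, so the reported timings cover appointment
// handling and database latency only; no HTTP/API layer is involved (hence the
// "non-http" label in the commit title).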

#[tokio::main]
async fn main() {
let opt = Opt::from_args();
let path = config::data_dir_absolute_path(opt.data_dir.clone());
let conf_file_path = path.join("teos.toml");
// Create data dir if it does not exist
fs::create_dir_all(&path).unwrap_or_else(|e| {
eprintln!("Cannot create data dir: {e:?}");
std::process::exit(1);
});

// Load conf (from file or defaults) and patch it with the command line parameters received (if any)
let mut conf = config::from_file::<Config>(&conf_file_path);
let is_default = conf.is_default();
conf.patch_with_options(opt);
conf.verify().unwrap_or_else(|e| {
eprintln!("{e}");
std::process::exit(1);
});

// Set log level
SimpleLogger::new()
.with_level(if conf.deps_debug {
LevelFilter::Debug
} else {
LevelFilter::Warn
})
.with_module_level(
"teos",
if conf.debug {
LevelFilter::Debug
} else {
LevelFilter::Info
},
)
.init()
.unwrap();

// Create network dir
let path_network = path.join(conf.btc_network.clone());
fs::create_dir_all(&path_network).unwrap_or_else(|e| {
eprintln!("Cannot create network dir: {e:?}");
std::process::exit(1);
});

// Log default data dir
log::info!("Default data directory: {:?}", &path);

// Log datadir path
log::info!("Using data directory: {:?}", &path_network);

// Log config file path based on whether the config file is found or not
if is_default {
log::info!("Config file: {:?} (not found, skipping)", &conf_file_path);
} else {
log::info!("Config file: {:?}", &conf_file_path);
conf.log_non_default_options();
}

let dbm = if conf.database_url == "managed" {
DBM::new(&format!(
"sqlite://{}",
path_network
// rwc = Read + Write + Create (creates the database file if not found)
.join("teos_db.sql3?mode=rwc")
.to_str()
.expect("Path to the sqlite DB contains non-UTF-8 characters.")
))
.await
.unwrap()
} else {
DBM::new(&conf.database_url).await.unwrap()
};
let dbm = Arc::new(dbm);

// Load tower secret key or create a fresh one if none is found. If overwrite key is set, create a new
// key straightaway
let (tower_sk, tower_pk) = {
if conf.overwrite_key {
log::info!("Overwriting tower keys");
create_new_tower_keypair(&dbm).await
} else if let Some(sk) = dbm.load_tower_key().await {
(sk, PublicKey::from_secret_key(&Secp256k1::new(), &sk))
} else {
log::info!("Tower keys not found. Creating a fresh set");
create_new_tower_keypair(&dbm).await
}
};
log::info!("tower_id: {tower_pk}");

// Initialize our bitcoind client
let (bitcoin_cli, bitcoind_reachable) = match BitcoindClient::new(
&conf.btc_rpc_connect,
conf.btc_rpc_port,
&conf.btc_rpc_user,
&conf.btc_rpc_password,
&conf.btc_network,
)
.await
{
Ok(client) => (
Arc::new(client),
Arc::new((Mutex::new(true), Condvar::new())),
),
Err(e) => {
let e_msg = match e.kind() {
ErrorKind::InvalidData => "invalid btcrpcuser or btcrpcpassword".into(),
_ => e.to_string(),
};
log::error!("Failed to connect to bitcoind. Error: {e_msg}");
std::process::exit(1);
}
};

// FIXME: Temporary. We're using bitcoin_core_rpc and rust-lightning's rpc until they both get merged
// https://github.com/rust-bitcoin/rust-bitcoincore-rpc/issues/166
let schema = if !conf.btc_rpc_connect.starts_with("http") {
"http://"
} else {
""
};
let rpc = Arc::new(
Client::new(
&format!("{schema}{}:{}", conf.btc_rpc_connect, conf.btc_rpc_port),
Auth::UserPass(conf.btc_rpc_user.clone(), conf.btc_rpc_password.clone()),
)
.unwrap(),
);
let mut derefed = bitcoin_cli.deref();
// Load last known block from DB if found. Poll it from Bitcoind otherwise.
let last_known_block = dbm.load_last_known_block().await;
let tip = if let Some(block_hash) = last_known_block {
let mut last_known_header = derefed
.get_header(&block_hash, None)
.await
.unwrap()
.validate(block_hash)
.unwrap();

log::info!(
"Last known block: {} (height: {})",
last_known_header.header.block_hash(),
last_known_header.height
);

// If we are running in pruned mode some data may be missing (if we happen to have been offline for a while)
if let Some(prune_height) = rpc.get_blockchain_info().unwrap().prune_height {
if last_known_header.height - IRREVOCABLY_RESOLVED + 1 < prune_height as u32 {
log::warn!(
"Cannot load blocks in the range {}-{}. Chain has gone too far out of sync",
last_known_header.height - IRREVOCABLY_RESOLVED + 1,
last_known_header.height
);
if conf.force_update {
log::info!("Forcing a backend update");
// We want to grab the first IRREVOCABLY_RESOLVED we know about for the initial cache
// So we can perform transitions from there onwards.
let target_height = prune_height + IRREVOCABLY_RESOLVED as u64;
let target_hash = rpc.get_block_hash(target_height).unwrap();
last_known_header = derefed
.get_header(
&rpc.get_block_hash(target_height).unwrap(),
Some(target_height as u32),
)
.await
.unwrap()
.validate(target_hash)
.unwrap();
} else {
log::error!(
"The underlying chain has gone too far out of sync. The tower block cache cannot be initialized. Run with --forceupdate to force update. THIS WILL, POTENTIALLY, MAKE THE TOWER MISS SOME OF ITS APPOINTMENTS"
);
std::process::exit(1);
}
}
}
last_known_header
} else {
validate_best_block_header(&derefed).await.unwrap()
};

// DISCUSS: This is not really required (and only triggered in regtest). This is only in place so the caches can be
// populated with enough blocks mainly because the size of the cache is based on the amount of blocks passed when initializing.
// However, we could add an additional parameter to specify the size of the cache, and initialize with however many blocks we
// could pull from the backend. Adding this functionality just for regtest seemed unnecessary though, hence the check.
if tip.height < IRREVOCABLY_RESOLVED {
log::error!(
"Not enough blocks to start teosd (required: {IRREVOCABLY_RESOLVED}). Mine at least {} more",
IRREVOCABLY_RESOLVED - tip.height
);
std::process::exit(1);
}

log::info!(
"Current chain tip: {} (height: {})",
tip.header.block_hash(),
tip.height
);

// This is how chain poller names bitcoin networks.
let btc_network = match conf.btc_network.as_str() {
"main" => "bitcoin",
"test" => "testnet",
any => any,
};

// Build components
let gatekeeper = Arc::new(
Gatekeeper::new(
tip.height,
conf.subscription_slots,
conf.subscription_duration,
conf.expiry_delta,
dbm.clone(),
)
.await,
);

let mut poller = ChainPoller::new(&mut derefed, Network::from_str(btc_network).unwrap());
let (responder, watcher) = {
let last_n_blocks = get_last_n_blocks(&mut poller, tip, IRREVOCABLY_RESOLVED as usize)
.await.unwrap_or_else(|e| {
// I'm pretty sure this can only happen if we are pulling blocks from the target to the prune height, and by the time we get to
// the end at least one has been pruned.
log::error!("Couldn't load the latest {IRREVOCABLY_RESOLVED} blocks. Please try again (Error: {})", e.into_inner());
std::process::exit(1);
}
);

let responder = Arc::new(Responder::new(
&last_n_blocks,
tip.height,
Carrier::new(rpc, bitcoind_reachable.clone(), tip.height),
gatekeeper.clone(),
dbm.clone(),
));
let watcher = Arc::new(Watcher::new(
gatekeeper.clone(),
responder.clone(),
&last_n_blocks[0..6],
tip.height,
tower_sk,
TowerId(tower_pk),
dbm.clone(),
));
(responder, watcher)
};

if watcher.is_fresh().await & responder.is_fresh().await & gatekeeper.is_fresh().await {
log::info!("Fresh bootstrap");
} else {
log::info!("Bootstrapping from backed up data");
}

let (_trigger, shutdown_signal_cm) = triggered::trigger();

// The ordering here actually matters. Listeners are called in order, and we want the gatekeeper to be called
// first so it updates the users' states and both the Watcher and the Responder operate only on registered users.
let listeners = (gatekeeper, (watcher.clone(), responder));

// This spawns a separate async actor in the background that will be fed new blocks from a sync block listener.
// In this way we can have our components listen to blocks in an async manner from the async actor.
let listener = AsyncBlockListener::wrap_listener(listeners, dbm);

let cache = &mut UnboundedCache::new();
let spv_client = SpvClient::new(tip, poller, cache, &listener);
let mut chain_monitor = ChainMonitor::new(
spv_client,
tip,
conf.polling_delta,
shutdown_signal_cm,
bitcoind_reachable.clone(),
)
.await;

// Get all the components up to date if there's a backlog of blocks
chain_monitor.poll_best_tip().await;
log::info!("Bootstrap completed. Starting the stress test in a different thread.");

tokio::spawn(stress(watcher));

chain_monitor.monitor_chain().await;
}
14 changes: 7 additions & 7 deletions teos/src/watcher.rs
@@ -26,7 +26,7 @@ use crate::tx_index::TxIndex;
/// using the resulting dispute transaction id to decipher the encrypted blob of an ongoing [Appointment].
/// Breaches are passed to the [Responder] once created.
#[derive(Debug, Clone)]
pub(crate) struct Breach {
pub struct Breach {
/// Transaction that triggered the breach.
pub dispute_tx: Transaction,
/// Transaction that will be used as a response to the breach.
@@ -46,7 +46,7 @@ impl Breach {
/// Packs the reasons why trying to add an appointment may fail.
// TODO: It may be nice to create richer errors so the API can return richer rejection
#[derive(Debug)]
pub(crate) enum AddAppointmentFailure {
pub enum AddAppointmentFailure {
AuthenticationFailure,
NotEnoughSlots,
SubscriptionExpired(u32),
@@ -55,7 +55,7 @@ pub(crate) enum AddAppointmentFailure {

/// Packs the reasons why trying to query an appointment may fail.
#[derive(Debug)]
pub(crate) enum GetAppointmentFailure {
pub enum GetAppointmentFailure {
AuthenticationFailure,
SubscriptionExpired(u32),
NotFound,
@@ -73,7 +73,7 @@ pub(crate) enum GetSubscriptionInfoFailure {
/// Either an [Appointment] or a [TransactionTracker] can be
/// returned depending on whether the appointment can be found in the [Watcher] or in the [Responder].
#[derive(Debug)]
pub(crate) enum AppointmentInfo {
pub enum AppointmentInfo {
Appointment(Appointment),
Tracker(TransactionTracker),
}
@@ -141,7 +141,7 @@ impl Watcher {

/// Registers a new user within the [Watcher]. This request is passed to the [Gatekeeper], who is in
/// charge of managing users.
pub(crate) async fn register(
pub async fn register(
&self,
user_id: UserId,
) -> Result<RegistrationReceipt, MaxSlotsReached> {
@@ -162,7 +162,7 @@ impl Watcher {
/// If an appointment is accepted, an [ExtendedAppointment] (constructed from the [Appointment]) will be persisted on disk.
/// In case the locator for the given appointment can be found in the cache (meaning the appointment has been
/// triggered recently) the data will be passed to the [Responder] straightaway (modulo it being valid).
pub(crate) async fn add_appointment(
pub async fn add_appointment(
&self,
appointment: Appointment,
user_signature: String,
@@ -322,7 +322,7 @@ impl Watcher {
/// - The user subscription has not expired
/// - The appointment belongs to the user
/// - The appointment exists within the system (either in the [Watcher] or the [Responder])
pub(crate) async fn get_appointment(
pub async fn get_appointment(
&self,
locator: Locator,
user_signature: &str,
