Skip to content

Commit

Permalink
server: fix stats sending performance and ns creation (#1440)
Browse files Browse the repository at this point in the history
* server: create debug logging

* server: fix stats sending performance

This change fixes how we send stats to the pulse server. This is done by
not serially sending stats on the same task that we extract events from.
The reason for this is that if the event buffer (stats_sender) gets full
it will block namespace create requests.

With this change, we now submit stats http requests off the main stats
task and restrict the stats sending to 128 concurrent requests. This
will allow us to accept more create namespace requests and efficiently
send stats.
  • Loading branch information
LucioFranco authored Jun 4, 2024
1 parent 97beabd commit 2b9ad5c
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 53 deletions.
5 changes: 4 additions & 1 deletion bottomless/src/bottomless_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ impl BottomlessWalWrapper {
fn try_with_replicator<Ret>(&self, f: impl FnOnce(&mut Replicator) -> Ret) -> Result<Ret> {
let mut lock = self.replicator.lock().unwrap();
match &mut *lock {
Some(replicator) => Ok(f(replicator)),
Some(replicator) => {
let _span = tracing::info_span!("replicator", db_name = replicator.db_name);
Ok(f(replicator))
}
None => Err(Error::new(SQLITE_IOERR_WRITE)),
}
}
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ where
}
}

#[tracing::instrument(skip(self))]
async fn make_connection(&self) -> Result<LibSqlConnection<W>> {
LibSqlConnection::new(
self.db_path.clone(),
Expand Down
95 changes: 54 additions & 41 deletions libsql-server/src/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,91 +1,104 @@
#![allow(clippy::mutable_key_type)]

use std::collections::HashMap;
use std::sync::Weak;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::time::Interval;
use url::Url;

use tokio::sync::mpsc;
use tokio::sync::{mpsc, Semaphore};

use crate::http::admin::stats::StatsResponse;
use crate::namespace::{NamespaceName, NamespaceStore};
use crate::namespace::meta_store::MetaStoreHandle;
use crate::namespace::NamespaceName;
use crate::stats::Stats;

pub async fn server_heartbeat(
url: Option<Url>,
auth: Option<String>,
update_period: Duration,
mut stats_subs: mpsc::Receiver<(NamespaceName, Weak<Stats>)>,
namespaces: NamespaceStore,
mut stats_subs: mpsc::Receiver<(NamespaceName, MetaStoreHandle, Weak<Stats>)>,
) {
let mut watched = HashMap::new();
let client = reqwest::Client::new();
let mut interval = tokio::time::interval(update_period);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let semaphore = Arc::new(Semaphore::new(128));

loop {
let wait_for_next_tick = next_tick(&mut interval, &semaphore, 128);

tokio::select! {
Some((ns, stats)) = stats_subs.recv() => {
watched.insert(ns, stats);
Some((ns, handle, stats)) = stats_subs.recv() => {
watched.insert(ns, (handle, stats));
}
_ = interval.tick() => {
send_stats(&mut watched, &client, &namespaces, url.as_ref(), auth.as_deref()).await;
_ = wait_for_next_tick => {
send_stats(&mut watched, &client, url.as_ref(), auth.as_deref(), &semaphore).await;
}
};
}
}

/// Wait for all the permits to be available again, this should work as long as its called after
/// the last `send_stats` is called since the sempaphore waits in a queue.
async fn next_tick(interval: &mut Interval, semaphore: &Arc<Semaphore>, permits: u32) {
let permit = semaphore.acquire_many(permits).await;
drop(permit);

interval.tick().await;
}

async fn send_stats(
watched: &mut HashMap<NamespaceName, Weak<Stats>>,
watched: &mut HashMap<NamespaceName, (MetaStoreHandle, Weak<Stats>)>,
client: &reqwest::Client,
namespaces: &NamespaceStore,
url: Option<&Url>,
auth: Option<&str>,
semaphore: &Arc<Semaphore>,
) {
// first send all the stats...
for (ns, stats) in watched.iter() {
for (ns, (config_store, stats)) in watched.iter() {
if let Some(stats) = stats.upgrade() {
let body = StatsResponse::from(stats.as_ref());
let mut heartbeat_url;
if let Some(url) = url {
heartbeat_url = url.clone();

let mut heartbeat_url = if let Some(url) = url {
url.clone()
} else {
match namespaces.config_store(ns.clone()).await {
Ok(config_store) => {
let config = config_store.get();
if let Some(url) = config.heartbeat_url.as_ref() {
heartbeat_url = url.clone();
} else {
tracing::debug!(
"No heartbeat url for namespace {}. Can't send stats!",
ns.as_str()
);
continue;
}
}
Err(e) => {
tracing::warn!(
"Error fetching config for namespace {}. Can't send stats: {}",
ns.as_str(),
e
);
continue;
}
let config = config_store.get();
if let Some(url) = config.heartbeat_url.as_ref() {
url.clone()
} else {
tracing::debug!(
"No heartbeat url for namespace {}. Can't send stats!",
ns.as_str()
);
continue;
}
}
};

heartbeat_url.path_segments_mut().unwrap().push(ns.as_str());

let request = client.post(heartbeat_url);

let request = if let Some(ref auth) = auth {
request.header("Authorization", auth.to_string())
} else {
request
};

let request = request.json(&body);
if let Err(err) = request.send().await {
tracing::warn!("Error sending heartbeat: {}", err);
}

let semaphore = semaphore.clone();
tokio::spawn(async move {
let _permit = semaphore.acquire().await;

if let Err(err) = request.send().await {
tracing::warn!("Error sending heartbeat: {}", err);
}
});
}
}

// ..and then remove all expired subscription
watched.retain(|_, s| s.upgrade().is_some());
watched.retain(|_, (_, s)| s.upgrade().is_some());
}
11 changes: 5 additions & 6 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use libsql_sys::wal::either::Either;
use libsql_sys::wal::Sqlite3WalManager;
use libsql_wal::registry::WalRegistry;
use libsql_wal::wal::LibsqlWalManager;
use namespace::meta_store::MetaStoreHandle;
use namespace::{NamespaceConfig, NamespaceName};
use net::Connector;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -94,7 +95,7 @@ pub(crate) static BLOCKING_RT: Lazy<Runtime> = Lazy::new(|| {
});

type Result<T, E = Error> = std::result::Result<T, E>;
type StatsSender = mpsc::Sender<(NamespaceName, Weak<Stats>)>;
type StatsSender = mpsc::Sender<(NamespaceName, MetaStoreHandle, Weak<Stats>)>;

pub struct Server<C = HttpConnector, A = AddrIncoming, D = HttpsConnector<HttpConnector>> {
pub path: Arc<Path>,
Expand Down Expand Up @@ -299,8 +300,7 @@ where
fn spawn_monitoring_tasks(
&self,
join_set: &mut JoinSet<anyhow::Result<()>>,
stats_receiver: mpsc::Receiver<(NamespaceName, Weak<Stats>)>,
namespaces: NamespaceStore,
stats_receiver: mpsc::Receiver<(NamespaceName, MetaStoreHandle, Weak<Stats>)>,
) -> anyhow::Result<()> {
match self.heartbeat_config {
Some(ref config) => {
Expand All @@ -323,7 +323,6 @@ where
heartbeat_auth,
heartbeat_period,
stats_receiver,
namespaces,
)
.await;
Ok(())
Expand Down Expand Up @@ -422,7 +421,7 @@ where

let (scheduler_sender, scheduler_receiver) = mpsc::channel(128);

let (stats_sender, stats_receiver) = mpsc::channel(8);
let (stats_sender, stats_receiver) = mpsc::channel(1024);

// chose the wal backend
let (make_wal_manager, registry_shutdown) = self.configure_wal_manager()?;
Expand Down Expand Up @@ -476,7 +475,7 @@ where
Ok(())
});

self.spawn_monitoring_tasks(&mut join_set, stats_receiver, namespace_store.clone())?;
self.spawn_monitoring_tasks(&mut join_set, stats_receiver)?;

// eagerly load the default namespace when namespaces are disabled
if self.disable_namespaces && db_kind.is_primary() {
Expand Down
35 changes: 30 additions & 5 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ impl Namespace {
}
}

#[tracing::instrument(skip_all)]
async fn make_primary_connection_maker(
ns_config: &NamespaceConfig,
meta_store_handle: &MetaStoreHandle,
Expand Down Expand Up @@ -315,12 +316,14 @@ impl Namespace {
let (replicator, did_recover) =
init_bottomless_replicator(db_path.join("data"), options, &restore_option)
.await?;
tracing::debug!("Completed init of bottomless replicator");
is_dirty |= did_recover;
Some(replicator)
}
None => None,
};

tracing::debug!("Checking fresh db");
let is_fresh_db = check_fresh_db(&db_path)?;
// switch frame-count checkpoint to time-based one
let auto_checkpoint = if ns_config.checkpoint_interval.is_some() {
Expand All @@ -340,17 +343,24 @@ impl Namespace {
ns_config.encryption_config.clone(),
)?);

tracing::debug!("sending stats");

let stats = make_stats(
&db_path,
join_set,
meta_store_handle.clone(),
ns_config.stats_sender.clone(),
name.clone(),
logger.new_frame_notifier.subscribe(),
ns_config.encryption_config.clone(),
)
.await?;

tracing::debug!("Making replication wal wrapper");
let wal_wrapper = make_replication_wal_wrapper(bottomless_replicator, logger.clone());

tracing::debug!("Opening libsql connection");

let connection_maker = MakeLibSqlConn::new(
db_path.to_path_buf(),
wal_wrapper.clone(),
Expand All @@ -374,6 +384,8 @@ impl Namespace {
ns_config.max_concurrent_requests,
);

tracing::debug!("Completed opening libsql connection");

// this must happen after we create the connection maker. The connection maker old on a
// connection to ensure that no other connection is closing while we try to open the dump.
// that would cause a SQLITE_LOCKED error.
Expand All @@ -382,25 +394,30 @@ impl Namespace {
Err(LoadDumpError::LoadDumpExistingDb)?;
}
RestoreOption::Dump(dump) => {
tracing::debug!("Loading dump");
load_dump(
&db_path,
dump,
wal_wrapper.clone().map_wal(),
ns_config.encryption_config.clone(),
)
.await?;
tracing::debug!("Done loading dump");
}
_ => { /* other cases were already handled when creating bottomless */ }
}

join_set.spawn(run_periodic_compactions(logger.clone()));

tracing::debug!("Done making primary connection");

Ok((connection_maker, wal_wrapper, stats))
}

#[tracing::instrument(skip_all, fields(namespace))]
async fn try_new_primary(
ns_config: &NamespaceConfig,
name: NamespaceName,
namespace: NamespaceName,
meta_store_handle: MetaStoreHandle,
restore_option: RestoreOption,
resolve_attach_path: ResolveNamespacePathFn,
Expand All @@ -415,7 +432,7 @@ impl Namespace {
ns_config,
&meta_store_handle,
&db_path,
&name,
&namespace,
restore_option,
block_writes.clone(),
&mut join_set,
Expand Down Expand Up @@ -444,18 +461,20 @@ impl Namespace {
join_set.spawn(run_periodic_checkpoint(
connection_maker.clone(),
checkpoint_interval,
name.clone(),
namespace.clone(),
));
}

tracing::debug!("Done making new primary");

Ok(Self {
tasks: join_set,
db: Database::Primary(PrimaryDatabase {
wal_wrapper,
connection_maker,
block_writes,
}),
name,
name: namespace,
stats,
db_config_store: meta_store_handle,
path: db_path.into(),
Expand Down Expand Up @@ -578,6 +597,7 @@ impl Namespace {
let stats = make_stats(
&db_path,
&mut join_set,
meta_store_handle.clone(),
config.stats_sender.clone(),
name.clone(),
applied_frame_no_receiver.clone(),
Expand Down Expand Up @@ -773,16 +793,19 @@ fn make_bottomless_options(
async fn make_stats(
db_path: &Path,
join_set: &mut JoinSet<anyhow::Result<()>>,
meta_store_handle: MetaStoreHandle,
stats_sender: StatsSender,
name: NamespaceName,
mut current_frame_no: watch::Receiver<Option<FrameNo>>,
encryption_config: Option<EncryptionConfig>,
) -> anyhow::Result<Arc<Stats>> {
tracing::debug!("creating stats type");
let stats = Stats::new(name.clone(), db_path, join_set).await?;

// the storage monitor is optional, so we ignore the error here.
tracing::debug!("stats created, sending stats");
let _ = stats_sender
.send((name.clone(), Arc::downgrade(&stats)))
.send((name.clone(), meta_store_handle, Arc::downgrade(&stats)))
.await;

join_set.spawn({
Expand All @@ -807,6 +830,8 @@ async fn make_stats(
encryption_config,
));

tracing::debug!("done sending stats, and creating bg tasks");

Ok(stats)
}

Expand Down
Loading

0 comments on commit 2b9ad5c

Please sign in to comment.