Skip to content

Commit

Permalink
Merge pull request #264 from superfly/debug-missing-sub
Browse files Browse the repository at this point in the history
Prioritize recent versions during sync
  • Loading branch information
somtochiama authored Nov 7, 2024
2 parents 6fb772b + 9977246 commit 99fb53f
Show file tree
Hide file tree
Showing 11 changed files with 612 additions and 288 deletions.
110 changes: 97 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ strum = { version = "0.24.1", features = ["derive"] }
tempfile = "3.5.0"
thiserror = "1.0.40"
time = { version = "0.3.15", features = ["macros", "serde-well-known"] }
tokio = { version = "1.34", features = ["full"] }
tokio = { version = "1.41", features = ["full"] }
tokio-metrics = "0.3.0"
tokio-serde = { version = "0.8", features = ["json"] }
tokio-stream = { version = "0.1.12", features = ["sync"] }
Expand Down
62 changes: 25 additions & 37 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::{
time::{Duration, Instant},
};

use crate::agent::util::log_at_pow_10;
use crate::{
agent::{bi, bootstrap, uni, util, SyncClientError, ANNOUNCE_INTERVAL},
api::peer::parallel_sync,
Expand Down Expand Up @@ -118,7 +119,12 @@ pub fn spawn_incoming_connection_handlers(

// Spawn handler tasks for this connection
spawn_foca_handler(&agent, &tripwire, &conn);
uni::spawn_unipayload_handler(&tripwire, &conn, agent.clone());
uni::spawn_unipayload_handler(
&tripwire,
&conn,
agent.cluster_id(),
agent.tx_changes().clone(),
);
bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn);
});
}
Expand Down Expand Up @@ -527,12 +533,10 @@ pub async fn handle_emptyset(
mut rx_emptysets: CorroReceiver<ChangeV1>,
mut tripwire: Tripwire,
) {
let mut buf: HashMap<ActorId, VecDeque<(Vec<RangeInclusive<Version>>, Timestamp)>> =
HashMap::new();
type EmptyQueue = VecDeque<(Vec<RangeInclusive<Version>>, Timestamp)>;
let mut buf: HashMap<ActorId, EmptyQueue> = HashMap::new();

let mut join_set: JoinSet<
HashMap<ActorId, VecDeque<(Vec<RangeInclusive<Version>>, Timestamp)>>,
> = JoinSet::new();
let mut join_set: JoinSet<HashMap<ActorId, EmptyQueue>> = JoinSet::new();

loop {
tokio::select! {
Expand All @@ -551,7 +555,7 @@ pub async fn handle_emptyset(
maybe_change_src = rx_emptysets.recv() => match maybe_change_src {
Some(change) => {
if let Changeset::EmptySet { versions, ts } = change.changeset {
buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts));
buf.entry(change.actor_id).or_default().push_back((versions.clone(), ts));
} else {
warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id);
}
Expand Down Expand Up @@ -618,9 +622,9 @@ pub async fn process_emptyset(
) -> Result<(), ChangeError> {
let (versions, ts) = changes;

let mut version_iter = versions.chunks(100);
let version_iter = versions.chunks(100);

while let Some(chunk) = version_iter.next() {
for chunk in version_iter {
let mut conn = agent.pool().write_low().await?;
debug!("processing emptyset from {:?}", actor_id);
let booked = {
Expand Down Expand Up @@ -863,26 +867,6 @@ pub async fn handle_changes(
continue;
}

// drop items when the queue is full.
if queue.len() > max_queue_len {
drop_log_count += 1;
if is_pow_10(drop_log_count) {
if drop_log_count == 1 {
warn!("dropping a change because changes queue is full");
} else {
warn!(
"dropping {} changes because changes queue is full",
drop_log_count
);
}
}
// reset count
if drop_log_count == 100000000 {
drop_log_count = 0;
}
continue;
}

if let Some(mut seqs) = change.seqs().cloned() {
let v = *change.versions().start();
if let Some(seen_seqs) = seen.get(&(change.actor_id, v)) {
Expand Down Expand Up @@ -933,6 +917,18 @@ pub async fn handle_changes(
}
}

// drop old items when the queue is full.
if queue.len() > max_queue_len {
let change = queue.pop_back();
if let Some(change) = change {
for v in change.0.versions() {
let _ = seen.remove(&(change.0.actor_id, v));
}
}

log_at_pow_10("dropped old change from queue", &mut drop_log_count);
}

if let Some(recv_lag) = recv_lag {
let src_str: &'static str = src.into();
histogram!("corro.agent.changes.recv.lag.seconds", "source" => src_str)
Expand Down Expand Up @@ -1164,11 +1160,3 @@ mod tests {
Ok(())
}
}

#[inline]
fn is_pow_10(i: u64) -> bool {
matches!(
i,
1 | 10 | 100 | 1000 | 10000 | 1000000 | 10000000 | 100000000
)
}
1 change: 1 addition & 0 deletions crates/corro-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub use error::{SyncClientError, SyncRecvError};
pub use run_root::start_with_config;
pub use setup::{setup, AgentOptions};
pub use util::process_multiple_changes;
pub use uni::spawn_unipayload_handler;

pub const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(300);
#[cfg(test)]
Expand Down
Loading

0 comments on commit 99fb53f

Please sign in to comment.