Ignore old worker query logs
Wiezzel committed Aug 27, 2024
1 parent 053c823 commit 69c00a4
Showing 8 changed files with 73 additions and 18 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion contract-client/Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "contract-client"
license = "AGPL-3.0-or-later"
version = "1.0.4"
version = "1.0.5"
edition = "2021"

[dependencies]
17 changes: 17 additions & 0 deletions contract-client/src/client.rs
@@ -77,6 +77,10 @@ impl NetworkNodes {
pub type NodeStream =
Pin<Box<dyn Stream<Item = Result<NetworkNodes, ClientError>> + Send + 'static>>;

+/// A stream of (epoch_number, epoch_start_time)
+pub type EpochStream =
+    Pin<Box<dyn Stream<Item = Result<(u32, SystemTime), ClientError>> + Send + 'static>>;

#[async_trait]
pub trait Client: Send + Sync + 'static {
/// Using regular clone is not possible for trait objects
@@ -129,6 +133,19 @@ pub trait Client: Send + Sync + 'static {
}
}))
}

+    /// Get a stream of current epoch number and start time
+    /// Updated on the given interval
+    fn epoch_stream(self: Box<Self>, interval: Duration) -> EpochStream {
+        Box::pin(IntervalStream::new(tokio::time::interval(interval)).then(move |_| {
+            let client = self.clone_client();
+            async move {
+                let epoch_number = client.current_epoch().await?;
+                let epoch_start = client.current_epoch_start().await?;
+                Ok((epoch_number, epoch_start))
+            }
+        }))
+    }
}

pub async fn get_client(rpc_args: &RpcArgs) -> Result<Box<dyn Client>, ClientError> {
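Not part of the diff, but for orientation: a minimal sketch of how downstream code might consume the new `EpochStream`, assuming the `futures` and `log` crates are available. `watch_epochs` is a hypothetical helper, not something this commit adds.

```rust
use std::time::Duration;

use contract_client::Client;
use futures::StreamExt;

// Hypothetical consumer: refresh the current epoch every 3 minutes,
// matching the default ONCHAIN_UPDATE_INTERVAL_SEC introduced below.
async fn watch_epochs(client: Box<dyn Client>) {
    let mut epochs = client.epoch_stream(Duration::from_secs(180));
    while let Some(update) = epochs.next().await {
        match update {
            Ok((epoch_number, epoch_start)) => {
                log::info!("Epoch {epoch_number} started at {epoch_start:?}");
            }
            Err(e) => log::error!("Error retrieving current epoch: {e:?}"),
        }
    }
}
```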
2 changes: 1 addition & 1 deletion contract-client/src/lib.rs
@@ -25,6 +25,6 @@ pub use libp2p::PeerId;

pub use cli::{Network, RpcArgs};
pub use client::{
-    get_client, Allocation, Client, GatewayCluster, NetworkNodes, NodeStream, Worker,
+    get_client, Allocation, Client, EpochStream, GatewayCluster, NetworkNodes, NodeStream, Worker,
};
pub use error::ClientError;
2 changes: 1 addition & 1 deletion transport/Cargo.toml
@@ -1,7 +1,7 @@
[package]
name = "subsquid-network-transport"
license = "AGPL-3.0-or-later"
version = "1.0.17"
version = "1.0.18"
edition = "2021"

[[bin]]
59 changes: 48 additions & 11 deletions transport/src/behaviour/base.rs
@@ -3,7 +3,7 @@ use std::{
num::NonZeroUsize,
sync::Arc,
task::{Context, Poll},
-    time::Duration,
+    time::{Duration, SystemTime},
vec,
};

@@ -36,7 +36,7 @@ use parking_lot::RwLock;
use prost::Message;
use serde::{Deserialize, Serialize};

-use contract_client::{NetworkNodes, NodeStream};
+use contract_client::{EpochStream, NetworkNodes, NodeStream};
use subsquid_messages::{
signatures::SignedMessage, worker_logs_msg, LogsCollected, Ping, QueryExecuted, QueryLogs,
WorkerLogsMsg,
@@ -52,8 +52,9 @@ use crate::{
},
cli::BootNode,
protocol::{
-        ID_PROTOCOL, KEEP_LAST_WORKER_LOGS, LOGS_COLLECTED_MIN_INTERVAL, LOGS_COLLECTED_TOPIC,
-        LOGS_MIN_INTERVAL, MAX_PUBSUB_MSG_SIZE, PINGS_MIN_INTERVAL, PING_TOPIC, WORKER_LOGS_TOPIC,
+        EPOCH_SEAL_TIMEOUT, ID_PROTOCOL, KEEP_LAST_WORKER_LOGS, LOGS_COLLECTED_MIN_INTERVAL,
+        LOGS_COLLECTED_TOPIC, LOGS_MIN_INTERVAL, MAX_PUBSUB_MSG_SIZE, PINGS_MIN_INTERVAL,
+        PING_TOPIC, WORKER_LOGS_TOPIC,
},
record_event,
util::{addr_is_reachable, parse_env_var},
@@ -75,8 +76,8 @@ pub struct InnerBehaviour

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct BaseConfig {
-    /// How often to check for whitelisted nodes on chain (default: 5 min).
-    pub nodes_update_interval: Duration,
+    /// How often to check for on-chain updates: current epoch and registered nodes (default: 3 min).
+    pub onchain_update_interval: Duration,
/// Timeout for autoNAT probes (default: 60 sec).
pub autonat_timeout: Duration,
    /// How often to publish identify info to connected nodes (default: 60 sec).
@@ -96,7 +97,7 @@ pub struct BaseConfig {
impl BaseConfig {
pub fn from_env() -> Self {
let nodes_update_interval =
Duration::from_secs(parse_env_var("NODES_UPDATE_INTERVAL_SEC", 300));
Duration::from_secs(parse_env_var("ONCHAIN_UPDATE_INTERVAL_SEC", 180));
let autonat_timeout = Duration::from_secs(parse_env_var("AUTONAT_TIMEOUT_SEC", 60));
let identify_interval = Duration::from_secs(parse_env_var("IDENTIFY_INTERVAL_SEC", 60));
let probe_timeout = Duration::from_secs(parse_env_var("PROBE_TIMEOUT_SEC", 20));
@@ -106,7 +107,7 @@ impl BaseConfig {
let addr_cache_size = NonZeroUsize::new(parse_env_var("ADDR_CACHE_SIZE", 1024))
.expect("addr_cache_size should be > 0");
Self {
-            nodes_update_interval,
+            onchain_update_interval: nodes_update_interval,
autonat_timeout,
identify_interval,
probe_timeout,
@@ -126,8 +127,10 @@ pub struct BaseBehaviour {
probe_timeouts: FuturesMap<PeerId, ()>,
registered_nodes: HashSet<PeerId>,
registered_workers: Arc<RwLock<HashSet<PeerId>>>,
+    current_epoch_start: Arc<RwLock<SystemTime>>,
    logs_collected: Arc<RwLock<HashMap<String, u64>>>, // peer_id (base58) -> highest collected seq_no
    active_nodes_stream: NodeStream,
+    epoch_stream: EpochStream,
max_pubsub_msg_size: usize,
}

@@ -183,8 +186,12 @@ impl BaseBehaviour {
probe_timeouts: FuturesMap::new(config.probe_timeout, config.max_concurrent_probes),
registered_nodes: Default::default(),
registered_workers: Arc::new(RwLock::new(Default::default())),
+            current_epoch_start: Arc::new(RwLock::new(SystemTime::UNIX_EPOCH)),
            logs_collected: Arc::new(RwLock::new(Default::default())),
-            active_nodes_stream: contract_client.network_nodes_stream(config.nodes_update_interval),
+            active_nodes_stream: contract_client
+                .clone_client()
+                .network_nodes_stream(config.onchain_update_interval),
+            epoch_stream: contract_client.epoch_stream(config.onchain_update_interval),
max_pubsub_msg_size: config.max_pubsub_msg_size,
}
}
@@ -206,6 +213,7 @@ impl BaseBehaviour {
// Unordered messages need to be allowed, because we're interested in all messages from
// each worker, not only the most recent one (as in the case of pings).
let registered_workers = self.registered_workers.clone();
+        let epoch_start = self.current_epoch_start.clone();
let logs_collected = self.logs_collected.clone();
let config = MsgValidationConfig::new(LOGS_MIN_INTERVAL)
.max_burst(10)
@@ -221,10 +229,18 @@ impl BaseBehaviour {
return Err(ValidationError::Invalid("Invalid worker logs"));
};
// Logs are sorted by seq_no, so we need to check the last one only
-                let last_seq_no = match msg.queries_executed.last() {
-                    Some(query_executed) => query_executed.seq_no.unwrap_or_default(),
+                let (last_timestamp, last_seq_no) = match msg.queries_executed.last() {
+                    Some(query_executed) => (
+                        query_executed.timestamp_ms.unwrap_or_default(),
+                        query_executed.seq_no.unwrap_or_default(),
+                    ),
                    None => return Err(ValidationError::Invalid("Empty worker logs")),
                };
+                // Don't propagate logs which are old & no longer relevant
+                let last_log_time = SystemTime::UNIX_EPOCH + Duration::from_millis(last_timestamp);
+                if last_log_time + EPOCH_SEAL_TIMEOUT < *epoch_start.read() {
+                    return Err(ValidationError::Ignored("Old worker logs"));
+                }
// Don't propagate worker logs which have already been collected
match logs_collected.read().get(&peer_id.to_base58()) {
Some(seq_no) if *seq_no >= last_seq_no => {
@@ -338,6 +354,18 @@ impl BaseBehaviour {
}
self.registered_nodes = nodes;
}

+    fn on_epoch_update(&self, result: Result<(u32, SystemTime), contract_client::ClientError>) {
+        let epoch_start = match result {
+            Err(e) => {
+                log::error!("Error retrieving current epoch from chain: {e:?}");
+                return;
+            }
+            Ok((_, epoch_start)) => epoch_start,
+        };
+
+        *self.current_epoch_start.write() = epoch_start;
+    }
}

#[derive(Debug, Clone)]
@@ -416,6 +444,15 @@ impl BehaviourWrapper for BaseBehaviour {
_ => unreachable!(), // infinite stream
}

+            match self.epoch_stream.poll_next_unpin(cx) {
+                Poll::Ready(Some(res)) => {
+                    self.on_epoch_update(res);
+                    continue;
+                }
+                Poll::Pending => {}
+                _ => unreachable!(), // infinite stream
+            }

match self.probe_timeouts.poll_unpin(cx) {
Poll::Ready((peer_id, Err(_))) => {
#[cfg(feature = "metrics")]
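For orientation, the rule added to the log validator above can be expressed as a standalone predicate. A minimal std-only sketch; `is_old_log` is a hypothetical helper, and `EPOCH_SEAL_TIMEOUT` is duplicated here so the snippet is self-contained:

```rust
use std::time::{Duration, SystemTime};

// Mirrors the constant added to transport/src/protocol.rs below.
const EPOCH_SEAL_TIMEOUT: Duration = Duration::from_secs(600);

/// True if a log batch whose newest query carries the given millisecond
/// timestamp is too old to propagate (same comparison as in `base.rs`).
fn is_old_log(last_timestamp_ms: u64, epoch_start: SystemTime) -> bool {
    let last_log_time = SystemTime::UNIX_EPOCH + Duration::from_millis(last_timestamp_ms);
    last_log_time + EPOCH_SEAL_TIMEOUT < epoch_start
}
```

With the default 600-second timeout, a batch whose newest query finished 15 minutes before the current epoch start is ignored, while one from 5 minutes before it is still propagated.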
4 changes: 2 additions & 2 deletions transport/src/builder.rs
@@ -252,8 +252,8 @@ impl P2PTransportBuilder {
};
// We need to wait for other nodes to catch up and whitelist our peer ID
let elapsed = reg_time.elapsed().unwrap_or_default();
-            if elapsed < self.base_config.nodes_update_interval {
-                tokio::time::sleep(self.base_config.nodes_update_interval - elapsed).await;
+            if elapsed < self.base_config.onchain_update_interval {
+                tokio::time::sleep(self.base_config.onchain_update_interval - elapsed).await;
}
break;
}
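Operational note: the setting read by `BaseConfig::from_env` was renamed from `NODES_UPDATE_INTERVAL_SEC` (default 300 s) to `ONCHAIN_UPDATE_INTERVAL_SEC` (default 180 s), so deployments that still export the old variable will presumably fall back to the new default. A test-style sketch of the new behaviour; the import path is an assumption, not confirmed by this diff:

```rust
use std::time::Duration;

// Assumed path; BaseConfig and onchain_update_interval come from
// transport/src/behaviour/base.rs (see the hunks above).
use subsquid_network_transport::behaviour::base::BaseConfig;

fn main() {
    std::env::set_var("ONCHAIN_UPDATE_INTERVAL_SEC", "120");
    let config = BaseConfig::from_env();
    assert_eq!(config.onchain_update_interval, Duration::from_secs(120));
}
```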
1 change: 1 addition & 0 deletions transport/src/protocol.rs
@@ -23,6 +23,7 @@ pub const KEEP_LAST_WORKER_LOGS: u64 = 100;
pub const PINGS_MIN_INTERVAL: Duration = Duration::from_secs(20);
pub const LOGS_MIN_INTERVAL: Duration = Duration::from_secs(120);
pub const LOGS_COLLECTED_MIN_INTERVAL: Duration = Duration::from_secs(60);
+pub const EPOCH_SEAL_TIMEOUT: Duration = Duration::from_secs(600);

pub const fn dht_protocol(network: Network) -> StreamProtocol {
match network {
