Skip to content

Commit

Permalink
Handle unsynced node better (#40)
Browse files Browse the repository at this point in the history
* Allow starting block to overwrite anchor block

* Actually persist anchor block
  • Loading branch information
dapplion authored Jan 3, 2024
1 parent 34ec036 commit 574dee1
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 82 deletions.
40 changes: 10 additions & 30 deletions bundler/src/anchor_block.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,23 @@
use std::path::PathBuf;

use eyre::{bail, eyre, Context, Result};
use eyre::{eyre, Result};
use sqlx::MySqlPool;
use tokio::{fs, io};

use crate::{
eth_provider::EthProvider, gas::block_gas_summary_from_block, sync::AnchorBlock, StartingPoint,
};
use crate::{eth_provider::EthProvider, gas::block_gas_summary_from_block, sync::AnchorBlock};

pub(crate) async fn get_anchor_block(
anchor_block_filepath: &PathBuf,
db_pool: &MySqlPool,
provider: &EthProvider,
starting_point: StartingPoint,
starting_block: u64,
) -> Result<AnchorBlock> {
// TODO: choose starting point that's not genesis
// Attempt to read persisted file first if exists
match fs::read_to_string(&anchor_block_filepath).await {
Ok(str) => return serde_json::from_str(&str).wrap_err_with(|| "parsing anchor block file"),
Err(e) => match e.kind() {
io::ErrorKind::NotFound => {} // Ok continue
_ => bail!(
"error opening anchor_block file {}: {e:?}",
anchor_block_filepath.to_string_lossy()
),
},
}

// Second, fetch from DB
if let Some(anchor_block) = fetch_anchor_block_from_db(db_pool).await? {
return Ok(anchor_block);
if anchor_block.number >= starting_block {
return Ok(anchor_block);
}
// else bootstrap from starting block
}

// Last initialize from network at starting point
match starting_point {
StartingPoint::StartingBlock(starting_block) => {
anchor_block_from_starting_block(provider, starting_block).await
}
}
anchor_block_from_starting_block(provider, starting_block).await
}

/// Fetch AnchorBlock from DB
Expand All @@ -58,10 +38,10 @@ pub async fn fetch_anchor_block_from_db(db_pool: &MySqlPool) -> Result<Option<An
/// TODO: Keep a single row with latest block
pub async fn persist_anchor_block_to_db(
db_pool: &MySqlPool,
anchor_block: AnchorBlock,
anchor_block: &AnchorBlock,
) -> Result<()> {
// Serialize the AnchorBlock (except the block_number field) to a JSON string
let anchor_block_json = serde_json::to_string(&anchor_block)?;
let anchor_block_json = serde_json::to_string(anchor_block)?;
let block_number = anchor_block.number;

// Insert the data into the database
Expand Down
27 changes: 22 additions & 5 deletions bundler/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ use bundler_client::types::{
BlockGasSummary, DataIntentFull, DataIntentId, DataIntentStatus, DataIntentSummary,
SyncStatusBlock,
};
use ethers::{signers::LocalWallet, types::Address};
use eyre::{bail, Result};
use ethers::{
signers::LocalWallet,
types::{Address, BlockNumber},
};
use eyre::{bail, eyre, Result};
use sqlx::MySqlPool;
use tokio::sync::{Notify, RwLock};

use crate::{
anchor_block::persist_anchor_block_to_db,
data_intent_tracker::{
fetch_all_intents_with_inclusion_not_finalized, fetch_data_intent_db_full,
fetch_data_intent_db_is_known, fetch_data_intent_inclusion, fetch_many_data_intent_db_full,
Expand Down Expand Up @@ -105,7 +109,9 @@ impl AppData {
}

pub async fn maybe_advance_anchor_block(&self) -> Result<Option<(Vec<BlobTxSummary>, u64)>> {
if let Some(finalized_result) = self.sync.write().await.maybe_advance_anchor_block()? {
let finalize_result = { self.sync.write().await.maybe_advance_anchor_block()? };

if let Some(finalized_result) = finalize_result {
let mut data_intent_tracker = self.data_intent_tracker.write().await;
for tx in &finalized_result.finalized_included_txs {
data_intent_tracker.finalize_tx(tx.tx_hash);
Expand All @@ -119,6 +125,9 @@ impl AppData {
// data_intent_tracker.drop_excluded_tx(excluded_tx.tx_hash);
}

// TODO: Persist anchor block to DB less often
persist_anchor_block_to_db(&self.db_pool, self.sync.read().await.get_anchor()).await?;

Ok(Some((
finalized_result.finalized_included_txs,
finalized_result.new_anchor_block_number,
Expand Down Expand Up @@ -330,8 +339,16 @@ impl AppData {
*self.sync.read().await.get_head_gas()
}

pub async fn serialize_anchor_block(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self.sync.read().await.get_anchor())
pub async fn fetch_remote_node_latest_block_number(&self) -> Result<u64> {
let block = self
.provider
.get_block(BlockNumber::Latest)
.await?
.ok_or_else(|| eyre!("no latest block"))?;
Ok(block
.number
.ok_or_else(|| eyre!("block has no number"))?
.as_u64())
}

pub async fn collect_metrics(&self) {
Expand Down
10 changes: 9 additions & 1 deletion bundler/src/blob_sender_task.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use ethers::{signers::Signer, types::TxHash};
use eyre::{Context, Result};
use eyre::{bail, Context, Result};

use crate::{
blob_tx_data::BlobTxParticipant,
Expand All @@ -19,6 +19,7 @@ use crate::{
/// Limit the maximum number of times a data intent included in a previous transaction can be
/// included again in a new transaction.
const MAX_PREVIOUS_INCLUSIONS: usize = 2;
const MAX_DISTANCE_SYNC: u64 = 8;

pub(crate) async fn blob_sender_task(app_data: Arc<AppData>) -> Result<()> {
let mut id = 0_u64;
Expand Down Expand Up @@ -69,6 +70,13 @@ pub(crate) enum SendResult {
pub(crate) async fn maybe_send_blob_tx(app_data: Arc<AppData>, _id: u64) -> Result<SendResult> {
let _timer = metrics::BLOB_SENDER_TASK_TIMES.start_timer();

// Only allow to send blob transactions if synced with remote node
let remote_node_head = app_data.fetch_remote_node_latest_block_number().await?;
let head_number = app_data.get_sync().await.1.number;
if remote_node_head > head_number + MAX_DISTANCE_SYNC {
bail!("Local head number {head_number} not synced with remote node {remote_node_head}");
}

// Sync available intents
app_data.sync_data_intents().await?;

Expand Down
19 changes: 1 addition & 18 deletions bundler/src/block_subscriber_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::Arc;

use ethers::{providers::StreamExt, types::TxHash};
use eyre::{eyre, Context, Result};
use tokio::fs;

use crate::{
debug, error, info, metrics,
Expand All @@ -21,6 +20,7 @@ pub(crate) async fn block_subscriber_task(app_data: Arc<AppData>) -> Result<()>

loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => break,
block_hash = s.next() => {
// block_hash type := Option<Result<H256>>
let block_hash = block_hash.ok_or_else(|| eyre!("block stream closed"))??;
Expand Down Expand Up @@ -50,7 +50,6 @@ pub(crate) async fn block_subscriber_task(app_data: Arc<AppData>) -> Result<()>
// Maybe compute new blob transactions
app_data.notify.notify_one();
},
_ = tokio::signal::ctrl_c() => break,

}
}
Expand Down Expand Up @@ -116,22 +115,6 @@ async fn sync_block(app_data: Arc<AppData>, block_hash: TxHash) -> Result<(), Sy
);
metrics::SYNC_ANCHOR_NUMBER.set(new_anchor_block_number as f64);
metrics::FINALIZED_TXS.inc_by(finalized_tx_hashes.len() as f64);

// Persist anchor block
// TODO: Throttle to not persist every block, not necessary
let anchor_block_str = {
app_data
.serialize_anchor_block()
.await
.wrap_err("serializing AnchorBlock")?
};
fs::write(&app_data.config.anchor_block_filepath, anchor_block_str)
.await
.wrap_err("persisting anchor block")?;
debug!(
"persisted anchor_block file at {}",
app_data.config.anchor_block_filepath.to_string_lossy()
);
}

Ok(())
Expand Down
13 changes: 1 addition & 12 deletions bundler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ impl Args {
struct AppConfig {
l1_inbox_address: Address,
panic_on_background_task_errors: bool,
anchor_block_filepath: PathBuf,
metrics_server_bearer_token: Option<String>,
metrics_push: Option<PushMetricsConfig>,
}
Expand All @@ -170,16 +169,10 @@ pub struct App {
data: Arc<AppData>,
}

enum StartingPoint {
StartingBlock(u64),
}

impl App {
/// Instantiates components, fetching initial data, binds http server. Does not make progress
/// on the server future. To actually run the app, call `Self::run`.
pub async fn build(args: Args) -> Result<Self> {
let starting_point = StartingPoint::StartingBlock(args.starting_block);

let mut provider = EthProvider::new(&args.eth_provider).await?;

if let Some(eth_provider_interval) = args.eth_provider_interval {
Expand Down Expand Up @@ -222,11 +215,8 @@ impl App {
.connect(&args.database_url)
.await?;

let anchor_block_filepath = data_dir.join("anchor_block.json");

// TODO: choose starting point that's not genesis
let anchor_block =
get_anchor_block(&anchor_block_filepath, &db_pool, &provider, starting_point).await?;
let anchor_block = get_anchor_block(&db_pool, &provider, args.starting_block).await?;
debug!("retrieved anchor block: {:?}", anchor_block);

let sync = BlockSync::new(
Expand All @@ -242,7 +232,6 @@ impl App {
let config = AppConfig {
l1_inbox_address: Address::from_str(ADDRESS_ZERO)?,
panic_on_background_task_errors: args.panic_on_background_task_errors,
anchor_block_filepath,
metrics_server_bearer_token: args.metrics_bearer_token.clone(),
metrics_push: if let Some(url) = &args.metrics_push_url {
Some(PushMetricsConfig {
Expand Down
17 changes: 2 additions & 15 deletions bundler/src/remote_node_tracker_task.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::sync::Arc;

use ethers::types::BlockNumber;
use eyre::{eyre, Result};
use eyre::Result;
use tokio::time::{self, Duration};

use crate::eth_provider::EthProvider;
use crate::{debug, error, metrics, AppData};

pub(crate) async fn remote_node_tracker_task(app_data: Arc<AppData>) -> Result<()> {
Expand All @@ -18,7 +16,7 @@ pub(crate) async fn remote_node_tracker_task(app_data: Arc<AppData>) -> Result<(
_ = interval.tick() => {}
}

match fetch_latest_block_number(&app_data.provider).await {
match app_data.fetch_remote_node_latest_block_number().await {
Ok(block_number) => {
debug!("Remote node head block number {block_number}");
metrics::REMOTE_NODE_HEAD_BLOCK_NUMBER.set(block_number as f64);
Expand All @@ -30,14 +28,3 @@ pub(crate) async fn remote_node_tracker_task(app_data: Arc<AppData>) -> Result<(
}
}
}

async fn fetch_latest_block_number(provider: &EthProvider) -> Result<u64> {
let block = provider
.get_block(BlockNumber::Latest)
.await?
.ok_or_else(|| eyre!("no latest block"))?;
Ok(block
.number
.ok_or_else(|| eyre!("block has no number"))?
.as_u64())
}
2 changes: 1 addition & 1 deletion bundler/tests/api/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl TestHarness {
);

let db_pool = connect_db_pool(&database_url).await;
persist_anchor_block_to_db(&db_pool, anchor_block)
persist_anchor_block_to_db(&db_pool, &anchor_block)
.await
.unwrap();
}
Expand Down

0 comments on commit 574dee1

Please sign in to comment.