Skip to content

Commit

Permalink
Initial sync before server start (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion authored Jan 3, 2024
1 parent 574dee1 commit 090a23d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 14 deletions.
2 changes: 1 addition & 1 deletion bundler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
reqwest = "0.11.22"
ethereum_serde_utils = "0.5"
url = "2.5.0"
futures = "0.3.29"
futures = { version = "0.3.29", features = ["std"] }
rand = "0.8.5"
prometheus = { version = "0.13.3", features = ["push"] }
lazy_static = "1.4.0"
Expand Down
55 changes: 54 additions & 1 deletion bundler/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ethers::{
types::{Address, BlockNumber},
};
use eyre::{bail, eyre, Result};
use futures::{stream, StreamExt, TryStreamExt};
use sqlx::MySqlPool;
use tokio::sync::{Notify, RwLock};

Expand All @@ -19,11 +20,15 @@ use crate::{
DataIntentTracker,
},
eth_provider::EthProvider,
info,
sync::{BlockSync, BlockWithTxs, NonceStatus, SyncBlockError, SyncBlockOutcome, TxInclusion},
utils::address_to_hex_lowercase,
AppConfig, BlobTxSummary, DataIntent,
warn, AppConfig, BlobTxSummary, DataIntent,
};

pub(crate) const PERSIST_ANCHOR_BLOCK_INITIAL_SYNC_INTERVAL: u64 = 32;
pub(crate) const MAX_DISTANCE_SYNC: u64 = 8;

pub(crate) struct AppData {
pub config: AppConfig,
pub kzg_settings: c_kzg::KzgSettings,
Expand Down Expand Up @@ -328,6 +333,54 @@ impl AppData {
(data_intents, items_from_previous_inclusions)
}

/// Do initial blocking sync to get to the remote node head before starting the API and
/// potentially building blob transactions.
pub async fn initial_block_sync(&self) -> Result<()> {
loop {
let remote_node_head_block = self.fetch_remote_node_latest_block_number().await?;
let head_block = self.sync.read().await.get_head().number;

// Every sync iteration get closer to the remote head until being close enough
if head_block < remote_node_head_block + MAX_DISTANCE_SYNC {
break;
}

stream::iter(head_block + 1..remote_node_head_block)
.map(|block_number| self.fetch_block(block_number))
.buffered(16)
.try_for_each(|block| async {
let block_number = block.number;
let outcome = self.sync_next_head(block).await?;

if let SyncBlockOutcome::BlockKnown = outcome {
warn!("initial sync imported a known block {block_number}");
}

if block_number % PERSIST_ANCHOR_BLOCK_INITIAL_SYNC_INTERVAL == 0 {
self.maybe_advance_anchor_block().await?;
info!(
"initial sync progress {block_number}/{remote_node_head_block} {} left",
remote_node_head_block - block_number
)
}
Ok(())
})
.await?;
}

Ok(())
}

/// Helper for `self.initial_block_sync`
async fn fetch_block(&self, block_number: u64) -> Result<BlockWithTxs> {
let block = self
.provider
.get_block_with_txs(block_number)
.await?
.ok_or_else(|| eyre!(format!("no block for number {block_number}")))?;
BlockWithTxs::from_ethers_block(block)
}

pub async fn get_sync(&self) -> (SyncStatusBlock, SyncStatusBlock) {
(
self.sync.read().await.get_anchor().into(),
Expand Down
2 changes: 1 addition & 1 deletion bundler/src/blob_sender_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use ethers::{signers::Signer, types::TxHash};
use eyre::{bail, Context, Result};

use crate::{
app::MAX_DISTANCE_SYNC,
blob_tx_data::BlobTxParticipant,
data_intent_tracker::DataIntentDbRowFull,
debug, error,
Expand All @@ -19,7 +20,6 @@ use crate::{
/// Limit the maximum number of times a data intent included in a previous transaction can be
/// included again in a new transaction.
const MAX_PREVIOUS_INCLUSIONS: usize = 2;
const MAX_DISTANCE_SYNC: u64 = 8;

pub(crate) async fn blob_sender_task(app_data: Arc<AppData>) -> Result<()> {
let mut id = 0_u64;
Expand Down
28 changes: 18 additions & 10 deletions bundler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,28 @@ impl App {
app_data.sync_data_intents().await?;
info!("synced data intent tracker");

// Prints progress every few blocks to info level
app_data.initial_block_sync().await?;

let address = args.address();
let listener = TcpListener::bind(address.clone())?;
let listener_port = listener.local_addr().unwrap().port();
info!("Binding server on {}:{}", args.bind_address, listener_port);

let register_get_metrics = if args.metrics {
if args.metrics_port == args.port {
info!("enabling metrics on server port");
if args.metrics_bearer_token.is_none() {
warn!("UNSAFE: metrics exposed on the server port without auth");
}
true
} else {
todo!("serve metrics on different port");
}
} else {
false
};

let app_data_clone = app_data.clone();
let server = HttpServer::new(move || {
let app = actix_web::App::new()
Expand All @@ -300,11 +317,7 @@ impl App {
.service(get_balance_by_address);

// Conditionally register the metrics route
if args.metrics && args.metrics_port == args.port {
info!("enabling metrics on server port");
if args.metrics_bearer_token.is_none() {
warn!("UNSAFE: metrics exposed on the server port without auth");
}
if register_get_metrics {
app.service(get_metrics)
} else {
app
Expand All @@ -313,11 +326,6 @@ impl App {
.listen(listener)?
.run();

// TODO: serve metrics on different port
if args.metrics && args.metrics_port != args.port {
todo!("serve metrics on different port");
}

Ok(App {
port: listener_port,
server,
Expand Down
2 changes: 1 addition & 1 deletion bundler/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ impl From<&BlockSummary> for SyncStatusBlock {
pub struct BlockWithTxs {
hash: H256,
parent_hash: H256,
number: u64,
pub number: u64,
transactions: Vec<Transaction>,
pub gas: BlockGasSummary,
}
Expand Down

0 comments on commit 090a23d

Please sign in to comment.