Skip to content

Commit

Permalink
Revert "refactor(katana-node): distinguish between launched node hand…
Browse files Browse the repository at this point in the history
…le (#2504)"

This reverts commit 3e51d5d.
  • Loading branch information
steebchen committed Oct 11, 2024
1 parent d29b4a4 commit ab43725
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 221 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 9 additions & 18 deletions bin/katana/src/cli/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,32 +224,23 @@ impl NodeArgs {
let sequencer_config = self.sequencer_config();
let starknet_config = self.starknet_config()?;

// Build the node
let node = katana_node::build(server_config, sequencer_config, starknet_config)
.await
.context("failed to build node")?;
// build the node and start it
let node = katana_node::start(server_config, sequencer_config, starknet_config).await?;

if !self.silent {
#[allow(deprecated)]
let genesis = &node.backend.config.genesis;
let server_address = node.server_config.addr();
print_intro(&self, genesis, &server_address);
print_intro(&self, genesis, node.rpc.addr);
}

// Launch the node
let handle = node.launch().await.context("failed to launch node")?;

// Wait until an OS signal (ie SIGINT, SIGTERM) is received or the node is shutdown.
// Wait until an OS signal is received or TaskManager shutdown
tokio::select! {
_ = dojo_utils::signal::wait_signals() => {
// Gracefully shutdown the node before exiting
handle.stop().await?;
},

_ = handle.stopped() => { }
_ = dojo_utils::signal::wait_signals() => {},
_ = node.task_manager.wait_for_shutdown() => {}
}

info!("Shutting down.");
info!("Shutting down...");
node.stop().await?;

Ok(())
}
Expand Down Expand Up @@ -346,7 +337,7 @@ impl NodeArgs {
}
}

fn print_intro(args: &NodeArgs, genesis: &Genesis, address: &str) {
fn print_intro(args: &NodeArgs, genesis: &Genesis, address: SocketAddr) {
let mut accounts = genesis.accounts().peekable();
let account_class_hash = accounts.peek().map(|e| e.1.class_hash());
let seed = &args.starknet.seed;
Expand Down
17 changes: 8 additions & 9 deletions crates/dojo-test-utils/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use katana_core::constants::DEFAULT_SEQUENCER_ADDRESS;
#[allow(deprecated)]
pub use katana_core::sequencer::SequencerConfig;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_node::LaunchedNode;
use katana_node::Handle;
use katana_primitives::chain::ChainId;
use katana_rpc::config::ServerConfig;
use katana_rpc_api::ApiKind;
Expand All @@ -29,7 +29,7 @@ pub struct TestAccount {
#[allow(missing_debug_implementations)]
pub struct TestSequencer {
url: Url,
handle: LaunchedNode,
handle: Handle,
account: TestAccount,
}

Expand All @@ -45,20 +45,19 @@ impl TestSequencer {
apis: vec![ApiKind::Starknet, ApiKind::Dev, ApiKind::Saya, ApiKind::Torii],
};

let node = katana_node::build(server_config, config, starknet_config)
let node = katana_node::start(server_config, config, starknet_config)
.await
.expect("Failed to build node components");
let handle = node.launch().await.expect("Failed to launch node");

let url = Url::parse(&format!("http://{}", handle.rpc.addr)).expect("Failed to parse URL");
let url = Url::parse(&format!("http://{}", node.rpc.addr)).expect("Failed to parse URL");

let account = handle.node.backend.config.genesis.accounts().next().unwrap();
let account = node.backend.config.genesis.accounts().next().unwrap();
let account = TestAccount {
private_key: Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be()),
account_address: Felt::from_bytes_be(&account.0.to_bytes_be()),
};

TestSequencer { handle, account, url }
TestSequencer { handle: node, account, url }
}

pub fn account(&self) -> SingleOwnerAccount<JsonRpcClient<HttpTransport>, LocalWallet> {
Expand All @@ -80,15 +79,15 @@ impl TestSequencer {
}

pub fn backend(&self) -> &Arc<Backend<BlockifierFactory>> {
&self.handle.node.backend
&self.handle.backend
}

pub fn account_at_index(
&self,
index: usize,
) -> SingleOwnerAccount<JsonRpcClient<HttpTransport>, LocalWallet> {
#[allow(deprecated)]
let accounts: Vec<_> = self.handle.node.backend.config.genesis.accounts().collect::<_>();
let accounts: Vec<_> = self.handle.backend.config.genesis.accounts().collect::<_>();

let account = accounts[index];
let private_key = Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be());
Expand Down
1 change: 0 additions & 1 deletion crates/katana/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ katana-tasks.workspace = true

anyhow.workspace = true
dojo-metrics.workspace = true
futures.workspace = true
hyper.workspace = true
jsonrpsee.workspace = true
num-traits.workspace = true
Expand Down
41 changes: 0 additions & 41 deletions crates/katana/node/src/exit.rs

This file was deleted.

166 changes: 58 additions & 108 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

mod exit;

use std::future::IntoFuture;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use dojo_metrics::prometheus_exporter::PrometheusHandle;
use dojo_metrics::{metrics_process, prometheus_exporter, Report};
use hyper::{Method, Uri};
use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer;
Expand All @@ -22,7 +19,6 @@ use katana_core::env::BlockContextGenerator;
#[allow(deprecated)]
use katana_core::sequencer::SequencerConfig;
use katana_core::service::block_producer::BlockProducer;
use katana_db::mdbx::DbEnv;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pipeline::{stage, Pipeline};
Expand Down Expand Up @@ -53,103 +49,27 @@ use starknet::providers::{JsonRpcClient, Provider};
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info, trace};

use crate::exit::NodeStoppedFuture;

/// A handle to the launched node.
/// A handle to the instantiated Katana node.
#[allow(missing_debug_implementations)]
pub struct LaunchedNode {
pub node: Node,
/// Handle to the rpc server.
pub rpc: RpcServer,
}

impl LaunchedNode {
/// Stops the node.
///
/// This will instruct the node to stop and wait until it has actually stop.
pub async fn stop(&self) -> Result<()> {
// TODO: wait for the rpc server to stop instead of just stopping it.
self.rpc.handle.stop()?;
self.node.task_manager.shutdown().await;
Ok(())
}

/// Returns a future which resolves only when the node has stopped.
pub fn stopped(&self) -> NodeStoppedFuture<'_> {
NodeStoppedFuture::new(self)
}
}

/// A node instance.
///
/// The struct contains the handle to all the components of the node.
#[must_use = "Node does nothing unless launched."]
#[allow(missing_debug_implementations)]
pub struct Node {
pub struct Handle {
pub pool: TxPool,
pub db: Option<DbEnv>,
pub rpc: RpcServer,
pub task_manager: TaskManager,
pub prometheus_handle: PrometheusHandle,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: BlockProducer<BlockifierFactory>,
pub server_config: ServerConfig,
#[allow(deprecated)]
pub sequencer_config: SequencerConfig,
}

impl Node {
/// Start the node.
///
/// This method will start all the node process, running them until the node is stopped.
pub async fn launch(self) -> Result<LaunchedNode> {
if let Some(addr) = self.server_config.metrics {
let mut reports = Vec::new();
if let Some(ref db) = self.db {
reports.push(Box::new(db.clone()) as Box<dyn Report>);
}

prometheus_exporter::serve(
addr,
self.prometheus_handle.clone(),
metrics_process::Collector::default(),
reports,
)
.await?;

info!(%addr, "Metrics endpoint started.");
}

let pool = self.pool.clone();
let backend = self.backend.clone();
let block_producer = self.block_producer.clone();
let validator = self.block_producer.validator().clone();

// --- build sequencing stage

#[allow(deprecated)]
let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
self.task_manager.clone(),
block_producer.clone(),
self.sequencer_config.messaging.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

self.task_manager.spawn(pipeline.into_future());

let node_components = (pool, backend, block_producer, validator);
let rpc = spawn(node_components, self.server_config.clone()).await?;

Ok(LaunchedNode { node: self, rpc })
impl Handle {
/// Stops the Katana node.
pub async fn stop(self) -> Result<()> {
// TODO: wait for the rpc server to stop
self.rpc.handle.stop()?;
self.task_manager.shutdown().await;
Ok(())
}
}

/// Build the core Katana components from the given configurations.
/// Build the core Katana components from the given configurations and start running the node.
// TODO: placeholder until we implement a dedicated class that encapsulate building the node
// components
//
Expand All @@ -159,15 +79,11 @@ impl Node {
//
// NOTE: Don't rely on this function as it is mainly used as a placeholder for now.
#[allow(deprecated)]
pub async fn build(
pub async fn start(
server_config: ServerConfig,
sequencer_config: SequencerConfig,
mut starknet_config: StarknetConfig,
) -> Result<Node> {
// Metrics recorder must be initialized before calling any of the metrics macros, in order
// for it to be registered.
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;

) -> Result<Handle> {
// --- build executor factory

let cfg_env = CfgEnv {
Expand Down Expand Up @@ -273,18 +189,52 @@ pub async fn build(
let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());

let node = Node {
db,
pool,
backend,
server_config,
block_producer,
sequencer_config,
prometheus_handle,
task_manager: TaskManager::current(),
};
// --- build metrics service

// Metrics recorder must be initialized before calling any of the metrics macros, in order for
// it to be registered.
if let Some(addr) = server_config.metrics {
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;
let reports = db.map(|db| vec![Box::new(db) as Box<dyn Report>]).unwrap_or_default();

prometheus_exporter::serve(
addr,
prometheus_handle,
metrics_process::Collector::default(),
reports,
)
.await?;

info!(%addr, "Metrics endpoint started.");
}

// --- create a TaskManager using the ambient Tokio runtime

let task_manager = TaskManager::current();

// --- build sequencing stage

let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
task_manager.clone(),
block_producer.clone(),
sequencer_config.messaging.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

task_manager.spawn(pipeline.into_future());

// --- spawn rpc server

let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator);
let rpc = spawn(node_components, server_config).await?;

Ok(node)
Ok(Handle { backend, block_producer, pool, rpc, task_manager })
}

// Moved from `katana_rpc` crate
Expand Down
Loading

0 comments on commit ab43725

Please sign in to comment.