Skip to content

Commit

Permalink
refactor(katana-node): distinguish between launched node handle (#2504)
Browse files Browse the repository at this point in the history
abstraction to distinguish between a launched and not yet launch node handle. also appropriately place the stop methods under the launched handle. this PR also includes some ergonomic changes to some of the futures used to stop the node.

having a different struct for this would allow conveying to the readers that the node can either be in static (not yet launched) and already launched node. and thus only expose certain methods based on the appropriate handle.

eg, the LaunchedNode::stop() method is placed under LaunchedNode because you can only stop node that has been started. it doesn't make sense to stop a node that hasn't even been ran.
  • Loading branch information
kariy authored Oct 8, 2024
1 parent 9c790a9 commit 3e51d5d
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 88 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 18 additions & 9 deletions bin/katana/src/cli/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,23 +225,32 @@ impl NodeArgs {
let sequencer_config = self.sequencer_config();
let starknet_config = self.starknet_config()?;

// build the node and start it
let node = katana_node::start(server_config, sequencer_config, starknet_config).await?;
// Build the node
let node = katana_node::build(server_config, sequencer_config, starknet_config)
.await
.context("failed to build node")?;

if !self.silent {
#[allow(deprecated)]
let genesis = &node.backend.config.genesis;
print_intro(&self, genesis, node.rpc.addr);
let server_address = node.server_config.addr();
print_intro(&self, genesis, &server_address);
}

// Wait until an OS signal is received or TaskManager shutdown
// Launch the node
let handle = node.launch().await.context("failed to launch node")?;

// Wait until an OS signal (ie SIGINT, SIGTERM) is received or the node is shutdown.
tokio::select! {
_ = dojo_utils::signal::wait_signals() => {},
_ = node.task_manager.wait_for_shutdown() => {}
_ = dojo_utils::signal::wait_signals() => {
// Gracefully shutdown the node before exiting
handle.stop().await?;
},

_ = handle.stopped() => { }
}

info!("Shutting down...");
node.stop().await?;
info!("Shutting down.");

Ok(())
}
Expand Down Expand Up @@ -339,7 +348,7 @@ impl NodeArgs {
}
}

fn print_intro(args: &NodeArgs, genesis: &Genesis, address: SocketAddr) {
fn print_intro(args: &NodeArgs, genesis: &Genesis, address: &str) {
let mut accounts = genesis.accounts().peekable();
let account_class_hash = accounts.peek().map(|e| e.1.class_hash());
let seed = &args.starknet.seed;
Expand Down
17 changes: 9 additions & 8 deletions crates/dojo-test-utils/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use katana_core::constants::DEFAULT_SEQUENCER_ADDRESS;
#[allow(deprecated)]
pub use katana_core::sequencer::SequencerConfig;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_node::Handle;
use katana_node::LaunchedNode;
use katana_primitives::chain::ChainId;
use katana_rpc::config::ServerConfig;
use katana_rpc_api::ApiKind;
Expand All @@ -29,7 +29,7 @@ pub struct TestAccount {
#[allow(missing_debug_implementations)]
pub struct TestSequencer {
url: Url,
handle: Handle,
handle: LaunchedNode,
account: TestAccount,
}

Expand All @@ -45,19 +45,20 @@ impl TestSequencer {
apis: vec![ApiKind::Starknet, ApiKind::Dev, ApiKind::Saya, ApiKind::Torii],
};

let node = katana_node::start(server_config, config, starknet_config)
let node = katana_node::build(server_config, config, starknet_config)
.await
.expect("Failed to build node components");
let handle = node.launch().await.expect("Failed to launch node");

let url = Url::parse(&format!("http://{}", node.rpc.addr)).expect("Failed to parse URL");
let url = Url::parse(&format!("http://{}", handle.rpc.addr)).expect("Failed to parse URL");

let account = node.backend.config.genesis.accounts().next().unwrap();
let account = handle.node.backend.config.genesis.accounts().next().unwrap();
let account = TestAccount {
private_key: Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be()),
account_address: Felt::from_bytes_be(&account.0.to_bytes_be()),
};

TestSequencer { handle: node, account, url }
TestSequencer { handle, account, url }
}

pub fn account(&self) -> SingleOwnerAccount<JsonRpcClient<HttpTransport>, LocalWallet> {
Expand All @@ -79,15 +80,15 @@ impl TestSequencer {
}

pub fn backend(&self) -> &Arc<Backend<BlockifierFactory>> {
&self.handle.backend
&self.handle.node.backend
}

pub fn account_at_index(
&self,
index: usize,
) -> SingleOwnerAccount<JsonRpcClient<HttpTransport>, LocalWallet> {
#[allow(deprecated)]
let accounts: Vec<_> = self.handle.backend.config.genesis.accounts().collect::<_>();
let accounts: Vec<_> = self.handle.node.backend.config.genesis.accounts().collect::<_>();

let account = accounts[index];
let private_key = Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be());
Expand Down
1 change: 1 addition & 0 deletions crates/katana/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ katana-tasks.workspace = true

anyhow.workspace = true
dojo-metrics.workspace = true
futures.workspace = true
hyper.workspace = true
jsonrpsee.workspace = true
num-traits.workspace = true
Expand Down
41 changes: 41 additions & 0 deletions crates/katana/node/src/exit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::Result;
use futures::future::BoxFuture;
use futures::FutureExt;

use crate::LaunchedNode;

/// A Future that is resolved once the node has been stopped including all of its running tasks.
#[must_use = "futures do nothing unless polled"]
pub struct NodeStoppedFuture<'a> {
fut: BoxFuture<'a, Result<()>>,
}

impl<'a> NodeStoppedFuture<'a> {
pub(crate) fn new(handle: &'a LaunchedNode) -> Self {
let fut = Box::pin(async {
handle.node.task_manager.wait_for_shutdown().await;
handle.stop().await?;
Ok(())
});
Self { fut }
}
}

impl<'a> Future for NodeStoppedFuture<'a> {
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
this.fut.poll_unpin(cx)
}
}

impl<'a> core::fmt::Debug for NodeStoppedFuture<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("NodeStoppedFuture").field("fut", &"...").finish()
}
}
166 changes: 108 additions & 58 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

mod exit;

use std::future::IntoFuture;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use dojo_metrics::prometheus_exporter::PrometheusHandle;
use dojo_metrics::{metrics_process, prometheus_exporter, Report};
use hyper::{Method, Uri};
use jsonrpsee::server::middleware::proxy_get_request::ProxyGetRequestLayer;
Expand All @@ -19,6 +22,7 @@ use katana_core::env::BlockContextGenerator;
#[allow(deprecated)]
use katana_core::sequencer::SequencerConfig;
use katana_core::service::block_producer::BlockProducer;
use katana_db::mdbx::DbEnv;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pipeline::{stage, Pipeline};
Expand Down Expand Up @@ -49,27 +53,103 @@ use starknet::providers::{JsonRpcClient, Provider};
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info, trace};

/// A handle to the instantiated Katana node.
use crate::exit::NodeStoppedFuture;

/// A handle to the launched node.
#[allow(missing_debug_implementations)]
pub struct Handle {
pub pool: TxPool,
pub struct LaunchedNode {
pub node: Node,
/// Handle to the rpc server.
pub rpc: RpcServer,
}

impl LaunchedNode {
/// Stops the node.
///
/// This will instruct the node to stop and wait until it has actually stop.
pub async fn stop(&self) -> Result<()> {
// TODO: wait for the rpc server to stop instead of just stopping it.
self.rpc.handle.stop()?;
self.node.task_manager.shutdown().await;
Ok(())
}

/// Returns a future which resolves only when the node has stopped.
pub fn stopped(&self) -> NodeStoppedFuture<'_> {
NodeStoppedFuture::new(self)
}
}

/// A node instance.
///
/// The struct contains the handle to all the components of the node.
#[must_use = "Node does nothing unless launched."]
#[allow(missing_debug_implementations)]
pub struct Node {
pub pool: TxPool,
pub db: Option<DbEnv>,
pub task_manager: TaskManager,
pub prometheus_handle: PrometheusHandle,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: BlockProducer<BlockifierFactory>,
pub server_config: ServerConfig,
#[allow(deprecated)]
pub sequencer_config: SequencerConfig,
}

impl Handle {
/// Stops the Katana node.
pub async fn stop(self) -> Result<()> {
// TODO: wait for the rpc server to stop
self.rpc.handle.stop()?;
self.task_manager.shutdown().await;
Ok(())
impl Node {
/// Start the node.
///
/// This method will start all the node process, running them until the node is stopped.
pub async fn launch(self) -> Result<LaunchedNode> {
if let Some(addr) = self.server_config.metrics {
let mut reports = Vec::new();
if let Some(ref db) = self.db {
reports.push(Box::new(db.clone()) as Box<dyn Report>);
}

prometheus_exporter::serve(
addr,
self.prometheus_handle.clone(),
metrics_process::Collector::default(),
reports,
)
.await?;

info!(%addr, "Metrics endpoint started.");
}

let pool = self.pool.clone();
let backend = self.backend.clone();
let block_producer = self.block_producer.clone();
let validator = self.block_producer.validator().clone();

// --- build sequencing stage

#[allow(deprecated)]
let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
self.task_manager.clone(),
block_producer.clone(),
self.sequencer_config.messaging.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

self.task_manager.spawn(pipeline.into_future());

let node_components = (pool, backend, block_producer, validator);
let rpc = spawn(node_components, self.server_config.clone()).await?;

Ok(LaunchedNode { node: self, rpc })
}
}

/// Build the core Katana components from the given configurations and start running the node.
/// Build the core Katana components from the given configurations.
// TODO: placeholder until we implement a dedicated class that encapsulate building the node
// components
//
Expand All @@ -79,11 +159,15 @@ impl Handle {
//
// NOTE: Don't rely on this function as it is mainly used as a placeholder for now.
#[allow(deprecated)]
pub async fn start(
pub async fn build(
server_config: ServerConfig,
sequencer_config: SequencerConfig,
mut starknet_config: StarknetConfig,
) -> Result<Handle> {
) -> Result<Node> {
// Metrics recorder must be initialized before calling any of the metrics macros, in order
// for it to be registered.
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;

// --- build executor factory

let cfg_env = CfgEnv {
Expand Down Expand Up @@ -189,52 +273,18 @@ pub async fn start(
let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());

// --- build metrics service

// Metrics recorder must be initialized before calling any of the metrics macros, in order for
// it to be registered.
if let Some(addr) = server_config.metrics {
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;
let reports = db.map(|db| vec![Box::new(db) as Box<dyn Report>]).unwrap_or_default();

prometheus_exporter::serve(
addr,
prometheus_handle,
metrics_process::Collector::default(),
reports,
)
.await?;

info!(%addr, "Metrics endpoint started.");
}

// --- create a TaskManager using the ambient Tokio runtime

let task_manager = TaskManager::current();

// --- build sequencing stage

let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
task_manager.clone(),
block_producer.clone(),
sequencer_config.messaging.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

task_manager.spawn(pipeline.into_future());

// --- spawn rpc server

let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator);
let rpc = spawn(node_components, server_config).await?;
let node = Node {
db,
pool,
backend,
server_config,
block_producer,
sequencer_config,
prometheus_handle,
task_manager: TaskManager::current(),
};

Ok(Handle { backend, block_producer, pool, rpc, task_manager })
Ok(node)
}

// Moved from `katana_rpc` crate
Expand Down
Loading

1 comment on commit 3e51d5d

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.30.

Benchmark suite Current: 3e51d5d Previous: 9c790a9 Ratio
Concurrent.Simulate/Blockifier.1000 3532479541 ns/iter (± 787487385) 2685904660 ns/iter (± 244740276) 1.32

This comment was automatically generated by workflow using github-action-benchmark.

CC: @kariy @glihm @tarrencev

Please sign in to comment.