refactor(katana-node): distinguish between launched node handle #2504

Merged
merged 4 commits into from
Oct 8, 2024
Changes from 1 commit
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

25 changes: 16 additions & 9 deletions bin/katana/src/cli/node.rs
@@ -225,23 +225,30 @@ impl NodeArgs {
let sequencer_config = self.sequencer_config();
let starknet_config = self.starknet_config()?;

// build the node and start it
let node = katana_node::start(server_config, sequencer_config, starknet_config).await?;
// Build the node
let node = katana_node::build(server_config, sequencer_config, starknet_config).await?;

if !self.silent {
#[allow(deprecated)]
let genesis = &node.backend.config.genesis;
print_intro(&self, genesis, node.rpc.addr);
let server_address = node.server_config.addr();
print_intro(&self, genesis, &server_address);
}

// Wait until an OS signal is received or TaskManager shutdown
// Launch the node
let handle = node.launch().await.context("failed to launch node")?;

// Wait until an OS signal (i.e. SIGINT, SIGTERM) is received or the node is shut down.
tokio::select! {
_ = dojo_utils::signal::wait_signals() => {},
_ = node.task_manager.wait_for_shutdown() => {}
_ = dojo_utils::signal::wait_signals() => {
// Gracefully shutdown the node before exiting
handle.stop().await?;
},

_ = handle.stopped() => { }
}

info!("Shutting down...");
node.stop().await?;
info!("Shutting down.");

Ok(())
}
@@ -339,7 +346,7 @@ impl NodeArgs {
}
}

fn print_intro(args: &NodeArgs, genesis: &Genesis, address: SocketAddr) {
fn print_intro(args: &NodeArgs, genesis: &Genesis, address: &str) {
let mut accounts = genesis.accounts().peekable();
let account_class_hash = accounts.peek().map(|e| e.1.class_hash());
let seed = &args.starknet.seed;
17 changes: 9 additions & 8 deletions crates/dojo-test-utils/src/sequencer.rs
@@ -7,7 +7,7 @@ use katana_core::constants::DEFAULT_SEQUENCER_ADDRESS;
#[allow(deprecated)]
pub use katana_core::sequencer::SequencerConfig;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_node::Handle;
use katana_node::LaunchedNode;
use katana_primitives::chain::ChainId;
use katana_rpc::config::ServerConfig;
use katana_rpc_api::ApiKind;
@@ -29,7 +29,7 @@ pub struct TestAccount {
#[allow(missing_debug_implementations)]
pub struct TestSequencer {
url: Url,
handle: Handle,
handle: LaunchedNode,
account: TestAccount,
}

@@ -45,19 +45,20 @@ impl TestSequencer {
apis: vec![ApiKind::Starknet, ApiKind::Dev, ApiKind::Saya, ApiKind::Torii],
};

let node = katana_node::start(server_config, config, starknet_config)
let node = katana_node::build(server_config, config, starknet_config)
.await
.expect("Failed to build node components");
let handle = node.launch().await.expect("Failed to launch node");

let url = Url::parse(&format!("http://{}", node.rpc.addr)).expect("Failed to parse URL");
let url = Url::parse(&format!("http://{}", handle.rpc.addr)).expect("Failed to parse URL");

let account = node.backend.config.genesis.accounts().next().unwrap();
let account = handle.node.backend.config.genesis.accounts().next().unwrap();
let account = TestAccount {
private_key: Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be()),
account_address: Felt::from_bytes_be(&account.0.to_bytes_be()),
};

TestSequencer { handle: node, account, url }
TestSequencer { handle, account, url }
}

pub fn account(&self) -> SingleOwnerAccount<JsonRpcClient<HttpTransport>, LocalWallet> {
@@ -79,15 +80,15 @@ impl TestSequencer {
}

pub fn backend(&self) -> &Arc<Backend<BlockifierFactory>> {
&self.handle.backend
&self.handle.node.backend
}

pub fn account_at_index(
&self,
index: usize,
) -> SingleOwnerAccount<JsonRpcClient<HttpTransport>, LocalWallet> {
#[allow(deprecated)]
let accounts: Vec<_> = self.handle.backend.config.genesis.accounts().collect::<_>();
let accounts: Vec<_> = self.handle.node.backend.config.genesis.accounts().collect::<_>();

let account = accounts[index];
let private_key = Felt::from_bytes_be(&account.1.private_key().unwrap().to_bytes_be());
1 change: 1 addition & 0 deletions crates/katana/node/Cargo.toml
@@ -19,6 +19,7 @@ katana-tasks.workspace = true

anyhow.workspace = true
dojo-metrics.workspace = true
futures.workspace = true
hyper.workspace = true
jsonrpsee.workspace = true
num-traits.workspace = true
41 changes: 41 additions & 0 deletions crates/katana/node/src/exit.rs
@@ -0,0 +1,41 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::Result;
use futures::future::BoxFuture;
use futures::FutureExt;

use crate::LaunchedNode;

/// A future that resolves once the node has been stopped, including all of its running tasks.
#[must_use = "futures do nothing unless polled"]
pub struct NodeStoppedFuture<'a> {
fut: BoxFuture<'a, Result<()>>,
}

impl<'a> NodeStoppedFuture<'a> {
pub(crate) fn new(handle: &'a LaunchedNode) -> Self {
let fut = Box::pin(async {
handle.node.task_manager.wait_for_shutdown().await;
handle.stop().await?;
Ok(())
});
Self { fut }
}
}

impl<'a> Future for NodeStoppedFuture<'a> {
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
this.fut.poll_unpin(cx)
}
}

impl<'a> core::fmt::Debug for NodeStoppedFuture<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("NodeStoppedFuture").field("fut", &"...").finish()
}
}
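
For context, callers are expected to await this future alongside an OS-signal listener, as the bin/katana change above does. A minimal usage sketch, where build_node() is a hypothetical stand-in for the configuration code shown earlier:

async fn run() -> anyhow::Result<()> {
    // `build_node()` is a hypothetical helper standing in for the server,
    // sequencer, and Starknet config plumbing in bin/katana/src/cli/node.rs.
    let node = build_node().await?;
    let handle = node.launch().await?;

    tokio::select! {
        // An OS signal was received: shut the node down gracefully.
        _ = tokio::signal::ctrl_c() => handle.stop().await?,
        // The node stopped on its own (e.g. its task manager shut down).
        res = handle.stopped() => res?,
    }

    Ok(())
}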
164 changes: 106 additions & 58 deletions crates/katana/node/src/lib.rs
@@ -1,5 +1,7 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

mod exit;

use std::future::IntoFuture;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -19,6 +21,7 @@ use katana_core::env::BlockContextGenerator;
#[allow(deprecated)]
use katana_core::sequencer::SequencerConfig;
use katana_core::service::block_producer::BlockProducer;
use katana_db::mdbx::DbEnv;
use katana_executor::implementation::blockifier::BlockifierFactory;
use katana_executor::{ExecutorFactory, SimulationFlag};
use katana_pipeline::{stage, Pipeline};
@@ -49,27 +52,107 @@ use starknet::providers::{JsonRpcClient, Provider};
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info, trace};

/// A handle to the instantiated Katana node.
use crate::exit::NodeStoppedFuture;

/// A handle to the launched Katana node.
#[allow(missing_debug_implementations)]
pub struct Handle {
pub pool: TxPool,
pub struct LaunchedNode {
pub node: Node,
/// Handle to the rpc server.
pub rpc: RpcServer,
}

impl LaunchedNode {
/// Stops the node.
///
/// This will instruct the node to stop and wait until it has actually stopped.
pub async fn stop(&self) -> Result<()> {
// TODO: wait for the rpc server to stop instead of just stopping it.
self.rpc.handle.stop()?;
self.node.task_manager.shutdown().await;
Ok(())
}

/// Returns a future which resolves only when the node has stopped.
pub fn stopped(&self) -> NodeStoppedFuture<'_> {
NodeStoppedFuture::new(self)
}
Comment on lines +70 to +80
⚠️ Potential issue

Ohayo, sensei! Great implementation of node shutdown methods!

The stop and stopped methods provide a clean way to manage the node's lifecycle. However, there's room for improvement:

The TODO comment on line 71 suggests waiting for the RPC server to stop instead of just stopping it. This would ensure a more graceful shutdown. Consider implementing this enhancement to improve the robustness of the shutdown process.

Would you like assistance in implementing the logic to wait for the RPC server to fully stop, sensei?
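
One possible shape for that enhancement, as a rough sketch only: it assumes rpc.handle is a cloneable jsonrpsee ServerHandle, which exposes stop() alongside an async stopped() that resolves once the server has terminated.

use anyhow::Result;

impl LaunchedNode {
    /// Hypothetical variant of `stop` that also waits for the RPC server to
    /// finish shutting down before tearing down the background tasks.
    pub async fn stop_and_wait(&self) -> Result<()> {
        // Signal the RPC server to stop accepting new requests.
        self.rpc.handle.stop()?;
        // Wait until the server task has actually terminated.
        self.rpc.handle.clone().stopped().await;
        // Then shut down the node's remaining background tasks.
        self.node.task_manager.shutdown().await;
        Ok(())
    }
}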

}

/// A node instance.
///
/// The struct contains handles to all of the node's components.
#[must_use = "Node does nothing unless launched."]
#[allow(missing_debug_implementations)]
pub struct Node {
pub pool: TxPool,
pub db: Option<DbEnv>,
pub task_manager: TaskManager,
pub backend: Arc<Backend<BlockifierFactory>>,
pub block_producer: BlockProducer<BlockifierFactory>,
pub server_config: ServerConfig,
#[allow(deprecated)]
pub sequencer_config: SequencerConfig,
}

impl Handle {
/// Stops the Katana node.
pub async fn stop(self) -> Result<()> {
// TODO: wait for the rpc server to stop
self.rpc.handle.stop()?;
self.task_manager.shutdown().await;
Ok(())
impl Node {
/// Start the node.
///
/// This method will start all of the node's processes, running them until the node is stopped.
pub async fn launch(self) -> Result<LaunchedNode> {
// Metrics recorder must be initialized before calling any of the metrics macros, in order
// for it to be registered.

if let Some(addr) = self.server_config.metrics {
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;
let mut reports = Vec::new();

if let Some(ref db) = self.db {
reports.push(Box::new(db.clone()) as Box<dyn Report>);
}

prometheus_exporter::serve(
addr,
prometheus_handle,
metrics_process::Collector::default(),
reports,
)
.await?;

info!(%addr, "Metrics endpoint started.");
}

let pool = self.pool.clone();
let backend = self.backend.clone();
let block_producer = self.block_producer.clone();
let validator = self.block_producer.validator().clone();

// --- build sequencing stage

#[allow(deprecated)]
let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
self.task_manager.clone(),
block_producer.clone(),
self.sequencer_config.messaging.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

self.task_manager.spawn(pipeline.into_future());

let node_components = (pool, backend, block_producer, validator);
let rpc = spawn(node_components, self.server_config.clone()).await?;

Ok(LaunchedNode { node: self, rpc })
}
🛠️ Refactor suggestion

Ohayo, sensei! The launch method looks promising!

The Node::launch method does a great job of initializing all the necessary components. However, we can enhance it further with improved error handling and logging:

  1. Consider wrapping each major step (metrics initialization, pipeline building, RPC spawning) in separate functions. This will improve readability and make error handling more granular.

  2. Add more detailed logging throughout the launch process. This will make debugging easier in case of issues.

  3. Use tracing::instrument to automatically log entry and exit of the launch method along with its parameters.

Here's a sketch of how this could look:

use tracing::{info, instrument};

impl Node {
    #[instrument(skip(self), err)]
    pub async fn launch(self) -> Result<LaunchedNode> {
        info!("Starting node launch process");

        self.init_metrics().await?;
        let pipeline = self.build_pipeline()?;
        self.start_pipeline(pipeline);
        let rpc = self.spawn_rpc().await?;

        info!("Node launch completed successfully");
        Ok(LaunchedNode { node: self, rpc })
    }

    async fn init_metrics(&self) -> Result<()> {
        // Metrics initialization logic
    }

    fn build_pipeline(&self) -> Result<Pipeline> {
        // Pipeline building logic
    }

    fn start_pipeline(&self, pipeline: Pipeline) {
        // Pipeline starting logic
    }

    async fn spawn_rpc(&self) -> Result<RpcServer> {
        // RPC spawning logic
    }
}

This structure will make the code more modular and easier to maintain, sensei!

}

/// Build the core Katana components from the given configurations and start running the node.
/// Build the core Katana components from the given configurations.
// TODO: placeholder until we implement a dedicated class that encapsulate building the node
// components
//
Expand All @@ -79,11 +162,11 @@ impl Handle {
//
// NOTE: Don't rely on this function as it is mainly used as a placeholder for now.
#[allow(deprecated)]
pub async fn start(
pub async fn build(
server_config: ServerConfig,
sequencer_config: SequencerConfig,
mut starknet_config: StarknetConfig,
) -> Result<Handle> {
) -> Result<Node> {
// --- build executor factory

let cfg_env = CfgEnv {
@@ -189,52 +272,17 @@ pub async fn start(
let validator = block_producer.validator();
let pool = TxPool::new(validator.clone(), FiFo::new());

// --- build metrics service

// Metrics recorder must be initialized before calling any of the metrics macros, in order for
// it to be registered.
if let Some(addr) = server_config.metrics {
let prometheus_handle = prometheus_exporter::install_recorder("katana")?;
let reports = db.map(|db| vec![Box::new(db) as Box<dyn Report>]).unwrap_or_default();

prometheus_exporter::serve(
addr,
prometheus_handle,
metrics_process::Collector::default(),
reports,
)
.await?;

info!(%addr, "Metrics endpoint started.");
}

// --- create a TaskManager using the ambient Tokio runtime

let task_manager = TaskManager::current();

// --- build sequencing stage

let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
task_manager.clone(),
block_producer.clone(),
sequencer_config.messaging.clone(),
);

// --- build and start the pipeline

let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

task_manager.spawn(pipeline.into_future());

// --- spawn rpc server

let node_components = (pool.clone(), backend.clone(), block_producer.clone(), validator);
let rpc = spawn(node_components, server_config).await?;
let node = Node {
db,
pool,
backend,
server_config,
block_producer,
sequencer_config,
task_manager: TaskManager::current(),
};

Ok(Handle { backend, block_producer, pool, rpc, task_manager })
Ok(node)
}

// Moved from `katana_rpc` crate