diff --git a/Cargo.lock b/Cargo.lock index 153ef76dab..06c2717c54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8176,6 +8176,7 @@ dependencies = [ "katana-pool", "katana-tasks", "thiserror", + "tokio", "tracing", ] diff --git a/bin/katana/src/cli/node.rs b/bin/katana/src/cli/node.rs index f748eb4eb1..5770d72ab5 100644 --- a/bin/katana/src/cli/node.rs +++ b/bin/katana/src/cli/node.rs @@ -232,7 +232,6 @@ impl NodeArgs { let node = katana_node::build(config).await.context("failed to build node")?; if !self.silent { - #[allow(deprecated)] let genesis = &node.backend.chain_spec.genesis; print_intro(&self, genesis); } @@ -256,9 +255,10 @@ impl NodeArgs { } fn init_logging(&self) -> Result<()> { - const DEFAULT_LOG_FILTER: &str = "info,executor=trace,forking::backend=trace,server=debug,\ - katana_core=trace,blockifier=off,jsonrpsee_server=off,\ - hyper=off,messaging=debug,node=error"; + const DEFAULT_LOG_FILTER: &str = "tasks=debug,info,executor=trace,forking::backend=trace,\ + server=debug,katana_core=trace,blockifier=off,\ + jsonrpsee_server=off,hyper=off,messaging=debug,\ + node=error"; LogTracer::init()?; diff --git a/crates/katana/node-bindings/src/lib.rs b/crates/katana/node-bindings/src/lib.rs index 358d653288..8b30bc55d1 100644 --- a/crates/katana/node-bindings/src/lib.rs +++ b/crates/katana/node-bindings/src/lib.rs @@ -622,15 +622,16 @@ mod tests { async fn can_launch_katana() { // this will launch katana with random ports let katana = Katana::new().spawn(); + // assert some default values assert_eq!(katana.accounts().len(), 10); assert_eq!(katana.chain_id(), short_string!("KATANA")); // assert that all accounts have private key assert!(katana.accounts().iter().all(|a| a.private_key.is_some())); - let provider = JsonRpcClient::new(HttpTransport::new(katana.endpoint_url())); - let result = provider.chain_id().await; - assert!(result.is_ok()); + // try to connect as a provider + let provider = JsonRpcClient::new(HttpTransport::new(dbg!(katana.endpoint_url()))); + assert!(provider.chain_id().await.is_ok()) } #[test] diff --git a/crates/katana/node/src/lib.rs b/crates/katana/node/src/lib.rs index cacd8dfd03..7bcac3bfda 100644 --- a/crates/katana/node/src/lib.rs +++ b/crates/katana/node/src/lib.rs @@ -129,11 +129,10 @@ impl Node { // --- build sequencing stage - #[allow(deprecated)] let sequencing = stage::Sequencing::new( pool.clone(), backend.clone(), - self.task_manager.clone(), + self.task_manager.task_spawner(), block_producer.clone(), self.messaging_config.clone(), ); @@ -143,7 +142,12 @@ impl Node { let mut pipeline = Pipeline::new(); pipeline.add_stage(Box::new(sequencing)); - self.task_manager.spawn(pipeline.into_future()); + self.task_manager + .task_spawner() + .build_task() + .critical() + .name("Pipeline") + .spawn(pipeline.into_future()); let node_components = (pool, backend, block_producer, validator); let rpc = spawn(node_components, self.rpc_config.clone()).await?; diff --git a/crates/katana/pipeline/Cargo.toml b/crates/katana/pipeline/Cargo.toml index fc8894e28b..5988c7aa0c 100644 --- a/crates/katana/pipeline/Cargo.toml +++ b/crates/katana/pipeline/Cargo.toml @@ -15,4 +15,5 @@ anyhow.workspace = true async-trait.workspace = true futures.workspace = true thiserror.workspace = true +tokio.workspace = true tracing.workspace = true diff --git a/crates/katana/pipeline/src/lib.rs b/crates/katana/pipeline/src/lib.rs index 287275d581..7850fe4974 100644 --- a/crates/katana/pipeline/src/lib.rs +++ b/crates/katana/pipeline/src/lib.rs @@ -6,7 +6,7 @@ use core::future::IntoFuture; use futures::future::BoxFuture; use stage::Stage; -use tracing::info; +use tracing::{error, info}; /// The result of a pipeline execution. pub type PipelineResult = Result<(), Error>; @@ -46,9 +46,10 @@ impl Pipeline { /// Start the pipeline. pub async fn run(&mut self) -> PipelineResult { for stage in &mut self.stages { - info!(id = %stage.id(), "Executing stage"); + info!(target: "pipeline", id = %stage.id(), "Executing stage."); stage.execute().await?; } + info!(target: "pipeline", "Pipeline finished."); Ok(()) } } @@ -58,7 +59,11 @@ impl IntoFuture for Pipeline { type IntoFuture = PipelineFut; fn into_future(mut self) -> Self::IntoFuture { - Box::pin(async move { self.run().await }) + Box::pin(async move { + self.run().await.inspect_err(|error| { + error!(target: "pipeline", %error, "Pipeline failed."); + }) + }) } } diff --git a/crates/katana/pipeline/src/stage/sequencing.rs b/crates/katana/pipeline/src/stage/sequencing.rs index 958e21792a..6eae240112 100644 --- a/crates/katana/pipeline/src/stage/sequencing.rs +++ b/crates/katana/pipeline/src/stage/sequencing.rs @@ -8,7 +8,8 @@ use katana_core::service::messaging::{MessagingConfig, MessagingService, Messagi use katana_core::service::{BlockProductionTask, TransactionMiner}; use katana_executor::ExecutorFactory; use katana_pool::{TransactionPool, TxPool}; -use katana_tasks::TaskManager; +use katana_tasks::{TaskHandle, TaskSpawner}; +use tracing::error; use super::{StageId, StageResult}; use crate::Stage; @@ -18,7 +19,7 @@ use crate::Stage; pub struct Sequencing { pool: TxPool, backend: Arc>, - task_manager: TaskManager, + task_spawner: TaskSpawner, block_producer: BlockProducer, messaging_config: Option, } @@ -27,14 +28,14 @@ impl Sequencing { pub fn new( pool: TxPool, backend: Arc>, - task_manager: TaskManager, + task_spawner: TaskSpawner, block_producer: BlockProducer, messaging_config: Option, ) -> Self { - Self { pool, backend, task_manager, block_producer, messaging_config } + Self { pool, backend, task_spawner, block_producer, messaging_config } } - async fn run_messaging(&self) -> Result<()> { + async fn run_messaging(&self) -> Result> { if let Some(config) = &self.messaging_config { let config = config.clone(); let pool = self.pool.clone(); @@ -42,26 +43,22 @@ impl Sequencing { let service = MessagingService::new(config, pool, backend).await?; let task = MessagingTask::new(service); - self.task_manager.build_task().critical().name("Messaging").spawn(task); + + let handle = self.task_spawner.build_task().name("Messaging").spawn(task); + Ok(handle) } else { - // this will create a future that will never resolve - self.task_manager - .build_task() - .critical() - .name("Messaging") - .spawn(future::pending::<()>()); + let handle = self.task_spawner.build_task().spawn(future::pending::<()>()); + Ok(handle) } - - Ok(()) } - async fn run_block_production(&self) { + fn run_block_production(&self) -> TaskHandle<()> { let pool = self.pool.clone(); let miner = TransactionMiner::new(pool.add_listener()); let block_producer = self.block_producer.clone(); let service = BlockProductionTask::new(pool, miner, block_producer); - self.task_manager.build_task().critical().name("Block production").spawn(service); + self.task_spawner.build_task().name("Block production").spawn(service) } } @@ -71,9 +68,25 @@ impl Stage for Sequencing { StageId::Sequencing } + #[tracing::instrument(skip(self), name = "Stage", fields(id = %self.id()))] async fn execute(&mut self) -> StageResult { - let _ = self.run_messaging().await?; - let _ = self.run_block_production().await; - future::pending::().await + // Build the messaging and block production tasks. + let messaging = self.run_messaging().await?; + let block_production = self.run_block_production(); + + // Neither of these tasks should complete as they are meant to be run forever, + // but if either of them do complete, the sequencing stage should return. + // + // Select on the tasks completion to prevent the task from failing silently (if any). + tokio::select! { + res = messaging => { + error!(target: "pipeline", reason = ?res, "Messaging task finished unexpectedly."); + }, + res = block_production => { + error!(target: "pipeline", reason = ?res, "Block production task finished unexpectedly."); + } + } + + Ok(()) } } diff --git a/crates/katana/tasks/src/lib.rs b/crates/katana/tasks/src/lib.rs index 459427c783..13368cca9c 100644 --- a/crates/katana/tasks/src/lib.rs +++ b/crates/katana/tasks/src/lib.rs @@ -22,6 +22,7 @@ use tokio::task::JoinHandle; #[error("Failed to initialize task spawner: {0}")] pub struct TaskSpawnerInitError(tokio::runtime::TryCurrentError); +// TODO: replace this with TaskSpawner in manager.rs /// A task spawner for spawning tasks on a tokio runtime. This is simple wrapper around a tokio's /// runtime [Handle] to easily spawn tasks on the runtime. /// diff --git a/crates/katana/tasks/src/manager.rs b/crates/katana/tasks/src/manager.rs index 25bd33fb4f..4abf7910a8 100644 --- a/crates/katana/tasks/src/manager.rs +++ b/crates/katana/tasks/src/manager.rs @@ -1,6 +1,7 @@ use core::future::Future; use core::pin::Pin; use core::task::{Context, Poll}; +use std::sync::Arc; use futures::future::BoxFuture; use futures::FutureExt; @@ -9,6 +10,7 @@ use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; pub use tokio_util::sync::WaitForCancellationFuture as WaitForShutdownFuture; use tokio_util::task::TaskTracker; +use tracing::trace; use crate::task::{TaskBuilder, TaskResult}; @@ -17,8 +19,22 @@ pub type TaskHandle = JoinHandle>; /// Usage for this task manager is mainly to spawn tasks that can be cancelled, and captures /// panicked tasks (which in the context of the task manager - a critical task) for graceful /// shutdown. -#[derive(Debug, Clone)] +/// +/// # Spawning tasks +/// +/// To spawn tasks on the manager, call [`TaskManager::task_spawner`] to get a [`TaskSpawner`] +/// instance. The [`TaskSpawner`] can then be used to spawn tasks on the manager. +/// +/// # Tasks cancellation +/// +/// When the manager is dropped, all tasks that have yet to complete will be cancelled. +#[derive(Debug)] pub struct TaskManager { + inner: Arc, +} + +#[derive(Debug)] +struct Inner { /// A handle to the Tokio runtime. handle: Handle, /// Keep track of currently running tasks. @@ -26,30 +42,34 @@ pub struct TaskManager { /// Used to cancel all running tasks. /// /// This is passed to all the tasks spawned by the manager. - pub(crate) on_cancel: CancellationToken, + on_cancel: CancellationToken, } impl TaskManager { /// Create a new [`TaskManager`] from the given Tokio runtime handle. pub fn new(handle: Handle) -> Self { - Self { handle, tracker: TaskTracker::new(), on_cancel: CancellationToken::new() } + Self { + inner: Arc::new(Inner { + handle, + tracker: TaskTracker::new(), + on_cancel: CancellationToken::new(), + }), + } } + /// Create a new [`TaskManager`] from the ambient Tokio runtime. pub fn current() -> Self { Self::new(Handle::current()) } - pub fn spawn(&self, fut: F) -> TaskHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.spawn_inner(fut) + /// Returns a [`TaskSpawner`] that can be used to spawn tasks on the manager. + pub fn task_spawner(&self) -> TaskSpawner { + TaskSpawner { inner: Arc::clone(&self.inner) } } /// Returns a future that can be awaited for the shutdown signal to be received. pub fn wait_for_shutdown(&self) -> WaitForShutdownFuture<'_> { - self.on_cancel.cancelled() + self.inner.on_cancel.cancelled() } /// Shuts down the manager and wait until all currently running tasks are finished, either due @@ -58,15 +78,15 @@ impl TaskManager { /// No task can be spawned on the manager after this method is called. pub fn shutdown(&self) -> ShutdownFuture<'_> { let fut = Box::pin(async { - if !self.on_cancel.is_cancelled() { - self.on_cancel.cancel(); + if !self.inner.on_cancel.is_cancelled() { + self.inner.on_cancel.cancel(); } self.wait_for_shutdown().await; // need to close the tracker first before waiting - let _ = self.tracker.close(); - self.tracker.wait().await; + let _ = self.inner.tracker.close(); + self.inner.tracker.wait().await; }); ShutdownFuture { fut } @@ -74,22 +94,46 @@ impl TaskManager { /// Return the handle to the Tokio runtime that the manager is associated with. pub fn handle(&self) -> &Handle { - &self.handle - } - - /// Returns a new [`TaskBuilder`] for building a task to be spawned on this manager. - pub fn build_task(&self) -> TaskBuilder<'_> { - TaskBuilder::new(self) + &self.inner.handle } /// Wait until all spawned tasks are completed. #[cfg(test)] async fn wait(&self) { // need to close the tracker first before waiting - let _ = self.tracker.close(); - self.tracker.wait().await; + let _ = self.inner.tracker.close(); + self.inner.tracker.wait().await; // reopen the tracker for spawning future tasks - let _ = self.tracker.reopen(); + let _ = self.inner.tracker.reopen(); + } +} + +/// A spawner for spawning tasks on the [`TaskManager`] that it was derived from. +/// +/// This is the main way to spawn tasks on a [`TaskManager`]. It can only be created +/// by calling [`TaskManager::task_spawner`]. +#[derive(Debug, Clone)] +pub struct TaskSpawner { + /// A handle to the [`TaskManager`] that this spawner is associated with. + inner: Arc, +} + +impl TaskSpawner { + /// Returns a new [`TaskBuilder`] for building a task. + pub fn build_task(&self) -> TaskBuilder<'_> { + TaskBuilder::new(self) + } + + pub(crate) fn spawn(&self, fut: F) -> TaskHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.spawn_inner(fut) + } + + pub(crate) fn cancellation_token(&self) -> &CancellationToken { + &self.inner.on_cancel } fn spawn_inner(&self, task: F) -> TaskHandle @@ -98,15 +142,15 @@ impl TaskManager { F::Output: Send + 'static, { let task = self.make_cancellable(task); - let task = self.tracker.track_future(task); - self.handle.spawn(task) + let task = self.inner.tracker.track_future(task); + self.inner.handle.spawn(task) } fn make_cancellable(&self, fut: F) -> impl Future> where F: Future, { - let ct = self.on_cancel.clone(); + let ct = self.inner.on_cancel.clone(); async move { tokio::select! { _ = ct.cancelled() => { @@ -122,7 +166,8 @@ impl TaskManager { impl Drop for TaskManager { fn drop(&mut self) { - self.on_cancel.cancel(); + trace!(target: "tasks", "Task manager is dropped, cancelling all ongoing tasks."); + self.inner.on_cancel.cancel(); } } @@ -156,19 +201,20 @@ mod tests { #[tokio::test] async fn normal_tasks() { let manager = TaskManager::current(); + let spawner = manager.task_spawner(); - manager.spawn(time::sleep(Duration::from_secs(1))); - manager.spawn(time::sleep(Duration::from_secs(1))); - manager.spawn(time::sleep(Duration::from_secs(1))); + spawner.build_task().spawn(time::sleep(Duration::from_secs(1))); + spawner.build_task().spawn(time::sleep(Duration::from_secs(1))); + spawner.build_task().spawn(time::sleep(Duration::from_secs(1))); // 3 tasks should be spawned on the manager - assert_eq!(manager.tracker.len(), 3); + assert_eq!(manager.inner.tracker.len(), 3); // wait until all task spawned to the manager have been completed manager.wait().await; assert!( - !manager.on_cancel.is_cancelled(), + !manager.inner.on_cancel.is_cancelled(), "cancellation signal shouldn't be sent on normal task completion" ) } @@ -176,26 +222,27 @@ mod tests { #[tokio::test] async fn task_with_graceful_shutdown() { let manager = TaskManager::current(); + let spawner = manager.task_spawner(); // mock long running normal task and a task with graceful shutdown - manager.build_task().spawn(async { + spawner.build_task().spawn(async { loop { time::sleep(Duration::from_secs(1)).await } }); - manager.build_task().spawn(async { + spawner.build_task().spawn(async { loop { time::sleep(Duration::from_secs(1)).await } }); // assert that 2 tasks should've been spawned - assert_eq!(manager.tracker.len(), 2); + assert_eq!(manager.inner.tracker.len(), 2); // Spawn a task with graceful shuwdown that finish immediately. // The long running task should be cancelled due to the graceful shutdown. - manager.build_task().graceful_shutdown().spawn(future::ready(())); + spawner.build_task().graceful_shutdown().spawn(future::ready(())); // wait until all task spawned to the manager have been completed manager.shutdown().await; @@ -204,14 +251,14 @@ mod tests { #[tokio::test] async fn critical_task_implicit_graceful_shutdown() { let manager = TaskManager::current(); - manager.build_task().critical().spawn(future::ready(())); + manager.task_spawner().build_task().critical().spawn(future::ready(())); manager.shutdown().await; } #[tokio::test] async fn critical_task_graceful_shudown_on_panicked() { let manager = TaskManager::current(); - manager.build_task().critical().spawn(async { panic!("panicking") }); + manager.task_spawner().build_task().critical().spawn(async { panic!("panicking") }); manager.shutdown().await; } } diff --git a/crates/katana/tasks/src/task.rs b/crates/katana/tasks/src/task.rs index 1e5cdb813a..22f8a24301 100644 --- a/crates/katana/tasks/src/task.rs +++ b/crates/katana/tasks/src/task.rs @@ -6,9 +6,10 @@ use futures::future::Either; use futures::{FutureExt, TryFutureExt}; use thiserror::Error; use tokio_metrics::TaskMonitor; -use tracing::error; +use tracing::{debug, error}; -use crate::manager::{TaskHandle, TaskManager}; +use crate::manager::TaskHandle; +use crate::TaskSpawner; /// A task result that can be either completed or cancelled. #[derive(Debug, Copy, Clone)] @@ -32,7 +33,7 @@ impl TaskResult { #[derive(Debug)] pub struct TaskBuilder<'a> { /// The task manager that the task will be spawned on. - manager: &'a TaskManager, + spawner: &'a TaskSpawner, /// The name of the task. name: Option, /// Indicates whether the task should be instrumented. @@ -44,8 +45,8 @@ pub struct TaskBuilder<'a> { impl<'a> TaskBuilder<'a> { /// Creates a new task builder associated with the given task manager. - pub(crate) fn new(manager: &'a TaskManager) -> Self { - Self { manager, name: None, instrument: false, graceful_shutdown: false } + pub(crate) fn new(spawner: &'a TaskSpawner) -> Self { + Self { spawner, name: None, instrument: false, graceful_shutdown: false } } pub fn critical(self) -> CriticalTaskBuilder<'a> { @@ -76,14 +77,15 @@ impl<'a> TaskBuilder<'a> { F: Future + Send + 'static, F::Output: Send + 'static, { - let Self { manager, instrument, graceful_shutdown, .. } = self; + let Self { spawner, instrument, graceful_shutdown, name } = self; // creates a future that will send a cancellation signal to the manager when the future is // completed, regardless of success or error. let fut = { - let ct = manager.on_cancel.clone(); + let ct = spawner.cancellation_token().clone(); fut.map(move |res| { if graceful_shutdown { + debug!(target: "tasks", task = name, "Task with graceful shutdown completed."); ct.cancel(); } res @@ -98,7 +100,7 @@ impl<'a> TaskBuilder<'a> { Either::Right(fut) }; - manager.spawn(fut) + spawner.spawn(fut) } } @@ -125,8 +127,8 @@ impl<'a> CriticalTaskBuilder<'a> { where F: Future + Send + 'static, { - let task_name = self.builder.name.clone().unwrap_or("unnamed".to_string()); - let ct = self.builder.manager.on_cancel.clone(); + let task_name = self.builder.name.clone().unwrap_or("".to_string()); + let ct = self.builder.spawner.cancellation_token().clone(); let fut = AssertUnwindSafe(fut) .catch_unwind()