Skip to content

Commit

Permalink
separate task spawning from task manager
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy committed Oct 10, 2024
1 parent 5ab6cde commit 0d77872
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 77 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions bin/katana/src/cli/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ impl NodeArgs {
let node = katana_node::build(config).await.context("failed to build node")?;

if !self.silent {
#[allow(deprecated)]
let genesis = &node.backend.chain_spec.genesis;
let server_address = node.rpc_config.socket_addr();
print_intro(&self, genesis, &server_address);
Expand All @@ -257,9 +256,10 @@ impl NodeArgs {
}

fn init_logging(&self) -> Result<()> {
const DEFAULT_LOG_FILTER: &str = "info,executor=trace,forking::backend=trace,server=debug,\
katana_core=trace,blockifier=off,jsonrpsee_server=off,\
hyper=off,messaging=debug,node=error";
const DEFAULT_LOG_FILTER: &str = "tasks=debug,info,executor=trace,forking::backend=trace,\
server=debug,katana_core=trace,blockifier=off,\
jsonrpsee_server=off,hyper=off,messaging=debug,\
node=error";

LogTracer::init()?;

Expand Down
10 changes: 7 additions & 3 deletions crates/katana/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,10 @@ impl Node {

// --- build sequencing stage

#[allow(deprecated)]
let sequencing = stage::Sequencing::new(
pool.clone(),
backend.clone(),
self.task_manager.clone(),
self.task_manager.task_spawner(),
block_producer.clone(),
self.messaging_config.clone(),
);
Expand All @@ -143,7 +142,12 @@ impl Node {
let mut pipeline = Pipeline::new();
pipeline.add_stage(Box::new(sequencing));

self.task_manager.spawn(pipeline.into_future());
self.task_manager
.task_spawner()
.build_task()
.critical()
.name("Pipeline")
.spawn(pipeline.into_future());

let node_components = (pool, backend, block_producer, validator);
let rpc = spawn(node_components, self.rpc_config.clone()).await?;
Expand Down
1 change: 1 addition & 0 deletions crates/katana/pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ anyhow.workspace = true
async-trait.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
11 changes: 8 additions & 3 deletions crates/katana/pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use core::future::IntoFuture;

use futures::future::BoxFuture;
use stage::Stage;
use tracing::info;
use tracing::{error, info};

/// The result of a pipeline execution.
pub type PipelineResult = Result<(), Error>;
Expand Down Expand Up @@ -46,9 +46,10 @@ impl Pipeline {
/// Start the pipeline.
pub async fn run(&mut self) -> PipelineResult {
for stage in &mut self.stages {
info!(id = %stage.id(), "Executing stage");
info!(target: "pipeline", id = %stage.id(), "Executing stage.");
stage.execute().await?;
}
info!(target: "pipeline", "Pipeline finished.");
Ok(())
}
}
Expand All @@ -58,7 +59,11 @@ impl IntoFuture for Pipeline {
type IntoFuture = PipelineFut;

fn into_future(mut self) -> Self::IntoFuture {
Box::pin(async move { self.run().await })
Box::pin(async move {
self.run().await.inspect_err(|error| {
error!(target: "pipeline", %error, "Pipeline failed.");
})
})
}
}

Expand Down
51 changes: 32 additions & 19 deletions crates/katana/pipeline/src/stage/sequencing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use katana_core::service::messaging::{MessagingConfig, MessagingService, Messagi
use katana_core::service::{BlockProductionTask, TransactionMiner};
use katana_executor::ExecutorFactory;
use katana_pool::{TransactionPool, TxPool};
use katana_tasks::TaskManager;
use katana_tasks::{TaskHandle, TaskSpawner};
use tracing::error;

use super::{StageId, StageResult};
use crate::Stage;
Expand All @@ -18,7 +19,7 @@ use crate::Stage;
pub struct Sequencing<EF: ExecutorFactory> {
pool: TxPool,
backend: Arc<Backend<EF>>,
task_manager: TaskManager,
task_spawner: TaskSpawner,
block_producer: BlockProducer<EF>,
messaging_config: Option<MessagingConfig>,
}
Expand All @@ -27,41 +28,37 @@ impl<EF: ExecutorFactory> Sequencing<EF> {
pub fn new(
pool: TxPool,
backend: Arc<Backend<EF>>,
task_manager: TaskManager,
task_spawner: TaskSpawner,
block_producer: BlockProducer<EF>,
messaging_config: Option<MessagingConfig>,
) -> Self {
Self { pool, backend, task_manager, block_producer, messaging_config }
Self { pool, backend, task_spawner, block_producer, messaging_config }
}

async fn run_messaging(&self) -> Result<()> {
async fn run_messaging(&self) -> Result<TaskHandle<()>> {
if let Some(config) = &self.messaging_config {
let config = config.clone();
let pool = self.pool.clone();
let backend = self.backend.clone();

let service = MessagingService::new(config, pool, backend).await?;
let task = MessagingTask::new(service);
self.task_manager.build_task().critical().name("Messaging").spawn(task);

let handle = self.task_spawner.build_task().name("Messaging").spawn(task);
Ok(handle)
} else {
// this will create a future that will never resolve
self.task_manager
.build_task()
.critical()
.name("Messaging")
.spawn(future::pending::<()>());
let handle = self.task_spawner.build_task().spawn(future::pending::<()>());
Ok(handle)
}

Ok(())
}

async fn run_block_production(&self) {
fn run_block_production(&self) -> TaskHandle<()> {
let pool = self.pool.clone();
let miner = TransactionMiner::new(pool.add_listener());
let block_producer = self.block_producer.clone();

let service = BlockProductionTask::new(pool, miner, block_producer);
self.task_manager.build_task().critical().name("Block production").spawn(service);
self.task_spawner.build_task().name("Block production").spawn(service)
}
}

Expand All @@ -71,9 +68,25 @@ impl<EF: ExecutorFactory> Stage for Sequencing<EF> {
StageId::Sequencing
}

#[tracing::instrument(skip(self), name = "Stage", fields(id = %self.id()))]
async fn execute(&mut self) -> StageResult {
let _ = self.run_messaging().await?;
let _ = self.run_block_production().await;
future::pending::<StageResult>().await
// Build the messaging and block production tasks.
let messaging = self.run_messaging().await?;
let block_production = self.run_block_production();

// Neither of these tasks should complete as they are meant to be run forever,
// but if either of them do complete, the sequencing stage should return.
//
// Select on the tasks completion to prevent the task from failing silently (if any).
tokio::select! {
res = messaging => {
error!(target: "pipeline", reason = ?res, "Messaging task finished unexpectedly.");
},
res = block_production => {
error!(target: "pipeline", reason = ?res, "Block production task finished unexpectedly.");
}
}

Ok(())
}
}
1 change: 1 addition & 0 deletions crates/katana/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio::task::JoinHandle;
#[error("Failed to initialize task spawner: {0}")]
pub struct TaskSpawnerInitError(tokio::runtime::TryCurrentError);

// TODO: replace this with TaskSpawner in manager.rs
/// A task spawner for spawning tasks on a tokio runtime. This is simple wrapper around a tokio's
/// runtime [Handle] to easily spawn tasks on the runtime.
///
Expand Down
Loading

0 comments on commit 0d77872

Please sign in to comment.