From e16b2fc80a4b95d138e6c3bc4c477490e5e74470 Mon Sep 17 00:00:00 2001 From: Pedro Nauck Date: Mon, 27 Jan 2025 20:43:46 -0300 Subject: [PATCH] fix(publisher): Recover mechanism for tx status none --- Cargo.lock | 14 +- crates/core/README.md | 3 +- crates/core/src/stream/fuel_streams.rs | 8 +- crates/core/src/stream/stream_impl.rs | 12 +- crates/domains/src/msg_payload.rs | 15 - crates/message-broker/src/lib.rs | 2 + crates/message-broker/src/msg_broker.rs | 100 +----- crates/message-broker/src/nats.rs | 306 +++++++----------- crates/message-broker/src/nats_queue.rs | 166 ++++++++++ crates/types/src/fuel_core.rs | 17 +- crates/web-utils/Cargo.toml | 1 + crates/web-utils/src/lib.rs | 1 + .../middlewares/api_key/api_key_impl.rs | 2 +- crates/web-utils/src/shutdown.rs | 4 +- crates/web-utils/src/tracing.rs | 18 ++ services/consumer/Cargo.toml | 1 - .../consumer/src/executor/block_executor.rs | 9 +- services/consumer/src/main.rs | 31 +- services/consumer/src/server/mod.rs | 6 +- services/consumer/src/server/state.rs | 6 +- services/publisher/src/error.rs | 4 + services/publisher/src/main.rs | 27 +- services/publisher/src/publish.rs | 13 +- services/publisher/src/recover.rs | 96 ++++-- services/publisher/src/state.rs | 6 +- services/webserver/src/server/state.rs | 9 +- tests/src/lib.rs | 17 +- tests/tests/services/consumer.rs | 15 +- tests/tests/stream/live_data.rs | 9 +- 29 files changed, 472 insertions(+), 446 deletions(-) create mode 100644 crates/message-broker/src/nats_queue.rs create mode 100644 crates/web-utils/src/tracing.rs diff --git a/Cargo.lock b/Cargo.lock index 6960d3b3..82d1316c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4111,6 +4111,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-actix-web", + "tracing-subscriber", "url", "urlencoding", ] @@ -6417,9 +6418,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +checksum = "0dab59f8e050d5df8e4dd87d9206fb6f65a483e20ac9fda365ade4fab353196c" dependencies = [ "libc", "log", @@ -6768,9 +6769,9 @@ dependencies = [ [[package]] name = "openssl" -version = "0.10.68" +version = "0.10.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +checksum = "f5e534d133a060a3c19daec1eb3e98ec6f4685978834f2dbadfe2ec215bab64e" dependencies = [ "bitflags 2.8.0", "cfg-if", @@ -8276,9 +8277,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.10.1" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2bf47e6ff922db3825eb750c4e2ff784c6ff8fb9e13046ef6a1d1c5401b0b37" +checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" dependencies = [ "web-time", ] @@ -9240,7 +9241,6 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "tracing-subscriber", ] [[package]] diff --git a/crates/core/README.md b/crates/core/README.md index 5742a2a8..077f3033 100644 --- a/crates/core/README.md +++ b/crates/core/README.md @@ -63,8 +63,7 @@ use futures::StreamExt; async fn main() -> anyhow::Result<()> { // Connect to NATS server let db = Db::new(DbConnectionOpts::default()).await?; - let broker = MessageBrokerClient::Nats.start("nats://localhost:4222").await?; - broker.setup().await?; + let broker = NatsMessageBroker::setup("nats://localhost:4222", None).await?; // Create or get existing stream for blocks let stream = Stream::::get_or_init(&broker, &db.arc()).await; diff --git a/crates/core/src/stream/fuel_streams.rs b/crates/core/src/stream/fuel_streams.rs index d4bdffa5..23879a3b 100644 --- a/crates/core/src/stream/fuel_streams.rs +++ b/crates/core/src/stream/fuel_streams.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use fuel_message_broker::MessageBroker; +use fuel_message_broker::NatsMessageBroker; use fuel_streams_store::db::Db; use super::Stream; @@ -14,12 +14,12 @@ pub struct FuelStreams { pub outputs: Stream, pub receipts: Stream, pub utxos: Stream, - pub msg_broker: Arc, + pub msg_broker: Arc, pub db: Arc, } impl FuelStreams { - pub async fn new(broker: &Arc, db: &Arc) -> Self { + pub async fn new(broker: &Arc, db: &Arc) -> Self { Self { blocks: Stream::::get_or_init(broker, db).await, transactions: Stream::::get_or_init(broker, db).await, @@ -36,7 +36,7 @@ impl FuelStreams { Arc::new(self.clone()) } - pub fn broker(&self) -> Arc { + pub fn broker(&self) -> Arc { self.msg_broker.clone() } } diff --git a/crates/core/src/stream/stream_impl.rs b/crates/core/src/stream/stream_impl.rs index e36011f6..c4844d4a 100644 --- a/crates/core/src/stream/stream_impl.rs +++ b/crates/core/src/stream/stream_impl.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; pub use async_nats::Subscriber as StreamLiveSubscriber; -use fuel_message_broker::MessageBroker; +use fuel_message_broker::NatsMessageBroker; use fuel_streams_macros::subject::IntoSubject; use fuel_streams_store::{db::Db, record::Record, store::Store}; use futures::{ @@ -19,7 +19,7 @@ pub type BoxedStream = Box + Send + Unpin>; #[derive(Debug, Clone)] pub struct Stream { store: Arc>, - broker: Arc, + broker: Arc, _marker: std::marker::PhantomData, } @@ -28,7 +28,7 @@ impl Stream { const INSTANCE: OnceCell = OnceCell::const_new(); pub async fn get_or_init( - broker: &Arc, + broker: &Arc, db: &Arc, ) -> Self { let cell = Self::INSTANCE; @@ -37,7 +37,7 @@ impl Stream { .to_owned() } - pub async fn new(broker: &Arc, db: &Arc) -> Self { + pub async fn new(broker: &Arc, db: &Arc) -> Self { let store = Arc::new(Store::new(db)); let broker = Arc::clone(broker); Self { @@ -62,7 +62,7 @@ impl Stream { payload: bytes::Bytes, ) -> Result<(), StreamError> { let broker = self.broker.clone(); - broker.publish_event(subject, payload).await?; + broker.publish(subject, payload).await?; Ok(()) } @@ -83,7 +83,7 @@ impl Stream { sleep(Duration::from_millis(throttle_time as u64)).await; } } - let mut live = broker.subscribe_to_events(&subject.parse()).await?; + let mut live = broker.subscribe(&subject.parse()).await?; while let Some(msg) = live.next().await { yield msg?; let throttle_time = *config::STREAM_THROTTLE_LIVE; diff --git a/crates/domains/src/msg_payload.rs b/crates/domains/src/msg_payload.rs index b06941ff..92dbdc1a 100644 --- a/crates/domains/src/msg_payload.rs +++ b/crates/domains/src/msg_payload.rs @@ -104,21 +104,6 @@ impl MsgPayload { .collect::>() } - pub fn message_id(&self) -> String { - let height = self.metadata.block_height.clone(); - format!("block_{height}") - } - - pub fn subject(&self) -> String { - let producer = self.metadata.block_producer.clone(); - let height = self.metadata.block_height.clone(); - format!("{}.{producer}.{height}", Self::subject_name()) - } - - pub fn subject_name() -> &'static str { - "block_submitted" - } - pub fn metadata(&self) -> &Metadata { &self.metadata } diff --git a/crates/message-broker/src/lib.rs b/crates/message-broker/src/lib.rs index 41909706..5794d85e 100644 --- a/crates/message-broker/src/lib.rs +++ b/crates/message-broker/src/lib.rs @@ -2,7 +2,9 @@ mod msg_broker; mod nats; pub mod nats_metrics; mod nats_opts; +mod nats_queue; pub use msg_broker::*; pub use nats::*; pub use nats_opts::*; +pub use nats_queue::*; diff --git a/crates/message-broker/src/msg_broker.rs b/crates/message-broker/src/msg_broker.rs index 4ad57227..4dfa8c3c 100644 --- a/crates/message-broker/src/msg_broker.rs +++ b/crates/message-broker/src/msg_broker.rs @@ -1,4 +1,4 @@ -use std::{fmt, sync::Arc}; +use std::fmt; use async_trait::async_trait; use futures::Stream; @@ -56,6 +56,12 @@ pub enum MessageBrokerError { Serde(#[from] serde_json::Error), #[error(transparent)] Other(#[from] Box), + #[error(transparent)] + NatsSubscribe(#[from] async_nats::client::SubscribeError), + #[error(transparent)] + NatsPublish( + #[from] async_nats::error::Error, + ), } #[async_trait] @@ -76,95 +82,3 @@ pub type MessageStream = Box< + Send + Unpin, >; - -#[async_trait] -pub trait MessageBroker: std::fmt::Debug + Send + Sync + 'static { - /// Get the current namespace - fn namespace(&self) -> &Namespace; - - /// Setup required infrastructure (queues, exchanges, etc) - async fn setup(&self) -> Result<(), MessageBrokerError>; - - /// Check if the broker is connected - fn is_connected(&self) -> bool; - - /// Publish a block to the work queue for processing - /// Used by publisher to send blocks to consumers - async fn publish_block( - &self, - id: String, - payload: Vec, - ) -> Result<(), MessageBrokerError>; - - /// Receive a stream of blocks from the work queue - /// Used by consumer to process blocks - async fn receive_blocks_stream( - &self, - batch_size: usize, - ) -> Result; - - /// Publish an event to a topic for subscribers - /// Used by Stream implementation for pub/sub - async fn publish_event( - &self, - topic: &str, - payload: bytes::Bytes, - ) -> Result<(), MessageBrokerError>; - - /// Subscribe to events on a topic - /// Used by Stream implementation for pub/sub - async fn subscribe_to_events( - &self, - topic: &str, - ) -> Result; - - /// Flush all in-flight messages - async fn flush(&self) -> Result<(), MessageBrokerError>; - - /// Check if the broker is healthy - async fn is_healthy(&self) -> bool; - - /// Get health info - async fn get_health_info( - &self, - uptime_secs: u64, - ) -> Result; -} - -#[derive(Debug, Clone, Default)] -pub enum MessageBrokerClient { - #[default] - Nats, -} - -impl MessageBrokerClient { - pub async fn start( - &self, - url: &str, - ) -> Result, MessageBrokerError> { - match self { - MessageBrokerClient::Nats => { - let opts = crate::NatsOpts::new(url.to_string()); - let broker = crate::NatsMessageBroker::new(&opts).await?; - broker.setup().await?; - Ok(broker.arc()) - } - } - } - - pub async fn start_with_namespace( - &self, - url: &str, - namespace: &str, - ) -> Result, MessageBrokerError> { - match self { - MessageBrokerClient::Nats => { - let opts = crate::NatsOpts::new(url.to_string()) - .with_namespace(namespace); - let broker = crate::NatsMessageBroker::new(&opts).await?; - broker.setup().await?; - Ok(broker.arc()) - } - } - } -} diff --git a/crates/message-broker/src/nats.rs b/crates/message-broker/src/nats.rs index 7c65dc6d..e2f1b57d 100644 --- a/crates/message-broker/src/nats.rs +++ b/crates/message-broker/src/nats.rs @@ -1,14 +1,6 @@ -use std::{sync::Arc, time::Duration}; - -use async_nats::{ - jetstream::{ - consumer::{pull::Config as ConsumerConfig, AckPolicy, PullConsumer}, - context::Publish, - stream::{Config as StreamConfig, RetentionPolicy}, - Context, - }, - Client, -}; +use std::sync::Arc; + +use async_nats::{jetstream::Context, Client}; use async_trait::async_trait; use futures::StreamExt; use tracing::info; @@ -16,16 +8,15 @@ use tracing::info; use crate::{ nats_metrics::{NatsHealthInfo, StreamInfo}, Message, - MessageBlockStream, - MessageBroker, MessageBrokerError, MessageStream, Namespace, NatsOpts, + NatsQueue, }; #[derive(Debug)] -pub struct NatsMessage(async_nats::jetstream::Message); +pub struct NatsMessage(pub async_nats::jetstream::Message); #[derive(Debug, Clone)] pub struct NatsMessageBroker { @@ -36,10 +27,7 @@ pub struct NatsMessageBroker { } impl NatsMessageBroker { - const BLOCKS_STREAM: &'static str = "block_importer"; - const BLOCKS_SUBJECT: &'static str = "block_submitted"; - - pub async fn new(opts: &NatsOpts) -> Result { + async fn new(opts: &NatsOpts) -> Result { let url = &opts.url(); let client = opts.connect_opts().connect(url).await.map_err(|e| { MessageBrokerError::Connection(format!( @@ -57,153 +45,58 @@ impl NatsMessageBroker { }) } - fn stream_name(&self) -> String { - self.namespace().queue_name(Self::BLOCKS_STREAM) + async fn with_url(url: &str) -> Result { + let opts = NatsOpts::new(url.to_string()); + Self::new(&opts).await } - fn consumer_name(&self) -> String { - format!("{}_consumer", self.stream_name()) + async fn with_namespace( + url: &str, + namespace: &str, + ) -> Result { + let opts = + crate::NatsOpts::new(url.to_string()).with_namespace(namespace); + Self::new(&opts).await } - fn blocks_subject(&self) -> String { - self.namespace().subject_name(Self::BLOCKS_SUBJECT) - } - - async fn get_blocks_stream( - &self, - ) -> Result { - let subject_name = format!("{}.>", self.blocks_subject()); - let stream_name = self.stream_name(); - let stream = self - .jetstream - .get_or_create_stream(StreamConfig { - name: stream_name, - subjects: vec![subject_name], - retention: RetentionPolicy::WorkQueue, - duplicate_window: Duration::from_secs(1), - allow_direct: true, - ..Default::default() - }) - .await - .map_err(|e| MessageBrokerError::Setup(e.to_string()))?; - Ok(stream) + pub async fn setup( + url: &str, + namespace: Option<&str>, + ) -> Result, MessageBrokerError> { + let broker = match namespace { + Some(namespace) => Self::with_namespace(url, namespace).await?, + None => Self::with_url(url).await?, + }; + broker.setup_queues().await?; + Ok(broker.arc()) } - async fn get_blocks_consumer( - &self, - ) -> Result { - let consumer_name = self.consumer_name(); - let stream = self.get_blocks_stream().await?; - let mut config = ConsumerConfig { - durable_name: Some(consumer_name.to_string()), - ack_policy: AckPolicy::Explicit, - ..Default::default() - }; - if let Some(ack_wait) = self.opts.ack_wait_secs { - config.ack_wait = Duration::from_secs(ack_wait); - } - stream - .get_or_create_consumer(&consumer_name, config) - .await - .map_err(|e| MessageBrokerError::Setup(e.to_string())) + async fn setup_queues(&self) -> Result<(), MessageBrokerError> { + let block_importer = NatsQueue::BlockImporter(self.arc()); + let block_retrier = NatsQueue::BlockRetrier(self.arc()); + block_importer.setup().await?; + block_retrier.setup().await?; + Ok(()) } - pub async fn get_stream_info( - &self, - ) -> Result, MessageBrokerError> { - let mut streams = self.jetstream.streams(); - let mut infos = vec![]; - while let Some(stream) = streams.next().await { - let stream = - stream.map_err(|e| MessageBrokerError::Setup(e.to_string()))?; - infos.push(StreamInfo { - stream_name: stream.config.name, - state: stream.state.into(), - }); - } - Ok(infos) + pub fn client(&self) -> Arc { + Arc::new(self.client.to_owned()) } pub fn arc(&self) -> Arc { Arc::new(self.clone()) } -} -#[async_trait] -impl Message for NatsMessage { - fn payload(&self) -> Vec { - self.0.payload.to_vec() - } - - fn id(&self) -> String { - self.0.subject.to_string() - } - - async fn ack(&self) -> Result<(), MessageBrokerError> { - self.0 - .ack() - .await - .map_err(|e| MessageBrokerError::Acknowledgment(e.to_string())) - } -} - -#[async_trait] -impl MessageBroker for NatsMessageBroker { - fn namespace(&self) -> &Namespace { + pub fn namespace(&self) -> &Namespace { &self.namespace } - fn is_connected(&self) -> bool { + pub fn is_connected(&self) -> bool { let state = self.client.connection_state(); state == async_nats::connection::State::Connected } - async fn setup(&self) -> Result<(), MessageBrokerError> { - let _ = self.get_blocks_stream().await?; - let _ = self.get_blocks_consumer().await?; - Ok(()) - } - - async fn publish_block( - &self, - id: String, - payload: Vec, - ) -> Result<(), MessageBrokerError> { - let subject = format!("{}.{}", self.blocks_subject(), id); - let payload_id = format!("{}.block_{}", self.namespace(), id); - let publish = Publish::build() - .message_id(payload_id) - .payload(payload.into()); - self.jetstream - .send_publish(subject, publish) - .await - .map_err(|e| MessageBrokerError::Publishing(e.to_string()))? - .await - .map_err(|e| MessageBrokerError::Publishing(e.to_string()))?; - - Ok(()) - } - - async fn receive_blocks_stream( - &self, - batch_size: usize, - ) -> Result { - let consumer = self.get_blocks_consumer().await?; - let stream = consumer - .fetch() - .max_messages(batch_size) - .messages() - .await - .map_err(|e| MessageBrokerError::Receiving(e.to_string()))? - .filter_map(|msg| async { - msg.ok() - .map(|m| Ok(Box::new(NatsMessage(m)) as Box)) - }) - .boxed(); - Ok(Box::new(stream)) - } - - async fn publish_event( + pub async fn publish( &self, topic: &str, payload: bytes::Bytes, @@ -216,7 +109,7 @@ impl MessageBroker for NatsMessageBroker { Ok(()) } - async fn subscribe_to_events( + pub async fn subscribe( &self, topic: &str, ) -> Result { @@ -230,7 +123,23 @@ impl MessageBroker for NatsMessageBroker { Ok(Box::new(stream)) } - async fn flush(&self) -> Result<(), MessageBrokerError> { + pub async fn get_streams_info( + &self, + ) -> Result, MessageBrokerError> { + let mut streams = self.jetstream.streams(); + let mut infos = vec![]; + while let Some(stream) = streams.next().await { + let stream = + stream.map_err(|e| MessageBrokerError::Setup(e.to_string()))?; + infos.push(StreamInfo { + stream_name: stream.config.name, + state: stream.state.into(), + }); + } + Ok(infos) + } + + pub async fn flush(&self) -> Result<(), MessageBrokerError> { self.client.flush().await.map_err(|e| { MessageBrokerError::Flush(format!( "Failed to flush NATS client: {}", @@ -240,15 +149,15 @@ impl MessageBroker for NatsMessageBroker { Ok(()) } - async fn is_healthy(&self) -> bool { + pub async fn is_healthy(&self) -> bool { self.is_connected() } - async fn get_health_info( + pub async fn get_health_info( &self, uptime_secs: u64, ) -> Result { - let infos = self.get_stream_info().await?; + let infos = self.get_streams_info().await?; let health_info = NatsHealthInfo { uptime_secs, is_healthy: self.is_healthy().await, @@ -258,12 +167,33 @@ impl MessageBroker for NatsMessageBroker { } } +#[async_trait] +impl Message for NatsMessage { + fn payload(&self) -> Vec { + self.0.payload.to_vec() + } + + fn id(&self) -> String { + self.0.subject.to_string() + } + + async fn ack(&self) -> Result<(), MessageBrokerError> { + self.0 + .ack() + .await + .map_err(|e| MessageBrokerError::Acknowledgment(e.to_string())) + } +} + #[cfg(test)] mod tests { + use std::time::Duration; + use pretty_assertions::assert_eq; use rand::Rng; use super::*; + use crate::NatsSubject; const NATS_URL: &str = "nats://localhost:4222"; async fn setup_broker() -> Result { @@ -271,7 +201,7 @@ mod tests { .with_rdn_namespace() .with_ack_wait(1); let broker = NatsMessageBroker::new(&opts).await?; - broker.setup().await?; + broker.setup_queues().await?; Ok(broker) } @@ -288,9 +218,7 @@ mod tests { // Spawn a task to receive events let receiver = tokio::spawn(async move { - let mut events = - broker_clone.subscribe_to_events("test.topic").await?; - + let mut events = broker_clone.subscribe("test.topic").await?; tokio::time::timeout(Duration::from_secs(5), events.next()) .await .map_err(|_| { @@ -306,9 +234,7 @@ mod tests { // Add a small delay to ensure subscriber is ready tokio::time::sleep(Duration::from_millis(100)).await; - broker - .publish_event("test.topic", vec![4, 5, 6].into()) - .await?; + broker.publish("test.topic", vec![4, 5, 6].into()).await?; let result = receiver.await.expect("receiver task panicked")?; let topic = format!("{}.{}", broker.namespace(), "test.topic"); assert_eq!(result, (topic, vec![4, 5, 6])); @@ -319,16 +245,23 @@ mod tests { async fn test_work_queue_batch_size_limiting( ) -> Result<(), MessageBrokerError> { let broker = setup_broker().await?; + let queue = NatsQueue::BlockImporter(broker.arc()); // Publish 3 messages - broker.publish_block("1".to_string(), vec![1]).await?; - broker.publish_block("2".to_string(), vec![2]).await?; - broker.publish_block("3".to_string(), vec![3]).await?; + queue + .publish(&NatsSubject::BlockSubmitted(1_u64), vec![1]) + .await?; + queue + .publish(&NatsSubject::BlockSubmitted(2_u64), vec![2]) + .await?; + queue + .publish(&NatsSubject::BlockSubmitted(3_u64), vec![3]) + .await?; // Receive with batch size of 2 - let mut stream = broker.receive_blocks_stream(2).await?; + let mut message_stream = queue.subscribe(2).await?; let mut received = Vec::new(); - while let Some(msg) = stream.next().await { + while let Some(msg) = message_stream.next().await { let msg = msg?; received.push(msg.payload().to_vec()); msg.ack().await?; @@ -339,7 +272,6 @@ mod tests { 2, "Should only receive batch_size messages" ); - Ok(()) } @@ -347,11 +279,15 @@ mod tests { async fn test_work_queue_unacked_message_redelivery( ) -> Result<(), MessageBrokerError> { let broker = setup_broker().await?; - broker.publish_block("1".to_string(), vec![1]).await?; + let queue = NatsQueue::BlockImporter(broker.arc()); + + queue + .publish(&NatsSubject::BlockSubmitted(1_u64), vec![1]) + .await?; { - let mut stream = broker.receive_blocks_stream(1).await?; - let msg = stream.next().await.unwrap(); + let mut message_stream = queue.subscribe(1).await?; + let msg = message_stream.next().await.unwrap(); assert!(msg.is_ok()); let msg = msg.unwrap(); assert_eq!(msg.payload(), &[1]); @@ -361,8 +297,8 @@ mod tests { tokio::time::sleep(Duration::from_secs(2)).await; // Receive message again - let mut stream = broker.receive_blocks_stream(1).await?; - let msg = stream.next().await.unwrap(); + let mut message_stream = queue.subscribe(1).await?; + let msg = message_stream.next().await.unwrap(); assert!(msg.is_ok()); Ok(()) } @@ -371,14 +307,15 @@ mod tests { async fn test_work_queue_multiple_consumers( ) -> Result<(), MessageBrokerError> { let broker = setup_broker().await?; - let broker1 = broker.clone(); - let broker2 = broker.clone(); - let broker3 = broker.clone(); + let stream1 = NatsQueue::BlockImporter(broker.arc()); + let stream2 = NatsQueue::BlockImporter(broker.arc()); + let stream3 = NatsQueue::BlockImporter(broker.arc()); + let queue = NatsQueue::BlockImporter(broker.arc()); // Spawn three consumer tasks let consumer1 = tokio::spawn(async move { - let mut stream = broker1.receive_blocks_stream(1).await?; - let msg = stream.next().await.ok_or_else(|| { + let mut message_stream = stream1.subscribe(1).await?; + let msg = message_stream.next().await.ok_or_else(|| { MessageBrokerError::Receiving("No message received".into()) })??; msg.ack().await?; @@ -386,8 +323,8 @@ mod tests { }); let consumer2 = tokio::spawn(async move { - let mut stream = broker2.receive_blocks_stream(1).await?; - let msg = stream.next().await.ok_or_else(|| { + let mut message_stream = stream2.subscribe(1).await?; + let msg = message_stream.next().await.ok_or_else(|| { MessageBrokerError::Receiving("No message received".into()) })??; msg.ack().await?; @@ -395,8 +332,8 @@ mod tests { }); let consumer3 = tokio::spawn(async move { - let mut stream = broker3.receive_blocks_stream(1).await?; - let msg = stream.next().await.ok_or_else(|| { + let mut message_stream = stream3.subscribe(1).await?; + let msg = message_stream.next().await.ok_or_else(|| { MessageBrokerError::Receiving("No message received".into()) })??; msg.ack().await?; @@ -406,15 +343,14 @@ mod tests { let heights = (0..3).map(|_| random_height() as u8).collect::>(); // Publish three messages - broker - .publish_block(heights[0].to_string(), vec![heights[0]]) - .await?; - broker - .publish_block(heights[1].to_string(), vec![heights[1]]) - .await?; - broker - .publish_block(heights[2].to_string(), vec![heights[2]]) - .await?; + for height in &heights { + queue + .publish( + &NatsSubject::BlockSubmitted(height.to_owned() as u64), + vec![*height], + ) + .await?; + } // Collect results from all consumers let msg1 = consumer1.await.expect("consumer1 task panicked")?; diff --git a/crates/message-broker/src/nats_queue.rs b/crates/message-broker/src/nats_queue.rs new file mode 100644 index 00000000..383eed07 --- /dev/null +++ b/crates/message-broker/src/nats_queue.rs @@ -0,0 +1,166 @@ +use std::{sync::Arc, time::Duration}; + +use async_nats::jetstream::{ + consumer::{pull::Config as ConsumerConfig, AckPolicy, PullConsumer}, + stream::{Config as StreamConfig, RetentionPolicy}, +}; +use futures::StreamExt; + +use crate::{MessageBrokerError, NatsMessageBroker}; + +pub enum NatsSubject { + BlockSubmitted(u64), + BlockFailed(u64), + BlockSuccess(u64), +} + +impl NatsSubject { + pub fn to_string(&self, queue: &NatsQueue) -> String { + let queue_name = queue.queue_name(); + match self { + NatsSubject::BlockSubmitted(height) => { + format!("{queue_name}.block_submitted.{height}") + } + NatsSubject::BlockFailed(id) => { + format!("{queue_name}.block_failed.{id}") + } + NatsSubject::BlockSuccess(id) => { + format!("{queue_name}.block_success.{id}") + } + } + } + + fn to_id(&self, queue: &NatsQueue) -> String { + self.to_string(queue).replace(".", "_") + } +} + +pub enum NatsQueue { + BlockImporter(Arc), + BlockRetrier(Arc), +} + +impl NatsQueue { + fn broker(&self) -> &NatsMessageBroker { + match self { + NatsQueue::BlockImporter(broker) => broker, + NatsQueue::BlockRetrier(broker) => broker, + } + } + + fn queue_name(&self) -> String { + let value = match self { + NatsQueue::BlockImporter(_) => "block_importer", + NatsQueue::BlockRetrier(_) => "block_retrier", + }; + self.broker().namespace().queue_name(value) + } + + fn subjects(&self) -> Vec { + let queue_name = self.queue_name(); + match self { + NatsQueue::BlockImporter(_) => { + vec![format!("{queue_name}.block_submitted.>")] + } + NatsQueue::BlockRetrier(_) => { + vec![format!("{queue_name}.block_failed.>")] + } + } + } + + fn consumer_name(&self) -> String { + format!("{}_consumer", self.queue_name()) + } + + pub async fn get_or_create_stream( + &self, + ) -> Result { + let broker = self.broker(); + let subjects = self.subjects(); + let name = self.queue_name(); + broker + .jetstream + .get_or_create_stream(StreamConfig { + name, + subjects, + retention: RetentionPolicy::WorkQueue, + duplicate_window: Duration::from_secs(1), + allow_direct: true, + ..Default::default() + }) + .await + .map_err(|e| MessageBrokerError::Setup(e.to_string())) + } + + pub async fn get_or_create_consumer( + &self, + ) -> Result { + let stream = self.get_or_create_stream().await?; + let consumer_name = self.consumer_name(); + let broker = self.broker(); + let mut config = ConsumerConfig { + durable_name: Some(consumer_name.to_string()), + ack_policy: AckPolicy::Explicit, + ..Default::default() + }; + + if let Some(ack_wait) = broker.opts.ack_wait_secs { + config.ack_wait = Duration::from_secs(ack_wait); + } + + stream + .get_or_create_consumer(&consumer_name, config) + .await + .map_err(|e| MessageBrokerError::Setup(e.to_string())) + } + + pub async fn publish>( + &self, + subject: &NatsSubject, + payload: T, + ) -> Result<(), MessageBrokerError> { + let broker = self.broker(); + let subject_str = subject.to_string(self); + let subject_id = subject.to_id(self); + let publish = async_nats::jetstream::context::Publish::build() + .message_id(subject_id) + .payload(payload.into()); + + broker + .jetstream + .send_publish(subject_str, publish) + .await + .map_err(|e| MessageBrokerError::Publishing(e.to_string()))? + .await + .map_err(|e| MessageBrokerError::Publishing(e.to_string()))?; + + Ok(()) + } + + pub async fn subscribe( + &self, + batch_size: usize, + ) -> Result { + let consumer = self.get_or_create_consumer().await?; + let stream = consumer + .fetch() + .max_messages(batch_size) + .messages() + .await + .map_err(|e| MessageBrokerError::Receiving(e.to_string()))? + .filter_map(|msg| async { + msg.ok().map(|m| { + Ok(Box::new(crate::NatsMessage(m)) + as Box) + }) + }) + .boxed(); + Ok(Box::new(stream)) + } + + pub async fn setup(&self) -> Result<(), MessageBrokerError> { + let _ = self.get_or_create_stream().await?; + let _ = self.get_or_create_consumer().await?; + Ok(()) + } +} diff --git a/crates/types/src/fuel_core.rs b/crates/types/src/fuel_core.rs index b276d8c9..023c35c9 100644 --- a/crates/types/src/fuel_core.rs +++ b/crates/types/src/fuel_core.rs @@ -189,14 +189,21 @@ pub trait FuelCoreLike: Sync + Send { fn get_sealed_block( &self, block_height: crate::BlockHeight, - ) -> Sealed { + ) -> FuelCoreResult { let height = block_height.as_ref().to_owned() as u32; - self.onchain_database() + let latest_view = self + .onchain_database() .latest_view() - .expect("failed to get latest db view") + .map_err(|e| FuelCoreError::Database(e.to_string()))?; + + let sealed_block = latest_view .get_sealed_block_by_height(&height.into()) - .expect("Failed to get latest block height") - .expect("NATS Publisher: no block at height {height}") + .map_err(|e| FuelCoreError::Database(e.to_string()))? + .ok_or_else(|| { + FuelCoreError::Database(format!("No block at height {height}")) + })?; + + Ok(sealed_block) } } diff --git a/crates/web-utils/Cargo.toml b/crates/web-utils/Cargo.toml index 2a4c8b13..a0c730ca 100644 --- a/crates/web-utils/Cargo.toml +++ b/crates/web-utils/Cargo.toml @@ -52,6 +52,7 @@ tokio.workspace = true tokio-util = "0.7.13" tracing.workspace = true tracing-actix-web.workspace = true +tracing-subscriber.workspace = true url = "2.5" urlencoding = "2.1" diff --git a/crates/web-utils/src/lib.rs b/crates/web-utils/src/lib.rs index 7bea99ab..9a7855d5 100644 --- a/crates/web-utils/src/lib.rs +++ b/crates/web-utils/src/lib.rs @@ -1,6 +1,7 @@ pub mod server; pub mod shutdown; pub mod telemetry; +pub mod tracing; use std::sync::LazyLock; diff --git a/crates/web-utils/src/server/middlewares/api_key/api_key_impl.rs b/crates/web-utils/src/server/middlewares/api_key/api_key_impl.rs index caf4aa6e..f51e0adb 100644 --- a/crates/web-utils/src/server/middlewares/api_key/api_key_impl.rs +++ b/crates/web-utils/src/server/middlewares/api_key/api_key_impl.rs @@ -124,7 +124,7 @@ impl ApiKey { let random_num = rand::thread_rng() .sample_iter(&Alphanumeric) .filter(|c| c.is_ascii_alphabetic()) - .take(12) + .take(32) .map(char::from) .collect::(); format!("fuel-{}", random_num) diff --git a/crates/web-utils/src/shutdown.rs b/crates/web-utils/src/shutdown.rs index a2ab539f..a5ed0d42 100644 --- a/crates/web-utils/src/shutdown.rs +++ b/crates/web-utils/src/shutdown.rs @@ -1,11 +1,11 @@ use std::{sync::Arc, time::Duration}; -use fuel_message_broker::MessageBroker; +use fuel_message_broker::NatsMessageBroker; use tokio_util::sync::CancellationToken; pub const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(90); -pub async fn shutdown_broker_with_timeout(broker: &Arc) { +pub async fn shutdown_broker_with_timeout(broker: &Arc) { let _ = tokio::time::timeout(GRACEFUL_SHUTDOWN_TIMEOUT, async { tracing::info!("Flushing in-flight messages to broker ..."); match broker.flush().await { diff --git a/crates/web-utils/src/tracing.rs b/crates/web-utils/src/tracing.rs new file mode 100644 index 00000000..e5680d00 --- /dev/null +++ b/crates/web-utils/src/tracing.rs @@ -0,0 +1,18 @@ +use tracing::level_filters::LevelFilter; +use tracing_subscriber::fmt::time; + +pub fn init_tracing() { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .with_timer(time::LocalTime::rfc_3339()) + .with_target(false) + .with_thread_ids(false) + .with_file(true) + .with_line_number(true) + .with_level(true) + .init(); +} diff --git a/services/consumer/Cargo.toml b/services/consumer/Cargo.toml index a9ed9d49..eab0ad2d 100644 --- a/services/consumer/Cargo.toml +++ b/services/consumer/Cargo.toml @@ -43,7 +43,6 @@ thiserror.workspace = true tokio.workspace = true tokio-util = "0.7.13" tracing.workspace = true -tracing-subscriber.workspace = true # these dependencies need to update in the future when fuel-core 0.41.4 is on mainnet [target.'cfg(target_os = "linux")'.dependencies] diff --git a/services/consumer/src/executor/block_executor.rs b/services/consumer/src/executor/block_executor.rs index d87b60da..9366ddcc 100644 --- a/services/consumer/src/executor/block_executor.rs +++ b/services/consumer/src/executor/block_executor.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use fuel_message_broker::{Message, MessageBroker}; +use fuel_message_broker::{Message, NatsMessageBroker, NatsQueue}; use fuel_streams_core::{ types::{Block, Transaction}, FuelStreams, @@ -39,7 +39,7 @@ enum ProcessResult { pub struct BlockExecutor { db: Arc, - message_broker: Arc, + message_broker: Arc, fuel_streams: Arc, fuel_stores: Arc, semaphore: Arc, @@ -50,7 +50,7 @@ pub struct BlockExecutor { impl BlockExecutor { pub fn new( db: Arc, - message_broker: &Arc, + message_broker: &Arc, fuel_streams: &Arc, telemetry: Arc>, ) -> Self { @@ -79,9 +79,10 @@ impl BlockExecutor { self.max_tasks ); let telemetry = self.telemetry.clone(); + let queue = NatsQueue::BlockImporter(self.message_broker.clone()); while !token.is_cancelled() { tokio::select! { - msg_result = self.message_broker.receive_blocks_stream(BATCH_SIZE) => { + msg_result = queue.subscribe(BATCH_SIZE) => { let mut messages = msg_result?; while let Some(msg) = messages.next().await { let msg = msg?; diff --git a/services/consumer/src/main.rs b/services/consumer/src/main.rs index 04e1981e..2b37043a 100644 --- a/services/consumer/src/main.rs +++ b/services/consumer/src/main.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use clap::Parser; -use fuel_message_broker::{MessageBroker, MessageBrokerClient}; +use fuel_message_broker::NatsMessageBroker; use fuel_streams_core::FuelStreams; use fuel_streams_store::db::{Db, DbConnectionOpts}; use fuel_web_utils::{shutdown::ShutdownController, telemetry::Telemetry}; @@ -12,26 +12,10 @@ use sv_consumer::{ BlockExecutor, Server, }; -use tracing::level_filters::LevelFilter; -use tracing_subscriber::fmt::time; #[tokio::main] async fn main() -> anyhow::Result<()> { - // Initialize tracing subscriber - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(), - ) - .with_timer(time::LocalTime::rfc_3339()) - .with_target(false) - .with_thread_ids(false) - .with_file(true) - .with_line_number(true) - .with_level(true) - .init(); - + fuel_web_utils::tracing::init_tracing(); if let Err(err) = dotenvy::dotenv() { tracing::warn!("File .env not found: {:?}", err); } @@ -42,7 +26,7 @@ async fn main() -> anyhow::Result<()> { // Initialize shared resources let db = setup_db(&cli.db_url).await?; - let message_broker = setup_message_broker(&cli.nats_url).await?; + let message_broker = NatsMessageBroker::setup(&cli.nats_url, None).await?; let metrics = Metrics::new(None)?; let telemetry = Telemetry::new(Some(metrics)).await?; telemetry.start().await?; @@ -84,12 +68,3 @@ async fn setup_db(db_url: &str) -> Result, ConsumerError> { .await?; Ok(db.arc()) } - -async fn setup_message_broker( - nats_url: &str, -) -> Result, ConsumerError> { - let broker = MessageBrokerClient::Nats; - let broker = broker.start(nats_url).await?; - broker.setup().await?; - Ok(broker) -} diff --git a/services/consumer/src/server/mod.rs b/services/consumer/src/server/mod.rs index 45b312a7..e5b11d8e 100644 --- a/services/consumer/src/server/mod.rs +++ b/services/consumer/src/server/mod.rs @@ -3,7 +3,7 @@ pub(crate) mod state; use std::sync::Arc; -use fuel_message_broker::MessageBroker; +use fuel_message_broker::NatsMessageBroker; use fuel_web_utils::{ server::api::build_and_spawn_web_server, telemetry::Telemetry, @@ -14,14 +14,14 @@ use crate::{errors::ConsumerError, state::ServerState}; pub struct Server { port: u16, - message_broker: Arc, + message_broker: Arc, telemetry: Arc>, } impl Server { pub fn new( port: u16, - message_broker: Arc, + message_broker: Arc, telemetry: Arc>, ) -> Self { Self { diff --git a/services/consumer/src/server/state.rs b/services/consumer/src/server/state.rs index f43a45eb..ba32da22 100644 --- a/services/consumer/src/server/state.rs +++ b/services/consumer/src/server/state.rs @@ -4,20 +4,20 @@ use std::{ }; use async_trait::async_trait; -use fuel_message_broker::MessageBroker; +use fuel_message_broker::NatsMessageBroker; use fuel_web_utils::{server::state::StateProvider, telemetry::Telemetry}; use crate::metrics::Metrics; pub struct ServerState { pub start_time: Instant, - pub msg_broker: Arc, + pub msg_broker: Arc, pub telemetry: Arc>, } impl ServerState { pub fn new( - msg_broker: Arc, + msg_broker: Arc, telemetry: Arc>, ) -> Self { Self { diff --git a/services/publisher/src/error.rs b/services/publisher/src/error.rs index b3b71c26..e16f1cfb 100644 --- a/services/publisher/src/error.rs +++ b/services/publisher/src/error.rs @@ -1,3 +1,5 @@ +use std::str::Utf8Error; + use fuel_message_broker::MessageBrokerError; use fuel_streams_core::types::*; use fuel_streams_domains::MsgPayloadError; @@ -25,4 +27,6 @@ pub enum PublishError { BlockNotFound, #[error(transparent)] Sqlx(#[from] sqlx::Error), + #[error("Failed to get block height from encoded utf8 string")] + BlockHeightFromUtf8(#[from] Utf8Error), } diff --git a/services/publisher/src/main.rs b/services/publisher/src/main.rs index e8fc31d4..8b6a4051 100644 --- a/services/publisher/src/main.rs +++ b/services/publisher/src/main.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use clap::Parser; -use fuel_message_broker::{MessageBroker, MessageBrokerClient}; +use fuel_message_broker::NatsMessageBroker; use fuel_streams_core::types::*; use fuel_streams_store::{ db::{Db, DbConnectionOpts}, @@ -19,7 +19,7 @@ use sv_publisher::{ error::PublishError, metrics::Metrics, publish::publish_block, - recover::recover_block_with_tx_status_none, + recover::recover_tx_status_none, state::ServerState, }; use tokio_util::sync::CancellationToken; @@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> { fuel_core.start().await?; let db = setup_db(&cli.db_url).await?; - let message_broker = setup_message_broker(&cli.nats_url).await?; + let message_broker = NatsMessageBroker::setup(&cli.nats_url, None).await?; let metrics = Metrics::new(None)?; let telemetry = Telemetry::new(Some(metrics)).await?; telemetry.start().await?; @@ -51,7 +51,7 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Last block height: {}", last_block_height); tokio::select! { result = async { - let recover_blocks = recover_block_with_tx_status_none( + let recover_blocks = recover_tx_status_none( &db, &message_broker, &fuel_core, @@ -104,15 +104,6 @@ async fn setup_db(db_url: &str) -> Result { Ok(db) } -async fn setup_message_broker( - nats_url: &str, -) -> Result, PublishError> { - let broker = MessageBrokerClient::Nats; - let broker = broker.start(nats_url).await?; - broker.setup().await?; - Ok(broker) -} - async fn find_last_published_height( db: &Db, ) -> Result { @@ -155,7 +146,7 @@ fn get_historical_block_range( fn process_historical_blocks( from_height: BlockHeight, - message_broker: &Arc, + message_broker: &Arc, fuel_core: &Arc, last_block_height: &Arc, last_published_height: &Arc, @@ -180,11 +171,11 @@ fn process_historical_blocks( .map(|height| { let message_broker = message_broker.clone(); let fuel_core = fuel_core.clone(); - let sealed_block = - fuel_core.get_sealed_block(height.into()); - let sealed_block = Arc::new(sealed_block); let telemetry = telemetry.clone(); async move { + let sealed_block = + fuel_core.get_sealed_block(height.into())?; + let sealed_block = Arc::new(sealed_block); publish_block( &message_broker, &fuel_core, @@ -212,7 +203,7 @@ fn process_historical_blocks( } async fn process_live_blocks( - message_broker: &Arc, + message_broker: &Arc, fuel_core: &Arc, token: CancellationToken, telemetry: &Arc>, diff --git a/services/publisher/src/publish.rs b/services/publisher/src/publish.rs index 2f633db9..104b3443 100644 --- a/services/publisher/src/publish.rs +++ b/services/publisher/src/publish.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use fuel_core_types::blockchain::SealedBlock; -use fuel_message_broker::MessageBroker; +use fuel_message_broker::{NatsMessageBroker, NatsQueue, NatsSubject}; use fuel_streams_core::types::FuelCoreLike; use fuel_streams_domains::{Metadata, MsgPayload}; use fuel_streams_store::record::DataEncoder; @@ -10,7 +10,7 @@ use fuel_web_utils::telemetry::Telemetry; use crate::{error::PublishError, metrics::Metrics}; pub async fn publish_block( - message_broker: &Arc, + message_broker: &Arc, fuel_core: &Arc, sealed_block: &Arc, telemetry: &Arc>, @@ -19,14 +19,13 @@ pub async fn publish_block( let fuel_core = Arc::clone(fuel_core); let payload = MsgPayload::new(fuel_core, sealed_block, &metadata).await?; let encoded = payload.encode().await?; + let queue = NatsQueue::BlockImporter(message_broker.clone()); + let subject = NatsSubject::BlockSubmitted(payload.block_height().into()); - message_broker - .publish_block(payload.message_id(), encoded.clone()) - .await?; - + queue.publish(&subject, encoded.clone()).await?; if let Some(metrics) = telemetry.base_metrics() { metrics.update_publisher_success_metrics( - &payload.subject(), + &subject.to_string(&queue), encoded.len(), ); } diff --git a/services/publisher/src/recover.rs b/services/publisher/src/recover.rs index 08f84e1c..2bb30dd3 100644 --- a/services/publisher/src/recover.rs +++ b/services/publisher/src/recover.rs @@ -1,26 +1,21 @@ -//! TEMPORARY FILE -//! This module contains recovery logic for transactions with 'none' status. -//! It can be safely deleted once all transactions have been recovered and -//! there are no more transactions with 'none' status in the database, -//! or we can use this to create a proper failure mechanism as a service in -//! in the future. - use std::sync::Arc; -use fuel_message_broker::MessageBroker; -use fuel_streams_core::types::{BlockHeight, FuelCoreLike}; +use fuel_message_broker::NatsMessageBroker; +use fuel_streams_core::types::FuelCoreLike; use fuel_streams_store::db::Db; use fuel_web_utils::telemetry::Telemetry; +use tokio::sync::Semaphore; -use crate::{error::PublishError, metrics::Metrics, publish::publish_block}; +use crate::{metrics::Metrics, publish::publish_block}; -pub async fn recover_block_with_tx_status_none( +pub async fn recover_tx_status_none( db: &Db, - message_broker: &Arc, + message_broker: &Arc, fuel_core: &Arc, telemetry: &Arc>, -) -> Result<(), PublishError> { +) -> anyhow::Result<()> { let db = db.to_owned().arc(); + let semaphore = Arc::new(Semaphore::new(10)); // Get total count of distinct block heights let count: i64 = sqlx::query_as::<_, (i64,)>( @@ -42,7 +37,6 @@ pub async fn recover_block_with_tx_status_none( count ); - // Fetch batch of distinct block heights let block_heights = sqlx::query_as::<_, (i64,)>( "SELECT DISTINCT block_height FROM transactions WHERE tx_status = 'none' \ ORDER BY block_height LIMIT $1 OFFSET $2" @@ -52,30 +46,64 @@ pub async fn recover_block_with_tx_status_none( .fetch_all(&db.pool) .await?; - // Process each block height - for (block_height_u64,) in block_heights { - let block_height = BlockHeight::from(block_height_u64); - let sealed_block = fuel_core.get_sealed_block(block_height); - let sealed_block = Arc::new(sealed_block); - tracing::info!("Recovering block #{}", block_height); - publish_block(message_broker, fuel_core, &sealed_block, telemetry) - .await?; + let mut join_set: tokio::task::JoinSet> = + tokio::task::JoinSet::new(); + for (block_height,) in block_heights { + join_set.spawn({ + let db = db.clone(); + let semaphore = semaphore.clone(); + let message_broker = message_broker.clone(); + let fuel_core = fuel_core.clone(); + let telemetry = telemetry.clone(); + async move { + let _permit = semaphore.acquire().await.map_err(|e| { + tracing::error!("Failed to acquire semaphore for block #{}: {}", block_height, e); + anyhow::anyhow!("Semaphore error: {}", e) + })?; + + let block_height_str = block_height.to_string(); + let sealed_block = fuel_core.get_sealed_block(block_height.into()).map_err(|e| { + tracing::error!("Failed to get sealed block #{}: {}", block_height, e); + anyhow::anyhow!("Get sealed block error: {}", e) + })?; + + let sealed_block = Arc::new(sealed_block); + tracing::info!("Recovering block #{}", block_height); + + publish_block(&message_broker, &fuel_core, &sealed_block, &telemetry).await.map_err(|e| { + tracing::error!("Failed to publish block #{}: {}", block_height, e); + anyhow::anyhow!("Publish block error: {}", e) + })?; - // Delete all transactions with status 'none' for this block height - let deleted = sqlx::query( - "DELETE FROM transactions WHERE block_height = $1 AND tx_status = 'none'" - ) - .bind(block_height_u64) - .execute(&db.pool) - .await?; - tracing::info!( - "Deleted {} transactions with status 'none' from block #{}", - deleted.rows_affected(), - block_height - ); + sqlx::query( + "DELETE FROM transactions WHERE block_height = $1 AND tx_status = 'none'" + ) + .bind(block_height_str) + .execute(&db.pool) + .await + .map_err(|e| { + tracing::error!("Failed to delete transactions for block #{}: {}", block_height, e); + anyhow::anyhow!("Delete transactions error: {}", e) + })?; + + tracing::info!( + "Successfully processed and deleted transactions for block #{}", + block_height + ); + Ok(()) + } + }); + } + + // Wait for all tasks in the batch to complete, but don't fail if individual tasks fail + while let Some(result) = join_set.join_next().await { + if let Err(e) = result { + tracing::error!("Task join error: {}", e); + } } offset += BATCH_SIZE; } + tracing::info!("Recovery process completed successfully"); Ok(()) } diff --git a/services/publisher/src/state.rs b/services/publisher/src/state.rs index bbcaa9b6..f6efd371 100644 --- a/services/publisher/src/state.rs +++ b/services/publisher/src/state.rs @@ -4,7 +4,7 @@ use std::{ }; use async_trait::async_trait; -use fuel_message_broker::MessageBroker; +use fuel_message_broker::NatsMessageBroker; use fuel_web_utils::{server::state::StateProvider, telemetry::Telemetry}; use serde::{Deserialize, Serialize}; @@ -18,13 +18,13 @@ pub struct HealthResponse { pub struct ServerState { pub start_time: Instant, - pub msg_broker: Arc, + pub msg_broker: Arc, pub telemetry: Arc>, } impl ServerState { pub fn new( - msg_broker: Arc, + msg_broker: Arc, telemetry: Arc>, ) -> Self { Self { diff --git a/services/webserver/src/server/state.rs b/services/webserver/src/server/state.rs index 2716cffb..b025947b 100644 --- a/services/webserver/src/server/state.rs +++ b/services/webserver/src/server/state.rs @@ -4,7 +4,7 @@ use std::{ }; use async_trait::async_trait; -use fuel_message_broker::{MessageBroker, MessageBrokerClient}; +use fuel_message_broker::NatsMessageBroker; use fuel_streams_core::FuelStreams; use fuel_streams_store::db::{Db, DbConnectionOpts}; use fuel_web_utils::{ @@ -23,7 +23,7 @@ use crate::{config::Config, metrics::Metrics, API_PASSWORD}; #[derive(Clone)] pub struct ServerState { pub start_time: Instant, - pub msg_broker: Arc, + pub msg_broker: Arc, pub fuel_streams: Arc, pub telemetry: Arc>, pub db: Arc, @@ -33,9 +33,8 @@ pub struct ServerState { impl ServerState { pub async fn new(config: &Config) -> anyhow::Result { - let msg_broker = - MessageBrokerClient::Nats.start(&config.broker.url).await?; - + let url = &config.broker.url; + let msg_broker = NatsMessageBroker::setup(url, None).await?; let db = Db::new(DbConnectionOpts { connection_str: config.db.url.clone(), ..Default::default() diff --git a/tests/src/lib.rs b/tests/src/lib.rs index 5e715890..a77d6cc4 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -3,7 +3,7 @@ mod fuel_core_helpers; use std::sync::Arc; pub use fuel_core_helpers::*; -use fuel_message_broker::{MessageBroker, MessageBrokerClient}; +use fuel_message_broker::NatsMessageBroker; use fuel_streams_core::{stream::*, subjects::IntoSubject, types::Block}; use fuel_streams_domains::blocks::{ subjects::BlocksSubject, @@ -35,18 +35,13 @@ pub async fn setup_store() -> DbResult> { Ok(store) } -pub async fn setup_message_broker( +pub async fn setup_stream( nats_url: &str, -) -> anyhow::Result> { - let broker = MessageBrokerClient::Nats.start(nats_url).await?; - broker.setup().await?; - Ok(broker) -} - -pub async fn setup_stream(nats_url: &str) -> anyhow::Result> { + prefix: &str, +) -> anyhow::Result> { let db = setup_db().await?; - let nats_client = setup_message_broker(nats_url).await?; - let stream = Stream::::get_or_init(&nats_client, &db.arc()).await; + let broker = NatsMessageBroker::setup(nats_url, Some(prefix)).await?; + let stream = Stream::::get_or_init(&broker, &db.arc()).await; Ok(stream) } diff --git a/tests/tests/services/consumer.rs b/tests/tests/services/consumer.rs index 5508e2a4..a30f3140 100644 --- a/tests/tests/services/consumer.rs +++ b/tests/tests/services/consumer.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use fuel_message_broker::MessageBrokerClient; +use fuel_message_broker::{NatsMessageBroker, NatsQueue, NatsSubject}; use fuel_streams_core::{ inputs::InputsSubject, outputs::OutputsSubject, @@ -191,9 +191,9 @@ async fn test_consumer_inserting_records() -> anyhow::Result<()> { shutdown.clone().spawn_signal_handler(); // Setup real NATS broker - let message_broker = MessageBrokerClient::Nats - .start_with_namespace("nats://localhost:4222", &prefix) - .await?; + let message_broker = + NatsMessageBroker::setup("nats://localhost:4222", Some(&prefix)) + .await?; // Setup FuelStreams & FuelStores let fuel_streams = FuelStreams::new(&message_broker, &db).await.arc(); @@ -202,9 +202,10 @@ async fn test_consumer_inserting_records() -> anyhow::Result<()> { // Create and publish test message let msg_payload = MockMsgPayload::build(1).with_namespace(&prefix); let encoded_payload = msg_payload.encode().await?; - message_broker - .publish_block(msg_payload.message_id(), encoded_payload) - .await?; + let queue = NatsQueue::BlockImporter(message_broker.clone()); + let block_height = msg_payload.block_height().into(); + let subject = NatsSubject::BlockSubmitted(block_height); + queue.publish(&subject, encoded_payload).await?; // Process messages tokio::spawn({ diff --git a/tests/tests/stream/live_data.rs b/tests/tests/stream/live_data.rs index 4adb7bbc..d4faa2af 100644 --- a/tests/tests/stream/live_data.rs +++ b/tests/tests/stream/live_data.rs @@ -1,6 +1,10 @@ use fuel_streams_core::{server::DeliverPolicy, subjects::*, types::Block}; use fuel_streams_store::record::{DataEncoder, Record}; -use fuel_streams_test::{create_multiple_records, setup_stream}; +use fuel_streams_test::{ + create_multiple_records, + create_random_db_name, + setup_stream, +}; use futures::StreamExt; use pretty_assertions::assert_eq; @@ -8,7 +12,8 @@ const NATS_URL: &str = "nats://localhost:4222"; #[tokio::test] async fn test_streaming_live_data() -> anyhow::Result<()> { - let stream = setup_stream(NATS_URL).await?; + let prefix = create_random_db_name(); + let stream = setup_stream(NATS_URL, &prefix).await?; let data = create_multiple_records(10, 0); tokio::spawn({