Skip to content

Commit

Permalink
fix(sv-publisher): Recover mechanism for tx status none (#396)
Browse files Browse the repository at this point in the history
* fix(publisher): Recover mechanism for tx status none

* build(repo): trigger update

* build(repo): pre-commit config

* build(repo): pre commit
  • Loading branch information
pedronauck authored Jan 27, 2025
1 parent d62e206 commit 1b1083d
Show file tree
Hide file tree
Showing 31 changed files with 476 additions and 449 deletions.
5 changes: 3 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ exclude: |
(?x)^(
CHANGELOG.md|
pnpm-lock.yaml|
cluster/charts/.*|
node_modules/.*
cluster/charts|
node_modules|
target
)$
repos:
Expand Down
2 changes: 1 addition & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cluster/charts
pnpm-lock.yaml
cluster/charts
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions crates/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ use futures::StreamExt;
async fn main() -> anyhow::Result<()> {
// Connect to NATS server
let db = Db::new(DbConnectionOpts::default()).await?;
let broker = MessageBrokerClient::Nats.start("nats://localhost:4222").await?;
broker.setup().await?;
let broker = NatsMessageBroker::setup("nats://localhost:4222", None).await?;
// Create or get existing stream for blocks
let stream = Stream::<Block>::get_or_init(&broker, &db.arc()).await;
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/stream/fuel_streams.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use fuel_message_broker::MessageBroker;
use fuel_message_broker::NatsMessageBroker;
use fuel_streams_store::db::Db;

use super::Stream;
Expand All @@ -14,12 +14,12 @@ pub struct FuelStreams {
pub outputs: Stream<Output>,
pub receipts: Stream<Receipt>,
pub utxos: Stream<Utxo>,
pub msg_broker: Arc<dyn MessageBroker>,
pub msg_broker: Arc<NatsMessageBroker>,
pub db: Arc<Db>,
}

impl FuelStreams {
pub async fn new(broker: &Arc<dyn MessageBroker>, db: &Arc<Db>) -> Self {
pub async fn new(broker: &Arc<NatsMessageBroker>, db: &Arc<Db>) -> Self {
Self {
blocks: Stream::<Block>::get_or_init(broker, db).await,
transactions: Stream::<Transaction>::get_or_init(broker, db).await,
Expand All @@ -36,7 +36,7 @@ impl FuelStreams {
Arc::new(self.clone())
}

pub fn broker(&self) -> Arc<dyn MessageBroker> {
pub fn broker(&self) -> Arc<NatsMessageBroker> {
self.msg_broker.clone()
}
}
12 changes: 6 additions & 6 deletions crates/core/src/stream/stream_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

pub use async_nats::Subscriber as StreamLiveSubscriber;
use fuel_message_broker::MessageBroker;
use fuel_message_broker::NatsMessageBroker;
use fuel_streams_macros::subject::IntoSubject;
use fuel_streams_store::{db::Db, record::Record, store::Store};
use futures::{
Expand All @@ -19,7 +19,7 @@ pub type BoxedStream = Box<dyn FStream<Item = BoxedStoreItem> + Send + Unpin>;
#[derive(Debug, Clone)]
pub struct Stream<S: Record> {
store: Arc<Store<S>>,
broker: Arc<dyn MessageBroker>,
broker: Arc<NatsMessageBroker>,
_marker: std::marker::PhantomData<S>,
}

Expand All @@ -28,7 +28,7 @@ impl<R: Record> Stream<R> {
const INSTANCE: OnceCell<Self> = OnceCell::const_new();

pub async fn get_or_init(
broker: &Arc<dyn MessageBroker>,
broker: &Arc<NatsMessageBroker>,
db: &Arc<Db>,
) -> Self {
let cell = Self::INSTANCE;
Expand All @@ -37,7 +37,7 @@ impl<R: Record> Stream<R> {
.to_owned()
}

pub async fn new(broker: &Arc<dyn MessageBroker>, db: &Arc<Db>) -> Self {
pub async fn new(broker: &Arc<NatsMessageBroker>, db: &Arc<Db>) -> Self {
let store = Arc::new(Store::new(db));
let broker = Arc::clone(broker);
Self {
Expand All @@ -62,7 +62,7 @@ impl<R: Record> Stream<R> {
payload: bytes::Bytes,
) -> Result<(), StreamError> {
let broker = self.broker.clone();
broker.publish_event(subject, payload).await?;
broker.publish(subject, payload).await?;
Ok(())
}

Expand All @@ -83,7 +83,7 @@ impl<R: Record> Stream<R> {
sleep(Duration::from_millis(throttle_time as u64)).await;
}
}
let mut live = broker.subscribe_to_events(&subject.parse()).await?;
let mut live = broker.subscribe(&subject.parse()).await?;
while let Some(msg) = live.next().await {
yield msg?;
let throttle_time = *config::STREAM_THROTTLE_LIVE;
Expand Down
15 changes: 0 additions & 15 deletions crates/domains/src/msg_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,6 @@ impl MsgPayload {
.collect::<Vec<_>>()
}

pub fn message_id(&self) -> String {
let height = self.metadata.block_height.clone();
format!("block_{height}")
}

pub fn subject(&self) -> String {
let producer = self.metadata.block_producer.clone();
let height = self.metadata.block_height.clone();
format!("{}.{producer}.{height}", Self::subject_name())
}

pub fn subject_name() -> &'static str {
"block_submitted"
}

pub fn metadata(&self) -> &Metadata {
&self.metadata
}
Expand Down
2 changes: 2 additions & 0 deletions crates/message-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ mod msg_broker;
mod nats;
pub mod nats_metrics;
mod nats_opts;
mod nats_queue;

pub use msg_broker::*;
pub use nats::*;
pub use nats_opts::*;
pub use nats_queue::*;
100 changes: 7 additions & 93 deletions crates/message-broker/src/msg_broker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, sync::Arc};
use std::fmt;

use async_trait::async_trait;
use futures::Stream;
Expand Down Expand Up @@ -56,6 +56,12 @@ pub enum MessageBrokerError {
Serde(#[from] serde_json::Error),
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error(transparent)]
NatsSubscribe(#[from] async_nats::client::SubscribeError),
#[error(transparent)]
NatsPublish(
#[from] async_nats::error::Error<async_nats::client::PublishErrorKind>,
),
}

#[async_trait]
Expand All @@ -76,95 +82,3 @@ pub type MessageStream = Box<
+ Send
+ Unpin,
>;

#[async_trait]
pub trait MessageBroker: std::fmt::Debug + Send + Sync + 'static {
/// Get the current namespace
fn namespace(&self) -> &Namespace;

/// Setup required infrastructure (queues, exchanges, etc)
async fn setup(&self) -> Result<(), MessageBrokerError>;

/// Check if the broker is connected
fn is_connected(&self) -> bool;

/// Publish a block to the work queue for processing
/// Used by publisher to send blocks to consumers
async fn publish_block(
&self,
id: String,
payload: Vec<u8>,
) -> Result<(), MessageBrokerError>;

/// Receive a stream of blocks from the work queue
/// Used by consumer to process blocks
async fn receive_blocks_stream(
&self,
batch_size: usize,
) -> Result<MessageBlockStream, MessageBrokerError>;

/// Publish an event to a topic for subscribers
/// Used by Stream implementation for pub/sub
async fn publish_event(
&self,
topic: &str,
payload: bytes::Bytes,
) -> Result<(), MessageBrokerError>;

/// Subscribe to events on a topic
/// Used by Stream implementation for pub/sub
async fn subscribe_to_events(
&self,
topic: &str,
) -> Result<MessageStream, MessageBrokerError>;

/// Flush all in-flight messages
async fn flush(&self) -> Result<(), MessageBrokerError>;

/// Check if the broker is healthy
async fn is_healthy(&self) -> bool;

/// Get health info
async fn get_health_info(
&self,
uptime_secs: u64,
) -> Result<serde_json::Value, MessageBrokerError>;
}

#[derive(Debug, Clone, Default)]
pub enum MessageBrokerClient {
#[default]
Nats,
}

impl MessageBrokerClient {
pub async fn start(
&self,
url: &str,
) -> Result<Arc<dyn MessageBroker>, MessageBrokerError> {
match self {
MessageBrokerClient::Nats => {
let opts = crate::NatsOpts::new(url.to_string());
let broker = crate::NatsMessageBroker::new(&opts).await?;
broker.setup().await?;
Ok(broker.arc())
}
}
}

pub async fn start_with_namespace(
&self,
url: &str,
namespace: &str,
) -> Result<Arc<dyn MessageBroker>, MessageBrokerError> {
match self {
MessageBrokerClient::Nats => {
let opts = crate::NatsOpts::new(url.to_string())
.with_namespace(namespace);
let broker = crate::NatsMessageBroker::new(&opts).await?;
broker.setup().await?;
Ok(broker.arc())
}
}
}
}
Loading

0 comments on commit 1b1083d

Please sign in to comment.