-
Notifications
You must be signed in to change notification settings - Fork 191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: subscription updates ordered #2507
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -9,7 +9,9 @@ use futures::Stream; | |||||||||||||||||||||||||||||||||||||||||||||||||||||
use futures_util::StreamExt; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
use rand::Rng; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
use starknet::core::types::Felt; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tokio::sync::mpsc::{channel, Receiver, Sender}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tokio::sync::mpsc::{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
use tokio::sync::RwLock; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
use torii_core::error::{Error, ParseError}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
use torii_core::simple_broker::SimpleBroker; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -77,20 +79,36 @@ impl EntityManager { | |||||||||||||||||||||||||||||||||||||||||||||||||||||
#[must_use = "Service does nothing unless polled"] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
#[allow(missing_debug_implementations)] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub struct Service { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs_manager: Arc<EntityManager>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
entity_sender: UnboundedSender<OptimisticEntity>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
impl Service { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
pub fn new(subs_manager: Arc<EntityManager>) -> Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs_manager, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let (entity_sender, entity_receiver) = unbounded_channel(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let service = Self { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
simple_broker: Box::pin(SimpleBroker::<OptimisticEntity>::subscribe()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
entity_sender, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokio::spawn(Self::publish_updates(subs_manager, entity_receiver)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
service | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn publish_updates( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs: Arc<EntityManager>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
mut entity_receiver: UnboundedReceiver<OptimisticEntity>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
while let Some(entity) = entity_receiver.recv().await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = Self::process_entity_update(&subs, &entity).await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Processing entity update."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+101
to
+109
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Ohayo, sensei! Error handling in Currently, errors from Would you like assistance in implementing enhanced error handling or a retry strategy? |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
async fn process_entity_update( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
subs: &Arc<EntityManager>, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
entity: &OptimisticEntity, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) -> Result<(), Error> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let mut closed_stream = Vec::new(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -217,16 +235,13 @@ impl Service { | |||||||||||||||||||||||||||||||||||||||||||||||||||||
impl Future for Service { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
type Output = (); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let pin = self.get_mut(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let this = self.get_mut(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
let subs = Arc::clone(&pin.subs_manager); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
tokio::spawn(async move { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = Service::publish_updates(subs, &entity).await { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if let Err(e) = this.entity_sender.send(entity) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor."); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+238
to
+244
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Ohayo, sensei! Potential issue if If Here's a suggested change to handle the closed sender: fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
- if let Err(e) = this.entity_sender.send(entity) {
+ if this.entity_sender.send(entity).is_err() {
error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service.");
+ return Poll::Ready(());
}
}
Poll::Pending
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
Poll::Pending | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ use futures::Stream; | |
use futures_util::StreamExt; | ||
use rand::Rng; | ||
use starknet::core::types::Felt; | ||
use tokio::sync::mpsc::{channel, Receiver}; | ||
use tokio::sync::mpsc::{channel, unbounded_channel, Receiver, UnboundedReceiver, UnboundedSender}; | ||
use tokio::sync::RwLock; | ||
use torii_core::error::{Error, ParseError}; | ||
use torii_core::simple_broker::SimpleBroker; | ||
|
@@ -71,20 +71,36 @@ impl EventMessageManager { | |
#[must_use = "Service does nothing unless polled"] | ||
#[allow(missing_debug_implementations)] | ||
pub struct Service { | ||
subs_manager: Arc<EventMessageManager>, | ||
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEventMessage> + Send>>, | ||
event_sender: UnboundedSender<OptimisticEventMessage>, | ||
} | ||
|
||
impl Service { | ||
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self { | ||
Self { | ||
subs_manager, | ||
let (event_sender, event_receiver) = unbounded_channel(); | ||
let service = Self { | ||
simple_broker: Box::pin(SimpleBroker::<OptimisticEventMessage>::subscribe()), | ||
} | ||
event_sender, | ||
}; | ||
|
||
tokio::spawn(Self::publish_updates(subs_manager, event_receiver)); | ||
|
||
service | ||
} | ||
|
||
async fn publish_updates( | ||
subs: Arc<EventMessageManager>, | ||
mut event_receiver: UnboundedReceiver<OptimisticEventMessage>, | ||
) { | ||
while let Some(event) = event_receiver.recv().await { | ||
if let Err(e) = Self::process_event_update(&subs, &event).await { | ||
error!(target = LOG_TARGET, error = %e, "Processing event update."); | ||
} | ||
} | ||
} | ||
|
||
async fn process_event_update( | ||
subs: &Arc<EventMessageManager>, | ||
entity: &OptimisticEventMessage, | ||
) -> Result<(), Error> { | ||
let mut closed_stream = Vec::new(); | ||
|
@@ -195,16 +211,13 @@ impl Service { | |
impl Future for Service { | ||
type Output = (); | ||
|
||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { | ||
let pin = self.get_mut(); | ||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let this = self.get_mut(); | ||
|
||
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let subs = Arc::clone(&pin.subs_manager); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, &entity).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing entity update."); | ||
} | ||
}); | ||
while let Poll::Ready(Some(event)) = this.simple_broker.poll_next_unpin(cx) { | ||
if let Err(e) = this.event_sender.send(event) { | ||
error!(target = LOG_TARGET, error = %e, "Sending event update to processor."); | ||
} | ||
Comment on lines
+214
to
+220
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohayo, sensei! Be cautious with unbounded channels to avoid memory issues The use of |
||
} | ||
|
||
Poll::Pending | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,9 @@ use futures::{Stream, StreamExt}; | |
use rand::Rng; | ||
use sqlx::{Pool, Sqlite}; | ||
use starknet::core::types::Felt; | ||
use tokio::sync::mpsc::{channel, Receiver, Sender}; | ||
use tokio::sync::mpsc::{ | ||
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender, | ||
}; | ||
use tokio::sync::RwLock; | ||
use torii_core::error::{Error, ParseError}; | ||
use torii_core::simple_broker::SimpleBroker; | ||
|
@@ -81,17 +83,36 @@ impl IndexerManager { | |
#[must_use = "Service does nothing unless polled"] | ||
#[allow(missing_debug_implementations)] | ||
pub struct Service { | ||
subs_manager: Arc<IndexerManager>, | ||
simple_broker: Pin<Box<dyn Stream<Item = ContractUpdated> + Send>>, | ||
update_sender: UnboundedSender<ContractUpdated>, | ||
} | ||
|
||
impl Service { | ||
pub fn new(subs_manager: Arc<IndexerManager>) -> Self { | ||
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<ContractUpdated>::subscribe()) } | ||
let (update_sender, update_receiver) = unbounded_channel(); | ||
let service = Self { | ||
simple_broker: Box::pin(SimpleBroker::<ContractUpdated>::subscribe()), | ||
update_sender, | ||
}; | ||
|
||
tokio::spawn(Self::publish_updates(subs_manager, update_receiver)); | ||
|
||
service | ||
} | ||
|
||
async fn publish_updates( | ||
subs: Arc<IndexerManager>, | ||
mut update_receiver: UnboundedReceiver<ContractUpdated>, | ||
) { | ||
while let Some(update) = update_receiver.recv().await { | ||
if let Err(e) = Self::process_update(&subs, &update).await { | ||
error!(target = LOG_TARGET, error = %e, "Processing indexer update."); | ||
} | ||
} | ||
} | ||
Comment on lines
103
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Dojo-level implementation, sensei! The
A small suggestion to enhance error handling: Consider adding more context to the error log. This could help with debugging in the future. For example: error!(
target = LOG_TARGET,
error = %e,
contract_address = %update.contract_address,
"Failed to process indexer update"
); This addition provides more information about which update caused the error. Overall, excellent work on implementing this asynchronous update processing! |
||
|
||
async fn process_update( | ||
subs: &Arc<IndexerManager>, | ||
update: &ContractUpdated, | ||
) -> Result<(), Error> { | ||
let mut closed_stream = Vec::new(); | ||
|
@@ -127,16 +148,13 @@ impl Service { | |
impl Future for Service { | ||
type Output = (); | ||
|
||
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> { | ||
let pin = self.get_mut(); | ||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let this = self.get_mut(); | ||
|
||
while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) { | ||
let subs = Arc::clone(&pin.subs_manager); | ||
tokio::spawn(async move { | ||
if let Err(e) = Service::publish_updates(subs, &event).await { | ||
error!(target = LOG_TARGET, error = %e, "Publishing indexer update."); | ||
} | ||
}); | ||
while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) { | ||
if let Err(e) = this.update_sender.send(update) { | ||
error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor."); | ||
} | ||
Comment on lines
+151
to
+157
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Ohayo, sensei! Excellent refinement of the The changes here are well thought out:
A small suggestion to improve error handling: Consider breaking the loop when a send error occurs. This could prevent unnecessary CPU usage if the receiver has been dropped. For example: while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.update_sender.send(update) {
error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor failed. Stopping poll.");
return Poll::Ready(());
}
} This change ensures that the service stops polling if it can't send updates anymore. Overall, great job on refining this method to work with the new asynchronous architecture! |
||
} | ||
|
||
Poll::Pending | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo, sensei! Potential memory growth due to unbounded channels in
Service
struct.Using an unbounded channel (
UnboundedSender<OptimisticEntity>
) means that if the producer sends messages faster than the consumer can process them, it could lead to increased memory usage. Consider using a bounded channel to prevent potential memory exhaustion.Would you like assistance in refactoring this to use a bounded channel with appropriate capacity?