Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: subscription updates ordered #2507

Merged
merged 3 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -77,20 +79,36 @@ impl EntityManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EntityManager>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>,
entity_sender: UnboundedSender<OptimisticEntity>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Potential memory growth due to unbounded channels in Service struct.

Using an unbounded channel (UnboundedSender<OptimisticEntity>) means that if the producer sends messages faster than the consumer can process them, it could lead to increased memory usage. Consider using a bounded channel to prevent potential memory exhaustion.

Would you like assistance in refactoring this to use a bounded channel with appropriate capacity?

}

impl Service {
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
Self {
subs_manager,
let (entity_sender, entity_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<OptimisticEntity>::subscribe()),
}
entity_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, entity_receiver));

service
}

async fn publish_updates(
subs: Arc<EntityManager>,
mut entity_receiver: UnboundedReceiver<OptimisticEntity>,
) {
while let Some(entity) = entity_receiver.recv().await {
if let Err(e) = Self::process_entity_update(&subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Processing entity update.");
}
}
}

Comment on lines +101 to +109
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Ohayo, sensei! Error handling in publish_updates may need improvement.

Currently, errors from process_entity_update are logged but not acted upon. If process_entity_update fails repeatedly, it might indicate a critical issue. You might want to implement a retry mechanism or take corrective action when such errors occur.

Would you like assistance in implementing enhanced error handling or a retry strategy?

async fn process_entity_update(
subs: &Arc<EntityManager>,
entity: &OptimisticEntity,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
Expand Down Expand Up @@ -217,16 +235,13 @@ impl Service {
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.entity_sender.send(entity) {
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor.");
}
Comment on lines +238 to +244
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Ohayo, sensei! Potential issue if entity_sender is closed in poll method.

If entity_sender.send(entity) fails, it could mean that the receiver has been dropped, possibly causing this loop to continue indefinitely without effective processing. Consider handling this case by breaking out of the loop or shutting down the service gracefully.

Here's a suggested change to handle the closed sender:

 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
     let this = self.get_mut();

     while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
-        if let Err(e) = this.entity_sender.send(entity) {
+        if this.entity_sender.send(entity).is_err() {
             error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service.");
+            return Poll::Ready(());
         }
     }

     Poll::Pending
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.entity_sender.send(entity) {
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor.");
}
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if this.entity_sender.send(entity).is_err() {
error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service.");
return Poll::Ready(());
}
}
Poll::Pending
}

}

Poll::Pending
Expand Down
36 changes: 26 additions & 10 deletions crates/torii/grpc/src/server/subscriptions/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -62,16 +64,33 @@ impl EventManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventManager>,
simple_broker: Pin<Box<dyn Stream<Item = Event> + Send>>,
event_sender: UnboundedSender<Event>,
}

impl Service {
pub fn new(subs_manager: Arc<EventManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()) }
let (event_sender, event_receiver) = unbounded_channel();
let service =
Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), event_sender };

tokio::spawn(Self::publish_updates(subs_manager, event_receiver));

service
}

async fn publish_updates(
subs: Arc<EventManager>,
mut event_receiver: UnboundedReceiver<Event>,
) {
while let Some(event) = event_receiver.recv().await {
if let Err(e) = Self::process_event(&subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Processing event update.");
}
}
}

async fn publish_updates(subs: Arc<EventManager>, event: &Event) -> Result<(), Error> {
async fn process_event(subs: &Arc<EventManager>, event: &Event) -> Result<(), Error> {
let mut closed_stream = Vec::new();
let keys = event
.keys
Expand Down Expand Up @@ -151,12 +170,9 @@ impl Future for Service {
let pin = self.get_mut();

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing events update.");
}
});
if let Err(e) = pin.event_sender.send(event) {
error!(target = LOG_TARGET, error = %e, "Sending event to processor.");
}
}

Poll::Pending
Expand Down
41 changes: 27 additions & 14 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::Stream;
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::mpsc::{channel, unbounded_channel, Receiver, UnboundedReceiver, UnboundedSender};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -71,20 +71,36 @@ impl EventMessageManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventMessageManager>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEventMessage> + Send>>,
event_sender: UnboundedSender<OptimisticEventMessage>,
}

impl Service {
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self {
Self {
subs_manager,
let (event_sender, event_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<OptimisticEventMessage>::subscribe()),
}
event_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, event_receiver));

service
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
mut event_receiver: UnboundedReceiver<OptimisticEventMessage>,
) {
while let Some(event) = event_receiver.recv().await {
if let Err(e) = Self::process_event_update(&subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Processing event update.");
}
}
}

async fn process_event_update(
subs: &Arc<EventMessageManager>,
entity: &OptimisticEventMessage,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
Expand Down Expand Up @@ -195,16 +211,13 @@ impl Service {
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(event)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.event_sender.send(event) {
error!(target = LOG_TARGET, error = %e, "Sending event update to processor.");
}
Comment on lines +214 to +220
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Be cautious with unbounded channels to avoid memory issues

The use of UnboundedSender and UnboundedReceiver may lead to unbounded memory growth if events are produced faster than they are consumed. Consider using a bounded channel (tokio::sync::mpsc::channel) with an appropriate capacity to implement backpressure and prevent potential memory exhaustion.

}

Poll::Pending
Expand Down
42 changes: 30 additions & 12 deletions crates/torii/grpc/src/server/subscriptions/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use futures::{Stream, StreamExt};
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
Expand Down Expand Up @@ -81,17 +83,36 @@ impl IndexerManager {
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<IndexerManager>,
simple_broker: Pin<Box<dyn Stream<Item = ContractUpdated> + Send>>,
update_sender: UnboundedSender<ContractUpdated>,
}

impl Service {
pub fn new(subs_manager: Arc<IndexerManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<ContractUpdated>::subscribe()) }
let (update_sender, update_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<ContractUpdated>::subscribe()),
update_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, update_receiver));

service
}

async fn publish_updates(
subs: Arc<IndexerManager>,
mut update_receiver: UnboundedReceiver<ContractUpdated>,
) {
while let Some(update) = update_receiver.recv().await {
if let Err(e) = Self::process_update(&subs, &update).await {
error!(target = LOG_TARGET, error = %e, "Processing indexer update.");
}
}
}
Comment on lines 103 to +112
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Dojo-level implementation, sensei!

The publish_updates method is a great addition:

  1. It effectively separates the concern of update processing.
  2. The use of while let for consuming from the channel is idiomatic and efficient.
  3. Error handling is in place, which is crucial for long-running tasks.

A small suggestion to enhance error handling:

Consider adding more context to the error log. This could help with debugging in the future. For example:

error!(
    target = LOG_TARGET,
    error = %e,
    contract_address = %update.contract_address,
    "Failed to process indexer update"
);

This addition provides more information about which update caused the error.

Overall, excellent work on implementing this asynchronous update processing!


async fn process_update(
subs: &Arc<IndexerManager>,
update: &ContractUpdated,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
Expand Down Expand Up @@ -127,16 +148,13 @@ impl Service {
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing indexer update.");
}
});
while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.update_sender.send(update) {
error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor.");
}
Comment on lines +151 to +157
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Excellent refinement of the poll method!

The changes here are well thought out:

  1. Simplifying std::pin::Pin<&mut Self> to Pin<&mut Self> improves readability.
  2. Sending updates through the update_sender aligns perfectly with the new asynchronous processing design.
  3. Error handling for the send operation is a good practice.

A small suggestion to improve error handling:

Consider breaking the loop when a send error occurs. This could prevent unnecessary CPU usage if the receiver has been dropped. For example:

while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
    if let Err(e) = this.update_sender.send(update) {
        error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor failed. Stopping poll.");
        return Poll::Ready(());
    }
}

This change ensures that the service stops polling if it can't send updates anymore.

Overall, great job on refining this method to work with the new asynchronous architecture!

}

Poll::Pending
Expand Down
Loading