
refactor: subscription updates ordered #2507

Merged (3 commits) on Oct 9, 2024
45 changes: 29 additions & 16 deletions crates/torii/grpc/src/server/subscriptions/entity.rs
@@ -9,7 +9,9 @@
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
@@ -74,23 +76,37 @@
}
}

#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EntityManager>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>,
entity_sender: UnboundedSender<OptimisticEntity>,
⚠️ Potential issue

Ohayo, sensei! Potential memory growth due to unbounded channels in Service struct.

Using an unbounded channel (UnboundedSender<OptimisticEntity>) means that if the producer sends messages faster than the consumer can process them, it could lead to increased memory usage. Consider using a bounded channel to prevent potential memory exhaustion.

Would you like assistance in refactoring this to use a bounded channel with appropriate capacity?

}

impl Service {
pub fn new(subs_manager: Arc<EntityManager>) -> Self {
Self {
subs_manager,
let (entity_sender, entity_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<OptimisticEntity>::subscribe()),
}
entity_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, entity_receiver));

service
}

async fn publish_updates(
subs: Arc<EntityManager>,
mut entity_receiver: UnboundedReceiver<OptimisticEntity>,
) {
while let Some(entity) = entity_receiver.recv().await {
if let Err(e) = Self::process_entity_update(&subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Processing entity update.");
}

Codecov warning: added lines #L102-L104 in crates/torii/grpc/src/server/subscriptions/entity.rs were not covered by tests.
}
}

Codecov warning: added line #L106 in crates/torii/grpc/src/server/subscriptions/entity.rs was not covered by tests.

Comment on lines +101 to +109
⚠️ Potential issue

Ohayo, sensei! Error handling in publish_updates may need improvement.

Currently, errors from process_entity_update are logged but not acted upon. If process_entity_update fails repeatedly, it might indicate a critical issue. You might want to implement a retry mechanism or take corrective action when such errors occur.

Would you like assistance in implementing enhanced error handling or a retry strategy?

async fn process_entity_update(
subs: &Arc<EntityManager>,

Codecov warning: added lines #L108-L109 in crates/torii/grpc/src/server/subscriptions/entity.rs were not covered by tests.
entity: &OptimisticEntity,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
@@ -217,16 +233,13 @@
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.entity_sender.send(entity) {
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor.");
}

Codecov warning: added lines #L240-L242 in crates/torii/grpc/src/server/subscriptions/entity.rs were not covered by tests.
Comment on lines +238 to +244

⚠️ Potential issue

Ohayo, sensei! Potential issue if entity_sender is closed in poll method.

If entity_sender.send(entity) fails, it could mean that the receiver has been dropped, possibly causing this loop to continue indefinitely without effective processing. Consider handling this case by breaking out of the loop or shutting down the service gracefully.

Here's a suggested change to handle the closed sender:

 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
     let this = self.get_mut();

     while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
-        if let Err(e) = this.entity_sender.send(entity) {
+        if this.entity_sender.send(entity).is_err() {
             error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service.");
+            return Poll::Ready(());
         }
     }

     Poll::Pending
 }

}

Poll::Pending
36 changes: 26 additions & 10 deletions crates/torii/grpc/src/server/subscriptions/event.rs
@@ -9,7 +9,9 @@
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
@@ -62,16 +64,33 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventManager>,
simple_broker: Pin<Box<dyn Stream<Item = Event> + Send>>,
event_sender: UnboundedSender<Event>,
}

impl Service {
pub fn new(subs_manager: Arc<EventManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()) }
let (event_sender, event_receiver) = unbounded_channel();
let service =
Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), event_sender };

tokio::spawn(Self::publish_updates(subs_manager, event_receiver));

service
}

async fn publish_updates(
subs: Arc<EventManager>,
mut event_receiver: UnboundedReceiver<Event>,
) {
while let Some(event) = event_receiver.recv().await {
if let Err(e) = Self::process_event(&subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Processing event update.");
}

Codecov warning: added lines #L87-L89 in crates/torii/grpc/src/server/subscriptions/event.rs were not covered by tests.
}
}

async fn publish_updates(subs: Arc<EventManager>, event: &Event) -> Result<(), Error> {
async fn process_event(subs: &Arc<EventManager>, event: &Event) -> Result<(), Error> {

Codecov warning: added line #L93 in crates/torii/grpc/src/server/subscriptions/event.rs was not covered by tests.
let mut closed_stream = Vec::new();
let keys = event
.keys
@@ -151,12 +170,9 @@
let pin = self.get_mut();

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing events update.");
}
});
if let Err(e) = pin.event_sender.send(event) {
error!(target = LOG_TARGET, error = %e, "Sending event to processor.");
}

Codecov warning: added lines #L173-L175 in crates/torii/grpc/src/server/subscriptions/event.rs were not covered by tests.
}

Poll::Pending
41 changes: 27 additions & 14 deletions crates/torii/grpc/src/server/subscriptions/event_message.rs
@@ -9,7 +9,7 @@
use futures_util::StreamExt;
use rand::Rng;
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::sync::mpsc::{channel, unbounded_channel, Receiver, UnboundedReceiver, UnboundedSender};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
@@ -71,20 +71,36 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<EventMessageManager>,
simple_broker: Pin<Box<dyn Stream<Item = OptimisticEventMessage> + Send>>,
event_sender: UnboundedSender<OptimisticEventMessage>,
}

impl Service {
pub fn new(subs_manager: Arc<EventMessageManager>) -> Self {
Self {
subs_manager,
let (event_sender, event_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<OptimisticEventMessage>::subscribe()),
}
event_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, event_receiver));

service
}

async fn publish_updates(
subs: Arc<EventMessageManager>,
mut event_receiver: UnboundedReceiver<OptimisticEventMessage>,
) {
while let Some(event) = event_receiver.recv().await {
if let Err(e) = Self::process_event_update(&subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Processing event update.");
}

Codecov warning: added lines #L96-L98 in crates/torii/grpc/src/server/subscriptions/event_message.rs were not covered by tests.
}
}

Codecov warning: added line #L100 in crates/torii/grpc/src/server/subscriptions/event_message.rs was not covered by tests.

async fn process_event_update(
subs: &Arc<EventMessageManager>,

Codecov warning: added lines #L102-L103 in crates/torii/grpc/src/server/subscriptions/event_message.rs were not covered by tests.
entity: &OptimisticEventMessage,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
@@ -195,16 +211,13 @@
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(event)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.event_sender.send(event) {
error!(target = LOG_TARGET, error = %e, "Sending event update to processor.");
}

Codecov warning: added lines #L218-L220 in crates/torii/grpc/src/server/subscriptions/event_message.rs were not covered by tests.
Comment on lines +214 to +220

⚠️ Potential issue

Ohayo, sensei! Be cautious with unbounded channels to avoid memory issues

The use of UnboundedSender and UnboundedReceiver may lead to unbounded memory growth if events are produced faster than they are consumed. Consider using a bounded channel (tokio::sync::mpsc::channel) with an appropriate capacity to implement backpressure and prevent potential memory exhaustion.

}

Poll::Pending
42 changes: 30 additions & 12 deletions crates/torii/grpc/src/server/subscriptions/indexer.rs
@@ -9,7 +9,9 @@
use rand::Rng;
use sqlx::{Pool, Sqlite};
use starknet::core::types::Felt;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::mpsc::{
channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::RwLock;
use torii_core::error::{Error, ParseError};
use torii_core::simple_broker::SimpleBroker;
@@ -81,17 +83,36 @@
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Service {
subs_manager: Arc<IndexerManager>,
simple_broker: Pin<Box<dyn Stream<Item = ContractUpdated> + Send>>,
update_sender: UnboundedSender<ContractUpdated>,
}

impl Service {
pub fn new(subs_manager: Arc<IndexerManager>) -> Self {
Self { subs_manager, simple_broker: Box::pin(SimpleBroker::<ContractUpdated>::subscribe()) }
let (update_sender, update_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<ContractUpdated>::subscribe()),
update_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, update_receiver));

service
}

async fn publish_updates(
subs: Arc<IndexerManager>,
mut update_receiver: UnboundedReceiver<ContractUpdated>,
) {
while let Some(update) = update_receiver.recv().await {
if let Err(e) = Self::process_update(&subs, &update).await {
error!(target = LOG_TARGET, error = %e, "Processing indexer update.");
}

Codecov warning: added lines #L108-L110 in crates/torii/grpc/src/server/subscriptions/indexer.rs were not covered by tests.
}
}

Codecov warning: added line #L112 in crates/torii/grpc/src/server/subscriptions/indexer.rs was not covered by tests.
Comment on lines 103 to +112

🛠️ Refactor suggestion

Dojo-level implementation, sensei!

The publish_updates method is a great addition:

  1. It effectively separates the concern of update processing.
  2. The use of while let for consuming from the channel is idiomatic and efficient.
  3. Error handling is in place, which is crucial for long-running tasks.

A small suggestion to enhance error handling:

Consider adding more context to the error log. This could help with debugging in the future. For example:

error!(
    target = LOG_TARGET,
    error = %e,
    contract_address = %update.contract_address,
    "Failed to process indexer update"
);

This addition provides more information about which update caused the error.

Overall, excellent work on implementing this asynchronous update processing!


async fn process_update(
subs: &Arc<IndexerManager>,

Codecov warning: added lines #L114-L115 in crates/torii/grpc/src/server/subscriptions/indexer.rs were not covered by tests.
update: &ContractUpdated,
) -> Result<(), Error> {
let mut closed_stream = Vec::new();
@@ -127,16 +148,13 @@
impl Future for Service {
type Output = ();

fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
let pin = self.get_mut();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing indexer update.");
}
});
while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.update_sender.send(update) {
error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor.");
}

Codecov warning: added lines #L155-L157 in crates/torii/grpc/src/server/subscriptions/indexer.rs were not covered by tests.
Comment on lines +151 to +157

🛠️ Refactor suggestion

Ohayo, sensei! Excellent refinement of the poll method!

The changes here are well thought out:

  1. Simplifying std::pin::Pin<&mut Self> to Pin<&mut Self> improves readability.
  2. Sending updates through the update_sender aligns perfectly with the new asynchronous processing design.
  3. Error handling for the send operation is a good practice.

A small suggestion to improve error handling:

Consider breaking the loop when a send error occurs. This could prevent unnecessary CPU usage if the receiver has been dropped. For example:

while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
    if let Err(e) = this.update_sender.send(update) {
        error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor failed. Stopping poll.");
        return Poll::Ready(());
    }
}

This change ensures that the service stops polling if it can't send updates anymore.

Overall, great job on refining this method to work with the new asynchronous architecture!

}

Poll::Pending