Skip to content

Commit

Permalink
feat(torii): token balances subscription (#2831)
Browse files Browse the repository at this point in the history
* feat(torii): token balances subscription

* fmt

* fmt

* add to client sdk

* fmt
  • Loading branch information
Larkooo authored Dec 23, 2024
1 parent 548783b commit 7b37e22
Show file tree
Hide file tree
Showing 7 changed files with 405 additions and 8 deletions.
37 changes: 36 additions & 1 deletion crates/torii/client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use starknet::core::types::Felt;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tokio::sync::RwLock as AsyncRwLock;
use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming};
use torii_grpc::client::{
EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming, TokenBalanceStreaming,
};
use torii_grpc::proto::world::{
RetrieveEntitiesResponse, RetrieveEventsResponse, RetrieveTokenBalancesResponse,
RetrieveTokensResponse,
Expand Down Expand Up @@ -209,4 +211,37 @@ impl Client {
.await?;
Ok(stream)
}

/// Subscribes to token balances updates.
/// If no contract addresses are provided, it will subscribe to updates for all contract
/// addresses. If no account addresses are provided, it will subscribe to updates for all
/// account addresses.
pub async fn on_token_balance_updated(
&self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<TokenBalanceStreaming, Error> {
let mut grpc_client = self.inner.write().await;
let stream =
grpc_client.subscribe_token_balances(contract_addresses, account_addresses).await?;
Ok(stream)
}

/// Update the token balances subscription
pub async fn update_token_balance_subscription(
&self,
subscription_id: u64,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<(), Error> {
let mut grpc_client = self.inner.write().await;
grpc_client
.update_token_balances_subscription(
subscription_id,
contract_addresses,
account_addresses,
)
.await?;
Ok(())
}
}
12 changes: 8 additions & 4 deletions crates/torii/core/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use tracing::{debug, trace, warn};
use super::{ApplyBalanceDiffQuery, Executor};
use crate::constants::{IPFS_CLIENT_MAX_RETRY, SQL_FELT_DELIMITER, TOKEN_BALANCE_TABLE};
use crate::executor::LOG_TARGET;
use crate::simple_broker::SimpleBroker;
use crate::sql::utils::{felt_to_sql_string, sql_string_to_u256, u256_to_sql_string, I256};
use crate::types::ContractType;
use crate::types::{ContractType, TokenBalance};
use crate::utils::fetch_content_from_ipfs;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -159,18 +160,21 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
}

// write the new balance to the database
sqlx::query(&format!(
let token_balance: TokenBalance = sqlx::query_as(&format!(
"INSERT OR REPLACE INTO {TOKEN_BALANCE_TABLE} (id, contract_address, account_address, \
token_id, balance) VALUES (?, ?, ?, ?, ?)",
token_id, balance) VALUES (?, ?, ?, ?, ?) RETURNING *",
))
.bind(id)
.bind(contract_address)
.bind(account_address)
.bind(token_id)
.bind(u256_to_sql_string(&balance))
.execute(&mut **tx)
.fetch_one(&mut **tx)
.await?;

debug!(target: LOG_TARGET, token_balance = ?token_balance, "Applied balance diff");
SimpleBroker::publish(token_balance);

Ok(())
}

Expand Down
24 changes: 24 additions & 0 deletions crates/torii/grpc/proto/world.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ service World {
// Update entity subscription
rpc UpdateEventMessagesSubscription (UpdateEventMessagesSubscriptionRequest) returns (google.protobuf.Empty);

// Subscribe to token balance updates.
rpc SubscribeTokenBalances (RetrieveTokenBalancesRequest) returns (stream SubscribeTokenBalancesResponse);

// Update token balance subscription
rpc UpdateTokenBalancesSubscription (UpdateTokenBalancesSubscriptionRequest) returns (google.protobuf.Empty);

// Retrieve entities
rpc RetrieveEventMessages (RetrieveEventMessagesRequest) returns (RetrieveEntitiesResponse);

Expand All @@ -50,6 +56,24 @@ service World {
rpc RetrieveTokenBalances (RetrieveTokenBalancesRequest) returns (RetrieveTokenBalancesResponse);
}

// A request to update a token balance subscription
message UpdateTokenBalancesSubscriptionRequest {
// The subscription ID
uint64 subscription_id = 1;
// The list of contract addresses to subscribe to
repeated bytes contract_addresses = 2;
// The list of account addresses to subscribe to
repeated bytes account_addresses = 3;
}

// A response containing token balances
message SubscribeTokenBalancesResponse {
// The subscription ID
uint64 subscription_id = 1;
// The token balance
types.TokenBalance balance = 2;
}

// A request to retrieve tokens
message RetrieveTokensRequest {
// The list of contract addresses to retrieve tokens for
Expand Down
77 changes: 75 additions & 2 deletions crates/torii/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ use crate::proto::world::{
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest,
SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse,
UpdateEntitiesSubscriptionRequest, UpdateEventMessagesSubscriptionRequest,
SubscribeTokenBalancesResponse, UpdateEntitiesSubscriptionRequest,
UpdateEventMessagesSubscriptionRequest, UpdateTokenBalancesSubscriptionRequest,
WorldMetadataRequest,
};
use crate::types::schema::{Entity, SchemaError};
use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query};
use crate::types::{
EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query, TokenBalance,
};

#[derive(Debug, thiserror::Error)]
pub enum Error {
Expand Down Expand Up @@ -295,6 +298,76 @@ impl WorldClient {
None => empty_state_update(),
}))))
}

/// Subscribe to token balances.
pub async fn subscribe_token_balances(
&mut self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<TokenBalanceStreaming, Error> {
let request = RetrieveTokenBalancesRequest {
contract_addresses: contract_addresses
.into_iter()
.map(|c| c.to_bytes_be().to_vec())
.collect(),
account_addresses: account_addresses
.into_iter()
.map(|a| a.to_bytes_be().to_vec())
.collect(),
};
let stream = self
.inner
.subscribe_token_balances(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())?;
Ok(TokenBalanceStreaming(stream.map_ok(Box::new(|res| {
(res.subscription_id, res.balance.unwrap().try_into().expect("must able to serialize"))
}))))
}

/// Update a token balances subscription.
pub async fn update_token_balances_subscription(
&mut self,
subscription_id: u64,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<(), Error> {
let request = UpdateTokenBalancesSubscriptionRequest {
subscription_id,
contract_addresses: contract_addresses
.into_iter()
.map(|c| c.to_bytes_be().to_vec())
.collect(),
account_addresses: account_addresses
.into_iter()
.map(|a| a.to_bytes_be().to_vec())
.collect(),
};
self.inner
.update_token_balances_subscription(request)
.await
.map_err(Error::Grpc)
.map(|res| res.into_inner())
}
}

type TokenBalanceMappedStream = MapOk<
tonic::Streaming<SubscribeTokenBalancesResponse>,
Box<dyn Fn(SubscribeTokenBalancesResponse) -> (SubscriptionId, TokenBalance) + Send>,
>;

#[derive(Debug)]
pub struct TokenBalanceStreaming(TokenBalanceMappedStream);

impl Stream for TokenBalanceStreaming {
type Item = <TokenBalanceMappedStream as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}

type ModelDiffMappedStream = MapOk<
Expand Down
69 changes: 68 additions & 1 deletion crates/torii/grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use subscriptions::event::EventManager;
use subscriptions::indexer::IndexerManager;
use subscriptions::token_balance::TokenBalanceManager;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{channel, Receiver};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
Expand All @@ -59,7 +60,8 @@ use crate::proto::world::{
RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse,
SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest,
SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse,
UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
SubscribeTokenBalancesResponse, UpdateEventMessagesSubscriptionRequest,
UpdateTokenBalancesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse,
};
use crate::proto::{self};
use crate::types::schema::SchemaError;
Expand Down Expand Up @@ -123,6 +125,7 @@ pub struct DojoWorld {
event_manager: Arc<EventManager>,
state_diff_manager: Arc<StateDiffManager>,
indexer_manager: Arc<IndexerManager>,
token_balance_manager: Arc<TokenBalanceManager>,
}

impl DojoWorld {
Expand All @@ -138,6 +141,7 @@ impl DojoWorld {
let event_manager = Arc::new(EventManager::default());
let state_diff_manager = Arc::new(StateDiffManager::default());
let indexer_manager = Arc::new(IndexerManager::default());
let token_balance_manager = Arc::new(TokenBalanceManager::default());

tokio::task::spawn(subscriptions::model_diff::Service::new_with_block_rcv(
block_rx,
Expand All @@ -156,6 +160,10 @@ impl DojoWorld {

tokio::task::spawn(subscriptions::indexer::Service::new(Arc::clone(&indexer_manager)));

tokio::task::spawn(subscriptions::token_balance::Service::new(Arc::clone(
&token_balance_manager,
)));

Self {
pool,
world_address,
Expand All @@ -165,6 +173,7 @@ impl DojoWorld {
event_manager,
state_diff_manager,
indexer_manager,
token_balance_manager,
}
}
}
Expand Down Expand Up @@ -818,6 +827,15 @@ impl DojoWorld {
Ok(RetrieveTokenBalancesResponse { balances })
}

async fn subscribe_token_balances(
&self,
contract_addresses: Vec<Felt>,
account_addresses: Vec<Felt>,
) -> Result<Receiver<Result<proto::world::SubscribeTokenBalancesResponse, tonic::Status>>, Error>
{
self.token_balance_manager.add_subscriber(contract_addresses, account_addresses).await
}

async fn subscribe_indexer(
&self,
contract_address: Felt,
Expand Down Expand Up @@ -1229,6 +1247,8 @@ type SubscribeIndexerResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeIndexerResponse, Status>> + Send>>;
type RetrieveEntitiesStreamingResponseStream =
Pin<Box<dyn Stream<Item = Result<RetrieveEntitiesStreamingResponse, Status>> + Send>>;
type SubscribeTokenBalancesResponseStream =
Pin<Box<dyn Stream<Item = Result<SubscribeTokenBalancesResponse, Status>> + Send>>;

#[tonic::async_trait]
impl proto::world::world_server::World for DojoWorld {
Expand All @@ -1238,6 +1258,7 @@ impl proto::world::world_server::World for DojoWorld {
type SubscribeEventsStream = SubscribeEventsResponseStream;
type SubscribeIndexerStream = SubscribeIndexerResponseStream;
type RetrieveEntitiesStreamingStream = RetrieveEntitiesStreamingResponseStream;
type SubscribeTokenBalancesStream = SubscribeTokenBalancesResponseStream;

async fn world_metadata(
&self,
Expand Down Expand Up @@ -1340,6 +1361,52 @@ impl proto::world::world_server::World for DojoWorld {
Ok(Response::new(()))
}

async fn subscribe_token_balances(
&self,
request: Request<RetrieveTokenBalancesRequest>,
) -> ServiceResult<Self::SubscribeTokenBalancesStream> {
let RetrieveTokenBalancesRequest { contract_addresses, account_addresses } =
request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

let rx = self
.subscribe_token_balances(contract_addresses, account_addresses)
.await
.map_err(|e| Status::internal(e.to_string()))?;
Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeTokenBalancesStream))
}

async fn update_token_balances_subscription(
&self,
request: Request<UpdateTokenBalancesSubscriptionRequest>,
) -> ServiceResult<()> {
let UpdateTokenBalancesSubscriptionRequest {
subscription_id,
contract_addresses,
account_addresses,
} = request.into_inner();
let contract_addresses = contract_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();
let account_addresses = account_addresses
.iter()
.map(|address| Felt::from_bytes_be_slice(address))
.collect::<Vec<_>>();

self.token_balance_manager
.update_subscriber(subscription_id, contract_addresses, account_addresses)
.await;
Ok(Response::new(()))
}

async fn retrieve_entities(
&self,
request: Request<RetrieveEntitiesRequest>,
Expand Down
1 change: 1 addition & 0 deletions crates/torii/grpc/src/server/subscriptions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod event;
pub mod event_message;
pub mod indexer;
pub mod model_diff;
pub mod token_balance;

pub(crate) fn match_entity_keys(
id: Felt,
Expand Down
Loading

0 comments on commit 7b37e22

Please sign in to comment.