Skip to content

Commit

Permalink
Add return type for settle_game
Browse files Browse the repository at this point in the history
  • Loading branch information
DogLooksGood committed Jan 11, 2024
1 parent 2bef889 commit a929e07
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 53 deletions.
2 changes: 1 addition & 1 deletion core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub trait TransportT: Send + Sync {

async fn publish_game(&self, params: PublishGameParams) -> Result<String>;

async fn settle_game(&self, params: SettleParams) -> Result<()>;
async fn settle_game(&self, params: SettleParams) -> Result<String>;

async fn create_registration(&self, params: CreateRegistrationParams) -> Result<String>;

Expand Down
5 changes: 5 additions & 0 deletions core/src/types/transactor_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ pub enum TxState {
},

PlayerConfirmingFailed(u64),

SettleSucceed {
settle_version: u64,
signature: Option<String>,
},
}

#[derive(Debug, Clone, PartialEq, Eq, BorshDeserialize, BorshSerialize)]
Expand Down
4 changes: 2 additions & 2 deletions facade/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ async fn get_player_info(
Ok(Some(player.try_to_vec().unwrap()))
}

async fn settle(params: Params<'_>, context: Arc<Mutex<Context>>) -> RpcResult<()> {
async fn settle(params: Params<'_>, context: Arc<Mutex<Context>>) -> RpcResult<String> {
let SettleParams {
addr,
settles,
Expand Down Expand Up @@ -814,7 +814,7 @@ async fn settle(params: Params<'_>, context: Arc<Mutex<Context>>) -> RpcResult<(

context.players = players;
context.games = games;
Ok(())
Ok(format!("facade_settle_{}", settle_version))
}

async fn run_server(context: Context) -> anyhow::Result<ServerHandle> {
Expand Down
1 change: 0 additions & 1 deletion js/sdk-core/src/events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { field, array, enums, option, variant, struct } from '@race-foundation/borsh';
import { PlayerJoin, ServerJoin } from './accounts';
import { Fields, Id } from './types';
import { GamePlayer } from './effect';

Expand Down
39 changes: 34 additions & 5 deletions js/sdk-core/src/tx-state.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { field, array, variant, struct } from '@race-foundation/borsh';
import { PlayerJoin } from './accounts';
import { field, array, variant, struct, option } from '@race-foundation/borsh';

// type StateField<T> = Omit<Fields<T>, 'kind'>;
export abstract class TxState {}

export type TxStateKind =
| 'PlayerConfirming'
| 'PlayerConfirmingFailed'
| 'SettleSucceed';

export interface ITxStateKind {
kind(): TxStateKind;
}

export class ConfirmingPlayer {
@field('u64')
id!: bigint;
Expand All @@ -20,7 +27,7 @@ export class ConfirmingPlayer {
}

@variant(0)
export class PlayerConfirming extends TxState {
export class PlayerConfirming extends TxState implements ITxStateKind {
@field(array(struct(ConfirmingPlayer)))
confirmPlayers!: ConfirmingPlayer[];
@field('u64')
Expand All @@ -30,15 +37,37 @@ export class PlayerConfirming extends TxState {
super();
Object.assign(this, fields);
}
kind(): TxStateKind {
return 'PlayerConfirming';
}
}

@variant(1)
export class PlayerConfirmingFailed extends TxState {
export class PlayerConfirmingFailed extends TxState implements ITxStateKind {
@field('u64')
accessVersion!: bigint;

constructor(fields: any) {
super();
Object.assign(this, fields);
}
kind(): TxStateKind {
return 'PlayerConfirmingFailed';
}
}

@variant(2)
export class SettleSucceed extends TxState implements ITxStateKind {
@field('u64')
settleVersion!: bigint;
@field(option('string'))
signature: string | undefined;

constructor(fields: any) {
super();
Object.assign(this, fields);
}
kind(): TxStateKind {
return 'SettleSucceed';
}
}
19 changes: 10 additions & 9 deletions transactor/src/component/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,24 +138,25 @@ impl Component<ConsumerPorts, BroadcasterContext> for Broadcaster {
});
}
EventFrame::TxState { tx_state } => match tx_state {
TxState::SettleSucceed { .. } => {
let r = ctx.broadcast_tx.send(BroadcastFrame::TxState { tx_state });

if let Err(e) = r {
debug!("Failed to broadcast event: {:?}", e);
}
}
TxState::PlayerConfirming {
confirm_players,
access_version,
..
} => {
let tx_state = TxState::PlayerConfirming {
confirm_players,
access_version,
};

let r = ctx.broadcast_tx.send(BroadcastFrame::TxState { tx_state });

if let Err(e) = r {
debug!("Failed to broadcast event: {:?}", e);
}
}
TxState::PlayerConfirmingFailed(access_version) => {
TxState::PlayerConfirmingFailed(_) => {
let r = ctx.broadcast_tx.send(BroadcastFrame::TxState {
tx_state: TxState::PlayerConfirmingFailed(access_version),
tx_state,
});

if let Err(e) = r {
Expand Down
6 changes: 6 additions & 0 deletions transactor/src/component/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ impl PipelinePorts {
self.tx.send(frame).await
}

pub fn clone_as_producer(&self) -> ProducerPorts {
ProducerPorts {
tx: self.tx.clone()
}
}

pub async fn send(&self, frame: EventFrame) {
match self.tx.send(frame).await {
Ok(_) => (),
Expand Down
48 changes: 33 additions & 15 deletions transactor/src/component/submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ use std::time::Duration;

use async_trait::async_trait;
use race_api::error::Error;
use race_core::types::{GameAccount, SettleParams};
use race_core::types::{GameAccount, SettleParams, TxState};
use tokio::select;
use tokio::sync::mpsc;
use tracing::error;

use crate::component::common::{Component, ConsumerPorts};
use crate::component::common::Component;
use crate::component::event_bus::CloseReason;
use crate::frame::EventFrame;
use race_core::transport::TransportT;

use super::common::PipelinePorts;

/// Squash two settles into one.
fn squash_settles(mut prev: SettleParams, next: SettleParams) -> SettleParams {
let SettleParams {
Expand Down Expand Up @@ -91,23 +93,34 @@ impl Submitter {
}

#[async_trait]
impl Component<ConsumerPorts, SubmitterContext> for Submitter {
impl Component<PipelinePorts, SubmitterContext> for Submitter {
fn name(&self) -> &str {
"Submitter"
}

async fn run(mut ports: ConsumerPorts, ctx: SubmitterContext) -> CloseReason {
async fn run(mut ports: PipelinePorts, ctx: SubmitterContext) -> CloseReason {
let (queue_tx, mut queue_rx) = mpsc::channel::<SettleParams>(32);

let p = ports.clone_as_producer();
// Start a task to handle settlements
// Prevent the blocking from pending transactions
let join_handle = tokio::spawn(async move {
loop {
let ps = read_settle_params(&mut queue_rx).await;
if let Some(params) = ps.into_iter().reduce(squash_settles) {
let settle_version = params.settle_version;
let res = ctx.transport.settle_game(params).await;
match res {
Ok(_) => (),
Ok(signature) => {
let tx_state = TxState::SettleSucceed {
signature: if signature.is_empty() {
None
} else {
Some(signature)
},
settle_version,
};
p.send(EventFrame::TxState { tx_state }).await;
}
Err(e) => {
return CloseReason::Fault(e);
}
Expand All @@ -127,16 +140,21 @@ impl Component<ConsumerPorts, SubmitterContext> for Submitter {
checkpoint,
settle_version,
} => {
let res = queue_tx.send(SettleParams {
addr: ctx.addr.clone(),
settles,
transfers,
checkpoint,
settle_version,
next_settle_version: settle_version + 1,
}).await;
let res = queue_tx
.send(SettleParams {
addr: ctx.addr.clone(),
settles,
transfers,
checkpoint,
settle_version,
next_settle_version: settle_version + 1,
})
.await;
if let Err(e) = res {
error!("Submitter failed to send settle to task queue: {}", e.to_string());
error!(
"Submitter failed to send settle to task queue: {}",
e.to_string()
);
}
}
EventFrame::Shutdown => {
Expand Down
23 changes: 12 additions & 11 deletions transactor/src/component/wrapped_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl TransportT for WrappedTransport {

/// `settle_version` is used to identify the settle state,
/// Until the `settle_version` is bumped, we keep retrying.
async fn settle_game(&self, params: SettleParams) -> Result<()> {
async fn settle_game(&self, params: SettleParams) -> Result<String> {
let mut curr_settle_version: Option<u64> = None;
loop {
let game_account = self
Expand All @@ -113,18 +113,19 @@ impl TransportT for WrappedTransport {
// The `settle_version` had been bumped, indicates the transaction was succeed
// NOTE: The transaction can success with error result due to unstable network
if curr_settle_version.is_some_and(|v| v < game_account.settle_version) {
return Ok(());
return Ok("".into());
}
curr_settle_version = Some(game_account.settle_version);
if let Err(e) = self.inner.settle_game(params.clone()).await {
error!(
"Error in settlement: {:?}, will retry in {} secs",
e, RETRY_INTERVAL
);
tokio::time::sleep(Duration::from_secs(RETRY_INTERVAL)).await;
continue;
} else {
return Ok(());
match self.inner.settle_game(params.clone()).await {
Ok(sig) => return Ok(sig),
Err(e) => {
error!(
"Error in settlement: {:?}, will retry in {} secs",
e, RETRY_INTERVAL
);
tokio::time::sleep(Duration::from_secs(RETRY_INTERVAL)).await;
continue;
}
}
} else {
error!(
Expand Down
4 changes: 0 additions & 4 deletions transactor/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ pub enum EventFrame {
checkpoint: Vec<u8>,
settle_version: u64,
},
SettleFinalized {
settle_version: u64,
},
ContextUpdated {
context: Box<GameContext>,
},
Expand Down Expand Up @@ -122,7 +119,6 @@ impl std::fmt::Display for EventFrame {
EventFrame::Checkpoint { .. } => write!(f, "Checkpoint"),
EventFrame::Broadcast { event, .. } => write!(f, "Broadcast: {}", event),
EventFrame::Settle { .. } => write!(f, "Settle"),
EventFrame::SettleFinalized { .. } => write!(f, "SettleFinalized"),
EventFrame::SendMessage { message } => write!(f, "SendMessage: {}", message.sender),
EventFrame::ContextUpdated { context: _ } => write!(f, "ContextUpdated"),
EventFrame::Shutdown => write!(f, "Shutdown"),
Expand Down
2 changes: 1 addition & 1 deletion transport/src/facade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl TransportT for FacadeTransport {
unimplemented!()
}

async fn settle_game(&self, params: SettleParams) -> Result<()> {
async fn settle_game(&self, params: SettleParams) -> Result<String> {
self.client
.request("settle", rpc_params![params])
.await
Expand Down
7 changes: 3 additions & 4 deletions transport/src/solana.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ impl TransportT for SolanaTransport {
Ok(mint_pubkey.to_string())
}

async fn settle_game(&self, params: SettleParams) -> Result<()> {
async fn settle_game(&self, params: SettleParams) -> Result<String> {
let SettleParams {
addr,
mut settles,
Expand Down Expand Up @@ -641,9 +641,8 @@ impl TransportT for SolanaTransport {
let mut tx = Transaction::new_unsigned(message);
let blockhash = self.get_blockhash()?;
tx.sign(&[payer], blockhash);
self.send_transaction(tx)?;

Ok(())
let sig = self.send_transaction(tx)?;
Ok(sig.to_string())
}

async fn create_registration(&self, params: CreateRegistrationParams) -> Result<String> {
Expand Down

0 comments on commit a929e07

Please sign in to comment.