Skip to content

Commit

Permalink
refactor: checks on entity and make txn's atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
thevaibhav-dixit committed Nov 6, 2023
1 parent da7c5db commit 7688148
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 106 deletions.
52 changes: 32 additions & 20 deletions quotes-server/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod config;
use chrono::{DateTime, Duration, Utc};
use futures::stream::StreamExt;
use rust_decimal::Decimal;
use sqlx::{Postgres, Transaction};
use tracing::{info_span, Instrument};

use shared::{
Expand Down Expand Up @@ -98,9 +99,12 @@ impl QuotesApp {
.expires_at(expiry_time)
.build()
.expect("Could not build quote");
let quote = self.quotes.create(new_quote).await?;
let mut tx = self.pool.begin().await?;
let mut quote = self.quotes.create(&mut tx, new_quote).await?;
if immediate_execution {
self.accept_quote(quote.id).await?;
self.accept_quote_in_tx(tx, &mut quote).await?;
} else {
tx.commit().await?;
}

Ok(quote)
Expand All @@ -125,9 +129,12 @@ impl QuotesApp {
.expires_at(expiry_time)
.build()
.expect("Could not build quote");
let quote = self.quotes.create(new_quote).await?;
let mut tx = self.pool.begin().await?;
let mut quote = self.quotes.create(&mut tx, new_quote).await?;
if immediate_execution {
self.accept_quote(quote.id).await?;
self.accept_quote_in_tx(tx, &mut quote).await?;
} else {
tx.commit().await?;
}

Ok(quote)
Expand All @@ -152,9 +159,12 @@ impl QuotesApp {
.expires_at(expiry_time)
.build()
.expect("Could not build quote");
let quote = self.quotes.create(new_quote).await?;
let mut tx = self.pool.begin().await?;
let mut quote = self.quotes.create(&mut tx, new_quote).await?;
if immediate_execution {
self.accept_quote(quote.id).await?;
self.accept_quote_in_tx(tx, &mut quote).await?;
} else {
tx.commit().await?;
}

Ok(quote)
Expand All @@ -179,27 +189,30 @@ impl QuotesApp {
.expires_at(expiry_time)
.build()
.expect("Could not build quote");
let quote = self.quotes.create(new_quote).await?;
let mut tx = self.pool.begin().await?;
let mut quote = self.quotes.create(&mut tx, new_quote).await?;
if immediate_execution {
self.accept_quote(quote.id).await?;
self.accept_quote_in_tx(tx, &mut quote).await?;
} else {
tx.commit().await?;
}

Ok(quote)
}

pub async fn accept_quote(&self, id: QuoteId) -> Result<(), QuotesAppError> {
let mut quote = self.quotes.find_by_id(id).await?;

if quote.is_accepted() {
return Err(QuotesAppError::QuoteAlreadyAccepted(id.to_string()));
}

if quote.is_expired() {
return Err(QuotesAppError::QuoteExpired(id.to_string()));
}

quote.accept();
let tx = self.pool.begin().await?;
self.accept_quote_in_tx(tx, &mut quote).await?;
Ok(())
}

async fn accept_quote_in_tx(
&self,
mut tx: Transaction<'_, Postgres>,
quote: &mut Quote,
) -> Result<(), QuotesAppError> {
quote.accept()?;
self.quotes.update(&quote, &mut tx).await?;
if quote.direction == Direction::SellCents {
self.ledger
.sell_usd_quote_accepted(
Expand Down Expand Up @@ -229,7 +242,6 @@ impl QuotesApp {
)
.await?;
}
self.quotes.update(quote).await?;

Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions quotes-server/src/entity/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,9 @@ impl<T: DeserializeOwned + Serialize + 'static> EntityEvents<T> {
query.execute(&mut **tx).await?;
Ok(())
}

#[cfg(test)]
pub fn last(&self, n: usize) -> &[T] {
&self.events[self.events.len() - n..]
}
}
66 changes: 63 additions & 3 deletions quotes-server/src/quote/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use serde::{Deserialize, Serialize};

use crate::{currency::*, entity::*};

shared::entity_id! { QuoteId }
use super::QuoteError;

crate::entity_id!(QuoteId);

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
Expand Down Expand Up @@ -50,11 +52,18 @@ impl Quote {
false
}

pub fn accept(&mut self) {
pub fn accept(&mut self) -> Result<(), QuoteError> {
if self.is_accepted() {
return Err(QuoteError::QuoteAlreadyAccepted);
}
if self.is_expired() {
return Err(QuoteError::QuoteExpiredError);
}
self.events.push(QuoteEvent::Accepted {});
Ok(())
}

pub fn is_expired(&self) -> bool {
fn is_expired(&self) -> bool {
self.expires_at < Utc::now()
}
}
Expand Down Expand Up @@ -144,3 +153,54 @@ pub mod pg {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use chrono::{Duration, Utc};
use rust_decimal::Decimal;

fn init_events(expired: bool) -> EntityEvents<QuoteEvent> {
let expiration_interval = Duration::from_std(std::time::Duration::from_secs(120)) // 2 minutes = 120 seconds
.unwrap();
let expiration_time = if !expired {
Utc::now() + Duration::from_std(expiration_interval.to_std().unwrap()).unwrap()
} else {
Utc::now() - Duration::from_std(expiration_interval.to_std().unwrap()).unwrap()
};
EntityEvents::init([QuoteEvent::Initialized {
id: QuoteId::new(),
direction: Direction::BuyCents,
immediate_execution: false,
sat_amount: Satoshis::from(Decimal::from(100)),
cent_amount: UsdCents::from(Decimal::from(10)),
expires_at: expiration_time,
}])
}

#[test]
fn accept_quote() {
let events = init_events(false);
let mut quote = Quote::try_from(events).unwrap();
assert!(quote.accept().is_ok());
assert!(matches!(quote.events.last(1)[0], QuoteEvent::Accepted {}));
}

#[test]
fn can_only_accept_quote_once() {
let mut events = init_events(false);
events.push(QuoteEvent::Accepted {});
let mut quote = Quote::try_from(events).unwrap();
assert!(matches!(
quote.accept(),
Err(QuoteError::QuoteAlreadyAccepted)
));
}

#[test]
fn cannot_accept_expired_quote() {
let events = init_events(true);
let mut quote = Quote::try_from(events).unwrap();
assert!(matches!(quote.accept(), Err(QuoteError::QuoteExpiredError)));
}
}
4 changes: 4 additions & 0 deletions quotes-server/src/quote/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,8 @@ pub enum QuoteError {
Sqlx(#[from] sqlx::Error),
#[error("QuotesError - EntityError: {0}")]
EntityError(#[from] crate::entity::EntityError),
#[error("QuotesError - Quotes is already accepted")]
QuoteAlreadyAccepted,
#[error("QuotesError - Quote has expired")]
QuoteExpiredError,
}
22 changes: 12 additions & 10 deletions quotes-server/src/quote/repo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use sqlx::{Pool, Postgres};
use sqlx::{Pool, Postgres, Transaction};
use tracing::instrument;

use crate::entity::*;
Expand All @@ -16,14 +16,17 @@ impl Quotes {
}

#[instrument(name = "quotes.create", skip(self))]
pub async fn create(&self, quote: NewQuote) -> Result<Quote, QuoteError> {
let mut tx = self.pool.begin().await?;
pub async fn create(
&self,
mut tx: &mut Transaction<'_, Postgres>,
quote: NewQuote,
) -> Result<Quote, QuoteError> {
sqlx::query!(
r#"INSERT INTO stablesats_quotes (id)
VALUES ($1)"#,
quote.id as QuoteId
)
.execute(&mut *tx)
.execute(&mut **tx)
.await?;
let res = Quote {
id: quote.id,
Expand All @@ -41,8 +44,6 @@ impl Quotes {
quote.initial_events().new_serialized_events(res.id),
)
.await?;

tx.commit().await?;
Ok(res)
}

Expand All @@ -68,19 +69,20 @@ impl Quotes {
Ok(Quote::try_from(entity_events)?)
}

pub async fn update(&self, quote: Quote) -> Result<(), QuoteError> {
pub async fn update(
&self,
quote: &Quote,
mut tx: &mut Transaction<'_, Postgres>,
) -> Result<(), QuoteError> {
if !quote.events.is_dirty() {
return Ok(());
}

let mut tx = self.pool.begin().await?;
EntityEvents::<QuoteEvent>::persist(
"stablesats_quote_events",
&mut tx,
quote.events.new_serialized_events(quote.id),
)
.await?;
tx.commit().await?;
Ok(())
}
}
21 changes: 3 additions & 18 deletions quotes-server/tests/quotes_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,29 +98,14 @@ async fn quotes_app() -> anyhow::Result<()> {
.quote_cents_from_sats_for_buy(dec!(100_000_000), false)
.await;
assert!(quote.is_ok());
let id = quote.unwrap().id;

let accepted = app.accept_quote(id).await;
let accepted = app.accept_quote(quote.unwrap().id).await;
assert!(accepted.is_ok());

let err = app.accept_quote(id).await;
assert!(matches!(err, Err(QuotesAppError::QuoteAlreadyAccepted(_))));

let quote = app
.quote_cents_from_sats_for_buy(dec!(100_000_000), true)
.await;
println!("{:?}", quote);
assert!(quote.is_ok());

let err = app.accept_quote(quote.unwrap().id).await;
assert!(matches!(err, Err(QuotesAppError::QuoteAlreadyAccepted(_))));

let quote = app
.quote_cents_from_sats_for_buy(dec!(100_000_000), false)
.await;
let expiration_duration = std::time::Duration::from_secs(2);
tokio::time::sleep(expiration_duration).await;
let err = app.accept_quote(quote.unwrap().id).await;
assert!(matches!(err, Err(QuotesAppError::QuoteExpired(_))));

assert!(quote.unwrap().is_accepted());
Ok(())
}
55 changes: 0 additions & 55 deletions shared/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,58 +124,3 @@ macro_rules! decimal_wrapper_common {
}
};
}

#[macro_export]
macro_rules! entity_id {
($name:ident) => {
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
sqlx::Type,
serde::Deserialize,
serde::Serialize,
)]
#[serde(transparent)]
#[sqlx(transparent)]
pub struct $name(uuid::Uuid);

#[allow(clippy::new_without_default)]
impl $name {
pub fn new() -> Self {
uuid::Uuid::new_v4().into()
}
}

impl From<uuid::Uuid> for $name {
fn from(uuid: uuid::Uuid) -> Self {
Self(uuid)
}
}

impl From<$name> for uuid::Uuid {
fn from(id: $name) -> Self {
id.0
}
}

impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl std::str::FromStr for $name {
type Err = uuid::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(uuid::Uuid::parse_str(s)?))
}
}
};
}

0 comments on commit 7688148

Please sign in to comment.