Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sana): use transaction to update token.current_owner #413

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions crates/ark-starknet/src/client/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use starknet::{
providers::{jsonrpc::HttpTransport, AnyProvider, JsonRpcClient, Provider, ProviderError},
};
use std::collections::HashMap;
use tracing::info;
use url::Url;

const INPUT_TOO_SHORT: &str = "0x496e70757420746f6f2073686f727420666f7220617267756d656e7473";
Expand Down Expand Up @@ -326,8 +325,6 @@ impl StarknetClient for StarknetClientHttp {
}
}

info!("Pending block events: {:?}", events);

Ok(events)
}

Expand Down
105 changes: 43 additions & 62 deletions crates/sana/src/storage/sqlx/default_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use super::types::*;
use crate::storage::types::*;
use crate::Storage;
use async_trait::async_trait;
use sqlx::{postgres::PgPoolOptions, Error as SqlxError, FromRow, PgPool, Row};
use sqlx::{postgres::PgPoolOptions, Error as SqlxError, FromRow, PgPool};
use std::str::FromStr;
use tracing::{error, trace, info};
use tracing::{error, info, trace};

impl From<SqlxError> for StorageError {
fn from(e: SqlxError) -> Self {
Expand All @@ -26,7 +26,7 @@ impl PostgresStorage {

Ok(Self {
pool: PgPoolOptions::new()
.max_connections(1)
.max_connections(10)
.connect(db_url)
.await?,
})
Expand Down Expand Up @@ -295,77 +295,33 @@ impl Storage for PostgresStorage {
event: &TokenTransferEvent,
) -> Result<(), StorageError> {
info!("Registering transfer event {:?}", event.token_event_id);
let existing_transfer_event =
(self.get_event_by_id(&event.token_event_id).await?).is_some();
let mut transaction = self.pool.begin().await?;
let existing_transfer_event = self.get_event_by_id(&event.token_event_id).await?.is_some();

if existing_transfer_event {
let q = "UPDATE token_event SET block_timestamp = $1 WHERE token_event_id = $2";
sqlx::query(q)
.bind(event.block_timestamp as i64)
.bind(event.token_event_id.clone())
.execute(&self.pool)
.execute(&mut *transaction)
.await?;

Ok(())
info!(
"Updated existing transfer event: {:?}",
event.token_event_id
);
} else {
let last_transfer_query = r#"SELECT block_timestamp
FROM token_event
WHERE contract_address = $1 AND chain_id = $2
AND token_id = $3
AND event_type IN ('Transfer', 'Burn', 'Mint')
ORDER BY block_timestamp DESC LIMIT 1"#;

match sqlx::query(last_transfer_query)
.bind(event.contract_address.clone())
.bind(event.chain_id.clone())
.bind(event.token_id.clone())
.fetch_optional(&self.pool)
.await
{
Ok(row) => {
if let Some(r) = row {
let last_transfer_timestamp: i64 = r.get(0);
let last_transfer_timestamp_u64 = last_transfer_timestamp as u64;

info!("Last transfer timestamp: {}", last_transfer_timestamp_u64);
info!("Current transfer timestamp: {}", event.block_timestamp);

if event.block_timestamp > last_transfer_timestamp_u64 {
info!(
"Update token owner: {} -> {}",
event.from_address, event.to_address
);

let update_q = "UPDATE token SET current_owner = $1 WHERE contract_address = $2 AND chain_id = $3 AND token_id = $4";
let _r = sqlx::query(update_q)
.bind(event.to_address.clone())
.bind(event.contract_address.clone())
.bind(event.chain_id.clone())
.bind(event.token_id.clone())
.execute(&self.pool)
.await?;
}
}
}
Err(e) => {
error!("Database error: {:?}", e);
}
}

let insert_query = "INSERT INTO token_event (token_event_id, contract_address, chain_id, token_id, token_id_hex, event_type, block_timestamp, transaction_hash, to_address, from_address)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (token_event_id) DO NOTHING";

let event_type = match &event.event_type {
Some(e) => {
let res = self.to_title_case(&e.to_string().to_lowercase());
Some(res)
}
_ => None,
};
let event_type = event.event_type.as_ref().map(|e| {
let res = self.to_title_case(&e.to_string().to_lowercase());
res
});

info!("inserting transfer event... {:?}", event_type);
info!("Inserting transfer event... {:?}", event_type);

let _r = sqlx::query(insert_query)
sqlx::query(insert_query)
.bind(event.token_event_id.clone())
.bind(event.contract_address.clone())
.bind(event.chain_id.clone())
Expand All @@ -376,11 +332,36 @@ impl Storage for PostgresStorage {
.bind(event.transaction_hash.clone())
.bind(event.to_address.clone())
.bind(event.from_address.clone())
.execute(&self.pool)
.execute(&mut *transaction)
.await?;

info!("Inserted transfer event: {:?}", event.token_event_id);

let update_query = "UPDATE token t
SET current_owner = (
SELECT te.to_address
FROM token_event te
WHERE te.contract_address = t.contract_address
AND te.token_id = t.token_id
AND te.chain_id = t.chain_id
AND te.event_type IN ('Transfer', 'Burn', 'Mint')
ORDER BY te.block_timestamp DESC
LIMIT 1
)
WHERE t.contract_address = $1 AND t.token_id = $2 AND t.chain_id = $3 RETURNING t.current_owner";

let current_owner: Option<String> = sqlx::query_scalar(update_query)
.bind(event.contract_address.clone())
.bind(event.token_id.clone())
.bind(event.chain_id.clone())
.fetch_one(&mut *transaction)
.await?;

Ok(())
info!("Current owner updated to: {:?}", current_owner);
}

transaction.commit().await?;
Ok(())
}

async fn get_contract_type(
Expand Down
Loading