Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cypher] include cumulative tx amount in Lifetime relation #14

Merged
merged 18 commits into from
Jan 28, 2025
2 changes: 1 addition & 1 deletion src/analytics/enrich_rms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,5 +245,5 @@ fn test_rms_pipeline() {
assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0));

process_shill(&mut swaps);
dbg!(&swaps);
// dbg!(&swaps);
}
176 changes: 163 additions & 13 deletions src/cypher_templates.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! organic free trade template literals for cypher queries
use anyhow::{Context, Result};

// TODO move this to a .CQL file so we can lint and debug
// batch tx submission query
// takes a Cypher Map object `list_str`
// and unwinds it into multiple merge operations
pub fn write_batch_tx_string(list_str: &str) -> String {
format!(
r#"
Expand All @@ -13,28 +15,176 @@ MERGE (from:Account {{address: tx.sender}})
MERGE (to:Account {{address: tx.recipient}})
MERGE (from)-[rel:Tx {{tx_hash: tx.tx_hash}}]->(to)

ON CREATE SET rel.created_at = timestamp(), rel.modified_at = null
ON MATCH SET rel.modified_at = timestamp()
ON CREATE SET rel.cypher_created_at = timestamp(), rel.cypher_modified_at = null
ON MATCH SET rel.cypher_modified_at = timestamp()
SET
rel.block_datetime = tx.block_datetime,
rel.block_timestamp = tx.block_timestamp,
rel.relation = tx.relation,
rel.function = tx.function
rel.function = tx.function,
rel.framework_version = tx.framework_version

// Conditionally add `tx.args` if it exists
FOREACH (_ IN CASE WHEN tx.args IS NOT NULL THEN [1] ELSE [] END |
SET rel += tx.args
)

WITH rel
// Conditionally increment the lifetime coins sent
FOREACH (_ IN CASE WHEN tx.amount > 0 THEN [1] ELSE [] END |
MERGE (from)-[relTotal:LifeTime]->(to)
SET relTotal.coins = COALESCE(relTotal.coins, 0) + tx.coins
)

RETURN
COUNT(CASE WHEN rel.created_at = timestamp() THEN 1 END) AS created_tx,
COUNT(CASE WHEN rel.modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
COUNT(CASE WHEN rel.cypher_created_at = timestamp() THEN 1 END) AS created_tx,
COUNT(CASE WHEN rel.cypher_modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
"#
)
}

// // TODO move this to a .CQL file so we can lint and debug
// pub fn write_batch_tx_string(list_str: &str) -> String {
// format!(
// r#"
// WITH {list_str} AS tx_data
// UNWIND tx_data AS tx

// // Ensure accounts exist
// MERGE (from:Account {{address: tx.sender}})
// MERGE (to:Account {{address: tx.recipient}})

// // Dynamically set the relationship label using a subquery
// WITH from, to, tx
// CALL {{
// // Conditionally create the appropriate relationship
// FOREACH (_ IN CASE WHEN tx.relation = "Transfer" THEN [1] ELSE [] END |
// MERGE (from)-[rel:Transfer {tx_hash: tx.tx_hash}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function
// )
// FOREACH (_ IN CASE WHEN tx.relation = "Onboarding" THEN [1] ELSE [] END |
// MERGE (from)-[rel:Onboarding {tx_hash: tx.tx_hash}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function
// )
// FOREACH (_ IN CASE WHEN tx.relation = "Vouch" THEN [1] ELSE [] END |
// MERGE (from)-[rel:Vouch {tx_hash: tx.tx_hash}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function
// )
// FOREACH (_ IN CASE WHEN tx.relation IS NULL OR NOT tx.relation IN ["Transfer", "Onboarding", "Vouch"] THEN [1] ELSE [] END |
// MERGE (from)-[rel:Misc {tx_hash: tx.tx_hash}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function
// CASE
// WHEN tx.args IS NOT NULL THEN
// SET rel += tx.args
// END

// )
// }}

// // // Conditionally add `tx.args` if it exists
// // FOREACH (_ IN CASE WHEN tx.args IS NOT NULL THEN [1] ELSE [] END |
// // SET rel += tx.args
// // )

// // Increment the cumulative Lifetime edge if `tx.amount > 0`
// FOREACH (_ IN CASE WHEN tx.amount > 0 THEN [1] ELSE [] END |
// MERGE (from)-[rl:Lifetime]->(to)
// SET rl.coins_tx = COALESCE(rl.amount, 0) + tx.amount
// )

// // Final return with counts
// RETURN
// COUNT(CASE WHEN rel.cypher_created_at = timestamp() THEN 1 END) AS created_tx,
// COUNT(CASE WHEN rel.cypher_modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
// "#
// )
// }

// // TODO move this to a .CQL file so we can lint and debug
// pub fn write_batch_tx_string(list_str: &str) -> String {
// format!(
// r#"
// WITH {list_str} AS tx_data
// UNWIND tx_data AS tx

// // Ensure accounts exist
// MERGE (from:Account {{address: tx.sender}})
// MERGE (to:Account {{address: tx.recipient}})

// // Dynamically set the relationship label using a subquery
// WITH from, to, tx
// CALL {{
// WITH tx
// RETURN CASE
// WHEN tx.relation = "Tx" THEN "Tx"
// WHEN tx.relation = "Onboarding" THEN "Vouch"
// WHEN tx.relation = "Vouch" THEN "Vouch"
// ELSE "Unknown" // Default for unexpected or missing values
// END AS dynamicLabel
// }}
// WITH from, to, tx, dynamicLabel
// // Use dynamicLabel to create the relationship
// MERGE (from)-[rel:`${{dynamicLabel}}` {{tx_hash: tx.tx_hash}}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function

// // Conditionally add `tx.args` if it exists
// FOREACH (_ IN CASE WHEN tx.args IS NOT NULL THEN [1] ELSE [] END |
// SET rel += tx.args
// )

// // Increment the cumulative Lifetime edge if `tx.amount > 0`
// FOREACH (_ IN CASE WHEN tx.amount > 0 THEN [1] ELSE [] END |
// MERGE (from)-[rl:Lifetime]->(to)
// SET rl.coins_tx = COALESCE(rl.amount, 0) + tx.amount
// )

// // Final return with counts
// RETURN
// COUNT(CASE WHEN rel.cypher_created_at = timestamp() THEN 1 END) AS created_tx,
// COUNT(CASE WHEN rel.cypher_modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
// "#
// )
// }

pub fn write_batch_user_create(list_str: &str) -> String {
format!(
r#"
Expand All @@ -49,16 +199,16 @@ UNWIND unique_array AS addr
// Merge unique Accounts
MERGE (node:Account {{address: addr}})
ON CREATE SET
node.created_at = timestamp(),
node.modified_at = null
node.cypher_created_at = timestamp(),
node.cypher_modified_at = null
ON MATCH SET
node.modified_at = timestamp()
node.cypher_modified_at = timestamp()

RETURN
COUNT(node) AS unique_accounts,
COUNT(CASE WHEN node.created_at = timestamp() THEN 1 END) AS created_accounts,
COUNT(CASE WHEN node.modified_at = timestamp() AND node.created_at < timestamp() THEN 1 END) AS modified_accounts,
COUNT(CASE WHEN node.modified_at < timestamp() THEN 1 END) AS unchanged_accounts
COUNT(CASE WHEN node.cypher_created_at = timestamp() THEN 1 END) AS created_accounts,
COUNT(CASE WHEN node.cypher_modified_at = timestamp() AND node.cypher_created_at < timestamp() THEN 1 END) AS modified_accounts,
COUNT(CASE WHEN node.cypher_modified_at < timestamp() THEN 1 END) AS unchanged_accounts
"#
)
}
Expand Down
92 changes: 25 additions & 67 deletions src/decode_entry_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@ use libra_backwards_compatibility::sdk::{
v6_libra_framework_sdk_builder::EntryFunctionCall as V6EntryFunctionCall,
v7_libra_framework_sdk_builder::EntryFunctionCall as V7EntryFunctionCall,
};
// use libra_cached_packages::libra_stdlib::EntryFunctionCall as CurrentVersionEntryFunctionCall;

/// test all entry function decoders for the current bytes
/// EntryFunction decoding for V6, V7 eras
pub fn decode_entry_function_all_versions(
user_tx: &SignedTransaction,
events: &[WarehouseEvent],
) -> anyhow::Result<(EntryFunctionArgs, RelationLabel)> {
// TODO: current version encoding

// if let Some((args, relation)) = maybe_get_current_version_relation(user_tx, events) {
// return Ok((args, relation));
// }

if let Some((ef, rel)) = maybe_get_v7_relation(user_tx, events) {
return Ok((ef, rel));
}
Expand All @@ -29,75 +22,37 @@ pub fn decode_entry_function_all_versions(
bail!("no entry function found")
}

// TODO: the CurrentVersionEntryFunctionCall needs serde derives
// Using HEAD libra-framework code base try to decode transaction
// fn maybe_get_current_version_relation(
// user_tx: &SignedTransaction,
// events: &[WarehouseEvent],
// ) -> Option<(EntryFunctionArgs, RelationLabel)> {
// let ef = CurrentVersionEntryFunctionCall::decode(user_tx.payload());

// let relation = match ef {
// Some(EntryFunctionCall::OlAccountTransfer { to, amount: _ }) => {
// if is_onboarding_event(events) {
// RelationLabel::Onboarding(to)
// } else {
// RelationLabel::Transfer(to)
// }
// }
// Some(EntryFunctionCall::OlAccountCreateAccount { auth_key }) => {
// RelationLabel::Onboarding(auth_key)
// }
// Some(EntryFunctionCall::VouchVouchFor { friend_account }) => {
// RelationLabel::Vouch(friend_account)
// }
// Some(EntryFunctionCall::VouchInsistVouchFor { friend_account }) => {
// RelationLabel::Vouch(friend_account)
// }
// Some(EntryFunctionCall::CoinTransfer { to, .. }) => RelationLabel::Transfer(to),
// Some(EntryFunctionCall::AccountRotateAuthenticationKeyWithRotationCapability {
// rotation_cap_offerer_address,
// ..
// }) => RelationLabel::Transfer(rotation_cap_offerer_address),

// // TODO: get other entry functions with known counter parties
// // if nothing is found try to decipher from events
// _ => return None,
// };

// let args = EntryFunctionArgs::Current(ef.unwrap());

// Some((args, relation))
// }

fn maybe_get_v7_relation(
user_tx: &SignedTransaction,
events: &[WarehouseEvent],
) -> Option<(EntryFunctionArgs, RelationLabel)> {
let ef = V7EntryFunctionCall::decode(user_tx.payload());

let relation = match ef {
Some(V7EntryFunctionCall::OlAccountTransfer { to, amount: _ }) => {
Some(V7EntryFunctionCall::OlAccountTransfer { to, amount }) => {
if is_onboarding_event(events) {
RelationLabel::Onboarding(to)
RelationLabel::Onboarding(to, amount)
} else {
RelationLabel::Transfer(to)
RelationLabel::Transfer(to, amount)
}
}
Some(V7EntryFunctionCall::OlAccountCreateAccount { auth_key }) => {
RelationLabel::Onboarding(auth_key)
RelationLabel::Onboarding(auth_key, 0)
}
Some(V7EntryFunctionCall::VouchVouchFor { friend_account }) => {
RelationLabel::Vouch(friend_account)
}
Some(V7EntryFunctionCall::VouchInsistVouchFor { friend_account }) => {
RelationLabel::Vouch(friend_account)
}
Some(V7EntryFunctionCall::CoinTransfer { to, .. }) => RelationLabel::Transfer(to),
Some(V7EntryFunctionCall::AccountRotateAuthenticationKeyWithRotationCapability {
rotation_cap_offerer_address,
..
}) => RelationLabel::Transfer(rotation_cap_offerer_address),
Some(V7EntryFunctionCall::CoinTransfer { to, amount, .. }) => {
// RelationLabel::Transfer(to, amount)
if is_onboarding_event(events) {
RelationLabel::Onboarding(to, amount)
} else {
RelationLabel::Transfer(to, amount)
}
}

// TODO: get other entry functions with known counter parties
// if nothing is found try to decipher from events
Expand All @@ -115,27 +70,30 @@ fn maybe_get_v6_relation(
) -> Option<(EntryFunctionArgs, RelationLabel)> {
let ef = V6EntryFunctionCall::decode(user_tx.payload());
let relation = match ef {
Some(V6EntryFunctionCall::OlAccountTransfer { to, amount: _ }) => {
Some(V6EntryFunctionCall::OlAccountTransfer { to, amount }) => {
if is_onboarding_event(events) {
RelationLabel::Onboarding(to)
RelationLabel::Onboarding(to, amount)
} else {
RelationLabel::Transfer(to)
RelationLabel::Transfer(to, amount)
}
}
Some(V6EntryFunctionCall::OlAccountCreateAccount { auth_key }) => {
RelationLabel::Onboarding(auth_key)
RelationLabel::Onboarding(auth_key, 0)
}
Some(V6EntryFunctionCall::VouchVouchFor { wanna_be_my_friend }) => {
RelationLabel::Vouch(wanna_be_my_friend)
}
Some(V6EntryFunctionCall::VouchInsistVouchFor { wanna_be_my_friend }) => {
RelationLabel::Vouch(wanna_be_my_friend)
}
Some(V6EntryFunctionCall::CoinTransfer { to, .. }) => RelationLabel::Transfer(to),
Some(V6EntryFunctionCall::AccountRotateAuthenticationKeyWithRotationCapability {
rotation_cap_offerer_address,
..
}) => RelationLabel::Transfer(rotation_cap_offerer_address),
Some(V6EntryFunctionCall::CoinTransfer { to, amount, .. }) => {
// RelationLabel::Transfer(to, amount)
if is_onboarding_event(events) {
RelationLabel::Onboarding(to, amount)
} else {
RelationLabel::Transfer(to, amount)
}
}

// TODO: get other entry functions with known counter parties
// if nothing is found try to decipher from events
Expand Down
Loading
Loading