Skip to content

Commit

Permalink
[cypher] include cumulative tx amount in Lifetime relation (#14)
Browse files Browse the repository at this point in the history
* cleanup

* rust relation label for transfer to have amount

* rename cypher update timestamps

* wip dynamic labels

* use dynamic labels for tx types

* indexes

* revert batch implementation, cast amounts

* revert test

* add coin amount to onboarding

* use updated encoding 5.2.0 file

* patch v5 parsing issue

* patch merge

* clean

* fix v5 timestamp parsing

* add framework version to txs

---------

Co-authored-by: Peregrine Fitz Bittern <[email protected]>
Co-authored-by: Caty Fitz Coney <[email protected]>
Co-authored-by: Mariella Bittern <[email protected]>
Co-authored-by: Nora Fitz Coney <[email protected]>
Co-authored-by: Paoletta De Heron <[email protected]>
Co-authored-by: Raffa Hind <[email protected]>
Co-authored-by: Sere McPresto <[email protected]>
Co-authored-by: Gianna LeHeron <[email protected]>
Co-authored-by: Sere De Beaver <[email protected]>
Co-authored-by: Sofi Lento <[email protected]>
Co-authored-by: Paoletta Saint Diminuendo <[email protected]>
Co-authored-by: Rosina Saint Pianissimo <[email protected]>
Co-authored-by: Gianna Fitz Wren <[email protected]>
Co-authored-by: Vale Diminuendo <[email protected]>
Co-authored-by: Lucietta Fitz Adagio <[email protected]>
Co-authored-by: Mariella De Pianissimo <[email protected]>
Co-authored-by: Reginald Saint Brock <[email protected]>
Co-authored-by: Franci MacLento <[email protected]>
  • Loading branch information
19 people authored Jan 28, 2025
1 parent 4c26f09 commit 2be6a01
Show file tree
Hide file tree
Showing 16 changed files with 364 additions and 29,876 deletions.
2 changes: 1 addition & 1 deletion src/analytics/enrich_rms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,5 +245,5 @@ fn test_rms_pipeline() {
assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0));

process_shill(&mut swaps);
dbg!(&swaps);
// dbg!(&swaps);
}
176 changes: 163 additions & 13 deletions src/cypher_templates.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
//! organic free trade template literals for cypher queries
use anyhow::{Context, Result};

// TODO move this to a .CQL file so we can lint and debug
// batch tx submission query
// takes a Cypher Map object `list_str`
// and unwinds it into multiple merge operations
pub fn write_batch_tx_string(list_str: &str) -> String {
format!(
r#"
Expand All @@ -13,28 +15,176 @@ MERGE (from:Account {{address: tx.sender}})
MERGE (to:Account {{address: tx.recipient}})
MERGE (from)-[rel:Tx {{tx_hash: tx.tx_hash}}]->(to)
ON CREATE SET rel.created_at = timestamp(), rel.modified_at = null
ON MATCH SET rel.modified_at = timestamp()
ON CREATE SET rel.cypher_created_at = timestamp(), rel.cypher_modified_at = null
ON MATCH SET rel.cypher_modified_at = timestamp()
SET
rel.block_datetime = tx.block_datetime,
rel.block_timestamp = tx.block_timestamp,
rel.relation = tx.relation,
rel.function = tx.function
rel.function = tx.function,
rel.framework_version = tx.framework_version
// Conditionally add `tx.args` if it exists
FOREACH (_ IN CASE WHEN tx.args IS NOT NULL THEN [1] ELSE [] END |
SET rel += tx.args
)
WITH rel
// Conditionally increment the lifetime coins sent
FOREACH (_ IN CASE WHEN tx.amount > 0 THEN [1] ELSE [] END |
MERGE (from)-[relTotal:LifeTime]->(to)
SET relTotal.coins = COALESCE(relTotal.coins, 0) + tx.coins
)
RETURN
COUNT(CASE WHEN rel.created_at = timestamp() THEN 1 END) AS created_tx,
COUNT(CASE WHEN rel.modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
COUNT(CASE WHEN rel.cypher_created_at = timestamp() THEN 1 END) AS created_tx,
COUNT(CASE WHEN rel.cypher_modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
"#
)
}

// // TODO move this to a .CQL file so we can lint and debug
// pub fn write_batch_tx_string(list_str: &str) -> String {
// format!(
// r#"
// WITH {list_str} AS tx_data
// UNWIND tx_data AS tx

// // Ensure accounts exist
// MERGE (from:Account {{address: tx.sender}})
// MERGE (to:Account {{address: tx.recipient}})

// // Dynamically set the relationship label using a subquery
// WITH from, to, tx
// CALL {{
// // Conditionally create the appropriate relationship
// FOREACH (_ IN CASE WHEN tx.relation = "Transfer" THEN [1] ELSE [] END |
// MERGE (from)-[rel:Transfer {tx_hash: tx.tx_hash}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function
// )
// FOREACH (_ IN CASE WHEN tx.relation = "Onboarding" THEN [1] ELSE [] END |
// MERGE (from)-[rel:Onboarding {tx_hash: tx.tx_hash}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function
// )
// FOREACH (_ IN CASE WHEN tx.relation = "Vouch" THEN [1] ELSE [] END |
// MERGE (from)-[rel:Vouch {tx_hash: tx.tx_hash}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function
// )
// FOREACH (_ IN CASE WHEN tx.relation IS NULL OR NOT tx.relation IN ["Transfer", "Onboarding", "Vouch"] THEN [1] ELSE [] END |
// MERGE (from)-[rel:Misc {tx_hash: tx.tx_hash}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function
// CASE
// WHEN tx.args IS NOT NULL THEN
// SET rel += tx.args
// END

// )
// }}

// // // Conditionally add `tx.args` if it exists
// // FOREACH (_ IN CASE WHEN tx.args IS NOT NULL THEN [1] ELSE [] END |
// // SET rel += tx.args
// // )

// // Increment the cumulative Lifetime edge if `tx.amount > 0`
// FOREACH (_ IN CASE WHEN tx.amount > 0 THEN [1] ELSE [] END |
// MERGE (from)-[rl:Lifetime]->(to)
// SET rl.coins_tx = COALESCE(rl.amount, 0) + tx.amount
// )

// // Final return with counts
// RETURN
// COUNT(CASE WHEN rel.cypher_created_at = timestamp() THEN 1 END) AS created_tx,
// COUNT(CASE WHEN rel.cypher_modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
// "#
// )
// }

// // TODO move this to a .CQL file so we can lint and debug
// pub fn write_batch_tx_string(list_str: &str) -> String {
// format!(
// r#"
// WITH {list_str} AS tx_data
// UNWIND tx_data AS tx

// // Ensure accounts exist
// MERGE (from:Account {{address: tx.sender}})
// MERGE (to:Account {{address: tx.recipient}})

// // Dynamically set the relationship label using a subquery
// WITH from, to, tx
// CALL {{
// WITH tx
// RETURN CASE
// WHEN tx.relation = "Tx" THEN "Tx"
// WHEN tx.relation = "Onboarding" THEN "Vouch"
// WHEN tx.relation = "Vouch" THEN "Vouch"
// ELSE "Unknown" // Default for unexpected or missing values
// END AS dynamicLabel
// }}
// WITH from, to, tx, dynamicLabel
// // Use dynamicLabel to create the relationship
// MERGE (from)-[rel:`${{dynamicLabel}}` {{tx_hash: tx.tx_hash}}]->(to)
// ON CREATE SET
// rel.cypher_created_at = timestamp(),
// rel.cypher_modified_at = null
// ON MATCH SET
// rel.cypher_modified_at = timestamp()
// SET
// rel.block_datetime = tx.block_datetime,
// rel.block_timestamp = tx.block_timestamp,
// rel.function = tx.function

// // Conditionally add `tx.args` if it exists
// FOREACH (_ IN CASE WHEN tx.args IS NOT NULL THEN [1] ELSE [] END |
// SET rel += tx.args
// )

// // Increment the cumulative Lifetime edge if `tx.amount > 0`
// FOREACH (_ IN CASE WHEN tx.amount > 0 THEN [1] ELSE [] END |
// MERGE (from)-[rl:Lifetime]->(to)
// SET rl.coins_tx = COALESCE(rl.amount, 0) + tx.amount
// )

// // Final return with counts
// RETURN
// COUNT(CASE WHEN rel.cypher_created_at = timestamp() THEN 1 END) AS created_tx,
// COUNT(CASE WHEN rel.cypher_modified_at = timestamp() AND rel.created_at < timestamp() THEN 1 END) AS modified_tx
// "#
// )
// }

pub fn write_batch_user_create(list_str: &str) -> String {
format!(
r#"
Expand All @@ -49,16 +199,16 @@ UNWIND unique_array AS addr
// Merge unique Accounts
MERGE (node:Account {{address: addr}})
ON CREATE SET
node.created_at = timestamp(),
node.modified_at = null
node.cypher_created_at = timestamp(),
node.cypher_modified_at = null
ON MATCH SET
node.modified_at = timestamp()
node.cypher_modified_at = timestamp()
RETURN
COUNT(node) AS unique_accounts,
COUNT(CASE WHEN node.created_at = timestamp() THEN 1 END) AS created_accounts,
COUNT(CASE WHEN node.modified_at = timestamp() AND node.created_at < timestamp() THEN 1 END) AS modified_accounts,
COUNT(CASE WHEN node.modified_at < timestamp() THEN 1 END) AS unchanged_accounts
COUNT(CASE WHEN node.cypher_created_at = timestamp() THEN 1 END) AS created_accounts,
COUNT(CASE WHEN node.cypher_modified_at = timestamp() AND node.cypher_created_at < timestamp() THEN 1 END) AS modified_accounts,
COUNT(CASE WHEN node.cypher_modified_at < timestamp() THEN 1 END) AS unchanged_accounts
"#
)
}
Expand Down
92 changes: 25 additions & 67 deletions src/decode_entry_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@ use libra_backwards_compatibility::sdk::{
v6_libra_framework_sdk_builder::EntryFunctionCall as V6EntryFunctionCall,
v7_libra_framework_sdk_builder::EntryFunctionCall as V7EntryFunctionCall,
};
// use libra_cached_packages::libra_stdlib::EntryFunctionCall as CurrentVersionEntryFunctionCall;

/// test all entry function decoders for the current bytes
/// EntryFunction decoding for V6, V7 eras
pub fn decode_entry_function_all_versions(
user_tx: &SignedTransaction,
events: &[WarehouseEvent],
) -> anyhow::Result<(EntryFunctionArgs, RelationLabel)> {
// TODO: current version encoding

// if let Some((args, relation)) = maybe_get_current_version_relation(user_tx, events) {
// return Ok((args, relation));
// }

if let Some((ef, rel)) = maybe_get_v7_relation(user_tx, events) {
return Ok((ef, rel));
}
Expand All @@ -29,75 +22,37 @@ pub fn decode_entry_function_all_versions(
bail!("no entry function found")
}

// TODO: the CurrentVersionEntryFunctionCall needs serde derives
// Using HEAD libra-framework code base try to decode transaction
// fn maybe_get_current_version_relation(
// user_tx: &SignedTransaction,
// events: &[WarehouseEvent],
// ) -> Option<(EntryFunctionArgs, RelationLabel)> {
// let ef = CurrentVersionEntryFunctionCall::decode(user_tx.payload());

// let relation = match ef {
// Some(EntryFunctionCall::OlAccountTransfer { to, amount: _ }) => {
// if is_onboarding_event(events) {
// RelationLabel::Onboarding(to)
// } else {
// RelationLabel::Transfer(to)
// }
// }
// Some(EntryFunctionCall::OlAccountCreateAccount { auth_key }) => {
// RelationLabel::Onboarding(auth_key)
// }
// Some(EntryFunctionCall::VouchVouchFor { friend_account }) => {
// RelationLabel::Vouch(friend_account)
// }
// Some(EntryFunctionCall::VouchInsistVouchFor { friend_account }) => {
// RelationLabel::Vouch(friend_account)
// }
// Some(EntryFunctionCall::CoinTransfer { to, .. }) => RelationLabel::Transfer(to),
// Some(EntryFunctionCall::AccountRotateAuthenticationKeyWithRotationCapability {
// rotation_cap_offerer_address,
// ..
// }) => RelationLabel::Transfer(rotation_cap_offerer_address),

// // TODO: get other entry functions with known counter parties
// // if nothing is found try to decipher from events
// _ => return None,
// };

// let args = EntryFunctionArgs::Current(ef.unwrap());

// Some((args, relation))
// }

fn maybe_get_v7_relation(
user_tx: &SignedTransaction,
events: &[WarehouseEvent],
) -> Option<(EntryFunctionArgs, RelationLabel)> {
let ef = V7EntryFunctionCall::decode(user_tx.payload());

let relation = match ef {
Some(V7EntryFunctionCall::OlAccountTransfer { to, amount: _ }) => {
Some(V7EntryFunctionCall::OlAccountTransfer { to, amount }) => {
if is_onboarding_event(events) {
RelationLabel::Onboarding(to)
RelationLabel::Onboarding(to, amount)
} else {
RelationLabel::Transfer(to)
RelationLabel::Transfer(to, amount)
}
}
Some(V7EntryFunctionCall::OlAccountCreateAccount { auth_key }) => {
RelationLabel::Onboarding(auth_key)
RelationLabel::Onboarding(auth_key, 0)
}
Some(V7EntryFunctionCall::VouchVouchFor { friend_account }) => {
RelationLabel::Vouch(friend_account)
}
Some(V7EntryFunctionCall::VouchInsistVouchFor { friend_account }) => {
RelationLabel::Vouch(friend_account)
}
Some(V7EntryFunctionCall::CoinTransfer { to, .. }) => RelationLabel::Transfer(to),
Some(V7EntryFunctionCall::AccountRotateAuthenticationKeyWithRotationCapability {
rotation_cap_offerer_address,
..
}) => RelationLabel::Transfer(rotation_cap_offerer_address),
Some(V7EntryFunctionCall::CoinTransfer { to, amount, .. }) => {
// RelationLabel::Transfer(to, amount)
if is_onboarding_event(events) {
RelationLabel::Onboarding(to, amount)
} else {
RelationLabel::Transfer(to, amount)
}
}

// TODO: get other entry functions with known counter parties
// if nothing is found try to decipher from events
Expand All @@ -115,27 +70,30 @@ fn maybe_get_v6_relation(
) -> Option<(EntryFunctionArgs, RelationLabel)> {
let ef = V6EntryFunctionCall::decode(user_tx.payload());
let relation = match ef {
Some(V6EntryFunctionCall::OlAccountTransfer { to, amount: _ }) => {
Some(V6EntryFunctionCall::OlAccountTransfer { to, amount }) => {
if is_onboarding_event(events) {
RelationLabel::Onboarding(to)
RelationLabel::Onboarding(to, amount)
} else {
RelationLabel::Transfer(to)
RelationLabel::Transfer(to, amount)
}
}
Some(V6EntryFunctionCall::OlAccountCreateAccount { auth_key }) => {
RelationLabel::Onboarding(auth_key)
RelationLabel::Onboarding(auth_key, 0)
}
Some(V6EntryFunctionCall::VouchVouchFor { wanna_be_my_friend }) => {
RelationLabel::Vouch(wanna_be_my_friend)
}
Some(V6EntryFunctionCall::VouchInsistVouchFor { wanna_be_my_friend }) => {
RelationLabel::Vouch(wanna_be_my_friend)
}
Some(V6EntryFunctionCall::CoinTransfer { to, .. }) => RelationLabel::Transfer(to),
Some(V6EntryFunctionCall::AccountRotateAuthenticationKeyWithRotationCapability {
rotation_cap_offerer_address,
..
}) => RelationLabel::Transfer(rotation_cap_offerer_address),
Some(V6EntryFunctionCall::CoinTransfer { to, amount, .. }) => {
// RelationLabel::Transfer(to, amount)
if is_onboarding_event(events) {
RelationLabel::Onboarding(to, amount)
} else {
RelationLabel::Transfer(to, amount)
}
}

// TODO: get other entry functions with known counter parties
// if nothing is found try to decipher from events
Expand Down
Loading

0 comments on commit 2be6a01

Please sign in to comment.