[cli] extract fix gz state snapshots #16

Merged
merged 15 commits on Jan 30, 2025
1 change: 0 additions & 1 deletion src/analytics/enrich_rms.rs
@@ -245,5 +245,4 @@ fn test_rms_pipeline() {
assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0));

process_shill(&mut swaps);
// dbg!(&swaps);
}
77 changes: 0 additions & 77 deletions src/analytics/offline_matching.rs
@@ -89,78 +89,10 @@ pub async fn get_date_range_deposits_alt(
deposited,
};
top_deposits.push(d);
// dbg!(&d);
}
Ok(top_deposits)
}

// pub async fn get_date_range_deposits(
// pool: &Graph,
// top_n: u64,
// start: DateTime<Utc>,
// end: DateTime<Utc>,
// ) -> Result<Vec<Deposit>> {
// let mut top_deposits = vec![];

// let q = format!(
// // r#"
// // WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" AS olswap_deposit

// // // Step 1: Get the list of all depositors
// // MATCH (depositor:Account)-[tx:Tx]->(onboard:Account {{address: olswap_deposit}})
// // WITH COLLECT(DISTINCT depositor) AS all_depositors, olswap_deposit, tx

// // // Step 2: Match depositors and amounts within the date range

// // UNWIND all_depositors AS depositor

// // OPTIONAL MATCH (depositor)-[tx2:Tx]->(onboard:Account {{address: olswap_deposit}})
// // WHERE tx2.block_datetime >= datetime('{}') AND tx2.block_datetime <= datetime('{}')

// // WITH
// // depositor.address AS account,
// // COALESCE(SUM(tx2.V7_OlAccountTransfer_amount), 0)/1000000 AS deposit_amount
// // RETURN account, toFloat(deposit_amount) as deposited
// // ORDER BY deposited DESC

// // "#,
// r#"
// WITH "0xf57d3968d0bfd5b3120fda88f34310c70bd72033f77422f4407fbbef7c24557a" as exchange_deposit
// MATCH
// (u:Account)-[tx:Tx]->(onboard:Account {{address: exchange_deposit}})
// WHERE
// tx.`block_datetime` > datetime("{}")
// AND tx.`block_datetime` < datetime("{}")
// WITH
// DISTINCT(u),
// SUM(tx.V7_OlAccountTransfer_amount) AS totalTxAmount
// ORDER BY totalTxAmount DESCENDING
// RETURN u.address AS account, toFloat(totalTxAmount) / 1000000 AS deposited

// "#,
// start.to_rfc3339(),
// end.to_rfc3339(),
// // top_n,
// );
// let cypher_query = neo4rs::query(&q);

// // Execute the query
// let mut result = pool.execute(cypher_query).await?;

// // Fetch the first row only
// while let Some(r) = result.next().await? {
// let account_str = r.get::<String>("account").unwrap_or("unknown".to_string());
// let deposited = r.get::<f64>("deposited").unwrap_or(0.0);
// let d = Deposit {
// account: account_str.parse().unwrap_or(AccountAddress::ZERO),
// deposited,
// };
// top_deposits.push(d);
// // dbg!(&d);
// }
// Ok(top_deposits)
// }

pub async fn get_exchange_users(
pool: &Graph,
top_n: u64,
@@ -194,7 +126,6 @@ pub async fn get_exchange_users(
let funded = r.get::<f64>("funded").unwrap_or(0.0);
let d = MinFunding { user_id, funded };
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
@@ -226,7 +157,6 @@ pub async fn get_exchange_users_only_outflows(pool: &Graph) -> Result<Vec<MinFun
funded: funded as f64,
};
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
@@ -263,7 +193,6 @@ pub async fn get_one_exchange_user(
let funded = r.get::<f64>("funded").unwrap_or(0.0);
let d = MinFunding { user_id, funded };
min_funding.push(d);
// dbg!(&d);
}
Ok(min_funding)
}
@@ -311,11 +240,6 @@ impl Matching {
.map(|el| el.user_id)
.collect();

// dbg!(&ids);
// let user_ledger = funded.iter().find(|el| {
// // check if we have already identified it
// self.definite.0.get(el.user_id).none()
// });
Ok((*ids.first().unwrap(), *ids.get(1).unwrap()))
}

@@ -460,7 +384,6 @@ impl Matching {

let mut eval: Vec<AccountAddress> = vec![];
deposits.iter().for_each(|el| {
// dbg!(&el);
if el.deposited >= user.funded &&
// must not already have been tagged impossible
!pending.impossible.contains(&el.account) &&
1 change: 0 additions & 1 deletion src/cypher_templates.rs
@@ -230,7 +230,6 @@ use serde_json::Value;
pub fn to_cypher_object<T: Serialize>(object: &T) -> Result<String> {
// Serialize the struct to a JSON value
let serialized_value = serde_json::to_value(object).expect("Failed to serialize");
// dbg!(&serialized_value);

let flattener = smooth_json::Flattener {
separator: "_",
1 change: 0 additions & 1 deletion src/enrich_exchange_onboarding.rs
@@ -105,7 +105,6 @@ pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[ExchangeOnRamp]) ->
// cypher queries makes it annoying to do a single insert of users and
// txs
let cypher_string = ExchangeOnRamp::cypher_batch_link_owner(&list_str);
// dbg!(&cypher_string);

// Execute the query
let cypher_query = neo4rs::query(&cypher_string);
28 changes: 14 additions & 14 deletions src/extract_transactions.rs
@@ -42,20 +42,20 @@ pub async fn extract_current_transactions(

// first increment the block metadata. This assumes the vector is sequential.
if let Some(block) = tx.try_as_block_metadata() {
// check the epochs are incrementing or not
if epoch > block.epoch()
&& round > block.round()
&& timestamp > block.timestamp_usecs()
{
dbg!(
epoch,
block.epoch(),
round,
block.round(),
timestamp,
block.timestamp_usecs()
);
}
// // check the epochs are incrementing or not
// if epoch > block.epoch()
// && round > block.round()
// && timestamp > block.timestamp_usecs()
// {
// dbg!(
// epoch,
// block.epoch(),
// round,
// block.round(),
// timestamp,
// block.timestamp_usecs()
// );
// }

epoch = block.epoch();
round = block.round();
19 changes: 11 additions & 8 deletions src/load.rs
@@ -42,7 +42,13 @@ pub async fn ingest_all(
let pending = queue::get_queued(pool).await?;
info!("pending archives: {}", pending.len());

// This manifest may be for a .gz file; we should handle that here as well
for (_p, m) in archive_map.0.iter() {
info!("checking if we need to decompress");
let (new_unzip_path, temp) = unzip_temp::maybe_handle_gz(&m.archive_dir)?;
let mut better_man = ManifestInfo::new(&new_unzip_path);
better_man.set_info()?;

println!(
"\nProcessing: {:?} with archive: {}",
m.contents,
@@ -60,6 +66,7 @@
m.archive_dir.display()
);
}
drop(temp);
}

Ok(())
@@ -70,9 +77,6 @@
pool: &Graph,
batch_size: usize,
) -> Result<BatchTxReturn> {
info!("checking if we need to decompress");
let (archive_path, temp) = unzip_temp::maybe_handle_gz(&man.archive_dir)?;

let mut all_results = BatchTxReturn::new();
match man.contents {
crate::scan::BundleContent::Unknown => todo!(),
@@ -82,24 +86,23 @@
error!("no framework version detected");
bail!("could not load archive from manifest");
}
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&archive_path).await?,
crate::scan::FrameworkVersion::V5 => extract_v5_snapshot(&man.archive_dir).await?,
crate::scan::FrameworkVersion::V6 => {
extract_current_snapshot(&archive_path).await?
extract_current_snapshot(&man.archive_dir).await?
}
crate::scan::FrameworkVersion::V7 => {
extract_current_snapshot(&archive_path).await?
extract_current_snapshot(&man.archive_dir).await?
}
};
snapshot_batch(&snaps, pool, batch_size, &man.archive_id).await?;
}
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&archive_path, &man.version).await?;
let (txs, _) = extract_current_transactions(&man.archive_dir, &man.version).await?;
let batch_res =
load_tx_cypher::tx_batch(&txs, pool, batch_size, &man.archive_id).await?;
all_results.increment(&batch_res);
}
crate::scan::BundleContent::EpochEnding => todo!(),
}
drop(temp);
Ok(all_results)
}
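Note on the gz handling above: ingest_all now decompresses each queued archive up front (unzip_temp::maybe_handle_gz), rebuilds the ManifestInfo from the unzipped path, and keeps the temp guard alive until the archive has been loaded, so try_load_one_archive only ever sees plain paths. The helper below is a minimal, hypothetical sketch of that decompress-to-tempdir pattern. It assumes the flate2 and tempfile crates and a (PathBuf, Option<TempDir>) return shape; it is not the crate's actual unzip_temp implementation.

use std::{fs::File, io, path::{Path, PathBuf}};
use anyhow::Result;
use flate2::read::GzDecoder;
use tempfile::TempDir;

// If the archive dir contains any *.gz files, inflate everything into a TempDir
// and return that path plus the guard; otherwise return the original dir.
// Dropping the guard deletes the decompressed copies.
fn maybe_handle_gz_sketch(archive_dir: &Path) -> Result<(PathBuf, Option<TempDir>)> {
    let has_gz = std::fs::read_dir(archive_dir)?
        .flatten()
        .any(|e| e.path().extension().map_or(false, |ext| ext == "gz"));
    if !has_gz {
        return Ok((archive_dir.to_path_buf(), None));
    }

    let temp = TempDir::new()?;
    for entry in std::fs::read_dir(archive_dir)?.flatten() {
        let src = entry.path();
        let name = entry.file_name();
        if src.extension().map_or(false, |ext| ext == "gz") {
            // strip the .gz suffix and inflate into the temp dir
            let dst = temp.path().join(Path::new(&name).file_stem().unwrap());
            let mut reader = GzDecoder::new(File::open(&src)?);
            io::copy(&mut reader, &mut File::create(dst)?)?;
        } else {
            // copy uncompressed files (e.g. an already-plain manifest) alongside
            std::fs::copy(&src, temp.path().join(&name))?;
        }
    }
    Ok((temp.path().to_path_buf(), Some(temp)))
}

Holding the guard at the call site, and only calling drop(temp) after the archive is processed, is what keeps the decompressed files on disk for the whole load.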
2 changes: 0 additions & 2 deletions src/load_account_state.rs
@@ -45,9 +45,7 @@ pub async fn snapshot_batch(

match impl_batch_snapshot_insert(pool, c).await {
Ok(batch) => {
// dbg!(&batch);
all_results.increment(&batch);
// dbg!(&all_results);
queue::update_task(pool, archive_id, true, i).await?;
info!("...success");
}
115 changes: 75 additions & 40 deletions src/scan.rs
@@ -33,6 +33,44 @@ pub struct ManifestInfo {
}

impl ManifestInfo {
pub fn new(archive_dir: &Path) -> Self {
let archive_id = archive_dir
.file_name()
.unwrap()
.to_str()
.unwrap()
.to_owned();
ManifestInfo {
archive_dir: archive_dir.to_path_buf(),
archive_id,
version: FrameworkVersion::Unknown,
contents: BundleContent::Unknown,
processed: false,
}
}

pub fn set_info(&mut self) -> Result<()> {
self.set_contents()?;
self.try_set_framework_version();
Ok(())
}

/// find out the type of content in the manifest
pub fn set_contents(&mut self) -> Result<()> {
// filenames may be in .gz format
let pattern = format!(
"{}/*.manifest*", // also try .gz
self.archive_dir
.to_str()
.context("cannot parse starting dir")?
);

if let Some(man_file) = glob(&pattern)?.flatten().next() {
self.contents = BundleContent::new_from_man_file(&man_file);
}
Ok(())
}

pub fn try_set_framework_version(&mut self) -> FrameworkVersion {
match self.contents {
BundleContent::Unknown => return FrameworkVersion::Unknown,
Expand All @@ -41,9 +79,8 @@ impl ManifestInfo {
// first check if the v7 manifest will parse
if let Ok(_bak) = load_snapshot_manifest(&man_path) {
self.version = FrameworkVersion::V7;
};

if v5_read_from_snapshot_manifest(&self.archive_dir.join("state.manifest")).is_ok()
} else if v5_read_from_snapshot_manifest(&self.archive_dir.join("state.manifest"))
.is_ok()
{
self.version = FrameworkVersion::V5;
}
@@ -83,6 +120,19 @@ pub enum BundleContent {
EpochEnding,
}
impl BundleContent {
pub fn new_from_man_file(man_file: &Path) -> Self {
let s = man_file.to_str().expect("invalid path");
if s.contains("transaction.manifest") {
return BundleContent::Transaction;
};
if s.contains("epoch_ending.manifest") {
return BundleContent::EpochEnding;
};
if s.contains("state.manifest") {
return BundleContent::StateSnapshot;
};
BundleContent::Unknown
}
pub fn filename(&self) -> String {
match self {
BundleContent::Unknown => "*.manifest".to_string(),
@@ -110,44 +160,29 @@ pub fn scan_dir_archive(

let mut archive = BTreeMap::new();

for entry in glob(&pattern)? {
match entry {
Ok(manifest_path) => {
let dir = manifest_path
.parent()
.context("no parent dir found")?
.to_owned();
let contents = test_content(&manifest_path);
let archive_id = dir.file_name().unwrap().to_str().unwrap().to_owned();
let mut m = ManifestInfo {
archive_dir: dir.clone(),
archive_id,
version: FrameworkVersion::Unknown,
contents,
processed: false,
};
m.try_set_framework_version();

archive.insert(manifest_path.clone(), m);
}
Err(e) => println!("{:?}", e),
}
for manifest_path in glob(&pattern)?.flatten() {
let archive_dir = manifest_path
.parent()
.expect("can't find manifest dir, weird");
let mut man = ManifestInfo::new(archive_dir);
man.set_info()?;
archive.insert(archive_dir.to_path_buf(), man);
}
Ok(ArchiveMap(archive))
}

/// find out the type of content in the manifest
fn test_content(manifest_path: &Path) -> BundleContent {
let s = manifest_path.to_str().expect("path invalid");
if s.contains("transaction.manifest") {
return BundleContent::Transaction;
};
if s.contains("epoch_ending.manifest") {
return BundleContent::EpochEnding;
};
if s.contains("state.manifest") {
return BundleContent::StateSnapshot;
};

BundleContent::Unknown
}
// /// find out the type of content in the manifest
// fn test_content(manifest_path: &Path) -> BundleContent {
// let s = manifest_path.to_str().expect("path invalid");
// if s.contains("transaction.manifest") {
// return BundleContent::Transaction;
// };
// if s.contains("epoch_ending.manifest") {
// return BundleContent::EpochEnding;
// };
// if s.contains("state.manifest") {
// return BundleContent::StateSnapshot;
// };

// BundleContent::Unknown
// }
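A small usage sketch for the classifier added above, assuming the crate-local BundleContent from this diff is in scope: the "*.manifest*" glob in set_contents matches both plain and gzipped manifest names, and new_from_man_file keys off substrings of the filename, so a trailing .gz does not change the classification.

use std::path::Path;

fn classify_examples() {
    // plain and gzipped manifests resolve to the same bundle type
    assert!(matches!(
        BundleContent::new_from_man_file(Path::new("state.manifest")),
        BundleContent::StateSnapshot
    ));
    assert!(matches!(
        BundleContent::new_from_man_file(Path::new("state.manifest.gz")),
        BundleContent::StateSnapshot
    ));
    assert!(matches!(
        BundleContent::new_from_man_file(Path::new("transaction.manifest.gz")),
        BundleContent::Transaction
    ));
    assert!(matches!(
        BundleContent::new_from_man_file(Path::new("epoch_ending.manifest")),
        BundleContent::EpochEnding
    ));
}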