Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement UTXO Return Address RPC command #436

Merged
merged 18 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions cli/src/modules/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,21 @@ impl Rpc {
let result = rpc.get_current_block_color_call(None, GetCurrentBlockColorRequest { hash }).await?;
self.println(&ctx, result);
}
RpcApiOps::GetUtxoReturnAddress => {
if argv.is_empty() || argv.len() != 2 {
return Err(Error::custom("Please specify a txid and a accepting_block_daa_score"));
}

let txid = argv.remove(0);
let txid = RpcHash::from_hex(txid.as_str())?;

let accepting_block_daa_score = argv.remove(0).parse::<u64>()?;

let result =
rpc.get_utxo_return_address_call(None, GetUtxoReturnAddressRequest { txid, accepting_block_daa_score }).await?;

self.println(&ctx, result);
}
_ => {
tprintln!(ctx, "rpc method exists but is not supported by the cli: '{op_str}'\r\n");
return Ok(());
Expand Down
2 changes: 1 addition & 1 deletion components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,7 @@ mod address_store_with_cache {
let target_uniform_dist = Uniform::new(1.0, num_of_buckets as f64).unwrap();
let uniform_cdf = |x: f64| target_uniform_dist.cdf(&x);
for _ in 0..num_of_trials {
// The weight sampled expected uniform distibution
// The weight sampled expected uniform distribution
let prioritized_address_distribution = am
.lock()
.iterate_prioritized_random_addresses(HashSet::new())
Expand Down
1 change: 1 addition & 0 deletions components/consensusmanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ duration-string.workspace = true
futures-util.workspace = true
futures.workspace = true
itertools.workspace = true
kaspa-addresses.workspace=true
kaspa-consensus-core.workspace = true
kaspa-consensus-notify.workspace = true
kaspa-core.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion components/consensusmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl StagingConsensus {
// Drop `prev` so that deletion below succeeds
drop(prev);
// Staging was committed and is now the active consensus so we can delete
// any pervious, now inactive, consensus entries
// any previous, now inactive, consensus entries
self.manager.delete_inactive_consensus_entries();
}

Expand Down
11 changes: 10 additions & 1 deletion components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use kaspa_consensus_core::{
header::Header,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath, Hash,
};
use kaspa_utils::sync::rwlock::*;
Expand Down Expand Up @@ -313,6 +314,14 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(|c| c.get_chain_block_samples()).await
}

pub async fn async_get_populated_transaction(
&self,
txid: Hash,
accepting_block_daa_score: u64,
) -> Result<SignableTransaction, UtxoInquirerError> {
self.clone().spawn_blocking(move |c| c.get_populated_transaction(txid, accepting_block_daa_score)).await
}

/// Returns the antipast of block `hash` from the POV of `context`, i.e. `antipast(hash) ∩ past(context)`.
/// Since this might be an expensive operation for deep blocks, we allow the caller to specify a limit
/// `max_traversal_allowed` on the maximum amount of blocks to traverse for obtaining the answer
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ faster-hex.workspace = true
futures-util.workspace = true
indexmap.workspace = true
itertools.workspace = true
kaspa-addresses.workspace = true
kaspa-consensus-core.workspace = true
kaspa-consensus-notify.workspace = true
kaspa-consensusmanager.workspace = true
Expand Down
9 changes: 8 additions & 1 deletion consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::{
header::Header,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList, PruningProofMetadata},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath,
};
use kaspa_hashes::Hash;
Expand Down Expand Up @@ -170,6 +171,12 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

/// Returns the fully populated transaction with the given txid which was accepted at the provided accepting_block_daa_score.
/// The argument `accepting_block_daa_score` is expected to be the DAA score of the accepting chain block of `txid`.
fn get_populated_transaction(&self, txid: Hash, accepting_block_daa_score: u64) -> Result<SignableTransaction, UtxoInquirerError> {
unimplemented!()
}

fn get_virtual_parents(&self) -> BlockHashSet {
unimplemented!()
}
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/utxo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod utxo_collection;
pub mod utxo_diff;
pub mod utxo_error;
pub mod utxo_inquirer;
pub mod utxo_view;
38 changes: 38 additions & 0 deletions consensus/core/src/utxo/utxo_inquirer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use kaspa_hashes::Hash;
use thiserror::Error;

#[derive(Error, Debug, Clone)]
pub enum UtxoInquirerError {
#[error("Transaction is already pruned")]
AlreadyPruned,
#[error("Transaction return address is coinbase")]
TxFromCoinbase,
#[error("Transaction not found at given accepting daa score")]
NoTxAtScore,
#[error("Transaction was found but not standard")]
NonStandard,
#[error("Did not find compact header for block hash {0} ")]
MissingCompactHeaderForBlockHash(Hash),
#[error("Did not find containing_acceptance for tx {0} ")]
MissingContainingAcceptanceForTx(Hash),
#[error("Did not find block {0} at block tx store")]
MissingBlockFromBlockTxStore(Hash),
#[error("Did not find index {0} in transactions of block {1}")]
MissingTransactionIndexOfBlock(usize, Hash),
#[error("Expected {0} to match {1} when checking block_transaction_store using array index of transaction")]
UnexpectedTransactionMismatch(Hash, Hash),
#[error("Did not find a utxo diff for chain block {0} ")]
MissingUtxoDiffForChainBlock(Hash),
#[error("Transaction {0} acceptance data must also be in the same block in this case")]
MissingOtherTransactionAcceptanceData(Hash),
#[error("Did not find index for hash {0}")]
MissingIndexForHash(Hash),
#[error("Did not find tip data")]
MissingTipData,
#[error("Did not find a hash at index {0} ")]
MissingHashAtIndex(u64),
#[error("Did not find acceptance data for chain block {0}")]
MissingAcceptanceDataForChainBlock(Hash),
#[error("Utxo entry is not filled")]
UnfilledUtxoEntry,
}
9 changes: 8 additions & 1 deletion consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ use kaspa_consensus_core::{
network::NetworkType,
pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList, PruningProofMetadata},
trusted::{ExternalGhostdagData, TrustedBlock},
tx::{MutableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
tx::{MutableTransaction, SignableTransaction, Transaction, TransactionOutpoint, UtxoEntry},
utxo::utxo_inquirer::UtxoInquirerError,
BlockHashSet, BlueWorkType, ChainPath, HashMapCustomHasher,
};
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
Expand Down Expand Up @@ -687,6 +688,12 @@ impl ConsensusApi for Consensus {
sample_headers
}

fn get_populated_transaction(&self, txid: Hash, accepting_block_daa_score: u64) -> Result<SignableTransaction, UtxoInquirerError> {
// We need consistency between the pruning_point_store, utxo_diffs_store, block_transactions_store, selected chain and headers store reads
let _guard = self.pruning_lock.blocking_read();
self.virtual_processor.get_populated_transaction(txid, accepting_block_daa_score, self.get_source())
}

fn get_virtual_parents(&self) -> BlockHashSet {
self.lkg_virtual_state.load().parents.iter().copied().collect()
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/pipeline/virtual_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod errors;
mod processor;
mod utxo_inquirer;
mod utxo_validation;
pub use processor::*;
pub mod test_block_builder;
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ pub struct VirtualStateProcessor {
pub(super) block_window_cache_for_past_median_time: Arc<BlockWindowCacheStore>,

// Pruning lock
pruning_lock: SessionLock,
pub(super) pruning_lock: SessionLock,

// Notifier
notification_root: Arc<ConsensusNotificationRoot>,
Expand Down
189 changes: 189 additions & 0 deletions consensus/src/pipeline/virtual_processor/utxo_inquirer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::{cmp, sync::Arc};

use kaspa_consensus_core::{
acceptance_data::AcceptanceData,
tx::{SignableTransaction, Transaction, UtxoEntry},
utxo::{utxo_diff::ImmutableUtxoDiff, utxo_inquirer::UtxoInquirerError},
};
use kaspa_core::{trace, warn};
use kaspa_hashes::Hash;

use crate::model::stores::{
acceptance_data::AcceptanceDataStoreReader, block_transactions::BlockTransactionsStoreReader, headers::HeaderStoreReader,
selected_chain::SelectedChainStoreReader, utxo_diffs::UtxoDiffsStoreReader,
};

use super::VirtualStateProcessor;

impl VirtualStateProcessor {
/// Returns the fully populated transaction with the given txid which was accepted at the provided accepting_block_daa_score.
/// The argument `accepting_block_daa_score` is expected to be the DAA score of the accepting chain block of `txid`.
///
/// *Assumed to be called under the pruning read lock.*
pub fn get_populated_transaction(
&self,
txid: Hash,
accepting_block_daa_score: u64,
source_hash: Hash,
) -> Result<SignableTransaction, UtxoInquirerError> {
let source_daa_score = self
.headers_store
.get_compact_header_data(source_hash)
.map(|compact_header| compact_header.daa_score)
.map_err(|_| UtxoInquirerError::MissingCompactHeaderForBlockHash(source_hash))?;

if accepting_block_daa_score < source_daa_score {
// Early exit if target daa score is lower than that of pruning point's daa score:
return Err(UtxoInquirerError::AlreadyPruned);
}

let (matching_chain_block_hash, acceptance_data) =
self.find_accepting_chain_block_hash_at_daa_score(accepting_block_daa_score, source_hash)?;

// Expected to never fail, since we found the acceptance data and therefore there must be matching diff
let utxo_diff = self
.utxo_diffs_store
.get(matching_chain_block_hash)
.map_err(|_| UtxoInquirerError::MissingUtxoDiffForChainBlock(matching_chain_block_hash))?;

let tx = self.find_tx_from_acceptance_data(txid, &acceptance_data)?;

let mut populated_tx = SignableTransaction::new(tx);

let removed_diffs = utxo_diff.removed();

populated_tx.tx.inputs.iter().enumerate().for_each(|(index, input)| {
let filled_utxo = if let Some(utxo_entry) = removed_diffs.get(&input.previous_outpoint) {
Some(utxo_entry.clone().to_owned())
} else {
// This handles this rare scenario:
// - UTXO0 is spent by TX1 and creates UTXO1
// - UTXO1 is spent by TX2 and creates UTXO2
// - A chain block happens to accept both of these
// In this case, removed_diff wouldn't contain the outpoint of the created-and-immediately-spent UTXO
// so we use the transaction (which also has acceptance data in this block) and look at its outputs
let other_txid = input.previous_outpoint.transaction_id;
let other_tx = self.find_tx_from_acceptance_data(other_txid, &acceptance_data).unwrap();
let output = &other_tx.outputs[input.previous_outpoint.index as usize];
let utxo_entry =
UtxoEntry::new(output.value, output.script_public_key.clone(), accepting_block_daa_score, other_tx.is_coinbase());
Some(utxo_entry)
};

populated_tx.entries[index] = filled_utxo;
});

Ok(populated_tx)
}

/// Find the accepting chain block hash at the given DAA score by binary searching
/// through selected chain store using indexes.
/// This method assumes that local caller have acquired the pruning read lock to guarantee
/// consistency between reads on the selected_chain_store and headers_store (as well as
/// other stores outside). If no such lock is acquired, this method tries to find
/// the accepting chain block hash on a best effort basis (may fail if parts of the data
/// are pruned between two sequential calls)
fn find_accepting_chain_block_hash_at_daa_score(
&self,
target_daa_score: u64,
source_hash: Hash,
) -> Result<(Hash, Arc<AcceptanceData>), UtxoInquirerError> {
let sc_read = self.selected_chain_store.read();

let source_index = sc_read.get_by_hash(source_hash).map_err(|_| UtxoInquirerError::MissingIndexForHash(source_hash))?;
let (tip_index, tip_hash) = sc_read.get_tip().map_err(|_| UtxoInquirerError::MissingTipData)?;
let tip_daa_score = self
.headers_store
.get_compact_header_data(tip_hash)
.map(|tip| tip.daa_score)
.map_err(|_| UtxoInquirerError::MissingCompactHeaderForBlockHash(tip_hash))?;

// For a chain segment it holds that len(segment) <= daa_score(segment end) - daa_score(segment start). This is true
// because each chain block increases the daa score by at least one. Hence we can lower bound our search by high index
// minus the daa score gap as done below
let mut low_index = tip_index.saturating_sub(tip_daa_score.saturating_sub(target_daa_score)).max(source_index);
let mut high_index = tip_index;

let matching_chain_block_hash = loop {
// Binary search for the chain block that matches the target_daa_score
// 0. Get the mid point index
let mid = low_index + (high_index - low_index) / 2;

// 1. Get the chain block hash at that index. Error if we cannot find a hash at that index
let hash = sc_read.get_by_index(mid).map_err(|_| {
trace!("Did not find a hash at index {}", mid);
UtxoInquirerError::MissingHashAtIndex(mid)
})?;

// 2. Get the compact header so we have access to the daa_score. Error if we cannot find the header
let compact_header = self.headers_store.get_compact_header_data(hash).map_err(|_| {
trace!("Did not find a compact header with hash {}", hash);
UtxoInquirerError::MissingCompactHeaderForBlockHash(hash)
})?;

// 3. Compare block daa score to our target
match compact_header.daa_score.cmp(&target_daa_score) {
cmp::Ordering::Equal => {
// We found the chain block we need
break hash;
}
cmp::Ordering::Greater => {
high_index = mid - 1;
}
cmp::Ordering::Less => {
low_index = mid + 1;
}
}

if low_index > high_index {
return Err(UtxoInquirerError::NoTxAtScore);
}
};

let acceptance_data = self
.acceptance_data_store
.get(matching_chain_block_hash)
.map_err(|_| UtxoInquirerError::MissingAcceptanceDataForChainBlock(matching_chain_block_hash))?;

Ok((matching_chain_block_hash, acceptance_data))
}

/// Finds a transaction's containing block hash and index within block through
/// the accepting block acceptance data
fn find_containing_block_and_index_from_acceptance_data(
&self,
txid: Hash,
acceptance_data: &AcceptanceData,
) -> Option<(Hash, usize)> {
acceptance_data.iter().find_map(|mbad| {
let tx_arr_index =
mbad.accepted_transactions.iter().find_map(|tx| (tx.transaction_id == txid).then_some(tx.index_within_block as usize));
tx_arr_index.map(|index| (mbad.block_hash, index))
})
}

/// Finds a transaction through the accepting block acceptance data (and using indexed info therein for
/// finding the tx in the block transactions store)
fn find_tx_from_acceptance_data(&self, txid: Hash, acceptance_data: &AcceptanceData) -> Result<Transaction, UtxoInquirerError> {
let (containing_block, index) = self
.find_containing_block_and_index_from_acceptance_data(txid, acceptance_data)
.ok_or(UtxoInquirerError::MissingContainingAcceptanceForTx(txid))?;

let tx = self
.block_transactions_store
.get(containing_block)
.map_err(|_| UtxoInquirerError::MissingBlockFromBlockTxStore(containing_block))
.and_then(|block_txs| {
block_txs.get(index).cloned().ok_or(UtxoInquirerError::MissingTransactionIndexOfBlock(index, containing_block))
})?;

if tx.id() != txid {
// Should never happen, but do a sanity check. This would mean something went wrong with storing block transactions.
// Sanity check is necessary to guarantee that this function will never give back a wrong address (err on the side of not found)
warn!("Expected {} to match {} when checking block_transaction_store using array index of transaction", tx.id(), txid);
return Err(UtxoInquirerError::UnexpectedTransactionMismatch(tx.id(), txid));
}

Ok(tx)
}
}
Loading
Loading