Skip to content

Commit

Permalink
Merge pull request #1 from michaelsutton/add-daa-estimate-2
Browse files Browse the repository at this point in the history
DAA score timestamp estimation -- improvements and bug fixes
  • Loading branch information
coderofstuff authored Nov 23, 2023
2 parents 59ad2eb + 2a81c5a commit 3c9b355
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 25 deletions.
45 changes: 32 additions & 13 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
acceptance_data::AcceptanceDataStoreReader,
block_transactions::BlockTransactionsStoreReader,
ghostdag::{GhostdagData, GhostdagStoreReader},
headers::HeaderStoreReader,
headers::{CompactHeaderData, HeaderStoreReader},
headers_selected_tip::HeadersSelectedTipStoreReader,
past_pruning_points::PastPruningPointsStoreReader,
pruning::PruningStoreReader,
Expand Down Expand Up @@ -364,6 +364,16 @@ impl Consensus {
};
Ok(self.services.window_manager.estimate_network_hashes_per_second(window)?)
}

/// Collects the compact header data of every past pruning point plus the
/// current one, paired with its hash, ordered from earliest to current.
fn pruning_point_compact_headers(&self) -> Vec<(Hash, CompactHeaderData)> {
    // PRUNE SAFETY: index is monotonic and past pruning point headers are expected permanently
    let pp_info = self.pruning_point_store.read().get().unwrap();
    // All historical pruning points by index, followed by the current pruning point itself
    let past = (0..pp_info.index).map(|idx| self.past_pruning_points_store.get(idx).unwrap());
    past.chain(once(pp_info.pruning_point))
        .map(|hash| {
            let compact = self.headers_store.get_compact_header_data(hash).unwrap();
            (hash, compact)
        })
        .collect_vec()
}
}

impl ConsensusApi for Consensus {
Expand Down Expand Up @@ -486,10 +496,13 @@ impl ConsensusApi for Consensus {
}

/// Returns a Vec of header samples since genesis
/// Ordered ascending by daa_score, first entry is genesis
/// ordered by ascending daa_score, first entry is genesis
fn get_chain_block_samples(&self) -> Vec<DaaScoreTimestamp> {
// We need consistency between the past pruning points, selected chain and header store reads
let _guard = self.pruning_lock.blocking_read();

// Sorted from genesis to latest pruning_point_headers
let pp_headers = self.pruning_point_headers();
let pp_headers = self.pruning_point_compact_headers();
let step_divisor: usize = 3; // The number of extra samples we'll get from blocks after last pp header
let prealloc_len = pp_headers.len() + step_divisor + 1;

Expand All @@ -498,7 +511,9 @@ impl ConsensusApi for Consensus {
// Part 1: Add samples from pruning point headers:
if self.config.net.network_type == NetworkType::Mainnet {
sample_headers = Vec::<DaaScoreTimestamp>::with_capacity(prealloc_len + 15);
// For mainnet, we add extra data (15 pp headers) from before checkpoint genesis:
// For mainnet, we add extra data (15 pp headers) from before checkpoint genesis.
// Source: https://github.com/kaspagang/kaspad-py-explorer/blob/main/src/tx_timestamp_estimation.ipynb
// For context see also: https://github.com/kaspagang/kaspad-py-explorer/blob/main/src/genesis_proof.ipynb
sample_headers.push(DaaScoreTimestamp { daa_score: 0, timestamp: 1636298787842 });
sample_headers.push(DaaScoreTimestamp { daa_score: 87133, timestamp: 1636386662010 });
sample_headers.push(DaaScoreTimestamp { daa_score: 176797, timestamp: 1636473700804 });
Expand All @@ -519,21 +534,25 @@ impl ConsensusApi for Consensus {
}

for header in pp_headers.iter() {
sample_headers.push(DaaScoreTimestamp { daa_score: header.daa_score, timestamp: header.timestamp });
sample_headers.push(DaaScoreTimestamp { daa_score: header.1.daa_score, timestamp: header.1.timestamp });
}

// Part 2: Add samples from recent chain blocks
let sc_read = self.storage.selected_chain_store.read();
let low = pp_headers.last().unwrap().hash;
let high = sc_read.get_tip().unwrap().1;

let low_index = sc_read.get_by_hash(low).unwrap_option().unwrap_or(0);
let high_index = sc_read.get_by_hash(high).unwrap_option().unwrap_or(0);
let high_index = sc_read.get_tip().unwrap().0;
// The last pruning point is always expected in the selected chain store. However if due to some reason
// this is not the case, we prefer not crashing but rather avoid sampling (hence set low index to high index)
let low_index = sc_read.get_by_hash(pp_headers.last().unwrap().0).unwrap_option().unwrap_or(high_index);
let step_size = cmp::max((high_index - low_index) / (step_divisor as u64), 1);

for index in (low_index + step_size..=high_index).step_by(step_size as usize) {
let chain_block_header = self.storage.headers_store.get_header(sc_read.get_by_index(index).unwrap()).unwrap();
sample_headers.push(DaaScoreTimestamp::from(chain_block_header));
// We chain `high_index` to make sure we sample sink, and dedup to avoid sampling it twice
for index in (low_index + step_size..=high_index).step_by(step_size as usize).chain(once(high_index)).dedup() {
let compact = self
.storage
.headers_store
.get_compact_header_data(sc_read.get_by_index(index).expect("store lock is acquired"))
.unwrap();
sample_headers.push(DaaScoreTimestamp { daa_score: compact.daa_score, timestamp: compact.timestamp });
}

sample_headers
Expand Down
2 changes: 1 addition & 1 deletion rpc/grpc/core/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -821,4 +821,4 @@ message GetDaaScoreTimestampEstimateRequestMessage {
message GetDaaScoreTimestampEstimateResponseMessage{
repeated uint64 timestamps = 1;
RPCError error = 1000;
}
}
23 changes: 12 additions & 11 deletions rpc/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ impl RpcApi for RpcCoreService {
request: GetDaaScoreTimestampEstimateRequest,
) -> RpcResult<GetDaaScoreTimestampEstimateResponse> {
let session = self.consensus_manager.consensus().session().await;
// TODO: cache samples based on sufficient recency of the data and append sink data
let mut headers = session.async_get_chain_block_samples().await;
let mut requested_daa_scores = request.daa_scores.clone();
let mut daa_score_timestamp_map = HashMap::<u64, u64>::new();
Expand All @@ -551,6 +552,8 @@ impl RpcApi for RpcCoreService {
// Loop runs at O(n + m) where n = # pp headers, m = # requested daa_scores
// Loop will always end because in the worst case the last header with daa_score = 0 (the genesis)
// will cause every remaining requested daa_score to be "found in range"
//
// TODO: optimize using binary search over the samples to obtain O(m log n) complexity (which is an improvement assuming m << n)
while header_idx < headers.len() && req_idx < request.daa_scores.len() {
let header = headers.get(header_idx).unwrap();
let curr_daa_score = requested_daa_scores[req_idx];
Expand All @@ -559,23 +562,20 @@ impl RpcApi for RpcCoreService {
if header.daa_score <= curr_daa_score {
// For daa_score later than the last header, we estimate in milliseconds based on the difference
let time_adjustment = if header_idx == 0 {
// estimate milliseconds = (daa_score / bps)
((curr_daa_score - header.daa_score) / self.config.bps()).checked_mul(1000).unwrap_or(u64::MAX)
// estimate milliseconds = (daa_score * target_time_per_block)
(curr_daa_score - header.daa_score).checked_mul(self.config.target_time_per_block).unwrap_or(u64::MAX)
} else {
// "next" header is the one that we processed last iteration
let next_header = &headers[header_idx - 1];
let time_between_now_and_next = next_header.timestamp - header.timestamp;
let score_between_now_and_request = curr_daa_score - header.daa_score;
let score_between_now_and_next = next_header.daa_score - header.daa_score;

(time_between_now_and_next)
.checked_mul(score_between_now_and_request / score_between_now_and_next)
.unwrap_or(u64::MAX)
// Unlike DAA scores which are monotonic (over the selected chain), timestamps are not strictly monotonic, so we avoid assuming so
let time_between_headers = next_header.timestamp.checked_sub(header.timestamp).unwrap_or_default();
let score_between_query_and_header = (curr_daa_score - header.daa_score) as f64;
let score_between_headers = (next_header.daa_score - header.daa_score) as f64;
// Interpolate the timestamp delta using the estimated fraction based on DAA scores
((time_between_headers as f64) * (score_between_query_and_header / score_between_headers)) as u64
};

// Use higher types to catch overflows. Cast to lower type later on when confirmed within u64 range
let daa_score_timestamp = header.timestamp.checked_add(time_adjustment).unwrap_or(u64::MAX);

daa_score_timestamp_map.insert(curr_daa_score, daa_score_timestamp);

// Process the next daa score that's <= than current one (at earlier idx)
Expand All @@ -585,6 +585,7 @@ impl RpcApi for RpcCoreService {
}
}

// Note: it is safe to assume all entries exist in the map since the first sampled header is expected to have daa_score=0
let timestamps = request.daa_scores.iter().map(|curr_daa_score| daa_score_timestamp_map[curr_daa_score]).collect();

Ok(GetDaaScoreTimestampEstimateResponse::new(timestamps))
Expand Down

0 comments on commit 3c9b355

Please sign in to comment.