Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MTG-703 Adding peer to peer consistency checks #316

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ indicatif = "0.17"

# Errors, futures, helpers, tools, time, etc...
# Errors
anyhow = "1"
thiserror = { version = "1"}
# Clients
arweave-rs = { version = "0.2.0", git = "https://github.com/RequescoS/arweave-rs.git", rev = "d8f5ef76f06c96afdf013fe5b62301790631b33f" }
Expand Down
29 changes: 29 additions & 0 deletions entities/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,19 +529,22 @@ pub struct MetadataInfo {
pub rent_epoch: u64,
pub executable: bool,
pub metadata_owner: Option<String>,
pub data_hash: u64,
}

#[derive(Clone)]
pub struct EditionMetadata {
pub edition: TokenMetadataEdition,
pub write_version: u64,
pub slot_updated: u64,
pub data_hash: u64,
}

#[derive(Clone, Debug)]
pub struct BurntMetadataSlot {
pub slot_updated: u64,
pub write_version: u64,
pub data_hash: u64,
}

#[derive(Clone)]
Expand All @@ -552,6 +555,7 @@ pub struct IndexableAssetWithAccountInfo {
pub slot_updated: u64,
pub write_version: u64,
pub rent_epoch: u64,
pub data_hash: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -566,6 +570,7 @@ pub struct TokenAccount {
pub slot_updated: i64,
pub amount: i64,
pub write_version: u64,
pub data_hash: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand All @@ -579,19 +584,22 @@ pub struct Mint {
pub token_program: Pubkey,
pub extensions: Option<MintAccountExtensions>,
pub write_version: u64,
pub data_hash: u64,
}

pub struct InscriptionInfo {
pub inscription: Inscription,
pub write_version: u64,
pub slot_updated: u64,
pub data_hash: u64,
}

#[derive(Clone)]
pub struct InscriptionDataInfo {
pub inscription_data: Vec<u8>,
pub write_version: u64,
pub slot_updated: u64,
pub data_hash: u64,
}

#[derive(Clone)]
Expand All @@ -602,6 +610,7 @@ pub struct CoreAssetFee {
pub rent_epoch: u64,
pub slot_updated: u64,
pub write_version: u64,
pub data_hash: u64,
}

pub struct UnprocessedAccountMessage {
Expand All @@ -610,6 +619,26 @@ pub struct UnprocessedAccountMessage {
pub id: String,
}

impl UnprocessedAccountMessage {
pub fn solana_change_info(&self) -> (Pubkey, u64, u64, u64) {
let (slot, write_version, data_hash) = match &self.account {
UnprocessedAccount::MetadataInfo(v) => (v.slot_updated, v.write_version, v.data_hash),
UnprocessedAccount::Token(v) => (v.slot_updated as u64, v.write_version, v.data_hash),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ok with converting i64 as u64? In case it cannot be negative, why then TokenAccount stores it as i64? Some kind of restrictions from the DB?

UnprocessedAccount::Mint(v) => (v.slot_updated as u64, v.write_version, v.data_hash),
UnprocessedAccount::Edition(v) => (v.slot_updated, v.write_version, v.data_hash),
UnprocessedAccount::BurnMetadata(v) => (v.slot_updated, v.write_version, v.data_hash),
UnprocessedAccount::BurnMplCore(v) => (v.slot_updated, v.write_version, v.data_hash),
UnprocessedAccount::MplCore(v) => (v.slot_updated, v.write_version, v.data_hash),
UnprocessedAccount::Inscription(v) => (v.slot_updated, v.write_version, v.data_hash),
UnprocessedAccount::InscriptionData(v) => {
(v.slot_updated, v.write_version, v.data_hash)
}
UnprocessedAccount::MplCoreFee(v) => (v.slot_updated, v.write_version, v.data_hash),
};
(self.key, slot, write_version, data_hash)
}
}

pub struct BufferedTxWithID {
pub tx: BufferedTransaction,
pub id: String,
Expand Down
2 changes: 2 additions & 0 deletions grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
prost = { workspace = true }
futures = { workspace = true }
solana-sdk = { workspace = true }
interface = { path = "../interface" }
entities = { path = "../entities" }
metrics-utils = { path = "../metrics_utils" }
async-trait = { workspace = true }
thiserror = { workspace = true }
solana-transaction-status = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions grpc/build.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.out_dir("src") // Output directory for the generated Rust code within grpc module
.compile(
&[
// Paths to the .proto files
"proto/gap_filler.proto",
"proto/asset_urls.proto",
"proto/consistency_api.proto",
],
&["proto"], // Include paths for proto file dependencies
)?;
Expand Down
166 changes: 166 additions & 0 deletions grpc/proto/consistency_api.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
syntax = "proto3";

import "google/protobuf/empty.proto";

package consistencyapi;

// The very early grand epoch on a given peer
message BbgmEarlistGrandEpoch {
optional uint32 grand_epoch = 1;
}

// List of all grand epochs for a bubblegum tree
message BbgmGrandEpochForTreeList {
repeated BbgmGrandEpochChecksumForTree list = 1;
}

message BbgmGrandEpochChecksumForTree {
uint32 grand_epoch = 1;
optional bytes checksum = 2;
}

message BbgmGrandEpochList {
repeated BbgmGrandEpoch list = 1;
}

message BbgmGrandEpoch {
uint32 grand_epoch = 1;
bytes tree_pubkey = 2;
optional bytes checksum = 3;
}

message BbgmEpochList {
repeated BbgmEpoch list = 1;
}

message BbgmEpoch {
uint32 epoch = 1;
bytes tree_pubkey = 2;
optional bytes checksum = 3;
}

message BbgmChangeList {
repeated BbgmChange list = 1;
}

message BbgmChange {
bytes tree_pubkey = 1;
uint64 slot = 2;
uint64 seq = 3;
string signature = 4;
}

// Request object for getting grand epoch trees checksums
message GetBbgmGrandEpochsReq {
// Grand epoch number
uint32 grand_epoch = 1;
// Maximum amount of tree checksums to return
optional uint64 limit = 2;
// Return trees checksums that are after given
optional bytes after = 3;
}

// Request object for getting all grand epoch checksums for a given tree
message GetBbgmGrandEpochsForTreeReq {
bytes tree_pubkey = 1;
}

// Request object for getting epoch tree checksums in the geven grand epoch
message GetBbgmEpochsReq {
// Public key of the bubblegum tree, checksum should be returned for
bytes tree_pubkey = 1;
// Number of grand epoch which nested epochs should be returned
uint32 grand_epoch = 2;
}

message BbgmChangePosition {
uint64 slot = 1;
uint64 seq = 2;
}

// Request object for getting list of individual bubblegum tree changes
// that happened in the given epoch
message GetBbgmChangesReq {
// Pubkey of bubblegum tree
bytes tree_pubkey = 1;
// Number of epoch changes are listed from
uint32 epoch = 2;
// Maximum amount of bubblegum changes to return
optional uint64 limit = 3;
// Return changes after given position
optional BbgmChangePosition after = 4;
}

// Represents account NFT grand bucket checksum.
message AccGrandBucketChecksum {
uint32 grand_bucket = 1;
optional bytes checksum = 2;
}

// List of account NFT grand bucket checksums.
message AccGrandBucketChecksumsList {
repeated AccGrandBucketChecksum list = 1;
}

message AccBucketChecksum {
uint32 bucket = 1;
optional bytes checksum = 2;
}

message AccBucketChecksumsList {
repeated AccBucketChecksum list = 1;
}

// Represents last tracked account NFT change
message Acc {
bytes account_pubkey = 1;
uint64 slot = 2;
uint64 write_version = 3;
uint64 data_hash = 4;
}

// Represents list of last tracked account NFT changes
message AccList {
repeated Acc list = 1;
}

message GetAccBucketsReq {
uint32 grand_bucket = 1;
}

message GetAccReq {
// number of bucket
uint32 bucket = 1;
// maximum amount of account latest states to return
optional uint64 limit = 2;
// return account that are after the given
optional bytes after = 3;
}

service BbgmConsistencyService {
// Returns earliest grand epoch avaible on the peer.
rpc GetBbgmEarliestGrandEpoch(google.protobuf.Empty) returns (BbgmEarlistGrandEpoch);

// Request all the bubblegum grand epochs checksums for the given tree
rpc GetBbgmGrandEpochsForTree(GetBbgmGrandEpochsForTreeReq) returns (BbgmGrandEpochForTreeList);

// Request list of tree checksums in the given grand epoch
// No need to use stream since in the worst case the response size
// is still significanly less than 1 MB
rpc GetBbgmGrandEpochChecksums(GetBbgmGrandEpochsReq) returns (BbgmGrandEpochList);
rpc GetBbgmEpochChecksumsInGrandEpoch(GetBbgmEpochsReq) returns (BbgmEpochList);
rpc GetBbgmChangesInEpoch(GetBbgmChangesReq) returns (BbgmChangeList);

// Propose bubblegum changes to a peer, that has these changes missing.
// Can be called after after the "get changes" API is called, and a portion
// of missing bubblegum changes detected on the peer.
rpc ProposeMissingBbgmChanges(BbgmChangeList) returns (google.protobuf.Empty);
}

service AccConsistencyService {
rpc GetAccGrandBucketChecksums(google.protobuf.Empty) returns (AccGrandBucketChecksumsList);
rpc GetAccBucketChecksumsInGrandBucket(GetAccBucketsReq) returns (AccBucketChecksumsList);
rpc GetAccsInBucket(GetAccReq) returns (AccList);

rpc ProposeMissingAccChanges(AccList) returns (google.protobuf.Empty);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: newline

3 changes: 2 additions & 1 deletion grpc/src/asseturls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ impl DownloadError {
/// Generated client implementations.
pub mod asset_url_service_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::{http::Uri, *};
use tonic::codegen::http::Uri;
use tonic::codegen::*;
#[derive(Debug, Clone)]
pub struct AssetUrlServiceClient<T> {
inner: tonic::client::Grpc<T>,
Expand Down
7 changes: 5 additions & 2 deletions grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ pub struct Client {

impl Client {
pub async fn connect(peer_discovery: impl PeerDiscovery) -> Result<Self, GrpcError> {
let url = Uri::from_str(peer_discovery.get_gapfiller_peer_addr().as_str())
.map_err(|e| GrpcError::UriCreate(e.to_string()))?;
Client::connect_to_url(peer_discovery.get_gapfiller_peer_addr().as_str()).await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Client::connect_to_url(peer_discovery.get_gapfiller_peer_addr().as_str()).await
Client::connect_to_url(&peer_discovery.get_gapfiller_peer_addr()).await

Just a preference of style, feel free to ignore

}

pub async fn connect_to_url(url_str: &str) -> Result<Self, GrpcError> {
let url = Uri::from_str(url_str).map_err(|e| GrpcError::UriCreate(e.to_string()))?;
let channel = Channel::builder(url).connect().await?;

Ok(Self {
Expand Down
Loading
Loading