diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs index 012785ca..0d28e0a7 100644 --- a/grovedb/src/lib.rs +++ b/grovedb/src/lib.rs @@ -204,6 +204,7 @@ use grovedb_version::version::GroveVersion; use grovedb_visualize::DebugByteVectors; #[cfg(any(feature = "full", feature = "verify"))] pub use query::{PathQuery, SizedQuery}; +use reference_path::path_from_reference_path_type; #[cfg(feature = "grovedbg")] use tokio::net::ToSocketAddrs; @@ -898,10 +899,11 @@ impl GroveDb { /// Method to visualize hash mismatch after verification pub fn visualize_verify_grovedb( &self, + verify_references: bool, grove_version: &GroveVersion, ) -> Result, Error> { Ok(self - .verify_grovedb(None, grove_version)? + .verify_grovedb(None, verify_references, grove_version)? .iter() .map(|(path, (root_hash, expected, actual))| { ( @@ -924,6 +926,7 @@ impl GroveDb { pub fn verify_grovedb( &self, transaction: TransactionArg, + verify_references: bool, grove_version: &GroveVersion, ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { if let Some(transaction) = transaction { @@ -940,13 +943,20 @@ impl GroveDb { &SubtreePath::empty(), None, transaction, + verify_references, grove_version, ) } else { let root_merk = self .open_non_transactional_merk_at_path(SubtreePath::empty(), None, grove_version) .unwrap()?; - self.verify_merk_and_submerks(root_merk, &SubtreePath::empty(), None, grove_version) + self.verify_merk_and_submerks( + root_merk, + &SubtreePath::empty(), + None, + verify_references, + grove_version, + ) } } @@ -957,6 +967,7 @@ impl GroveDb { merk: Merk, path: &SubtreePath, batch: Option<&'db StorageBatch>, + verify_references: bool, grove_version: &GroveVersion, ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { let mut all_query = Query::new(); @@ -968,61 +979,140 @@ impl GroveDb { while let Some((key, element_value)) = element_iterator.next_kv().unwrap() { let element = raw_decode(&element_value, grove_version)?; - if element.is_any_tree() { - let (kv_value, element_value_hash) = merk - .get_value_and_value_hash( - &key, - true, - None::<&fn(&[u8], &GroveVersion) -> Option>, + match element { + Element::SumTree(..) | Element::Tree(..) => { + let (kv_value, element_value_hash) = merk + .get_value_and_value_hash( + &key, + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .unwrap() + .map_err(MerkError)? + .ok_or(Error::CorruptedData( + "expected merk to contain value at key".to_string(), + ))?; + let new_path = path.derive_owned_with_child(key); + let new_path_ref = SubtreePath::from(&new_path); + + let inner_merk = self + .open_non_transactional_merk_at_path( + new_path_ref.clone(), + batch, + grove_version, + ) + .unwrap()?; + let root_hash = inner_merk.root_hash().unwrap(); + + let actual_value_hash = value_hash(&kv_value).unwrap(); + let combined_value_hash = combine_hash(&actual_value_hash, &root_hash).unwrap(); + + if combined_value_hash != element_value_hash { + issues.insert( + new_path.to_vec(), + (root_hash, combined_value_hash, element_value_hash), + ); + } + issues.extend(self.verify_merk_and_submerks( + inner_merk, + &new_path_ref, + batch, + verify_references, grove_version, - ) - .unwrap() - .map_err(MerkError)? - .ok_or(Error::CorruptedData( - "expected merk to contain value at key".to_string(), - ))?; - let new_path = path.derive_owned_with_child(key); - let new_path_ref = SubtreePath::from(&new_path); - - let inner_merk = self - .open_non_transactional_merk_at_path(new_path_ref.clone(), batch, grove_version) - .unwrap()?; - let root_hash = inner_merk.root_hash().unwrap(); - - let actual_value_hash = value_hash(&kv_value).unwrap(); - let combined_value_hash = combine_hash(&actual_value_hash, &root_hash).unwrap(); - - if combined_value_hash != element_value_hash { - issues.insert( - new_path.to_vec(), - (root_hash, combined_value_hash, element_value_hash), - ); + )?); } - issues.extend(self.verify_merk_and_submerks( - inner_merk, - &new_path_ref, - batch, - grove_version, - )?); - } else if element.is_any_item() { - let (kv_value, element_value_hash) = merk - .get_value_and_value_hash( - &key, - true, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .unwrap() - .map_err(MerkError)? - .ok_or(Error::CorruptedData( - "expected merk to contain value at key".to_string(), - ))?; - let actual_value_hash = value_hash(&kv_value).unwrap(); - if actual_value_hash != element_value_hash { - issues.insert( - path.derive_owned_with_child(key).to_vec(), - (actual_value_hash, element_value_hash, actual_value_hash), - ); + Element::Item(..) | Element::SumItem(..) => { + let (kv_value, element_value_hash) = merk + .get_value_and_value_hash( + &key, + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .unwrap() + .map_err(MerkError)? + .ok_or(Error::CorruptedData( + "expected merk to contain value at key".to_string(), + ))?; + let actual_value_hash = value_hash(&kv_value).unwrap(); + if actual_value_hash != element_value_hash { + issues.insert( + path.derive_owned_with_child(key).to_vec(), + (actual_value_hash, element_value_hash, actual_value_hash), + ); + } + } + Element::Reference(ref reference_path, ..) => { + // Skip this whole check if we don't `verify_references` + if !verify_references { + continue; + } + + // Merk we're checking: + let (kv_value, element_value_hash) = merk + .get_value_and_value_hash( + &key, + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .unwrap() + .map_err(MerkError)? + .ok_or(Error::CorruptedData( + "expected merk to contain value at key".to_string(), + ))?; + + // Absolute path to the referenced merk and the key in that merk: + let (referenced_path, referenced_key) = { + let mut full_path = path_from_reference_path_type( + reference_path.clone(), + &path.to_vec(), + Some(&key), + )?; + let key = full_path.pop().ok_or_else(|| { + Error::MissingReference( + "can't resolve reference path to absolute path".to_owned(), + ) + })?; + (full_path, key) + }; + + // Open another subtree, one that is referenced: + let referenced_merk = self + .open_non_transactional_merk_at_path( + (referenced_path.as_slice()).into(), + None, + grove_version, + ) + .unwrap()?; + + // Get value hash of the referenced item + let (_, referenced_value_hash) = referenced_merk + .get_value_and_value_hash( + &referenced_key, + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .unwrap() + .map_err(MerkError)? + .ok_or(Error::CorruptedData( + "expected merk to contain value at key".to_string(), + ))?; + + // Take the current item (reference) hash and combine it with referenced value's + // hash + let self_actual_value_hash = value_hash(&kv_value).unwrap(); + let combined_value_hash = + combine_hash(&self_actual_value_hash, &referenced_value_hash).unwrap(); + + if combined_value_hash != element_value_hash { + issues.insert( + path.derive_owned_with_child(key).to_vec(), + (combined_value_hash, element_value_hash, combined_value_hash), + ); + } } } } @@ -1035,6 +1125,7 @@ impl GroveDb { path: &SubtreePath, batch: Option<&'db StorageBatch>, transaction: &Transaction, + verify_references: bool, grove_version: &GroveVersion, ) -> Result>, (CryptoHash, CryptoHash, CryptoHash)>, Error> { let mut all_query = Query::new(); @@ -1046,67 +1137,143 @@ impl GroveDb { while let Some((key, element_value)) = element_iterator.next_kv().unwrap() { let element = raw_decode(&element_value, grove_version)?; - if element.is_any_tree() { - let (kv_value, element_value_hash) = merk - .get_value_and_value_hash( - &key, - true, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .unwrap() - .map_err(MerkError)? - .ok_or(Error::CorruptedData( - "expected merk to contain value at key".to_string(), - ))?; - let new_path = path.derive_owned_with_child(key); - let new_path_ref = SubtreePath::from(&new_path); - - let inner_merk = self - .open_transactional_merk_at_path( - new_path_ref.clone(), - transaction, + match element { + Element::SumTree(..) | Element::Tree(..) => { + let (kv_value, element_value_hash) = merk + .get_value_and_value_hash( + &key, + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .unwrap() + .map_err(MerkError)? + .ok_or(Error::CorruptedData( + "expected merk to contain value at key".to_string(), + ))?; + let new_path = path.derive_owned_with_child(key); + let new_path_ref = SubtreePath::from(&new_path); + + let inner_merk = self + .open_transactional_merk_at_path( + new_path_ref.clone(), + transaction, + batch, + grove_version, + ) + .unwrap()?; + let root_hash = inner_merk.root_hash().unwrap(); + + let actual_value_hash = value_hash(&kv_value).unwrap(); + let combined_value_hash = combine_hash(&actual_value_hash, &root_hash).unwrap(); + + if combined_value_hash != element_value_hash { + issues.insert( + new_path.to_vec(), + (root_hash, combined_value_hash, element_value_hash), + ); + } + issues.extend(self.verify_merk_and_submerks_in_transaction( + inner_merk, + &new_path_ref, batch, + transaction, + verify_references, grove_version, - ) - .unwrap()?; - let root_hash = inner_merk.root_hash().unwrap(); + )?); + } + Element::Item(..) | Element::SumItem(..) => { + let (kv_value, element_value_hash) = merk + .get_value_and_value_hash( + &key, + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .unwrap() + .map_err(MerkError)? + .ok_or(Error::CorruptedData( + "expected merk to contain value at key".to_string(), + ))?; + let actual_value_hash = value_hash(&kv_value).unwrap(); + if actual_value_hash != element_value_hash { + issues.insert( + path.derive_owned_with_child(key).to_vec(), + (actual_value_hash, element_value_hash, actual_value_hash), + ); + } + } + Element::Reference(ref reference_path, ..) => { + // Skip this whole check if we don't `verify_references` + if !verify_references { + continue; + } - let actual_value_hash = value_hash(&kv_value).unwrap(); - let combined_value_hash = combine_hash(&actual_value_hash, &root_hash).unwrap(); + // Merk we're checking: + let (kv_value, element_value_hash) = merk + .get_value_and_value_hash( + &key, + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .unwrap() + .map_err(MerkError)? + .ok_or(Error::CorruptedData( + "expected merk to contain value at key".to_string(), + ))?; - if combined_value_hash != element_value_hash { - issues.insert( - new_path.to_vec(), - (root_hash, combined_value_hash, element_value_hash), - ); - } - issues.extend(self.verify_merk_and_submerks_in_transaction( - inner_merk, - &new_path_ref, - batch, - transaction, - grove_version, - )?); - } else if element.is_any_item() { - let (kv_value, element_value_hash) = merk - .get_value_and_value_hash( - &key, - true, - None::<&fn(&[u8], &GroveVersion) -> Option>, - grove_version, - ) - .unwrap() - .map_err(MerkError)? - .ok_or(Error::CorruptedData( - "expected merk to contain value at key".to_string(), - ))?; - let actual_value_hash = value_hash(&kv_value).unwrap(); - if actual_value_hash != element_value_hash { - issues.insert( - path.derive_owned_with_child(key).to_vec(), - (actual_value_hash, element_value_hash, actual_value_hash), - ); + // Absolute path to the referenced merk and the key in that merk: + let (referenced_path, referenced_key) = { + let mut full_path = path_from_reference_path_type( + reference_path.clone(), + &path.to_vec(), + Some(&key), + )?; + let key = full_path.pop().ok_or_else(|| { + Error::MissingReference( + "can't resolve reference path to absolute path".to_owned(), + ) + })?; + (full_path, key) + }; + + // Open another subtree, one that is referenced: + let referenced_merk = self + .open_transactional_merk_at_path( + (referenced_path.as_slice()).into(), + transaction, + None, + grove_version, + ) + .unwrap()?; + + // Get value hash of the referenced item + let (_, referenced_value_hash) = referenced_merk + .get_value_and_value_hash( + &referenced_key, + true, + None::<&fn(&[u8], &GroveVersion) -> Option>, + grove_version, + ) + .unwrap() + .map_err(MerkError)? + .ok_or(Error::CorruptedData( + "expected merk to contain value at key".to_string(), + ))?; + + // Take the current item (reference) hash and combine it with referenced value's + // hash + let self_actual_value_hash = value_hash(&kv_value).unwrap(); + let combined_value_hash = + combine_hash(&self_actual_value_hash, &referenced_value_hash).unwrap(); + + if combined_value_hash != element_value_hash { + issues.insert( + path.derive_owned_with_child(key).to_vec(), + (combined_value_hash, element_value_hash, combined_value_hash), + ); + } } } } diff --git a/grovedb/src/tests/mod.rs b/grovedb/src/tests/mod.rs index 226a047b..f98fcb64 100644 --- a/grovedb/src/tests/mod.rs +++ b/grovedb/src/tests/mod.rs @@ -758,6 +758,8 @@ pub fn make_deep_tree_with_sum_trees(grove_version: &GroveVersion) -> TempGroveD } mod tests { + use grovedb_merk::proofs::query::SubqueryBranch; + use super::*; #[test] @@ -3914,4 +3916,83 @@ mod tests { Err(Error::PathParentLayerNotFound(..)) )); } + + #[test] + fn test_grovedb_verify_corrupted_reference() { + // This test is dedicated to a case when references are out of sync, but + // `verify_grovedb` must detect this case as any other inconsistency + + let grove_version = GroveVersion::latest(); + let db = make_deep_tree(grove_version); + + // Insert a reference + db.insert( + &[TEST_LEAF, b"innertree"], + b"ref", + Element::Reference( + ReferencePathType::AbsolutePathReference(vec![ + ANOTHER_TEST_LEAF.to_vec(), + b"innertree2".to_vec(), + b"key3".to_vec(), + ]), + None, + None, + ), + None, + None, + grove_version, + ) + .unwrap() + .unwrap(); + + // Ensure we can prove and verify the inserted reference + let query = PathQuery { + path: vec![TEST_LEAF.to_vec(), b"innertree".to_vec()], + query: SizedQuery { + query: Query { + items: vec![QueryItem::Key(b"ref".to_vec())], + default_subquery_branch: SubqueryBranch { + subquery_path: None, + subquery: None, + }, + conditional_subquery_branches: None, + left_to_right: true, + }, + limit: None, + offset: None, + }, + }; + let proof = db + .prove_query(&query, None, grove_version) + .unwrap() + .unwrap(); + + let (hash, _) = GroveDb::verify_query(&proof, &query, grove_version).unwrap(); + assert_eq!(hash, db.root_hash(None, grove_version).unwrap().unwrap()); + + // Update referenced value to break things + db.insert( + &[ANOTHER_TEST_LEAF.to_vec(), b"innertree2".to_vec()], + b"key3", + Element::Item(b"idk something else i guess?".to_vec(), None), + None, + None, + grove_version, + ) + .unwrap() + .unwrap(); + + let proof = db + .prove_query(&query, None, grove_version) + .unwrap() + .unwrap(); + + assert!(matches!( + GroveDb::verify_query(&proof, &query, grove_version), + Err(_) + )); + + // `verify_grovedb` must identify issues + assert!(db.verify_grovedb(None, true, grove_version).unwrap().len() > 0); + } }