diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs index 11b0cb6c..5f7db1f3 100644 --- a/grovedb/src/replication.rs +++ b/grovedb/src/replication.rs @@ -32,13 +32,6 @@ struct SubtreeStateSyncInfo<'db> { num_processed_chunks: usize, } -impl<'a> SubtreeStateSyncInfo<'a> { - // Function to create an instance of SubtreeStateSyncInfo with default values - pub fn new() -> Self { - Self::default() - } -} - // Struct governing state sync pub struct MultiStateSyncInfo<'db> { // Map of current processing subtrees @@ -46,6 +39,8 @@ pub struct MultiStateSyncInfo<'db> { current_prefixes: BTreeMap>, // Set of processed prefixes (Path digests) processed_prefixes: BTreeSet, + // Root app_hash + app_hash: [u8; 32], // Version of state sync protocol, version: u16, } @@ -55,6 +50,7 @@ impl<'db> Default for MultiStateSyncInfo<'db> { Self { current_prefixes: BTreeMap::new(), processed_prefixes: BTreeSet::new(), + app_hash: [0; 32], version: CURRENT_STATE_SYNC_VERSION, } } @@ -115,6 +111,7 @@ pub fn util_path_to_string(path: &[Vec]) -> Vec { // Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID] pub fn util_split_global_chunk_id( global_chunk_id: &[u8], + app_hash: &[u8], ) -> Result<(crate::SubtreePrefix, Vec), Error> { let chunk_prefix_length: usize = 32; if global_chunk_id.len() < chunk_prefix_length { @@ -123,6 +120,12 @@ pub fn util_split_global_chunk_id( )); } + if global_chunk_id == app_hash { + let array_of_zeros: [u8; 32] = [0; 32]; + let root_chunk_prefix_key: crate::SubtreePrefix = array_of_zeros; + return Ok((root_chunk_prefix_key, vec![])); + } + let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); let mut array = [0u8; 32]; array.copy_from_slice(chunk_prefix); @@ -253,22 +256,13 @@ impl GroveDb { )); } - let chunk_prefix_length: usize = 32; - if global_chunk_id.len() < chunk_prefix_length { - return Err(Error::CorruptedData( - "expected global chunk id of at least 32 length".to_string(), - )); - } - - let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length); - - let mut array = [0u8; 32]; - array.copy_from_slice(chunk_prefix); - let chunk_prefix_key: crate::SubtreePrefix = array; + let root_app_hash = self.root_hash(tx).value?; + let (chunk_prefix, chunk_id) = + replication::util_split_global_chunk_id(global_chunk_id, &root_app_hash)?; let subtrees_metadata = self.get_subtrees_metadata(tx)?; - match subtrees_metadata.data.get(&chunk_prefix_key) { + match subtrees_metadata.data.get(&chunk_prefix) { Some(path_data) => { let subtree = &path_data.0; let subtree_path: Vec<&[u8]> = subtree.iter().map(|vec| vec.as_slice()).collect(); @@ -287,7 +281,7 @@ impl GroveDb { let chunk_producer_res = ChunkProducer::new(&merk); match chunk_producer_res { Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(chunk_id); + let chunk_res = chunk_producer.chunk(&chunk_id); match chunk_res { Ok((chunk, _)) => match util_encode_vec_ops(chunk) { Ok(op_bytes) => Ok(op_bytes), @@ -317,7 +311,7 @@ impl GroveDb { let chunk_producer_res = ChunkProducer::new(&merk); match chunk_producer_res { Ok(mut chunk_producer) => { - let chunk_res = chunk_producer.chunk(chunk_id); + let chunk_res = chunk_producer.chunk(&chunk_id); match chunk_res { Ok((chunk, _)) => match util_encode_vec_ops(chunk) { Ok(op_bytes) => Ok(op_bytes), @@ -346,15 +340,14 @@ impl GroveDb { // state_sync_info: Consumed StateSyncInfo // app_hash: Snapshot's AppHash // tx: Transaction for the state sync - // Returns the first set of global chunk ids that can be fetched from sources (+ - // the StateSyncInfo transferring ownership back to the caller) + // Returns the StateSyncInfo transferring ownership back to the caller) pub fn start_snapshot_syncing<'db>( &'db self, mut state_sync_info: MultiStateSyncInfo<'db>, app_hash: CryptoHash, tx: &'db Transaction, version: u16, - ) -> Result<(Vec>, MultiStateSyncInfo), Error> { + ) -> Result { // For now, only CURRENT_STATE_SYNC_VERSION is supported if version != CURRENT_STATE_SYNC_VERSION { return Err(Error::CorruptedData( @@ -367,8 +360,6 @@ impl GroveDb { )); } - let mut res = vec![]; - if !state_sync_info.current_prefixes.is_empty() || !state_sync_info.processed_prefixes.is_empty() { @@ -391,26 +382,27 @@ impl GroveDb { state_sync_info .current_prefixes .insert(root_prefix, root_prefix_state_sync_info); - - res.push(root_prefix.to_vec()); + state_sync_info.app_hash = app_hash; } else { return Err(Error::InternalError("Unable to open merk for replication")); } - Ok((res, state_sync_info)) + Ok(state_sync_info) } // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is // called) Params: // state_sync_info: Consumed MultiStateSyncInfo - // chunk: (Global chunk id, Chunk proof operators encoded in bytes) + // global_chunk_id: Global chunk id + // chunk: Chunk proof operators encoded in bytes // tx: Transaction for the state sync // Returns the next set of global chunk ids that can be fetched from sources (+ // the MultiStateSyncInfo transferring ownership back to the caller) pub fn apply_chunk<'db>( &'db self, mut state_sync_info: MultiStateSyncInfo<'db>, - chunk: (&[u8], Vec), + global_chunk_id: &[u8], + chunk: Vec, tx: &'db Transaction, version: u16, ) -> Result<(Vec>, MultiStateSyncInfo), Error> { @@ -428,15 +420,15 @@ impl GroveDb { let mut next_chunk_ids = vec![]; - let (global_chunk_id, chunk_data) = chunk; - let (chunk_prefix, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id)?; + let (chunk_prefix, chunk_id) = + replication::util_split_global_chunk_id(global_chunk_id, &state_sync_info.app_hash)?; if state_sync_info.current_prefixes.is_empty() { return Err(Error::InternalError("GroveDB is not in syncing mode")); } if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) { if let Ok((res, mut new_subtree_state_sync)) = - self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk_data) + self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk) { if !res.is_empty() { for local_chunk_id in res.iter() { diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs index f3e09532..bfdc1782 100644 --- a/tutorials/src/bin/replication.rs +++ b/tutorials/src/bin/replication.rs @@ -245,15 +245,16 @@ fn sync_db_demo( target_tx: &Transaction, ) -> Result<(), grovedb::Error> { let app_hash = source_db.root_hash(None).value.unwrap(); - let (chunk_ids, mut state_sync_info) = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION)?; + let mut state_sync_info = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION)?; let mut chunk_queue : VecDeque> = VecDeque::new(); - chunk_queue.extend(chunk_ids); + // The very first chunk to fetch is always identified by the root app_hash + chunk_queue.push_back(app_hash.to_vec()); while let Some(chunk_id) = chunk_queue.pop_front() { let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?; - let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, (chunk_id.as_slice(), ops), target_tx, CURRENT_STATE_SYNC_VERSION)?; + let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, chunk_id.as_slice(), ops, target_tx, CURRENT_STATE_SYNC_VERSION)?; state_sync_info = new_state_sync_info; chunk_queue.extend(more_chunks); }