diff --git a/grovedb/src/lib.rs b/grovedb/src/lib.rs
index fd11f10d..a68dc1b3 100644
--- a/grovedb/src/lib.rs
+++ b/grovedb/src/lib.rs
@@ -319,11 +319,11 @@ impl GroveDb {
 
     /// Opens a Merk at given path for with direct write access. Intended for
     /// replication purposes.
-    fn open_merk_for_replication<'db, 'b, B>(
+    fn open_merk_for_replication<'tx, 'db: 'tx, 'b, B>(
         &'db self,
         path: SubtreePath<'b, B>,
-        tx: &'db Transaction,
-    ) -> Result<Merk<PrefixedRocksDbImmediateStorageContext<'db>>, Error>
+        tx: &'tx Transaction<'db>,
+    ) -> Result<Merk<PrefixedRocksDbImmediateStorageContext<'tx>>, Error>
     where
         B: AsRef<[u8]> + 'b,
     {
diff --git a/grovedb/src/replication.rs b/grovedb/src/replication.rs
index 11b0cb6c..93c42a29 100644
--- a/grovedb/src/replication.rs
+++ b/grovedb/src/replication.rs
@@ -1,170 +1,40 @@
-use std::{
-    collections::{BTreeMap, BTreeSet},
-    fmt,
-};
+mod state_sync_session;
+
+use std::pin::Pin;
 
 use grovedb_merk::{
     ed::Encode,
-    merk::restore::Restorer,
     proofs::{Decoder, Op},
     tree::{hash::CryptoHash, kv::ValueDefinedCostType, value_hash},
     ChunkProducer,
 };
 use grovedb_path::SubtreePath;
 use grovedb_storage::rocksdb_storage::RocksDbStorage;
-#[rustfmt::skip]
-use grovedb_storage::rocksdb_storage::storage_context::context_immediate::PrefixedRocksDbImmediateStorageContext;
 
-use crate::{replication, Error, GroveDb, Transaction, TransactionArg};
-
-pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN];
+pub use self::state_sync_session::MultiStateSyncSession;
+use self::state_sync_session::SubtreesMetadata;
+use crate::{Error, GroveDb, TransactionArg};
 
 pub const CURRENT_STATE_SYNC_VERSION: u16 = 1;
 
-#[derive(Default)]
-struct SubtreeStateSyncInfo<'db> {
-    // Current Chunk restorer
-    restorer: Option<Restorer<PrefixedRocksDbImmediateStorageContext<'db>>>,
-    // Set of global chunk ids requested to be fetched and pending for processing. For the
-    // description of global chunk id check fetch_chunk().
-    pending_chunks: BTreeSet<Vec<u8>>,
-    // Number of processed chunks in current prefix (Path digest)
-    num_processed_chunks: usize,
-}
-
-impl<'a> SubtreeStateSyncInfo<'a> {
-    // Function to create an instance of SubtreeStateSyncInfo with default values
-    pub fn new() -> Self {
-        Self::default()
-    }
-}
-
-// Struct governing state sync
-pub struct MultiStateSyncInfo<'db> {
-    // Map of current processing subtrees
-    // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo
-    current_prefixes: BTreeMap<SubtreePrefix, SubtreeStateSyncInfo<'db>>,
-    // Set of processed prefixes (Path digests)
-    processed_prefixes: BTreeSet<SubtreePrefix>,
-    // Version of state sync protocol,
-    version: u16,
-}
-
-impl<'db> Default for MultiStateSyncInfo<'db> {
-    fn default() -> Self {
-        Self {
-            current_prefixes: BTreeMap::new(),
-            processed_prefixes: BTreeSet::new(),
-            version: CURRENT_STATE_SYNC_VERSION,
-        }
-    }
-}
-
-// Struct containing information about current subtrees found in GroveDB
-pub struct SubtreesMetadata {
-    // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent
-    // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree
-    // elem_value_hash are needed when verifying the new constructed subtree after wards.
-    pub data: BTreeMap<SubtreePrefix, (Vec<Vec<u8>>, CryptoHash, CryptoHash)>,
-}
-
-impl SubtreesMetadata {
-    pub fn new() -> SubtreesMetadata {
-        SubtreesMetadata {
-            data: BTreeMap::new(),
-        }
-    }
-}
-
-impl Default for SubtreesMetadata {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl fmt::Debug for SubtreesMetadata {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        for (prefix, metadata) in self.data.iter() {
-            let metadata_path = &metadata.0;
-            let metadata_path_str = util_path_to_string(metadata_path);
-            writeln!(
-                f,
-                " prefix:{:?} -> path:{:?}",
-                hex::encode(prefix),
-                metadata_path_str
-            );
-        }
-        Ok(())
-    }
-}
-
-// Converts a path into a human-readable string (for debugging)
-pub fn util_path_to_string(path: &[Vec<u8>]) -> Vec<String> {
-    let mut subtree_path_str: Vec<String> = vec![];
-    for subtree in path {
-        let string = std::str::from_utf8(subtree).expect("should be able to convert path");
-        subtree_path_str.push(
-            string
-                .parse()
-                .expect("should be able to parse path to string"),
-        );
-    }
-    subtree_path_str
-}
-
-// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID]
-pub fn util_split_global_chunk_id(
-    global_chunk_id: &[u8],
-) -> Result<(crate::SubtreePrefix, Vec<u8>), Error> {
-    let chunk_prefix_length: usize = 32;
-    if global_chunk_id.len() < chunk_prefix_length {
-        return Err(Error::CorruptedData(
-            "expected global chunk id of at least 32 length".to_string(),
-        ));
-    }
-
-    let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length);
-    let mut array = [0u8; 32];
-    array.copy_from_slice(chunk_prefix);
-    let chunk_prefix_key: crate::SubtreePrefix = array;
-    Ok((chunk_prefix_key, chunk_id.to_vec()))
-}
-
-pub fn util_encode_vec_ops(chunk: Vec<Op>) -> Result<Vec<u8>, Error> {
-    let mut res = vec![];
-    for op in chunk {
-        op.encode_into(&mut res)
-            .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?;
+#[cfg(feature = "full")]
+impl GroveDb {
+    pub fn start_syncing_session(&self) -> Pin<Box<MultiStateSyncSession>> {
+        MultiStateSyncSession::new(self.start_transaction())
     }
-    Ok(res)
-}
 
-pub fn util_decode_vec_ops(chunk: Vec<u8>) -> Result<Vec<Op>, Error> {
-    let decoder = Decoder::new(&chunk);
-    let mut res = vec![];
-    for op in decoder {
-        match op {
-            Ok(op) => res.push(op),
-            Err(e) => {
-                return Err(Error::CorruptedData(format!(
-                    "unable to decode chunk: {}",
-                    e
-                )));
-            }
-        }
+    pub fn commit_session(&self, session: Pin<Box<MultiStateSyncSession>>) {
+        // we do not care about the cost
+        let _ = self.commit_transaction(session.into_transaction());
     }
-    Ok(res)
-}
 
-#[cfg(feature = "full")]
-impl GroveDb {
     // Returns the discovered subtrees found recursively along with their associated
     // metadata Params:
     // tx: Transaction. Function returns the data by opening merks at given tx.
     // TODO: Add a SubTreePath as param and start searching from that path instead
     // of root (as it is now)
     pub fn get_subtrees_metadata(&self, tx: TransactionArg) -> Result<SubtreesMetadata, Error> {
-        let mut subtrees_metadata = crate::replication::SubtreesMetadata::new();
+        let mut subtrees_metadata = SubtreesMetadata::new();
 
         let subtrees_root = self.find_subtrees(&SubtreePath::empty(), tx).value?;
         for subtree in subtrees_root.into_iter() {
@@ -341,271 +211,87 @@ impl GroveDb {
         }
     }
 
-    // Starts a state sync process (should be called by ABCI when OfferSnapshot
-    // method is called) Params:
-    // state_sync_info: Consumed StateSyncInfo
-    // app_hash: Snapshot's AppHash
-    // tx: Transaction for the state sync
-    // Returns the first set of global chunk ids that can be fetched from sources (+
-    // the StateSyncInfo transferring ownership back to the caller)
+    /// Starts a state sync process of a snapshot with `app_hash` root hash,
+    /// should be called by ABCI when OfferSnapshot method is called.
+    /// Returns the first set of global chunk ids that can be fetched from
+    /// sources and a new sync session.
     pub fn start_snapshot_syncing<'db>(
         &'db self,
-        mut state_sync_info: MultiStateSyncInfo<'db>,
         app_hash: CryptoHash,
-        tx: &'db Transaction,
         version: u16,
-    ) -> Result<(Vec<Vec<u8>>, MultiStateSyncInfo), Error> {
+    ) -> Result<(Vec<Vec<u8>>, Pin<Box<MultiStateSyncSession<'db>>>), Error> {
         // For now, only CURRENT_STATE_SYNC_VERSION is supported
         if version != CURRENT_STATE_SYNC_VERSION {
             return Err(Error::CorruptedData(
                 "Unsupported state sync protocol version".to_string(),
             ));
         }
-        if version != state_sync_info.version {
-            return Err(Error::CorruptedData(
-                "Unsupported state sync protocol version".to_string(),
-            ));
-        }
-
-        let mut res = vec![];
-
-        if !state_sync_info.current_prefixes.is_empty()
-            || !state_sync_info.processed_prefixes.is_empty()
-        {
-            return Err(Error::InternalError(
-                "GroveDB has already started a snapshot syncing",
-            ));
-        }
 
-        println!(
-            " starting:{:?}...",
-            replication::util_path_to_string(&[])
-        );
+        println!(" starting:{:?}...", util_path_to_string(&[]));
 
-        let mut root_prefix_state_sync_info = SubtreeStateSyncInfo::default();
         let root_prefix = [0u8; 32];
-        if let Ok(merk) = self.open_merk_for_replication(SubtreePath::empty(), tx) {
-            let restorer = Restorer::new(merk, app_hash, None);
-            root_prefix_state_sync_info.restorer = Some(restorer);
-            root_prefix_state_sync_info.pending_chunks.insert(vec![]);
-            state_sync_info
-                .current_prefixes
-                .insert(root_prefix, root_prefix_state_sync_info);
-            res.push(root_prefix.to_vec());
-        } else {
-            return Err(Error::InternalError("Unable to open merk for replication"));
-        }
+        let mut session = self.start_syncing_session();
+        session.add_subtree_sync_info(self, SubtreePath::empty(), app_hash, None, root_prefix)?;
 
-        Ok((res, state_sync_info))
+        Ok((vec![root_prefix.to_vec()], session))
     }
+}
 
-    // Apply a chunk (should be called by ABCI when ApplySnapshotChunk method is
-    // called) Params:
-    // state_sync_info: Consumed MultiStateSyncInfo
-    // chunk: (Global chunk id, Chunk proof operators encoded in bytes)
-    // tx: Transaction for the state sync
-    // Returns the next set of global chunk ids that can be fetched from sources (+
-    // the MultiStateSyncInfo transferring ownership back to the caller)
-    pub fn apply_chunk<'db>(
-        &'db self,
-        mut state_sync_info: MultiStateSyncInfo<'db>,
-        chunk: (&[u8], Vec<u8>),
-        tx: &'db Transaction,
-        version: u16,
-    ) -> Result<(Vec<Vec<u8>>, MultiStateSyncInfo), Error> {
-        // For now, only CURRENT_STATE_SYNC_VERSION is supported
-        if version != CURRENT_STATE_SYNC_VERSION {
-            return Err(Error::CorruptedData(
-                "Unsupported state sync protocol version".to_string(),
-            ));
-        }
-        if version != state_sync_info.version {
-            return Err(Error::CorruptedData(
-                "Unsupported state sync protocol version".to_string(),
-            ));
-        }
-
-        let mut next_chunk_ids = vec![];
-
-        let (global_chunk_id, chunk_data) = chunk;
-        let (chunk_prefix, chunk_id) = replication::util_split_global_chunk_id(global_chunk_id)?;
-
-        if state_sync_info.current_prefixes.is_empty() {
-            return Err(Error::InternalError("GroveDB is not in syncing mode"));
-        }
-        if let Some(subtree_state_sync) = state_sync_info.current_prefixes.remove(&chunk_prefix) {
-            if let Ok((res, mut new_subtree_state_sync)) =
-                self.apply_inner_chunk(subtree_state_sync, &chunk_id, chunk_data)
-            {
-                if !res.is_empty() {
-                    for local_chunk_id in res.iter() {
-                        let mut next_global_chunk_id = chunk_prefix.to_vec();
-                        next_global_chunk_id.extend(local_chunk_id.to_vec());
-                        next_chunk_ids.push(next_global_chunk_id);
-                    }
-
-                    // re-insert subtree_state_sync in state_sync_info
-                    state_sync_info
-                        .current_prefixes
-                        .insert(chunk_prefix, new_subtree_state_sync);
-                    Ok((next_chunk_ids, state_sync_info))
-                } else {
-                    if !new_subtree_state_sync.pending_chunks.is_empty() {
-                        // re-insert subtree_state_sync in state_sync_info
-                        state_sync_info
-                            .current_prefixes
-                            .insert(chunk_prefix, new_subtree_state_sync);
-                        return Ok((vec![], state_sync_info));
-                    }
-
-                    // Subtree is finished. We can save it.
-                    match new_subtree_state_sync.restorer.take() {
-                        None => Err(Error::InternalError("Unable to finalize subtree")),
-                        Some(restorer) => {
-                            if (new_subtree_state_sync.num_processed_chunks > 0)
-                                && (restorer.finalize().is_err())
-                            {
-                                return Err(Error::InternalError("Unable to finalize Merk"));
-                            }
-                            state_sync_info.processed_prefixes.insert(chunk_prefix);
-
-                            // Subtree was successfully save. Time to discover new subtrees that
-                            // need to be processed
-                            let subtrees_metadata = self.get_subtrees_metadata(Some(tx))?;
-                            if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) {
-                                println!(
-                                    " path:{:?} done (num_processed_chunks:{:?})",
-                                    replication::util_path_to_string(&value.0),
-                                    new_subtree_state_sync.num_processed_chunks
-                                );
-                            }
-
-                            if let Ok((res, new_state_sync_info)) =
-                                self.discover_subtrees(state_sync_info, subtrees_metadata, tx)
-                            {
-                                next_chunk_ids.extend(res);
-                                Ok((next_chunk_ids, new_state_sync_info))
-                            } else {
-                                Err(Error::InternalError("Unable to discover Subtrees"))
-                            }
-                        }
-                    }
-                }
-            } else {
-                Err(Error::InternalError("Unable to process incoming chunk"))
-            }
-        } else {
-            Err(Error::InternalError("Invalid incoming prefix"))
-        }
+// Converts a path into a human-readable string (for debugging)
+pub fn util_path_to_string(path: &[Vec<u8>]) -> Vec<String> {
+    let mut subtree_path_str: Vec<String> = vec![];
+    for subtree in path {
+        let string = std::str::from_utf8(subtree).expect("should be able to convert path");
+        subtree_path_str.push(
+            string
+                .parse()
+                .expect("should be able to parse path to string"),
+        );
     }
+    subtree_path_str
+}
 
-    // Apply a chunk using the given SubtreeStateSyncInfo
-    // state_sync_info: Consumed SubtreeStateSyncInfo
-    // chunk_id: Local chunk id
-    // chunk_data: Chunk proof operators encoded in bytes
-    // Returns the next set of global chunk ids that can be fetched from sources (+
-    // the SubtreeStateSyncInfo transferring ownership back to the caller)
-    fn apply_inner_chunk<'db>(
-        &'db self,
-        mut state_sync_info: SubtreeStateSyncInfo<'db>,
-        chunk_id: &[u8],
-        chunk_data: Vec<u8>,
-    ) -> Result<(Vec<Vec<u8>>, SubtreeStateSyncInfo), Error> {
-        let mut res = vec![];
-
-        match &mut state_sync_info.restorer {
-            Some(restorer) => {
-                if !state_sync_info.pending_chunks.contains(chunk_id) {
-                    return Err(Error::InternalError(
-                        "Incoming global_chunk_id not expected",
-                    ));
-                }
-                state_sync_info.pending_chunks.remove(chunk_id);
-                if !chunk_data.is_empty() {
-                    match util_decode_vec_ops(chunk_data) {
-                        Ok(ops) => {
-                            match restorer.process_chunk(chunk_id, ops) {
-                                Ok(next_chunk_ids) => {
-                                    state_sync_info.num_processed_chunks += 1;
-                                    for next_chunk_id in next_chunk_ids {
-                                        state_sync_info
-                                            .pending_chunks
-                                            .insert(next_chunk_id.clone());
-                                        res.push(next_chunk_id);
-                                    }
-                                }
-                                _ => {
-                                    return Err(Error::InternalError(
-                                        "Unable to process incoming chunk",
-                                    ));
-                                }
-                            };
-                        }
-                        Err(_) => {
-                            return Err(Error::CorruptedData(
-                                "Unable to decode incoming chunk".to_string(),
-                            ));
-                        }
-                    }
-                }
-            }
-            _ => {
-                return Err(Error::InternalError("Invalid internal state (restorer"));
-            }
-        }
-
-        Ok((res, state_sync_info))
+// Splits the given global chunk id into [SUBTREE_PREFIX:CHUNK_ID]
+pub fn util_split_global_chunk_id(
+    global_chunk_id: &[u8],
+) -> Result<(crate::SubtreePrefix, Vec<u8>), Error> {
+    let chunk_prefix_length: usize = 32;
+    if global_chunk_id.len() < chunk_prefix_length {
+        return Err(Error::CorruptedData(
+            "expected global chunk id of at least 32 length".to_string(),
+        ));
     }
 
-    // Prepares SubtreeStateSyncInfos for the freshly discovered subtrees in
-    // subtrees_metadata and returns the root global chunk ids for all of those
-    // new subtrees. state_sync_info: Consumed MultiStateSyncInfo
-    // subtrees_metadata: Metadata about discovered subtrees
-    // chunk_data: Chunk proof operators
-    // Returns the next set of global chunk ids that can be fetched from sources (+
-    // the MultiStateSyncInfo transferring ownership back to the caller)
-    fn discover_subtrees<'db>(
-        &'db self,
-        mut state_sync_info: MultiStateSyncInfo<'db>,
-        subtrees_metadata: SubtreesMetadata,
-        tx: &'db Transaction,
-    ) -> Result<(Vec<Vec<u8>>, MultiStateSyncInfo), Error> {
-        let mut res = vec![];
-
-        for (prefix, prefix_metadata) in &subtrees_metadata.data {
-            if !state_sync_info.processed_prefixes.contains(prefix)
-                && !state_sync_info.current_prefixes.contains_key(prefix)
-            {
-                let (current_path, s_actual_value_hash, s_elem_value_hash) = &prefix_metadata;
-
-                let subtree_path: Vec<&[u8]> =
-                    current_path.iter().map(|vec| vec.as_slice()).collect();
-                let path: &[&[u8]] = &subtree_path;
-                println!(
-                    " path:{:?} starting...",
-                    replication::util_path_to_string(&prefix_metadata.0)
-                );
-
-                let mut subtree_state_sync_info = SubtreeStateSyncInfo::default();
-                if let Ok(merk) = self.open_merk_for_replication(path.into(), tx) {
-                    let restorer =
-                        Restorer::new(merk, *s_elem_value_hash, Some(*s_actual_value_hash));
-                    subtree_state_sync_info.restorer = Some(restorer);
-                    subtree_state_sync_info.pending_chunks.insert(vec![]);
+    let (chunk_prefix, chunk_id) = global_chunk_id.split_at(chunk_prefix_length);
+    let mut array = [0u8; 32];
+    array.copy_from_slice(chunk_prefix);
+    let chunk_prefix_key: crate::SubtreePrefix = array;
+    Ok((chunk_prefix_key, chunk_id.to_vec()))
+}
 
-                    state_sync_info
-                        .current_prefixes
-                        .insert(*prefix, subtree_state_sync_info);
+pub fn util_encode_vec_ops(chunk: Vec<Op>) -> Result<Vec<u8>, Error> {
+    let mut res = vec![];
+    for op in chunk {
+        op.encode_into(&mut res)
+            .map_err(|e| Error::CorruptedData(format!("unable to encode chunk: {}", e)))?;
+    }
+    Ok(res)
+}
 
-                    let root_chunk_prefix = prefix.to_vec();
-                    res.push(root_chunk_prefix.to_vec());
-                } else {
-                    return Err(Error::InternalError("Unable to open Merk for replication"));
-                }
+pub fn util_decode_vec_ops(chunk: Vec<u8>) -> Result<Vec<Op>, Error> {
+    let decoder = Decoder::new(&chunk);
+    let mut res = vec![];
+    for op in decoder {
+        match op {
+            Ok(op) => res.push(op),
+            Err(e) => {
+                return Err(Error::CorruptedData(format!(
+                    "unable to decode chunk: {}",
+                    e
+                )));
             }
         }
-
-        Ok((res, state_sync_info))
     }
+    Ok(res)
 }
diff --git a/grovedb/src/replication/state_sync_session.rs b/grovedb/src/replication/state_sync_session.rs
new file mode 100644
index 00000000..bfc12b51
--- /dev/null
+++ b/grovedb/src/replication/state_sync_session.rs
@@ -0,0 +1,377 @@
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    fmt,
+    marker::PhantomPinned,
+    pin::Pin,
+};
+
+use grovedb_merk::{CryptoHash, Restorer};
+use grovedb_path::SubtreePath;
+use grovedb_storage::rocksdb_storage::PrefixedRocksDbImmediateStorageContext;
+
+use super::{util_decode_vec_ops, util_split_global_chunk_id, CURRENT_STATE_SYNC_VERSION};
+use crate::{replication::util_path_to_string, Error, GroveDb, Transaction};
+
+pub(crate) type SubtreePrefix = [u8; blake3::OUT_LEN];
+
+struct SubtreeStateSyncInfo<'db> {
+    /// Current Chunk restorer
+    restorer: Restorer<PrefixedRocksDbImmediateStorageContext<'db>>,
+    /// Set of global chunk ids requested to be fetched and pending for
+    /// processing. For the description of global chunk id check
+    /// fetch_chunk().
+    pending_chunks: BTreeSet<Vec<u8>>,
+    /// Number of processed chunks in current prefix (Path digest)
+    num_processed_chunks: usize,
+}
+
+impl<'db> SubtreeStateSyncInfo<'db> {
+    // Apply a chunk using the given SubtreeStateSyncInfo
+    // state_sync_info: Consumed SubtreeStateSyncInfo
+    // chunk_id: Local chunk id
+    // chunk_data: Chunk proof operators encoded in bytes
+    // Returns the next set of global chunk ids that can be fetched from sources (+
+    // the SubtreeStateSyncInfo transferring ownership back to the caller)
+    fn apply_inner_chunk(
+        &mut self,
+        chunk_id: &[u8],
+        chunk_data: Vec<u8>,
+    ) -> Result<Vec<Vec<u8>>, Error> {
+        let mut res = vec![];
+
+        if !self.pending_chunks.contains(chunk_id) {
+            return Err(Error::InternalError(
+                "Incoming global_chunk_id not expected",
+            ));
+        }
+        self.pending_chunks.remove(chunk_id);
+        if !chunk_data.is_empty() {
+            match util_decode_vec_ops(chunk_data) {
+                Ok(ops) => {
+                    match self.restorer.process_chunk(chunk_id, ops) {
+                        Ok(next_chunk_ids) => {
+                            self.num_processed_chunks += 1;
+                            for next_chunk_id in next_chunk_ids {
+                                self.pending_chunks.insert(next_chunk_id.clone());
+                                res.push(next_chunk_id);
+                            }
+                        }
+                        _ => {
+                            return Err(Error::InternalError("Unable to process incoming chunk"));
+                        }
+                    };
+                }
+                Err(_) => {
+                    return Err(Error::CorruptedData(
+                        "Unable to decode incoming chunk".to_string(),
+                    ));
+                }
+            }
+        }
+
+        Ok(res)
+    }
+}
+
+impl<'tx> SubtreeStateSyncInfo<'tx> {
+    pub fn new(restorer: Restorer<PrefixedRocksDbImmediateStorageContext<'tx>>) -> Self {
+        SubtreeStateSyncInfo {
+            restorer,
+            pending_chunks: Default::default(),
+            num_processed_chunks: 0,
+        }
+    }
+}
+
+// Struct governing state sync
+pub struct MultiStateSyncSession<'db> {
+    // Map of current processing subtrees
+    // SubtreePrefix (Path digest) -> SubtreeStateSyncInfo
+    current_prefixes: BTreeMap<SubtreePrefix, SubtreeStateSyncInfo<'db>>,
+    // Set of processed prefixes (Path digests)
+    processed_prefixes: BTreeSet<SubtreePrefix>,
+    // Version of state sync protocol,
+    pub(crate) version: u16,
+    // Transaction goes last to be dropped last as well
+    transaction: Transaction<'db>,
+    _pin: PhantomPinned,
+}
+
+impl<'db> MultiStateSyncSession<'db> {
+    /// Initializes a new state sync session.
+    pub fn new(transaction: Transaction<'db>) -> Pin<Box<Self>> {
+        Box::pin(MultiStateSyncSession {
+            transaction,
+            current_prefixes: Default::default(),
+            processed_prefixes: Default::default(),
+            version: CURRENT_STATE_SYNC_VERSION,
+            _pin: PhantomPinned,
+        })
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.current_prefixes.is_empty()
+    }
+
+    pub fn is_sync_completed(&self) -> bool {
+        for (_, subtree_state_info) in self.current_prefixes.iter() {
+            if !subtree_state_info.pending_chunks.is_empty() {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    pub fn into_transaction(self: Pin<Box<Self>>) -> Transaction<'db> {
+        // SAFETY: the struct isn't used anymore and no one will refer to transaction
+        // address again
+        unsafe { Pin::into_inner_unchecked(self) }.transaction
+    }
+
+    pub fn add_subtree_sync_info<'b, B: AsRef<[u8]>>(
+        self: &mut Pin<Box<MultiStateSyncSession<'db>>>,
+        db: &'db GroveDb,
+        path: SubtreePath<'b, B>,
+        hash: CryptoHash,
+        actual_hash: Option<CryptoHash>,
+        chunk_prefix: [u8; 32],
+    ) -> Result<(), Error> {
+        // SAFETY: we get an immutable reference of a transaction that stays behind
+        // `Pin` so this reference shall remain valid for the whole session
+        // object lifetime.
+        let transaction_ref: &'db Transaction<'db> = unsafe {
+            let tx: &mut Transaction<'db> =
+                &mut Pin::into_inner_unchecked(self.as_mut()).transaction;
+            &*(tx as *mut _)
+        };
+
+        if let Ok(merk) = db.open_merk_for_replication(path, transaction_ref) {
+            let restorer = Restorer::new(merk, hash, actual_hash);
+            let mut sync_info = SubtreeStateSyncInfo::new(restorer);
+            sync_info.pending_chunks.insert(vec![]);
+            self.as_mut()
+                .current_prefixes()
+                .insert(chunk_prefix, sync_info);
+            Ok(())
+        } else {
+            Err(Error::InternalError("Unable to open merk for replication"))
+        }
+    }
+
+    fn current_prefixes(
+        self: Pin<&mut MultiStateSyncSession<'db>>,
+    ) -> &mut BTreeMap<SubtreePrefix, SubtreeStateSyncInfo<'db>> {
+        // SAFETY: no memory-sensitive assumptions are made about fields except the
+        // `transaction` so it will be safe to modify them
+        &mut unsafe { self.get_unchecked_mut() }.current_prefixes
+    }
+
+    fn processed_prefixes(
+        self: Pin<&mut MultiStateSyncSession<'db>>,
+    ) -> &mut BTreeSet<SubtreePrefix> {
+        // SAFETY: no memory-sensitive assumptions are made about fields except the
+        // `transaction` so it will be safe to modify them
+        &mut unsafe { self.get_unchecked_mut() }.processed_prefixes
+    }
+
+    /// Applies a chunk, should be called by ABCI when `ApplySnapshotChunk`
+    /// method is called. `chunk` is a pair of global chunk id and an
+    /// encoded proof.
+    pub fn apply_chunk(
+        self: &mut Pin<Box<MultiStateSyncSession<'db>>>,
+        db: &'db GroveDb,
+        chunk: (&[u8], Vec<u8>),
+        version: u16,
+    ) -> Result<Vec<Vec<u8>>, Error> {
+        // For now, only CURRENT_STATE_SYNC_VERSION is supported
+        if version != CURRENT_STATE_SYNC_VERSION {
+            return Err(Error::CorruptedData(
+                "Unsupported state sync protocol version".to_string(),
+            ));
+        }
+        if version != self.version {
+            return Err(Error::CorruptedData(
+                "Unsupported state sync protocol version".to_string(),
+            ));
+        }
+
+        let mut next_chunk_ids = vec![];
+
+        let (global_chunk_id, chunk_data) = chunk;
+        let (chunk_prefix, chunk_id) = util_split_global_chunk_id(global_chunk_id)?;
+
+        if self.is_empty() {
+            return Err(Error::InternalError("GroveDB is not in syncing mode"));
+        }
+
+        let current_prefixes = self.as_mut().current_prefixes();
+        let Some(subtree_state_sync) = current_prefixes.get_mut(&chunk_prefix) else {
+            return Err(Error::InternalError("Invalid incoming prefix"));
+        };
+        let Ok(res) = subtree_state_sync.apply_inner_chunk(&chunk_id, chunk_data) else {
+            return Err(Error::InternalError("Unable to process incoming chunk"));
+        };
+
+        if !res.is_empty() {
+            for local_chunk_id in res.iter() {
+                let mut next_global_chunk_id = chunk_prefix.to_vec();
+                next_global_chunk_id.extend(local_chunk_id.to_vec());
+                next_chunk_ids.push(next_global_chunk_id);
+            }
+
+            Ok(next_chunk_ids)
+        } else {
+            if !subtree_state_sync.pending_chunks.is_empty() {
+                return Ok(vec![]);
+            }
+
+            // Subtree is finished. We can save it.
+            if (subtree_state_sync.num_processed_chunks > 0)
+                && (current_prefixes
+                    .remove(&chunk_prefix)
+                    .expect("prefix exists")
+                    .restorer
+                    .finalize()
+                    .is_err())
+            {
+                return Err(Error::InternalError("Unable to finalize Merk"));
+            }
+            self.as_mut().processed_prefixes().insert(chunk_prefix);
+
+            // // Subtree was successfully save. Time to discover new subtrees that
+            // // need to be processed
+            // if let Some(value) = subtrees_metadata.data.get(&chunk_prefix) {
+            //     println!(
+            //         " path:{:?} done (num_processed_chunks:{:?})",
+            //         util_path_to_string(&value.0),
+            //         subtree_state_sync.num_processed_chunks
+            //     );
+            // }
+
+            if let Ok(res) = self.discover_subtrees(db) {
+                next_chunk_ids.extend(res);
+                Ok(next_chunk_ids)
+            } else {
+                Err(Error::InternalError("Unable to discover Subtrees"))
+            }
+        }
+    }
+
+    /// Prepares sync session for the freshly discovered subtrees and returns
+    /// global chunk ids of those new subtrees.
+    fn discover_subtrees(
+        self: &mut Pin<Box<MultiStateSyncSession<'db>>>,
+        db: &'db GroveDb,
+    ) -> Result<Vec<Vec<u8>>, Error> {
+        let subtrees_metadata = db.get_subtrees_metadata(Some(&self.transaction))?;
+
+        let mut res = vec![];
+
+        for (prefix, prefix_metadata) in &subtrees_metadata.data {
+            if !self.processed_prefixes.contains(prefix)
+                && !self.current_prefixes.contains_key(prefix)
+            {
+                let (current_path, actual_value_hash, elem_value_hash) = &prefix_metadata;
+
+                let subtree_path: Vec<&[u8]> =
+                    current_path.iter().map(|vec| vec.as_slice()).collect();
+                let path: &[&[u8]] = &subtree_path;
+                println!(
+                    " path:{:?} starting...",
+                    util_path_to_string(&prefix_metadata.0)
+                );
+
+                self.add_subtree_sync_info(
+                    db,
+                    path.into(),
+                    elem_value_hash.clone(),
+                    Some(actual_value_hash.clone()),
+                    prefix.clone(),
+                )?;
+                res.push(prefix.to_vec());
+            }
+        }
+
+        Ok(res)
+    }
+}
+
+// impl<'db> Default for MultiStateSyncInfo<'db> {
+//     fn default() -> Self {
+//         Self {
+//             current_prefixes: BTreeMap::new(),
+//             processed_prefixes: BTreeSet::new(),
+//             version: CURRENT_STATE_SYNC_VERSION,
+//         }
+//     }
+// }
+
+// fn lol(db: &GroveDb) -> MultiStateSyncSession {
+//     let mut sync = MultiStateSyncSession {
+//         transaction: db.start_transaction(),
+//         current_prefixes: Default::default(),
+//         processed_prefixes: Default::default(),
+//         version: 0,
+//     };
+
+//     sync.current_prefixes.insert(
+//         b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_owned(),
+//         SubtreeStateSyncInfo {
+//             restorer: Some(Restorer::new(
+//                 db.open_merk_for_replication(SubtreePath::empty(),
+//                     &sync.transaction).unwrap(),
+//                 b"11111111111111111111111111111111".to_owned(),
+//                 None,
+//             )),
+//             pending_chunks: Default::default(),
+//             num_processed_chunks: 0,
+//         },
+//     );
+
+//     let ass: Option<&mut SubtreeStateSyncInfo> =
+//         sync.current_prefixes.values_mut().next();
+
+//     let ass2: &mut SubtreeStateSyncInfo = ass.unwrap();
+
+//     ass2.apply_inner_chunk(b"a", vec![]).unwrap();
+
+//     sync
+// }
+
+// Struct containing information about current subtrees found in GroveDB
+pub struct SubtreesMetadata {
+    // Map of Prefix (Path digest) -> (Actual path, Parent Subtree actual_value_hash, Parent
+    // Subtree elem_value_hash) Note: Parent Subtree actual_value_hash, Parent Subtree
+    // elem_value_hash are needed when verifying the new constructed subtree afterwards.
+    pub data: BTreeMap<SubtreePrefix, (Vec<Vec<u8>>, CryptoHash, CryptoHash)>,
+}
+
+impl SubtreesMetadata {
+    pub fn new() -> SubtreesMetadata {
+        SubtreesMetadata {
+            data: BTreeMap::new(),
+        }
+    }
+}
+
+impl Default for SubtreesMetadata {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl fmt::Debug for SubtreesMetadata {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        for (prefix, metadata) in self.data.iter() {
+            let metadata_path = &metadata.0;
+            let metadata_path_str = util_path_to_string(metadata_path);
+            writeln!(
+                f,
+                " prefix:{:?} -> path:{:?}",
+                hex::encode(prefix),
+                metadata_path_str
+            )?;
+        }
+        Ok(())
+    }
+}
diff --git a/tutorials/src/bin/replication.rs b/tutorials/src/bin/replication.rs
index f3e09532..440cb64e 100644
--- a/tutorials/src/bin/replication.rs
+++ b/tutorials/src/bin/replication.rs
@@ -5,7 +5,7 @@ use grovedb::reference_path::ReferencePathType;
 use rand::{distributions::Alphanumeric, Rng, };
 use grovedb::element::SumValue;
 use grovedb::replication::CURRENT_STATE_SYNC_VERSION;
-use grovedb::replication::MultiStateSyncInfo;
+use grovedb::replication::MultiStateSyncSession;
 
 const MAIN_ΚΕΥ: &[u8] = b"key_main";
 const MAIN_ΚΕΥ_EMPTY: &[u8] = b"key_main_empty";
@@ -101,10 +101,7 @@ fn main() {
     println!("{:?}", subtrees_metadata_source);
 
     println!("\n######### db_checkpoint_0 -> db_destination state sync");
-    let state_info = MultiStateSyncInfo::default();
-    let tx = db_destination.start_transaction();
-    sync_db_demo(&db_checkpoint_0, &db_destination, state_info, &tx).unwrap();
-    db_destination.commit_transaction(tx).unwrap().expect("expected to commit transaction");
+    sync_db_demo(&db_checkpoint_0, &db_destination, /*&mut state_sync_session*/).unwrap();
 
     println!("\n######### verify db_destination");
     let incorrect_hashes = db_destination.verify_grovedb(None).unwrap();
@@ -241,11 +238,9 @@ fn query_db(db: &GroveDb, path: &[&[u8]], key: Vec<u8>) {
 fn sync_db_demo(
     source_db: &GroveDb,
     target_db: &GroveDb,
-    state_sync_info: MultiStateSyncInfo,
-    target_tx: &Transaction,
 ) -> Result<(), grovedb::Error> {
     let app_hash = source_db.root_hash(None).value.unwrap();
-    let (chunk_ids, mut state_sync_info) = target_db.start_snapshot_syncing(state_sync_info, app_hash, target_tx, CURRENT_STATE_SYNC_VERSION)?;
+    let (chunk_ids, mut session) = target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?;
 
     let mut chunk_queue : VecDeque<Vec<u8>> = VecDeque::new();
 
@@ -253,11 +248,14 @@ fn sync_db_demo(
     while let Some(chunk_id) = chunk_queue.pop_front() {
         let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?;
 
-        let (more_chunks, new_state_sync_info) = target_db.apply_chunk(state_sync_info, (chunk_id.as_slice(), ops), target_tx, CURRENT_STATE_SYNC_VERSION)?;
-        state_sync_info = new_state_sync_info;
+        let more_chunks = session.apply_chunk(&target_db, (chunk_id.as_slice(), ops), CURRENT_STATE_SYNC_VERSION)?;
         chunk_queue.extend(more_chunks);
     }
 
+    if session.is_sync_completed() {
+        target_db.commit_session(session);
+    }
+
     Ok(())
 }
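For context, here is a minimal sketch of how the new session-based API is meant to be driven end to end, adapted from the tutorial's sync_db_demo above. The source_db/target_db handles and the in-process chunk transport are assumptions for illustration only, not part of this patch.

use std::collections::VecDeque;

use grovedb::{replication::CURRENT_STATE_SYNC_VERSION, GroveDb};

fn sync_demo(source_db: &GroveDb, target_db: &GroveDb) -> Result<(), grovedb::Error> {
    // Snapshot root hash advertised by the source (ABCI OfferSnapshot).
    let app_hash = source_db.root_hash(None).value.unwrap();

    // Open a pinned sync session on the target; the first global chunk id is
    // the 32-byte all-zero root prefix.
    let (chunk_ids, mut session) =
        target_db.start_snapshot_syncing(app_hash, CURRENT_STATE_SYNC_VERSION)?;

    let mut chunk_queue: VecDeque<Vec<u8>> = VecDeque::from(chunk_ids);

    // ABCI ApplySnapshotChunk loop: every applied chunk may yield more global
    // chunk ids of the form [32-byte subtree prefix | local chunk id].
    while let Some(chunk_id) = chunk_queue.pop_front() {
        let ops = source_db.fetch_chunk(chunk_id.as_slice(), None, CURRENT_STATE_SYNC_VERSION)?;
        let more_chunks =
            session.apply_chunk(target_db, (chunk_id.as_slice(), ops), CURRENT_STATE_SYNC_VERSION)?;
        chunk_queue.extend(more_chunks);
    }

    // Once no subtree has pending chunks, committing the session commits the
    // transaction it carries.
    if session.is_sync_completed() {
        target_db.commit_session(session);
    }

    Ok(())
}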