From eb8998c7b4eaa0766917b4abac151a9f1740a804 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Tue, 1 Aug 2023 13:34:08 +0800 Subject: [PATCH] a Signed-off-by: CalvinNeo --- .../src/core/forward_raft/command.rs | 4 +- .../src/core/forward_raft/snapshot.rs | 62 +++++++++++++++++-- .../engine_store_ffi/src/core/forwarder.rs | 11 ++-- .../engine_store_ffi/src/observer.rs | 4 ++ .../mock_store/mock_engine_store_server.rs | 2 + .../src/engine_store_helper_impls.rs | 12 ++++ proxy_components/proxy_ffi/src/interfaces.rs | 16 ++++- .../ffi/src/RaftStoreProxyFFI/@version | 2 +- .../ffi/src/RaftStoreProxyFFI/ProxyFFI.h | 4 ++ 9 files changed, 106 insertions(+), 11 deletions(-) diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs index 2e0024f4dbb..7eec45e0e67 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/command.rs @@ -229,6 +229,7 @@ impl ProxyForwarder { "term" => cmd.term, "index" => cmd.index, "type" => ?cmd_type, + "region_epoch" => ?ob_region.get_region_epoch(), ); } _ => { @@ -238,7 +239,8 @@ impl ProxyForwarder { "peer_id" => region_state.peer_id, "term" => cmd.term, "index" => cmd.index, - "command" => ?request + "command" => ?request, + "region_epoch" => ?ob_region.get_region_epoch(), ); } } diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs index a9b207b09c4..ff919773785 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs @@ -170,14 +170,14 @@ impl ProxyForwarder { match self.apply_snap_pool.as_ref() { Some(p) => { let (sender, receiver) = mpsc::channel(); - let task = Arc::new(PrehandleTask::new(receiver, peer_id, snap_key.idx)); + let task = 
Arc::new(PrehandleTask::new(receiver, peer_id, snap_key.clone())); { let mut lock = match self.pre_handle_snapshot_ctx.lock() { Ok(l) => l, Err(_) => fatal!("pre_apply_snapshot poisoned"), }; let ctx = lock.deref_mut(); - if let Some(o) = ctx.tracer.insert(snap_key.clone(), task.clone()) { + if let Some(o) = ctx.tracer.insert(snap_key.region_id, task.clone()) { let _prev = self .engine .proxy_ext @@ -189,7 +189,7 @@ impl ProxyForwarder { "snap_key" => ?snap_key, "pending" => self.engine.proxy_ext.pending_applies_count.load(Ordering::SeqCst), "new_snapshot" => ?snap, - "old_snapshot_index" => o.snapshot_index, + "old_snapshot_index" => o.snap_key.idx, ); // TODO elegantly stop the previous task. drop(o); @@ -307,12 +307,21 @@ impl ProxyForwarder { Err(_) => fatal!("post_apply_snapshot poisoned"), }; let ctx = lock.deref_mut(); - ctx.tracer.remove(snap_key) + ctx.tracer.remove(&region_id) }; let post_apply_start = tikv_util::time::Instant::now(); let need_retry = match maybe_prehandle_task { Some(t) => { + if &t.snap_key != snap_key { + panic!( + "mismatched prehandled snapshot [region_id={}] [peer_id={}] [snap_key={:?}] [snap_key={:?}]", + ob_region.get_id(), + peer_id, + snap_key, + t.snap_key, + ); + } let need_retry = match t.recv.recv() { Ok(snap_ptr) => { info!("get prehandled snapshot success"; @@ -407,4 +416,49 @@ impl ProxyForwarder { pub fn should_pre_apply_snapshot(&self) -> bool { true } + + pub fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) { + info!("start cancel apply snapshot"; + "peer_id" => peer_id, + "region_id" => region_id, + ); + // Notify TiFlash to stop pre handling snapshot. No blocking wait.
+ self.engine_store_server_helper + .abort_pre_handle_snapshot(region_id, peer_id); + let maybe_prehandle_task = { + let mut lock = match self.pre_handle_snapshot_ctx.lock() { + Ok(l) => l, + Err(_) => fatal!("cancel_apply_snapshot poisoned"), + }; + let ctx = lock.deref_mut(); + ctx.tracer.remove(&region_id) + }; + if let Some(t) = maybe_prehandle_task { + info!("cancel apply snapshot find no prehandle task"; + "peer_id" => peer_id, + "region_id" => region_id, + ); + match t.recv.recv() { + Ok(f) => { + info!("cancel apply snapshot start cancel pre handled snapshot"; + "peer_id" => peer_id, + "region_id" => region_id, + ); + self.engine_store_server_helper + .release_pre_handled_snapshot(f.0); + } + Err(e) => { + info!("cancel apply snapshot find error in prehandle task {:?}", e; + "peer_id" => peer_id, + "region_id" => region_id, + ); + } + } + } else { + info!("cancel apply snapshot find no prehandle task"; + "peer_id" => peer_id, + "region_id" => region_id, + ); + } + } } diff --git a/proxy_components/engine_store_ffi/src/core/forwarder.rs b/proxy_components/engine_store_ffi/src/core/forwarder.rs index 8d4ede73c1e..7bc06ad427f 100644 --- a/proxy_components/engine_store_ffi/src/core/forwarder.rs +++ b/proxy_components/engine_store_ffi/src/core/forwarder.rs @@ -9,25 +9,28 @@ pub struct PtrWrapper(pub RawCppPtr); unsafe impl Send for PtrWrapper {} unsafe impl Sync for PtrWrapper {} +// Previously we use SnapKey, however, region_id is enough here. +pub type PreHandleKey = u64; + #[derive(Default, Debug)] pub struct PrehandleContext { // tracer holds ptr of snapshot prehandled by TiFlash side.
- pub tracer: HashMap<SnapKey, Arc<PrehandleTask>>, + pub tracer: HashMap<PreHandleKey, Arc<PrehandleTask>>, } #[derive(Debug)] pub struct PrehandleTask { pub recv: mpsc::Receiver<PtrWrapper>, pub peer_id: u64, - pub snapshot_index: u64, + pub snap_key: SnapKey, } impl PrehandleTask { - pub fn new(recv: mpsc::Receiver<PtrWrapper>, peer_id: u64, snapshot_index: u64) -> Self { + pub fn new(recv: mpsc::Receiver<PtrWrapper>, peer_id: u64, snap_key: SnapKey) -> Self { PrehandleTask { recv, peer_id, - snapshot_index, + snap_key, } } } diff --git a/proxy_components/engine_store_ffi/src/observer.rs b/proxy_components/engine_store_ffi/src/observer.rs index 59096f6e03f..41f47754ec9 100644 --- a/proxy_components/engine_store_ffi/src/observer.rs +++ b/proxy_components/engine_store_ffi/src/observer.rs @@ -231,6 +231,10 @@ impl ApplySnapshotObserver for TiFlashOb fn should_pre_apply_snapshot(&self) -> bool { self.forwarder.should_pre_apply_snapshot() } + + fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) { + self.forwarder.cancel_apply_snapshot(region_id, peer_id) + } } impl RoleObserver for TiFlashObserver { diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs index cfc2291af99..95318f3dbc0 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs @@ -293,6 +293,8 @@ pub fn gen_engine_store_server_helper( fn_handle_get_engine_store_server_status: None, fn_pre_handle_snapshot: Some(ffi_pre_handle_snapshot), fn_apply_pre_handled_snapshot: Some(ffi_apply_pre_handled_snapshot), + fn_abort_pre_handle_snapshot: None, + fn_release_pre_handled_snapshot: None, fn_handle_http_request: None, fn_check_http_uri_available: None, fn_gc_raw_cpp_ptr: Some(ffi_gc_raw_cpp_ptr), diff --git a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs index a010e39ee30..81c3aed27f7 100644
--- a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs @@ -205,6 +205,18 @@ impl EngineStoreServerHelper { } } + pub fn abort_pre_handle_snapshot(&self, region_id: u64, peer_id: u64) { + debug_assert!(self.fn_abort_pre_handle_snapshot.is_some()); + unsafe { (self.fn_abort_pre_handle_snapshot.into_inner())(self.inner, region_id, peer_id) } + } + + pub fn release_pre_handled_snapshot(&self, snap: RawCppPtr) { + debug_assert!(self.fn_release_pre_handled_snapshot.is_some()); + unsafe { + (self.fn_release_pre_handled_snapshot.into_inner())(self.inner, snap.ptr, snap.type_) + } + } + pub fn handle_ingest_sst( &self, snaps: Vec<(&[u8], ColumnFamilyType)>, diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index 9b9a2db2ca5..f86b930d75c 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -565,6 +565,20 @@ pub mod root { arg3: root::DB::RawCppPtrType, ), >, + pub fn_abort_pre_handle_snapshot: ::std::option::Option< + unsafe extern "C" fn( + arg1: *mut root::DB::EngineStoreServerWrap, + arg2: u64, + arg3: u64, + ), + >, + pub fn_release_pre_handled_snapshot: ::std::option::Option< + unsafe extern "C" fn( + arg1: *mut root::DB::EngineStoreServerWrap, + arg2: root::DB::RawVoidPtr, + arg3: root::DB::RawCppPtrType, + ), + >, pub fn_handle_http_request: ::std::option::Option< unsafe extern "C" fn( arg1: *mut root::DB::EngineStoreServerWrap, @@ -634,7 +648,7 @@ pub mod root { arg3: root::DB::RawVoidPtr, ) -> u32; } - pub const RAFT_STORE_PROXY_VERSION: u64 = 9421202721206258776; + pub const RAFT_STORE_PROXY_VERSION: u64 = 7385737401255952460; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index eec858e191e..27e279fbb42 100644 --- 
a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include <cstdint> -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 9421202721206258776ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 7385737401255952460ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index 54a44857744..2365db8e7dd 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -295,6 +295,10 @@ struct EngineStoreServerHelper { uint64_t, SSTViewVec, uint64_t, uint64_t); void (*fn_apply_pre_handled_snapshot)(EngineStoreServerWrap *, RawVoidPtr, RawCppPtrType); + void (*fn_abort_pre_handle_snapshot)(EngineStoreServerWrap *, uint64_t, + uint64_t); + void (*fn_release_pre_handled_snapshot)(EngineStoreServerWrap *, RawVoidPtr, + RawCppPtrType); HttpRequestRes (*fn_handle_http_request)(EngineStoreServerWrap *, BaseBuffView path, BaseBuffView query,