Skip to content

Commit

Permalink
a
Browse files Browse the repository at this point in the history
Signed-off-by: CalvinNeo <[email protected]>
  • Loading branch information
CalvinNeo committed Aug 1, 2023
1 parent c717cdd commit eb8998c
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
"term" => cmd.term,
"index" => cmd.index,
"type" => ?cmd_type,
"region_epoch" => ?ob_region.get_region_epoch(),
);
}
_ => {
Expand All @@ -238,7 +239,8 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
"peer_id" => region_state.peer_id,
"term" => cmd.term,
"index" => cmd.index,
"command" => ?request
"command" => ?request,
"region_epoch" => ?ob_region.get_region_epoch(),
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
match self.apply_snap_pool.as_ref() {
Some(p) => {
let (sender, receiver) = mpsc::channel();
let task = Arc::new(PrehandleTask::new(receiver, peer_id, snap_key.idx));
let task = Arc::new(PrehandleTask::new(receiver, peer_id, snap_key.clone()));
{
let mut lock = match self.pre_handle_snapshot_ctx.lock() {
Ok(l) => l,
Err(_) => fatal!("pre_apply_snapshot poisoned"),
};
let ctx = lock.deref_mut();
if let Some(o) = ctx.tracer.insert(snap_key.clone(), task.clone()) {
if let Some(o) = ctx.tracer.insert(snap_key.region_id, task.clone()) {
let _prev = self
.engine
.proxy_ext
Expand All @@ -189,7 +189,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
"snap_key" => ?snap_key,
"pending" => self.engine.proxy_ext.pending_applies_count.load(Ordering::SeqCst),
"new_snapshot" => ?snap,
"old_snapshot_index" => o.snapshot_index,
"old_snapshot_index" => o.snap_key.idx,
);
// TODO elegantly stop the previous task.
drop(o);
Expand Down Expand Up @@ -307,12 +307,21 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
Err(_) => fatal!("post_apply_snapshot poisoned"),
};
let ctx = lock.deref_mut();
ctx.tracer.remove(snap_key)
ctx.tracer.remove(&region_id)
};

let post_apply_start = tikv_util::time::Instant::now();
let need_retry = match maybe_prehandle_task {
Some(t) => {
if &t.snap_key != snap_key {
panic!(
"mismatched prehandled snapshot [region_id={}] [peer_id={}] [snap_key={:?}] [snap_key={:?}]",
ob_region.get_id(),
peer_id,
snap_key,
t.snap_key,
);
}
let need_retry = match t.recv.recv() {
Ok(snap_ptr) => {
info!("get prehandled snapshot success";
Expand Down Expand Up @@ -407,4 +416,49 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
pub fn should_pre_apply_snapshot(&self) -> bool {
true
}

pub fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) {
info!("start cancel apply snapshot";
"peer_id" => peer_id,
"region_id" => region_id,
);
// Notify TiFlash to stop pre handling snapshot. No blocking wait.
self.engine_store_server_helper
.abort_pre_handle_snapshot(region_id, peer_id);
let maybe_prehandle_task = {
let mut lock = match self.pre_handle_snapshot_ctx.lock() {
Ok(l) => l,
Err(_) => fatal!("cancel_apply_snapshot poisoned"),
};
let ctx = lock.deref_mut();
ctx.tracer.remove(&region_id)
};
if let Some(t) = maybe_prehandle_task {
info!("cancel apply snapshot find no prehandle task";
"peer_id" => peer_id,
"region_id" => region_id,
);
match t.recv.recv() {
Ok(f) => {
info!("cancel apply snapshot start cancel pre handled snapshot";
"peer_id" => peer_id,
"region_id" => region_id,
);
self.engine_store_server_helper
.release_pre_handled_snapshot(f.0);
}
Err(e) => {
info!("cancel apply snapshot find error in prehandle task {:?}", e;
"peer_id" => peer_id,
"region_id" => region_id,
);
}
}
} else {
info!("cancel apply snapshot find no prehandle task";
"peer_id" => peer_id,
"region_id" => region_id,
);
}
}
}
11 changes: 7 additions & 4 deletions proxy_components/engine_store_ffi/src/core/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,28 @@ pub struct PtrWrapper(pub RawCppPtr);
unsafe impl Send for PtrWrapper {}
unsafe impl Sync for PtrWrapper {}

// Previously we use SnapKey, however, region_id is enough here.
pub type PreHandleKey = u64;

#[derive(Default, Debug)]
pub struct PrehandleContext {
// tracer holds ptr of snapshot prehandled by TiFlash side.
pub tracer: HashMap<SnapKey, Arc<PrehandleTask>>,
pub tracer: HashMap<PreHandleKey, Arc<PrehandleTask>>,
}

#[derive(Debug)]
pub struct PrehandleTask {
pub recv: mpsc::Receiver<PtrWrapper>,
pub peer_id: u64,
pub snapshot_index: u64,
pub snap_key: SnapKey,
}

impl PrehandleTask {
pub fn new(recv: mpsc::Receiver<PtrWrapper>, peer_id: u64, snapshot_index: u64) -> Self {
pub fn new(recv: mpsc::Receiver<PtrWrapper>, peer_id: u64, snap_key: SnapKey) -> Self {
PrehandleTask {
recv,
peer_id,
snapshot_index,
snap_key,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions proxy_components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ impl<T: Transport + 'static, ER: RaftEngine> ApplySnapshotObserver for TiFlashOb
fn should_pre_apply_snapshot(&self) -> bool {
self.forwarder.should_pre_apply_snapshot()
}

fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) {
self.forwarder.cancel_apply_snapshot(region_id, peer_id)
}
}

impl<T: Transport + 'static, ER: RaftEngine> RoleObserver for TiFlashObserver<T, ER> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ pub fn gen_engine_store_server_helper(
fn_handle_get_engine_store_server_status: None,
fn_pre_handle_snapshot: Some(ffi_pre_handle_snapshot),
fn_apply_pre_handled_snapshot: Some(ffi_apply_pre_handled_snapshot),
fn_abort_pre_handle_snapshot: None,
fn_release_pre_handled_snapshot: None,
fn_handle_http_request: None,
fn_check_http_uri_available: None,
fn_gc_raw_cpp_ptr: Some(ffi_gc_raw_cpp_ptr),
Expand Down
12 changes: 12 additions & 0 deletions proxy_components/proxy_ffi/src/engine_store_helper_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,18 @@ impl EngineStoreServerHelper {
}
}

pub fn abort_pre_handle_snapshot(&self, region_id: u64, peer_id: u64) {
debug_assert!(self.fn_abort_pre_handle_snapshot.is_some());
unsafe { (self.fn_abort_pre_handle_snapshot.into_inner())(self.inner, region_id, peer_id) }
}

pub fn release_pre_handled_snapshot(&self, snap: RawCppPtr) {
debug_assert!(self.fn_release_pre_handled_snapshot.is_some());
unsafe {
(self.fn_release_pre_handled_snapshot.into_inner())(self.inner, snap.ptr, snap.type_)
}
}

pub fn handle_ingest_sst(
&self,
snaps: Vec<(&[u8], ColumnFamilyType)>,
Expand Down
16 changes: 15 additions & 1 deletion proxy_components/proxy_ffi/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,20 @@ pub mod root {
arg3: root::DB::RawCppPtrType,
),
>,
pub fn_abort_pre_handle_snapshot: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut root::DB::EngineStoreServerWrap,
arg2: u64,
arg3: u64,
),
>,
pub fn_release_pre_handled_snapshot: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut root::DB::EngineStoreServerWrap,
arg2: root::DB::RawVoidPtr,
arg3: root::DB::RawCppPtrType,
),
>,
pub fn_handle_http_request: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut root::DB::EngineStoreServerWrap,
Expand Down Expand Up @@ -634,7 +648,7 @@ pub mod root {
arg3: root::DB::RawVoidPtr,
) -> u32;
}
pub const RAFT_STORE_PROXY_VERSION: u64 = 9421202721206258776;
pub const RAFT_STORE_PROXY_VERSION: u64 = 7385737401255952460;
pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639;
}
}
2 changes: 1 addition & 1 deletion raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#pragma once
#include <cstdint>
namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 9421202721206258776ull; }
namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 7385737401255952460ull; }
4 changes: 4 additions & 0 deletions raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ struct EngineStoreServerHelper {
uint64_t, SSTViewVec, uint64_t, uint64_t);
void (*fn_apply_pre_handled_snapshot)(EngineStoreServerWrap *, RawVoidPtr,
RawCppPtrType);
void (*fn_abort_pre_handle_snapshot)(EngineStoreServerWrap *, uint64_t,
uint64_t);
void (*fn_release_pre_handled_snapshot)(EngineStoreServerWrap *, RawVoidPtr,
RawCppPtrType);
HttpRequestRes (*fn_handle_http_request)(EngineStoreServerWrap *,
BaseBuffView path,
BaseBuffView query,
Expand Down

0 comments on commit eb8998c

Please sign in to comment.