diff --git a/Cargo.lock b/Cargo.lock index 1964dd22c387..f99db871d305 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3429,6 +3429,7 @@ dependencies = [ "futures 0.3.15", "grpcio", "grpcio-health", + "health_controller", "int-enum", "keys", "kvproto", diff --git a/proxy_components/engine_store_ffi/src/core/common.rs b/proxy_components/engine_store_ffi/src/core/common.rs index c12509cea56f..58f9d14bbc82 100644 --- a/proxy_components/engine_store_ffi/src/core/common.rs +++ b/proxy_components/engine_store_ffi/src/core/common.rs @@ -5,14 +5,13 @@ pub use std::{ io::Write, ops::DerefMut, path::PathBuf, - str::FromStr, sync::{atomic::Ordering, mpsc, Arc, Mutex, RwLock}, time::SystemTime, }; pub use collections::HashMap; pub use engine_tiflash::{CachedRegionInfo, CachedRegionInfoManager}; -pub use engine_traits::{RaftEngine, RaftEngineDebug, SstMetaInfo, CF_LOCK, CF_RAFT}; +pub use engine_traits::{RaftEngine, SstMetaInfo, CF_LOCK, CF_RAFT}; pub use kvproto::{ metapb::Region, raft_cmdpb::{AdminCmdType, AdminRequest, AdminResponse, CmdType, RaftCmdRequest}, diff --git a/proxy_components/engine_store_ffi/src/core/fast_add_peer.rs b/proxy_components/engine_store_ffi/src/core/fast_add_peer.rs index 2d87fc3e8561..aa19b838828e 100644 --- a/proxy_components/engine_store_ffi/src/core/fast_add_peer.rs +++ b/proxy_components/engine_store_ffi/src/core/fast_add_peer.rs @@ -436,15 +436,13 @@ impl ProxyForwarder { ) -> RaftStoreResult { match self.raft_engine.get_entry(region_id, index)? { Some(entry) => Ok(entry.get_term()), - None => { - return Err(box_err!( - "can't find entry for index {} of region {}, peer_id: {}, tag {}", - index, - region_id, - peer_id, - tag - )); - } + None => Err(box_err!( + "can't find entry for index {} of region {}, peer_id: {}, tag {}", + index, + region_id, + peer_id, + tag + )), } } diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/mod.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/mod.rs index 728b4c2e3400..8da561aa0f87 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/mod.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/mod.rs @@ -7,5 +7,4 @@ mod snapshot; pub use command::*; pub use fap_snapshot::*; -pub use region::*; pub use snapshot::*; diff --git a/proxy_components/engine_tiflash/src/properties.rs b/proxy_components/engine_tiflash/src/properties.rs index b9032e53f8fa..d9d5848ff019 100644 --- a/proxy_components/engine_tiflash/src/properties.rs +++ b/proxy_components/engine_tiflash/src/properties.rs @@ -1,4 +1,5 @@ // Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0. +#![allow(clippy::redundant_closure)] use std::{ cmp, diff --git a/proxy_components/mock-engine-store/Cargo.toml b/proxy_components/mock-engine-store/Cargo.toml index c12122ce4716..0b0530a3e067 100644 --- a/proxy_components/mock-engine-store/Cargo.toml +++ b/proxy_components/mock-engine-store/Cargo.toml @@ -64,6 +64,7 @@ proxy_server = { workspace = true } # proxy_test_raftstore_v2 = { workspace = true } raft = { version = "0.7.0", default-features = false, features = ["protobuf-codec"] } raft_log_engine = { workspace = true } +health_controller = { workspace = true } raftstore = { workspace = true, default-features = false } raftstore-v2 = { workspace = true, default-features = false } rand = "0.8" diff --git a/proxy_components/mock-engine-store/src/lib.rs b/proxy_components/mock-engine-store/src/lib.rs index 7923e256a0c5..a79abef02a13 100644 --- a/proxy_components/mock-engine-store/src/lib.rs +++ b/proxy_components/mock-engine-store/src/lib.rs @@ -1,4 +1,5 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +#![allow(stable_features)] #![feature(vec_into_raw_parts)] #![feature(slice_take)] #![feature(return_position_impl_trait_in_trait)] diff --git a/proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs b/proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs index 650d9a96d36d..4ea89414f67f 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs @@ -501,7 +501,7 @@ pub fn get_valid_compact_index_by( states: &HashMap, use_nodes: Option>, ) -> (u64, u64) { - let set = use_nodes.map(|nodes| HashSet::from_iter(nodes.clone().into_iter())); + let set = use_nodes.map(|nodes| HashSet::from_iter(nodes.clone())); states .iter() .filter(|(k, _)| { diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster_ext_v1.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster_ext_v1.rs index 8d8cef7491e8..c3fe03e97598 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster_ext_v1.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/cluster_ext_v1.rs @@ -38,7 +38,7 @@ impl> Cluster { let mut node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect(); // We force iterate engines in sorted order. node_ids.sort(); - for (_, node_id) in node_ids.iter().enumerate() { + for node_id in node_ids.iter() { // Always at the front of the vector since iterate from 0. self.register_ffi_helper_set(Some(0), *node_id); } diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/common.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/common.rs index 128da887d4a9..aba5e74105d6 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/common.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/common.rs @@ -1,6 +1,5 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. pub use crate::mock_cluster::{ - cluster_ext::ClusterExt, common::*, config::MixedClusterConfig, init_global_ffi_helper_set, - FFIHelperSet, + cluster_ext::ClusterExt, common::*, config::MixedClusterConfig, FFIHelperSet, }; diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs index 275d093a0f2d..915b40d2f641 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/node.rs @@ -11,6 +11,7 @@ use encryption_export::DataKeyManager; use engine_rocks::RocksSnapshot; use engine_store_ffi::core::DebugStruct; use engine_traits::{Engines, MiscExt, Peekable, SnapshotContext}; +use health_controller::HealthController; use kvproto::{ metapb, raft_cmdpb::*, @@ -294,7 +295,7 @@ impl Simulator for NodeCluster { Arc::clone(&self.pd_client), Arc::default(), bg_worker.clone(), - None, + HealthController::new(), None, ); @@ -320,7 +321,7 @@ impl Simulator for NodeCluster { (snap_mgr, Some(tmp)) } else { let trans = self.trans.core.lock().unwrap(); - let &(ref snap_mgr, _) = &trans.snap_paths[&node_id]; + let (snap_mgr, _) = &trans.snap_paths[&node_id]; (snap_mgr.clone(), None) }; diff --git a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs index 1d305d9ac2b4..c2d2e4778ceb 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/v1/server.rs @@ -1,4 +1,5 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. +#![allow(clippy::arc_with_non_send_sync)] use std::{ path::Path, @@ -19,6 +20,7 @@ use engine_traits::{Engines, MiscExt, SnapshotContext}; use futures::executor::block_on; use grpcio::{ChannelBuilder, EnvBuilder, Environment, Error as GrpcError, Service}; use grpcio_health::HealthService; +use health_controller::HealthController; use kvproto::{ deadlock::create_deadlock, debugpb::DebugClient, @@ -70,7 +72,7 @@ use tikv::{ }; use tikv_util::{ box_err, - config::VersionTrack, + config::{ReadableSize, VersionTrack}, quota_limiter::QuotaLimiter, sys::thread::ThreadBuildWrapper, thd_name, @@ -298,8 +300,15 @@ impl ServerCluster { ); // Create coprocessor. + let enable_region_stats_mgr_cb: Arc bool + Send + Sync> = + if cfg.region_cache_memory_limit != ReadableSize(0) { + Arc::new(|| true) + } else { + Arc::new(|| false) + }; let mut coprocessor_host = CoprocessorHost::new(router.clone(), cfg.coprocessor.clone()); - let region_info_accessor = RegionInfoAccessor::new(&mut coprocessor_host); + let region_info_accessor = + RegionInfoAccessor::new(&mut coprocessor_host, enable_region_stats_mgr_cb); let raft_router = ServerRaftStoreRouter::new(router.clone(), local_reader); let sim_router = SimulateTransport::new(raft_router.clone()); @@ -489,6 +498,7 @@ impl ServerCluster { ) .unwrap(); let health_service = HealthService::default(); + let health_controller = HealthController::new(); let mut node = Node::new( system, &server_cfg.value().clone(), @@ -497,7 +507,7 @@ impl ServerCluster { Arc::clone(&self.pd_client), state, bg_worker.clone(), - Some(health_service.clone()), + health_controller.clone(), None, ); node.try_bootstrap_store(engines.clone())?; @@ -518,7 +528,7 @@ impl ServerCluster { self.env.clone(), None, debug_thread_pool.clone(), - health_service.clone(), + health_controller.clone(), None, ) .unwrap(); diff --git a/proxy_components/mock-engine-store/src/mock_store/common.rs b/proxy_components/mock-engine-store/src/mock_store/common.rs index 68c9a57d9b03..7225ede62396 100644 --- a/proxy_components/mock-engine-store/src/mock_store/common.rs +++ b/proxy_components/mock-engine-store/src/mock_store/common.rs @@ -1,19 +1,15 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. pub use collections::{HashMap, HashSet}; -pub use engine_store_ffi::{ - ffi::{ - interfaces_ffi, - interfaces_ffi::{EngineStoreServerHelper, RaftStoreProxyFFIHelper, RawCppPtr, RawVoidPtr}, - UnwrapExternCFunc, - }, - TiFlashEngine, +pub use engine_store_ffi::ffi::{ + interfaces_ffi, + interfaces_ffi::{EngineStoreServerHelper, RaftStoreProxyFFIHelper, RawCppPtr, RawVoidPtr}, + UnwrapExternCFunc, }; pub use engine_traits::{ Engines, Iterable, KvEngine, Mutable, Peekable, RaftEngine, RaftEngineReadOnly, RaftLogBatch, SyncMutable, WriteBatch, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, }; pub use kvproto::{ - metapb, raft_cmdpb::AdminCmdType, raft_serverpb::{PeerState, RaftApplyState, RaftLocalState, RegionLocalState}, }; diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs index 969c28af0338..d9176d4baf03 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs @@ -566,7 +566,7 @@ unsafe extern "C" fn ffi_atomic_update_proxy( arg2: *mut interfaces_ffi::RaftStoreProxyFFIHelper, ) { let store = into_engine_store_server_wrap(arg1); - store.maybe_proxy_helper = Some(&mut *(arg2 as *mut RaftStoreProxyFFIHelper)); + store.maybe_proxy_helper = Some(arg2); } unsafe extern "C" fn ffi_handle_destroy( diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_page_storage.rs b/proxy_components/mock-engine-store/src/mock_store/mock_page_storage.rs index 566e30ad7431..437425d4b437 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_page_storage.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_page_storage.rs @@ -111,23 +111,23 @@ pub unsafe extern "C" fn ffi_mockps_wb_del_page(wb: RawVoidPtr, page_id: BaseBuf } pub unsafe extern "C" fn ffi_mockps_get_wb_size(wb: RawVoidPtr) -> u64 { - let wb: _ = <&mut MockPSWriteBatch as From>::from(wb); + let wb = <&mut MockPSWriteBatch as From>::from(wb); wb.data.len() as u64 } pub unsafe extern "C" fn ffi_mockps_is_wb_empty(wb: RawVoidPtr) -> u8 { - let wb: _ = <&mut MockPSWriteBatch as From>::from(wb); + let wb = <&mut MockPSWriteBatch as From>::from(wb); u8::from(wb.data.is_empty()) } pub unsafe extern "C" fn ffi_mockps_handle_merge_wb(lwb: RawVoidPtr, rwb: RawVoidPtr) { - let lwb: _ = <&mut MockPSWriteBatch as From>::from(lwb); - let rwb: _ = <&mut MockPSWriteBatch as From>::from(rwb); + let lwb = <&mut MockPSWriteBatch as From>::from(lwb); + let rwb = <&mut MockPSWriteBatch as From>::from(rwb); lwb.data.append(&mut rwb.data); } pub unsafe extern "C" fn ffi_mockps_handle_clear_wb(wb: RawVoidPtr) { - let wb: _ = <&mut MockPSWriteBatch as From>::from(wb); + let wb = <&mut MockPSWriteBatch as From>::from(wb); wb.data.clear(); } @@ -136,7 +136,7 @@ pub unsafe extern "C" fn ffi_mockps_handle_consume_wb( wb: RawVoidPtr, ) { let store = into_engine_store_server_wrap(wrap); - let wb: _ = <&mut MockPSWriteBatch as From>::from(wb); + let wb = <&mut MockPSWriteBatch as From>::from(wb); let mut guard = (*store.engine_store_server) .page_storage .data diff --git a/proxy_components/proxy_ffi/src/jemalloc_utils.rs b/proxy_components/proxy_ffi/src/jemalloc_utils.rs index 33875908b0c6..3c0654f706ac 100644 --- a/proxy_components/proxy_ffi/src/jemalloc_utils.rs +++ b/proxy_components/proxy_ffi/src/jemalloc_utils.rs @@ -23,56 +23,58 @@ extern "C" { #[allow(unused_variables)] #[allow(unused_mut)] #[allow(unused_unsafe)] -fn issue_mallctl(command: &str) -> u64 { - type PtrUnderlying = u64; - let mut ptr: PtrUnderlying = 0; - let mut size = std::mem::size_of::() as u64; - let c_str = std::ffi::CString::new(command).unwrap(); - let c_ptr: *const ::std::os::raw::c_char = c_str.as_ptr() as *const ::std::os::raw::c_char; +pub fn issue_mallctl_args( + command: &str, + oldptr: *mut ::std::os::raw::c_void, + oldsize: *mut u64, + newptr: *mut ::std::os::raw::c_void, + newsize: u64, +) -> ::std::os::raw::c_int { unsafe { + let c_str = std::ffi::CString::new(command).unwrap(); + let c_ptr: *const ::std::os::raw::c_char = c_str.as_ptr() as *const ::std::os::raw::c_char; // See unprefixed_malloc_on_supported_platforms in tikv-jemalloc-sys. #[cfg(any(test, feature = "testexport"))] { - #[cfg(any(feature = "jemalloc"))] + // Test part + #[cfg(feature = "jemalloc")] { // See NO_UNPREFIXED_MALLOC #[cfg(any(target_os = "android", target_os = "dragonfly", target_os = "macos"))] - _rjem_mallctl( - c_ptr, - &mut ptr as *mut _ as *mut ::std::os::raw::c_void, - &mut size as *mut u64, - std::ptr::null_mut(), - 0, - ); + return _rjem_mallctl(c_ptr, oldptr, oldsize, newptr, newsize); #[cfg(not(any( target_os = "android", target_os = "dragonfly", target_os = "macos" )))] - mallctl( - c_ptr, - &mut ptr as *mut _ as *mut ::std::os::raw::c_void, - &mut size as *mut u64, - std::ptr::null_mut(), - 0, - ); + return mallctl(c_ptr, oldptr, oldsize, newptr, newsize); } + 0 } #[cfg(not(any(test, feature = "testexport")))] { - // Must linked to tiflash. + // No test part #[cfg(feature = "external-jemalloc")] - mallctl( - c_ptr, - &mut ptr as *mut _ as *mut ::std::os::raw::c_void, - &mut size as *mut u64, - std::ptr::null_mut(), - 0, - ); + { + // Must linked to tiflash. + return mallctl(c_ptr, oldptr, oldsize, newptr, newsize); + } + #[cfg(not(feature = "external-jemalloc"))] + { + // Happens only with `raftstore-proxy-main` + #[cfg(not(any( + target_os = "android", + target_os = "dragonfly", + target_os = "macos" + )))] + { + return mallctl(c_ptr, oldptr, oldsize, newptr, newsize); + } + 0 + } } } - ptr } pub fn get_allocatep_on_thread_start() -> u64 { diff --git a/proxy_components/proxy_ffi/src/raftstore_proxy.rs b/proxy_components/proxy_ffi/src/raftstore_proxy.rs index 55f2be1a0852..125ed0342ff6 100644 --- a/proxy_components/proxy_ffi/src/raftstore_proxy.rs +++ b/proxy_components/proxy_ffi/src/raftstore_proxy.rs @@ -176,7 +176,7 @@ impl RaftStoreProxy { break; } let sel = futures::future::select_all(pending); - let (resp, _completed_idx, remaining) = rt.block_on(async { sel.await }); + let (resp, _completed_idx, remaining) = rt.block_on(sel); let (res, need_retry) = parse_response(&rt, resp.unwrap()); has_need_retry |= need_retry; diff --git a/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs b/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs index 1ae277abe955..bb975cfde823 100644 --- a/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs @@ -31,9 +31,7 @@ use super::{ impl Clone for RaftStoreProxyPtr { fn clone(&self) -> RaftStoreProxyPtr { - RaftStoreProxyPtr { - inner: self.inner.clone(), - } + *self } } diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index ba08159f0fd4..c22a17f0174a 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -42,7 +42,6 @@ use error_code::ErrorCodeExt; use file_system::{get_io_rate_limiter, BytesFetcher, MetricsManager as IOMetricsManager}; use futures::executor::block_on; use grpcio::{EnvBuilder, Environment}; -use grpcio_health::HealthService; use health_controller::HealthController; use kvproto::{ debugpb::create_debug, diagnosticspb::create_diagnostics, import_sstpb::create_import_sst, @@ -274,6 +273,7 @@ pub fn run_impl( } #[inline] +#[allow(clippy::extra_unused_type_parameters)] fn run_impl_only_for_decryption( config: TikvConfig, proxy_config: ProxyConfig, @@ -1134,7 +1134,6 @@ impl TiKvServer { ) .unwrap_or_else(|e| fatal!("failed to validate raftstore config {}", e)); let raft_store = Arc::new(VersionTrack::new(self.core.config.raft_store.clone())); - let health_service = HealthService::default(); let mut default_store = kvproto::metapb::Store::default(); if !self.proxy_config.server.engine_store_version.is_empty() { @@ -1540,6 +1539,7 @@ impl TiKvServer { .unwrap() .join(Path::new(file_system::SPACE_PLACEHOLDER_FILE)); + #[allow(clippy::needless_borrows_for_generic_args)] let placeholder_size: u64 = file_system::get_file_size(&placeholer_file_path).unwrap_or(0);