From bb91f31ff58993e07ea89845791235138283a24c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 12 Feb 2025 14:43:22 +0800 Subject: [PATCH 1/3] feat: Add parallelism support Signed-off-by: Ruihang Xia --- sqlness-cli/src/main.rs | 3 +- sqlness/Cargo.toml | 3 +- sqlness/examples/bad.rs | 2 +- sqlness/examples/basic.rs | 2 +- sqlness/src/config.rs | 9 +++- sqlness/src/database.rs | 2 +- sqlness/src/environment.rs | 14 ++++-- sqlness/src/interceptor.rs | 2 +- sqlness/src/interceptor/sleep.rs | 2 +- sqlness/src/runner.rs | 83 ++++++++++++++++++++++++-------- 10 files changed, 89 insertions(+), 33 deletions(-) diff --git a/sqlness-cli/src/main.rs b/sqlness-cli/src/main.rs index 7a1e425..b020ca3 100644 --- a/sqlness-cli/src/main.rs +++ b/sqlness-cli/src/main.rs @@ -91,7 +91,7 @@ impl CliController { impl EnvController for CliController { type DB = DBProxy; - async fn start(&self, _env: &str, _config: Option<&Path>) -> Self::DB { + async fn start(&self, _env: &str, _id: usize, _config: Option<&Path>) -> Self::DB { DBProxy::new(self.db_config.clone(), self.db_type) } @@ -112,6 +112,7 @@ fn main() { let config = ConfigBuilder::default() .case_dir(args.case_dir) + .parallelism(1) .build() .expect("build config"); diff --git a/sqlness/Cargo.toml b/sqlness/Cargo.toml index 764fff4..94e5c1f 100644 --- a/sqlness/Cargo.toml +++ b/sqlness/Cargo.toml @@ -13,10 +13,11 @@ readme = { workspace = true } async-trait = "0.1" derive_builder = "0.11" duration-str = "0.11.2" +futures = "0.3" minijinja = "1" mysql = { version = "23.0.1", optional = true } postgres = { version = "0.19.7", optional = true } -prettydiff = { version = "0.6.2", default_features = false } +prettydiff = { version = "0.6.2", default-features = false } regex = "1.7.1" serde_json = "1" thiserror = "1.0" diff --git a/sqlness/examples/bad.rs b/sqlness/examples/bad.rs index f2840af..33d7f25 100644 --- a/sqlness/examples/bad.rs +++ b/sqlness/examples/bad.rs @@ -16,7 +16,7 @@ struct MyDB; #[async_trait] impl Database for MyDB { async fn query(&self, _ctx: QueryContext, _query: String) -> Box { - return Box::new("Unexpected".to_string()); + Box::new("Unexpected") } } diff --git a/sqlness/examples/basic.rs b/sqlness/examples/basic.rs index 30dbd3a..4ef2ffe 100644 --- a/sqlness/examples/basic.rs +++ b/sqlness/examples/basic.rs @@ -12,7 +12,7 @@ struct MyDB; impl Database for MyDB { async fn query(&self, _context: QueryContext, _query: String) -> Box { // Implement query logic here - return Box::new("ok".to_string()); + Box::new("ok") } } diff --git a/sqlness/src/config.rs b/sqlness/src/config.rs index a891872..4dec738 100644 --- a/sqlness/src/config.rs +++ b/sqlness/src/config.rs @@ -6,7 +6,7 @@ use derive_builder::Builder; /// Configurations of [`Runner`]. /// /// [`Runner`]: crate::Runner -#[derive(Builder)] +#[derive(Builder, Clone)] pub struct Config { pub case_dir: String, /// Default value: `sql` @@ -38,6 +38,9 @@ pub struct Config { /// Interceptors used to pre-process input query and post-process query response #[builder(default = "Config::default_registry()")] pub interceptor_registry: Registry, + /// Default value: 1 + #[builder(default = "Config::default_parallelism()")] + pub parallelism: usize, } impl Config { @@ -76,6 +79,10 @@ impl Config { fn default_registry() -> Registry { Registry::default() } + + fn default_parallelism() -> usize { + 1 + } } /// Config for DatabaseBuilder diff --git a/sqlness/src/database.rs b/sqlness/src/database.rs index 813e7d8..f454673 100644 --- a/sqlness/src/database.rs +++ b/sqlness/src/database.rs @@ -15,6 +15,6 @@ use crate::case::QueryContext; /// [`Runner`]: crate::Runner /// [`EnvController::start`]: crate::EnvController#tymethod.start #[async_trait] -pub trait Database { +pub trait Database: Send + Sync { async fn query(&self, context: QueryContext, query: String) -> Box; } diff --git a/sqlness/src/environment.rs b/sqlness/src/environment.rs index ec54709..6b8331f 100644 --- a/sqlness/src/environment.rs +++ b/sqlness/src/environment.rs @@ -18,16 +18,20 @@ use crate::database::Database; /// directories of test case directory. Refer to crate level documentation for more information /// about directory organizaiton rules. #[async_trait] -pub trait EnvController { +pub trait EnvController: Send + Sync { type DB: Database; /// Start a [`Database`] to run test queries. /// - /// Two parameters are the mode of this environment, or environment's name. - /// And the config file's path to this environment if it's find, it's defined - /// by the `env_config_file` field in the root config toml, and the default + /// Three parameters are the mode of this environment, or environment's name, + /// the id of this database instance, and the config file's path to this environment if it's find, + /// it's defined by the `env_config_file` field in the root config toml, and the default /// value is `config.toml`. - async fn start(&self, env: &str, config: Option<&Path>) -> Self::DB; + /// + /// The id is used to distinguish different database instances in the same environment. + /// For example, you may want to run the sqlness test in parallel against different instances + /// of the same environment to accelerate the test. + async fn start(&self, env: &str, id: usize, config: Option<&Path>) -> Self::DB; /// Stop one [`Database`]. async fn stop(&self, env: &str, database: Self::DB); diff --git a/sqlness/src/interceptor.rs b/sqlness/src/interceptor.rs index c312746..9471507 100644 --- a/sqlness/src/interceptor.rs +++ b/sqlness/src/interceptor.rs @@ -46,7 +46,7 @@ pub trait Interceptor { } } -pub type InterceptorFactoryRef = Arc; +pub type InterceptorFactoryRef = Arc; pub trait InterceptorFactory { fn try_new(&self, ctx: &str) -> Result; diff --git a/sqlness/src/interceptor/sleep.rs b/sqlness/src/interceptor/sleep.rs index 49b216a..31571fe 100644 --- a/sqlness/src/interceptor/sleep.rs +++ b/sqlness/src/interceptor/sleep.rs @@ -22,7 +22,7 @@ pub const PREFIX: &str = "SLEEP"; /// - `1s` for 1 second /// - `1ms` for 1 millisecond /// - `1s500ms` for 1.5 seconds -/// etc. See detailed format in [duration_str](https://docs.rs/duration-str/0.11.2/duration_str/) crate +/// etc. See detailed format in [duration_str](https://docs.rs/duration-str/0.11.2/duration_str/) crate /// /// Note that this implementation is not accurate and may be affected by the system load. /// It is guaranteed that the sleep time is at least the given milliseconds, but the lag may be diff --git a/sqlness/src/runner.rs b/sqlness/src/runner.rs index 9e9066f..2111ec3 100644 --- a/sqlness/src/runner.rs +++ b/sqlness/src/runner.rs @@ -4,6 +4,7 @@ use std::fs::{read_dir, OpenOptions}; use std::io::{Cursor, Read, Seek, Write}; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::{Arc, Mutex}; use std::time::Instant; use prettydiff::basic::{DiffOp, SliceChangeset}; @@ -59,9 +60,17 @@ impl Runner { } else { None }; - let db = self.env_controller.start(&env, config_path).await; - let run_result = self.run_env(&env, &db).await; - self.env_controller.stop(&env, db).await; + let parallelism = self.config.parallelism.max(1); + let mut databases = Vec::with_capacity(parallelism); + println!("Creating enviroment with parallelism: {}", parallelism); + for id in 0..parallelism { + let db = self.env_controller.start(&env, id, config_path).await; + databases.push(db); + } + let run_result = self.run_env(&env, &databases).await; + for db in databases { + self.env_controller.stop(&env, db).await; + } if let Err(e) = run_result { println!("Environment {env} run failed, error:{e:?}."); @@ -105,40 +114,74 @@ impl Runner { Ok(result) } - async fn run_env(&self, env: &str, db: &E::DB) -> Result<()> { + async fn run_env(&self, env: &str, databases: &[E::DB]) -> Result<()> { let case_paths = self.collect_case_paths(env).await?; - let mut failed_cases = vec![]; - let mut errors = vec![]; let start = Instant::now(); - for path in case_paths { - let is_success = self.run_single_case(db, &path).await; - let case_name = path.as_os_str().to_str().unwrap().to_owned(); - match is_success { - Ok(false) => failed_cases.push(case_name), - Ok(true) => {} - Err(e) => { - if self.config.fail_fast { - println!("Case {case_name} failed with error {e:?}"); - println!("Stopping environment {env} due to previous error."); - break; - } else { - errors.push((case_name, e)) + + let case_queue = Arc::new(Mutex::new(case_paths)); + let failed_cases = Arc::new(Mutex::new(Vec::new())); + let errors = Arc::new(Mutex::new(Vec::new())); + + let mut futures = Vec::new(); + + // Create futures for each database to process cases + for (db_idx, db) in databases.iter().enumerate() { + let case_queue = case_queue.clone(); + let failed_cases = failed_cases.clone(); + let errors = errors.clone(); + let fail_fast = self.config.fail_fast; + + futures.push(async move { + loop { + // Try to get next case from the queue + let next_case = { + let mut queue = case_queue.lock().unwrap(); + if queue.is_empty() { + break; + } + queue.pop().unwrap() + }; + + let case_name = next_case.as_os_str().to_str().unwrap().to_owned(); + match self.run_single_case(db, &next_case).await { + Ok(false) => { + println!("[DB-{:2}] Case {} failed", db_idx, case_name); + failed_cases.lock().unwrap().push(case_name); + } + Ok(true) => { + println!("[DB-{:2}] Case {} succeeded", db_idx, case_name); + } + Err(e) => { + println!( + "[DB-{:2}] Case {} failed with error {:?}", + db_idx, case_name, e + ); + if fail_fast { + errors.lock().unwrap().push((case_name, e)); + return; + } + errors.lock().unwrap().push((case_name, e)); + } } } - } + }); } + futures::future::join_all(futures).await; + println!( "Environment {} run finished, cost:{}ms", env, start.elapsed().as_millis() ); + let failed_cases = failed_cases.lock().unwrap(); if !failed_cases.is_empty() { println!("Failed cases:"); println!("{failed_cases:#?}"); } + let errors = errors.lock().unwrap(); if !errors.is_empty() { println!("Error cases:"); println!("{errors:#?}"); From ab5fc6b07e1f79a497c9a0fe5c58c9ef0680ce85 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 12 Feb 2025 11:31:44 -0800 Subject: [PATCH 2/3] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- sqlness/src/runner.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqlness/src/runner.rs b/sqlness/src/runner.rs index 2111ec3..f9fc833 100644 --- a/sqlness/src/runner.rs +++ b/sqlness/src/runner.rs @@ -135,7 +135,7 @@ impl Runner { loop { // Try to get next case from the queue let next_case = { - let mut queue = case_queue.lock().unwrap(); + let mut queue = case_queue.lock().expect("Failed to lock case_queue mutex"); if queue.is_empty() { break; } @@ -157,10 +157,10 @@ impl Runner { db_idx, case_name, e ); if fail_fast { - errors.lock().unwrap().push((case_name, e)); + errors.lock().expect("Failed to acquire lock on errors").push((case_name, e)); return; } - errors.lock().unwrap().push((case_name, e)); + errors.lock().expect("Failed to acquire lock on errors").push((case_name, e)); } } } From 371bcf926b5174104218f1247cb97a5ed4aa763a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 13 Feb 2025 03:26:09 +0800 Subject: [PATCH 3/3] fix clippy Signed-off-by: Ruihang Xia --- sqlness/examples/bad.rs | 6 +++--- sqlness/examples/basic.rs | 8 ++++---- sqlness/examples/interceptor.rs | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sqlness/examples/bad.rs b/sqlness/examples/bad.rs index 33d7f25..de012ee 100644 --- a/sqlness/examples/bad.rs +++ b/sqlness/examples/bad.rs @@ -24,7 +24,7 @@ impl Database for MyDB { const LOCK_FILE: &str = "/tmp/sqlness-bad-example.lock"; impl MyDB { - fn new(_env: &str, _config: Option<&Path>) -> Self { + fn new(_env: &str, _id: usize, _config: Option<&Path>) -> Self { File::create(LOCK_FILE).unwrap(); MyDB } @@ -38,8 +38,8 @@ impl MyDB { impl EnvController for MyController { type DB = MyDB; - async fn start(&self, env: &str, config: Option<&Path>) -> Self::DB { - MyDB::new(env, config) + async fn start(&self, env: &str, id: usize, config: Option<&Path>) -> Self::DB { + MyDB::new(env, id, config) } async fn stop(&self, _env: &str, database: Self::DB) { diff --git a/sqlness/examples/basic.rs b/sqlness/examples/basic.rs index 4ef2ffe..e4915b0 100644 --- a/sqlness/examples/basic.rs +++ b/sqlness/examples/basic.rs @@ -17,7 +17,7 @@ impl Database for MyDB { } impl MyDB { - fn new(_env: &str, _config: Option<&Path>) -> Self { + fn new(_env: &str, _id: usize, _config: Option<&Path>) -> Self { MyDB } @@ -30,9 +30,9 @@ impl MyDB { impl EnvController for MyController { type DB = MyDB; - async fn start(&self, env: &str, config: Option<&Path>) -> Self::DB { - println!("Start, env:{env}, config:{config:?}."); - MyDB::new(env, config) + async fn start(&self, env: &str, id: usize, config: Option<&Path>) -> Self::DB { + println!("Start, env:{env}, id:{id}, config:{config:?}."); + MyDB::new(env, id, config) } async fn stop(&self, env: &str, database: Self::DB) { diff --git a/sqlness/examples/interceptor.rs b/sqlness/examples/interceptor.rs index 0978d95..0c4fe31 100644 --- a/sqlness/examples/interceptor.rs +++ b/sqlness/examples/interceptor.rs @@ -30,7 +30,7 @@ impl Database for MyDB { } impl MyDB { - fn new(_env: &str, _config: Option<&Path>) -> Self { + fn new(_env: &str, _id: usize, _config: Option<&Path>) -> Self { MyDB } @@ -41,8 +41,8 @@ impl MyDB { impl EnvController for MyController { type DB = MyDB; - async fn start(&self, env: &str, config: Option<&Path>) -> Self::DB { - MyDB::new(env, config) + async fn start(&self, env: &str, id: usize, config: Option<&Path>) -> Self::DB { + MyDB::new(env, id, config) } async fn stop(&self, _env: &str, database: Self::DB) {