Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add parallelism support for sqlness #71

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sqlness-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl CliController {
impl EnvController for CliController {
type DB = DBProxy;

async fn start(&self, _env: &str, _config: Option<&Path>) -> Self::DB {
async fn start(&self, _env: &str, _id: usize, _config: Option<&Path>) -> Self::DB {
DBProxy::new(self.db_config.clone(), self.db_type)
}

Expand All @@ -112,6 +112,7 @@ fn main() {

let config = ConfigBuilder::default()
.case_dir(args.case_dir)
.parallelism(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add it to clap parser?

.build()
.expect("build config");

Expand Down
3 changes: 2 additions & 1 deletion sqlness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ readme = { workspace = true }
async-trait = "0.1"
derive_builder = "0.11"
duration-str = "0.11.2"
futures = "0.3"
minijinja = "1"
mysql = { version = "23.0.1", optional = true }
postgres = { version = "0.19.7", optional = true }
prettydiff = { version = "0.6.2", default_features = false }
prettydiff = { version = "0.6.2", default-features = false }
regex = "1.7.1"
serde_json = "1"
thiserror = "1.0"
Expand Down
2 changes: 1 addition & 1 deletion sqlness/examples/bad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ struct MyDB;
#[async_trait]
impl Database for MyDB {
async fn query(&self, _ctx: QueryContext, _query: String) -> Box<dyn Display> {
return Box::new("Unexpected".to_string());
Box::new("Unexpected")
}
}

Expand Down
2 changes: 1 addition & 1 deletion sqlness/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct MyDB;
impl Database for MyDB {
async fn query(&self, _context: QueryContext, _query: String) -> Box<dyn Display> {
// Implement query logic here
return Box::new("ok".to_string());
Box::new("ok")
}
}

Expand Down
9 changes: 8 additions & 1 deletion sqlness/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use derive_builder::Builder;
/// Configurations of [`Runner`].
///
/// [`Runner`]: crate::Runner
#[derive(Builder)]
#[derive(Builder, Clone)]
pub struct Config {
pub case_dir: String,
/// Default value: `sql`
Expand Down Expand Up @@ -38,6 +38,9 @@ pub struct Config {
/// Interceptors used to pre-process input query and post-process query response
#[builder(default = "Config::default_registry()")]
pub interceptor_registry: Registry,
/// Default value: 1
#[builder(default = "Config::default_parallelism()")]
pub parallelism: usize,
}

impl Config {
Expand Down Expand Up @@ -76,6 +79,10 @@ impl Config {
fn default_registry() -> Registry {
Registry::default()
}

fn default_parallelism() -> usize {
1
}
}

/// Config for DatabaseBuilder
Expand Down
2 changes: 1 addition & 1 deletion sqlness/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ use crate::case::QueryContext;
/// [`Runner`]: crate::Runner
/// [`EnvController::start`]: crate::EnvController#tymethod.start
#[async_trait]
pub trait Database {
pub trait Database: Send + Sync {
async fn query(&self, context: QueryContext, query: String) -> Box<dyn Display>;
}
14 changes: 9 additions & 5 deletions sqlness/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ use crate::database::Database;
/// directories of test case directory. Refer to crate level documentation for more information
/// about directory organizaiton rules.
#[async_trait]
pub trait EnvController {
pub trait EnvController: Send + Sync {
type DB: Database;

/// Start a [`Database`] to run test queries.
///
/// Two parameters are the mode of this environment, or environment's name.
/// And the config file's path to this environment if it's find, it's defined
/// by the `env_config_file` field in the root config toml, and the default
/// Three parameters are the mode of this environment, or environment's name,
/// the id of this database instance, and the config file's path to this environment if it's find,
/// it's defined by the `env_config_file` field in the root config toml, and the default
/// value is `config.toml`.
async fn start(&self, env: &str, config: Option<&Path>) -> Self::DB;
///
/// The id is used to distinguish different database instances in the same environment.
/// For example, you may want to run the sqlness test in parallel against different instances
/// of the same environment to accelerate the test.
async fn start(&self, env: &str, id: usize, config: Option<&Path>) -> Self::DB;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strings can be used for identifiers (id) because they offer flexibility and can accommodate any character-based data, making them suitable for dynamic or user-generated IDs that might include numbers, letters, or special characters.


/// Stop one [`Database`].
async fn stop(&self, env: &str, database: Self::DB);
Expand Down
2 changes: 1 addition & 1 deletion sqlness/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub trait Interceptor {
}
}

pub type InterceptorFactoryRef = Arc<dyn InterceptorFactory>;
pub type InterceptorFactoryRef = Arc<dyn InterceptorFactory + Send + Sync>;

pub trait InterceptorFactory {
fn try_new(&self, ctx: &str) -> Result<InterceptorRef>;
Expand Down
2 changes: 1 addition & 1 deletion sqlness/src/interceptor/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub const PREFIX: &str = "SLEEP";
/// - `1s` for 1 second
/// - `1ms` for 1 millisecond
/// - `1s500ms` for 1.5 seconds
/// etc. See detailed format in [duration_str](https://docs.rs/duration-str/0.11.2/duration_str/) crate
/// etc. See detailed format in [duration_str](https://docs.rs/duration-str/0.11.2/duration_str/) crate
///
/// Note that this implementation is not accurate and may be affected by the system load.
/// It is guaranteed that the sleep time is at least the given milliseconds, but the lag may be
Expand Down
83 changes: 63 additions & 20 deletions sqlness/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::fs::{read_dir, OpenOptions};
use std::io::{Cursor, Read, Seek, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use prettydiff::basic::{DiffOp, SliceChangeset};
Expand Down Expand Up @@ -59,9 +60,17 @@ impl<E: EnvController> Runner<E> {
} else {
None
};
let db = self.env_controller.start(&env, config_path).await;
let run_result = self.run_env(&env, &db).await;
self.env_controller.stop(&env, db).await;
let parallelism = self.config.parallelism.max(1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

min(1)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

min will become either 0 or 1. This is "take the min/max among two" (really misleading, I've fallen into this many times...)

let mut databases = Vec::with_capacity(parallelism);
println!("Creating enviroment with parallelism: {}", parallelism);
for id in 0..parallelism {
let db = self.env_controller.start(&env, id, config_path).await;
databases.push(db);
}
let run_result = self.run_env(&env, &databases).await;
for db in databases {
self.env_controller.stop(&env, db).await;
}

if let Err(e) = run_result {
println!("Environment {env} run failed, error:{e:?}.");
Expand Down Expand Up @@ -105,40 +114,74 @@ impl<E: EnvController> Runner<E> {
Ok(result)
}

async fn run_env(&self, env: &str, db: &E::DB) -> Result<()> {
async fn run_env(&self, env: &str, databases: &[E::DB]) -> Result<()> {
let case_paths = self.collect_case_paths(env).await?;
let mut failed_cases = vec![];
let mut errors = vec![];
let start = Instant::now();
for path in case_paths {
let is_success = self.run_single_case(db, &path).await;
let case_name = path.as_os_str().to_str().unwrap().to_owned();
match is_success {
Ok(false) => failed_cases.push(case_name),
Ok(true) => {}
Err(e) => {
if self.config.fail_fast {
println!("Case {case_name} failed with error {e:?}");
println!("Stopping environment {env} due to previous error.");
break;
} else {
errors.push((case_name, e))

let case_queue = Arc::new(Mutex::new(case_paths));
let failed_cases = Arc::new(Mutex::new(Vec::new()));
let errors = Arc::new(Mutex::new(Vec::new()));

let mut futures = Vec::new();

// Create futures for each database to process cases
for (db_idx, db) in databases.iter().enumerate() {
let case_queue = case_queue.clone();
let failed_cases = failed_cases.clone();
let errors = errors.clone();
let fail_fast = self.config.fail_fast;

futures.push(async move {
loop {
// Try to get next case from the queue
let next_case = {
let mut queue = case_queue.lock().unwrap();
if queue.is_empty() {
break;
}
queue.pop().unwrap()
};

let case_name = next_case.as_os_str().to_str().unwrap().to_owned();
match self.run_single_case(db, &next_case).await {
Ok(false) => {
println!("[DB-{:2}] Case {} failed", db_idx, case_name);
failed_cases.lock().unwrap().push(case_name);
}
Ok(true) => {
println!("[DB-{:2}] Case {} succeeded", db_idx, case_name);
}
Err(e) => {
println!(
"[DB-{:2}] Case {} failed with error {:?}",
db_idx, case_name, e
);
if fail_fast {
errors.lock().unwrap().push((case_name, e));
return;
}
errors.lock().unwrap().push((case_name, e));
}
}
}
}
});
}

futures::future::join_all(futures).await;

println!(
"Environment {} run finished, cost:{}ms",
env,
start.elapsed().as_millis()
);

let failed_cases = failed_cases.lock().unwrap();
if !failed_cases.is_empty() {
println!("Failed cases:");
println!("{failed_cases:#?}");
}

let errors = errors.lock().unwrap();
if !errors.is_empty() {
println!("Error cases:");
println!("{errors:#?}");
Expand Down
Loading