From a0352805fb43f8c74fa86c14adcecbbc5119373c Mon Sep 17 00:00:00 2001 From: j-mendez Date: Wed, 27 Dec 2023 07:00:14 -0500 Subject: [PATCH] perf(lock): add rwlock tracker --- Cargo.lock | 2 +- Cargo.toml | 4 ++-- src/lib.rs | 45 +++++++++++++++++++++++++++++++++------------ 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3ca967..add9f2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -45,7 +45,7 @@ dependencies = [ [[package]] name = "async_job" -version = "0.1.3" +version = "0.1.4" dependencies = [ "async-trait", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 818271b..363e720 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async_job" -version = "0.1.3" +version = "0.1.4" edition = "2021" description = "Simple async cron job crate for Rust" repository = "https://github.com/spider-rs/async_job" @@ -17,7 +17,7 @@ chrono = "0.4.31" cron = "0.12.0" lazy_static = "1.4.0" log = "0.4.20" -tokio = { version = "^1.35.0", features = [ "macros", "time", "parking_lot" ] } +tokio = { version = "^1.35.0", features = [ "macros", "time", "parking_lot", "sync" ] } [features] default = ["rt-multi-thread"] diff --git a/src/lib.rs b/src/lib.rs index e883151..93e2d89 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,7 +14,7 @@ //! use async_job::{Job, Runner, Schedule, async_trait}; //! use tokio::time::Duration; //! use tokio; -//! +//! //! struct ExampleJob; //! //! #[async_trait] @@ -65,11 +65,11 @@ use chrono::{DateTime, Duration, Utc}; pub use cron::Schedule; use lazy_static::lazy_static; use log::{debug, error, info}; -use std::sync::mpsc::{Receiver, Sender}; use std::sync::{ atomic::{AtomicBool, Ordering}, - mpsc, Arc, Mutex, + Arc, RwLock, }; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; lazy_static! { @@ -77,7 +77,7 @@ lazy_static! { /// same job to run again while its already running /// unless you specificly allow the job to run in /// parallel with itself - pub static ref TRACKER: Mutex = Mutex::new(Tracker::new()); + pub static ref TRACKER: RwLock = RwLock::new(Tracker::new()); } #[async_trait] @@ -187,7 +187,7 @@ pub struct Runner { /// is the task running or not pub running: bool, /// channel sending message - pub tx: Option>>, + pub tx: Option>>, /// tracker to determine crons working pub working: Arc, } @@ -275,8 +275,14 @@ impl Runner { async fn spawn( runner: Runner, working: Arc, -) -> (Option>, Option>>) { - let (tx, rx): (Sender>, Receiver>) = mpsc::channel(); +) -> ( + Option>, + Option>>, +) { + let (tx, mut rx): ( + UnboundedSender>, + UnboundedReceiver>, + ) = unbounded_channel(); let handler = tokio::spawn(async move { let mut jobs = runner.jobs; @@ -288,12 +294,21 @@ async fn spawn( } for (id, job) in jobs.iter_mut().enumerate() { - let no = (id + 1).to_string(); + let no: String = (id + 1).to_string(); if job.should_run() - && (job.allow_parallel_runs() || !TRACKER.lock().unwrap().running(&id)) + && (job.allow_parallel_runs() + || match TRACKER.read() { + Ok(s) => !s.running(&id), + _ => false, + }) { - TRACKER.lock().unwrap().start(&id); + match TRACKER.write() { + Ok(mut s) => { + s.start(&id); + } + _ => (), + } let now = Utc::now(); debug!( @@ -301,12 +316,18 @@ async fn spawn( format!("cron-job-thread-{}", no), now.format("%H:%M:%S%.f") ); + working.store(true, Ordering::Relaxed); - // keep the work on the same task for now. job.handle().await; - working.store(TRACKER.lock().unwrap().stop(&id) != 0, Ordering::Relaxed); + working.store( + match TRACKER.write() { + Ok(mut s) => s.stop(&id) != 0, + _ => false, + }, + Ordering::Relaxed, + ); debug!( "FINISH: {} --- {}",