diff --git a/src/job.rs b/src/job.rs index 0d63717..6bd39f5 100644 --- a/src/job.rs +++ b/src/job.rs @@ -233,6 +233,7 @@ unsafe impl Send for Job {} pub struct JobQueue { sentinel: NonNull, tail: NonNull, + len: u32, } impl Default for JobQueue { @@ -248,6 +249,7 @@ impl Default for JobQueue { Self { sentinel: root, tail: root, + len: 0, } } } @@ -263,6 +265,10 @@ impl Drop for JobQueue { } impl JobQueue { + pub fn len(&self) -> u32 { + self.len + } + /// Any `Job` pushed onto the queue should alive at least until it gets /// popped. pub unsafe fn push_back(&mut self, job: &Job) { @@ -282,6 +288,8 @@ impl JobQueue { .set(Some(NonNull::from(next_tail).cast())); next_tail.prev.set(Some(current_tail.into())); + self.len += 1; + self.tail = next_tail.into(); } @@ -301,6 +309,8 @@ impl JobQueue { current_tail.prev.set(None); prev_tail.fut_or_next.set(None); + self.len -= 1; + self.tail = prev_tail.into(); } } @@ -344,6 +354,8 @@ impl JobQueue { head.fut_or_next .set(Some(Box::leak(Box::new(Future::default())).into())); + self.len -= 1; + // `self.sentinel`'s `fut_or_next` pointer can only be set by // `JobQueue::push_back` or by `JobQueue::pop_front` when it's set // to a job that was previous set by `JobQueue::push_back` and diff --git a/src/lib.rs b/src/lib.rs index 570f323..fed749d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -276,13 +276,34 @@ impl<'s> Scope<'s> { self.context.heartbeats[self.worker_index].store(false, Ordering::Relaxed); } - fn join_inner(&mut self, a: A, b: B) -> (RA, RB) + fn join_seq(&mut self, a: A, b: B) -> (RA, RB) where A: FnOnce(&mut Scope<'_>) -> RA + Send, B: FnOnce(&mut Scope<'_>) -> RB + Send, RA: Send, RB: Send, { + let rb = b(self); + let ra = a(self); + + (ra, rb) + } + + fn join_heartbeat(&mut self, a: A, b: B) -> (RA, RB) + where + A: FnOnce(&mut Scope<'_>) -> RA + Send, + B: FnOnce(&mut Scope<'_>) -> RB + Send, + RA: Send, + RB: Send, + { + let a = move |scope: &mut Scope<'_>| { + if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) { + scope.heartbeat(); + } + + a(scope) + }; + let stack = JobStack::new(a); let job = Job::new(&stack); @@ -346,7 +367,7 @@ impl<'s> Scope<'s> { RA: Send, RB: Send, { - self.join_with_heartbeat_every::<16, _, _, _, _>(a, b) + self.join_with_heartbeat_every::<64, _, _, _, _>(a, b) } /// Runs `a` and `b` potentially in parallel on separate threads and @@ -383,18 +404,10 @@ impl<'s> Scope<'s> { { self.join_count = self.join_count.wrapping_add(1) % TIMES; - if self.join_count == 0 { - let a = move |scope: &mut Scope<'_>| { - if scope.context.heartbeats[scope.worker_index].load(Ordering::Relaxed) { - scope.heartbeat(); - } - - a(scope) - }; - - self.join_inner(a, b) + if self.join_count == 0 || self.job_queue.len() < 3 { + self.join_heartbeat(a, b) } else { - self.join_inner(a, b) + self.join_seq(a, b) } } } @@ -613,7 +626,7 @@ mod tests { _ => { let (head, tail) = slice.split_at_mut(1); - s.join( + s.join_with_heartbeat_every::<1, _, _, _, _>( |_| { thread::sleep(Duration::from_micros(10)); head[0] += 1;