Skip to content

Commit

Permalink
Merge pull request #11 from dragostis/api-flexibility
Browse files Browse the repository at this point in the history
Improve the scope API.
  • Loading branch information
dragostis authored Sep 20, 2024
2 parents db6296f + 9440870 commit fc27f2b
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ rayon = { version = "1.10.0", optional = true }
[[bench]]
name = "overhead"
harness = false
required-features = ["bench"]
required-features = ["bench"]
4 changes: 2 additions & 2 deletions benches/overhead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fn no_overhead(bencher: Bencher, nodes: (usize, usize)) {
}

let tree = Node::tree(nodes.0);
let mut thread_pool = ThreadPool::new().unwrap();
let thread_pool = ThreadPool::new().unwrap();
let mut scope = thread_pool.scope();

bencher.bench_local(move || {
Expand All @@ -66,7 +66,7 @@ fn chili_overhead(bencher: Bencher, nodes: (usize, usize)) {
}

let tree = Node::tree(nodes.0);
let mut thread_pool = ThreadPool::new().unwrap();
let thread_pool = ThreadPool::new().unwrap();
let mut scope = thread_pool.scope();

bencher.bench_local(move || {
Expand Down
36 changes: 36 additions & 0 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl<T> Future<T> {

match result {
Ok(_) => {
// SAFETY:
// Lock is acquired, only we are accessing `self.waiting_thread`.
unsafe { *self.waiting_thread.get() = Some(thread::current()) };

Expand All @@ -52,6 +53,7 @@ impl<T> Future<T> {
continue;
}
Err(state) if state == Poll::Ready as u8 => {
// SAFETY:
// `state` is `Poll::Ready` only after `Self::complete`
// releases the lock.
//
Expand Down Expand Up @@ -83,11 +85,13 @@ impl<T> Future<T> {
}
}

// SAFETY:
// Lock is acquired, only we are accessing `self.val`.
unsafe {
*self.val.get() = Some(val);
}

// SAFETY:
// Lock is acquired, only we are accessing `self.waiting_thread`.
if let Some(thread) = unsafe { (*self.waiting_thread.get()).take() } {
thread.unpark();
Expand All @@ -110,8 +114,10 @@ impl<F> JobStack<F> {
}
}

/// SAFETY:
/// It should only be called once.
pub unsafe fn take_once(&self) -> F {
// SAFETY:
// No `Job` has has been executed, therefore `self.f` has not yet been
// `take`n.
unsafe { ManuallyDrop::take(&mut *self.f.get()) }
Expand All @@ -136,6 +142,7 @@ impl<T> Job<T> {
F: FnOnce(&mut Scope<'_>) -> T + Send,
T: Send,
{
/// SAFETY:
/// It should only be called while the `stack` is still alive.
unsafe fn harness<F, T>(
scope: &mut Scope<'_>,
Expand All @@ -145,12 +152,15 @@ impl<T> Job<T> {
F: FnOnce(&mut Scope<'_>) -> T + Send,
T: Send,
{
// SAFETY:
// The `stack` is still alive.
let stack: &JobStack<F> = unsafe { stack.cast().as_ref() };
// SAFETY:
// This is the first call to `take_once` since `Job::execute`
// (the only place where this harness is called) is called only
// after the job has been popped.
let f = unsafe { stack.take_once() };
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let fut: &Future<T> = unsafe { fut.cast().as_ref() };
Expand All @@ -174,11 +184,13 @@ impl<T> Job<T> {
self.stack == other.stack
}

/// SAFETY:
/// It should only be called after being popped from a `JobQueue`.
pub unsafe fn poll(&self) -> bool {
self.fut_or_next
.get()
.map(|fut| {
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let fut = unsafe { fut.as_ref() };
Expand All @@ -187,12 +199,15 @@ impl<T> Job<T> {
.unwrap_or_default()
}

/// SAFETY:
/// It should only be called after being popped from a `JobQueue`.
pub unsafe fn wait(&self) -> Option<thread::Result<T>> {
self.fut_or_next.get().and_then(|fut| {
// SAFETY:
// Before being popped, the `JobQueue` allocates and stores a
// `Future` in `self.fur_or_next` that should get passed here.
let result = unsafe { fut.as_ref().wait() };
// SAFETY:
// We only can drop the `Box` *after* waiting on the `Future`
// in order to ensure unique access.
unsafe {
Expand All @@ -203,10 +218,12 @@ impl<T> Job<T> {
})
}

/// SAFETY:
/// It should only be called in the case where the job has been popped
/// from the front and will not be `Job::Wait`ed.
pub unsafe fn drop(&self) {
if let Some(fut) = self.fut_or_next.get() {
// SAFETY:
// Before being popped, the `JobQueue` allocates and store a
// `Future` in `self.fur_or_next` that should get passed here.
unsafe {
Expand All @@ -217,9 +234,11 @@ impl<T> Job<T> {
}

impl Job {
/// SAFETY:
/// It should only be called while the `JobStack` it was created with is
/// still alive and after being popped from a `JobQueue`.
pub unsafe fn execute(&self, scope: &mut Scope<'_>) {
// SAFETY:
// Before being popped, the `JobQueue` allocates and store a
// `Future` in `self.fur_or_next` that should get passed here.
unsafe {
Expand All @@ -228,6 +247,10 @@ impl Job {
}
}

// SAFETY:
// The job's `stack` will only be accessed after acquiring a lock (in
// `Future`), while `prev` and `fut_or_next` are never accessed after being
// sent across threads.
unsafe impl Send for Job {}

#[derive(Debug)]
Expand Down Expand Up @@ -257,6 +280,7 @@ impl Default for JobQueue {

impl Drop for JobQueue {
fn drop(&mut self) {
// SAFETY:
// `self.sentinel` never gets written over, so it contains the original
// `leak`ed `Box` that gets allocated in `JobQueue::default`.
unsafe {
Expand All @@ -270,13 +294,16 @@ impl JobQueue {
self.len
}

/// SAFETY:
/// Any `Job` pushed onto the queue should alive at least until it gets
/// popped.
pub unsafe fn push_back<T>(&mut self, job: &Job<T>) {
// SAFETY:
// The tail can either be the root `Box::leak`ed in the default
// constructor or a `Job` that has been pushed previously and which is
// still alive.
let current_tail = unsafe { self.tail.as_ref() };
// SAFETY:
// This effectively casts the `Job`'s `fut_or_next` from `Future<T>` to
// `Future<()>` which casts the `Future`'s `Box<T>` to a `Box<()>`.
//
Expand All @@ -294,13 +321,16 @@ impl JobQueue {
self.tail = next_tail.into();
}

/// SAFETY:
/// The last `Job` in the queue must still be alive.
pub unsafe fn pop_back(&mut self) {
// SAFETY:
// The tail can either be the root `Box::leak`ed in the default
// constructor or a `Job` that has been pushed previously and which is
// still alive.
let current_tail = unsafe { self.tail.as_ref() };
if let Some(prev_tail) = current_tail.prev.get() {
// SAFETY:
// `Job`'s `prev` pointer can only be set by `JobQueue::push_back`
// to the previous tail which should still be alive or by
// `JobQueue::pop_front` when it's set to `self.sentinel` which is
Expand All @@ -316,12 +346,15 @@ impl JobQueue {
}
}

/// SAFETY:
/// The first `Job` in the queue must still be alive.
pub unsafe fn pop_front(&mut self) -> Option<Job> {
// SAFETY:
// `self.sentinel` is alive for the entirety of `self`.
let sentinel = unsafe { self.sentinel.as_ref() };

sentinel.fut_or_next.get().map(|next| {
// SAFETY:
// `self.sentinel`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand All @@ -331,6 +364,7 @@ impl JobQueue {
if let Some(next) = head.fut_or_next.get() {
sentinel.fut_or_next.set(Some(next.cast()));

// SAFETY:
// `Job`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand All @@ -345,6 +379,7 @@ impl JobQueue {
self.tail = sentinel.into();
}

// SAFETY:
// `self.sentinel`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand All @@ -357,6 +392,7 @@ impl JobQueue {

self.len -= 1;

// SAFETY:
// `self.sentinel`'s `fut_or_next` pointer can only be set by
// `JobQueue::push_back` or by `JobQueue::pop_front` when it's set
// to a job that was previous set by `JobQueue::push_back` and
Expand Down
Loading

0 comments on commit fc27f2b

Please sign in to comment.