Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make futex and condition variable waiting interfaces more similar. #46

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ set(CHEETAH_SOURCES
personality.c
sched_stats.c
scheduler.c
condvar.c
)

set(CHEETAH_ABI_SOURCE
Expand Down
159 changes: 159 additions & 0 deletions runtime/condvar.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#include <err.h>
#include <stdatomic.h>

#include "mutex.h"

#if defined __FreeBSD__ || defined __OpenBSD__ || defined __APPLE__
#define HAVE_ERRC 1
#endif

#if USE_FUTEX

#include <stdbool.h>
#include <limits.h> // INT_MAX

#ifdef __linux__
#include <errno.h>
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#define WAIT FUTEX_WAIT_PRIVATE
#define WAKE FUTEX_WAKE_PRIVATE
#endif

#ifdef __FreeBSD__
#include <sys/types.h>
#include <sys/umtx.h>
#define WAIT UMTX_OP_WAIT
#define WAKE UMTX_OP_WAKE_PRIVATE
#endif

#ifdef __OpenBSD__
// This is not tested.
#include <sys/time.h>
#include <sys/futex.h>
#define WAIT FUTEX_WAIT
#define WAKE FUTEX_WAKE
#endif

// Convenience wrapper for futex syscall.
// In this context only success (true) or failure needs to be returned.
// Linux syscall() returns long. Here the values fit in an int.
// OpenBSD syscall() returns int.
// BSD _umtx_op returns int.
static inline bool futex(futex_t *obj, int futex_op, futex_t val) {
#if defined __linux__
return syscall(SYS_futex, obj, futex_op, val, NULL, NULL, 0) >= 0;
#elif defined __FreeBSD__
return _umtx_op(obj, futex_op, 0, NULL, NULL) >= 0;
#elif defined __OpenBSD__
// This is not tested.
return futex(obj, futex_op, 0, NULL, NULL) >= 0;
#else
// TODO: Private interface __ulock_wait on Mac OS?
// TODO: C++20 std::atomic<>::wait, notify_one, notify_all
#error "no futex implementation"
return false;
#endif
}

// Wait for the object to be unequal to the value.
// Acquire here pairs with release in cond_post.
void cond_wait(futex_t *obj, futex_val_t val) {
while (atomic_load_explicit(obj, memory_order_acquire) == val) {
if (futex(obj, WAIT, val)) {
// Formally the futex operation does not include a fence.
atomic_thread_fence(memory_order_acquire);
break;
}
if (errno != EAGAIN)
err(EXIT_FAILURE, "futex(FUTEX_WAIT)");
}
}

// Set the futex pointed to by `obj` to `val`, and wake up one
// thread waiting on that futex.
void cond_post(futex_t *obj, futex_val_t val) {
atomic_store_explicit(obj, val, memory_order_release);
if (!futex(obj, WAKE, 1))
err(EXIT_FAILURE, "futex(FUTEX_WAKE)");
}

// Set the futex pointed to by `obj` to `val`, and wake up all
// threads waiting on that futex.
void cond_broadcast(futex_t *obj, futex_val_t val) {
atomic_store_explicit(obj, val, memory_order_release);
if (!futex(obj, WAKE, INT_MAX))
err(EXIT_FAILURE, "futex(FUTEX_WAKE)");
}

void cond_wake_some(futex_t *obj, int count) {
if (!futex(obj, WAKE, count))
err(EXIT_FAILURE, "futex(FUTEX_WAKE)");
}

#else // begin pthread implementation

#include <pthread.h>

#if HAVE_ERRC
#define ERRCHK(MSG) \
if (__builtin_expect(error, 0)) errc(EXIT_FAILURE, error, MSG)
#else
#define ERRCHK(MSG) \
if (__builtin_expect(error, 0)) errx(EXIT_FAILURE, MSG " returned %d", error)
#endif

void cond_wait(futex_t *obj, futex_val_t val,
pthread_cond_t *cond, pthread_mutex_t *mutex) {
int error;
// No fast path check. This is only called if *obj == val.
error = pthread_mutex_lock(mutex);
ERRCHK("pthread_mutex_lock");
while (atomic_load_explicit(obj, memory_order_acquire) == val) {
error = pthread_cond_wait(cond, mutex);
ERRCHK("pthread_cond_wait");
}
error = pthread_mutex_unlock(mutex);
ERRCHK("pthread_mutex_unlock");
}

void cond_post(futex_t *obj, futex_val_t val,
pthread_cond_t *cond, pthread_mutex_t *mutex) {
int error;

error = pthread_mutex_lock(mutex);
ERRCHK("pthread_mutex_lock");

atomic_store_explicit(obj, val, memory_order_release);
error = pthread_cond_signal(cond);
ERRCHK("pthread_cond_signal");
error = pthread_mutex_unlock(mutex);
ERRCHK("pthread_mutex_unlock");
}

void cond_broadcast(futex_t *obj, futex_val_t val,
pthread_cond_t *cond, pthread_mutex_t *mutex) {
int error;

error = pthread_mutex_lock(mutex);
ERRCHK("pthread_mutex_lock");
atomic_store_explicit(obj, val, memory_order_release);
error = pthread_cond_broadcast(cond);
ERRCHK("pthread_mutex_broadcast");
error = pthread_mutex_unlock(mutex);
ERRCHK("pthread_mutex_unlock");
}

void cond_wake_some_locked(futex_t *obj, futex_val_t val,
pthread_cond_t *cond, int count) {
int error;

atomic_store_explicit(obj, val, memory_order_release);
while (count-- > 0) {
error = pthread_cond_signal(cond);
ERRCHK("pthread_cond_signal");
}
}

#endif // USE_FUTEX
4 changes: 3 additions & 1 deletion runtime/global.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ static global_state *global_state_allocate() {
cilk_mutex_init(&g->index_lock);
cilk_mutex_init(&g->print_lock);

atomic_store_explicit(&g->cilkified_futex, 0, memory_order_relaxed);
atomic_store_explicit(&g->cilkified, 0, memory_order_relaxed);

#if !USE_FUTEX
// TODO: Convert to cilk_* equivalents
pthread_mutex_init(&g->cilkified_lock, NULL);
pthread_cond_init(&g->cilkified_cond_var, NULL);

pthread_mutex_init(&g->disengaged_lock, NULL);
pthread_cond_init(&g->disengaged_cond_var, NULL);
#endif

return g;
}
Expand Down
16 changes: 8 additions & 8 deletions runtime/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,14 @@ struct global_state {

// These fields are shared between the boss thread and a couple workers.

// NOTE: We can probably update the runtime system so that, when it uses
// cilkified_futex, it does not also use the cilkified field. But the
// cilkified field is helpful for debugging, and it seems unlikely that this
// optimization would improve performance.
_Atomic uint32_t cilkified_futex __attribute__((aligned(CILK_CACHE_LINE)));
atomic_bool cilkified;
#if USE_FUTEX
futex_t cilkified __attribute__((aligned(CILK_CACHE_LINE)));
#else
futex_t cilkified;

pthread_mutex_t cilkified_lock;
pthread_cond_t cilkified_cond_var;
#endif

// These fields are shared among all workers in the work-stealing loop.

Expand All @@ -98,10 +97,11 @@ struct global_state {
#define GET_SENTINEL(D) ((D) & 0xffffffff)
#define DISENGAGED_SENTINEL(A, B) (((uint64_t)(A) << 32) | (uint32_t)(B))

_Atomic uint32_t disengaged_thieves_futex __attribute__((aligned(CILK_CACHE_LINE)));

futex_t disengaged_thieves __attribute__((aligned(CILK_CACHE_LINE)));
#if !USE_FUTEX
pthread_mutex_t disengaged_lock;
pthread_cond_t disengaged_cond_var;
#endif

cilk_mutex print_lock; // global lock for printing messages

Expand Down
2 changes: 2 additions & 0 deletions runtime/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,15 @@ static void global_state_deinit(global_state *g) {
cilk_internal_malloc_global_destroy(g); // internal malloc last
cilk_mutex_destroy(&(g->print_lock));
cilk_mutex_destroy(&(g->index_lock));
#if !USE_FUTEX
// TODO: Convert to cilk_* equivalents
pthread_mutex_destroy(&g->cilkified_lock);
pthread_cond_destroy(&g->cilkified_cond_var);
/* pthread_mutex_destroy(&g->start_thieves_lock); */
/* pthread_cond_destroy(&g->start_thieves_cond_var); */
pthread_mutex_destroy(&g->disengaged_lock);
pthread_cond_destroy(&g->disengaged_cond_var);
#endif
free(g->worker_args);
g->worker_args = NULL;
free(g->workers);
Expand Down
46 changes: 45 additions & 1 deletion runtime/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,28 @@ typedef union cilk_mutex cilk_mutex;
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

#include "rts-config.h"

// Linux uses uint32_t. OpenBSD copies Linux.
// FreeBSD uses long.
// Other systems don't use the futex interface and can pick either.

#ifdef __FreeBSD__
typedef long futex_val_t;
#define FUTEX_MAX LONG_MAX
#define USE_FUTEX 1
#else
typedef uint32_t futex_val_t;
#define FUTEX_MAX 0x7fffffff
#if defined __linux__ || defined __OpenBSD__
#define USE_FUTEX 1
#endif
#endif

typedef _Atomic futex_val_t futex_t;

#ifndef __APPLE__
#define USE_SPINLOCK 1
#endif
Expand Down Expand Up @@ -85,4 +104,29 @@ static inline void cilk_mutex_destroy(cilk_mutex *lock) {
pthread_mutex_destroy(&(lock->posix));
#endif
}
#endif

#if USE_FUTEX
// Wait for *obj to be unequal to val.
extern void cond_wait(futex_t *obj, futex_val_t val);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add a condvar.h?

// Set *obj = val and wake up one waiter.
extern void cond_post(futex_t *obj, futex_val_t val);
// Set *obj = val and wake up all waiters.
extern void cond_broadcast(futex_t *obj, futex_val_t val);
// Wake up COUNT waiters. The value has already been updated.
extern void cond_wake_some(futex_t *obj, int count);
#else
extern void cond_wait(futex_t *obj, futex_val_t val,
pthread_cond_t *cond,
pthread_mutex_t *mutex);
extern void cond_post(futex_t *obj, futex_val_t val,
pthread_cond_t *cond,
pthread_mutex_t *mutex);
extern void cond_broadcast(futex_t *obj, futex_val_t val,
pthread_cond_t *cond,
pthread_mutex_t *mutex);
// This function is called with the lock held.
extern void cond_wake_some_locked(futex_t *obj, futex_val_t val,
pthread_cond_t *cond, int count);
#endif // USE_FUTEX
#endif // _CILK_MUTEX_H

Loading
Loading