Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize work-stealing deque #124

Merged
merged 6 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions bench/bench_ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,22 @@ let run_as_scheduler ~budgetf ?(n_domains = 1) () =
in

let rec try_own own =
match Ws_deque.pop (Array.unsafe_get deques own) with
match Ws_deque.pop_exn (Array.unsafe_get deques own) with
| work -> work
| exception Exit -> try_steal own (next own)
| exception Ws_deque.Empty -> try_steal own (next own)
and try_steal own other =
if other = own then raise_notrace Exit
if other = own then raise_notrace Ws_deque.Empty
else
match Ws_deque.steal (Array.unsafe_get deques other) with
match Ws_deque.steal_exn (Array.unsafe_get deques other) with
| work -> work
| exception Exit -> try_steal own (next other)
| exception Ws_deque.Empty -> try_steal own (next other)
in
let rec run own =
match try_own own with
| work ->
work own;
run own
| exception Exit ->
| exception Ws_deque.Empty ->
if not !exit then begin
Domain.cpu_relax ();
run own
Expand All @@ -47,7 +47,7 @@ let run_as_scheduler ~budgetf ?(n_domains = 1) () =
if x == Obj.magic exit then begin
begin
match try_own own with
| exception Exit -> Domain.cpu_relax ()
| exception Ws_deque.Empty -> Domain.cpu_relax ()
| work -> work own
end;
await own promise
Expand Down Expand Up @@ -90,14 +90,19 @@ let run_as_one_domain ~budgetf ?(n_msgs = 150 * Util.iter_factor) order =

let op_lifo push =
if push then Ws_deque.push t 101
else match Ws_deque.pop t with _ -> () | exception Exit -> ()
else
match Ws_deque.pop_exn t with _ -> () | exception Ws_deque.Empty -> ()
and op_fifo push =
if push then Ws_deque.push t 101
else match Ws_deque.steal t with _ -> () | exception Exit -> ()
else
match Ws_deque.steal_exn t with _ -> () | exception Ws_deque.Empty -> ()
in

let init _ =
assert (match Ws_deque.steal t with _ -> false | exception Exit -> true);
assert (
match Ws_deque.steal_exn t with
| _ -> false
| exception Ws_deque.Empty -> true);
Util.generate_push_and_pop_sequence n_msgs
in
let work _ bits =
Expand All @@ -121,7 +126,10 @@ let run_as_spmc ~budgetf ~n_thiefs () =
let n_msgs_to_steal = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
assert (match Ws_deque.steal t with _ -> false | exception Exit -> true);
assert (
match Ws_deque.steal_exn t with
| _ -> false
| exception Ws_deque.Empty -> true);
Atomic.set n_msgs_to_steal n_msgs
in
let work i () =
Expand All @@ -131,8 +139,8 @@ let run_as_spmc ~budgetf ~n_thiefs () =
if 0 < n then
let rec loop n =
if 0 < n then
match Ws_deque.steal t with
| exception Exit ->
match Ws_deque.steal_exn t with
| exception Ws_deque.Empty ->
Domain.cpu_relax ();
loop n
| _ -> loop (n - 1)
Expand Down
240 changes: 108 additions & 132 deletions src/ws_deque.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Copyright (c) 2015, KC Sivaramakrishnan <[email protected]>
* Copyright (c) 2017, Nicolas ASSOUAD <[email protected]>
* Copyright (c) 2021, Tom Kelly <[email protected]>
* Copyright (c) 2024, Vesa Karvonen <[email protected]>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
Expand All @@ -27,150 +28,125 @@
* https://dl.acm.org/doi/abs/10.1145/2442516.2442524
*)

module CArray = struct
type 'a t = 'a array
module Atomic = Multicore_magic.Transparent_atomic

let rec log2 n = if n <= 1 then 0 else 1 + log2 (n asr 1)

let create sz v =
(* [sz] must be a power of two. *)
assert (0 < sz && sz = Int.shift_left 1 (log2 sz));
assert (Int.logand sz (sz - 1) == 0);
Array.make sz v

let size t = Array.length t [@@inline]
let mask t = size t - 1 [@@inline]

let index i t =
(* Because [size t] is a power of two, [i mod (size t)] is the same as
[i land (size t - 1)], that is, [i land (mask t)]. *)
Int.logand i (mask t)
[@@inline]

let get t i = Array.unsafe_get t (index i t) [@@inline]
let put t i v = Array.unsafe_set t (index i t) v [@@inline]

let transfer src dst top num =
ArrayExtra.blit_circularly (* source array and index: *)
src
(index top src) (* target array and index: *)
dst
(index top dst) (* number of elements: *)
num
[@@inline]

let grow t top bottom =
let sz = size t in
assert (bottom - top = sz);
let dst = create (2 * sz) (Obj.magic ()) in
transfer t dst top sz;
dst

let shrink t top bottom =
let sz = size t in
assert (bottom - top <= sz / 2);
let dst = create (sz / 2) (Obj.magic ()) in
transfer t dst top (bottom - top);
dst
end

let min_size = 32
let shrink_const = 3
(** This must be a power of two. *)
let min_capacity = 16

type 'a t = {
top : int Atomic.t;
bottom : int Atomic.t;
tab : 'a ref CArray.t Atomic.t;
mutable next_shrink : int;
top_cache : int ref;
mutable tab : 'a ref array;
}

let create () =
{
top = Atomic.make 1;
bottom = Atomic.make 1;
tab = Atomic.make (CArray.create min_size (Obj.magic ()));
next_shrink = 0;
}

let set_next_shrink q =
let sz = CArray.size (Atomic.get q.tab) in
if sz <= min_size then q.next_shrink <- 0
else q.next_shrink <- sz / shrink_const

let grow q t b =
Atomic.set q.tab (CArray.grow (Atomic.get q.tab) t b);
set_next_shrink q

let size q =
let b = Atomic.get q.bottom in
let t = Atomic.get q.top in
b - t
let top = Atomic.make_contended 0 in
let tab = Array.make min_capacity (Obj.magic ()) in
let bottom = Atomic.make_contended 0 in
let top_cache = ref 0 |> Multicore_magic.copy_as_padded in
{ top; bottom; top_cache; tab } |> Multicore_magic.copy_as_padded

(** [realloc a t b sz new_sz] allocates a fresh array of length [new_sz] and
    copies into it the [b - t] entries of [a] that start at logical index [t].

    Logical indices are turned into physical slots by masking with
    [capacity - 1], so both [sz] (the capacity of [a]) and [new_sz] are
    expected to be powers of two. The copy goes through
    [ArrayExtra.blit_circularly], which presumably handles wrap-around in both
    arrays (helper not visible here; confirm). Unused slots of the result hold
    a dummy [Obj.magic ()] value. [a] itself is left untouched. *)
let realloc a t b sz new_sz =
  let fresh = Array.make new_sz (Obj.magic ()) in
  let src_pos = t land (sz - 1) in
  let dst_pos = t land (new_sz - 1) in
  let len = b - t in
  ArrayExtra.blit_circularly a src_pos fresh dst_pos len;
  fresh

let push q v =
let v' = ref v in
let b = Atomic.get q.bottom in
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let size = b - t in
let a =
if size = CArray.size a then (
grow q t b;
Atomic.get q.tab)
else a
in
CArray.put a b v';
Atomic.set q.bottom (b + 1)

let release ptr =
let res = !ptr in
(* we know this ptr will never be dereferenced, but want to
break the reference to ensure that the contents of the
deque array get garbage collected *)
ptr := Obj.magic ();
res
[@@inline]

let pop q =
if size q = 0 then raise Exit
else
let b = Atomic.get q.bottom - 1 in
Atomic.set q.bottom b;
let v = ref v in
(* Read of [bottom] by the owner simply does not require a fence as the
[bottom] is only mutated by the owner. *)
let b = Atomic.fenceless_get q.bottom in
let t_cache = !(q.top_cache) in
let a = q.tab in
let size = b - t_cache in
let capacity = Array.length a in
if
size < capacity
||
let t = Atomic.get q.top in
let a = Atomic.get q.tab in
let size = b - t in
if size < 0 then (
(* empty queue *)
Atomic.set q.bottom (b + 1);
raise Exit)
else
let out = CArray.get a b in
if b = t then
(* single last element *)
if Atomic.compare_and_set q.top t (t + 1) then (
Atomic.set q.bottom (b + 1);
release out)
else (
Atomic.set q.bottom (b + 1);
raise Exit)
else (
(* non-empty queue *)
if q.next_shrink > size then (
Atomic.set q.tab (CArray.shrink a t b);
set_next_shrink q);
release out)
q.top_cache := t;
t != t_cache
then begin
Array.unsafe_set a (b land (capacity - 1)) v;
Atomic.incr q.bottom
end
else
let a = realloc a t_cache b capacity (capacity lsl 1) in
Array.unsafe_set a (b land (Array.length a - 1)) v;
q.tab <- a;
Atomic.incr q.bottom

(** Result-shape selector for [pop_as] and [steal_as]. With [Option] they
return an ['a option] ([None] when nothing could be taken); with [Value] they
return the bare element and raise [Empty] instead. *)
type ('a, _) poly = Option : ('a, 'a option) poly | Value : ('a, 'a) poly

let pop_opt q = try Some (pop q) with Exit -> None
(** Raised by the [Value]-mode operations ([pop_exn] and [steal_exn]) when no
element could be taken from the deque. *)
exception Empty

let rec steal backoff q =
let t = Atomic.get q.top in
(** [pop_as q poly] is the owner-side pop, generic over the result shape
selected by [poly]: with [Option] it returns [Some v] or [None]; with [Value]
it returns [v] or raises [Empty].

It first decrements [bottom] and then inspects [top] to decide between three
cases: more than one element was present, exactly one element was present, or
the deque was already empty. *)
let pop_as : type a r. a t -> (a, r) poly -> r =
fun q poly ->
(* [fetch_and_add] returns the previous value, so [b] is the new [bottom],
i.e. the index of the slot the owner is trying to take. *)
let b = Atomic.fetch_and_add q.bottom (-1) - 1 in
(* Read of [top] at this point requires no fence as we simply need to ensure
that the read happens after updating [bottom]. *)
let t = Atomic.fenceless_get q.top in
let size = b - t in
if 0 < size then begin
(* Slot [b] is strictly above [top]: it is taken on this path without a
compare-and-set. *)
let a = q.tab in
let capacity = Array.length a in
let out = Array.unsafe_get a (b land (capacity - 1)) in
let res = !out in
(* Overwrite the cell with a dummy so the deque no longer references the
popped value (presumably so that it can be garbage collected). *)
out := Obj.magic ();
(* Halve the backing array once three times the remaining size fits within
[capacity - min_capacity]. Slot [b] has already been taken, so only the
[b - t] entries starting at [t] are carried over. *)
if size + size + size <= capacity - min_capacity then
q.tab <- realloc a t b capacity (capacity lsr 1);
match poly with Option -> Some res | Value -> res
end
else if b = t then begin
(* Exactly one element was present: the owner only gets it if its
[compare_and_set] on [top] succeeds. *)
(* Whether or not the [compare_and_set] below succeeds, [top_cache] can be
updated, because in either case [top] has been incremented. *)
q.top_cache := t + 1;
let got = Atomic.compare_and_set q.top t (t + 1) in
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
if got then begin
let a = q.tab in
let out = Array.unsafe_get a (b land (Array.length a - 1)) in
let res = !out in
(* Clear the cell, as in the multi-element case above. *)
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else match poly with Option -> None | Value -> raise Empty
end
else begin
(* [b < t]: the deque was already empty before the decrement, so undo the
decrement of [bottom] and report emptiness. *)
(* This write of [bottom] requires no fence. The deque is empty and
remains so until the next [push]. *)
Atomic.fenceless_set q.bottom (b + 1);
match poly with Option -> None | Value -> raise Empty
end

(** [pop_exn q] is the owner-side pop in [Value] mode: it returns the popped
    element, or raises [Empty] when nothing could be taken. *)
let pop_exn : 'a t -> 'a = fun deque -> pop_as deque Value

(** [pop_opt q] is the owner-side pop in [Option] mode: it returns [Some v]
    for the popped element, or [None] when nothing could be taken. *)
let pop_opt : 'a t -> 'a option = fun deque -> pop_as deque Option

let rec steal_as : type a r. a t -> Backoff.t -> (a, r) poly -> r =
fun q backoff poly ->
(* Read of [top] does not require a fence at this point, but the read of
[top] must happen before the read of [bottom]. The write of [top] later
has no effect in case we happened to read an old value of [top]. *)
let t = Atomic.fenceless_get q.top in
let b = Atomic.get q.bottom in
let size = b - t in
if size <= 0 then raise Exit
else
let a = Atomic.get q.tab in
let out = CArray.get a t in
if Atomic.compare_and_set q.top t (t + 1) then release out
else steal (Backoff.once backoff) q

let steal q = steal Backoff.default q
let steal_opt q = try Some (steal q) with Exit -> None
if 0 < size then
let a = q.tab in
let out = Array.unsafe_get a (t land (Array.length a - 1)) in
if Atomic.compare_and_set q.top t (t + 1) then begin
let res = !out in
out := Obj.magic ();
match poly with Option -> Some res | Value -> res
end
else steal_as q (Backoff.once backoff) poly
else match poly with Option -> None | Value -> raise Empty

(** [steal_exn q] runs [steal_as] in [Value] mode, starting from
    [Backoff.default]: it returns the stolen element, or raises [Empty] when
    the deque is observed to hold no elements. *)
let steal_exn : 'a t -> 'a =
 fun deque -> steal_as deque Backoff.default Value

(** [steal_opt q] runs [steal_as] in [Option] mode, starting from
    [Backoff.default]: it returns [Some v] for the stolen element, or [None]
    when the deque is observed to hold no elements. *)
let steal_opt : 'a t -> 'a option =
 fun deque -> steal_as deque Backoff.default Option
10 changes: 6 additions & 4 deletions src/ws_deque.mli
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ type 'a t
val create : unit -> 'a t
(** [create ()] returns a new empty work-stealing queue. *)

exception Empty
(** Raised by {!pop_exn} and {!steal_exn} when no element could be obtained
from the queue. *)

(** {1 Queue owner functions} *)

val push : 'a t -> 'a -> unit
(** [push q v] adds [v] to the front of the queue [q].
It should only be invoked by the domain which owns the queue [q]. *)

val pop : 'a t -> 'a
(** [pop q] removes and returns the first element in queue
val pop_exn : 'a t -> 'a
(** [pop_exn q] removes and returns the first element in queue
[q]. It should only be invoked by the domain which owns the queue
[q].

Expand All @@ -39,8 +41,8 @@ val pop_opt : 'a t -> 'a option

(** {1 Stealer functions} *)

val steal : 'a t -> 'a
(** [steal q] removes and returns the last element from queue
val steal_exn : 'a t -> 'a
(** [steal_exn q] removes and returns the last element from queue
[q]. It should only be invoked by a domain which doesn't own the
queue [q].

Expand Down
6 changes: 4 additions & 2 deletions test/ws_deque/dune
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
(test
(package saturn)
(name ws_deque_dscheck)
(libraries atomic dscheck alcotest backoff)
(libraries atomic dscheck alcotest backoff multicore-magic-dscheck)
(build_if
(>= %{ocaml_version} 5))
(modules ArrayExtra ws_deque ws_deque_dscheck))
(modules ArrayExtra ws_deque ws_deque_dscheck)
(flags
(:standard -open Multicore_magic_dscheck)))

(test
(package saturn)
Expand Down
Loading
Loading