Skip to content

Commit

Permalink
MPMC 2-stack queue
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jan 15, 2024
1 parent 99cb6c4 commit 8eb8ce8
Show file tree
Hide file tree
Showing 13 changed files with 421 additions and 0 deletions.
18 changes: 18 additions & 0 deletions bench/bench_stdlib_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
open Multicore_bench
module Queue = Stdlib.Queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push 101 t else Queue.take_opt t |> ignore in

let init _ =
assert (Queue.length t = 0);
Queues.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Util.thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_suite ~budgetf = run_one_domain ~budgetf ()
73 changes: 73 additions & 0 deletions bench/bench_two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
open Multicore_bench
module Queue = Saturn_lockfree.Two_stack_queue

let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
let t = Queue.create () in

let op push = if push then Queue.push t 101 else Queue.pop_opt t |> ignore in

let init _ =
assert (Queue.length t = 0);
Queues.generate_push_and_pop_sequence n_msgs
in
let work _ bits = Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
|> Util.thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2)
?(n_msgs = 50 * Util.iter_factor) () =
let n_domains = n_adders + n_takers in

let t = Queue.create () in

let n_msgs_to_take = Atomic.make 0 |> Multicore_magic.copy_as_padded in
let n_msgs_to_add = Atomic.make 0 |> Multicore_magic.copy_as_padded in

let init _ =
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let work i () =
if i < n_adders then
let rec work () =
let n = Util.alloc n_msgs_to_add in
if 0 < n then begin
for i = 1 to n do
Queue.push t i
done;
work ()
end
in
work ()
else
let rec work () =
let n = Util.alloc n_msgs_to_take in
if n <> 0 then
let rec loop n =
if 0 < n then
loop (n - Bool.to_int (Option.is_some (Queue.pop_opt t)))
else work ()
in
loop n
in
work ()
in

let config =
let format role n =
Printf.sprintf "%d %s%s" n role (if n = 1 then "" else "s")
in
Printf.sprintf "%s, %s"
(format "nb adder" n_adders)
(format "nb taker" n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
|> Util.thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
run_one_domain ~budgetf ()
@ (Util.cross [ 1; 2 ] [ 1; 2 ]
|> List.concat_map @@ fun (n_adders, n_takers) ->
run_one ~budgetf ~n_adders ~n_takers ())
32 changes: 32 additions & 0 deletions bench/bits.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
type t = { mutable bytes : Bytes.t; mutable length : int }

let create () = { bytes = Bytes.create 1; length = 0 }

let push t bool =
let capacity = Bytes.length t.bytes lsl 3 in
if t.length == capacity then t.bytes <- Bytes.extend t.bytes 0 (capacity lsr 3);
let byte_i = t.length lsr 3 in
let mask = 1 lsl (t.length land 7) in
t.length <- t.length + 1;
let byte = Char.code (Bytes.unsafe_get t.bytes byte_i) in
let byte = if bool then byte lor mask else byte land lnot mask in
Bytes.unsafe_set t.bytes byte_i (Char.chr byte)

let length t = t.length

let rec iter fn t i =
let n = t.length in
if i < n then begin
let byte = Char.code (Bytes.unsafe_get t.bytes (i lsr 3)) in
fn (0 <> byte land 1);
if i + 1 < n then fn (0 <> byte land 2);
if i + 2 < n then fn (0 <> byte land 4);
if i + 3 < n then fn (0 <> byte land 8);
if i + 4 < n then fn (0 <> byte land 16);
if i + 5 < n then fn (0 <> byte land 32);
if i + 6 < n then fn (0 <> byte land 64);
if i + 7 < n then fn (0 <> byte land 128);
iter fn t (i + 8)
end

let iter fn t = iter fn t 0
2 changes: 2 additions & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ let benchmarks =
("Saturn_lockfree Single_prod_single_cons_queue", Bench_spsc_queue.run_suite);
("Saturn_lockfree Size", Bench_size.run_suite);
("Saturn_lockfree Skiplist", Bench_skiplist.run_suite);
("Saturn_lockfree Two_stack_queue", Bench_two_stack_queue.run_suite);
("Stdlib Queue", Bench_stdlib_queue.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
17 changes: 17 additions & 0 deletions bench/queues.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
let generate_push_and_pop_sequence ?(state = Random.State.make_self_init ())
n_msgs =
let bits = Bits.create () in
let rec loop length n_push n_pop =
if 0 < n_push || 0 < n_pop then begin
let push = Random.State.bool state && 0 < n_push in
Bits.push bits push;
loop
(if push then length + 1 else if 0 < length then length - 1 else length)
(n_push - Bool.to_int push)
(n_pop - Bool.to_int ((not push) && 0 < length))
end
else length
in
let length = loop 0 n_msgs n_msgs in
assert (length = 0);
bits
1 change: 1 addition & 0 deletions src/saturn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ module Single_prod_single_cons_queue =
module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Two_stack_queue = Saturn_lockfree.Two_stack_queue
1 change: 1 addition & 0 deletions src/saturn.mli
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ module Single_prod_single_cons_queue =
module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Two_stack_queue = Saturn_lockfree.Two_stack_queue
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Size = Size
module Skiplist = Skiplist
module Two_stack_queue = Two_stack_queue
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Skiplist
module Size = Size
module Two_stack_queue = Two_stack_queue
159 changes: 159 additions & 0 deletions src_lockfree/two_stack_queue.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
(* Copyright (c) 2023, Vesa Karvonen <[email protected]>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE. *)

module Atomic = Transparent_atomic

type 'a t = { head : 'a head Atomic.t; tail : 'a tail Atomic.t }

and ('a, _) tdt =
| Cons : {
counter : int;
value : 'a;
suffix : 'a head;
}
-> ('a, [> `Cons ]) tdt
| Head : { counter : int } -> ('a, [> `Head ]) tdt
| Snoc : {
counter : int;
prefix : 'a tail;
value : 'a;
}
-> ('a, [> `Snoc ]) tdt
| Tail : {
counter : int;
mutable move : ('a, [ `Snoc ]) tdt;
}
-> ('a, [> `Tail ]) tdt

and 'a head = H : ('a, [< `Cons | `Head ]) tdt -> 'a head [@@unboxed]
and 'a tail = T : ('a, [< `Snoc | `Tail ]) tdt -> 'a tail [@@unboxed]

let create () =
let head =
Atomic.make (H (Head { counter = 1 })) |> Multicore_magic.copy_as_padded
in
let tail =
Atomic.make (T (Tail { counter = 0; move = Obj.magic () }))
|> Multicore_magic.copy_as_padded
in
{ head; tail } |> Multicore_magic.copy_as_padded

let rec rev (suffix : (_, [< `Cons ]) tdt) = function
| T (Snoc { counter; prefix; value }) ->
rev (Cons { counter; value; suffix = H suffix }) prefix
| T (Tail _) -> suffix

let rev = function
| (Snoc { counter; prefix; value } : (_, [< `Snoc ]) tdt) ->
rev
(Cons { counter; value; suffix = H (Head { counter = counter + 1 }) })
prefix

let rec push backoff t value = function
| T (Snoc snoc_r as snoc) -> push_with backoff t snoc_r.counter (T snoc) value
| T (Tail tail_r as tail) ->
let move = tail_r.move in
if move != Obj.magic () then begin
let (Snoc move_r) = move in
begin
match Atomic.get t.head with
| H (Head head_r as head) when head_r.counter < move_r.counter ->
let after = rev move in
if
Atomic.fenceless_get t.head == H head
&& Atomic.compare_and_set t.head (H head) (H after)
then tail_r.move <- Obj.magic ()
| _ -> ()
end;
let new_tail = Atomic.fenceless_get t.tail in
if new_tail != T tail then push backoff t value new_tail
else push_with backoff t tail_r.counter (T tail) value
end
else push_with backoff t tail_r.counter (T tail) value

and push_with backoff t counter prefix value =
let after = Snoc { counter = counter + 1; prefix; value } in
let new_tail = Atomic.fenceless_get t.tail in
if new_tail != prefix then push backoff t value new_tail
else if not (Atomic.compare_and_set t.tail prefix (T after)) then
let backoff = Backoff.once backoff in
push backoff t value (Atomic.fenceless_get t.tail)

let push t value = push Backoff.default t value (Atomic.fenceless_get t.tail)

exception Empty

let is_tail = function T (Tail _) -> true | T (Snoc _) -> false

let rec pop backoff t =
match Atomic.get t.head with
| H (Cons cons_r as cons) ->
if Atomic.compare_and_set t.head (H cons) cons_r.suffix then cons_r.value
else pop (Backoff.once backoff) t
| H (Head _ as head) -> begin
match Atomic.fenceless_get t.tail with
| T (Snoc snoc_r as move) ->
if is_tail snoc_r.prefix then begin
let tail =
Tail { counter = snoc_r.counter - 1; move = Obj.magic () }
in
if
Atomic.get t.head == H head
&& Atomic.compare_and_set t.tail (T move) (T tail)
then snoc_r.value
else pop backoff t
end
else
let tail = Tail { counter = snoc_r.counter; move } in
if
Atomic.get t.head == H head
&& Atomic.compare_and_set t.tail (T move) (T tail)
then pop_moving backoff t head move tail
else pop backoff t
| T (Tail tail_r as tail) ->
let move = tail_r.move in
if move == Obj.magic () then pop_emptyish backoff t head
else pop_moving backoff t head move tail
end

and pop_moving backoff t (Head head_r as head : (_, [< `Head ]) tdt)
(Snoc move_r as move) (Tail tail_r : (_, [< `Tail ]) tdt) =
if head_r.counter < move_r.counter then
match rev move with
| Cons cons_r ->
if Atomic.compare_and_set t.head (H head) cons_r.suffix then begin
tail_r.move <- Obj.magic ();
cons_r.value
end
else pop backoff t
else pop_emptyish backoff t head

and pop_emptyish backoff t head =
if Atomic.get t.head == H head then raise_notrace Empty else pop backoff t

let pop t = pop Backoff.default t
let pop_opt t = match pop t with value -> Some value | exception Empty -> None

let rec length t =
let head = Atomic.get t.head in
let tail = Atomic.fenceless_get t.tail in
if head != Atomic.get t.head then length t
else
let head_at =
match head with H (Cons r) -> r.counter | H (Head r) -> r.counter
in
let tail_at =
match tail with T (Snoc r) -> r.counter | T (Tail r) -> r.counter
in
tail_at - head_at + 1
20 changes: 20 additions & 0 deletions src_lockfree/two_stack_queue.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
type !'a t
(** *)

val create : unit -> 'a t
(** *)

val push : 'a t -> 'a -> unit
(** *)

exception Empty
(** *)

val pop : 'a t -> 'a
(** *)

val pop_opt : 'a t -> 'a option
(** *)

val length : 'a t -> int
(** *)
13 changes: 13 additions & 0 deletions test/two_stack_queue/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
(test
(package saturn_lockfree)
(name stm_two_stack_queue)
(modules stm_two_stack_queue)
(libraries
saturn_lockfree
qcheck-core
qcheck-core.runner
qcheck-stm.stm
qcheck-stm.sequential
qcheck-stm.domain)
(action
(run %{test} --verbose)))
Loading

0 comments on commit 8eb8ce8

Please sign in to comment.