Skip to content

Commit

Permalink
Add Rwlock
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Jan 10, 2025
1 parent fc7ca3e commit 9909e19
Show file tree
Hide file tree
Showing 10 changed files with 445 additions and 18 deletions.
112 changes: 112 additions & 0 deletions bench/bench_rwlock_htbl.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
open Multicore_bench

module Htbl = struct
open Picos_std_sync
module Htbl = Picos_aux_htbl

type ('k, 'v) t = { htbl : ('k, 'v) Htbl.t; rwlock : Rwlock.t }

let create ?hashed_type () =
{
htbl = Htbl.create ?hashed_type ();
rwlock = Rwlock.create ~padded:true ();
}

let remove_all t = Htbl.remove_all t.htbl

let find_opt t key =
Rwlock.rd_lock t.rwlock;
let result =
match Htbl.find_exn t.htbl key with
| value -> Some value
| exception Not_found -> None
in
Rwlock.rd_unlock t.rwlock;
result

let try_add t key value =
Rwlock.wr_lock t.rwlock;
let result = Htbl.try_add t.htbl key value in
Rwlock.wr_unlock t.rwlock;
result

let try_remove t key =
Rwlock.wr_lock t.rwlock;
let result = Htbl.try_remove t.htbl key in
Rwlock.wr_unlock t.rwlock;
result
end

module Key = struct
type t = int

let equal = Int.equal
let hash = Fun.id
end

let run_one ~budgetf ~n_domains ?(n_ops = 100 * Util.iter_factor)
?(n_keys = 1000) ~percent_mem ?(percent_add = (100 - percent_mem + 1) / 2)
?(prepopulate = true) () =
let limit_mem = percent_mem in
let limit_add = percent_mem + percent_add in

assert (0 <= limit_mem && limit_mem <= 100);
assert (limit_mem <= limit_add && limit_add <= 100);

let t = Htbl.create ~hashed_type:(module Key) () in

let n_ops = (100 + percent_mem) * n_ops / 100 in
let n_ops = n_ops * n_domains in

let n_ops_todo = Countdown.create ~n_domains () in

let before () =
let _ : _ Seq.t = Htbl.remove_all t in
Countdown.non_atomic_set n_ops_todo n_ops
in
let init i =
Scheduler.run @@ fun () ->
let state = Random.State.make_self_init () in
if prepopulate then begin
let n = ((i + 1) * n_keys / n_domains) - (i * n_keys / n_domains) in
for _ = 1 to n do
let value = Random.State.bits state in
let key = value mod n_keys in
Htbl.try_add t key value |> ignore
done
end;
state
in
let wrap _ _ = Scheduler.run in
let work domain_index state =
let rec work () =
let n = Countdown.alloc n_ops_todo ~domain_index ~batch:1000 in
if n <> 0 then begin
for _ = 1 to n do
let value = Random.State.bits state in
let op = (value asr 20) mod 100 in
let key = value mod n_keys in
if op < percent_mem then Htbl.find_opt t key |> ignore
else if op < limit_add then Htbl.try_add t key value |> ignore
else Htbl.try_remove t key |> ignore
done;
work ()
end
in
work ()
in

let config =
Printf.sprintf "%d worker%s, %d%% reads" n_domains
(if n_domains = 1 then "" else "s")
percent_mem
in
Times.record ~budgetf ~n_warmups:1 ~n_runs_min:1 ~n_domains ~before ~init
~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config

let run_suite ~budgetf =
Util.cross [ 1; 2; 4; 8 ] [ 10; 50; 90; 95; 100 ]
|> List.concat_map @@ fun (n_domains, percent_mem) ->
if Picos_domain.recommended_domain_count () < n_domains then []
else run_one ~budgetf ~n_domains ~percent_mem ()
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
(run %{test} -brief "Picos_mpmcq")
(run %{test} -brief "Picos_mpscq")
(run %{test} -brief "Picos_htbl")
(run %{test} -brief "Picos_htbl with Rwlock")
(run %{test} -brief "Picos_stdio")
(run %{test} -brief "Picos_sync Stream")
(run %{test} -brief "Fib")
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ let benchmarks =
("Picos_mpmcq", Bench_mpmcq.run_suite);
("Picos_mpscq", Bench_mpscq.run_suite);
("Picos_htbl", Bench_htbl.run_suite);
("Picos_htbl with Rwlock", Bench_rwlock_htbl.run_suite);
("Picos_stdio", Bench_stdio.run_suite);
("Picos_sync Stream", Bench_stream.run_suite);
("Fib", Bench_fib.run_suite);
Expand Down
42 changes: 28 additions & 14 deletions lib/picos_std.awaitable/picos_std_awaitable.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ module Awaitable = struct
end

module Awaiters = struct
let equal_bit = 0b01
let num_bits = 1
let one = 1 lsl num_bits

type _ tdt =
| Zero : [> `Zero ] tdt
| One : {
awaitable : 'a awaitable; (* Might also want to clear this *)
mutable value : 'a; (* This is mutable to avoid space leaks *)
trigger : Trigger.t;
mutable counter : int;
mutable counter_and_bits : int;
mutable next : min0;
}
-> [> `One ] tdt
Expand All @@ -53,22 +57,22 @@ module Awaitable = struct
let[@inline] snoc t (One tail_r as tail) =
match t with
| Min1 (One head_r) ->
tail_r.counter <- head_r.counter + 1;
tail_r.counter_and_bits <- head_r.counter_and_bits + one;
Many { head = One head_r; prev = One head_r; tail }
| Min1 (Many many_r as many) ->
exec many;
let (One prev_r as prev) = many_r.tail in
tail_r.counter <- prev_r.counter + 1;
tail_r.counter_and_bits <- prev_r.counter_and_bits + one;
Many { head = many_r.head; prev; tail }

external as1 : min0 -> is1 = "%identity"

let[@inline] awaitable_of (One r : is1) = Packed.Packed r.awaitable
let[@inline] counter_of (One r : is1) = r.counter
let[@inline] counter_of (One r : is1) = r.counter_and_bits lsr num_bits

let[@inline] next_of (One r : is1) ~tail =
let[@inline] next_of (One r as one : is1) ~tail =
let next = as1 r.next in
let counter = r.counter in
let counter = counter_of one in
if counter_of tail - counter < counter_of next - counter then tail
else next

Expand All @@ -77,7 +81,12 @@ module Awaitable = struct

let[@inline] generalize (One r : is1) = One r
let[@inline] is_signaled (One r : is1) = Trigger.is_signaled r.trigger
let[@inline] is_signalable (One r : is1) = get r.awaitable != r.value

let[@inline] is_signalable (One r : is1) =
Bool.to_int (get r.awaitable != r.value)
lxor (r.counter_and_bits land equal_bit)
<> 0

let[@inline] await (One r : is1) = Trigger.await r.trigger
let[@inline] clear (One r : is1) = r.value <- Obj.magic ()

Expand Down Expand Up @@ -156,7 +165,8 @@ module Awaitable = struct
let trigger = Trigger.create () in
if bits land (1 lsl i) = 0 then Trigger.signal trigger;
let awaitable = make 0 and next = Min0 Zero in
One { awaitable; value = 1; trigger; counter = 0; next }
One
{ awaitable; value = 1; trigger; counter_and_bits = 0; next }
in
let queue = ref (Min1 (make 0)) in
for i = 1 to n - 1 do
Expand Down Expand Up @@ -284,7 +294,7 @@ module Awaitable = struct
let awaitable = make 0 and next = Min0 Zero in
if signaled_bits land (1 lsl i) = 0 then
Trigger.signal trigger;
One { awaitable; value; trigger; counter = 0; next }
One { awaitable; value; trigger; counter_and_bits = 0; next }
in
let queue = ref (Min1 (make 0)) in
for i = 1 to n - 1 do
Expand Down Expand Up @@ -387,12 +397,16 @@ module Awaitable = struct
done
with Not_found -> ()

type op = Isnt | Is

module Awaiter = struct
type t = Awaiters.is1

let add_as (type a) (t : a awaitable) trigger value =
let add_as (type a) (t : a awaitable) trigger op value =
let one : Awaiters.is1 =
One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
let counter_and_bits = match op with Isnt -> 0 | Is -> 1 in
One
{ awaitable = t; value; trigger; counter_and_bits; next = Min0 Zero }
in
let backoff = ref Backoff.default in
while
Expand All @@ -409,16 +423,16 @@ module Awaitable = struct

let add (type a) (t : a awaitable) trigger =
let unique_value = Sys.opaque_identity (Obj.magic awaiters : a) in
add_as t trigger unique_value
add_as t trigger Isnt unique_value

let remove one =
Awaiters.signal_and_clear one;
update (Awaiters.awaitable_of one) ~signal:false ~count:1
end

let await t value =
let await t op value =
let trigger = Trigger.create () in
let one = Awaiter.add_as t trigger value in
let one = Awaiter.add_as t trigger op value in
if Awaiters.is_signalable one then Awaiter.remove one
else
match Awaiters.await one with
Expand Down
12 changes: 8 additions & 4 deletions lib/picos_std.awaitable/picos_std_awaitable.mli
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,13 @@ module Awaitable : sig
{{:https://en.wikipedia.org/wiki/Thundering_herd_problem} thundering herd}
phenomena. *)

val await : 'a t -> 'a -> unit
(** [await awaitable before] suspends the current fiber until the awaitable is
explicitly {!signal}ed and has a value other than [before].
(** *)
type op = Isnt | Is

val await : 'a t -> op -> 'a -> unit
(** [await awaitable op before] suspends the current fiber until the awaitable
is explicitly {!signal}ed and has a value that either is or isn't equal to
[before] as specified by the {!op}.
⚠️ This operation is subject to the
{{:https://en.wikipedia.org/wiki/ABA_problem} ABA} problem. An [await] for
Expand Down Expand Up @@ -141,7 +145,7 @@ end
let lock t =
if not (Awaitable.compare_and_set t 0 1) then
while Awaitable.exchange t 2 <> 0 do
Awaitable.await t 2
Awaitable.await t Isnt 2
done
let unlock t =
Expand Down
1 change: 1 addition & 0 deletions lib/picos_std.sync/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name picos_std.sync)
(libraries
(re_export picos_std.event)
picos_std.awaitable
picos
backoff
multicore-magic))
Expand Down
2 changes: 2 additions & 0 deletions lib/picos_std.sync/mutex.ml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ let[@inline] lock ?checked t =
Fiber.check fiber;
lock_as (Fiber.Maybe.of_fiber fiber) t Nothing Backoff.default

let is_locked t = Atomic.get t != Unlocked

let try_lock ?checked t =
let fiber = Fiber.Maybe.current_and_check_if checked in
Atomic.get t == Unlocked
Expand Down
1 change: 1 addition & 0 deletions lib/picos_std.sync/picos_std_sync.ml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module Mutex = Mutex
module Condition = Condition
module Semaphore = Semaphore
module Rwlock = Rwlock
module Lazy = Lazy
module Latch = Latch
module Ivar = Ivar
Expand Down
22 changes: 22 additions & 0 deletions lib/picos_std.sync/picos_std_sync.mli
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,28 @@ module Semaphore : sig
end
end

module Rwlock : sig
(** A read-write lock. *)

type t
(** *)

val create : ?padded:bool -> unit -> t
(** *)

val rd_lock : t -> unit
(** *)

val rd_unlock : t -> unit
(** *)

val wr_lock : t -> unit
(** *)

val wr_unlock : t -> unit
(** *)
end

module Lazy : sig
(** A lazy suspension.
Expand Down
Loading

0 comments on commit 9909e19

Please sign in to comment.