diff --git a/bench/bench_rwlock_htbl.ml b/bench/bench_rwlock_htbl.ml
new file mode 100644
index 00000000..fc182547
--- /dev/null
+++ b/bench/bench_rwlock_htbl.ml
@@ -0,0 +1,120 @@
+open Multicore_bench
+
+(** A [Picos_aux_htbl] hash table paired with a [Rwlock]: lookups take the
+    read lock while insertions and removals take the write lock. *)
+module Htbl = struct
+  module Rwlock = Picos_std_sync.Rwlock
+  module Htbl = Picos_aux_htbl
+
+  type ('k, 'v) t = { htbl : ('k, 'v) Htbl.t; rwlock : Rwlock.t }
+
+  let create ?hashed_type () =
+    let htbl = Htbl.create ?hashed_type () in
+    let rwlock = Rwlock.create ~padded:true () in
+    { htbl; rwlock }
+
+  (* Note that this does not take the lock. *)
+  let remove_all { htbl; _ } = Htbl.remove_all htbl
+
+  let find_opt { htbl; rwlock } key =
+    Rwlock.rd_lock rwlock;
+    let found =
+      match Htbl.find_exn htbl key with
+      | exception Not_found -> None
+      | value -> Some value
+    in
+    Rwlock.rd_unlock rwlock;
+    found
+
+  let try_add { htbl; rwlock } key value =
+    Rwlock.wr_lock rwlock;
+    let added = Htbl.try_add htbl key value in
+    Rwlock.wr_unlock rwlock;
+    added
+
+  let try_remove { htbl; rwlock } key =
+    Rwlock.wr_lock rwlock;
+    let removed = Htbl.try_remove htbl key in
+    Rwlock.wr_unlock rwlock;
+    removed
+end
+
+(** Integer keys that hash to themselves. *)
+module Key = struct
+  type t = int
+
+  let equal (lhs : t) (rhs : t) = Int.equal lhs rhs
+  let hash (key : t) : int = key
+end
+
+(** Runs one benchmark configuration with [n_domains] workers. Out of every
+    100 operations roughly [percent_mem] are lookups, [percent_add] are
+    insertions, and the rest are removals. *)
+let run_one ~budgetf ~n_domains ?(n_ops = 100 * Util.iter_factor)
+    ?(n_keys = 1000) ~percent_mem ?(percent_add = (100 - percent_mem + 1) / 2)
+    ?(prepopulate = true) () =
+  let limit_mem = percent_mem in
+  let limit_add = percent_mem + percent_add in
+
+  assert (0 <= limit_mem && limit_mem <= 100);
+  assert (limit_mem <= limit_add && limit_add <= 100);
+
+  let t = Htbl.create ~hashed_type:(module Key) () in
+
+  (* Scale the work by the share of lookups and by the number of workers. *)
+  let n_ops = (100 + percent_mem) * n_ops / 100 * n_domains in
+
+  let n_ops_todo = Countdown.create ~n_domains () in
+
+  let before () =
+    ignore (Htbl.remove_all t : _ Seq.t);
+    Countdown.non_atomic_set n_ops_todo n_ops
+  in
+  let init i =
+    Scheduler.run @@ fun () ->
+    let state = Random.State.make_self_init () in
+    if prepopulate then begin
+      (* This worker's share of the [n_keys] initial insertion attempts. *)
+      let share = ((i + 1) * n_keys / n_domains) - (i * n_keys / n_domains) in
+      for _ = 1 to share do
+        let value = Random.State.bits state in
+        ignore (Htbl.try_add t (value mod n_keys) value)
+      done
+    end;
+    state
+  in
+  let wrap _ _ = Scheduler.run in
+  let work domain_index state =
+    let alloc () = Countdown.alloc n_ops_todo ~domain_index ~batch:1000 in
+    let rec loop batch =
+      if batch <> 0 then begin
+        for _ = 1 to batch do
+          let value = Random.State.bits state in
+          let op = (value asr 20) mod 100 in
+          let key = value mod n_keys in
+          if op < limit_mem then ignore (Htbl.find_opt t key)
+          else if op < limit_add then ignore (Htbl.try_add t key value)
+          else ignore (Htbl.try_remove t key)
+        done;
+        loop (alloc ())
+      end
+    in
+    loop (alloc ())
+  in
+
+  let config =
+    let plural = if n_domains = 1 then "" else "s" in
+    Printf.sprintf "%d worker%s, %d%% reads" n_domains plural percent_mem
+  in
+  Times.record ~budgetf ~n_warmups:1 ~n_runs_min:1 ~n_domains ~before ~init
+    ~wrap ~work ()
+  |> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config
+
+(** Runs the benchmark for each combination of worker count and lookup
+    percentage, skipping worker counts above the recommended domain count. *)
+let run_suite ~budgetf =
+  Util.cross [ 1; 2; 4; 8 ] [ 10; 50; 90; 95; 100 ]
+  |> List.concat_map (fun (n_domains, percent_mem) ->
+         if n_domains <= Picos_domain.recommended_domain_count () then
+           run_one ~budgetf ~n_domains ~percent_mem ()
+         else [])
diff --git a/bench/dune b/bench/dune
index 3d27fe7e..dc63dea8 100644
--- a/bench/dune
+++ b/bench/dune
@@ -17,6 +17,7 @@
    (run %{test} -brief "Picos_mpmcq")
    (run %{test} -brief "Picos_mpscq")
    (run %{test} -brief "Picos_htbl")
+   (run %{test} -brief "Picos_htbl with Rwlock")
    (run %{test} -brief "Picos_stdio")
    (run %{test} -brief "Picos_sync Stream")
    (run %{test} -brief "Fib")
diff --git a/bench/main.ml b/bench/main.ml
index 423c3774..2f8e3b1b 100644
--- a/bench/main.ml
+++ b/bench/main.ml
@@ -16,6 +16,7 @@ let benchmarks =
     ("Picos_mpmcq", Bench_mpmcq.run_suite);
     ("Picos_mpscq", Bench_mpscq.run_suite);
     ("Picos_htbl", Bench_htbl.run_suite);
+    ("Picos_htbl with Rwlock", Bench_rwlock_htbl.run_suite);
     ("Picos_stdio", Bench_stdio.run_suite);
     ("Picos_sync Stream", Bench_stream.run_suite);
     ("Fib", Bench_fib.run_suite);
diff
--git a/lib/picos_std.awaitable/picos_std_awaitable.ml b/lib/picos_std.awaitable/picos_std_awaitable.ml
index 7734b8c3..be1c57bb 100644
--- a/lib/picos_std.awaitable/picos_std_awaitable.ml
+++ b/lib/picos_std.awaitable/picos_std_awaitable.ml
@@ -29,13 +29,20 @@ module Awaitable = struct
   end
 
   module Awaiters = struct
+    (* [counter_and_bits] keeps flag bits in its low [num_bits] bits and the
+       insertion counter above them. [equal_bit] is set for awaiters that wait
+       for the awaitable to become equal to [value]. *)
+    let equal_bit = 0b01
+    let num_bits = 1
+    let one = 1 lsl num_bits
+
     type _ tdt =
       | Zero : [> `Zero ] tdt
       | One : {
           awaitable : 'a awaitable; (* Might also want to clear this *)
           mutable value : 'a; (* This is mutable to avoid space leaks *)
           trigger : Trigger.t;
-          mutable counter : int;
+          mutable counter_and_bits : int;
           mutable next : min0;
         }
           -> [> `One ] tdt
@@ -53,22 +60,27 @@ module Awaitable = struct
     let[@inline] snoc t (One tail_r as tail) =
+      (* The new tail takes the counter following its predecessor, but must
+         keep its own flag bits (its [op]) instead of inheriting them. *)
+      let bits = tail_r.counter_and_bits land (one - 1) in
       match t with
       | Min1 (One head_r) ->
-          tail_r.counter <- head_r.counter + 1;
+          tail_r.counter_and_bits <-
+            (head_r.counter_and_bits land lnot (one - 1)) + one + bits;
           Many { head = One head_r; prev = One head_r; tail }
       | Min1 (Many many_r as many) ->
           exec many;
           let (One prev_r as prev) = many_r.tail in
-          tail_r.counter <- prev_r.counter + 1;
+          tail_r.counter_and_bits <-
+            (prev_r.counter_and_bits land lnot (one - 1)) + one + bits;
           Many { head = many_r.head; prev; tail }
 
     external as1 : min0 -> is1 = "%identity"
 
     let[@inline] awaitable_of (One r : is1) = Packed.Packed r.awaitable
-    let[@inline] counter_of (One r : is1) = r.counter
+    let[@inline] counter_of (One r : is1) = r.counter_and_bits lsr num_bits
 
-    let[@inline] next_of (One r : is1) ~tail =
+    let[@inline] next_of (One r as one : is1) ~tail =
       let next = as1 r.next in
-      let counter = r.counter in
+      let counter = counter_of one in
       if counter_of tail - counter < counter_of next - counter then tail
       else next
 
@@ -77,7 +89,14 @@ module Awaitable = struct
 
     let[@inline] generalize (One r : is1) = One r
     let[@inline] is_signaled (One r : is1) = Trigger.is_signaled r.trigger
-    let[@inline] is_signalable (One r : is1) = get r.awaitable != r.value
+
+    (* With [equal_bit] clear this holds once the awaitable differs from
+       [value]; with [equal_bit] set it holds once they are the same. *)
+    let[@inline] is_signalable (One r : is1) =
+      Bool.to_int (get r.awaitable != r.value)
+      lxor (r.counter_and_bits land equal_bit)
+      <> 0
+
     let[@inline] await (One r : is1) = Trigger.await r.trigger
     let[@inline] clear (One r : is1) = r.value <- Obj.magic ()
 
@@ -156,7 +175,8 @@ module Awaitable = struct
                 let trigger = Trigger.create () in
                 if bits land (1 lsl i) = 0 then Trigger.signal trigger;
                 let awaitable = make 0 and next = Min0 Zero in
-                One { awaitable; value = 1; trigger; counter = 0; next }
+                One
+                  { awaitable; value = 1; trigger; counter_and_bits = 0; next }
               in
               let queue = ref (Min1 (make 0)) in
               for i = 1 to n - 1 do
@@ -284,7 +304,7 @@ module Awaitable = struct
                   let awaitable = make 0 and next = Min0 Zero in
                   if signaled_bits land (1 lsl i) = 0 then
                     Trigger.signal trigger;
-                  One { awaitable; value; trigger; counter = 0; next }
+                  One { awaitable; value; trigger; counter_and_bits = 0; next }
                 in
                 let queue = ref (Min1 (make 0)) in
                 for i = 1 to n - 1 do
@@ -387,12 +407,18 @@ module Awaitable = struct
       done
     with Not_found -> ()
 
+  type op = Isnt | Is
+
   module Awaiter = struct
     type t = Awaiters.is1
 
-    let add_as (type a) (t : a awaitable) trigger value =
+    let add_as (type a) (t : a awaitable) trigger op value =
       let one : Awaiters.is1 =
-        One { awaitable = t; value; trigger; counter = 0; next = Min0 Zero }
+        let counter_and_bits =
+          match op with Isnt -> 0 | Is -> Awaiters.equal_bit
+        in
+        One
+          { awaitable = t; value; trigger; counter_and_bits; next = Min0 Zero }
       in
       let backoff = ref Backoff.default in
       while
@@ -409,16 +435,16 @@ module Awaitable = struct
 
     let add (type a) (t : a awaitable) trigger =
       let unique_value = Sys.opaque_identity (Obj.magic awaiters : a) in
-      add_as t trigger unique_value
+      add_as t trigger Isnt unique_value
 
     let remove one =
       Awaiters.signal_and_clear one;
       update (Awaiters.awaitable_of one) ~signal:false ~count:1
   end
 
-  let await t value =
+  let await t op value =
     let trigger = Trigger.create () in
-    let one = Awaiter.add_as t trigger value in
+    let one = Awaiter.add_as t trigger op value in
     if Awaiters.is_signalable one then Awaiter.remove one
     else
       match Awaiters.await one with
diff --git a/lib/picos_std.awaitable/picos_std_awaitable.mli b/lib/picos_std.awaitable/picos_std_awaitable.mli
index 7fc68da6..602f89dd 100644
--- a/lib/picos_std.awaitable/picos_std_awaitable.mli
+++ b/lib/picos_std.awaitable/picos_std_awaitable.mli
@@ -82,9 +82,13 @@ module Awaitable : sig
       {{:https://en.wikipedia.org/wiki/Thundering_herd_problem} thundering herd}
       phenomena. *)
 
-  val await : 'a t -> 'a -> unit
-  (** [await awaitable before] suspends the current fiber until the awaitable is
-      explicitly {!signal}ed and has a value other than [before].
+  (** Whether {!await} waits until the value [Isnt] or [Is] the given one. *)
+  type op = Isnt | Is
+
+  val await : 'a t -> op -> 'a -> unit
+  (** [await awaitable op before] suspends the current fiber until the awaitable
+      is explicitly {!signal}ed and has a value that either is or isn't equal to
+      [before] as specified by the {!op}.
 
       ⚠️ This operation is subject to the
       {{:https://en.wikipedia.org/wiki/ABA_problem} ABA} problem. An [await] for
@@ -141,7 +145,7 @@ end
         let lock t =
           if not (Awaitable.compare_and_set t 0 1) then
             while Awaitable.exchange t 2 <> 0 do
-              Awaitable.await t 2
+              Awaitable.await t Isnt 2
             done
 
         let unlock t =
diff --git a/lib/picos_std.sync/dune b/lib/picos_std.sync/dune
index 39b0cbf5..51574c3e 100644
--- a/lib/picos_std.sync/dune
+++ b/lib/picos_std.sync/dune
@@ -3,6 +3,7 @@
  (public_name picos_std.sync)
  (libraries
   (re_export picos_std.event)
+  picos_std.awaitable
   picos
   backoff
   multicore-magic))
diff --git a/lib/picos_std.sync/mutex.ml b/lib/picos_std.sync/mutex.ml
index 78259322..ed4f5c98 100644
--- a/lib/picos_std.sync/mutex.ml
+++ b/lib/picos_std.sync/mutex.ml
@@ -134,6 +134,8 @@ let[@inline] lock ?checked t =
   Fiber.check fiber;
   lock_as (Fiber.Maybe.of_fiber fiber) t Nothing Backoff.default
 
+let is_locked t = Atomic.get t != Unlocked
+
 let try_lock ?checked t =
   let fiber = Fiber.Maybe.current_and_check_if checked in
   Atomic.get t == Unlocked
diff --git a/lib/picos_std.sync/picos_std_sync.ml b/lib/picos_std.sync/picos_std_sync.ml
index 007b8cf0..75d158b1 100644
--- a/lib/picos_std.sync/picos_std_sync.ml
+++ b/lib/picos_std.sync/picos_std_sync.ml
@@ -1,6 +1,7 @@
 module Mutex = Mutex
 module Condition = Condition
 module Semaphore = Semaphore
+module Rwlock = Rwlock
 module Lazy = Lazy
 module Latch = Latch
 module Ivar = Ivar
diff --git a/lib/picos_std.sync/picos_std_sync.mli b/lib/picos_std.sync/picos_std_sync.mli
index 5c1a3d5d..e7275c10 100644
--- a/lib/picos_std.sync/picos_std_sync.mli
+++ b/lib/picos_std.sync/picos_std_sync.mli
@@ -176,6 +176,28 @@ module Semaphore : sig
   end
 end
 
+module Rwlock : sig
+  (** A read-write lock. *)
+
+  type t
+  (** Represents a read-write lock. *)
+
+  val create : ?padded:bool -> unit -> t
+  (** [create ()] returns a new read-write lock that is not held by anyone. *)
+
+  val rd_lock : t -> unit
+  (** [rd_lock rwlock] acquires a read lock, waiting while a writer holds it. *)
+
+  val rd_unlock : t -> unit
+  (** [rd_unlock rwlock] releases a read lock taken with {!rd_lock}. *)
+
+  val wr_lock : t -> unit
+  (** [wr_lock rwlock] acquires the write lock, waiting for readers to leave. *)
+
+  val wr_unlock : t -> unit
+  (** [wr_unlock rwlock] releases the write lock taken with {!wr_lock}. *)
+end
+
 module Lazy : sig
   (** A lazy suspension.
 
diff --git a/lib/picos_std.sync/rwlock.ml b/lib/picos_std.sync/rwlock.ml
new file mode 100644
index 00000000..13d65303
--- /dev/null
+++ b/lib/picos_std.sync/rwlock.ml
@@ -0,0 +1,293 @@
+open Picos_std_awaitable
+open Picos
+module Atomic = Multicore_magic.Transparent_atomic
+
+(* A mutex on an [int Awaitable.t]: [0] is unlocked, [1] is locked, and [2] is
+   locked with possible waiters that [unlock] then needs to signal. *)
+module Unfair_mutex : sig
+  type t
+
+  val create : ?padded:bool -> unit -> t
+  val lock : t -> unit
+  val unlock : t -> unit
+  val is_locked : t -> bool
+end = struct
+  type t = int Awaitable.t
+
+  let create ?padded () = Awaitable.make ?padded 0
+
+  let lock t =
+    if not (Awaitable.compare_and_set t 0 1) then
+      while Awaitable.exchange t 2 <> 0 do
+        Awaitable.await t Isnt 2
+      done
+
+  let unlock t =
+    let before = Awaitable.fetch_and_add t (-1) in
+    if before = 2 then begin
+      Awaitable.set t 0;
+      Awaitable.signal t
+    end
+
+  let is_locked t = Awaitable.get t <> 0
+end
+
+(* A counter that can be awaited to become zero. [decr] returns [true] when
+   the counter reached zero. *)
+module Zero : sig
+  type t
+
+  val create : ?padded:bool -> unit -> t
+  val is_zero : t -> bool
+  val incr : t -> unit
+  val decr : t -> bool
+  val signal : t -> unit
+  val broadcast : t -> unit
+  val await : t -> unit
+end = struct
+  type t = int Awaitable.t
+
+  let create ?padded () = Awaitable.make ?padded 0
+  let is_zero t = Awaitable.get t == 0
+  let incr t = Awaitable.incr t
+  let decr t = Awaitable.fetch_and_add t (-1) = 1
+  let signal = Awaitable.signal
+  let broadcast = Awaitable.broadcast
+  let await t = if Awaitable.get t <> 0 then Awaitable.await t Is 0
+end
+
+(* A counter spread over a fixed binary tree of awaitables: leaves are picked
+   by domain index and a node adds one to its parent while it is non-zero.
+   NOTE(review): not currently used by the lock below. *)
+module Tree_zero : sig
+  type t
+
+  val create : ?padded:bool -> unit -> t
+  val is_zero : t -> bool
+  val incr : t -> unit
+  val decr : t -> bool
+  val signal : t -> unit
+  val broadcast : t -> unit
+  val await : t -> unit
+end = struct
+  type t = int Awaitable.t array Atomic.t
+
+  (* TODO: RESIZING *)
+
+  let parent_of i = (i - 1) lsr 1
+
+  (* The tree has a fixed number of leaves, so the domain index is wrapped onto
+     them; without this a large index would read past the end of the array
+     through [Array.unsafe_get]. *)
+  let leaf_of nodes =
+    Multicore_magic.instantaneous_domain_index ()
+    land (Array.length nodes lsr 1)
+
+  let create ?padded () =
+    Array.init 31 (fun _ -> Awaitable.make ~padded:true 0)
+    |> Atomic.make
+    |> Multicore_magic.copy_as ?padded
+
+  let root_of t = Array.unsafe_get (Atomic.get t) 0
+  let is_zero t = Awaitable.get (root_of t) == 0
+
+  let rec decr_at nodes i =
+    let counter = Array.unsafe_get nodes i in
+    let count = Awaitable.fetch_and_add counter (-1) in
+    count = 1 && (i = 0 || decr_at nodes (parent_of i))
+
+  let rec incr_at nodes i =
+    let counter = Array.unsafe_get nodes i in
+    let incremented_parent = ref false in
+    let count = ref 0 in
+    while
+      count := Awaitable.get counter;
+      if i <> 0 && !count = 0 && not !incremented_parent then begin
+        incr_at nodes (parent_of i);
+        incremented_parent := true
+      end;
+      not (Awaitable.compare_and_set counter !count (!count + 1))
+    do
+      ()
+    done;
+    if i <> 0 && !count <> 0 && !incremented_parent then
+      let _ : bool = decr_at nodes (parent_of i) in
+      ()
+
+  let incr t =
+    let nodes = Atomic.get t in
+    incr_at nodes (leaf_of nodes + (Array.length nodes lsr 1))
+
+  let rec decr nodes leaf =
+    let i = leaf + (Array.length nodes lsr 1) in
+    let counter = Array.unsafe_get nodes i in
+    let count = Awaitable.get counter in
+    if 0 < count then
+      if Awaitable.compare_and_set counter count (count - 1) then
+        count = 1 && decr_at nodes (parent_of i)
+      else decr nodes leaf
+    else decr nodes ((leaf - 1) land (Array.length nodes lsr 1))
+
+  let decr t =
+    let nodes = Atomic.get t in
+    decr nodes (leaf_of nodes)
+
+  let signal t = Awaitable.signal (root_of t)
+  let broadcast t = Awaitable.broadcast (root_of t)
+
+  let await t =
+    let root = root_of t in
+    if Awaitable.get root <> 0 then Awaitable.await root Is 0
+end
+
+(* A counter kept as per-domain pairs of atomics that are only ever
+   incremented: even indices count decrements and odd indices count
+   increments. The counter is zero when the two sums agree. *)
+module Distributed_zero : sig
+  type t
+
+  val create : ?padded:bool -> unit -> t
+  val is_zero : t -> bool
+  val incr : t -> unit
+  val decr : t -> bool
+  val signal : t -> unit
+  val await : t -> unit
+end = struct
+  type t = {
+    counters : int Atomic.t array Atomic.t;
+    mutable trigger : Trigger.t;
+  }
+
+  let create ?padded () =
+    let counters =
+      Array.init 2 (fun _ -> Atomic.make 0 |> Multicore_magic.copy_as_padded)
+      |> Atomic.make
+      |> Multicore_magic.copy_as ?padded
+    in
+    let trigger = Trigger.create () in
+    { counters; trigger }
+
+  let rec accum counters i sum =
+    if i < Array.length counters then
+      accum counters (i + 2) (sum + Atomic.get (Array.unsafe_get counters i))
+    else sum
+
+  let rec is_zero t =
+    let counters = Atomic.get t.counters in
+    let decrs = accum counters 0 0 in
+    let incrs = accum counters 1 0 in
+    if Atomic.get t.counters == counters then decrs == incrs else is_zero t
+
+  let rec update t which =
+    let i = which + (2 * Multicore_magic.instantaneous_domain_index ()) in
+    let counters = Atomic.get t in
+    if i < Array.length counters then Atomic.incr (Array.unsafe_get counters i)
+    else
+      let new_counters =
+        Array.init (Array.length counters * 2) @@ fun i ->
+        if i < Array.length counters then Array.unsafe_get counters i
+        else Atomic.make 0 |> Multicore_magic.copy_as_padded
+      in
+      Atomic.compare_and_set t counters new_counters |> ignore;
+      update t which
+
+  let incr t = update t.counters 1
+
+  let decr t =
+    update t.counters 0;
+    true
+
+  let signal t = Trigger.signal t.trigger
+
+  let await t =
+    let tries = ref 1_000 in
+    while not (is_zero t) do
+      let n = !tries in
+      if 0 < n then begin
+        tries := n - 1;
+        let _ : Backoff.t = Backoff.once Backoff.default in
+        ()
+      end
+      else
+        match Trigger.await t.trigger with
+        | None -> t.trigger <- Trigger.create ()
+        | Some exn_bt ->
+            t.trigger <- Trigger.create ();
+            Printexc.raise_with_backtrace (fst exn_bt) (snd exn_bt)
+    done
+end
+
+module R0 = Distributed_zero
+module B0 = Zero
+module M = Unfair_mutex
+module W0 = Zero
+
+(* [readers] counts readers inside or entering, [writers] counts writers
+   holding or waiting for [mutex], and [blockers] counts readers that ran out
+   of patience and make new writers wait. *)
+type t = { readers : R0.t; blockers : B0.t; mutex : M.t; writers : W0.t }
+
+let create ?padded () =
+  let readers = R0.create ?padded () in
+  let blockers = B0.create ?padded () in
+  let mutex = M.create ?padded () in
+  let writers = W0.create ?padded () in
+  { readers; blockers; mutex; writers } |> Multicore_magic.copy_as ?padded
+
+(* Waits for blocked readers to go first, then takes [mutex] and waits for the
+   readers to leave. Undoes its registration if waiting raises. *)
+let wr_lock t =
+  B0.await t.blockers;
+  W0.incr t.writers;
+  match M.lock t.mutex with
+  | () -> begin
+      try R0.await t.readers
+      with exn ->
+        let bt = Printexc.get_raw_backtrace () in
+        M.unlock t.mutex;
+        if W0.decr t.writers then W0.broadcast t.writers;
+        Printexc.raise_with_backtrace exn bt
+    end
+  | exception exn ->
+      let bt = Printexc.get_raw_backtrace () in
+      if W0.decr t.writers then W0.broadcast t.writers;
+      Printexc.raise_with_backtrace exn bt
+
+let wr_unlock t =
+  M.unlock t.mutex;
+  if W0.decr t.writers then W0.broadcast t.writers
+
+(* Optimistically registers as a reader and backs out while a writer holds
+   [mutex]. Once [patience] runs out, registers as a blocker, which makes new
+   writers wait, and waits for the writers to drain. *)
+let rd_lock t =
+  let patience = ref 1_000 in
+  while
+    R0.incr t.readers;
+    M.is_locked t.mutex
+    && begin
+         if R0.decr t.readers then R0.signal t.readers;
+         while M.is_locked t.mutex (* not (W0.is_zero t.writers) *) do
+           let _ : Backoff.t = Backoff.once Backoff.default in
+           let n = !patience in
+           if 0 < n then begin
+             let n = n - 1 in
+             patience := n;
+             if 0 = n then begin
+               B0.incr t.blockers;
+               try W0.await t.writers
+               with exn ->
+                 let bt = Printexc.get_raw_backtrace () in
+                 if B0.decr t.blockers then B0.broadcast t.blockers;
+                 Printexc.raise_with_backtrace exn bt
+             end
+           end
+         done;
+         true
+       end
+  do
+    ()
+  done;
+  if !patience = 0 && B0.decr t.blockers then B0.broadcast t.blockers
+
+let rd_unlock t = if R0.decr t.readers then R0.signal t.readers