diff --git a/bench/bench_semaphore.ml b/bench/bench_semaphore.ml new file mode 100644 index 00000000..abafc8c0 --- /dev/null +++ b/bench/bench_semaphore.ml @@ -0,0 +1,66 @@ +open Multicore_bench +open Picos +open Picos_sync +open Picos_structured + +let is_ocaml4 = String.starts_with ~prefix:"4." Sys.ocaml_version + +(** This will keep a domain running. *) +let yielder computation = + let main _ = + try + while true do + Fiber.yield () + done + with Exit -> () + in + Fiber.spawn (Fiber.create ~forbid:false computation) main + +let n_workers = 4 + +let run_one ~budgetf ~use_domains ~n_resources () = + let semaphore = Semaphore.Counting.make ~padded:true n_resources in + + let n_domains = if use_domains then n_workers else 1 in + let n_ops = + (if use_domains || is_ocaml4 then 10 else 100) * Util.iter_factor + in + + let run_worker () = + let computation = Computation.create () in + if not is_ocaml4 then yielder computation; + for _ = 1 to n_ops do + Semaphore.Counting.acquire semaphore; + Fiber.yield (); + Semaphore.Counting.release semaphore + done; + Computation.cancel computation (Exn_bt.get_callstack 0 Exit) + in + + let init _ = () in + let wrap _ () = Scheduler.run in + let work _ () = + if use_domains then run_worker () + else + Bundle.join_after @@ fun bundle -> + for _ = 1 to n_workers do + Bundle.fork bundle run_worker + done + in + let config = + Printf.sprintf "%d %s%s, %d resource%s" n_workers + (if use_domains then "domain" else "fiber") + (if n_workers = 1 then "" else "s") + n_resources + (if n_resources = 1 then "" else "s") + in + Times.record ~budgetf ~n_domains ~init ~wrap ~work () + |> Times.to_thruput_metrics ~n:(n_ops * n_workers) ~singular:"acquired yield" + ~config + +let run_suite ~budgetf = + Util.cross [ false; true ] [ 1; 2; 3; 4 ] + |> List.concat_map @@ fun (use_domains, n_resources) -> + if use_domains && Picos_domain.recommended_domain_count () < n_workers then + [] + else run_one ~budgetf ~use_domains ~n_resources () diff --git a/bench/main.ml b/bench/main.ml index 91e1de16..e739afb8 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -8,6 +8,7 @@ let benchmarks = ("Picos TLS", Bench_tls.run_suite); ("Picos DLS", Bench_dls.run_suite); ("Picos Mutex", Bench_mutex.run_suite); + ("Picos Semaphore", Bench_semaphore.run_suite); ("Picos Spawn", Bench_spawn.run_suite); ("Picos Yield", Bench_yield.run_suite); ("Picos Cancel_after with Picos_select", Bench_cancel_after.run_suite); diff --git a/lib/picos_structured/picos_structured.mli b/lib/picos_structured/picos_structured.mli index e7684952..e5f73f05 100644 --- a/lib/picos_structured/picos_structured.mli +++ b/lib/picos_structured/picos_structured.mli @@ -393,6 +393,20 @@ end end end; + Flock.fork begin fun () -> + let sem = + Semaphore.Binary.make false + in + Semaphore.Binary.acquire sem + end; + + Flock.fork begin fun () -> + let sem = + Semaphore.Counting.make 0 + in + Semaphore.Counting.acquire sem + end; + Flock.fork begin fun () -> Event.sync (Event.choose []) end; @@ -441,10 +455,11 @@ end end ]} - First of all, note that above the {{!Picos_sync.Mutex} [Mutex]} and - {{!Picos_sync.Condition} [Condition]} modules come from the {!Picos_sync} - library and the {{!Picos_stdio.Unix} [Unix]} module comes from the - {!Picos_stdio} library. They do not come from the standard OCaml libraries. + First of all, note that above the {{!Picos_sync.Mutex} [Mutex]}, + {{!Picos_sync.Condition} [Condition]}, and {{!Picos_sync.Semaphore} + [Semaphore]} modules come from the {!Picos_sync} library and the + {{!Picos_stdio.Unix} [Unix]} module comes from the {!Picos_stdio} library. + They do not come from the standard OCaml libraries. The above program creates a {{!Flock} flock} of fibers and {{!Flock.fork} forks} several fibers to the flock that all block in various ways. In @@ -454,6 +469,9 @@ end - {!Promise.await} never returns as the promise won't be completed, - {{!Picos_sync.Condition.wait} [Condition.wait]} never returns, because the condition is never signaled, + - {{!Picos_sync.Semaphore.Binary.acquire} [Semaphore.Binary.acquire]} and + {{!Picos_sync.Semaphore.Counting.acquire} [Semaphore.Counting.acquire]} + never return, because the counts of the semaphores never change from [0], - {{!Picos_sync.Event.sync} [Event.sync]} never returns, because the event can never be committed to, - {{!Picos_sync.Latch.await} [Latch.await]} never returns, because the count diff --git a/lib/picos_sync/condition.ml b/lib/picos_sync/condition.ml index cd4bef62..e936c4b7 100644 --- a/lib/picos_sync/condition.ml +++ b/lib/picos_sync/condition.ml @@ -1,87 +1,49 @@ open Picos -type state = Empty | Queue of { head : Trigger.t list; tail : Trigger.t list } -type t = state Atomic.t +type t = Trigger.t Q.t Atomic.t -let create ?padded () = Multicore_magic.copy_as ?padded @@ Atomic.make Empty +let create ?padded () = + Multicore_magic.copy_as ?padded @@ Atomic.make (Q.T Zero) -let broadcast t = - if Atomic.get t != Empty then - match Atomic.exchange t Empty with - | Empty -> () - | Queue r -> - List.iter Trigger.signal r.head; - List.iter Trigger.signal (List.rev r.tail) +let broadcast (t : t) = + if Atomic.get t != T Zero then + match Atomic.exchange t (T Zero) with + | T Zero -> () + | T (One _ as q) -> Q.iter q Trigger.signal (* We try to avoid starvation of signal by making it so that when, at the start of signal or wait, the head is empty, the tail is reversed into the head. This way both signal and wait attempt O(1) and O(n) operations at the same time. *) -let rec signal t backoff = +let rec signal (t : t) backoff = match Atomic.get t with - | Empty -> () - | Queue r as before -> begin - match r.head with - | trigger :: head -> - signal_cas t backoff before - (if head == [] && r.tail == [] then Empty else Queue { r with head }) - trigger - | [] -> begin - match List.rev r.tail with - | trigger :: head -> - signal_cas t backoff before - (if head == [] then Empty else Queue { head; tail = [] }) - trigger - | [] -> failwith "impossible" - end - end + | T Zero -> () + | T (One _ as q) as before -> + let after = Q.tail q in + if Atomic.compare_and_set t before after then + let trigger = Q.head q in + Trigger.signal trigger + else signal t (Backoff.once backoff) -and signal_cas t backoff before after trigger = - if Atomic.compare_and_set t before after then Trigger.signal trigger - else signal t (Backoff.once backoff) - -let signal t = signal t Backoff.default - -let rec cleanup backoff trigger t = +let rec cleanup backoff trigger (t : t) = (* We have been canceled. If we can't drop our trigger from the variable, we signal the next trigger in queue to make sure each signal wakes up at least one non-canceled waiter if possible. *) match Atomic.get t with - | Empty -> () - | Queue r as before -> begin - if r.head != [] then - match List_ext.drop_first_or_not_found trigger r.head with - | head -> - cleanup_cas backoff trigger t before - (if head == [] && r.tail == [] then Empty - else Queue { r with head }) - | exception Not_found -> begin - match List_ext.drop_first_or_not_found trigger r.tail with - | tail -> - cleanup_cas backoff trigger t before (Queue { r with tail }) - | exception Not_found -> signal t - end - else - match List_ext.drop_first_or_not_found trigger r.tail with - | tail -> - cleanup_cas backoff trigger t before - (if tail == [] then Empty else Queue { head = []; tail }) - | exception Not_found -> signal t - end + | T Zero -> () + | T (One _ as q) as before -> + let after = Q.remove q trigger in + if before == after then signal t Backoff.default + else if not (Atomic.compare_and_set t before after) then + cleanup (Backoff.once backoff) trigger t -and cleanup_cas backoff trigger t before after = - if not (Atomic.compare_and_set t before after) then - cleanup (Backoff.once backoff) trigger t - -let rec wait t mutex trigger fiber backoff = +let rec wait (t : t) mutex trigger fiber backoff = let before = Atomic.get t in let after = match before with - | Empty -> Queue { head = [ trigger ]; tail = [] } - | Queue r -> - if r.head != [] then Queue { r with tail = trigger :: r.tail } - else Queue { head = List.rev_append r.tail [ trigger ]; tail = [] } + | T Zero -> Q.singleton trigger + | T (One _ as q) -> Q.snoc q trigger in if Atomic.compare_and_set t before after then begin Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default; @@ -101,3 +63,5 @@ let wait t mutex = let fiber = Fiber.current () in let trigger = Trigger.create () in wait t mutex trigger fiber Backoff.default + +let[@inline] signal t = signal t Backoff.default diff --git a/lib/picos_sync/mutex.ml b/lib/picos_sync/mutex.ml index f360ad02..23209693 100644 --- a/lib/picos_sync/mutex.ml +++ b/lib/picos_sync/mutex.ml @@ -10,45 +10,28 @@ type _ tdt = type state = | Unlocked - | Locked of { - fiber : Fiber.Maybe.t; - head : [ `Entry ] tdt list; - tail : [ `Entry ] tdt list; - } + | Locked of { fiber : Fiber.Maybe.t; waiters : [ `Entry ] tdt Q.t } type t = state Atomic.t let create ?padded () = Multicore_magic.copy_as ?padded @@ Atomic.make Unlocked - -(* We try to avoid starvation of unlock by making it so that when, at the start - of lock or unlock, the head is empty, the tail is reversed into the head. - This way both lock and unlock attempt O(1) and O(n) operations at the same - time. *) - -let locked_nothing = - Locked { fiber = Fiber.Maybe.nothing; head = []; tail = [] } +let locked_nothing = Locked { fiber = Fiber.Maybe.nothing; waiters = T Zero } let rec unlock_as owner t backoff = match Atomic.get t with | Unlocked -> unlocked () | Locked r as before -> if Fiber.Maybe.equal r.fiber owner then - match r.head with - | Entry { trigger; fiber } :: rest -> - let after = Locked { r with fiber; head = rest } in + match r.waiters with + | T Zero -> + if not (Atomic.compare_and_set t before Unlocked) then + unlock_as owner t (Backoff.once backoff) + | T (One _ as q) -> + let (Entry { trigger; fiber }) = Q.head q in + let waiters = Q.tail q in + let after = Locked { fiber; waiters } in if Atomic.compare_and_set t before after then Trigger.signal trigger else unlock_as owner t (Backoff.once backoff) - | [] -> begin - match List.rev r.tail with - | Entry { trigger; fiber } :: rest -> - let after = Locked { fiber; head = rest; tail = [] } in - if Atomic.compare_and_set t before after then - Trigger.signal trigger - else unlock_as owner t (Backoff.once backoff) - | [] -> - if not (Atomic.compare_and_set t before Unlocked) then - unlock_as owner t (Backoff.once backoff) - end else not_owner () let[@inline] unlock ?checked t = @@ -60,28 +43,24 @@ let rec cleanup_as (Entry entry_r as entry : [ `Entry ] tdt) t backoff = Otherwise we must remove our entry from the queue. *) match Atomic.get t with | Locked r as before -> begin - match List_ext.drop_first_or_not_found entry r.head with - | head -> - let after = Locked { r with head } in - if not (Atomic.compare_and_set t before after) then - cleanup_as entry t (Backoff.once backoff) - | exception Not_found -> begin - match List_ext.drop_first_or_not_found entry r.tail with - | tail -> - let after = Locked { r with tail } in - if not (Atomic.compare_and_set t before after) then - cleanup_as entry t (Backoff.once backoff) - | exception Not_found -> unlock_as entry_r.fiber t Backoff.default - end + match r.waiters with + | T Zero -> unlock_as entry_r.fiber t backoff + | T (One _ as q) -> + let waiters = Q.remove q entry in + if r.waiters == waiters then unlock_as entry_r.fiber t backoff + else + let after = Locked { fiber = r.fiber; waiters } in + if not (Atomic.compare_and_set t before after) then + cleanup_as entry t (Backoff.once backoff) end - | Unlocked -> unlocked () (* impossible *) + | Unlocked -> unlocked () let rec lock_as fiber t entry backoff = match Atomic.get t with | Unlocked as before -> let after = if fiber == Fiber.Maybe.nothing then locked_nothing - else Locked { fiber; head = []; tail = [] } + else Locked { fiber; waiters = T Zero } in if not (Atomic.compare_and_set t before after) then lock_as fiber t entry (Backoff.once backoff) @@ -94,11 +73,12 @@ let rec lock_as fiber t entry backoff = Entry { trigger; fiber } | Entry _ as entry -> entry in - let after = - if r.head == [] then - Locked { r with head = List.rev_append r.tail [ entry ]; tail = [] } - else Locked { r with tail = entry :: r.tail } + let waiters = + match r.waiters with + | T Zero -> Q.singleton entry + | T (One _ as q) -> Q.snoc q entry in + let after = Locked { fiber = r.fiber; waiters } in if Atomic.compare_and_set t before after then begin match Trigger.await entry_r.trigger with | None -> () @@ -118,7 +98,7 @@ let try_lock ?checked t = Atomic.get t == Unlocked && Atomic.compare_and_set t Unlocked (if fiber == Fiber.Maybe.nothing then locked_nothing - else Locked { fiber; head = []; tail = [] }) + else Locked { fiber; waiters = T Zero }) let protect ?checked t body = let fiber = Fiber.Maybe.current_and_check_if checked in diff --git a/lib/picos_sync/picos_sync.ml b/lib/picos_sync/picos_sync.ml index ffea9a8b..8f8d6522 100644 --- a/lib/picos_sync/picos_sync.ml +++ b/lib/picos_sync/picos_sync.ml @@ -1,5 +1,6 @@ module Mutex = Mutex module Condition = Condition +module Semaphore = Semaphore module Lazy = Lazy module Event = Event module Latch = Latch diff --git a/lib/picos_sync/picos_sync.mli b/lib/picos_sync/picos_sync.mli index 6d353d73..99b01ecc 100644 --- a/lib/picos_sync/picos_sync.mli +++ b/lib/picos_sync/picos_sync.mli @@ -106,6 +106,66 @@ module Condition : sig variable. *) end +module Semaphore : sig + (** {!Counting} and {!Binary} semaphores. + + ℹ️ This intentionally mimics the interface of {!Stdlib.Semaphore}. Unlike + with the standard library semaphores, blocking on these semaphores allows + an effects based scheduler to run other fibers on the thread. *) + + module Counting : sig + (** A counting semaphore. *) + + type t + (** Represents a counting semaphore. *) + + val make : ?padded:bool -> int -> t + (** [make initial] creates a new counting semaphore with the given [initial] + count. + + @raise Invalid_argument in case the given [initial] count is negative. *) + + val release : t -> unit + (** [release semaphore] increments the count of the semaphore. + + @raise Sys_error in case the count would overflow. *) + + val acquire : t -> unit + (** [acquire semaphore] waits until the count of the semaphore is greater + than [0] and then atomically decrements the count. *) + + val try_acquire : t -> bool + (** [try_acquire semaphore] attempts to atomically decrement the count of + the semaphore unless the count is already [0]. *) + + val get_value : t -> int + (** [get_value semaphore] returns the current count of the semaphore. This + should only be used for debugging or informational messages. *) + end + + module Binary : sig + (** A binary semaphore. *) + + type t + (** Represents a binary semaphore. *) + + val make : ?padded:bool -> bool -> t + (** [make initial] creates a new binary semaphore with count of [1] in case + [initial] is [true] and count of [0] otherwise. *) + + val release : t -> unit + (** [release semaphore] sets the count of the semaphore to [1]. *) + + val acquire : t -> unit + (** [acquire semaphore] waits until the count of the semaphore is [1] and + then atomically changes the count to [0]. *) + + val try_acquire : t -> bool + (** [try_acquire semaphore] attempts to atomically change the count of the + semaphore from [1] to [0]. *) + end +end + module Lazy : sig (** A lazy suspension. @@ -550,7 +610,8 @@ end (** {1 Conventions} The optional [padded] argument taken by several constructor functions, e.g. - {!Latch.create}, {!Mutex.create}, and {!Condition.create}, defaults to + {!Latch.create}, {!Mutex.create}, {!Condition.create}, + {!Semaphore.Counting.make}, and {!Semaphore.Binary.make}, defaults to [false]. When explicitly specified as [~padded:true] the object is allocated in a way to avoid {{:https://en.wikipedia.org/wiki/False_sharing} false sharing}. For relatively long lived objects this can improve diff --git a/lib/picos_sync/q.ml b/lib/picos_sync/q.ml new file mode 100644 index 00000000..f96e8bbf --- /dev/null +++ b/lib/picos_sync/q.ml @@ -0,0 +1,72 @@ +type ('a, _) tdt = + | Nil : ('a, [> `Nil ]) tdt + | Cons : { value : 'a; mutable next : 'a spine } -> ('a, [> `Cons ]) tdt + +and 'a spine = S : ('a, [< `Nil | `Cons ]) tdt -> 'a spine [@@unboxed] + +type 'a cons = ('a, [ `Cons ]) tdt + +external as_cons : 'a spine -> 'a cons = "%identity" + +type ('a, _) queue = + | Zero : ('a, [> `Zero ]) queue + | One : { + head : 'a cons; + tail : 'a cons; + cons : 'a cons; + } + -> ('a, [> `One ]) queue + +type ('a, 'n) one = ('a, ([< `One ] as 'n)) queue +type 'a t = T : ('a, [< `Zero | `One ]) queue -> 'a t [@@unboxed] + +let[@inline] singleton value = + let cons = Cons { value; next = S Nil } in + T (One { head = cons; tail = cons; cons }) + +let[@inline] exec (One o : (_, _) one) = + if o.tail != o.cons then + let (Cons tl) = o.tail in + if tl.next != S o.cons then tl.next <- S o.cons + +let[@inline] snoc (One o as t : (_, _) one) value = + exec t; + let cons = Cons { value; next = S Nil } in + T (One { head = o.head; tail = o.cons; cons }) + +let[@inline] head (One { head = Cons hd; _ } : (_, _) one) = hd.value + +let[@inline] tail (One o as t : (_, _) one) = + exec t; + if o.head == o.cons then T Zero + else + let (Cons hd) = o.head in + T (One { head = as_cons hd.next; tail = o.cons; cons = o.cons }) + +let rec iter (Cons cons_r : _ cons) action = + action cons_r.value; + match cons_r.next with S Nil -> () | S (Cons _ as cons) -> iter cons action + +let[@inline] iter (One o as t : (_, _) one) action = + exec t; + iter o.head action + +let rec find_tail (Cons cons_r as cons : _ cons) = + match cons_r.next with S Nil -> cons | S (Cons _ as cons) -> find_tail cons + +let[@tail_mod_cons] rec reject (Cons cons_r : _ cons) value = + if cons_r.value != value then + match cons_r.next with + | S Nil -> raise_notrace Not_found + | S (Cons _ as cons) -> + S (Cons { value = cons_r.value; next = reject cons value }) + else cons_r.next + +let remove (One o as t : (_, _) one) value = + exec t; + match reject o.head value with + | S Nil -> T Zero + | S (Cons _ as head) -> + let tail = find_tail head in + T (One { head; tail; cons = tail }) + | exception Not_found -> T t diff --git a/lib/picos_sync/semaphore.ml b/lib/picos_sync/semaphore.ml new file mode 100644 index 00000000..e7a10d76 --- /dev/null +++ b/lib/picos_sync/semaphore.ml @@ -0,0 +1,117 @@ +open Picos + +let[@inline never] overflow () = raise (Sys_error "overflow") +let[@inline never] negative () = invalid_arg "negative initial count" + +module Counting = struct + type t = Obj.t Atomic.t + + let make ?padded count = + if count < 0 then negative (); + Atomic.make (Obj.repr count) |> Multicore_magic.copy_as ?padded + + let rec release t backoff = + let before = Atomic.get t in + if Obj.is_int before then begin + let count = Obj.obj before in + if count < count + 1 then begin + let after = Obj.repr (count + 1) in + if not (Atomic.compare_and_set t before after) then + release t (Backoff.once backoff) + end + else overflow () + end + else + let after = Q.tail (Obj.obj before) in + if Atomic.compare_and_set t before (Obj.repr after) then + let trigger = Q.head (Obj.obj before) in + Trigger.signal trigger + else release t (Backoff.once backoff) + + let rec cleanup t trigger backoff = + let before = Atomic.get t in + if Obj.is_int before then release t Backoff.default + else + let before = Obj.obj before in + let after = Q.remove before trigger in + if before == after then release t Backoff.default + else if not (Atomic.compare_and_set t (Obj.repr before) (Obj.repr after)) + then cleanup t trigger (Backoff.once backoff) + + let rec acquire t backoff = + let before = Atomic.get t in + if Obj.is_int before then + let count = Obj.obj before in + if 0 < count then begin + let after = Obj.repr (count - 1) in + if not (Atomic.compare_and_set t before after) then + acquire t (Backoff.once backoff) + end + else + let trigger = Trigger.create () in + let after = Q.singleton trigger in + if Atomic.compare_and_set t before (Obj.repr after) then begin + match Trigger.await trigger with + | None -> () + | Some exn_bt -> + cleanup t trigger Backoff.default; + Exn_bt.raise exn_bt + end + else acquire t (Backoff.once backoff) + else + let trigger = Trigger.create () in + let after = Q.snoc (Obj.obj before) trigger in + if Atomic.compare_and_set t before (Obj.repr after) then begin + match Trigger.await trigger with + | None -> () + | Some exn_bt -> + cleanup t trigger Backoff.default; + Exn_bt.raise exn_bt + end + else acquire t (Backoff.once backoff) + + let rec try_acquire t backoff = + let before = Atomic.get t in + Obj.is_int before + && + let count = Obj.obj before in + 0 < count + && + let after = Obj.repr (count - 1) in + Atomic.compare_and_set t before after + || try_acquire t (Backoff.once backoff) + + let get_value t = + let state = Atomic.get t in + if Obj.is_int state then Obj.obj state else 0 + + let[@inline] release t = release t Backoff.default + let[@inline] acquire t = acquire t Backoff.default + let[@inline] try_acquire t = try_acquire t Backoff.default +end + +module Binary = struct + type t = Counting.t + + let make ?padded initial = Counting.make ?padded (Bool.to_int initial) + + let rec release t backoff = + let before = Atomic.get t in + if Obj.is_int before then begin + let count = Obj.obj before in + if count = 0 then + let after = Obj.repr 1 in + if not (Atomic.compare_and_set t before after) then + release t (Backoff.once backoff) + end + else + let after = Q.tail (Obj.obj before) in + if Atomic.compare_and_set t before (Obj.repr after) then + let trigger = Q.head (Obj.obj before) in + Trigger.signal trigger + else release t (Backoff.once backoff) + + let acquire = Counting.acquire + let try_acquire = Counting.try_acquire + let[@inline] release t = release t Backoff.default +end diff --git a/test/test_structured.ml b/test/test_structured.ml index 999a0b2c..b7966530 100644 --- a/test/test_structured.ml +++ b/test/test_structured.ml @@ -122,37 +122,23 @@ let test_block_raises_sys_error () = let test_termination_nests () = Test_scheduler.run ~max_domains:3 @@ fun () -> - let mutex = Mutex.create () in - let condition = Condition.create () in - let blocked = ref false in + let semaphore = Semaphore.Binary.make false in + check Flock.join_after @@ fun () -> begin - check Flock.join_after @@ fun () -> + Flock.fork @@ fun () -> begin - Flock.fork @@ fun () -> + check Flock.join_after @@ fun () -> begin - check Flock.join_after @@ fun () -> - begin - Flock.fork @@ fun () -> - begin - Mutex.protect mutex @@ fun () -> blocked := true - end; - Condition.signal condition; - while true do - Control.sleep ~seconds:1.0 - done - end + Flock.fork @@ fun () -> + Semaphore.Binary.release semaphore; + while true do + Control.sleep ~seconds:1.0 + done end - end; - - begin - Mutex.protect mutex @@ fun () -> - while not !blocked do - Condition.wait condition mutex - done - end; - - Flock.terminate () - end + end + end; + Semaphore.Binary.acquire semaphore; + Flock.terminate () let test_promise_cancelation_does_not_terminate () = Test_scheduler.run ~max_domains:2 @@ fun () -> diff --git a/test/test_sync.ml b/test/test_sync.ml index dc592778..677d672a 100644 --- a/test/test_sync.ml +++ b/test/test_sync.ml @@ -174,6 +174,44 @@ let test_mutex_and_condition_cancelation () = msgs := msg :: !msgs end +let test_semaphore_basics () = + Test_scheduler.run @@ fun () -> + begin + match Semaphore.Counting.make (-1) with + | _ -> assert false + | exception Invalid_argument _ -> () + end; + begin + let s = Semaphore.Counting.make Int.max_int in + match Semaphore.Counting.release s with + | () -> assert false + | exception Sys_error _ -> + Semaphore.Counting.acquire s; + Semaphore.Counting.release s + end + +let is_ocaml4 = String.starts_with ~prefix:"4." Sys.ocaml_version + +let test_semaphore_stress () = + Test_scheduler.run ~max_domains:4 @@ fun () -> + Bundle.join_after @@ fun bundle -> + let s = Semaphore.Counting.make ~padded:true 0 in + let rec loop ~n_acquire ~n_release = + if 0 < n_acquire && 0 < n_release then + if Random.bool () then fork_acquire ~n_acquire ~n_release + else fork_release ~n_acquire ~n_release + else if 0 < n_acquire then fork_acquire ~n_acquire ~n_release + else if 0 < n_release then fork_release ~n_acquire ~n_release + and fork_acquire ~n_acquire ~n_release = + Bundle.fork bundle (fun () -> Semaphore.Counting.acquire s); + loop ~n_acquire:(n_acquire - 1) ~n_release + and fork_release ~n_acquire ~n_release = + Bundle.fork bundle (fun () -> Semaphore.Counting.release s); + loop ~n_acquire ~n_release:(n_release - 1) + in + let n = if is_ocaml4 then 100 else 100_000 in + loop ~n_acquire:n ~n_release:n + let test_lazy_basics () = Test_scheduler.run @@ fun () -> assert (101 = (Lazy.from_fun (fun () -> 101) |> Lazy.force_val)); @@ -257,6 +295,11 @@ let () = Alcotest.test_case "cancelation" `Quick test_mutex_and_condition_cancelation; ] ); + ( "Semaphore", + [ + Alcotest.test_case "basics" `Quick test_semaphore_basics; + Alcotest.test_case "stress" `Quick test_semaphore_stress; + ] ); ( "Lazy", [ Alcotest.test_case "basics" `Quick test_lazy_basics;