Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stdlib style Semaphore module to Picos_sync #170

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions bench/bench_semaphore.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
open Multicore_bench
open Picos
open Picos_sync
open Picos_structured

let is_ocaml4 = String.starts_with ~prefix:"4." Sys.ocaml_version

(** This will keep a domain running. *)
let yielder computation =
let main _ =
try
while true do
Fiber.yield ()
done
with Exit -> ()
in
Fiber.spawn (Fiber.create ~forbid:false computation) main

let n_workers = 4

let run_one ~budgetf ~use_domains ~n_resources () =
let semaphore = Semaphore.Counting.make ~padded:true n_resources in

let n_domains = if use_domains then n_workers else 1 in
let n_ops =
(if use_domains || is_ocaml4 then 10 else 100) * Util.iter_factor
in

let run_worker () =
let computation = Computation.create () in
if not is_ocaml4 then yielder computation;
for _ = 1 to n_ops do
Semaphore.Counting.acquire semaphore;
Fiber.yield ();
Semaphore.Counting.release semaphore
done;
Computation.cancel computation (Exn_bt.get_callstack 0 Exit)
in

let init _ = () in
let wrap _ () = Scheduler.run in
let work _ () =
if use_domains then run_worker ()
else
Bundle.join_after @@ fun bundle ->
for _ = 1 to n_workers do
Bundle.fork bundle run_worker
done
in
let config =
Printf.sprintf "%d %s%s, %d resource%s" n_workers
(if use_domains then "domain" else "fiber")
(if n_workers = 1 then "" else "s")
n_resources
(if n_resources = 1 then "" else "s")
in
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:(n_ops * n_workers) ~singular:"acquired yield"
~config

let run_suite ~budgetf =
Util.cross [ false; true ] [ 1; 2; 3; 4 ]
|> List.concat_map @@ fun (use_domains, n_resources) ->
if use_domains && Picos_domain.recommended_domain_count () < n_workers then
[]
else run_one ~budgetf ~use_domains ~n_resources ()
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ let benchmarks =
("Picos TLS", Bench_tls.run_suite);
("Picos DLS", Bench_dls.run_suite);
("Picos Mutex", Bench_mutex.run_suite);
("Picos Semaphore", Bench_semaphore.run_suite);
("Picos Spawn", Bench_spawn.run_suite);
("Picos Yield", Bench_yield.run_suite);
("Picos Cancel_after with Picos_select", Bench_cancel_after.run_suite);
Expand Down
26 changes: 22 additions & 4 deletions lib/picos_structured/picos_structured.mli
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,20 @@ end
end
end;

Flock.fork begin fun () ->
let sem =
Semaphore.Binary.make false
in
Semaphore.Binary.acquire sem
end;

Flock.fork begin fun () ->
let sem =
Semaphore.Counting.make 0
in
Semaphore.Counting.acquire sem
end;

Flock.fork begin fun () ->
Event.sync (Event.choose [])
end;
Expand Down Expand Up @@ -441,10 +455,11 @@ end
end
]}

First of all, note that above the {{!Picos_sync.Mutex} [Mutex]} and
{{!Picos_sync.Condition} [Condition]} modules come from the {!Picos_sync}
library and the {{!Picos_stdio.Unix} [Unix]} module comes from the
{!Picos_stdio} library. They do not come from the standard OCaml libraries.
First of all, note that above the {{!Picos_sync.Mutex} [Mutex]},
{{!Picos_sync.Condition} [Condition]}, and {{!Picos_sync.Semaphore}
[Semaphore]} modules come from the {!Picos_sync} library and the
{{!Picos_stdio.Unix} [Unix]} module comes from the {!Picos_stdio} library.
They do not come from the standard OCaml libraries.

The above program creates a {{!Flock} flock} of fibers and {{!Flock.fork}
forks} several fibers to the flock that all block in various ways. In
Expand All @@ -454,6 +469,9 @@ end
- {!Promise.await} never returns as the promise won't be completed,
- {{!Picos_sync.Condition.wait} [Condition.wait]} never returns, because the
condition is never signaled,
- {{!Picos_sync.Semaphore.Binary.acquire} [Semaphore.Binary.acquire]} and
{{!Picos_sync.Semaphore.Counting.acquire} [Semaphore.Counting.acquire]}
never return, because the counts of the semaphores never change from [0],
- {{!Picos_sync.Event.sync} [Event.sync]} never returns, because the event
can never be committed to,
- {{!Picos_sync.Latch.await} [Latch.await]} never returns, because the count
Expand Down
92 changes: 28 additions & 64 deletions lib/picos_sync/condition.ml
Original file line number Diff line number Diff line change
@@ -1,87 +1,49 @@
open Picos

type state = Empty | Queue of { head : Trigger.t list; tail : Trigger.t list }
type t = state Atomic.t
type t = Trigger.t Q.t Atomic.t

let create ?padded () = Multicore_magic.copy_as ?padded @@ Atomic.make Empty
let create ?padded () =
Multicore_magic.copy_as ?padded @@ Atomic.make (Q.T Zero)

let broadcast t =
if Atomic.get t != Empty then
match Atomic.exchange t Empty with
| Empty -> ()
| Queue r ->
List.iter Trigger.signal r.head;
List.iter Trigger.signal (List.rev r.tail)
let broadcast (t : t) =
if Atomic.get t != T Zero then
match Atomic.exchange t (T Zero) with
| T Zero -> ()
| T (One _ as q) -> Q.iter q Trigger.signal

(* We try to avoid starvation of signal by making it so that when, at the start
of signal or wait, the head is empty, the tail is reversed into the head.
This way both signal and wait attempt O(1) and O(n) operations at the same
time. *)

let rec signal t backoff =
let rec signal (t : t) backoff =
match Atomic.get t with
| Empty -> ()
| Queue r as before -> begin
match r.head with
| trigger :: head ->
signal_cas t backoff before
(if head == [] && r.tail == [] then Empty else Queue { r with head })
trigger
| [] -> begin
match List.rev r.tail with
| trigger :: head ->
signal_cas t backoff before
(if head == [] then Empty else Queue { head; tail = [] })
trigger
| [] -> failwith "impossible"
end
end
| T Zero -> ()
| T (One _ as q) as before ->
let after = Q.tail q in
if Atomic.compare_and_set t before after then
let trigger = Q.head q in
Trigger.signal trigger
else signal t (Backoff.once backoff)

and signal_cas t backoff before after trigger =
if Atomic.compare_and_set t before after then Trigger.signal trigger
else signal t (Backoff.once backoff)

let signal t = signal t Backoff.default

let rec cleanup backoff trigger t =
let rec cleanup backoff trigger (t : t) =
(* We have been canceled. If we can't drop our trigger from the variable, we
signal the next trigger in queue to make sure each signal wakes up at least
one non-canceled waiter if possible. *)
match Atomic.get t with
| Empty -> ()
| Queue r as before -> begin
if r.head != [] then
match List_ext.drop_first_or_not_found trigger r.head with
| head ->
cleanup_cas backoff trigger t before
(if head == [] && r.tail == [] then Empty
else Queue { r with head })
| exception Not_found -> begin
match List_ext.drop_first_or_not_found trigger r.tail with
| tail ->
cleanup_cas backoff trigger t before (Queue { r with tail })
| exception Not_found -> signal t
end
else
match List_ext.drop_first_or_not_found trigger r.tail with
| tail ->
cleanup_cas backoff trigger t before
(if tail == [] then Empty else Queue { head = []; tail })
| exception Not_found -> signal t
end
| T Zero -> ()
| T (One _ as q) as before ->
let after = Q.remove q trigger in
if before == after then signal t Backoff.default
else if not (Atomic.compare_and_set t before after) then
cleanup (Backoff.once backoff) trigger t

and cleanup_cas backoff trigger t before after =
if not (Atomic.compare_and_set t before after) then
cleanup (Backoff.once backoff) trigger t

let rec wait t mutex trigger fiber backoff =
let rec wait (t : t) mutex trigger fiber backoff =
let before = Atomic.get t in
let after =
match before with
| Empty -> Queue { head = [ trigger ]; tail = [] }
| Queue r ->
if r.head != [] then Queue { r with tail = trigger :: r.tail }
else Queue { head = List.rev_append r.tail [ trigger ]; tail = [] }
| T Zero -> Q.singleton trigger
| T (One _ as q) -> Q.snoc q trigger
in
if Atomic.compare_and_set t before after then begin
Mutex.unlock_as (Fiber.Maybe.of_fiber fiber) mutex Backoff.default;
Expand All @@ -101,3 +63,5 @@ let wait t mutex =
let fiber = Fiber.current () in
let trigger = Trigger.create () in
wait t mutex trigger fiber Backoff.default

let[@inline] signal t = signal t Backoff.default
74 changes: 27 additions & 47 deletions lib/picos_sync/mutex.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,28 @@ type _ tdt =

type state =
| Unlocked
| Locked of {
fiber : Fiber.Maybe.t;
head : [ `Entry ] tdt list;
tail : [ `Entry ] tdt list;
}
| Locked of { fiber : Fiber.Maybe.t; waiters : [ `Entry ] tdt Q.t }

type t = state Atomic.t

let create ?padded () = Multicore_magic.copy_as ?padded @@ Atomic.make Unlocked

(* We try to avoid starvation of unlock by making it so that when, at the start
of lock or unlock, the head is empty, the tail is reversed into the head.
This way both lock and unlock attempt O(1) and O(n) operations at the same
time. *)

let locked_nothing =
Locked { fiber = Fiber.Maybe.nothing; head = []; tail = [] }
let locked_nothing = Locked { fiber = Fiber.Maybe.nothing; waiters = T Zero }

let rec unlock_as owner t backoff =
match Atomic.get t with
| Unlocked -> unlocked ()
| Locked r as before ->
if Fiber.Maybe.equal r.fiber owner then
match r.head with
| Entry { trigger; fiber } :: rest ->
let after = Locked { r with fiber; head = rest } in
match r.waiters with
| T Zero ->
if not (Atomic.compare_and_set t before Unlocked) then
unlock_as owner t (Backoff.once backoff)
| T (One _ as q) ->
let (Entry { trigger; fiber }) = Q.head q in
let waiters = Q.tail q in
let after = Locked { fiber; waiters } in
if Atomic.compare_and_set t before after then Trigger.signal trigger
else unlock_as owner t (Backoff.once backoff)
| [] -> begin
match List.rev r.tail with
| Entry { trigger; fiber } :: rest ->
let after = Locked { fiber; head = rest; tail = [] } in
if Atomic.compare_and_set t before after then
Trigger.signal trigger
else unlock_as owner t (Backoff.once backoff)
| [] ->
if not (Atomic.compare_and_set t before Unlocked) then
unlock_as owner t (Backoff.once backoff)
end
else not_owner ()

let[@inline] unlock ?checked t =
Expand All @@ -60,28 +43,24 @@ let rec cleanup_as (Entry entry_r as entry : [ `Entry ] tdt) t backoff =
Otherwise we must remove our entry from the queue. *)
match Atomic.get t with
| Locked r as before -> begin
match List_ext.drop_first_or_not_found entry r.head with
| head ->
let after = Locked { r with head } in
if not (Atomic.compare_and_set t before after) then
cleanup_as entry t (Backoff.once backoff)
| exception Not_found -> begin
match List_ext.drop_first_or_not_found entry r.tail with
| tail ->
let after = Locked { r with tail } in
if not (Atomic.compare_and_set t before after) then
cleanup_as entry t (Backoff.once backoff)
| exception Not_found -> unlock_as entry_r.fiber t Backoff.default
end
match r.waiters with
| T Zero -> unlock_as entry_r.fiber t backoff
| T (One _ as q) ->
let waiters = Q.remove q entry in
if r.waiters == waiters then unlock_as entry_r.fiber t backoff
else
let after = Locked { fiber = r.fiber; waiters } in
if not (Atomic.compare_and_set t before after) then
cleanup_as entry t (Backoff.once backoff)
end
| Unlocked -> unlocked () (* impossible *)
| Unlocked -> unlocked ()

let rec lock_as fiber t entry backoff =
match Atomic.get t with
| Unlocked as before ->
let after =
if fiber == Fiber.Maybe.nothing then locked_nothing
else Locked { fiber; head = []; tail = [] }
else Locked { fiber; waiters = T Zero }
in
if not (Atomic.compare_and_set t before after) then
lock_as fiber t entry (Backoff.once backoff)
Expand All @@ -94,11 +73,12 @@ let rec lock_as fiber t entry backoff =
Entry { trigger; fiber }
| Entry _ as entry -> entry
in
let after =
if r.head == [] then
Locked { r with head = List.rev_append r.tail [ entry ]; tail = [] }
else Locked { r with tail = entry :: r.tail }
let waiters =
match r.waiters with
| T Zero -> Q.singleton entry
| T (One _ as q) -> Q.snoc q entry
in
let after = Locked { fiber = r.fiber; waiters } in
if Atomic.compare_and_set t before after then begin
match Trigger.await entry_r.trigger with
| None -> ()
Expand All @@ -118,7 +98,7 @@ let try_lock ?checked t =
Atomic.get t == Unlocked
&& Atomic.compare_and_set t Unlocked
(if fiber == Fiber.Maybe.nothing then locked_nothing
else Locked { fiber; head = []; tail = [] })
else Locked { fiber; waiters = T Zero })

let protect ?checked t body =
let fiber = Fiber.Maybe.current_and_check_if checked in
Expand Down
1 change: 1 addition & 0 deletions lib/picos_sync/picos_sync.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module Mutex = Mutex
module Condition = Condition
module Semaphore = Semaphore
module Lazy = Lazy
module Event = Event
module Latch = Latch
Expand Down
Loading
Loading