From 9dc05dc44b060e5562f5568220583b11fe0ae0e4 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Wed, 27 Nov 2024 02:17:12 +0200 Subject: [PATCH] Add more structured `Run` operations --- .ocamlformat | 2 +- bench/bench_run.ocaml4.ml | 1 + bench/bench_run.ocaml5.ml | 28 + bench/dune | 6 + bench/main.ml | 1 + lib/picos_std.structured/bundle.ml | 65 +- lib/picos_std.structured/control.ml | 16 +- lib/picos_std.structured/dune | 12 + lib/picos_std.structured/flock.ml | 4 +- lib/picos_std.structured/for.ocaml4.ml | 57 ++ lib/picos_std.structured/for.ocaml5.ml | 82 +++ .../picos_std_structured.mli | 584 ++++++++++-------- lib/picos_std.structured/run.ml | 51 +- test/dune | 3 + test/test_structured.ml | 57 +- 15 files changed, 661 insertions(+), 308 deletions(-) create mode 100644 bench/bench_run.ocaml4.ml create mode 100644 bench/bench_run.ocaml5.ml create mode 100644 lib/picos_std.structured/for.ocaml4.ml create mode 100644 lib/picos_std.structured/for.ocaml5.ml diff --git a/.ocamlformat b/.ocamlformat index 79ff36537..ffb06fc4e 100644 --- a/.ocamlformat +++ b/.ocamlformat @@ -1,4 +1,4 @@ profile = default -version = 0.26.2 +version = 0.27.0 exp-grouping=preserve diff --git a/bench/bench_run.ocaml4.ml b/bench/bench_run.ocaml4.ml new file mode 100644 index 000000000..ce98597cb --- /dev/null +++ b/bench/bench_run.ocaml4.ml @@ -0,0 +1 @@ +let run_suite ~budgetf:_ = [] diff --git a/bench/bench_run.ocaml5.ml b/bench/bench_run.ocaml5.ml new file mode 100644 index 000000000..3eedfe66f --- /dev/null +++ b/bench/bench_run.ocaml5.ml @@ -0,0 +1,28 @@ +open Multicore_bench +open Picos_std_structured +module Multififo = Picos_mux_multififo + +let run_one_multififo ~budgetf ~n_domains ~n () = + let context = ref (Obj.magic ()) in + + let before _ = context := Multififo.context () in + let init _ = !context in + let work i context = + if i <> 0 then Multififo.runner_on_this_thread context + else ignore @@ Multififo.run ~context @@ fun () -> Run.for_n n ignore + in + + let config = + Printf.sprintf "%d mfifo%s, run_n %d" n_domains + (if n_domains = 1 then "" else "s") + n + in + Times.record ~budgetf ~n_domains ~before ~init ~work () + |> Times.to_thruput_metrics ~n ~singular:"ignore" ~config + +let run_suite ~budgetf = + Util.cross [ 1; 2; 4; 8 ] + [ 100; 1_000; 10_000; 100_000; 1_000_000; 10_000_000 ] + |> List.concat_map @@ fun (n_domains, n) -> + if Picos_domain.recommended_domain_count () < n_domains then [] + else run_one_multififo ~budgetf ~n_domains ~n () diff --git a/bench/dune b/bench/dune index 3d27fe7ee..269e910d3 100644 --- a/bench/dune +++ b/bench/dune @@ -23,6 +23,7 @@ (run %{test} -brief "Picos binaries") (run %{test} -brief "Bounded_q with Picos_sync") (run %{test} -brief "Memory usage") + (run %{test} -brief "Picos_std_structured.Run") ;; )) (foreign_stubs @@ -49,6 +50,11 @@ from (picos_mux.fifo -> scheduler.ocaml5.ml) (picos_mux.thread -> scheduler.ocaml4.ml)) + (select + bench_run.ml + from + (picos_mux.multififo -> bench_run.ocaml5.ml) + (-> bench_run.ocaml4.ml)) (select bench_fib.ml from diff --git a/bench/main.ml b/bench/main.ml index 423c3774a..2fb898e61 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -22,6 +22,7 @@ let benchmarks = ("Picos binaries", Bench_binaries.run_suite); ("Bounded_q with Picos_sync", Bench_bounded_q.run_suite); ("Memory usage", Bench_memory.run_suite); + ("Picos_std_structured.Run", Bench_run.run_suite); ] let () = Multicore_bench.Cmd.run ~benchmarks () diff --git a/lib/picos_std.structured/bundle.ml b/lib/picos_std.structured/bundle.ml index b1b3593c4..50c97e524 100644 --- a/lib/picos_std.structured/bundle.ml +++ b/lib/picos_std.structured/bundle.ml @@ -12,10 +12,11 @@ type _ tdt = } -> [> `Bundle ] tdt -let config_terminated_bit = 0x01 -and config_callstack_mask = 0x3E -and config_callstack_shift = 1 -and config_one = 0x40 (* memory runs out before overflow *) +let config_on_return_terminate_bit = 0x01 +and config_on_terminate_raise_bit = 0x02 +and config_callstack_mask = 0x6C +and config_callstack_shift = 2 +and config_one = 0x80 (* memory runs out before overflow *) let flock_key : [ `Bundle | `Nothing ] tdt Fiber.FLS.t = Fiber.FLS.create () @@ -35,12 +36,12 @@ let error ?callstack (Bundle r as t : t) exn bt = terminate ?callstack t; Control.Errors.push r.errors exn bt end + else if Atomic.get r.config land config_on_terminate_raise_bit <> 0 then + terminate ?callstack t let decr (Bundle r : t) = let n = Atomic.fetch_and_add r.config (-config_one) in if n < config_one * 2 then begin - let (Packed bundle) = r.bundle in - Computation.cancel bundle Control.Terminate Control.empty_bt; Trigger.signal r.finished end @@ -48,6 +49,10 @@ type _ pass = FLS : unit pass | Arg : t pass let[@inline never] no_flock () = invalid_arg "no flock" +let[@inline] on_terminate = function + | None | Some `Ignore -> `Ignore + | Some `Raise -> `Raise + let get_flock fiber = match Fiber.FLS.get fiber flock_key ~default:Nothing with | Bundle _ as t -> t @@ -56,14 +61,24 @@ let get_flock fiber = let await (Bundle r as t : t) fiber packed canceler outer = decr t; Fiber.set_computation fiber packed; + if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then + Fiber.FLS.set fiber flock_key outer; let forbid = Fiber.exchange fiber ~forbid:true in Trigger.await r.finished |> ignore; Fiber.set fiber ~forbid; - if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then - Fiber.FLS.set fiber flock_key outer; let (Packed parent) = packed in Computation.detach parent canceler; Control.Errors.check r.errors; + begin + let (Packed bundle) = r.bundle in + match Computation.peek_exn bundle with + | _ -> () + | exception Computation.Running -> + Computation.cancel bundle Control.Terminate Control.empty_bt + | exception Control.Terminate + when Atomic.get r.config land config_on_terminate_raise_bit = 0 -> + () + end; Fiber.check fiber let[@inline never] raised exn t fiber packed canceler outer = @@ -75,7 +90,7 @@ let[@inline never] raised exn t fiber packed canceler outer = let[@inline never] returned value (Bundle r as t : t) fiber packed canceler outer = let config = Atomic.get r.config in - if config land config_terminated_bit <> 0 then begin + if config land config_on_return_terminate_bit <> 0 then begin let callstack = let n = (config land config_callstack_mask) lsr config_callstack_shift in if n = 0 then None else Some n @@ -90,25 +105,31 @@ let join_after_realloc x fn t fiber packed canceler outer = | value -> returned value t fiber packed canceler outer | exception exn -> raised exn t fiber packed canceler outer -let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass) - = +let join_after_pass (type a) ?callstack ?on_return ?on_terminate (fn : a -> _) + (pass : a pass) = (* The sequence of operations below ensures that nothing is leaked. *) let (Bundle r as t : t) = - let terminated = + let config = match on_return with - | None | Some `Wait -> 0 - | Some `Terminate -> config_terminated_bit + | None | Some `Wait -> config_one + | Some `Terminate -> config_one lor config_on_return_terminate_bit in - let callstack = + let config = + match on_terminate with + | None | Some `Ignore -> config + | Some `Raise -> config lor config_on_terminate_raise_bit + in + let config = match callstack with - | None -> 0 + | None -> config | Some n -> - if n <= 0 then 0 + if n <= 0 then config else - Int.min n (config_callstack_mask lsr config_callstack_shift) - lsl config_callstack_shift + config + lor Int.min n (config_callstack_mask lsr config_callstack_shift) + lsl config_callstack_shift in - let config = Atomic.make (config_one lor callstack lor terminated) in + let config = Atomic.make config in let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in let errors = Control.Errors.create () in let finished = Trigger.create () in @@ -208,8 +229,8 @@ let fork_pass (type a) (Bundle r as t : t) thunk (pass : a pass) = let is_running (Bundle { bundle = Packed bundle; _ } : t) = Computation.is_running bundle -let join_after ?callstack ?on_return fn = - join_after_pass ?callstack ?on_return fn Arg +let join_after ?callstack ?on_return ?on_terminate fn = + join_after_pass ?callstack ?on_return ?on_terminate fn Arg let fork t thunk = fork_pass t thunk Arg let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg diff --git a/lib/picos_std.structured/control.ml b/lib/picos_std.structured/control.ml index 5a0296206..e70d71f79 100644 --- a/lib/picos_std.structured/control.ml +++ b/lib/picos_std.structured/control.ml @@ -41,13 +41,15 @@ module Errors = struct | [ (exn, bt) ] -> Printexc.raise_with_backtrace exn bt | exn_bts -> check exn_bts [] - let rec push t exn bt backoff = - let before = Atomic.get t in - let after = (exn, bt) :: before in - if not (Atomic.compare_and_set t before after) then - push t exn bt (Backoff.once backoff) - - let push t exn bt = push t exn bt Backoff.default + let push t exn bt = + let backoff = ref Backoff.default in + while + let before = Atomic.get t in + let after = (exn, bt) :: before in + not (Atomic.compare_and_set t before after) + do + backoff := Backoff.once !backoff + done end let raise_if_canceled () = Fiber.check (Fiber.current ()) diff --git a/lib/picos_std.structured/dune b/lib/picos_std.structured/dune index fc6e5764d..df0659f59 100644 --- a/lib/picos_std.structured/dune +++ b/lib/picos_std.structured/dune @@ -1,3 +1,15 @@ +(rule + (enabled_if + (<= 5.0.0 %{ocaml_version})) + (action + (copy for.ocaml5.ml for.ml))) + +(rule + (enabled_if + (< %{ocaml_version} 5.0.0)) + (action + (copy for.ocaml4.ml for.ml))) + (library (name picos_std_structured) (public_name picos_std.structured) diff --git a/lib/picos_std.structured/flock.ml b/lib/picos_std.structured/flock.ml index b097f33be..82e78be82 100644 --- a/lib/picos_std.structured/flock.ml +++ b/lib/picos_std.structured/flock.ml @@ -10,5 +10,5 @@ let error ?callstack exn_bt = Bundle.error (get ()) ?callstack exn_bt let fork_as_promise thunk = Bundle.fork_as_promise_pass (get ()) thunk FLS let fork action = Bundle.fork_pass (get ()) action FLS -let join_after ?callstack ?on_return fn = - Bundle.join_after_pass ?callstack ?on_return fn Bundle.FLS +let join_after ?callstack ?on_return ?on_terminate fn = + Bundle.join_after_pass ?callstack ?on_return ?on_terminate fn Bundle.FLS diff --git a/lib/picos_std.structured/for.ocaml4.ml b/lib/picos_std.structured/for.ocaml4.ml new file mode 100644 index 000000000..25d018ade --- /dev/null +++ b/lib/picos_std.structured/for.ocaml4.ml @@ -0,0 +1,57 @@ +type _ tdt = + | Empty : [> `Empty ] tdt + | Range : { + mutable lo : int; + hi : int; + parent : [ `Empty | `Range ] tdt; + } + -> [> `Range ] tdt + +let[@poll error] cas_lo (Range r : [ `Range ] tdt) before after = + r.lo == before + && begin + r.lo <- after; + true + end + +let rec for_out t (Range r as range : [ `Range ] tdt) action = + let lo_before = r.lo in + let n = r.hi - lo_before in + if 0 < n then begin + if Bundle.is_running t then begin + let lo_after = lo_before + 1 in + if cas_lo range lo_before lo_after then begin + try action lo_before + with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ()) + end; + for_out t range action + end + end + else + match r.parent with + | Empty -> () + | Range _ as range -> for_out t range action + +let rec for_in t (Range r as range : [ `Range ] tdt) action = + let lo_before = r.lo in + let n = r.hi - lo_before in + if n <= 1 then for_out t range action + else + let lo_after = lo_before + (n asr 1) in + if cas_lo range lo_before lo_after then begin + Bundle.fork t (fun () -> for_in t range action); + let child = Range { lo = lo_before; hi = lo_after; parent = range } in + for_in t child action + end + else for_in t range action + +let for_n ?on_terminate n action = + if 0 < n then + if n = 1 then + try action 0 + with + | Control.Terminate when Bundle.on_terminate on_terminate == `Ignore -> + () + else + let range = Range { lo = 0; hi = n; parent = Empty } in + Bundle.join_after ?on_terminate @@ fun t -> for_in t range action diff --git a/lib/picos_std.structured/for.ocaml5.ml b/lib/picos_std.structured/for.ocaml5.ml new file mode 100644 index 000000000..97465b9ae --- /dev/null +++ b/lib/picos_std.structured/for.ocaml5.ml @@ -0,0 +1,82 @@ +open Picos + +type per_fiber = { mutable lo : int; mutable hi : int } + +type _ tdt = + | Empty : [> `Empty ] tdt + | Range : { + mutable _lo : int; + hi : int; + parent : [ `Empty | `Range ] tdt; + } + -> [> `Range ] tdt + +external lo_as_atomic : [ `Range ] tdt -> int Atomic.t = "%identity" + +let rec for_out t (Range r as range : [ `Range ] tdt) per_fiber action = + let lo_before = Atomic.get (lo_as_atomic range) in + let n = r.hi - lo_before in + if 0 < n then begin + let lo_after = lo_before + 1 + (n asr 1) in + if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin + per_fiber.lo <- lo_before; + per_fiber.hi <- lo_after; + while Bundle.is_running t && per_fiber.lo < per_fiber.hi do + try + while per_fiber.lo < per_fiber.hi do + let i = per_fiber.lo in + per_fiber.lo <- i + 1; + action i + done + with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ()) + done + end; + for_out t range per_fiber action + end + else + match r.parent with + | Empty -> () + | Range _ as range -> for_out t range per_fiber action + +let rec for_in t (Range r as range : [ `Range ] tdt) per_fiber action = + let lo_before = Atomic.get (lo_as_atomic range) in + let n = r.hi - lo_before in + if n <= 1 then for_out t range per_fiber action + else + let lo_after = lo_before + (n asr 1) in + if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin + Bundle.fork t (fun () -> for_in_enter t range action); + let child = Range { _lo = lo_before; hi = lo_after; parent = range } in + for_in t child per_fiber action + end + else for_in t range per_fiber action + +and for_in_enter bundle range action = + let per_fiber = { lo = 0; hi = 0 } in + let effc (type a) : + a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function + | Fiber.Spawn _ | Fiber.Current | Computation.Cancel_after _ -> None + | _ -> + (* Might be blocking, so fork any remaining work to another fiber. *) + if per_fiber.lo < per_fiber.hi then begin + let range = + Range { _lo = per_fiber.lo; hi = per_fiber.hi; parent = Empty } + in + per_fiber.lo <- per_fiber.hi; + Bundle.fork bundle (fun () -> for_in_enter bundle range action) + end; + None + in + let handler = Effect.Deep.{ effc } in + Effect.Deep.try_with (for_in bundle range per_fiber) action handler + +let for_n ?on_terminate n action = + if 0 < n then + if n = 1 then + try action 0 + with + | Control.Terminate when Bundle.on_terminate on_terminate == `Ignore -> + () + else + let range = Range { _lo = 0; hi = n; parent = Empty } in + Bundle.join_after ?on_terminate @@ fun t -> for_in_enter t range action diff --git a/lib/picos_std.structured/picos_std_structured.mli b/lib/picos_std.structured/picos_std_structured.mli index 78e3e1da8..232f739a7 100644 --- a/lib/picos_std.structured/picos_std_structured.mli +++ b/lib/picos_std.structured/picos_std_structured.mli @@ -25,7 +25,7 @@ module Control : sig that they should terminate by letting the exception propagate. ℹ️ Within {{!Picos_std_structured} this library}, the [Terminate] exception - does not, by itself, indicate an error. Raising it inside a fiber forked + does not, by itself, indicate an error. Raising it inside a fiber forked within the structured concurrency constructs of this library simply causes the relevant part of the tree of fibers to be terminated. @@ -60,13 +60,14 @@ module Control : sig (** [protect thunk] forbids propagation of cancelation for the duration of [thunk ()]. - ℹ️ {{!Picos_std_sync} Many operations are cancelable}. In particular, + ℹ️ {{!Picos_std_sync} Many operations are cancelable}. In particular, anything that might suspend the current fiber to await for something - should typically be cancelable. Operations that release resources may - sometimes also be cancelable and {{!Picos_std_finally.finally} calls of - such operations should typically be protected} to ensure that resources - will be properly released. Forbidding propagation of cancelation may also - be required when a sequence of cancelable operations must be performed. + should typically be cancelable. Operations that release resources may + sometimes also be cancelable and + {{!Picos_std_finally.finally} calls of such operations should typically be + protected} to ensure that resources will be properly released. Forbidding + propagation of cancelation may also be required when a sequence of + cancelable operations must be performed. ℹ️ With the constructs provided by {{!Picos_std_structured} this library} it is not possible to prevent a fiber from being canceled, but it is @@ -77,27 +78,27 @@ module Control : sig (** [block ()] suspends the current fiber until it is canceled at which point the cancelation exception will be raised. - @raise Invalid_argument in case propagation of cancelation has been - {{!protect} forbidden}. + @raise Invalid_argument + in case propagation of cancelation has been {{!protect} forbidden}. - @raise Sys_error in case the underlying computation of the fiber is forced - to return during [block]. This is only possible when the fiber has been - spawned through another library. *) + @raise Sys_error + in case the underlying computation of the fiber is forced to return + during [block]. This is only possible when the fiber has been spawned + through another library. *) val terminate_after : ?callstack:int -> seconds:float -> (unit -> 'a) -> 'a (** [terminate_after ~seconds thunk] arranges to terminate the execution of [thunk] on the current fiber after the specified timeout in [seconds]. Using [terminate_after] one can attempt any blocking operation that - supports cancelation with a timeout. For example, one could try to + supports cancelation with a timeout. For example, one could try to {{!Picos_std_sync.Ivar.read} [read]} an {{!Picos_std_sync.Ivar} [Ivar]} with a timeout {[ let peek_in ~seconds ivar = match - Control.terminate_after ~seconds @@ fun () -> - Ivar.read ivar + Control.terminate_after ~seconds @@ fun () -> Ivar.read ivar with | value -> Some value | exception Control.Terminate -> None @@ -110,7 +111,7 @@ module Control : sig let try_connect_in ~seconds socket sockaddr = match Control.terminate_after ~seconds @@ fun () -> - Unix.connect socket sockaddr + Unix.connect socket sockaddr with | () -> true | exception Control.Terminate -> false @@ -141,9 +142,9 @@ module Promise : sig ⚠️ {{!try_terminate} Canceling} a promise does not immediately terminate the fiber or wait for the fiber working to complete the promise to - terminate. Constructs like {!Bundle.join_after} and {!Flock.join_after} + terminate. Constructs like {!Bundle.join_after} and {!Flock.join_after} only guarantee that all fibers forked within their scope have terminated - before they return or raise. The reason for this design choice in this + before they return or raise. The reason for this design choice in this library is that synchronization is expensive and delaying synchronization to the join operation is typically sufficient and amortizes the cost. *) @@ -155,8 +156,8 @@ module Promise : sig given [value]. ℹ️ Promises can also be created in the scope of a - {{!Bundle.fork_as_promise} [Bundle]} or a {{!Flock.fork_as_promise} - [Flock]}. *) + {{!Bundle.fork_as_promise} [Bundle]} or a + {{!Flock.fork_as_promise} [Flock]}. *) val await : 'a t -> 'a (** [await promise] awaits until the promise has completed and either returns @@ -204,35 +205,40 @@ module Bundle : sig (** An explicit dynamic bundle of fibers guaranteed to be joined at the end. Bundles allow you to conveniently structure or delimit concurrency into - nested scopes. After a bundle returns or raises an exception, no fibers + nested scopes. After a bundle returns or raises an exception, no fibers {{!fork} forked} to the bundle remain. An unhandled exception, or error, within any fiber of the bundle causes all of the fibers {{!fork} forked} to the bundle to be canceled and the - bundle to raise the error exception or {{!Control.Errors} error - exceptions} raised by all of the fibers forked into the bundle. *) + bundle to raise the error exception or + {{!Control.Errors} error exceptions} raised by all of the fibers forked + into the bundle. *) type t (** Represents a bundle of fibers. *) val join_after : - ?callstack:int -> ?on_return:[ `Terminate | `Wait ] -> (t -> 'a) -> 'a - (** [join_after scope] calls [scope] with a {{!t} bundle}. A call of + ?callstack:int -> + ?on_return:[ `Terminate | `Wait ] -> + ?on_terminate:[ `Raise | `Ignore ] -> + (t -> 'a) -> + 'a + (** [join_after scope] calls [scope] with a {{!t} bundle}. A call of [join_after] returns or raises only after [scope] has returned or raised - and all {{!fork} forked} fibers have terminated. If [scope] raises an + and all {{!fork} forked} fibers have terminated. If [scope] raises an exception, {!error} will be called. The optional [on_return] argument specifies what to do when the scope - returns normally. It defaults to [`Wait], which means to just wait for - all the fibers to terminate on their own. When explicitly specified as + returns normally. It defaults to [`Wait], which means to just wait for all + the fibers to terminate on their own. When explicitly specified as [~on_return:`Terminate], then {{!terminate} [terminate ?callstack]} will - be called on return. This can be convenient, for example, when dealing - with {{:https://en.wikipedia.org/wiki/Daemon_(computing)} daemon} - fibers. *) + be called on return. This can be convenient, for example, when dealing + with {{:https://en.wikipedia.org/wiki/Daemon_(computing)} daemon} fibers. + *) val terminate : ?callstack:int -> t -> unit (** [terminate bundle] cancels all of the {{!fork} forked} fibers using the - {{!Control.Terminate} [Terminate]} exception. After [terminate] has been + {{!Control.Terminate} [Terminate]} exception. After [terminate] has been called, no new fibers can be forked to the bundle. The optional [callstack] argument specifies the number of callstack @@ -242,10 +248,10 @@ module Bundle : sig ℹ️ Calling [terminate] at the end of a bundle can be a convenient way to cancel any background fibers started by the bundle. - ℹ️ Calling [terminate] does not raise the {{!Control.Terminate} - [Terminate]} exception, but blocking operations after [terminate] will - raise the exception to propagate cancelation unless {{!Control.protect} - propagation of cancelation is forbidden}. *) + ℹ️ Calling [terminate] does not raise the + {{!Control.Terminate} [Terminate]} exception, but blocking operations + after [terminate] will raise the exception to propagate cancelation unless + {{!Control.protect} propagation of cancelation is forbidden}. *) val terminate_after : ?callstack:int -> t -> seconds:float -> unit (** [terminate_after ~seconds bundle] arranges to {!terminate} the bundle @@ -261,8 +267,8 @@ module Bundle : sig val fork_as_promise : t -> (unit -> 'a) -> 'a Promise.t (** [fork_as_promise bundle thunk] spawns a new fiber to the [bundle] that - will run the given [thunk]. The result of the [thunk] will be written to - the {{!Promise} promise}. If the [thunk] raises an exception, {!error} + will run the given [thunk]. The result of the [thunk] will be written to + the {{!Promise} promise}. If the [thunk] raises an exception, {!error} will be called with that exception. *) val fork : t -> (unit -> unit) -> unit @@ -274,7 +280,7 @@ module Flock : sig (** An implicit dynamic flock of fibers guaranteed to be joined at the end. Flocks allow you to conveniently structure or delimit concurrency into - nested scopes. After a flock returns or raises an exception, no fibers + nested scopes. After a flock returns or raises an exception, no fibers {{!fork} forked} to the flock remain. An unhandled exception, or error, within any fiber of the flock causes all @@ -287,30 +293,34 @@ module Flock : sig ⚠️ All of the operations in this module, except {!join_after}, raise the {!Invalid_argument} exception in case they are called from outside of the - dynamic multifiber scope of a flock established by calling - {!join_after}. *) + dynamic multifiber scope of a flock established by calling {!join_after}. + *) val join_after : - ?callstack:int -> ?on_return:[ `Terminate | `Wait ] -> (unit -> 'a) -> 'a + ?callstack:int -> + ?on_return:[ `Terminate | `Wait ] -> + ?on_terminate:[ `Raise | `Ignore ] -> + (unit -> 'a) -> + 'a (** [join_after scope] creates a new flock for fibers, calls [scope] after setting current flock to the new flock, and restores the previous flock, - if any after [scope] exits. The flock will be implicitly propagated to - all fibers {{!fork} forked} into the flock. A call of [join_after] - returns or raises only after [scope] has returned or raised and all - {{!fork} forked} fibers have terminated. If [scope] raises an exception, - {!error} will be called. + if any after [scope] exits. The flock will be implicitly propagated to all + fibers {{!fork} forked} into the flock. A call of [join_after] returns or + raises only after [scope] has returned or raised and all {{!fork} forked} + fibers have terminated. If [scope] raises an exception, {!error} will be + called. The optional [on_return] argument specifies what to do when the scope - returns normally. It defaults to [`Wait], which means to just wait for - all the fibers to terminate on their own. When explicitly specified as + returns normally. It defaults to [`Wait], which means to just wait for all + the fibers to terminate on their own. When explicitly specified as [~on_return:`Terminate], then {{!terminate} [terminate ?callstack]} will - be called on return. This can be convenient, for example, when dealing - with {{:https://en.wikipedia.org/wiki/Daemon_(computing)} daemon} - fibers. *) + be called on return. This can be convenient, for example, when dealing + with {{:https://en.wikipedia.org/wiki/Daemon_(computing)} daemon} fibers. + *) val terminate : ?callstack:int -> unit -> unit (** [terminate ()] cancels all of the {{!fork} forked} fibers using the - {{!Control.Terminate} [Terminate]} exception. After [terminate] has been + {{!Control.Terminate} [Terminate]} exception. After [terminate] has been called, no new fibers can be forked to the current flock. The optional [callstack] argument specifies the number of callstack @@ -320,10 +330,10 @@ module Flock : sig ℹ️ Calling [terminate] at the end of a flock can be a convenient way to cancel any background fibers started by the flock. - ℹ️ Calling [terminate] does not raise the {{!Control.Terminate} - [Terminate]} exception, but blocking operations after [terminate] will - raise the exception to propagate cancelation unless {{!Control.protect} - propagation of cancelation is forbidden}. *) + ℹ️ Calling [terminate] does not raise the + {{!Control.Terminate} [Terminate]} exception, but blocking operations + after [terminate] will raise the exception to propagate cancelation unless + {{!Control.protect} propagation of cancelation is forbidden}. *) val terminate_after : ?callstack:int -> seconds:float -> unit -> unit (** [terminate_after ~seconds ()] arranges to {!terminate} the current flock @@ -339,9 +349,9 @@ module Flock : sig val fork_as_promise : (unit -> 'a) -> 'a Promise.t (** [fork_as_promise thunk] spawns a new fiber to the current flock that will - run the given [thunk]. The result of the [thunk] will be written to the - {{!Promise} promise}. If the [thunk] raises an exception, {!error} will - be called with that exception. *) + run the given [thunk]. The result of the [thunk] will be written to the + {{!Promise} promise}. If the [thunk] raises an exception, {!error} will be + called with that exception. *) val fork : (unit -> unit) -> unit (** [fork action] is equivalent to @@ -349,43 +359,42 @@ module Flock : sig end module Run : sig - (** Operations for running fibers in specific patterns. *) - - val all : (unit -> unit) list -> unit - (** [all actions] starts the actions as separate fibers and waits until they - all complete or one of them raises an unhandled exception other than - {{!Control.Terminate} [Terminate]}, which is not counted as an error, - after which the remaining fibers will be canceled. - - ⚠️ One of the actions may be run on the current fiber. - - ⚠️ It is not guaranteed that any of the actions in the list are called. In - particular, after any action raises an unhandled exception or after the - main fiber is canceled, the actions that have not yet started may be - skipped entirely. - - [all] is roughly equivalent to + (** Operations for running actions concurrently. + + ⚠️ In general, when an action expected to return the unit value [()], + started by an operation in this module raises an unhandled exception, + other than {{!Control.Terminate} [Terminate]}, which is not counted as an + error, the whole operation will be canceled and the exception will be + raised. + + ⚠️ The operations in this module run their actions such that any action may + block to await without preventing other actions from being run. At the + limit every action may need to be run in a distinct fiber. However, it is + not guaranteed that every action always runs in a distinct fiber. The + actual number of fibers used can be much less than the number of actions + executed in case the actions do not block, complete quickly, and/or the + scheduler doesn't provide parallelism. + + ⚠️ The operations in this module do not guaranteed that any of the actions + are executed. In particular, after any action raises an unhandled + exception or after the main fiber is canceled, the actions that have not + yet started may be skipped entirely. *) + + val all : ?on_terminate:[ `Raise | `Ignore ] -> (unit -> unit) list -> unit + (** [all actions] starts the actions and waits until they all complete. + + [all] is roughly equivalent to: {[ - let all actions = - Bundle.join_after @@ fun bundle -> - List.iter (Bundle.fork bundle) actions - ]} - but treats the list of actions as a single computation. *) + let all ?on_terminate actions = + Bundle.join_after ?on_terminate @@ fun bundle -> + try actions |> List.iter @@ fun action -> Bundle.fork bundle action + with exn -> Bundle.error bundle exn (Printexc.get_raw_backtrace ()) + ]} *) val any : (unit -> unit) list -> unit - (** [any actions] starts the actions as separate fibers and waits until one of - them completes or raises an unhandled exception other than - {{!Control.Terminate} [Terminate]}, which is not counted as an error, - after which the rest of the started fibers will be canceled. - - ⚠️ One of the actions may be run on the current fiber. + (** [any actions] starts the actions and waits until one of them completes. - ⚠️ It is not guaranteed that any of the actions in the list are called. In - particular, after the first action returns successfully or after any - action raises an unhandled exception or after the main fiber is canceled, - the actions that have not yet started may be skipped entirely. - - [any] is roughly equivalent to + [any] is roughly equivalent to: {[ let any actions = Bundle.join_after @@ fun bundle -> @@ -395,9 +404,41 @@ module Run : sig Bundle.fork bundle @@ fun () -> action (); Bundle.terminate bundle - with Control.Terminate -> () - ]} - but treats the list of actions as a single computation. *) + with exn -> Bundle.error bundle exn (Printexc.get_raw_backtrace ()) + ]} *) + + val for_n : ?on_terminate:[ `Raise | `Ignore ] -> int -> (int -> unit) -> unit + (** [for_n n action], when [0 < n], starts [action i] for each integer [i] + from [0] to [n-1] and waits until they all complete. + + [for_n] is roughly equivalent to: + {[ + let for_n ?on_terminate n action = + Bundle.join_after ?on_terminate @@ fun bundle -> + for i = 0 to n - 1 do + Bundle.fork bundle @@ fun () -> action i + done + ]} *) + + val find_opt_n : int -> (int -> 'a option) -> 'a list + (** *) + + module Array : sig + (** Concurrent operations over arrays. *) + + type 'a t = 'a array + (** Type alias for [array]. *) + + val iter : + ?on_terminate:[ `Raise | `Ignore ] -> ('a -> unit) -> 'a t -> unit + (** [iter action array] starts [action array.(i)] for each index of the + [array] and waits until they all complete. *) + + val map : ('a -> 'b) -> 'a t -> 'b t + (** [map fn array] starts [fn array.(i)] for each index of the [array], + waits until they all complete, and return a new array with the return + values from those calls. *) + end end (** {1 Examples} @@ -408,105 +449,117 @@ end {[ let main () = - Flock.join_after begin fun () -> - let promise = - Flock.fork_as_promise @@ fun () -> - Control.block () - in - - Flock.fork begin fun () -> - Promise.await promise - end; - - Flock.fork begin fun () -> - let condition = Condition.create () - and mutex = Mutex.create () in - Mutex.protect mutex begin fun () -> - while true do - Condition.wait condition mutex - done - end - end; - - Flock.fork begin fun () -> - let sem = - Semaphore.Binary.make false - in - Semaphore.Binary.acquire sem - end; - - Flock.fork begin fun () -> - let sem = - Semaphore.Counting.make 0 - in - Semaphore.Counting.acquire sem - end; - - Flock.fork begin fun () -> - Event.sync (Event.choose []) - end; - - Flock.fork begin fun () -> - let latch = Latch.create 1 in - Latch.await latch - end; - - Flock.fork begin fun () -> - let ivar = Ivar.create () in - Ivar.read ivar - end; - - Flock.fork begin fun () -> - let stream = Stream.create () in - Stream.read (Stream.tap stream) - |> ignore - end; - - Flock.fork begin fun () -> - let@ inn, out = finally - Unix.close_pair @@ fun () -> - Unix.socketpair ~cloexec:true - PF_UNIX SOCK_STREAM 0 - in - Unix.set_nonblock inn; - let n = - Unix.read inn (Bytes.create 1) - 0 1 - in - assert (n = 1) - end; - - Flock.fork begin fun () -> - let a_month = - 60.0 *. 60.0 *. 24.0 *. 30.0 - in - Control.sleep ~seconds:a_month - end; - - (* Let the children get stuck *) - Control.sleep ~seconds:0.1; - - Flock.terminate () - end + Flock.join_after + begin + fun () -> + let promise = + Flock.fork_as_promise @@ fun () -> Control.block () + in + + Flock.fork + begin + fun () -> Promise.await promise + end; + + Flock.fork + begin + fun () -> + let condition = Condition.create () + and mutex = Mutex.create () in + Mutex.protect mutex + begin + fun () -> + while true do + Condition.wait condition mutex + done + end + end; + + Flock.fork + begin + fun () -> + let sem = Semaphore.Binary.make false in + Semaphore.Binary.acquire sem + end; + + Flock.fork + begin + fun () -> + let sem = Semaphore.Counting.make 0 in + Semaphore.Counting.acquire sem + end; + + Flock.fork + begin + fun () -> Event.sync (Event.choose []) + end; + + Flock.fork + begin + fun () -> + let latch = Latch.create 1 in + Latch.await latch + end; + + Flock.fork + begin + fun () -> + let ivar = Ivar.create () in + Ivar.read ivar + end; + + Flock.fork + begin + fun () -> + let stream = Stream.create () in + Stream.read (Stream.tap stream) |> ignore + end; + + Flock.fork + begin + fun () -> + let@ inn, out = + finally Unix.close_pair @@ fun () -> + Unix.socketpair ~cloexec:true PF_UNIX SOCK_STREAM 0 + in + Unix.set_nonblock inn; + let n = Unix.read inn (Bytes.create 1) 0 1 in + assert (n = 1) + end; + + Flock.fork + begin + fun () -> + let a_month = 60.0 *. 60.0 *. 24.0 *. 30.0 in + Control.sleep ~seconds:a_month + end; + + (* Let the children get stuck *) + Control.sleep ~seconds:0.1; + + Flock.terminate () + end ]} First of all, note that above the {{!Picos_std_sync.Mutex} [Mutex]}, - {{!Picos_std_sync.Condition} [Condition]}, and {{!Picos_std_sync.Semaphore} - [Semaphore]} modules come from the {!Picos_std_sync} library and the - {{!Picos_io.Unix} [Unix]} module comes from the {!Picos_io} library. They - do not come from the standard OCaml libraries. + {{!Picos_std_sync.Condition} [Condition]}, and + {{!Picos_std_sync.Semaphore} [Semaphore]} modules come from the + {!Picos_std_sync} library and the {{!Picos_io.Unix} [Unix]} module comes + from the {!Picos_io} library. They do not come from the standard OCaml + libraries. - The above program creates a {{!Flock} flock} of fibers and {{!Flock.fork} - forks} several fibers to the flock that all block in various ways. In - detail, + The above program creates a {{!Flock} flock} of fibers and + {{!Flock.fork} forks} several fibers to the flock that all block in various + ways. In detail, - {!Control.block} never returns, - {!Promise.await} never returns as the promise won't be completed, - {{!Picos_std_sync.Condition.wait} [Condition.wait]} never returns, because the condition is never signaled, - {{!Picos_std_sync.Semaphore.Binary.acquire} [Semaphore.Binary.acquire]} - and {{!Picos_std_sync.Semaphore.Counting.acquire} - [Semaphore.Counting.acquire]} never return, because the counts of the + and + {{!Picos_std_sync.Semaphore.Counting.acquire} + [Semaphore.Counting.acquire]} never return, because the counts of the semaphores never change from [0], - {{!Picos_std_event.Event.sync} [Event.sync]} never returns, because the event can never be committed to, @@ -520,9 +573,9 @@ end never written to, and the - {!Control.sleep} call would return only after about a month. - Fibers forked to a flock can be canceled in various ways. In the above + Fibers forked to a flock can be canceled in various ways. In the above program we call {!Flock.terminate} to cancel all of the fibers and - effectively close the flock. This allows the program to return normally + effectively close the flock. This allows the program to return normally immediately and without leaking or leaving anything in an invalid state: {[ @@ -539,7 +592,7 @@ end Cancelation is a signaling mechanism that allows structured concurrent abstractions, like the {!Flock} abstraction, to (hopefully) gracefully tear - down concurrent fibers in case of errors. Indeed, one of the basic ideas + down concurrent fibers in case of errors. Indeed, one of the basic ideas behind the {!Flock} abstraction is that in case any fiber forked to the flock raises an unhandled exception, the whole flock will be terminated and the error will be raised from the flock, which allows you to understand what @@ -547,7 +600,7 @@ end stuck, for example. Cancelation can also, with some care, be used as a mechanism to terminate - fibers once they are no longer needed. However, just like sleep, for + fibers once they are no longer needed. However, just like sleep, for example, cancelation is inherently prone to races, i.e. it is difficult to understand the exact point and state at which a fiber gets canceled and it is usually non-deterministic, and therefore cancelation is not recommended @@ -560,16 +613,18 @@ end {[ let many_errors () = Flock.join_after @@ fun () -> - let latch = Latch.create 1 in let fork_raising exn = - Flock.fork begin fun () -> - Control.protect begin fun () -> - Latch.await latch - end; - raise exn - end + Flock.fork + begin + fun () -> + Control.protect + begin + fun () -> Latch.await latch + end; + raise exn + end in fork_raising Exit; @@ -579,10 +634,11 @@ end Latch.decr latch ]} - The above program starts three fibers and uses a {{!Picos_std_sync.Latch} - latch} to ensure that all of them have been started, before two of them - raise errors and the third raises {{!Control.Terminate} [Terminate]}, which - is not considered an error in this library. Running the program + The above program starts three fibers and uses a + {{!Picos_std_sync.Latch} latch} to ensure that all of them have been + started, before two of them raise errors and the third raises + {{!Control.Terminate} [Terminate]}, which is not considered an error in this + library. Running the program {[ # Picos_mux_fifo.run many_errors @@ -599,35 +655,32 @@ end {[ let run_server server_fd = - Flock.join_after begin fun () -> - while true do - let@ client_fd = - instantiate Unix.close @@ fun () -> - Unix.accept - ~cloexec:true server_fd |> fst - in - - (* Fork a fiber for client *) - Flock.fork begin fun () -> - let@ client_fd = - move client_fd - in - Unix.set_nonblock client_fd; - - let bs = Bytes.create 100 in - let n = - Unix.read client_fd bs 0 - (Bytes.length bs) - in - Unix.write client_fd bs 0 n - |> ignore - end - done - end + Flock.join_after + begin + fun () -> + while true do + let@ client_fd = + instantiate Unix.close @@ fun () -> + Unix.accept ~cloexec:true server_fd |> fst + in + + (* Fork a fiber for client *) + Flock.fork + begin + fun () -> + let@ client_fd = move client_fd in + Unix.set_nonblock client_fd; + + let bs = Bytes.create 100 in + let n = Unix.read client_fd bs 0 (Bytes.length bs) in + Unix.write client_fd bs 0 n |> ignore + end + done + end ]} - The server function expects a listening socket. For each accepted client - the server forks a new fiber to handle it. The client socket is + The server function expects a listening socket. For each accepted client the + server forks a new fiber to handle it. The client socket is {{!Finally.move} moved} from the server fiber to the client fiber to avoid leaks and to ensure that the socket will be closed. @@ -637,32 +690,23 @@ end let run_client server_addr = let@ socket = finally Unix.close @@ fun () -> - Unix.socket ~cloexec:true - PF_INET SOCK_STREAM 0 + Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0 in Unix.set_nonblock socket; Unix.connect socket server_addr; let msg = "Hello!" in - Unix.write_substring - socket msg 0 (String.length msg) - |> ignore; + Unix.write_substring socket msg 0 (String.length msg) |> ignore; - let bytes = - Bytes.create (String.length msg) - in - let n = - Unix.read socket bytes 0 - (Bytes.length bytes) - in + let bytes = Bytes.create (String.length msg) in + let n = Unix.read socket bytes 0 (Bytes.length bytes) in - Printf.printf "Received: %s\n%!" - (Bytes.sub_string bytes 0 n) + Printf.printf "Received: %s\n%!" (Bytes.sub_string bytes 0 n) ]} The client function takes the address of the server and connects a socket to - the server address. It then writes a message to the server and reads a - reply from the server and prints it. + the server address. It then writes a message to the server and reads a reply + from the server and prints it. Here is the main program: @@ -670,37 +714,37 @@ end let main () = let@ server_fd = finally Unix.close @@ fun () -> - Unix.socket ~cloexec:true - PF_INET SOCK_STREAM 0 + Unix.socket ~cloexec:true PF_INET SOCK_STREAM 0 in Unix.set_nonblock server_fd; (* Let system determine the port *) - Unix.bind server_fd Unix.( - ADDR_INET(inet_addr_loopback, 0)); + Unix.bind server_fd Unix.(ADDR_INET (inet_addr_loopback, 0)); Unix.listen server_fd 8; - let server_addr = - Unix.getsockname server_fd - in - - Flock.join_after ~on_return:`Terminate begin fun () -> - (* Start server *) - Flock.fork begin fun () -> - run_server server_fd - end; - - (* Run clients concurrently *) - Flock.join_after begin fun () -> - for _ = 1 to 5 do - Flock.fork @@ fun () -> - run_client server_addr - done + let server_addr = Unix.getsockname server_fd in + + Flock.join_after ~on_return:`Terminate + begin + fun () -> + (* Start server *) + Flock.fork + begin + fun () -> run_server server_fd + end; + + (* Run clients concurrently *) + Flock.join_after + begin + fun () -> + for _ = 1 to 5 do + Flock.fork @@ fun () -> run_client server_addr + done + end end - end ]} - The main program creates a socket for the server and configures it. The - server is then started as a fiber in a flock terminated on return. Then the + The main program creates a socket for the server and configures it. The + server is then started as a fiber in a flock terminated on return. Then the clients are started to run concurrently in an inner flock. Finally we run the main program with a scheduler: @@ -716,7 +760,7 @@ end ]} As an exercise, you might want to refactor the server to avoid - {{!Finally.move} moving} the file descriptors and use a {{!Finally.let@} - recursive} accept loop instead. You could also {{!Flock.terminate} - terminate the whole flock} at the end instead of just terminating the - server. *) + {{!Finally.move} moving} the file descriptors and use a + {{!Finally.let@} recursive} accept loop instead. You could also + {{!Flock.terminate} terminate the whole flock} at the end instead of just + terminating the server. *) diff --git a/lib/picos_std.structured/run.ml b/lib/picos_std.structured/run.ml index bc9122f50..1085a9387 100644 --- a/lib/picos_std.structured/run.ml +++ b/lib/picos_std.structured/run.ml @@ -27,13 +27,58 @@ let rec spawn (Bundle r as t : Bundle.t) wrap = function Fiber.spawn fiber (wrap t main); spawn t wrap mains -let run actions wrap = - Bundle.join_after @@ fun (Bundle _ as t : Bundle.t) -> +let run ?on_terminate actions wrap = + Bundle.join_after ?on_terminate @@ fun (Bundle _ as t : Bundle.t) -> try spawn t wrap actions with exn -> let bt = Printexc.get_raw_backtrace () in Bundle.decr t; Bundle.error t exn bt -let all actions = run actions wrap_all +let all ?on_terminate actions = run ?on_terminate actions wrap_all let any actions = run actions wrap_any + +(* *) + +let for_n = For.for_n + +let find_opt_n n fn = + let results = Atomic.make [] in + begin + match + For.for_n ~on_terminate:`Raise n @@ fun i -> + match fn i with + | None -> () + | Some v -> + let backoff = ref Backoff.default in + while + let before = Atomic.get results in + let after = v :: before in + not (Atomic.compare_and_set results before after) + do + backoff := Backoff.once !backoff + done; + raise_notrace Control.Terminate + with + | () -> () + | exception Control.Terminate -> () + end; + Atomic.get results + +module Array = struct + type 'a t = 'a array + + let iter ?on_terminate action xs = + for_n ?on_terminate (Array.length xs) @@ fun i -> + action (Array.unsafe_get xs i) + + let[@inline never] map fn xs = + let n = Array.length xs in + if n = 0 then [||] + else + let ys = Array.make n (Obj.magic ()) in + for_n ~on_terminate:`Raise n (fun i -> + Array.unsafe_set ys i (fn (Array.unsafe_get xs i))); + if Obj.double_tag != Obj.tag (Obj.repr (Array.unsafe_get ys 0)) then ys + else Array.map Fun.id ys +end diff --git a/test/dune b/test/dune index 48b81eb49..424093abb 100644 --- a/test/dune +++ b/test/dune @@ -256,7 +256,10 @@ (modules test_structured) (libraries alcotest + backoff + multicore-magic picos + picos.domain picos_aux.mpscq picos_std.finally picos_std.structured diff --git a/test/test_structured.ml b/test/test_structured.ml index 27a2c17c2..eaa518452 100644 --- a/test/test_structured.ml +++ b/test/test_structured.ml @@ -3,9 +3,10 @@ open Picos_std_finally open Picos_std_structured open Picos_std_sync module Mpscq = Picos_aux_mpscq +module Atomic_array = Multicore_magic.Atomic_array (** Helper to check that computation is restored *) -let check join_after ?callstack ?on_return scope = +let check join_after ?callstack ?on_return ?on_terminate scope = let open Picos in let fiber = Fiber.current () in let before = Fiber.get_computation fiber in @@ -14,7 +15,7 @@ let check join_after ?callstack ?on_return scope = assert (before == after) in lastly check_computation_was_scoped @@ fun () -> - join_after ?callstack ?on_return @@ fun bundle -> + join_after ?callstack ?on_return ?on_terminate @@ fun bundle -> let during = Fiber.get_computation fiber in assert (before != during); scope bundle @@ -219,7 +220,10 @@ let test_any_and_all_returns () = |> List.iter @@ fun n_terminates -> [ 0; 1; 2 ] |> List.iter @@ fun n_incr -> - [ (Run.all, n_incr, n_incr); (Run.any, Int.min 1 n_incr, n_incr) ] + [ + (Run.all ?on_terminate:None, n_incr, n_incr); + (Run.any, Int.min 1 n_incr, n_incr); + ] |> List.iter @@ fun (run_op, min, max) -> Test_scheduler.run ~max_domains:(n_terminates + n_incr + 1) @@ fun () -> @@ -258,6 +262,52 @@ let test_race_any () = (* This is non-deterministic and may need to changed if flaky *) assert (Atomic.get winner = 1) +let test_for_n_basic () = + Test_scheduler.run ~max_domains:(Picos_domain.recommended_domain_count ()) + @@ fun () -> + [ `Ignore; `Raise ] + |> List.iter @@ fun on_terminate -> + for n = 0 to 128 do + let elems = Atomic_array.make n 0 in + let incremented = Atomic.make 0 in + let terminated = Atomic.make 0 in + match + Run.for_n ~on_terminate n @@ fun i -> + if Random.bool () then Control.yield (); + if Random.int n = i then begin + Atomic.incr terminated; + raise Control.Terminate + end; + Atomic.incr incremented; + while + let before = Atomic_array.unsafe_fenceless_get elems i in + let after = before + 1 in + not (Atomic_array.unsafe_compare_and_set elems i before after) + do + Backoff.once Backoff.default |> ignore + done + with + | () -> + if on_terminate != `Ignore then begin + assert (0 = Atomic.get terminated); + assert (n = Atomic.get incremented) + end; + for i = 0 to n - 1 do + let n = Atomic_array.unsafe_fenceless_get elems i in + assert (0 <= n && n <= 1); + if n = 0 then Atomic.decr terminated else Atomic.decr incremented + done; + assert (0 = Atomic.get terminated); + assert (0 = Atomic.get incremented) + | exception Control.Terminate -> + assert (on_terminate == `Raise); + assert (1 <= Atomic.get terminated); + for i = 0 to n - 1 do + let n = Atomic_array.unsafe_fenceless_get elems i in + assert (0 <= n && n <= 1) + done + done + let () = [ ( "Bundle", @@ -286,6 +336,7 @@ let () = Alcotest.test_case "any and all errors" `Quick test_any_and_all_errors; Alcotest.test_case "any and all returns" `Quick test_any_and_all_returns; Alcotest.test_case "race any" `Quick test_race_any; + Alcotest.test_case "for_n basic" `Quick test_for_n_basic; ] ); ] |> Alcotest.run "Picos_structured"