From 8dee4fcbf0be5cdd416c18deb44e6e904312be9d Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Sun, 14 Jul 2024 14:11:18 +0300 Subject: [PATCH] Redesign `Spawn` and `FLS` This PR redesigns the `Spawn` and `FLS` mechanism to better support new use cases. --- bench/bench_fib.ocaml5.ml | 2 +- bench/bench_spawn.ml | 28 +++--- lib/picos/bootstrap/picos_bootstrap.ml | 96 ++++++++++++++++++-- lib/picos/intf.ocaml5.ml | 17 ++-- lib/picos/ocaml4/picos_ocaml.ml | 16 ++-- lib/picos/ocaml5/picos_ocaml.ml | 12 +-- lib/picos/picos.mli | 70 ++++++++------- lib/picos_fifos/picos_fifos.ml | 32 ++++--- lib/picos_fifos/picos_fifos.mli | 2 +- lib/picos_lwt/picos_lwt.ml | 11 +-- lib/picos_randos/picos_randos.ml | 41 ++++++--- lib/picos_randos/picos_randos.mli | 2 +- lib/picos_structured/bundle.ml | 35 +++++--- lib/picos_structured/run.ml | 48 +++++----- lib/picos_threaded/picos_threaded.ml | 119 +++++++++---------------- lib/picos_threaded/picos_threaded.mli | 2 +- test/dune | 7 ++ test/lib/meow/dllist.ml | 43 +++++++++ test/lib/meow/dllist.mli | 10 +++ test/lib/meow/dune | 3 + test/lib/meow/meow.ml | 2 + test/lib/meow/meow.mli | 16 ++++ test/lib/meow/ownership.ml | 115 ++++++++++++++++++++++++ test/lib/meow/promise.ml | 48 ++++++++++ test/test_meow.ml | 22 +++++ test/test_picos.ml | 8 +- test/test_scheduler.ocaml4.ml | 4 +- test/test_scheduler.ocaml5.ml | 24 +++-- test/test_schedulers.ml | 29 +++++- test/test_sync.ml | 17 ++-- 30 files changed, 631 insertions(+), 250 deletions(-) create mode 100644 test/lib/meow/dllist.ml create mode 100644 test/lib/meow/dllist.mli create mode 100644 test/lib/meow/dune create mode 100644 test/lib/meow/meow.ml create mode 100644 test/lib/meow/meow.mli create mode 100644 test/lib/meow/ownership.ml create mode 100644 test/lib/meow/promise.ml create mode 100644 test/test_meow.ml diff --git a/bench/bench_fib.ocaml5.ml b/bench/bench_fib.ocaml5.ml index 13e933a68..cbe063882 100644 --- a/bench/bench_fib.ocaml5.ml +++ b/bench/bench_fib.ocaml5.ml @@ -9,7 +9,7 @@ let rec exp_fib i = else let computation = Computation.create () in let main () = Computation.return computation (exp_fib (i - 2)) in - Fiber.spawn ~forbid:false computation [ main ]; + Fiber.spawn (Fiber.create ~forbid:false computation) main; let f1 = exp_fib (i - 1) in let f2 = Computation.await computation in f1 + f2 diff --git a/bench/bench_spawn.ml b/bench/bench_spawn.ml index a71a5a485..a17a24081 100644 --- a/bench/bench_spawn.ml +++ b/bench/bench_spawn.ml @@ -5,7 +5,7 @@ let factor = Util.iter_factor * if String.starts_with ~prefix:"4." Sys.ocaml_version then 1 else 10 -let run_one ~budgetf ~at_a_time () = +let run_one ~budgetf () = let n_spawns = 10 * factor in let init _ = () in @@ -13,25 +13,23 @@ let run_one ~budgetf ~at_a_time () = let work _ () = let counter = ref n_spawns in let computation = Computation.create () in - for _ = 1 to n_spawns / at_a_time do - let main () = - let n = !counter - 1 in - counter := n; - if n = 0 then Computation.finish computation - in - let mains = List.init at_a_time @@ fun _ -> main in - Fiber.spawn ~forbid:false computation mains + let computation_packed = Computation.Packed computation in + let main () = + let n = !counter - 1 in + counter := n; + if n = 0 then Computation.finish computation; + Fiber.finalize (Fiber.current ()) + in + for _ = 1 to n_spawns do + let fiber = Fiber.create_packed ~forbid:false computation_packed in + Fiber.spawn fiber main done; Computation.await computation in - let config = Printf.sprintf "%d at a time" at_a_time in + let config = "with packed computation" in Times.record ~budgetf ~n_domains:1 ~n_warmups:1 ~n_runs_min:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_spawns ~singular:"spawn" ~config -let run_suite ~budgetf = - if Sys.int_size <= 32 then [] - else - [ 1; 2; 4; 8 ] - |> List.concat_map @@ fun at_a_time -> run_one ~budgetf ~at_a_time () +let run_suite ~budgetf = if Sys.int_size <= 32 then [] else run_one ~budgetf () diff --git a/lib/picos/bootstrap/picos_bootstrap.ml b/lib/picos/bootstrap/picos_bootstrap.ml index 510aec76b..16cc316bd 100644 --- a/lib/picos/bootstrap/picos_bootstrap.ml +++ b/lib/picos/bootstrap/picos_bootstrap.ml @@ -331,13 +331,60 @@ module Fiber = struct type 'a initial = Constant of 'a | Computed of (unit -> 'a) - let new_key initial = + type finalizers = + | Nil + | Finalizer : { + index : int; + finalize : 'a -> unit; + next : finalizers; + } + -> finalizers + + let finalizers = Atomic.make Nil + + let rec add_finalizer index finalize = + let before = Atomic.get finalizers in + let after = Finalizer { index; finalize; next = before } in + if not (Atomic.compare_and_set finalizers before after) then + add_finalizer index finalize + + type initializers = + | Nil + | Initializer : { + key : 'a key; + initialize : t -> 'a; + next : initializers; + } + -> initializers + + let initializers = Atomic.make Nil + + let rec add_initializer key initialize = + let before = Atomic.get initializers in + let after = Initializer { key; initialize; next = before } in + if not (Atomic.compare_and_set initializers before after) then + add_initializer key initialize + + let new_key ?finalize ?initialize initial = let index = Atomic.fetch_and_add counter 1 in - match initial with - | Constant default -> - let default = Sys.opaque_identity (Obj.magic default : non_float) in - { index; default; compute } - | Computed compute -> { index; default = unique; compute } + begin + match finalize with + | None -> () + | Some finalize -> add_finalizer index finalize + end; + let key = + match initial with + | Constant default -> + let default = Sys.opaque_identity (Obj.magic default : non_float) in + { index; default; compute } + | Computed compute -> { index; default = unique; compute } + in + begin + match initialize with + | None -> () + | Some initialize -> add_initializer key initialize + end; + key let get (type a) (Fiber r : t) (key : a key) = let fls = r.fls in @@ -369,6 +416,13 @@ module Fiber = struct (Sys.opaque_identity (Obj.magic value : non_float)); value + let[@inline] has (Fiber r : t) key = + let fls = r.fls in + key.index < Array.length fls + && + let value = Array.unsafe_get fls key.index in + value != unique + let set (type a) (Fiber r : t) (key : a key) (value : a) = let fls = r.fls in if key.index < Array.length fls then @@ -379,14 +433,40 @@ module Fiber = struct r.fls <- fls; Array.unsafe_set fls key.index (Sys.opaque_identity (Obj.magic value : non_float)) + + let rec finalize fls = function + | Finalizer r -> + if r.index < Array.length fls then begin + let value = Array.unsafe_get fls r.index in + if value != unique then r.finalize (Obj.magic value) + end; + finalize fls r.next + | Nil -> () + + let rec initialize ~parent ~child = function + | Initializer r -> + if not (has child r.key) then set child r.key (r.initialize parent); + initialize ~parent ~child r.next + | Nil -> () end + + let[@inline] finalize (Fiber t : t) = + let fls = t.fls in + if 0 < Array.length fls then FLS.finalize fls (Atomic.get FLS.finalizers) + + let[@inline] initialize ~parent ~child = + match Atomic.get FLS.initializers with + | Initializer r -> + if not (FLS.has child r.key) then + FLS.set child r.key (r.initialize parent); + FLS.initialize ~parent ~child r.next + | Nil -> () end module Handler = struct type 'c t = { current : 'c -> Fiber.t; - spawn : - 'a. 'c -> forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit; + spawn : 'c -> Fiber.t -> (unit -> unit) -> unit; yield : 'c -> unit; cancel_after : 'a. 'c -> 'a Computation.t -> seconds:float -> Exn_bt.t -> unit; diff --git a/lib/picos/intf.ocaml5.ml b/lib/picos/intf.ocaml5.ml index 83b5cbf35..24cb487db 100644 --- a/lib/picos/intf.ocaml5.ml +++ b/lib/picos/intf.ocaml5.ml @@ -132,13 +132,13 @@ module type Fiber = sig associated with the fiber has been canceled the scheduler is free to discontinue the fiber immediately before spawning new fibers. - The scheduler is free to run the newly created fibers on any domain and + The scheduler is free to run the newly created fiber on any domain and decide which fiber to give priority to. - ⚠️ The scheduler should guarantee that, when [Spawn] returns normally, all - of the [mains] will eventually be called by the scheduler and, when - [Spawn] raises an exception, none of the [mains] will be called. In other - words, [Spawn] should check cancelation just once and be all or nothing. + ⚠️ The scheduler should guarantee that, when [Spawn] returns normally, the + given [main] will eventually be called by the scheduler and, when [Spawn] + raises an exception, the [main] will not be called. In other words, + [Spawn] should check cancelation just once and be all or nothing. Furthermore, in case a newly spawned fiber is canceled before its main is called, the scheduler must still call the main. This allows a program to ensure, i.e. keep track of, that all fibers it spawns are terminated @@ -146,10 +146,5 @@ module type Fiber = sig properly. *) type _ Effect.t += private - | Spawn : { - forbid : bool; - computation : 'a computation; - mains : (unit -> unit) list; - } - -> unit Effect.t + | Spawn : { fiber : t; main : unit -> unit } -> unit Effect.t end diff --git a/lib/picos/ocaml4/picos_ocaml.ml b/lib/picos/ocaml4/picos_ocaml.ml index ad1743729..d92ffc11d 100644 --- a/lib/picos/ocaml4/picos_ocaml.ml +++ b/lib/picos/ocaml4/picos_ocaml.ml @@ -1,17 +1,17 @@ open Picos_bootstrap -let error () = +let[@inline never] error _ = raise (Sys_error "Picos.Handler.using not called for current thread") module Handler = struct type entry = E : { context : 'a; handler : 'a Handler.t } -> entry let default = - let current _ = error () - and spawn _ ~forbid:_ _ _ = error () - and yield _ = error () - and cancel_after _ _ ~seconds:_ _ = error () - and await _ _ = error () in + let current = error + and spawn _ _ = error + and yield = error + and cancel_after _ _ ~seconds:_ = error + and await _ = error in E { context = (); handler = { current; spawn; yield; cancel_after; await } } let key = Picos_thread.TLS.new_key @@ fun () -> default @@ -42,9 +42,9 @@ module Fiber = struct let (E r) = Handler.get () in r.handler.current r.context - let spawn ~forbid computation mains = + let spawn fiber main = let (E r) = Handler.get () in - r.handler.spawn r.context ~forbid computation mains + r.handler.spawn r.context fiber main let yield () = let (E r) = Handler.get () in diff --git a/lib/picos/ocaml5/picos_ocaml.ml b/lib/picos/ocaml5/picos_ocaml.ml index 8e845137c..0f2227bc4 100644 --- a/lib/picos/ocaml5/picos_ocaml.ml +++ b/lib/picos/ocaml5/picos_ocaml.ml @@ -25,15 +25,9 @@ module Fiber = struct let current () = Effect.perform Current type _ Effect.t += - | Spawn : { - forbid : bool; - computation : 'a Computation.t; - mains : (unit -> unit) list; - } - -> unit Effect.t + | Spawn : { fiber : Fiber.t; main : unit -> unit } -> unit Effect.t - let spawn ~forbid computation mains = - Effect.perform @@ Spawn { forbid; computation; mains } + let spawn fiber main = Effect.perform @@ Spawn { fiber; main } type _ Effect.t += Yield : unit Effect.t @@ -78,7 +72,7 @@ module Handler = struct | Fiber.Spawn r -> Some (fun k -> - match h.spawn c ~forbid:r.forbid r.computation r.mains with + match h.spawn c r.fiber r.main with | unit -> Effect.Deep.continue k unit | exception exn -> discontinue k exn) | Fiber.Yield -> yield diff --git a/lib/picos/picos.mli b/lib/picos/picos.mli index f7ed275c3..08ce25c71 100644 --- a/lib/picos/picos.mli +++ b/lib/picos/picos.mli @@ -757,30 +757,6 @@ module Fiber : sig (** [sleep ~seconds] suspends the current fiber for the specified number of [seconds]. *) - (** {2 Interface for spawning} *) - - val spawn : forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit - (** [spawn ~forbid computation mains] starts new fibers by performing the - {!Spawn} effect. The fibers will share the same [computation] and start - with {{!Fiber.has_forbidden} propagation of cancelation forbidden or - permitted} depending on the [forbid] flag. - - ℹ️ Any {{!Computation} computation}, including the computation of the - current fiber, may be passed as the computation for new fibers. Higher - level libraries are free to implement the desired structuring principles. - - ⚠️ Behavior is undefined if any function in [mains] raises an exception. - For example, raising an exception might terminate the whole application - (recommended, but not required) or the exception might be ignored. In - other words, the caller {i must} arrange for the computation to be - completed and errors reported in a desired manner. - - ℹ️ The behavior is that - - - on OCaml 5, [spawn] performs the {!Spawn} effect, and - - on OCaml 4, [spawn] will call the [spawn] operation of the {{!Handler} - current handler}. *) - (** {2 Interface for current fiber} *) type t @@ -943,7 +919,8 @@ module Fiber : sig (** Type to specify initial values for fibers. *) type 'a initial = Constant of 'a | Computed of (unit -> 'a) - val new_key : 'a initial -> 'a key + val new_key : + ?finalize:('a -> unit) -> ?initialize:(t -> 'a) -> 'a initial -> 'a key (** [new_key initial] allocates a new key for associating values in storage associated with fibers. The [initial] value for every fiber is either the given {!Constant} or is {!Computed} with the given function. If the @@ -965,6 +942,38 @@ module Fiber : sig ⚠️ It is only safe to call [set] from the fiber itself. *) end + (** {2 Interface for spawning} *) + + val create_packed : forbid:bool -> Computation.packed -> t + (** [create_packed ~forbid packed] creates a new fiber record. *) + + val create : forbid:bool -> 'a Computation.t -> t + (** [create ~forbid computation] is equivalent to + {{!create_packed} [create_packed ~forbid (Computation.Packed computation)]}. *) + + val finalize : t -> unit + (** *) + + val spawn : t -> (unit -> unit) -> unit + (** [spawn fiber main] starts a new fiber by performing the {!Spawn} effect. + + ⚠️ Fiber records must be unique and the caller of [spawn] must make sure + that a specific {{!fiber} fiber} record is not reused. Failure to ensure + that fiber records are unique will break concurrent abstractions written + on top the the Picos interface. + + ⚠️ Behavior is undefined if the [main] function raises an exception. For + example, raising an exception might terminate the whole application + (recommended, but not required) or the exception might be ignored. In + other words, the caller {i must} arrange for any exceptions to be handled + in a desired manner. + + ℹ️ The behavior is that + + - on OCaml 5, [spawn] performs the {!Spawn} effect, and + - on OCaml 4, [spawn] will call the [spawn] operation of the {{!Handler} + current handler}. *) + (** {2 Interface for structuring} *) val get_computation : t -> Computation.packed @@ -1051,12 +1060,8 @@ module Fiber : sig (** {2 Interface for schedulers} *) - val create_packed : forbid:bool -> Computation.packed -> t - (** [create_packed ~forbid packed] creates a new fiber. *) - - val create : forbid:bool -> 'a Computation.t -> t - (** [create ~forbid computation] is equivalent to - {{!create_packed} [create_packed ~forbid (Computation.Packed computation)]}. *) + val initialize : parent:t -> child:t -> unit + (** *) val try_suspend : t -> Trigger.t -> 'x -> 'y -> (Trigger.t -> 'x -> 'y -> unit) -> bool @@ -1115,8 +1120,7 @@ module Handler : sig type 'c t = { current : 'c -> Fiber.t; (** See {!Picos.Fiber.current}. *) - spawn : - 'a. 'c -> forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit; + spawn : 'c -> Fiber.t -> (unit -> unit) -> unit; (** See {!Picos.Fiber.spawn}. *) yield : 'c -> unit; (** See {!Picos.Fiber.yield}. *) cancel_after : diff --git a/lib/picos_fifos/picos_fifos.ml b/lib/picos_fifos/picos_fifos.ml index 76c6efd5a..ff5037492 100644 --- a/lib/picos_fifos/picos_fifos.ml +++ b/lib/picos_fifos/picos_fifos.ml @@ -19,15 +19,9 @@ type t = { (Exn_bt.t option, unit) Effect.Deep.continuation -> unit; retc : unit -> unit; + exnc : exn -> unit; } -let rec spawn t n forbid packed = function - | [] -> Atomic.fetch_and_add t.num_alive_fibers n |> ignore - | main :: mains -> - let fiber = Fiber.create_packed ~forbid packed in - Picos_mpscq.push t.ready (Spawn (fiber, main)); - spawn t (n + 1) forbid packed mains - let continue = Some (fun k -> Effect.Deep.continue k ()) let rec next t = @@ -55,7 +49,9 @@ let rec next t = whole operation or discontinue the fiber. *) if Fiber.is_canceled fiber then discontinue else begin - spawn t 0 r.forbid (Packed r.computation) r.mains; + Fiber.initialize ~parent:fiber ~child:r.fiber; + Atomic.incr t.num_alive_fibers; + Picos_mpscq.push t.ready (Spawn (r.fiber, r.main)); continue end | Fiber.Yield -> yield @@ -79,7 +75,7 @@ let rec next t = else Fiber.resume fiber k) | _ -> None in - Effect.Deep.match_with main () { retc = t.retc; exnc = raise; effc } + Effect.Deep.match_with main () { retc = t.retc; exnc = t.exnc; effc } | Continue (fiber, k) -> Fiber.continue fiber k () | Resume (fiber, k) -> Fiber.resume fiber k | exception Picos_mpscq.Empty -> @@ -102,7 +98,7 @@ let rec next t = next t end -let run ?(forbid = false) main = +let run ?(fatal_exn_handler = raise) ?(forbid = false) main = Select.check_configured (); let ready = Picos_mpscq.create ~padded:true () and needs_wakeup = Atomic.make false |> Multicore_magic.copy_as_padded @@ -110,7 +106,16 @@ let run ?(forbid = false) main = and mutex = Mutex.create () and condition = Condition.create () in let rec t = - { ready; needs_wakeup; num_alive_fibers; mutex; condition; resume; retc } + { + ready; + needs_wakeup; + num_alive_fibers; + mutex; + condition; + resume; + retc; + exnc = fatal_exn_handler; + } and retc () = Atomic.decr t.num_alive_fibers; next t @@ -145,7 +150,10 @@ let run ?(forbid = false) main = in let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in - let main = Computation.capture computation main in + let main () = + Computation.capture computation main (); + Fiber.finalize fiber + in Picos_mpscq.push t.ready (Spawn (fiber, main)); next t; Computation.await computation diff --git a/lib/picos_fifos/picos_fifos.mli b/lib/picos_fifos/picos_fifos.mli index da4638d24..227162928 100644 --- a/lib/picos_fifos/picos_fifos.mli +++ b/lib/picos_fifos/picos_fifos.mli @@ -22,7 +22,7 @@ This scheduler also gives priority to fibers woken up from {{!Picos.Trigger.await} [await]} due to being canceled. *) -val run : ?forbid:bool -> (unit -> 'a) -> 'a +val run : ?fatal_exn_handler:(exn -> unit) -> ?forbid:bool -> (unit -> 'a) -> 'a (** [run main] runs the [main] thunk with the scheduler. Returns after [main] and all of the fibers spawned by [main] have returned. diff --git a/lib/picos_lwt/picos_lwt.ml b/lib/picos_lwt/picos_lwt.ml index a438ee57e..4184883ec 100644 --- a/lib/picos_lwt/picos_lwt.ml +++ b/lib/picos_lwt/picos_lwt.ml @@ -49,13 +49,9 @@ let[@alert "-handler"] rec go : (fun k -> match Fiber.canceled fiber with | None -> - let packed = Computation.Packed r.computation in - List.iter - (fun main -> - let fiber = Fiber.create_packed ~forbid:r.forbid packed in - Lwt.async @@ fun () -> - go fiber system (Effect.Shallow.fiber main) (Ok ())) - r.mains; + Fiber.initialize ~parent:fiber ~child:r.fiber; + Lwt.async (fun () -> + go r.fiber system (Effect.Shallow.fiber r.main) (Ok ())); go fiber system k (Ok ()) | Some exn_bt -> go fiber system k (Error exn_bt)) | Fiber.Yield -> @@ -111,6 +107,7 @@ let run ?(forbid = false) system main = let fiber = Fiber.create ~forbid computation in let main () = Computation.capture computation main (); + Fiber.finalize fiber; Computation.await computation in go fiber system (Effect.Shallow.fiber main) (Ok ()) diff --git a/lib/picos_randos/picos_randos.ml b/lib/picos_randos/picos_randos.ml index 25fe3f009..5bab98b89 100644 --- a/lib/picos_randos/picos_randos.ml +++ b/lib/picos_randos/picos_randos.ml @@ -44,21 +44,13 @@ type t = { (Exn_bt.t option, unit) Effect.Deep.continuation -> unit; retc : unit -> unit; + exnc : exn -> unit; num_waiters : int ref; condition : Condition.t; mutex : Mutex.t; mutable run : bool; } -let rec spawn t forbid packed = function - | [] -> () - | main :: mains -> - let fiber = Fiber.create_packed ~forbid packed in - Atomic.incr t.num_alive_fibers; - Collection.push t.ready (Spawn (fiber, main)); - if !(t.num_waiters_non_zero) then Condition.signal t.condition; - spawn t forbid packed mains - let rec next t = match Collection.pop_exn t.ready with | Spawn (fiber, main) -> @@ -84,7 +76,10 @@ let rec next t = | Fiber.Spawn r -> if Fiber.is_canceled fiber then yield else begin - spawn t r.forbid (Packed r.computation) r.mains; + Fiber.initialize ~parent:fiber ~child:r.fiber; + Atomic.incr t.num_alive_fibers; + Collection.push t.ready (Spawn (r.fiber, r.main)); + if !(t.num_waiters_non_zero) then Condition.signal t.condition; return end | Fiber.Yield -> yield @@ -112,7 +107,7 @@ let rec next t = end) | _ -> None in - Effect.Deep.match_with main () { retc = t.retc; exnc = raise; effc } + Effect.Deep.match_with main () { retc = t.retc; exnc = t.exnc; effc } | Raise (k, exn_bt) -> Exn_bt.discontinue k exn_bt | Return k -> Effect.Deep.continue k () | Current (fiber, k) -> Effect.Deep.continue k fiber @@ -151,8 +146,24 @@ let rec next t = Condition.broadcast t.condition end -let context () = +let default_fatal_exn_handler exn = + prerr_string "Fatal error: exception "; + prerr_string (Printexc.to_string exn); + prerr_char '\n'; + Printexc.print_backtrace stderr; + flush stderr; + exit 2 + +let context ?fatal_exn_handler () = Select.check_configured (); + let exnc = + match fatal_exn_handler with + | None -> default_fatal_exn_handler + | Some handler -> + fun exn -> + handler exn; + raise exn + in let rec t = { ready = Collection.create (); @@ -160,6 +171,7 @@ let context () = num_alive_fibers = Atomic.make 1 |> Multicore_magic.copy_as_padded; resume; retc; + exnc; num_waiters = ref 0 |> Multicore_magic.copy_as_padded; condition = Condition.create (); mutex = Mutex.create (); @@ -219,7 +231,10 @@ let run ?context:t_opt ?(forbid = false) main = Mutex.unlock t.mutex; let computation = Computation.create ~mode:`LIFO () in let fiber = Fiber.create ~forbid computation in - let main = Computation.capture computation main in + let main () = + Computation.capture computation main (); + Fiber.finalize fiber + in Collection.push t.ready (Spawn (fiber, main)); next t; Mutex.lock t.mutex; diff --git a/lib/picos_randos/picos_randos.mli b/lib/picos_randos/picos_randos.mli index 0e36c3ebc..9460acb81 100644 --- a/lib/picos_randos/picos_randos.mli +++ b/lib/picos_randos/picos_randos.mli @@ -13,7 +13,7 @@ type t (** Represents a shared context for randomized runners. *) -val context : unit -> t +val context : ?fatal_exn_handler:(exn -> unit) -> unit -> t (** [context ()] creates a new context for randomized runners. The context should be consumed by a call of {{!run} [run ~context ...]}. *) diff --git a/lib/picos_structured/bundle.ml b/lib/picos_structured/bundle.ml index 8ca94ac52..70cda6cfb 100644 --- a/lib/picos_structured/bundle.ml +++ b/lib/picos_structured/bundle.ml @@ -4,17 +4,18 @@ let[@inline never] completed () = invalid_arg "already completed" type t = { num_fibers : int Atomic.t; - bundle : unit Computation.t; + bundle : Computation.packed; errors : Control.Errors.t; finished : Trigger.t; } let terminate ?callstack t = - Computation.cancel t.bundle (Control.terminate_bt ?callstack ()) + let (Packed bundle) = t.bundle in + Computation.cancel bundle (Control.terminate_bt ?callstack ()) let terminate_after ?callstack t ~seconds = - Computation.cancel_after t.bundle ~seconds - (Control.terminate_bt ?callstack ()) + let (Packed bundle) = t.bundle in + Computation.cancel_after bundle ~seconds (Control.terminate_bt ?callstack ()) let error ?callstack t (exn_bt : Exn_bt.t) = if exn_bt.Exn_bt.exn != Control.Terminate then begin @@ -25,7 +26,8 @@ let error ?callstack t (exn_bt : Exn_bt.t) = let decr t = let n = Atomic.fetch_and_add t.num_fibers (-1) in if n = 1 then begin - Computation.finish t.bundle; + let (Packed bundle) = t.bundle in + Computation.cancel bundle (Control.terminate_bt ()); Trigger.signal t.finished end @@ -44,18 +46,18 @@ let join_after fn = (* The sequence of operations below ensures that nothing is leaked. *) let t = let num_fibers = Atomic.make 1 in - let bundle = Computation.create ~mode:`LIFO () in + let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in let errors = Control.Errors.create () in let finished = Trigger.create () in { num_fibers; bundle; errors; finished } in let fiber = Fiber.current () in let (Packed parent as packed) = Fiber.get_computation fiber in - let bundle = Computation.Packed t.bundle in - let canceler = Computation.attach_canceler ~from:parent ~into:t.bundle in + let (Packed bundle) = t.bundle in + let canceler = Computation.attach_canceler ~from:parent ~into:bundle in (* Ideally there should be no poll point betweem [attach_canceler] and the [match ... with] below. *) - Fiber.set_computation fiber bundle; + Fiber.set_computation fiber t.bundle; match fn t with | value -> await t fiber packed canceler; @@ -76,8 +78,10 @@ let fork_as_promise t thunk = (* The sequence of operations below ensures that nothing is leaked. *) incr t Backoff.default; let child = Computation.create ~mode:`LIFO () in + let fiber = Fiber.create ~forbid:false child in try - let canceler = Computation.attach_canceler ~from:t.bundle ~into:child in + let (Packed bundle) = t.bundle in + let canceler = Computation.attach_canceler ~from:bundle ~into:child in let main () = begin match thunk () with @@ -87,10 +91,12 @@ let fork_as_promise t thunk = Computation.cancel child exn_bt; error t exn_bt end; - Computation.detach t.bundle canceler; + Fiber.finalize fiber; + let (Packed bundle) = t.bundle in + Computation.detach bundle canceler; decr t in - Fiber.spawn ~forbid:false child [ main ]; + Fiber.spawn fiber main; child with canceled_exn -> (* We don't need to worry about detaching the [canceler], because at this @@ -102,6 +108,9 @@ let fork t thunk = fork_as_promise t thunk |> ignore (* *) -let is_running t = Computation.is_running t.bundle +let is_running t = + let (Packed bundle) = t.bundle in + Computation.is_running bundle + let unsafe_incr t = Atomic.incr t.num_fibers let unsafe_reset t = Atomic.set t.num_fibers 1 diff --git a/lib/picos_structured/run.ml b/lib/picos_structured/run.ml index af42ac3d7..de0d40514 100644 --- a/lib/picos_structured/run.ml +++ b/lib/picos_structured/run.ml @@ -1,32 +1,36 @@ open Picos -let wrap_all t main = - Bundle.unsafe_incr t; - fun () -> - if Bundle.is_running t then begin - try main () with exn -> Bundle.error t (Exn_bt.get exn) - end; - Bundle.decr t +let wrap_all t fiber main () = + if Bundle.is_running t then begin + try main () with exn -> Bundle.error t (Exn_bt.get exn) + end; + Fiber.finalize fiber; + Bundle.decr t -let wrap_any t main = - Bundle.unsafe_incr t; - fun () -> - if Bundle.is_running t then begin - try - main (); - Bundle.terminate t - with exn -> Bundle.error t (Exn_bt.get exn) - end; - Bundle.decr t +let wrap_any t fiber main () = + if Bundle.is_running t then begin + match main () with + | () -> Bundle.terminate t + | exception exn -> Bundle.error t (Exn_bt.get exn) + end; + Fiber.finalize fiber; + Bundle.decr t + +let rec spawn (t : Bundle.t) wrap = function + | [] -> () + | main :: mains -> + Bundle.unsafe_incr t; + let fiber = Fiber.create_packed ~forbid:false t.bundle in + Fiber.spawn fiber (wrap t fiber main); + spawn t wrap mains let run actions wrap = Bundle.join_after @@ fun t -> - try - let mains = List.map (wrap t) actions in - Fiber.spawn ~forbid:false t.bundle mains + try spawn t wrap actions with exn -> - Bundle.unsafe_reset t; - raise exn + let exn_bt = Exn_bt.get exn in + Bundle.decr t; + Bundle.error t exn_bt let all actions = run actions wrap_all let any actions = run actions wrap_any diff --git a/lib/picos_threaded/picos_threaded.ml b/lib/picos_threaded/picos_threaded.ml index 6b8a3635c..5f380e099 100644 --- a/lib/picos_threaded/picos_threaded.ml +++ b/lib/picos_threaded/picos_threaded.ml @@ -1,12 +1,15 @@ open Picos -type t = { fiber : Fiber.t; mutex : Mutex.t; condition : Condition.t } +type t = { + fiber : Fiber.t; + mutex : Mutex.t; + condition : Condition.t; + fatal_exn_handler : exn -> unit; +} -let create_packed ~forbid packed = - let fiber = Fiber.create_packed ~forbid packed in - let mutex = Mutex.create () in - let condition = Condition.create () in - { fiber; mutex; condition } +let create ~fatal_exn_handler fiber = + let mutex = Mutex.create () and condition = Condition.create () in + { fiber; mutex; condition; fatal_exn_handler } let rec block trigger t = if not (Trigger.is_signaled trigger) then begin @@ -45,6 +48,17 @@ let resume trigger t _ = end; Condition.broadcast t.condition +let default_fatal_exn_handler exn = + prerr_string "Fatal error: exception "; + prerr_string (Printexc.to_string exn); + prerr_char '\n'; + Printexc.print_backtrace stderr; + flush stderr; + exit 2 + +let finalize fatal_exn_handler fiber = + try Fiber.finalize fiber with exn -> fatal_exn_handler exn + let[@alert "-handler"] rec await t trigger = if Fiber.try_suspend t.fiber trigger t t resume then block trigger t; Fiber.canceled t.fiber @@ -67,79 +81,32 @@ and cancel_after : type a. _ -> a Computation.t -> _ = Fiber.check t.fiber; Select.cancel_after computation ~seconds exn_bt -and spawn : type a. _ -> forbid:bool -> a Computation.t -> _ = - fun t ~forbid computation mains -> +and spawn t fiber main = Fiber.check t.fiber; - let packed = Computation.Packed computation in - match mains with - | [ main ] -> - Thread.create - (fun () -> - (* We need to (recursively) install the handler on each new thread - that we create. *) - Handler.using handler (create_packed ~forbid packed) main) - () - |> ignore - | mains -> begin - (* We try to be careful to implement the all-or-nothing behaviour based on - the assumption that we may run out of threads well before we run out of - memory. In a thread pool based scheduler this should actually not - require special treatment. *) - let all_or_nothing = ref `Wait in - match - mains - |> List.iter @@ fun main -> - Thread.create - (fun () -> - if !all_or_nothing == `Wait then begin - Mutex.lock t.mutex; - match - while - match !all_or_nothing with - | `Wait -> - Condition.wait t.condition t.mutex; - true - | `All | `Nothing -> false - do - () - done - with - | () -> Mutex.unlock t.mutex - | exception async_exn -> - (* Condition.wait may be interrupted by asynchronous - exceptions and we must make sure to unlock even in that - case. *) - Mutex.unlock t.mutex; - raise async_exn - end; - if !all_or_nothing == `All then - (* We need to (recursively) install the handler on each new - thread that we create. *) - Handler.using handler (create_packed ~forbid packed) main) - () - |> ignore - with - | () -> - Mutex.lock t.mutex; - all_or_nothing := `All; - Mutex.unlock t.mutex; - Condition.broadcast t.condition - | exception exn -> - Mutex.lock t.mutex; - all_or_nothing := `Nothing; - Mutex.unlock t.mutex; - Condition.broadcast t.condition; - raise exn - end + Fiber.initialize ~parent:t.fiber ~child:fiber; + match Thread.create start (fiber, t.fatal_exn_handler, main) with + | _ -> ( (* We assume that [main] is now guaranteed to be called. *) ) + | exception exn -> + let exn_bt = Exn_bt.get exn in + (* [main] wasn't called, so we need to finalize. *) + let (Packed computation) = Fiber.get_computation fiber in + Computation.cancel computation exn_bt; + finalize t.fatal_exn_handler fiber; + Exn_bt.raise exn_bt and handler = Handler.{ current; spawn; yield; cancel_after; await } -let run ?(forbid = false) main = +and start (fiber, fatal_exn_handler, main) = + (* We need to install the handler on each new thread that we create. *) + try Handler.using handler (create ~fatal_exn_handler fiber) main + with exn -> fatal_exn_handler exn + +let run ?(forbid = false) ?(fatal_exn_handler = default_fatal_exn_handler) main + = Select.check_configured (); let computation = Computation.create ~mode:`LIFO () in - let context = create_packed ~forbid (Packed computation) in - let main () = - Computation.capture computation main (); - Computation.await computation - in - Handler.using handler context main + let fiber = Fiber.create ~forbid computation in + let main = Computation.capture computation main in + Handler.using handler (create ~fatal_exn_handler fiber) main; + finalize fatal_exn_handler fiber; + Computation.await computation diff --git a/lib/picos_threaded/picos_threaded.mli b/lib/picos_threaded/picos_threaded.mli index cdab39864..70e07103d 100644 --- a/lib/picos_threaded/picos_threaded.mli +++ b/lib/picos_threaded/picos_threaded.mli @@ -24,7 +24,7 @@ OCaml 5 a scheduler that implements an effect handler directly is likely to perform better. *) -val run : ?forbid:bool -> (unit -> 'a) -> 'a +val run : ?forbid:bool -> ?fatal_exn_handler:(exn -> unit) -> (unit -> 'a) -> 'a (** [run main] runs the [main] thunk with the scheduler. Returns after [main] and all of the fibers spawned by [main] have returned. diff --git a/test/dune b/test/dune index dcc6b2f0f..a28ec5ed9 100644 --- a/test/dune +++ b/test/dune @@ -181,3 +181,10 @@ cohttp-lwt-unix conduit-lwt-unix uri)) + +;; + +(test + (name test_meow) + (modules test_meow) + (libraries test_scheduler meow alcotest)) diff --git a/test/lib/meow/dllist.ml b/test/lib/meow/dllist.ml new file mode 100644 index 000000000..e155f1ed1 --- /dev/null +++ b/test/lib/meow/dllist.ml @@ -0,0 +1,43 @@ +type 'a node = { mutable lhs : 'a node; mutable rhs : 'a node; value : 'a } +type 'a t = 'a node + +let new_node value = + let node = { lhs = Obj.magic (); rhs = Obj.magic (); value } in + node.lhs <- node; + node.rhs <- node; + node + +let create () = new_node (Obj.magic ()) +let is_empty t = t.lhs == t +let value node = node.value + +let remove node = + let lhs = node.lhs in + if lhs != node then begin + let rhs = node.rhs in + lhs.rhs <- rhs; + rhs.lhs <- lhs; + node.lhs <- node; + node.rhs <- node + end + +let move_l t node = + let lhs = node.lhs in + if lhs != node then begin + let rhs = node.rhs in + lhs.rhs <- rhs; + rhs.lhs <- lhs + end; + let lhs = t.lhs in + lhs.rhs <- node; + node.lhs <- lhs; + t.lhs <- node; + node.rhs <- t + +let rec iter_l action t node = + if node != t then begin + action node.value; + iter_l action t node.lhs + end + +let iter_l action t = iter_l action t t.lhs diff --git a/test/lib/meow/dllist.mli b/test/lib/meow/dllist.mli new file mode 100644 index 000000000..d095dd3f3 --- /dev/null +++ b/test/lib/meow/dllist.mli @@ -0,0 +1,10 @@ +type !'a t +type !'a node + +val create : unit -> 'a t +val is_empty : 'a t -> bool +val new_node : 'a -> 'a node +val value : 'a node -> 'a +val remove : 'a node -> unit +val move_l : 'a t -> 'a node -> unit +val iter_l : ('a -> unit) -> 'a t -> unit diff --git a/test/lib/meow/dune b/test/lib/meow/dune new file mode 100644 index 000000000..46425fe5a --- /dev/null +++ b/test/lib/meow/dune @@ -0,0 +1,3 @@ +(library + (name meow) + (libraries picos)) diff --git a/test/lib/meow/meow.ml b/test/lib/meow/meow.ml new file mode 100644 index 000000000..71edfc281 --- /dev/null +++ b/test/lib/meow/meow.ml @@ -0,0 +1,2 @@ +module Ownership = Ownership +module Promise = Promise diff --git a/test/lib/meow/meow.mli b/test/lib/meow/meow.mli new file mode 100644 index 000000000..39879cb4d --- /dev/null +++ b/test/lib/meow/meow.mli @@ -0,0 +1,16 @@ +module Ownership : sig + type t + + val create : finally:('a -> unit) -> 'a -> t + val own : t -> unit + val check : t -> unit + val disown : t -> unit + val bless : t -> unit +end + +module Promise : sig + type !'a t + + val async : ?give:Ownership.t list -> (unit -> 'a) -> 'a t + val await : 'a t -> 'a +end diff --git a/test/lib/meow/ownership.ml b/test/lib/meow/ownership.ml new file mode 100644 index 000000000..e3a795dd5 --- /dev/null +++ b/test/lib/meow/ownership.ml @@ -0,0 +1,115 @@ +open Picos + +exception Resource_leaked +exception Not_owner +exception Parent_is_dead + +type resource = + | Resource : { + finally : 'a -> unit; + value : 'a; + mutable owner : Fiber.Maybe.t; + } + -> resource + +type t = resource Dllist.node + +let finalize (Resource r) = r.finally r.value + +let owned_key = + let finalize resources = + if not (Dllist.is_empty resources) then + let fiber = Fiber.current () in + if Fiber.is_canceled fiber then Dllist.iter_l finalize resources + else raise Resource_leaked + in + Fiber.FLS.new_key ~finalize (Computed Dllist.create) + +let own_as resource fiber = + let (Resource r) = Dllist.value resource in + if r.owner != Fiber.Maybe.nothing then + invalid_arg "Resource already owned by some fiber"; + r.owner <- Fiber.Maybe.of_fiber fiber; + let owned = Fiber.FLS.get fiber owned_key in + Dllist.move_l owned resource + +let own resource = own_as resource (Fiber.current ()) + +let create ~finally value = + Dllist.new_node (Resource { finally; value; owner = Fiber.Maybe.nothing }) + +type _ tdt = + | Finalized : [> `Finalized ] tdt + | Nil : [> `Nil ] tdt + | Blessed : { + resource : t; + next : [ `Nil | `Blessed ] tdt; + } + -> [> `Blessed ] tdt + +let rec iter action = function + | Nil -> () + | Blessed r -> + action r.resource; + iter action r.next + +let blessed_key : [ `Finalized | `Nil | `Blessed ] tdt Atomic.t Fiber.FLS.key = + let finalize t = + match Atomic.exchange t Finalized with + | Finalized -> () + | (Nil as resources) | (Blessed _ as resources) -> + resources |> iter @@ fun node -> finalize (Dllist.value node) + in + Fiber.FLS.new_key ~finalize (Computed (fun () -> Atomic.make Nil)) + +let[@inline never] accept_as fiber blessed = + match Atomic.exchange blessed Nil with + | Finalized -> failwith "accept after finalize" + | (Nil as resources) | (Blessed _ as resources) -> + let owned = Fiber.FLS.get fiber owned_key in + resources + |> iter @@ fun node -> + let (Resource r) = Dllist.value node in + r.owner <- Fiber.Maybe.of_fiber fiber; + Dllist.move_l owned node + +let[@inline] accept_as fiber = + let blessed = Fiber.FLS.get fiber blessed_key in + if Atomic.get blessed != Nil then accept_as fiber blessed + +let check_as resource fiber = + accept_as fiber; + let (Resource r) = Dllist.value resource in + if Fiber.Maybe.unequal r.owner (Fiber.Maybe.of_fiber fiber) then + raise Not_owner + +let check resource = check_as resource (Fiber.current ()) + +let disown_as resource fiber = + check_as resource fiber; + Dllist.remove resource; + let (Resource r) = Dllist.value resource in + r.owner <- Fiber.Maybe.nothing + +let disown resource = disown_as resource (Fiber.current ()) + +let parent_key = + let initialize parent = Fiber.FLS.get parent blessed_key + and root () = invalid_arg "Root fiber has no parent" in + Fiber.FLS.new_key ~initialize (Computed root) + +let bless resource = + let fiber = Fiber.current () in + disown_as resource fiber; + let parent = Fiber.FLS.get fiber parent_key in + let rec loop parent resource = + match Atomic.get parent with + | Finalized -> + own_as resource fiber; + raise Parent_is_dead + | (Nil as before) | (Blessed _ as before) -> + let after = Blessed { resource; next = before } in + if not (Atomic.compare_and_set parent before after) then + loop parent resource + in + loop parent resource diff --git a/test/lib/meow/promise.ml b/test/lib/meow/promise.ml new file mode 100644 index 000000000..bccce8d48 --- /dev/null +++ b/test/lib/meow/promise.ml @@ -0,0 +1,48 @@ +open Picos + +exception Not_a_child +exception Still_has_children + +type 'a t = { + computation : 'a Computation.t; + finalized : Trigger.t; + parent : Fiber.t; +} + +let num_children_key = + let finalize n = if n != 0 then raise Still_has_children in + Fiber.FLS.new_key ~finalize (Constant 0) + +let async ?give main = + let t = + let computation = Computation.create () in + let finalized = Trigger.create () in + let parent = Fiber.current () in + { computation; finalized; parent } + in + let canceler = + let (Packed from) = Fiber.get_computation t.parent in + Computation.attach_canceler ~from ~into:t.computation (* may raise *) + in + Fiber.FLS.set t.parent num_children_key + (Fiber.FLS.get t.parent num_children_key + 1); + let child = Fiber.create ~forbid:false t.computation in + give |> Option.iter @@ List.iter (fun node -> Ownership.own_as node child); + let main () = + Computation.capture t.computation main (); + Fiber.finalize (Fiber.current ()); + let (Packed from) = Fiber.get_computation t.parent in + Computation.detach from canceler; + Trigger.signal t.finalized + in + Fiber.spawn child main; + t + +let await t = + if Fiber.current () != t.parent then raise Not_a_child; + let result = Trigger.await t.finalized in + Fiber.FLS.set t.parent num_children_key + (Fiber.FLS.get t.parent num_children_key - 1); + match result with + | None -> Computation.await t.computation + | Some exn_bt -> Exn_bt.raise exn_bt diff --git a/test/test_meow.ml b/test/test_meow.ml new file mode 100644 index 000000000..e831ef91b --- /dev/null +++ b/test/test_meow.ml @@ -0,0 +1,22 @@ +open Meow + +let test_ownership_finalized_on_error () = + let resource = ref 1 in + Test_scheduler.run @@ fun () -> + let promise = + Promise.async ~give:[ Ownership.create ~finally:decr resource ] @@ fun () -> + raise Not_found + in + match Promise.await promise with + | () -> assert false + | exception Not_found -> assert (!resource = 0) + +let () = + [ + ( "Ownership", + [ + Alcotest.test_case "finalized on error" `Quick + test_ownership_finalized_on_error; + ] ); + ] + |> Alcotest.run "Meow" diff --git a/test/test_picos.ml b/test/test_picos.ml index efa904656..7a46530ae 100644 --- a/test/test_picos.ml +++ b/test/test_picos.ml @@ -3,7 +3,9 @@ open Picos_structured.Finally let run_in_fiber main = let computation = Computation.create () in - Fiber.spawn ~forbid:false computation [ Computation.capture computation main ]; + Fiber.spawn + (Fiber.create ~forbid:false computation) + (Computation.capture computation main); Computation.await computation let test_fls_basics = @@ -96,7 +98,7 @@ let test_thread_cancelation () = Fiber.yield () done in - Fiber.spawn ~forbid:false computation [ main ]; + Fiber.spawn (Fiber.create ~forbid:false computation) main; result in Computation.cancel computation (Exn_bt.get_callstack 0 Exit) @@ -111,7 +113,7 @@ let test_cancel_after () = Fiber.yield () done in - Fiber.spawn ~forbid:false computation [ main ]; + Fiber.spawn (Fiber.create ~forbid:false computation) main; Computation.cancel_after computation ~seconds:0.01 (Exn_bt.get_callstack 0 Not_found); Computation.await computation diff --git a/test/test_scheduler.ocaml4.ml b/test/test_scheduler.ocaml4.ml index 63b6cf233..dfc92afb0 100644 --- a/test/test_scheduler.ocaml4.ml +++ b/test/test_scheduler.ocaml4.ml @@ -15,5 +15,5 @@ let () = in propagate () -let run ?max_domains:_ ?allow_lwt:_ ?forbid main = - Picos_threaded.run ?forbid main +let run ?max_domains:_ ?allow_lwt:_ ?fatal_exn_handler ?forbid main = + Picos_threaded.run ?forbid ?fatal_exn_handler main diff --git a/test/test_scheduler.ocaml5.ml b/test/test_scheduler.ocaml5.ml index b0bb2fee4..e05599a8e 100644 --- a/test/test_scheduler.ocaml5.ml +++ b/test/test_scheduler.ocaml5.ml @@ -17,17 +17,31 @@ let () = in propagate () -let rec run ?(max_domains = 1) ?(allow_lwt = true) ?forbid main = +let rec run ?(max_domains = 1) ?(allow_lwt = true) ?fatal_exn_handler ?forbid + main = let scheduler = match Random.int 3 with 0 -> `Fifos | 1 -> `Randos | _ -> `Lwt in match scheduler with | `Lwt -> - if Picos_thread.is_main_thread () && allow_lwt then - Lwt_main.run (Picos_lwt_unix.run ?forbid main) + if Picos_thread.is_main_thread () && allow_lwt then begin + let old_hook = !Lwt.async_exception_hook in + begin + match fatal_exn_handler with + | None -> () + | Some hook -> Lwt.async_exception_hook := hook + end; + match Lwt_main.run (Picos_lwt_unix.run ?forbid main) with + | result -> + Lwt.async_exception_hook := old_hook; + result + | exception exn -> + Lwt.async_exception_hook := old_hook; + raise exn + end else run ~max_domains ~allow_lwt ?forbid main | `Randos -> - let context = Picos_randos.context () in + let context = Picos_randos.context ?fatal_exn_handler () in let rec spawn n = if n <= 1 then Picos_randos.run ~context ?forbid main else @@ -43,4 +57,4 @@ let rec run ?(max_domains = 1) ?(allow_lwt = true) ?forbid main = spawn (n - 1) in spawn (Int.min max_domains (Domain.recommended_domain_count ())) - | `Fifos -> Picos_fifos.run ?forbid main + | `Fifos -> Picos_fifos.run ?fatal_exn_handler ?forbid main diff --git a/test/test_schedulers.ml b/test/test_schedulers.ml index 2ff83c0b1..03d18dc74 100644 --- a/test/test_schedulers.ml +++ b/test/test_schedulers.ml @@ -55,15 +55,40 @@ let test_cancel_after_long_timeout () = | () -> Computation.finish computation | exception Invalid_argument _ -> () +let test_fatal () = + match + let computation = Computation.create () in + let fatal_exn_handler exn = + Computation.cancel computation (Exn_bt.get exn); + raise exn + in + Test_scheduler.run ~fatal_exn_handler ~max_domains:3 @@ fun () -> + for _ = 1 to 100 do + Fiber.spawn (Fiber.create ~forbid:false computation) @@ fun () -> + while true do + Fiber.yield () + done + done; + Fiber.spawn (Fiber.create ~forbid:false computation) (fun () -> + failwith "fatal"); + Computation.await computation + with + | _ -> assert false + | exception Failure msg -> assert (msg = "fatal") + let () = [ - ("Returns", [ Alcotest.test_case "" `Quick test_returns ]); - ("Completes", [ Alcotest.test_case "" `Quick test_completes ]); + ("Trivial main returns", [ Alcotest.test_case "" `Quick test_returns ]); + ( "Scheduler completes main computation", + [ Alcotest.test_case "" `Quick test_completes ] ); ("Current", [ Alcotest.test_case "" `Quick test_current ]); ( "Cancel_after", [ Alcotest.test_case "basic" `Quick test_cancel_after_basic; Alcotest.test_case "long timeout" `Quick test_cancel_after_long_timeout; ] ); + (* The fatal exn test must be kept last. *) + ( "Fatal exception terminates scheduler", + [ Alcotest.test_case "" `Quick test_fatal ] ); ] |> Alcotest.run "Picos schedulers" diff --git a/test/test_sync.ml b/test/test_sync.ml index 1a5ea5670..e92cecb5d 100644 --- a/test/test_sync.ml +++ b/test/test_sync.ml @@ -7,8 +7,9 @@ module Fiber = struct let start thunk = let computation = Computation.create () in - Fiber.spawn ~forbid:false computation - [ Computation.capture computation thunk ]; + Fiber.spawn + (Fiber.create ~forbid:false computation) + (Computation.capture computation thunk); computation end @@ -33,8 +34,9 @@ let test_mutex_and_condition_basics () = Condition.wait condition mutex); if 1 = Atomic.fetch_and_add n (-1) then Computation.finish test in - Fiber.spawn ~forbid:false computation - (List.init (Atomic.get n) @@ fun _ -> main); + for _ = 1 to Atomic.get n do + Fiber.spawn (Fiber.create ~forbid:false computation) main + done; while Computation.is_running test do Fiber.yield (); @@ -128,8 +130,9 @@ let test_mutex_and_condition_cancelation () = while Array.exists (fun step -> Atomic.get step < limit) steps do let finished = Trigger.create () in let checked = if Random.State.bool state then None else some_false in - Fiber.spawn ~forbid:false (Computation.create ()) - [ attempt i finished ?checked ]; + Fiber.spawn + (Fiber.create ~forbid:false (Computation.create ())) + (attempt i finished ?checked); Trigger.await finished |> ignore done in @@ -186,7 +189,7 @@ let test_lazy_cancelation () = in let computation = Computation.create () in Computation.cancel computation (Exn_bt.get_callstack 0 Exit); - Fiber.spawn ~forbid:false computation [ main ]; + Fiber.spawn (Fiber.create ~forbid:false computation) main; while not (Atomic.get tried) do Fiber.yield () done;