Skip to content

Commit

Permalink
Redesign Spawn and FLS
Browse files Browse the repository at this point in the history
This PR redesigns the `Spawn` and `FLS` mechanism to better support new use
cases.
  • Loading branch information
polytypic committed Jul 25, 2024
1 parent 600233d commit 8dee4fc
Show file tree
Hide file tree
Showing 30 changed files with 631 additions and 250 deletions.
2 changes: 1 addition & 1 deletion bench/bench_fib.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ let rec exp_fib i =
else
let computation = Computation.create () in
let main () = Computation.return computation (exp_fib (i - 2)) in
Fiber.spawn ~forbid:false computation [ main ];
Fiber.spawn (Fiber.create ~forbid:false computation) main;
let f1 = exp_fib (i - 1) in
let f2 = Computation.await computation in
f1 + f2
Expand Down
28 changes: 13 additions & 15 deletions bench/bench_spawn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,31 @@ let factor =
Util.iter_factor
* if String.starts_with ~prefix:"4." Sys.ocaml_version then 1 else 10

let run_one ~budgetf ~at_a_time () =
let run_one ~budgetf () =
let n_spawns = 10 * factor in

let init _ = () in
let wrap _ () = Scheduler.run in
let work _ () =
let counter = ref n_spawns in
let computation = Computation.create () in
for _ = 1 to n_spawns / at_a_time do
let main () =
let n = !counter - 1 in
counter := n;
if n = 0 then Computation.finish computation
in
let mains = List.init at_a_time @@ fun _ -> main in
Fiber.spawn ~forbid:false computation mains
let computation_packed = Computation.Packed computation in
let main () =
let n = !counter - 1 in
counter := n;
if n = 0 then Computation.finish computation;
Fiber.finalize (Fiber.current ())
in
for _ = 1 to n_spawns do
let fiber = Fiber.create_packed ~forbid:false computation_packed in
Fiber.spawn fiber main
done;
Computation.await computation
in

let config = Printf.sprintf "%d at a time" at_a_time in
let config = "with packed computation" in
Times.record ~budgetf ~n_domains:1 ~n_warmups:1 ~n_runs_min:1 ~init ~wrap
~work ()
|> Times.to_thruput_metrics ~n:n_spawns ~singular:"spawn" ~config

let run_suite ~budgetf =
if Sys.int_size <= 32 then []
else
[ 1; 2; 4; 8 ]
|> List.concat_map @@ fun at_a_time -> run_one ~budgetf ~at_a_time ()
let run_suite ~budgetf = if Sys.int_size <= 32 then [] else run_one ~budgetf ()
96 changes: 88 additions & 8 deletions lib/picos/bootstrap/picos_bootstrap.ml
Original file line number Diff line number Diff line change
Expand Up @@ -331,13 +331,60 @@ module Fiber = struct

type 'a initial = Constant of 'a | Computed of (unit -> 'a)

let new_key initial =
type finalizers =
| Nil
| Finalizer : {
index : int;
finalize : 'a -> unit;
next : finalizers;
}
-> finalizers

let finalizers = Atomic.make Nil

let rec add_finalizer index finalize =
let before = Atomic.get finalizers in
let after = Finalizer { index; finalize; next = before } in
if not (Atomic.compare_and_set finalizers before after) then
add_finalizer index finalize

type initializers =
| Nil
| Initializer : {
key : 'a key;
initialize : t -> 'a;
next : initializers;
}
-> initializers

let initializers = Atomic.make Nil

let rec add_initializer key initialize =
let before = Atomic.get initializers in
let after = Initializer { key; initialize; next = before } in
if not (Atomic.compare_and_set initializers before after) then
add_initializer key initialize

let new_key ?finalize ?initialize initial =
let index = Atomic.fetch_and_add counter 1 in
match initial with
| Constant default ->
let default = Sys.opaque_identity (Obj.magic default : non_float) in
{ index; default; compute }
| Computed compute -> { index; default = unique; compute }
begin
match finalize with
| None -> ()
| Some finalize -> add_finalizer index finalize
end;
let key =
match initial with
| Constant default ->
let default = Sys.opaque_identity (Obj.magic default : non_float) in
{ index; default; compute }
| Computed compute -> { index; default = unique; compute }
in
begin
match initialize with
| None -> ()
| Some initialize -> add_initializer key initialize
end;
key

let get (type a) (Fiber r : t) (key : a key) =
let fls = r.fls in
Expand Down Expand Up @@ -369,6 +416,13 @@ module Fiber = struct
(Sys.opaque_identity (Obj.magic value : non_float));
value

let[@inline] has (Fiber r : t) key =
let fls = r.fls in
key.index < Array.length fls
&&
let value = Array.unsafe_get fls key.index in
value != unique

let set (type a) (Fiber r : t) (key : a key) (value : a) =
let fls = r.fls in
if key.index < Array.length fls then
Expand All @@ -379,14 +433,40 @@ module Fiber = struct
r.fls <- fls;
Array.unsafe_set fls key.index
(Sys.opaque_identity (Obj.magic value : non_float))

let rec finalize fls = function
| Finalizer r ->
if r.index < Array.length fls then begin
let value = Array.unsafe_get fls r.index in
if value != unique then r.finalize (Obj.magic value)
end;
finalize fls r.next
| Nil -> ()

let rec initialize ~parent ~child = function
| Initializer r ->
if not (has child r.key) then set child r.key (r.initialize parent);
initialize ~parent ~child r.next
| Nil -> ()
end

let[@inline] finalize (Fiber t : t) =
let fls = t.fls in
if 0 < Array.length fls then FLS.finalize fls (Atomic.get FLS.finalizers)

let[@inline] initialize ~parent ~child =
match Atomic.get FLS.initializers with
| Initializer r ->
if not (FLS.has child r.key) then
FLS.set child r.key (r.initialize parent);
FLS.initialize ~parent ~child r.next
| Nil -> ()
end

module Handler = struct
type 'c t = {
current : 'c -> Fiber.t;
spawn :
'a. 'c -> forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit;
spawn : 'c -> Fiber.t -> (unit -> unit) -> unit;
yield : 'c -> unit;
cancel_after :
'a. 'c -> 'a Computation.t -> seconds:float -> Exn_bt.t -> unit;
Expand Down
17 changes: 6 additions & 11 deletions lib/picos/intf.ocaml5.ml
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,19 @@ module type Fiber = sig
associated with the fiber has been canceled the scheduler is free to
discontinue the fiber immediately before spawning new fibers.
The scheduler is free to run the newly created fibers on any domain and
The scheduler is free to run the newly created fiber on any domain and
decide which fiber to give priority to.
⚠️ The scheduler should guarantee that, when [Spawn] returns normally, all
of the [mains] will eventually be called by the scheduler and, when
[Spawn] raises an exception, none of the [mains] will be called. In other
words, [Spawn] should check cancelation just once and be all or nothing.
⚠️ The scheduler should guarantee that, when [Spawn] returns normally, the
given [main] will eventually be called by the scheduler and, when [Spawn]
raises an exception, the [main] will not be called. In other words,
[Spawn] should check cancelation just once and be all or nothing.
Furthermore, in case a newly spawned fiber is canceled before its main is
called, the scheduler must still call the main. This allows a program to
ensure, i.e. keep track of, that all fibers it spawns are terminated
properly and any resources transmitted to spawned fibers will be disposed
properly. *)
type _ Effect.t +=
private
| Spawn : {
forbid : bool;
computation : 'a computation;
mains : (unit -> unit) list;
}
-> unit Effect.t
| Spawn : { fiber : t; main : unit -> unit } -> unit Effect.t
end
16 changes: 8 additions & 8 deletions lib/picos/ocaml4/picos_ocaml.ml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
open Picos_bootstrap

let error () =
let[@inline never] error _ =
raise (Sys_error "Picos.Handler.using not called for current thread")

module Handler = struct
type entry = E : { context : 'a; handler : 'a Handler.t } -> entry

let default =
let current _ = error ()
and spawn _ ~forbid:_ _ _ = error ()
and yield _ = error ()
and cancel_after _ _ ~seconds:_ _ = error ()
and await _ _ = error () in
let current = error
and spawn _ _ = error
and yield = error
and cancel_after _ _ ~seconds:_ = error
and await _ = error in
E { context = (); handler = { current; spawn; yield; cancel_after; await } }

let key = Picos_thread.TLS.new_key @@ fun () -> default
Expand Down Expand Up @@ -42,9 +42,9 @@ module Fiber = struct
let (E r) = Handler.get () in
r.handler.current r.context

let spawn ~forbid computation mains =
let spawn fiber main =
let (E r) = Handler.get () in
r.handler.spawn r.context ~forbid computation mains
r.handler.spawn r.context fiber main

let yield () =
let (E r) = Handler.get () in
Expand Down
12 changes: 3 additions & 9 deletions lib/picos/ocaml5/picos_ocaml.ml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,9 @@ module Fiber = struct
let current () = Effect.perform Current

type _ Effect.t +=
| Spawn : {
forbid : bool;
computation : 'a Computation.t;
mains : (unit -> unit) list;
}
-> unit Effect.t
| Spawn : { fiber : Fiber.t; main : unit -> unit } -> unit Effect.t

let spawn ~forbid computation mains =
Effect.perform @@ Spawn { forbid; computation; mains }
let spawn fiber main = Effect.perform @@ Spawn { fiber; main }

type _ Effect.t += Yield : unit Effect.t

Expand Down Expand Up @@ -78,7 +72,7 @@ module Handler = struct
| Fiber.Spawn r ->
Some
(fun k ->
match h.spawn c ~forbid:r.forbid r.computation r.mains with
match h.spawn c r.fiber r.main with
| unit -> Effect.Deep.continue k unit
| exception exn -> discontinue k exn)
| Fiber.Yield -> yield
Expand Down
70 changes: 37 additions & 33 deletions lib/picos/picos.mli
Original file line number Diff line number Diff line change
Expand Up @@ -757,30 +757,6 @@ module Fiber : sig
(** [sleep ~seconds] suspends the current fiber for the specified number of
[seconds]. *)

(** {2 Interface for spawning} *)

val spawn : forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit
(** [spawn ~forbid computation mains] starts new fibers by performing the
{!Spawn} effect. The fibers will share the same [computation] and start
with {{!Fiber.has_forbidden} propagation of cancelation forbidden or
permitted} depending on the [forbid] flag.
ℹ️ Any {{!Computation} computation}, including the computation of the
current fiber, may be passed as the computation for new fibers. Higher
level libraries are free to implement the desired structuring principles.
⚠️ Behavior is undefined if any function in [mains] raises an exception.
For example, raising an exception might terminate the whole application
(recommended, but not required) or the exception might be ignored. In
other words, the caller {i must} arrange for the computation to be
completed and errors reported in a desired manner.
ℹ️ The behavior is that
- on OCaml 5, [spawn] performs the {!Spawn} effect, and
- on OCaml 4, [spawn] will call the [spawn] operation of the {{!Handler}
current handler}. *)

(** {2 Interface for current fiber} *)

type t
Expand Down Expand Up @@ -943,7 +919,8 @@ module Fiber : sig
(** Type to specify initial values for fibers. *)
type 'a initial = Constant of 'a | Computed of (unit -> 'a)

val new_key : 'a initial -> 'a key
val new_key :
?finalize:('a -> unit) -> ?initialize:(t -> 'a) -> 'a initial -> 'a key
(** [new_key initial] allocates a new key for associating values in storage
associated with fibers. The [initial] value for every fiber is either
the given {!Constant} or is {!Computed} with the given function. If the
Expand All @@ -965,6 +942,38 @@ module Fiber : sig
⚠️ It is only safe to call [set] from the fiber itself. *)
end

(** {2 Interface for spawning} *)

val create_packed : forbid:bool -> Computation.packed -> t
(** [create_packed ~forbid packed] creates a new fiber record. *)

val create : forbid:bool -> 'a Computation.t -> t
(** [create ~forbid computation] is equivalent to
{{!create_packed} [create_packed ~forbid (Computation.Packed computation)]}. *)

val finalize : t -> unit
(** *)

val spawn : t -> (unit -> unit) -> unit
(** [spawn fiber main] starts a new fiber by performing the {!Spawn} effect.
⚠️ Fiber records must be unique and the caller of [spawn] must make sure
that a specific {{!fiber} fiber} record is not reused. Failure to ensure
that fiber records are unique will break concurrent abstractions written
on top the the Picos interface.
⚠️ Behavior is undefined if the [main] function raises an exception. For
example, raising an exception might terminate the whole application
(recommended, but not required) or the exception might be ignored. In
other words, the caller {i must} arrange for any exceptions to be handled
in a desired manner.
ℹ️ The behavior is that
- on OCaml 5, [spawn] performs the {!Spawn} effect, and
- on OCaml 4, [spawn] will call the [spawn] operation of the {{!Handler}
current handler}. *)

(** {2 Interface for structuring} *)

val get_computation : t -> Computation.packed
Expand Down Expand Up @@ -1051,12 +1060,8 @@ module Fiber : sig

(** {2 Interface for schedulers} *)

val create_packed : forbid:bool -> Computation.packed -> t
(** [create_packed ~forbid packed] creates a new fiber. *)

val create : forbid:bool -> 'a Computation.t -> t
(** [create ~forbid computation] is equivalent to
{{!create_packed} [create_packed ~forbid (Computation.Packed computation)]}. *)
val initialize : parent:t -> child:t -> unit
(** *)

val try_suspend :
t -> Trigger.t -> 'x -> 'y -> (Trigger.t -> 'x -> 'y -> unit) -> bool
Expand Down Expand Up @@ -1115,8 +1120,7 @@ module Handler : sig

type 'c t = {
current : 'c -> Fiber.t; (** See {!Picos.Fiber.current}. *)
spawn :
'a. 'c -> forbid:bool -> 'a Computation.t -> (unit -> unit) list -> unit;
spawn : 'c -> Fiber.t -> (unit -> unit) -> unit;
(** See {!Picos.Fiber.spawn}. *)
yield : 'c -> unit; (** See {!Picos.Fiber.yield}. *)
cancel_after :
Expand Down
Loading

0 comments on commit 8dee4fc

Please sign in to comment.