Skip to content

Commit

Permalink
Add more structured Run operations
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed Dec 15, 2024
1 parent 84d5428 commit 9dc05dc
Show file tree
Hide file tree
Showing 15 changed files with 661 additions and 308 deletions.
2 changes: 1 addition & 1 deletion .ocamlformat
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
profile = default
version = 0.26.2
version = 0.27.0

exp-grouping=preserve
1 change: 1 addition & 0 deletions bench/bench_run.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
let run_suite ~budgetf:_ = []
28 changes: 28 additions & 0 deletions bench/bench_run.ocaml5.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
open Multicore_bench
open Picos_std_structured
module Multififo = Picos_mux_multififo

let run_one_multififo ~budgetf ~n_domains ~n () =
let context = ref (Obj.magic ()) in

let before _ = context := Multififo.context () in
let init _ = !context in
let work i context =
if i <> 0 then Multififo.runner_on_this_thread context
else ignore @@ Multififo.run ~context @@ fun () -> Run.for_n n ignore
in

let config =
Printf.sprintf "%d mfifo%s, run_n %d" n_domains
(if n_domains = 1 then "" else "s")
n
in
Times.record ~budgetf ~n_domains ~before ~init ~work ()
|> Times.to_thruput_metrics ~n ~singular:"ignore" ~config

let run_suite ~budgetf =
Util.cross [ 1; 2; 4; 8 ]
[ 100; 1_000; 10_000; 100_000; 1_000_000; 10_000_000 ]
|> List.concat_map @@ fun (n_domains, n) ->
if Picos_domain.recommended_domain_count () < n_domains then []
else run_one_multififo ~budgetf ~n_domains ~n ()
6 changes: 6 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
(run %{test} -brief "Picos binaries")
(run %{test} -brief "Bounded_q with Picos_sync")
(run %{test} -brief "Memory usage")
(run %{test} -brief "Picos_std_structured.Run")
;;
))
(foreign_stubs
Expand All @@ -49,6 +50,11 @@
from
(picos_mux.fifo -> scheduler.ocaml5.ml)
(picos_mux.thread -> scheduler.ocaml4.ml))
(select
bench_run.ml
from
(picos_mux.multififo -> bench_run.ocaml5.ml)
(-> bench_run.ocaml4.ml))
(select
bench_fib.ml
from
Expand Down
1 change: 1 addition & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ let benchmarks =
("Picos binaries", Bench_binaries.run_suite);
("Bounded_q with Picos_sync", Bench_bounded_q.run_suite);
("Memory usage", Bench_memory.run_suite);
("Picos_std_structured.Run", Bench_run.run_suite);
]

let () = Multicore_bench.Cmd.run ~benchmarks ()
65 changes: 43 additions & 22 deletions lib/picos_std.structured/bundle.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ type _ tdt =
}
-> [> `Bundle ] tdt

let config_terminated_bit = 0x01
and config_callstack_mask = 0x3E
and config_callstack_shift = 1
and config_one = 0x40 (* memory runs out before overflow *)
let config_on_return_terminate_bit = 0x01
and config_on_terminate_raise_bit = 0x02
and config_callstack_mask = 0x6C
and config_callstack_shift = 2
and config_one = 0x80 (* memory runs out before overflow *)

let flock_key : [ `Bundle | `Nothing ] tdt Fiber.FLS.t = Fiber.FLS.create ()

Expand All @@ -35,19 +36,23 @@ let error ?callstack (Bundle r as t : t) exn bt =
terminate ?callstack t;
Control.Errors.push r.errors exn bt
end
else if Atomic.get r.config land config_on_terminate_raise_bit <> 0 then
terminate ?callstack t

let decr (Bundle r : t) =
let n = Atomic.fetch_and_add r.config (-config_one) in
if n < config_one * 2 then begin
let (Packed bundle) = r.bundle in
Computation.cancel bundle Control.Terminate Control.empty_bt;
Trigger.signal r.finished
end

type _ pass = FLS : unit pass | Arg : t pass

let[@inline never] no_flock () = invalid_arg "no flock"

let[@inline] on_terminate = function
| None | Some `Ignore -> `Ignore
| Some `Raise -> `Raise

let get_flock fiber =
match Fiber.FLS.get fiber flock_key ~default:Nothing with
| Bundle _ as t -> t
Expand All @@ -56,14 +61,24 @@ let get_flock fiber =
let await (Bundle r as t : t) fiber packed canceler outer =
decr t;
Fiber.set_computation fiber packed;
if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then
Fiber.FLS.set fiber flock_key outer;
let forbid = Fiber.exchange fiber ~forbid:true in
Trigger.await r.finished |> ignore;
Fiber.set fiber ~forbid;
if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then
Fiber.FLS.set fiber flock_key outer;
let (Packed parent) = packed in
Computation.detach parent canceler;
Control.Errors.check r.errors;
begin
let (Packed bundle) = r.bundle in
match Computation.peek_exn bundle with
| _ -> ()
| exception Computation.Running ->
Computation.cancel bundle Control.Terminate Control.empty_bt
| exception Control.Terminate
when Atomic.get r.config land config_on_terminate_raise_bit = 0 ->
()
end;
Fiber.check fiber

let[@inline never] raised exn t fiber packed canceler outer =
Expand All @@ -75,7 +90,7 @@ let[@inline never] raised exn t fiber packed canceler outer =
let[@inline never] returned value (Bundle r as t : t) fiber packed canceler
outer =
let config = Atomic.get r.config in
if config land config_terminated_bit <> 0 then begin
if config land config_on_return_terminate_bit <> 0 then begin
let callstack =
let n = (config land config_callstack_mask) lsr config_callstack_shift in
if n = 0 then None else Some n
Expand All @@ -90,25 +105,31 @@ let join_after_realloc x fn t fiber packed canceler outer =
| value -> returned value t fiber packed canceler outer
| exception exn -> raised exn t fiber packed canceler outer

let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
=
let join_after_pass (type a) ?callstack ?on_return ?on_terminate (fn : a -> _)
(pass : a pass) =
(* The sequence of operations below ensures that nothing is leaked. *)
let (Bundle r as t : t) =
let terminated =
let config =
match on_return with
| None | Some `Wait -> 0
| Some `Terminate -> config_terminated_bit
| None | Some `Wait -> config_one
| Some `Terminate -> config_one lor config_on_return_terminate_bit
in
let callstack =
let config =
match on_terminate with
| None | Some `Ignore -> config
| Some `Raise -> config lor config_on_terminate_raise_bit
in
let config =
match callstack with
| None -> 0
| None -> config
| Some n ->
if n <= 0 then 0
if n <= 0 then config
else
Int.min n (config_callstack_mask lsr config_callstack_shift)
lsl config_callstack_shift
config
lor Int.min n (config_callstack_mask lsr config_callstack_shift)
lsl config_callstack_shift
in
let config = Atomic.make (config_one lor callstack lor terminated) in
let config = Atomic.make config in
let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in
let errors = Control.Errors.create () in
let finished = Trigger.create () in
Expand Down Expand Up @@ -208,8 +229,8 @@ let fork_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
let is_running (Bundle { bundle = Packed bundle; _ } : t) =
Computation.is_running bundle

let join_after ?callstack ?on_return fn =
join_after_pass ?callstack ?on_return fn Arg
let join_after ?callstack ?on_return ?on_terminate fn =
join_after_pass ?callstack ?on_return ?on_terminate fn Arg

let fork t thunk = fork_pass t thunk Arg
let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg
Expand Down
16 changes: 9 additions & 7 deletions lib/picos_std.structured/control.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ module Errors = struct
| [ (exn, bt) ] -> Printexc.raise_with_backtrace exn bt
| exn_bts -> check exn_bts []

let rec push t exn bt backoff =
let before = Atomic.get t in
let after = (exn, bt) :: before in
if not (Atomic.compare_and_set t before after) then
push t exn bt (Backoff.once backoff)

let push t exn bt = push t exn bt Backoff.default
let push t exn bt =
let backoff = ref Backoff.default in
while
let before = Atomic.get t in
let after = (exn, bt) :: before in
not (Atomic.compare_and_set t before after)
do
backoff := Backoff.once !backoff
done
end

let raise_if_canceled () = Fiber.check (Fiber.current ())
Expand Down
12 changes: 12 additions & 0 deletions lib/picos_std.structured/dune
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
(rule
(enabled_if
(<= 5.0.0 %{ocaml_version}))
(action
(copy for.ocaml5.ml for.ml)))

(rule
(enabled_if
(< %{ocaml_version} 5.0.0))
(action
(copy for.ocaml4.ml for.ml)))

(library
(name picos_std_structured)
(public_name picos_std.structured)
Expand Down
4 changes: 2 additions & 2 deletions lib/picos_std.structured/flock.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ let error ?callstack exn_bt = Bundle.error (get ()) ?callstack exn_bt
let fork_as_promise thunk = Bundle.fork_as_promise_pass (get ()) thunk FLS
let fork action = Bundle.fork_pass (get ()) action FLS

let join_after ?callstack ?on_return fn =
Bundle.join_after_pass ?callstack ?on_return fn Bundle.FLS
let join_after ?callstack ?on_return ?on_terminate fn =
Bundle.join_after_pass ?callstack ?on_return ?on_terminate fn Bundle.FLS
57 changes: 57 additions & 0 deletions lib/picos_std.structured/for.ocaml4.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
type _ tdt =
| Empty : [> `Empty ] tdt
| Range : {
mutable lo : int;
hi : int;
parent : [ `Empty | `Range ] tdt;
}
-> [> `Range ] tdt

let[@poll error] cas_lo (Range r : [ `Range ] tdt) before after =
r.lo == before
&& begin
r.lo <- after;
true
end

let rec for_out t (Range r as range : [ `Range ] tdt) action =
let lo_before = r.lo in
let n = r.hi - lo_before in
if 0 < n then begin
if Bundle.is_running t then begin
let lo_after = lo_before + 1 in
if cas_lo range lo_before lo_after then begin
try action lo_before
with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ())
end;
for_out t range action
end
end
else
match r.parent with
| Empty -> ()
| Range _ as range -> for_out t range action

let rec for_in t (Range r as range : [ `Range ] tdt) action =
let lo_before = r.lo in
let n = r.hi - lo_before in
if n <= 1 then for_out t range action
else
let lo_after = lo_before + (n asr 1) in
if cas_lo range lo_before lo_after then begin
Bundle.fork t (fun () -> for_in t range action);
let child = Range { lo = lo_before; hi = lo_after; parent = range } in
for_in t child action
end
else for_in t range action

let for_n ?on_terminate n action =
if 0 < n then
if n = 1 then
try action 0
with
| Control.Terminate when Bundle.on_terminate on_terminate == `Ignore ->
()
else
let range = Range { lo = 0; hi = n; parent = Empty } in
Bundle.join_after ?on_terminate @@ fun t -> for_in t range action
82 changes: 82 additions & 0 deletions lib/picos_std.structured/for.ocaml5.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
open Picos

type per_fiber = { mutable lo : int; mutable hi : int }

type _ tdt =
| Empty : [> `Empty ] tdt
| Range : {
mutable _lo : int;
hi : int;
parent : [ `Empty | `Range ] tdt;
}
-> [> `Range ] tdt

external lo_as_atomic : [ `Range ] tdt -> int Atomic.t = "%identity"

let rec for_out t (Range r as range : [ `Range ] tdt) per_fiber action =
let lo_before = Atomic.get (lo_as_atomic range) in
let n = r.hi - lo_before in
if 0 < n then begin
let lo_after = lo_before + 1 + (n asr 1) in
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
per_fiber.lo <- lo_before;
per_fiber.hi <- lo_after;
while Bundle.is_running t && per_fiber.lo < per_fiber.hi do
try
while per_fiber.lo < per_fiber.hi do
let i = per_fiber.lo in
per_fiber.lo <- i + 1;
action i
done
with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ())
done
end;
for_out t range per_fiber action
end
else
match r.parent with
| Empty -> ()
| Range _ as range -> for_out t range per_fiber action

let rec for_in t (Range r as range : [ `Range ] tdt) per_fiber action =
let lo_before = Atomic.get (lo_as_atomic range) in
let n = r.hi - lo_before in
if n <= 1 then for_out t range per_fiber action
else
let lo_after = lo_before + (n asr 1) in
if Atomic.compare_and_set (lo_as_atomic range) lo_before lo_after then begin
Bundle.fork t (fun () -> for_in_enter t range action);
let child = Range { _lo = lo_before; hi = lo_after; parent = range } in
for_in t child per_fiber action
end
else for_in t range per_fiber action

and for_in_enter bundle range action =
let per_fiber = { lo = 0; hi = 0 } in
let effc (type a) :
a Effect.t -> ((a, _) Effect.Deep.continuation -> _) option = function
| Fiber.Spawn _ | Fiber.Current | Computation.Cancel_after _ -> None
| _ ->
(* Might be blocking, so fork any remaining work to another fiber. *)
if per_fiber.lo < per_fiber.hi then begin
let range =
Range { _lo = per_fiber.lo; hi = per_fiber.hi; parent = Empty }
in
per_fiber.lo <- per_fiber.hi;
Bundle.fork bundle (fun () -> for_in_enter bundle range action)
end;
None
in
let handler = Effect.Deep.{ effc } in
Effect.Deep.try_with (for_in bundle range per_fiber) action handler

let for_n ?on_terminate n action =
if 0 < n then
if n = 1 then
try action 0
with
| Control.Terminate when Bundle.on_terminate on_terminate == `Ignore ->
()
else
let range = Range { _lo = 0; hi = n; parent = Empty } in
Bundle.join_after ?on_terminate @@ fun t -> for_in_enter t range action
Loading

0 comments on commit 9dc05dc

Please sign in to comment.