Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add priority based scheduler example #288

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions test/lib/prios/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(library
(name prios)
(libraries backoff multicore-magic picos picos_std.sync psq))
130 changes: 130 additions & 0 deletions test/lib/prios/prios.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
open Picos
module S = Picos_std_sync

module List_ext = struct
let[@tail_mod_cons] rec drop_first_or_not_found x' = function
| [] -> raise_notrace Not_found
| x :: xs -> if x == x' then xs else x :: drop_first_or_not_found x' xs
end

module Id = struct
type t = T of int [@@unboxed]

let[@inline] compare (T l) (T r) = Int.compare l r
let key = Fiber.FLS.create ()
let next_id = Atomic.make 0

let get_as fiber =
match Fiber.FLS.get_exn fiber key with
| id -> id
| exception Fiber.FLS.Not_set ->
let id = T (Atomic.fetch_and_add next_id 1) in
Fiber.FLS.set fiber key id;
id

let get () = get_as @@ Fiber.current ()
end

module Priority = struct
type t = T of int [@@unboxed]

let[@inline] compare (T l) (T r) = Int.compare l r
let[@inline] max (T l) (T r) = T (Int.max l r)
let default = T 0
let higher (T p) = T (p + 1)
let key = Fiber.FLS.create ()
let get_as fiber = Fiber.FLS.get fiber key ~default
let get () = get_as @@ Fiber.current ()

let set priority =
let fiber = Fiber.current () in
if priority = default then Fiber.FLS.remove fiber key
else Fiber.FLS.set fiber key priority
end

module Priority_inv = struct
type t = Priority.t

let compare l r = Priority.compare r l
end

module Pq_hi = Psq.Make (Id) (Priority_inv)

(*
type _ tdt =
| Nothing : [> `Nothing ] tdt
| Holder :
*)
type mutex = { waiters : Pq_hi.t Atomic.t; mutex : S.Mutex.t }

module Mutex = struct
type t = mutex

let key = Fiber.FLS.create ()
let get_as fiber = Fiber.FLS.get fiber key ~default:[]

let add_mutex_as fiber t =
get_as fiber |> List.cons t |> Fiber.FLS.set fiber key

let remove_mutex_as fiber t =
get_as fiber
|> List_ext.drop_first_or_not_found t
|> Fiber.FLS.set fiber key

let rec add_waiter t id priority backoff =
let before = Atomic.get t.waiters in
let after = Pq_hi.add id priority before in
if not (Atomic.compare_and_set t.waiters before after) then
add_waiter t id priority (Backoff.once backoff)

let rec remove_waiter t id backoff =
let before = Atomic.get t.waiters in
let after = Pq_hi.remove id before in
if not (Atomic.compare_and_set t.waiters before after) then
remove_waiter t id (Backoff.once backoff)

let max_waiter t =
match Pq_hi.min (Atomic.get t.waiters) with
| None -> Priority.default
| Some (_id, priority) -> priority

let create ?padded () =
let waiters = Atomic.make Pq_hi.empty |> Multicore_magic.copy_as ?padded in
let mutex = S.Mutex.create ?padded () in
Multicore_magic.copy_as ?padded { waiters; mutex }

let lock t =
let fiber = Fiber.current () in
let id = Id.get_as fiber in
let priority = Priority.get_as fiber in
add_waiter t id priority Backoff.default;
match S.Mutex.lock t.mutex with
| () ->
remove_waiter t id Backoff.default;
add_mutex_as fiber t
| exception exn ->
remove_waiter t id Backoff.default;
raise exn

let unlock t =
let fiber = Fiber.current () in
remove_mutex_as fiber t;
S.Mutex.unlock t.mutex
end

module Condition = struct
type t = S.Condition.t

let create = S.Condition.create
let wait t m = S.Condition.wait t m.mutex
let broadcast = S.Condition.broadcast
end

let _get_dynamic_priority_as fiber =
Mutex.get_as fiber
|> List.fold_left
(fun p m -> Priority.max p (Mutex.max_waiter m))
(Priority.get_as fiber)

let run_fiber ?fatal_exn_handler:_ _fiber _main = failwith "XXX"
let run ?fatal_exn_handler:_ ?forbid:_ _main = failwith "XXX"
77 changes: 77 additions & 0 deletions test/lib/prios/prios.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
(** *)

open Picos

module Id : sig
(** *)

type t
(** *)

val compare : t -> t -> int
(** *)

val get : unit -> t
(** *)
end

module Priority : sig
(** *)

type t
(** *)

val compare : t -> t -> int
(** *)

val default : t
(** *)

val higher : t -> t
(** *)

val get : unit -> t
(** *)

val set : t -> unit
(** *)
end

module Mutex : sig
(** *)

type t
(** *)

val create : ?padded:bool -> unit -> t
(** *)

val lock : t -> unit
(** *)

val unlock : t -> unit
(** *)
end

module Condition : sig
(** *)

type t
(** *)

val create : ?padded:bool -> unit -> t
(** *)

val wait : t -> Mutex.t -> unit
(** *)

val broadcast : t -> unit
(** *)
end

val run_fiber :
?fatal_exn_handler:(exn -> unit) -> Fiber.t -> (Fiber.t -> unit) -> unit
(** *)

val run : ?fatal_exn_handler:(exn -> unit) -> ?forbid:bool -> (unit -> 'a) -> 'a
(** *)
Loading