Skip to content

Commit

Permalink
Of_list function.
Browse files Browse the repository at this point in the history
  • Loading branch information
lyrm committed Nov 21, 2024
1 parent 2eb8d60 commit 245128b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 41 deletions.
8 changes: 6 additions & 2 deletions src/mpsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,19 @@ type 'a t = { mutable head : 'a clist; tail : 'a clist Atomic.t }

let create () = { head = Open; tail = Atomic.make_contended Open }

let[@tail_mod_cons] rec clist_of_list l =
match l with [] -> Open | List.(x :: xs) -> x :: clist_of_list xs

let of_list l = { head = clist_of_list l; tail = Atomic.make_contended Open }

(* *)

let is_empty t =
match t.head with
| _ :: _ -> false
| Closed -> raise Closed
| Open -> ( match Atomic.get t.tail with _ :: _ -> false | _ -> true)


let close t =
match Atomic.exchange t.tail Closed with
| Closed -> raise Closed
Expand Down Expand Up @@ -109,6 +114,5 @@ let rec peek_as : type a r. a t -> (a, r) poly2 -> r =
let pop_opt t = pop_as t Option
let pop_exn t = pop_as t Value
let drop_exn t = pop_as t Unit

let peek_exn t = peek_as t Value
let peek_opt t = peek_as t Option
64 changes: 25 additions & 39 deletions src/mpsc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@ exception Closed
val create : unit -> 'a t
(** [create ()] returns a new empty queue. *)

val of_list : 'a list -> 'a t
(** [of_list l] returns a new empty queue.
{[
# open Saturn.Single_consumer_queue
# let t : int t = of_list [1; 2; 3]
val t : int t = <abstr>
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 2
# pop_opt t
- : int option = Some 3
# pop_opt t
- : int option = None
]}
*)

(** {2 Producer-only functions} *)

val push : 'a t -> 'a -> unit
Expand Down Expand Up @@ -90,56 +108,24 @@ val push_head : 'a t -> 'a -> unit
@raise Closed if [q] is closed and empty. *)


(** {1 Examples}
An example top-level session:
{[
# open Saturn.Bounded_stack
# open Saturn.Single_consumer_queue
# let t : int t = create ()
val t : int t = <abstr>
# try_push t 42
# push t 1
- : bool = true
# push t 42
- : bool = true
# push_exn t 1
- : unit = ()
# pop_exn t
- : int = 1
# peek_opt t
- : int option = Some 42
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 42
# pop_opt t
- : int option = None
# pop_exn t
Exception: Saturn__Bounded_stack.Empty.]}
Exception: Saturn__Single_consumer_queue.Empty.]}
A multicore example:
{@ocaml non-deterministic[
# open Saturn.Bounded_stack
# let t :int t = create ()
val t : int t = <abstr>
# let barrier = Atomic.make 2
val barrier : int Atomic.t = <abstr>
# let pusher () =
Atomic.decr barrier;
while Atomic.get barrier != 0 do Domain.cpu_relax () done;
try_push_all t [1;2;3] |> ignore;
push_exn t 42;
push_exn t 12
val pusher : unit -> unit = <fun>
# let popper () =
Atomic.decr barrier;
while Atomic.get barrier != 0 do Domain.cpu_relax () done;
List.init 6 (fun i -> Domain.cpu_relax (); pop_opt t)
val popper : unit -> int option list = <fun>
# let domain_pusher = Domain.spawn pusher
val domain_pusher : unit Domain.t = <abstr>
# let domain_popper = Domain.spawn popper
val domain_popper : int option list Domain.t = <abstr>
# Domain.join domain_pusher
- : unit = ()
# Domain.join domain_popper
- : int option list = [Some 42; Some 3; Some 2; Some 1; None; Some 12]
]}
*)

0 comments on commit 245128b

Please sign in to comment.