Skip to content

Commit

Permalink
Add push_all
Browse files Browse the repository at this point in the history
  • Loading branch information
lyrm committed Nov 21, 2024
1 parent 245128b commit 301d705
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 6 deletions.
17 changes: 14 additions & 3 deletions src/mpsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ type 'a t = { mutable head : 'a clist; tail : 'a clist Atomic.t }

let create () = { head = Open; tail = Atomic.make_contended Open }

let[@tail_mod_cons] rec clist_of_list l =
match l with [] -> Open | List.(x :: xs) -> x :: clist_of_list xs
let[@tail_mod_cons] rec append_list_to_clist l l' =
match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l'

let of_list l = { head = clist_of_list l; tail = Atomic.make_contended Open }
let of_list l =
{ head = append_list_to_clist l Open; tail = Atomic.make_contended Open }

(* *)

Expand All @@ -62,6 +63,16 @@ let rec push t x =
let push_head t x =
match t.head with Closed -> raise Closed | before -> t.head <- x :: before

let[@tail_mod_cons] rec append_list_to_clist l l' =
match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l'

let rec push_all t values =
match Atomic.get t.tail with
| Closed -> raise Closed
| before ->
let after = append_list_to_clist (List.rev values) before in
if not (Atomic.compare_and_set t.tail before after) then push_all t values

(* *)

type ('a, _) poly =
Expand Down
28 changes: 26 additions & 2 deletions src/mpsc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,30 @@ val push : 'a t -> 'a -> unit
@raise Closed if [q] is closed. *)

val push_all : 'a t -> 'a list -> unit
(** [push_all q vs] adds all the elements [vs] at the end of the queue [q].
This can be used safely by multiple producer domains, in parallel with
the other operations.
@raise Closed if [q] is closed.
{[
# open Saturn.Single_consumer_queue
# let t : int t = create ()
val t : int t = <abstr>
# push_all t [1; 2; 3]
-: unit = ()
# pop_opt t
- : int option = Some 1
# peek_opt t
- : int option = Some 2
# pop_opt t
- : int option = Some 3
# pop_opt t
- : int option = None
]}
*)

(** {2 Consumer-only functions} *)

exception Empty
Expand Down Expand Up @@ -115,9 +139,9 @@ val push_head : 'a t -> 'a -> unit
# let t : int t = create ()
val t : int t = <abstr>
# push t 1
- : bool = true
- : unit = ()
# push t 42
- : bool = true
- : unit = ()
# pop_opt t
- : int option = Some 1
# peek_opt t
Expand Down
11 changes: 10 additions & 1 deletion test/mpsc_queue/stm_mpsc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@ module Mpsc_queue = Saturn.Single_consumer_queue
module Spec = struct
type cmd =
| Push of int
| Push_all of int list
| Pop_exn
| Peek_exn
| Push_head of int
| Is_empty
| Close

let string_of_int_list l =
"[" ^ String.concat "; " (List.map string_of_int l) ^ "]"

let show_cmd c =
match c with
| Push i -> "Push " ^ string_of_int i
| Push_all l -> "Push_all " ^ string_of_int_list l
| Pop_exn -> "Pop_exn"
| Peek_exn -> "Peek_exn"
| Push_head i -> "Push_head" ^ string_of_int i
Expand All @@ -32,6 +37,7 @@ module Spec = struct
(Gen.oneof
[
Gen.map (fun i -> Push i) int_gen;
Gen.map (fun l -> Push_all l) (Gen.small_list int_gen);
(* Gen.return Is_empty; *)
(* Gen.return Close; *)
])
Expand All @@ -57,6 +63,7 @@ module Spec = struct
match c with
| Push i ->
(is_closed, if not is_closed then i :: List.rev s |> List.rev else s)
| Push_all l -> (is_closed, if not is_closed then s @ l else s)
| Push_head i -> (is_closed, if not (is_closed && s = []) then i :: s else s)
| Is_empty -> (is_closed, s)
| Pop_exn -> (is_closed, match s with [] -> s | _ :: s' -> s')
Expand All @@ -68,6 +75,8 @@ module Spec = struct
let run c d =
match c with
| Push i -> Res (result unit exn, protect (fun d -> Mpsc_queue.push d i) d)
| Push_all l ->
Res (result unit exn, protect (fun d -> Mpsc_queue.push_all d l) d)
| Pop_exn -> Res (result int exn, protect Mpsc_queue.pop_exn d)
| Peek_exn -> Res (result int exn, protect Mpsc_queue.peek_exn d)
| Push_head i ->
Expand All @@ -77,7 +86,7 @@ module Spec = struct

let postcond c ((is_closed, s) : state) res =
match (c, res) with
| Push _, Res ((Result (Unit, Exn), _), res) ->
| (Push _ | Push_all _), Res ((Result (Unit, Exn), _), res) ->
if is_closed then res = Error Mpsc_queue.Closed else res = Ok ()
| Push_head _, Res ((Result (Unit, Exn), _), res) ->
if is_closed && s = [] then res = Error Mpsc_queue.Closed
Expand Down

0 comments on commit 301d705

Please sign in to comment.