From 301d7053e4301308acde84e5c949f18f1139adeb Mon Sep 17 00:00:00 2001 From: Carine Morel Date: Thu, 21 Nov 2024 18:52:01 +0100 Subject: [PATCH] Add push_all --- src/mpsc_queue.ml | 17 ++++++++++++++--- src/mpsc_queue.mli | 28 ++++++++++++++++++++++++++-- test/mpsc_queue/stm_mpsc_queue.ml | 11 ++++++++++- 3 files changed, 50 insertions(+), 6 deletions(-) diff --git a/src/mpsc_queue.ml b/src/mpsc_queue.ml index cda7858f..75818764 100644 --- a/src/mpsc_queue.ml +++ b/src/mpsc_queue.ml @@ -32,10 +32,11 @@ type 'a t = { mutable head : 'a clist; tail : 'a clist Atomic.t } let create () = { head = Open; tail = Atomic.make_contended Open } -let[@tail_mod_cons] rec clist_of_list l = - match l with [] -> Open | List.(x :: xs) -> x :: clist_of_list xs +let[@tail_mod_cons] rec append_list_to_clist l l' = + match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l' -let of_list l = { head = clist_of_list l; tail = Atomic.make_contended Open } +let of_list l = + { head = append_list_to_clist l Open; tail = Atomic.make_contended Open } (* *) @@ -62,6 +63,16 @@ let rec push t x = let push_head t x = match t.head with Closed -> raise Closed | before -> t.head <- x :: before +let[@tail_mod_cons] rec append_list_to_clist l l' = + match l with [] -> l' | List.(x :: xs) -> x :: append_list_to_clist xs l' + +let rec push_all t values = + match Atomic.get t.tail with + | Closed -> raise Closed + | before -> + let after = append_list_to_clist (List.rev values) before in + if not (Atomic.compare_and_set t.tail before after) then push_all t values + (* *) type ('a, _) poly = diff --git a/src/mpsc_queue.mli b/src/mpsc_queue.mli index 1112137c..e1f645fd 100644 --- a/src/mpsc_queue.mli +++ b/src/mpsc_queue.mli @@ -46,6 +46,30 @@ val push : 'a t -> 'a -> unit @raise Closed if [q] is closed. *) +val push_all : 'a t -> 'a list -> unit +(** [push_all q vs] adds all the elements [vs] at the end of the queue [q]. + This can be used safely by multiple producer domains, in parallel with + the other operations. + + @raise Closed if [q] is closed. + + {[ + # open Saturn.Single_consumer_queue + # let t : int t = create () + val t : int t = + # push_all t [1; 2; 3] + -: unit = () + # pop_opt t + - : int option = Some 1 + # peek_opt t + - : int option = Some 2 + # pop_opt t + - : int option = Some 3 + # pop_opt t + - : int option = None + ]} + *) + (** {2 Consumer-only functions} *) exception Empty @@ -115,9 +139,9 @@ val push_head : 'a t -> 'a -> unit # let t : int t = create () val t : int t = # push t 1 - - : bool = true + - : unit = () # push t 42 - - : bool = true + - : unit = () # pop_opt t - : int option = Some 1 # peek_opt t diff --git a/test/mpsc_queue/stm_mpsc_queue.ml b/test/mpsc_queue/stm_mpsc_queue.ml index 5152a337..82c5a3f3 100644 --- a/test/mpsc_queue/stm_mpsc_queue.ml +++ b/test/mpsc_queue/stm_mpsc_queue.ml @@ -8,15 +8,20 @@ module Mpsc_queue = Saturn.Single_consumer_queue module Spec = struct type cmd = | Push of int + | Push_all of int list | Pop_exn | Peek_exn | Push_head of int | Is_empty | Close + let string_of_int_list l = + "[" ^ String.concat "; " (List.map string_of_int l) ^ "]" + let show_cmd c = match c with | Push i -> "Push " ^ string_of_int i + | Push_all l -> "Push_all " ^ string_of_int_list l | Pop_exn -> "Pop_exn" | Peek_exn -> "Peek_exn" | Push_head i -> "Push_head" ^ string_of_int i @@ -32,6 +37,7 @@ module Spec = struct (Gen.oneof [ Gen.map (fun i -> Push i) int_gen; + Gen.map (fun l -> Push_all l) (Gen.small_list int_gen); (* Gen.return Is_empty; *) (* Gen.return Close; *) ]) @@ -57,6 +63,7 @@ module Spec = struct match c with | Push i -> (is_closed, if not is_closed then i :: List.rev s |> List.rev else s) + | Push_all l -> (is_closed, if not is_closed then s @ l else s) | Push_head i -> (is_closed, if not (is_closed && s = []) then i :: s else s) | Is_empty -> (is_closed, s) | Pop_exn -> (is_closed, match s with [] -> s | _ :: s' -> s') @@ -68,6 +75,8 @@ module Spec = struct let run c d = match c with | Push i -> Res (result unit exn, protect (fun d -> Mpsc_queue.push d i) d) + | Push_all l -> + Res (result unit exn, protect (fun d -> Mpsc_queue.push_all d l) d) | Pop_exn -> Res (result int exn, protect Mpsc_queue.pop_exn d) | Peek_exn -> Res (result int exn, protect Mpsc_queue.peek_exn d) | Push_head i -> @@ -77,7 +86,7 @@ module Spec = struct let postcond c ((is_closed, s) : state) res = match (c, res) with - | Push _, Res ((Result (Unit, Exn), _), res) -> + | (Push _ | Push_all _), Res ((Result (Unit, Exn), _), res) -> if is_closed then res = Error Mpsc_queue.Closed else res = Ok () | Push_head _, Res ((Result (Unit, Exn), _), res) -> if is_closed && s = [] then res = Error Mpsc_queue.Closed