Skip to content

Commit

Permalink
Merge pull request #69 from bclement-ocp/bclement/cpuaff
Browse files Browse the repository at this point in the history
Better benchmark stability
  • Loading branch information
c-cube authored Jun 13, 2023
2 parents faaa12e + d541596 commit 80b184f
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 36 deletions.
1 change: 1 addition & 0 deletions benchpress.opam
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ depends: [
"uuidm"
"base64"
"ptime"
"processor"
"pp_loc" { >= "2.0" & < "3.0" }
"gnuplot" { >= "0.6" & < "0.8" }
"sqlite3"
Expand Down
15 changes: 8 additions & 7 deletions src/bin/Run_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ module T = Test
module Log = (val Logs.src_log (Logs.Src.create "benchpress.run-main"))

(* run provers on the given dirs, return a list [prover, dir, results] *)
let execute_run_prover_action ?j ?timestamp ?pp_results ?dyn ?limits ?proof_dir
?output ~notify ~uuid ~save ~wal_mode ~update (defs : Definitions.t)
(r : Action.run_provers) : _ * Test_compact_result.t =
let execute_run_prover_action ?j ?cpus ?timestamp ?pp_results ?dyn ?limits
?proof_dir ?output ~notify ~uuid ~save ~wal_mode ~update
(defs : Definitions.t) (r : Action.run_provers) : _ * Test_compact_result.t
=
Error.guard
(Error.wrapf "run prover action@ `@[%a@]`" Action.pp_run_provers r)
@@ fun () ->
let interrupted = CCLock.create false in
let r =
Exec_action.Exec_run_provers.expand ?dyn ?j ?proof_dir ?limits defs r.limits
r.j r.pattern r.dirs r.provers
Exec_action.Exec_run_provers.expand ?dyn ?j ?cpus ?proof_dir ?limits defs
r.limits r.j r.pattern r.dirs r.provers
in
let len = List.length r.problems in
Notify.sendf notify "testing with %d provers, %d problems…"
Expand Down Expand Up @@ -68,7 +69,7 @@ type top_task =
| TT_run_slurm_submission of
Action.run_provers_slurm_submission * Definitions.t

let main ?j ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_
let main ?j ?cpus ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_
?summary ?task ?(dir_files = []) ?proof_dir ?output ?(save = true)
?(wal_mode = false) ~desktop_notification ~no_failure ~update
?(sbatch = false) ?partition ?nodes ?addr ?port ?ntasks
Expand Down Expand Up @@ -188,7 +189,7 @@ let main ?j ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_

let top_res, (results : Test_compact_result.t) =
execute_run_prover_action ~uuid ?pp_results ?proof_dir ?dyn:progress
~limits ?j ?output ~notify ~timestamp ~save ~wal_mode ~update defs
~limits ?j ?cpus ?output ~notify ~timestamp ~save ~wal_mode ~update defs
run_provers_action
in
if CCOpt.is_some csv then (
Expand Down
62 changes: 51 additions & 11 deletions src/bin/benchpress_bin.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ module Run = struct
(* sub-command for running tests *)
let cmd =
let open Cmdliner in
let aux j pp_results dyn paths dir_files proof_dir defs task timeout memory
meta provers csv summary no_color output save wal_mode
let aux j cpus pp_results dyn paths dir_files proof_dir defs task timeout
memory meta provers csv summary no_color output save wal_mode
desktop_notification no_failure update =
catch_err @@ fun () ->
if no_color then CCFormat.set_color_default false;
Expand All @@ -30,8 +30,8 @@ module Run = struct
else
None
in
Run_main.main ~pp_results ?dyn ~j ?timeout ?memory ?csv ~provers ~meta
?task ?summary ~dir_files ?proof_dir ?output ~save ~wal_mode
Run_main.main ~pp_results ?dyn ~j ?cpus ?timeout ?memory ?csv ~provers
~meta ?task ?summary ~dir_files ?proof_dir ?output ~save ~wal_mode
~desktop_notification ~no_failure ~update defs paths ()
in
let defs = Bin_utils.definitions_term
Expand All @@ -51,8 +51,7 @@ module Run = struct
Arg.(value & flag & info [ "wal" ] ~doc:"turn on the journal WAL mode")
and dir_files =
Arg.(
value
& opt_all file []
value & opt_all file []
& info [ "F" ] ~doc:"file containing a list of files")
and proof_dir =
Arg.(
Expand All @@ -67,6 +66,47 @@ module Run = struct
& opt (some int) None
& info [ "t"; "timeout" ] ~doc:"timeout (in s)")
and j = Arg.(value & opt int 1 & info [ "j" ] ~doc:"level of parallelism")
and cpus =
let doc =
"Limit the specific CPUs or cores to use. When provided, the\n\
\ [-j] flag is ignored, and each prover gets allocated its own \
CPU core from\n\
\ this list. A comma-separated list or hyphen-separated ranges \
are allowed."
in
let parser s =
match String.split_on_char '-' s with
| [] -> assert false (* [split_on_char] invariant *)
| [ n ] -> Result.map (fun x -> x, x) Arg.(conv_parser int n)
| [ n; m ] ->
Result.bind Arg.(conv_parser int n) @@ fun n ->
Result.bind Arg.(conv_parser int m) @@ fun m ->
if m < n then
Error (`Msg (Format.asprintf "invalid range: %d-%d" n m))
else
Ok (n, m)
| _ -> Error (`Msg (Format.asprintf "invalid cpuset: %s" s))
in
let printer ppf (n, m) =
if n = m then
Format.pp_print_int ppf n
else
Format.fprintf ppf "%d-%d" n m
in
let cpuspec = Arg.conv ~docv:"MASK" (parser, printer) in
let parse xs =
let cpus =
CCList.flat_map
(fun (n, m) -> List.init (m + 1 - n) (fun i -> i + n))
xs
|> List.sort_uniq Int.compare
in
match cpus with
| [] -> None
| _ -> Some cpus
in
Term.(
const parse $ Arg.(value & opt (list cpuspec) [] & info [ "cpus" ] ~doc))
and memory =
Arg.(
value
Expand Down Expand Up @@ -123,9 +163,10 @@ module Run = struct
in
Cmd.v (Cmd.info ~doc "run")
Term.(
const aux $ j $ pp_results $ dyn $ paths $ dir_files $ proof_dir $ defs
$ task $ timeout $ memory $ meta $ provers $ csv $ summary $ no_color
$ output $ save $ wal_mode $ desktop_notification $ no_failure $ update)
const aux $ j $ cpus $ pp_results $ dyn $ paths $ dir_files $ proof_dir
$ defs $ task $ timeout $ memory $ meta $ provers $ csv $ summary
$ no_color $ output $ save $ wal_mode $ desktop_notification
$ no_failure $ update)
end

module Slurm = struct
Expand Down Expand Up @@ -169,8 +210,7 @@ module Slurm = struct
Arg.(value & flag & info [ "wal" ] ~doc:"turn on the journal WAL mode")
and dir_files =
Arg.(
value
& opt_all file []
value & opt_all file []
& info [ "F" ] ~doc:"file containing a list of files")
and proof_dir =
Arg.(
Expand Down
57 changes: 41 additions & 16 deletions src/core/Exec_action.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ module Log = (val Logs.src_log (Logs.Src.create "benchpress.runexec-action"))

module Exec_run_provers : sig
type t = Action.run_provers
type jobs = Bounded of int | Cpus of int list

type expanded = {
j: int;
j: jobs;
problems: Problem.t list;
provers: Prover.t list;
checkers: Proof_checker.t Misc.Str_map.t;
Expand All @@ -18,6 +19,7 @@ module Exec_run_provers : sig
val expand :
?slurm:bool ->
?j:int ->
?cpus:int list ->
?dyn:bool ->
?limits:Limit.All.t ->
?proof_dir:string ->
Expand Down Expand Up @@ -73,6 +75,7 @@ module Exec_run_provers : sig
Test_top_result.t lazy_t * Test_compact_result.t
end = struct
type t = Action.run_provers
type jobs = Bounded of int | Cpus of int list

let ( >? ) a b =
match a with
Expand All @@ -85,7 +88,7 @@ end = struct
| Some _ as x -> x

type expanded = {
j: int;
j: jobs;
problems: Problem.t list;
provers: Prover.t list;
checkers: Proof_checker.t Misc.Str_map.t;
Expand Down Expand Up @@ -136,21 +139,27 @@ end = struct
| exn -> Error.(raise @@ of_exn exn)
(* Expand options into concrete choices *)
let expand ?(slurm = false) ?j ?(dyn = false) ?limits ?proof_dir ?interrupted
(defs : Definitions.t) s_limits s_j s_pattern s_dirs s_provers : expanded
=
let expand ?(slurm = false) ?j ?cpus ?(dyn = false) ?limits ?proof_dir
?interrupted (defs : Definitions.t) s_limits s_j s_pattern s_dirs
s_provers : expanded =
let limits =
match limits with
| None -> s_limits
| Some l -> Limit.All.with_defaults l ~defaults:s_limits
in
let j =
j >?? s_j
>?
if slurm then
0
else
Misc.guess_cpu_count ()
match cpus with
| None ->
let j =
j >?? s_j
>?
if slurm then
0
else
Misc.guess_cpu_count ()
in
Bounded j
| Some cpus -> Cpus cpus
in
let problems =
CCList.flat_map
Expand Down Expand Up @@ -331,10 +340,19 @@ end = struct
(* run provers *)
let res_l =
let db = CCLock.create db in
Misc.Par_map.map_p ~j:self.j
(fun (prover, pb) -> run_prover_pb ~prover ~pb ~db ())
jobs
|> CCList.flatten
match self.j with
| Bounded j ->
Misc.Par_map.map_p ~j
(fun (prover, pb) -> run_prover_pb ~prover ~pb ~db ())
jobs
|> CCList.flatten
| Cpus cpus ->
Misc.Par_map.map_with_resource ~resources:cpus
(fun cpu (prover, pb) ->
Log.debug (fun m -> m "Running on cpu %d" cpu);
Misc.with_affinity cpu (run_prover_pb ~prover ~pb ~db))
jobs
|> CCList.flatten
in
if interrupted () then Error.fail "run.interrupted";
Expand Down Expand Up @@ -375,6 +393,13 @@ end = struct
~port ~ntasks ?output ?(update = false) ~uuid ~save ~wal_mode
(self : expanded) : _ * _ =
ignore on_start_proof_check;
let j =
match self.j with
| Bounded j -> j
| Cpus cpus ->
Log.warn (fun m -> m "cpu affinity ignored in slurm mode");
List.length cpus
in
let start = Misc.now_s () in
let db =
prepare_db ~wal_mode ?output ~update timestamp uuid save self.provers
Expand Down Expand Up @@ -556,7 +581,7 @@ end = struct
(Unix.string_of_inet_addr addr)
used_port);
let sbatch_cmds =
Slurm_cmd.mk_sbatch_cmds self.limits self.proof_dir self.j addr used_port
Slurm_cmd.mk_sbatch_cmds self.limits self.proof_dir j addr used_port
partition config_file nodes
in
let job_ids =
Expand Down
8 changes: 7 additions & 1 deletion src/core/Exec_action.mli
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ type cb_progress =
module Exec_run_provers : sig
type t = Action.run_provers

type jobs =
| Bounded of int (* [Bounded j] is at most [j] parallel jobs *)
| Cpus of int list
(* [Cpus cpus] assigns an exclusive cpu from [cpus] to each job *)

type expanded = {
j: int;
j: jobs;
problems: Problem.t list;
provers: Prover.t list;
checkers: Proof_checker.t Misc.Str_map.t;
Expand All @@ -17,6 +22,7 @@ module Exec_run_provers : sig
val expand :
?slurm:bool ->
?j:int ->
?cpus:int list ->
?dyn:bool ->
?limits:Limit.All.t ->
?proof_dir:string ->
Expand Down
38 changes: 38 additions & 0 deletions src/core/Misc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,34 @@ module Par_map = struct
Logs.debug (fun k -> k "par-map: stop pool");
P.stop ();
res

(* Map on the list [l] with each call to [f] being associated one of the
resources from [resources] that is guaranteed not to be used concurrently
by another call to [f]. *)
let map_with_resource ~resources f l =
match l with
| [] -> []
| _ ->
if CCList.is_empty resources then
invalid_arg "map_with_resource: ~resources";
die_on_sigterm ();
let jobs = List.length resources in
let queue = CCBlockingQueue.create jobs in
List.iter (CCBlockingQueue.push queue) resources;
let f x =
let resource = CCBlockingQueue.take queue in
Fun.protect
~finally:(fun () -> CCBlockingQueue.push queue resource)
(fun () -> f resource x)
in
Logs.debug (fun m -> m "par-map: create pool j=%d" jobs);
let module P = CCPool.Make (struct
let max_size = jobs
end) in
let res = P.Fut.map_l (P.Fut.make1 f) l |> P.Fut.get in
Logs.debug (fun m -> m "par-map: stop pool");
P.stop ();
res
end

module Git = struct
Expand Down Expand Up @@ -449,3 +477,13 @@ let start_server n server_fun sock =
let establish_server n server_fun sockaddr =
let sock, _ = mk_socket sockaddr in
start_server n server_fun sock

let with_affinity cpu f =
let aff = Processor.Affinity.get_ids () in
Processor.Affinity.set_ids [ cpu ];
Fun.protect ~finally:(fun () -> Processor.Affinity.set_ids aff) f

let with_affinity_opt cpu f =
match cpu with
| None -> f ()
| Some cpu -> with_affinity cpu f
2 changes: 1 addition & 1 deletion src/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
(wrapped true)
(libraries containers containers.unix containers-thread re re.perl csv iter
printbox printbox-text logs logs.cli gnuplot ptime ptime.clock.os uuidm
sqlite3 sqlite3_utils cmdliner pp_loc)
sqlite3 sqlite3_utils cmdliner pp_loc processor)
(flags :standard -w -5 -warn-error -a+8 -strict-sequence))

(rule
Expand Down

0 comments on commit 80b184f

Please sign in to comment.