diff --git a/benchpress.opam b/benchpress.opam index 736d246..bf911c9 100644 --- a/benchpress.opam +++ b/benchpress.opam @@ -21,6 +21,7 @@ depends: [ "uuidm" "base64" "ptime" + "processor" "pp_loc" { >= "2.0" & < "3.0" } "gnuplot" { >= "0.6" & < "0.8" } "sqlite3" diff --git a/src/bin/Run_main.ml b/src/bin/Run_main.ml index 3ef177d..cc3c3d2 100644 --- a/src/bin/Run_main.ml +++ b/src/bin/Run_main.ml @@ -3,16 +3,17 @@ module T = Test module Log = (val Logs.src_log (Logs.Src.create "benchpress.run-main")) (* run provers on the given dirs, return a list [prover, dir, results] *) -let execute_run_prover_action ?j ?timestamp ?pp_results ?dyn ?limits ?proof_dir - ?output ~notify ~uuid ~save ~wal_mode ~update (defs : Definitions.t) - (r : Action.run_provers) : _ * Test_compact_result.t = +let execute_run_prover_action ?j ?cpus ?timestamp ?pp_results ?dyn ?limits + ?proof_dir ?output ~notify ~uuid ~save ~wal_mode ~update + (defs : Definitions.t) (r : Action.run_provers) : _ * Test_compact_result.t + = Error.guard (Error.wrapf "run prover action@ `@[%a@]`" Action.pp_run_provers r) @@ fun () -> let interrupted = CCLock.create false in let r = - Exec_action.Exec_run_provers.expand ?dyn ?j ?proof_dir ?limits defs r.limits - r.j r.pattern r.dirs r.provers + Exec_action.Exec_run_provers.expand ?dyn ?j ?cpus ?proof_dir ?limits defs + r.limits r.j r.pattern r.dirs r.provers in let len = List.length r.problems in Notify.sendf notify "testing with %d provers, %d problems…" @@ -68,7 +69,7 @@ type top_task = | TT_run_slurm_submission of Action.run_provers_slurm_submission * Definitions.t -let main ?j ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_ +let main ?j ?cpus ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_ ?summary ?task ?(dir_files = []) ?proof_dir ?output ?(save = true) ?(wal_mode = false) ~desktop_notification ~no_failure ~update ?(sbatch = false) ?partition ?nodes ?addr ?port ?ntasks @@ -188,7 +189,7 @@ let main ?j ?pp_results ?dyn ?timeout ?memory ?csv ?(provers = []) ?meta:_ let top_res, (results : Test_compact_result.t) = execute_run_prover_action ~uuid ?pp_results ?proof_dir ?dyn:progress - ~limits ?j ?output ~notify ~timestamp ~save ~wal_mode ~update defs + ~limits ?j ?cpus ?output ~notify ~timestamp ~save ~wal_mode ~update defs run_provers_action in if CCOpt.is_some csv then ( diff --git a/src/bin/benchpress_bin.ml b/src/bin/benchpress_bin.ml index 145fd21..5c36aef 100644 --- a/src/bin/benchpress_bin.ml +++ b/src/bin/benchpress_bin.ml @@ -19,8 +19,8 @@ module Run = struct (* sub-command for running tests *) let cmd = let open Cmdliner in - let aux j pp_results dyn paths dir_files proof_dir defs task timeout memory - meta provers csv summary no_color output save wal_mode + let aux j cpus pp_results dyn paths dir_files proof_dir defs task timeout + memory meta provers csv summary no_color output save wal_mode desktop_notification no_failure update = catch_err @@ fun () -> if no_color then CCFormat.set_color_default false; @@ -30,8 +30,8 @@ module Run = struct else None in - Run_main.main ~pp_results ?dyn ~j ?timeout ?memory ?csv ~provers ~meta - ?task ?summary ~dir_files ?proof_dir ?output ~save ~wal_mode + Run_main.main ~pp_results ?dyn ~j ?cpus ?timeout ?memory ?csv ~provers + ~meta ?task ?summary ~dir_files ?proof_dir ?output ~save ~wal_mode ~desktop_notification ~no_failure ~update defs paths () in let defs = Bin_utils.definitions_term @@ -51,8 +51,7 @@ module Run = struct Arg.(value & flag & info [ "wal" ] ~doc:"turn on the journal WAL mode") and dir_files = Arg.( - value - & opt_all file [] + value & opt_all file [] & info [ "F" ] ~doc:"file containing a list of files") and proof_dir = Arg.( @@ -67,6 +66,47 @@ module Run = struct & opt (some int) None & info [ "t"; "timeout" ] ~doc:"timeout (in s)") and j = Arg.(value & opt int 1 & info [ "j" ] ~doc:"level of parallelism") + and cpus = + let doc = + "Limit the specific CPUs or cores to use. When provided, the\n\ + \ [-j] flag is ignored, and each prover gets allocated its own \ + CPU core from\n\ + \ this list. A comma-separated list or hyphen-separated ranges \ + are allowed." + in + let parser s = + match String.split_on_char '-' s with + | [] -> assert false (* [split_on_char] invariant *) + | [ n ] -> Result.map (fun x -> x, x) Arg.(conv_parser int n) + | [ n; m ] -> + Result.bind Arg.(conv_parser int n) @@ fun n -> + Result.bind Arg.(conv_parser int m) @@ fun m -> + if m < n then + Error (`Msg (Format.asprintf "invalid range: %d-%d" n m)) + else + Ok (n, m) + | _ -> Error (`Msg (Format.asprintf "invalid cpuset: %s" s)) + in + let printer ppf (n, m) = + if n = m then + Format.pp_print_int ppf n + else + Format.fprintf ppf "%d-%d" n m + in + let cpuspec = Arg.conv ~docv:"MASK" (parser, printer) in + let parse xs = + let cpus = + CCList.flat_map + (fun (n, m) -> List.init (m + 1 - n) (fun i -> i + n)) + xs + |> List.sort_uniq Int.compare + in + match cpus with + | [] -> None + | _ -> Some cpus + in + Term.( + const parse $ Arg.(value & opt (list cpuspec) [] & info [ "cpus" ] ~doc)) and memory = Arg.( value @@ -123,9 +163,10 @@ module Run = struct in Cmd.v (Cmd.info ~doc "run") Term.( - const aux $ j $ pp_results $ dyn $ paths $ dir_files $ proof_dir $ defs - $ task $ timeout $ memory $ meta $ provers $ csv $ summary $ no_color - $ output $ save $ wal_mode $ desktop_notification $ no_failure $ update) + const aux $ j $ cpus $ pp_results $ dyn $ paths $ dir_files $ proof_dir + $ defs $ task $ timeout $ memory $ meta $ provers $ csv $ summary + $ no_color $ output $ save $ wal_mode $ desktop_notification + $ no_failure $ update) end module Slurm = struct @@ -169,8 +210,7 @@ module Slurm = struct Arg.(value & flag & info [ "wal" ] ~doc:"turn on the journal WAL mode") and dir_files = Arg.( - value - & opt_all file [] + value & opt_all file [] & info [ "F" ] ~doc:"file containing a list of files") and proof_dir = Arg.( diff --git a/src/core/Exec_action.ml b/src/core/Exec_action.ml index 7740229..ffb3327 100644 --- a/src/core/Exec_action.ml +++ b/src/core/Exec_action.ml @@ -5,9 +5,10 @@ module Log = (val Logs.src_log (Logs.Src.create "benchpress.runexec-action")) module Exec_run_provers : sig type t = Action.run_provers + type jobs = Bounded of int | Cpus of int list type expanded = { - j: int; + j: jobs; problems: Problem.t list; provers: Prover.t list; checkers: Proof_checker.t Misc.Str_map.t; @@ -18,6 +19,7 @@ module Exec_run_provers : sig val expand : ?slurm:bool -> ?j:int -> + ?cpus:int list -> ?dyn:bool -> ?limits:Limit.All.t -> ?proof_dir:string -> @@ -73,6 +75,7 @@ module Exec_run_provers : sig Test_top_result.t lazy_t * Test_compact_result.t end = struct type t = Action.run_provers + type jobs = Bounded of int | Cpus of int list let ( >? ) a b = match a with @@ -85,7 +88,7 @@ end = struct | Some _ as x -> x type expanded = { - j: int; + j: jobs; problems: Problem.t list; provers: Prover.t list; checkers: Proof_checker.t Misc.Str_map.t; @@ -136,21 +139,27 @@ end = struct | exn -> Error.(raise @@ of_exn exn) (* Expand options into concrete choices *) - let expand ?(slurm = false) ?j ?(dyn = false) ?limits ?proof_dir ?interrupted - (defs : Definitions.t) s_limits s_j s_pattern s_dirs s_provers : expanded - = + let expand ?(slurm = false) ?j ?cpus ?(dyn = false) ?limits ?proof_dir + ?interrupted (defs : Definitions.t) s_limits s_j s_pattern s_dirs + s_provers : expanded = let limits = match limits with | None -> s_limits | Some l -> Limit.All.with_defaults l ~defaults:s_limits in let j = - j >?? s_j - >? - if slurm then - 0 - else - Misc.guess_cpu_count () + match cpus with + | None -> + let j = + j >?? s_j + >? + if slurm then + 0 + else + Misc.guess_cpu_count () + in + Bounded j + | Some cpus -> Cpus cpus in let problems = CCList.flat_map @@ -331,10 +340,19 @@ end = struct (* run provers *) let res_l = let db = CCLock.create db in - Misc.Par_map.map_p ~j:self.j - (fun (prover, pb) -> run_prover_pb ~prover ~pb ~db ()) - jobs - |> CCList.flatten + match self.j with + | Bounded j -> + Misc.Par_map.map_p ~j + (fun (prover, pb) -> run_prover_pb ~prover ~pb ~db ()) + jobs + |> CCList.flatten + | Cpus cpus -> + Misc.Par_map.map_with_resource ~resources:cpus + (fun cpu (prover, pb) -> + Log.debug (fun m -> m "Running on cpu %d" cpu); + Misc.with_affinity cpu (run_prover_pb ~prover ~pb ~db)) + jobs + |> CCList.flatten in if interrupted () then Error.fail "run.interrupted"; @@ -375,6 +393,13 @@ end = struct ~port ~ntasks ?output ?(update = false) ~uuid ~save ~wal_mode (self : expanded) : _ * _ = ignore on_start_proof_check; + let j = + match self.j with + | Bounded j -> j + | Cpus cpus -> + Log.warn (fun m -> m "cpu affinity ignored in slurm mode"); + List.length cpus + in let start = Misc.now_s () in let db = prepare_db ~wal_mode ?output ~update timestamp uuid save self.provers @@ -556,7 +581,7 @@ end = struct (Unix.string_of_inet_addr addr) used_port); let sbatch_cmds = - Slurm_cmd.mk_sbatch_cmds self.limits self.proof_dir self.j addr used_port + Slurm_cmd.mk_sbatch_cmds self.limits self.proof_dir j addr used_port partition config_file nodes in let job_ids = diff --git a/src/core/Exec_action.mli b/src/core/Exec_action.mli index cfd0d66..b8bbef1 100644 --- a/src/core/Exec_action.mli +++ b/src/core/Exec_action.mli @@ -5,8 +5,13 @@ type cb_progress = module Exec_run_provers : sig type t = Action.run_provers + type jobs = + | Bounded of int (* [Bounded j] is at most [j] parallel jobs *) + | Cpus of int list + (* [Cpus cpus] assigns an exclusive cpu from [cpus] to each job *) + type expanded = { - j: int; + j: jobs; problems: Problem.t list; provers: Prover.t list; checkers: Proof_checker.t Misc.Str_map.t; @@ -17,6 +22,7 @@ module Exec_run_provers : sig val expand : ?slurm:bool -> ?j:int -> + ?cpus:int list -> ?dyn:bool -> ?limits:Limit.All.t -> ?proof_dir:string -> diff --git a/src/core/Misc.ml b/src/core/Misc.ml index 40d7aaf..eb174f6 100644 --- a/src/core/Misc.ml +++ b/src/core/Misc.ml @@ -317,6 +317,34 @@ module Par_map = struct Logs.debug (fun k -> k "par-map: stop pool"); P.stop (); res + + (* Map on the list [l] with each call to [f] being associated one of the + resources from [resources] that is guaranteed not to be used concurrently + by another call to [f]. *) + let map_with_resource ~resources f l = + match l with + | [] -> [] + | _ -> + if CCList.is_empty resources then + invalid_arg "map_with_resource: ~resources"; + die_on_sigterm (); + let jobs = List.length resources in + let queue = CCBlockingQueue.create jobs in + List.iter (CCBlockingQueue.push queue) resources; + let f x = + let resource = CCBlockingQueue.take queue in + Fun.protect + ~finally:(fun () -> CCBlockingQueue.push queue resource) + (fun () -> f resource x) + in + Logs.debug (fun m -> m "par-map: create pool j=%d" jobs); + let module P = CCPool.Make (struct + let max_size = jobs + end) in + let res = P.Fut.map_l (P.Fut.make1 f) l |> P.Fut.get in + Logs.debug (fun m -> m "par-map: stop pool"); + P.stop (); + res end module Git = struct @@ -449,3 +477,13 @@ let start_server n server_fun sock = let establish_server n server_fun sockaddr = let sock, _ = mk_socket sockaddr in start_server n server_fun sock + +let with_affinity cpu f = + let aff = Processor.Affinity.get_ids () in + Processor.Affinity.set_ids [ cpu ]; + Fun.protect ~finally:(fun () -> Processor.Affinity.set_ids aff) f + +let with_affinity_opt cpu f = + match cpu with + | None -> f () + | Some cpu -> with_affinity cpu f diff --git a/src/core/dune b/src/core/dune index b2f3687..a7bb8c5 100644 --- a/src/core/dune +++ b/src/core/dune @@ -7,7 +7,7 @@ (wrapped true) (libraries containers containers.unix containers-thread re re.perl csv iter printbox printbox-text logs logs.cli gnuplot ptime ptime.clock.os uuidm - sqlite3 sqlite3_utils cmdliner pp_loc) + sqlite3 sqlite3_utils cmdliner pp_loc processor) (flags :standard -w -5 -warn-error -a+8 -strict-sequence)) (rule