From e5bffcad6263bb82f2497a29d3eb5764a8c52b83 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Wed, 11 Dec 2024 14:58:46 -0800 Subject: [PATCH 1/5] python: add prototype for submission cli plugins Problem: There's no way to extend job submission cli commands in Flux via plugins. Add a prototype CLIPlugin class and CLIPluginRegistry (which loads CLIPlugins). --- src/bindings/python/flux/Makefile.am | 1 + src/bindings/python/flux/cli/plugin.py | 187 +++++++++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 src/bindings/python/flux/cli/plugin.py diff --git a/src/bindings/python/flux/Makefile.am b/src/bindings/python/flux/Makefile.am index 789de88f2e79..548c7a154d1c 100644 --- a/src/bindings/python/flux/Makefile.am +++ b/src/bindings/python/flux/Makefile.am @@ -21,6 +21,7 @@ nobase_fluxpy_PYTHON = \ cli/run.py \ cli/submit.py \ cli/fortune.py \ + cli/plugin.py \ core/__init__.py \ core/watchers.py \ core/inner.py \ diff --git a/src/bindings/python/flux/cli/plugin.py b/src/bindings/python/flux/cli/plugin.py new file mode 100644 index 000000000000..d88ccf054c60 --- /dev/null +++ b/src/bindings/python/flux/cli/plugin.py @@ -0,0 +1,187 @@ +############################################################## +# Copyright 2024 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################## +from os import getenv +import glob +from abc import ABC + +from flux.conf_builtin import conf_builtin_get +from flux.importer import import_path + + +class CLIPluginValue: + def __init__(self, value): + temp = value.split("=") + self.key = temp[0] + try: + self.value = temp[1] + except IndexError: + self.value = "" + + def get_value(self): + """Getter for returning the key and value""" + return self.key, self.value + + +class CLIPlugin(ABC): # pragma no cover + """Base class for a CLI submission plugin + + A plugin should derive from this class and implement one or more + base methods (described below) + + Attributes: + opt (str): command-line key passed to -P/--plugin to activate the + extension + usage (str): short --help message for the extension + help (str): long --help message for the extension + version (str): optional version for the plugin, preferred in format + MAJOR.MINOR.PATCH + prog (str): command-line subcommand for which the plugin is active, + e.g. "submit", "run", "alloc", "batch", "bulksubmit" + """ + + def __init__(self, prog): + self.opt = None + self.usage = None + self.help = None + self.version = None + self.prog = prog + if prog.startswith("flux "): + self.prog = prog[5:] + + def get_plugin_name(self): + """Return the KEY to invoke the plugin""" + return self.opt + + def get_help_message(self): + """Return the message to print with --help""" + return self.help + + def preinit(self, args, values): + """After parsing options, before jobspec is initialized + + Either here, or in validation, plugin extensions should check types and + perform their own validation, since argparse will accept any string. + + Args: + args (:py:obj:`Namespace`): Namespace result from + :py:meth:`Argparse.ArgumentParser.parse_args()`. + values (:obj:`Dict`): Dictionary of KEY[=VALUE] pairs returned + by parsing the arguments provided to the -P/--plugin + option. KEY is None if plugin was not invoked. VALUE is the + empty string if the plugin was invoked without VALUE. + """ + pass + + def modify_jobspec(self, args, jobspec, values): + """Allow plugin to modify jobspec + + This function is called after arguments have been parsed and jobspec + has mostly been initialized. + + Args: + args (:py:obj:`Namespace`): Namespace result from + :py:meth:`Argparse.ArgumentParser.parse_args()`. + jobspec (:obj:`flux.job.Jobspec`): instantiated jobspec object. + This plugin can modify this object directly to enact + changes in the current programs generated jobspec. + values (:obj:`Dict`): Dictionary of KEY[=VALUE] pairs returned + by parsing the arguments provided to the -P/--plugin + option. KEY is None if plugin was not invoked. VALUE is the + empty string if the plugin was invoked without VALUE. + """ + pass + + def validate(self, jobspec): + """Allow a plugin to validate jobspec + + This callback may be used by the cli itself or a job validator + to validate the final jobspec. + + On an invalid jobspec, this callback should raise ValueError + with a useful error message. + + Args: + jobspec (:obj:`flux.job.Jobspec`): jobspec object to validate. + """ + pass + + +class CLIPluginRegistry: + """Flux CLI plugin registry helper class""" + + def __init__(self): + self.plugins = [] + etc = conf_builtin_get("confdir") + if etc is None: + raise ValueError("failed to get builtin confdir") + if getenv("FLUX_CLI_PLUGINPATH"): + self.plugindir = getenv("FLUX_CLI_PLUGINPATH") + else: + self.plugindir = f"{etc}/cli/plugins" + + def _add_plugins(self, module, program): + entries = [ + getattr(module, attr) for attr in dir(module) if not attr.startswith("_") + ] + ## TODO: as we get each entry, make sure it is NOT speciifically the CLIPlugin base class + for entry in entries: + # only process entries that are an instance of type (i.e. a class) + # and are a subclass of CLIPlugin: + if isinstance(entry, type) and issubclass(entry, CLIPlugin): + self.plugins.append(entry(program)) + + def load_plugins(self, program): + """Load all cli plugins from the standard path""" + for path in glob.glob(f"{self.plugindir}/*.py"): + self._add_plugins(import_path(path), program) + return self + + def add_plugin_option(self, parser): + """Add the -P option and list of available keys""" + parser.add_argument( + "-P", + "--plugin", + type=CLIPluginValue, + action="append", + help=f"{self.get_plugin_help_messages()}", + ) + + def get_plugin_options(self): + """Return all plugin option keys from self.plugins""" + opts = [] + for plugin in self.plugins[1:]: + opts.append(str(plugin.get_plugin_name())) + return opts + + def get_plugin_help_messages(self): + """Return a string that has all self.plugin help messages""" + ## TODO: Utilize the HelpFormatter class in argparse to make + ## these messages better + help = "" + for plugin in self.plugins[1:]: + help += str(plugin.get_plugin_name()) + help += str(plugin.get_help_message()) + help += "\n" + return help + + def preinit(self, args, values): + """Call all plugin ``preinit`` callbacks""" + for plugin in self.plugins: + plugin.preinit(args, values) + + def modify_jobspec(self, args, jobspec, value): + """Call all plugin ``modify_jobspec`` callbacks""" + for plugin in self.plugins: + plugin.modify_jobspec(args, jobspec, value) + + def validate(self, jobspec): + """Call any plugin validate callback""" + for plugin in self.plugins: + plugin.validate(jobspec) From 9c4fd3151322e7f78d2a6bcc331bd4904ce9bf89 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Wed, 11 Dec 2024 15:01:19 -0800 Subject: [PATCH 2/5] python: extend submission cli commands with plugins Problem: Flux submission cli commands don't support plugins. Load any plugins in the default plugin path "{confdir}/cli/plugins/*.py" and call any plugin `add_options()` and `modify_jobspec()` callbacks at appropriate points. --- src/bindings/python/flux/cli/base.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/src/bindings/python/flux/cli/base.py b/src/bindings/python/flux/cli/base.py index ba387c40304b..2fd300424017 100644 --- a/src/bindings/python/flux/cli/base.py +++ b/src/bindings/python/flux/cli/base.py @@ -36,6 +36,7 @@ import flux from flux import debugged, job, util +from flux.cli.plugin import CLIPluginRegistry, CLIPluginValue from flux.constraint.parser import ConstraintParser, ConstraintSyntaxError from flux.idset import IDset from flux.job import JobspecV1, JobWatcher @@ -751,7 +752,10 @@ def __init__(self, prog, usage=None, description=None, exclude_io=False): self.exitcode = 0 self.progress = None self.watcher = None + self.plugins = CLIPluginRegistry().load_plugins(prog) self.parser = self.create_parser(prog, usage, description, exclude_io) + self.group = self.parser.add_argument_group("Options provided by plugins") + self.plugins.add_plugin_option(self.group) @staticmethod def create_parser( @@ -1010,6 +1014,20 @@ def jobspec_create(self, args): """ Create a jobspec from args and return it to caller """ + plugins = {} + if args.plugin: + for provided_arg in args.plugin: + key, val = provided_arg.get_value() + if key in self.plugins.get_plugin_options(): + plugins[key] = val + else: + raise ValueError( + f"Unsupported option provided to -P/--plugin: {key}" + ) + + # plugins MAY raise IndexError if the plugin opt was not provided. + self.plugins.preinit(args, plugins) + jobspec = self.init_jobspec(args) jobspec.environment, env_expand = get_filtered_environment(args.env) @@ -1127,6 +1145,8 @@ def jobspec_create(self, args): for arg in args.add_file: self.handle_add_file_arg(jobspec, arg) + self.plugins.modify_jobspec(args, jobspec, plugins) + return jobspec def submit_async(self, args, jobspec=None): From d27639a11324f7726ed7dcab92c95a97fe19e5c9 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 12 Dec 2024 10:43:43 -0800 Subject: [PATCH 3/5] python: return parsed jobspec from validate_jobspec() Problem: flux.job.Jobspec.validate_jobspec() throws away the parsed jobspec object if validation is successful. However, it could be useful to return the resulting jobspec object. Return a tuple of (result, jobspec) from validate_jobspec(). --- src/bindings/python/flux/job/Jobspec.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/bindings/python/flux/job/Jobspec.py b/src/bindings/python/flux/job/Jobspec.py index 12febd45a961..cadcc2611a8f 100644 --- a/src/bindings/python/flux/job/Jobspec.py +++ b/src/bindings/python/flux/job/Jobspec.py @@ -126,10 +126,10 @@ def validate_jobspec(jobspec, require_version=None): return (1, "Unable to parse JSON") _validate_keys(Jobspec.top_level_keys, jobspec_obj.keys()) if require_version == 1 or jobspec_obj.get("version", 0) == 1: - JobspecV1(**jobspec_obj) + jobspec = JobspecV1(**jobspec_obj) else: - Jobspec(**jobspec_obj) - return True + jobspec = Jobspec(**jobspec_obj) + return True, jobspec class Jobspec(object): From 497cf859aa958a14e11735de24169072d5127f7d Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Thu, 12 Dec 2024 10:46:45 -0800 Subject: [PATCH 4/5] python: use plugins if available in the jobspec validator Problem: Job submission cli plugins offer a validate() callback, but this callback is not used. Load plugins and run all validate callbacks in the jobspec validator plugin. --- .../python/flux/job/validator/plugins/jobspec.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/bindings/python/flux/job/validator/plugins/jobspec.py b/src/bindings/python/flux/job/validator/plugins/jobspec.py index 54e57fd8d19b..7af57667a334 100644 --- a/src/bindings/python/flux/job/validator/plugins/jobspec.py +++ b/src/bindings/python/flux/job/validator/plugins/jobspec.py @@ -17,6 +17,7 @@ import json +from flux.cli.plugin import CLIPluginRegistry from flux.job import validate_jobspec from flux.job.validator import ValidatorPlugin @@ -24,6 +25,7 @@ class Validator(ValidatorPlugin): def __init__(self, parser): self.require_version = 1 + self.plugins = [] parser.add_argument( "--require-version", metavar="V", @@ -47,5 +49,12 @@ def configure(self, args): raise ValueError(f"Invalid argument to --require-version") self.require_version = None + self.plugins = CLIPluginRegistry().load_plugins("validate") + def validate(self, args): - validate_jobspec(json.dumps(args.jobspec), self.require_version) + result, jobspec = validate_jobspec( + json.dumps(args.jobspec), self.require_version + ) + + # validate with any submit cli plugin validate methods + self.plugins.validate(jobspec) From 90fd106d05e4e91e1b2a050ee4add7b1427488ff Mon Sep 17 00:00:00 2001 From: Hobbs Date: Thu, 6 Mar 2025 14:02:36 -0800 Subject: [PATCH 5/5] t: test CLI plugins with example plugins Problem: the CLI plugin interface has no testing. Add some. --- t/Makefile.am | 1 + t/cli-plugins/cli/plugins/bar.py | 39 ++++++++++++++++++++++++++ t/cli-plugins/cli/plugins/gpumode.py | 35 ++++++++++++++++++++++++ t/t2717-python-cli-plugins.t | 41 ++++++++++++++++++++++++++++ 4 files changed, 116 insertions(+) create mode 100644 t/cli-plugins/cli/plugins/bar.py create mode 100644 t/cli-plugins/cli/plugins/gpumode.py create mode 100755 t/t2717-python-cli-plugins.t diff --git a/t/Makefile.am b/t/Makefile.am index 9bcb8048ab13..47172c9b3640 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -227,6 +227,7 @@ TESTSCRIPTS = \ t2713-python-cli-bulksubmit.t \ t2715-python-cli-cancel.t \ t2716-python-cli-batch-conf.t \ + t2717-python-cli-plugins.t \ t2800-jobs-cmd.t \ t2800-jobs-cmd-multiuser.t \ t2800-jobs-recursive.t \ diff --git a/t/cli-plugins/cli/plugins/bar.py b/t/cli-plugins/cli/plugins/bar.py new file mode 100644 index 000000000000..6a88ebaf4b4a --- /dev/null +++ b/t/cli-plugins/cli/plugins/bar.py @@ -0,0 +1,39 @@ +import flux +from flux.cli.plugin import CLIPlugin + + +class CustomFluxionPolicyPlugin(CLIPlugin): + """Accept command-line option that updates fluxion policy in subinstance""" + + def __init__(self, prog): + super().__init__(prog) + self.opt = "match-policy" + self.help = "Set the sched-fluxion-resource.match-policy for a subinstance. See sched-fluxion-resource(5) for available policies." + + def preinit(self, args, values): + pol = None + try: + pol = str(values["match-policy"]) + except KeyError: + pass + if self.prog in ("batch", "alloc") and pol: + args.conf.update(f'sched-fluxion-resource.match-policy="{pol}"') + + def modify_jobspec(self, args, jobspec, values): + try: + pol = str(values["match-policy"]) + except KeyError: + pol = None + else: + if pol: + jobspec.setattr("system.fluxion_match_policy", str(pol)) + + def validate(self, jobspec): + raise ValueError("hi hobbs") + try: + pol = jobspec.attributes["system"]["fluxion_match_policy"] + except KeyError: + return + if pol != "firstnodex": + raise ValueError("Invalid option for fluxion-match-policy") + return diff --git a/t/cli-plugins/cli/plugins/gpumode.py b/t/cli-plugins/cli/plugins/gpumode.py new file mode 100644 index 000000000000..9883ba42f3f3 --- /dev/null +++ b/t/cli-plugins/cli/plugins/gpumode.py @@ -0,0 +1,35 @@ +from flux.cli.plugin import CLIPlugin + + +class RediscoverGPUPlugin(CLIPlugin): + """Add --multi-user option to configure a multi-user subinstance""" + + def __init__(self, prog): + super().__init__(prog) + self.opt = "gpumode" + self.usage = "set gpumode on elcap" + self.help = "Option for setting AMD SMI compute partitioning. Choices are CPX, TPX, or SPX." + self.version = str("0.1.0") + + def preinit(self, args, value): + if self.prog in ("batch", "alloc"): + try: + gpumode = str(value["gpumode"]) + except KeyError: ## necessary? + gpumode = "" + if gpumode == "TPX" or gpumode == "CPX": + args.conf.update("resource.rediscover=true") + elif gpumode == "SPX" or gpumode == "": + pass + else: + raise ValueError("--gpumode can only be set to CPX, TPX, or SPX") + + def modify_jobspec(self, args, jobspec, value): + try: + if value["gpumode"]: + jobspec.setattr("gpumode", value["gpumode"]) + except KeyError: + pass + + def validate(self, jobspec): + return diff --git a/t/t2717-python-cli-plugins.t b/t/t2717-python-cli-plugins.t new file mode 100755 index 000000000000..770242f2e161 --- /dev/null +++ b/t/t2717-python-cli-plugins.t @@ -0,0 +1,41 @@ +#!/bin/sh + +test_description='Test command line plugin interface' + +. $(dirname $0)/sharness.sh + +test_under_flux 4 job + +export FLUX_CLI_PLUGINPATH=${FLUX_BUILD_DIR}/t/cli-plugins/cli/plugins + +test_expect_success 'flux-run: base --help message is formatted correctly' ' + flux run --help +' +test_expect_success 'flux-run: a job that does not provide plugins can run' ' + flux run hostname +' +test_expect_success 'flux-alloc: job that calls -P has jobspec set properly' ' + flux alloc -P match-policy=firstnodex -N1 --dry-run > jobspec1.json && + test $(jq -r .attributes.system.fluxion_match_policy jobspec1.json) = "firstnodex" +' +test_expect_success 'flux-alloc: job with invalid key to -P is rejected' ' + test_must_fail flux alloc -P match-policy=junkpolicy -N1 echo hello 2> err.out && + grep "Unsupported option" err.out +' +test_expect_success 'flux-alloc: job with preinit has config set accordingly' ' + flux alloc -P gpumode=TPX -N1 flux config get > config.json && + test $(jq -r .resource.rediscover config.json) = "true" +' +test_expect_success 'flux-run: job with invalid plugin key is rejected outright' ' + test_must_fail flux run -P junk -P notthis=one -N 1 echo hello 2> out2.err && + grep -q "flux-run: ERROR: Unsupported option" out2.err + +' + +## Test plan: can take multiple plugins, one plugin, or no plugins; +## sets the jobspec correctly; sets the config correctly; +## rejects invalid keys from the command line; +## rejects invalid values provided to valid keys; +## rejects invalid options submitted to python; + +test_done