-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP]: Support for command-line plugins #6691
base: master
Are you sure you want to change the base?
Changes from all commits
e5bffca
9c4fd31
d27639a
497cf85
90fd106
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
|
||
import flux | ||
from flux import debugged, job, util | ||
from flux.cli.plugin import CLIPluginRegistry, CLIPluginValue | ||
from flux.constraint.parser import ConstraintParser, ConstraintSyntaxError | ||
from flux.idset import IDset | ||
from flux.job import JobspecV1, JobWatcher | ||
|
@@ -751,7 +752,10 @@ | |
self.exitcode = 0 | ||
self.progress = None | ||
self.watcher = None | ||
self.plugins = CLIPluginRegistry().load_plugins(prog) | ||
self.parser = self.create_parser(prog, usage, description, exclude_io) | ||
self.group = self.parser.add_argument_group("Options provided by plugins") | ||
self.plugins.add_plugin_option(self.group) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding the |
||
|
||
@staticmethod | ||
def create_parser( | ||
|
@@ -1010,6 +1014,20 @@ | |
""" | ||
Create a jobspec from args and return it to caller | ||
""" | ||
plugins = {} | ||
if args.plugin: | ||
for provided_arg in args.plugin: | ||
key, val = provided_arg.get_value() | ||
if key in self.plugins.get_plugin_options(): | ||
plugins[key] = val | ||
else: | ||
raise ValueError( | ||
f"Unsupported option provided to -P/--plugin: {key}" | ||
) | ||
Comment on lines
+1017
to
+1026
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be better if this code was part of the CLIPluginRegistry class, which should take args and extract out the active plugins, raising an exception if there was any invalid input or a plugin raised an error in preinit. |
||
|
||
# plugins MAY raise IndexError if the plugin opt was not provided. | ||
self.plugins.preinit(args, plugins) | ||
|
||
jobspec = self.init_jobspec(args) | ||
|
||
jobspec.environment, env_expand = get_filtered_environment(args.env) | ||
|
@@ -1127,6 +1145,8 @@ | |
for arg in args.add_file: | ||
self.handle_add_file_arg(jobspec, arg) | ||
|
||
self.plugins.modify_jobspec(args, jobspec, plugins) | ||
|
||
return jobspec | ||
|
||
def submit_async(self, args, jobspec=None): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,187 @@ | ||
############################################################## | ||
# Copyright 2024 Lawrence Livermore National Security, LLC | ||
# (c.f. AUTHORS, NOTICE.LLNS, COPYING) | ||
# | ||
# This file is part of the Flux resource manager framework. | ||
# For details, see https://github.com/flux-framework. | ||
# | ||
# SPDX-License-Identifier: LGPL-3.0 | ||
############################################################## | ||
from os import getenv | ||
import glob | ||
from abc import ABC | ||
|
||
from flux.conf_builtin import conf_builtin_get | ||
from flux.importer import import_path | ||
|
||
|
||
class CLIPluginValue: | ||
def __init__(self, value): | ||
temp = value.split("=") | ||
self.key = temp[0] | ||
try: | ||
self.value = temp[1] | ||
except IndexError: | ||
self.value = "" | ||
|
||
def get_value(self): | ||
"""Getter for returning the key and value""" | ||
return self.key, self.value | ||
|
||
|
||
class CLIPlugin(ABC): # pragma no cover | ||
"""Base class for a CLI submission plugin | ||
|
||
A plugin should derive from this class and implement one or more | ||
base methods (described below) | ||
|
||
Attributes: | ||
opt (str): command-line key passed to -P/--plugin to activate the | ||
extension | ||
usage (str): short --help message for the extension | ||
help (str): long --help message for the extension | ||
version (str): optional version for the plugin, preferred in format | ||
MAJOR.MINOR.PATCH | ||
prog (str): command-line subcommand for which the plugin is active, | ||
e.g. "submit", "run", "alloc", "batch", "bulksubmit" | ||
""" | ||
|
||
def __init__(self, prog): | ||
self.opt = None | ||
self.usage = None | ||
self.help = None | ||
self.version = None | ||
self.prog = prog | ||
Comment on lines
+49
to
+54
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be cleaner if the standard attributes were passed in in the initializer, preferably with the name first (I'd argue for a plugin "name" vs "opt" if we're going with def __init__(self, name, prog, usage, version=None): Then plugins initialize themselves with |
||
if prog.startswith("flux "): | ||
self.prog = prog[5:] | ||
|
||
def get_plugin_name(self): | ||
"""Return the KEY to invoke the plugin""" | ||
return self.opt | ||
|
||
def get_help_message(self): | ||
"""Return the message to print with --help""" | ||
return self.help | ||
|
||
Comment on lines
+58
to
+65
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since |
||
def preinit(self, args, values): | ||
"""After parsing options, before jobspec is initialized | ||
|
||
Either here, or in validation, plugin extensions should check types and | ||
perform their own validation, since argparse will accept any string. | ||
|
||
Args: | ||
args (:py:obj:`Namespace`): Namespace result from | ||
:py:meth:`Argparse.ArgumentParser.parse_args()`. | ||
values (:obj:`Dict`): Dictionary of KEY[=VALUE] pairs returned | ||
by parsing the arguments provided to the -P/--plugin | ||
option. KEY is None if plugin was not invoked. VALUE is the | ||
empty string if the plugin was invoked without VALUE. | ||
Comment on lines
+75
to
+78
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If Is |
||
""" | ||
pass | ||
|
||
def modify_jobspec(self, args, jobspec, values): | ||
"""Allow plugin to modify jobspec | ||
|
||
This function is called after arguments have been parsed and jobspec | ||
has mostly been initialized. | ||
|
||
Args: | ||
args (:py:obj:`Namespace`): Namespace result from | ||
:py:meth:`Argparse.ArgumentParser.parse_args()`. | ||
jobspec (:obj:`flux.job.Jobspec`): instantiated jobspec object. | ||
This plugin can modify this object directly to enact | ||
changes in the current programs generated jobspec. | ||
values (:obj:`Dict`): Dictionary of KEY[=VALUE] pairs returned | ||
by parsing the arguments provided to the -P/--plugin | ||
option. KEY is None if plugin was not invoked. VALUE is the | ||
empty string if the plugin was invoked without VALUE. | ||
""" | ||
pass | ||
|
||
def validate(self, jobspec): | ||
"""Allow a plugin to validate jobspec | ||
|
||
This callback may be used by the cli itself or a job validator | ||
to validate the final jobspec. | ||
|
||
On an invalid jobspec, this callback should raise ValueError | ||
with a useful error message. | ||
|
||
Args: | ||
jobspec (:obj:`flux.job.Jobspec`): jobspec object to validate. | ||
""" | ||
pass | ||
|
||
|
||
class CLIPluginRegistry: | ||
"""Flux CLI plugin registry helper class""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note to drop "helper" here when this docstring gets expanded. This isn't just a helper class but is the plugin registry class. |
||
|
||
def __init__(self): | ||
self.plugins = [] | ||
etc = conf_builtin_get("confdir") | ||
if etc is None: | ||
raise ValueError("failed to get builtin confdir") | ||
if getenv("FLUX_CLI_PLUGINPATH"): | ||
self.plugindir = getenv("FLUX_CLI_PLUGINPATH") | ||
else: | ||
self.plugindir = f"{etc}/cli/plugins" | ||
|
||
def _add_plugins(self, module, program): | ||
entries = [ | ||
getattr(module, attr) for attr in dir(module) if not attr.startswith("_") | ||
] | ||
## TODO: as we get each entry, make sure it is NOT speciifically the CLIPlugin base class | ||
for entry in entries: | ||
# only process entries that are an instance of type (i.e. a class) | ||
# and are a subclass of CLIPlugin: | ||
if isinstance(entry, type) and issubclass(entry, CLIPlugin): | ||
self.plugins.append(entry(program)) | ||
|
||
def load_plugins(self, program): | ||
"""Load all cli plugins from the standard path""" | ||
for path in glob.glob(f"{self.plugindir}/*.py"): | ||
self._add_plugins(import_path(path), program) | ||
return self | ||
|
||
def add_plugin_option(self, parser): | ||
"""Add the -P option and list of available keys""" | ||
parser.add_argument( | ||
"-P", | ||
"--plugin", | ||
type=CLIPluginValue, | ||
action="append", | ||
help=f"{self.get_plugin_help_messages()}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As you've noticed, this isn't going to display well in Let's add a function to organize all the current plugin names and short usage messages into a nice table, then emit that in a custom section of the help output for the commands, e.g.:
Also, the |
||
) | ||
|
||
def get_plugin_options(self): | ||
"""Return all plugin option keys from self.plugins""" | ||
opts = [] | ||
for plugin in self.plugins[1:]: | ||
opts.append(str(plugin.get_plugin_name())) | ||
return opts | ||
|
||
def get_plugin_help_messages(self): | ||
"""Return a string that has all self.plugin help messages""" | ||
## TODO: Utilize the HelpFormatter class in argparse to make | ||
## these messages better | ||
help = "" | ||
for plugin in self.plugins[1:]: | ||
help += str(plugin.get_plugin_name()) | ||
help += str(plugin.get_help_message()) | ||
help += "\n" | ||
return help | ||
Comment on lines
+162
to
+172
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be replaced with a function to pretty print the option+usage table as noted above. |
||
|
||
def preinit(self, args, values): | ||
"""Call all plugin ``preinit`` callbacks""" | ||
for plugin in self.plugins: | ||
plugin.preinit(args, values) | ||
|
||
def modify_jobspec(self, args, jobspec, value): | ||
"""Call all plugin ``modify_jobspec`` callbacks""" | ||
for plugin in self.plugins: | ||
plugin.modify_jobspec(args, jobspec, value) | ||
|
||
def validate(self, jobspec): | ||
"""Call any plugin validate callback""" | ||
for plugin in self.plugins: | ||
plugin.validate(jobspec) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import flux | ||
from flux.cli.plugin import CLIPlugin | ||
|
||
|
||
class CustomFluxionPolicyPlugin(CLIPlugin): | ||
"""Accept command-line option that updates fluxion policy in subinstance""" | ||
|
||
def __init__(self, prog): | ||
super().__init__(prog) | ||
self.opt = "match-policy" | ||
self.help = "Set the sched-fluxion-resource.match-policy for a subinstance. See sched-fluxion-resource(5) for available policies." | ||
|
||
def preinit(self, args, values): | ||
pol = None | ||
try: | ||
pol = str(values["match-policy"]) | ||
except KeyError: | ||
pass | ||
if self.prog in ("batch", "alloc") and pol: | ||
args.conf.update(f'sched-fluxion-resource.match-policy="{pol}"') | ||
|
||
def modify_jobspec(self, args, jobspec, values): | ||
try: | ||
pol = str(values["match-policy"]) | ||
except KeyError: | ||
pol = None | ||
else: | ||
if pol: | ||
jobspec.setattr("system.fluxion_match_policy", str(pol)) | ||
|
||
def validate(self, jobspec): | ||
raise ValueError("hi hobbs") | ||
try: | ||
pol = jobspec.attributes["system"]["fluxion_match_policy"] | ||
except KeyError: | ||
return | ||
if pol != "firstnodex": | ||
raise ValueError("Invalid option for fluxion-match-policy") | ||
return |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from flux.cli.plugin import CLIPlugin | ||
|
||
|
||
class RediscoverGPUPlugin(CLIPlugin): | ||
"""Add --multi-user option to configure a multi-user subinstance""" | ||
|
||
def __init__(self, prog): | ||
super().__init__(prog) | ||
self.opt = "gpumode" | ||
self.usage = "set gpumode on elcap" | ||
self.help = "Option for setting AMD SMI compute partitioning. Choices are CPX, TPX, or SPX." | ||
self.version = str("0.1.0") | ||
|
||
def preinit(self, args, value): | ||
if self.prog in ("batch", "alloc"): | ||
try: | ||
gpumode = str(value["gpumode"]) | ||
except KeyError: ## necessary? | ||
gpumode = "" | ||
if gpumode == "TPX" or gpumode == "CPX": | ||
args.conf.update("resource.rediscover=true") | ||
elif gpumode == "SPX" or gpumode == "": | ||
pass | ||
else: | ||
raise ValueError("--gpumode can only be set to CPX, TPX, or SPX") | ||
|
||
def modify_jobspec(self, args, jobspec, value): | ||
try: | ||
if value["gpumode"]: | ||
jobspec.setattr("gpumode", value["gpumode"]) | ||
except KeyError: | ||
pass | ||
|
||
def validate(self, jobspec): | ||
return |
Check notice
Code scanning / CodeQL
Unused import Note