Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]: Support for command-line plugins #6691

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/bindings/python/flux/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ nobase_fluxpy_PYTHON = \
cli/run.py \
cli/submit.py \
cli/fortune.py \
cli/plugin.py \
core/__init__.py \
core/watchers.py \
core/inner.py \
Expand Down
20 changes: 20 additions & 0 deletions src/bindings/python/flux/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import flux
from flux import debugged, job, util
from flux.cli.plugin import CLIPluginRegistry, CLIPluginValue

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'CLIPluginValue' is not used.
from flux.constraint.parser import ConstraintParser, ConstraintSyntaxError
from flux.idset import IDset
from flux.job import JobspecV1, JobWatcher
Expand Down Expand Up @@ -751,7 +752,10 @@
self.exitcode = 0
self.progress = None
self.watcher = None
self.plugins = CLIPluginRegistry().load_plugins(prog)
self.parser = self.create_parser(prog, usage, description, exclude_io)
self.group = self.parser.add_argument_group("Options provided by plugins")
self.plugins.add_plugin_option(self.group)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the -P, --plugin option should be pulled out of the function and moved to where all the other options are added.


@staticmethod
def create_parser(
Expand Down Expand Up @@ -1010,6 +1014,20 @@
"""
Create a jobspec from args and return it to caller
"""
plugins = {}
if args.plugin:
for provided_arg in args.plugin:
key, val = provided_arg.get_value()
if key in self.plugins.get_plugin_options():
plugins[key] = val
else:
raise ValueError(
f"Unsupported option provided to -P/--plugin: {key}"
)
Comment on lines +1017 to +1026
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better if this code was part of the CLIPluginRegistry class, which should take args and extract out the active plugins, raising an exception if there was any invalid input or a plugin raised an error in preinit.


# plugins MAY raise IndexError if the plugin opt was not provided.
self.plugins.preinit(args, plugins)

jobspec = self.init_jobspec(args)

jobspec.environment, env_expand = get_filtered_environment(args.env)
Expand Down Expand Up @@ -1127,6 +1145,8 @@
for arg in args.add_file:
self.handle_add_file_arg(jobspec, arg)

self.plugins.modify_jobspec(args, jobspec, plugins)

return jobspec

def submit_async(self, args, jobspec=None):
Expand Down
187 changes: 187 additions & 0 deletions src/bindings/python/flux/cli/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
##############################################################
# Copyright 2024 Lawrence Livermore National Security, LLC
# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
#
# This file is part of the Flux resource manager framework.
# For details, see https://github.com/flux-framework.
#
# SPDX-License-Identifier: LGPL-3.0
##############################################################
from os import getenv
import glob
from abc import ABC

from flux.conf_builtin import conf_builtin_get
from flux.importer import import_path


class CLIPluginValue:
def __init__(self, value):
temp = value.split("=")
self.key = temp[0]
try:
self.value = temp[1]
except IndexError:
self.value = ""

def get_value(self):
"""Getter for returning the key and value"""
return self.key, self.value


class CLIPlugin(ABC): # pragma no cover
"""Base class for a CLI submission plugin

A plugin should derive from this class and implement one or more
base methods (described below)

Attributes:
opt (str): command-line key passed to -P/--plugin to activate the
extension
usage (str): short --help message for the extension
help (str): long --help message for the extension
version (str): optional version for the plugin, preferred in format
MAJOR.MINOR.PATCH
prog (str): command-line subcommand for which the plugin is active,
e.g. "submit", "run", "alloc", "batch", "bulksubmit"
"""

def __init__(self, prog):
self.opt = None
self.usage = None
self.help = None
self.version = None
self.prog = prog
Comment on lines +49 to +54
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be cleaner if the standard attributes were passed in in the initializer, preferably with the name first (I'd argue for a plugin "name" vs "opt" if we're going with -P name=val. Drop the help attribute for now, I'm wondering if we can get that from the class.__doc__ in the future, so let's leave it as a future enhancement.

def __init__(self, name, prog, usage, version=None):

Then plugins initialize themselves with super().__init__("foo", prog, usage="Enable foo")

if prog.startswith("flux "):
self.prog = prog[5:]

def get_plugin_name(self):
"""Return the KEY to invoke the plugin"""
return self.opt

def get_help_message(self):
"""Return the message to print with --help"""
return self.help

Comment on lines +58 to +65
Copy link
Contributor

@grondo grondo Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since name and usage will be required attributes of the class, I'd drop these getters. Callers can just use plugin.name and plugin.usage

def preinit(self, args, values):
"""After parsing options, before jobspec is initialized

Either here, or in validation, plugin extensions should check types and
perform their own validation, since argparse will accept any string.

Args:
args (:py:obj:`Namespace`): Namespace result from
:py:meth:`Argparse.ArgumentParser.parse_args()`.
values (:obj:`Dict`): Dictionary of KEY[=VALUE] pairs returned
by parsing the arguments provided to the -P/--plugin
option. KEY is None if plugin was not invoked. VALUE is the
empty string if the plugin was invoked without VALUE.
Comment on lines +75 to +78
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KEY is None if plugin was not invoked.

If values is a dictionary then by "KEY is None" here do you mean "is not set in the dictionary"?

Is values the dictionary of all NAME=VALUE pairs provided to --plugin? Maybe it would be better to use a Namespace here and keep the same semantics as argparse, i.e. args.name is None if the plugin was not invoked, True if it was invoked without an arg, and a string argument that the plugin is responsible for parsing if an argument was given.

"""
pass

def modify_jobspec(self, args, jobspec, values):
"""Allow plugin to modify jobspec

This function is called after arguments have been parsed and jobspec
has mostly been initialized.

Args:
args (:py:obj:`Namespace`): Namespace result from
:py:meth:`Argparse.ArgumentParser.parse_args()`.
jobspec (:obj:`flux.job.Jobspec`): instantiated jobspec object.
This plugin can modify this object directly to enact
changes in the current programs generated jobspec.
values (:obj:`Dict`): Dictionary of KEY[=VALUE] pairs returned
by parsing the arguments provided to the -P/--plugin
option. KEY is None if plugin was not invoked. VALUE is the
empty string if the plugin was invoked without VALUE.
"""
pass

def validate(self, jobspec):
"""Allow a plugin to validate jobspec

This callback may be used by the cli itself or a job validator
to validate the final jobspec.

On an invalid jobspec, this callback should raise ValueError
with a useful error message.

Args:
jobspec (:obj:`flux.job.Jobspec`): jobspec object to validate.
"""
pass


class CLIPluginRegistry:
"""Flux CLI plugin registry helper class"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to drop "helper" here when this docstring gets expanded. This isn't just a helper class but is the plugin registry class.


def __init__(self):
self.plugins = []
etc = conf_builtin_get("confdir")
if etc is None:
raise ValueError("failed to get builtin confdir")
if getenv("FLUX_CLI_PLUGINPATH"):
self.plugindir = getenv("FLUX_CLI_PLUGINPATH")
else:
self.plugindir = f"{etc}/cli/plugins"

def _add_plugins(self, module, program):
entries = [
getattr(module, attr) for attr in dir(module) if not attr.startswith("_")
]
## TODO: as we get each entry, make sure it is NOT speciifically the CLIPlugin base class
for entry in entries:
# only process entries that are an instance of type (i.e. a class)
# and are a subclass of CLIPlugin:
if isinstance(entry, type) and issubclass(entry, CLIPlugin):
self.plugins.append(entry(program))

def load_plugins(self, program):
"""Load all cli plugins from the standard path"""
for path in glob.glob(f"{self.plugindir}/*.py"):
self._add_plugins(import_path(path), program)
return self

def add_plugin_option(self, parser):
"""Add the -P option and list of available keys"""
parser.add_argument(
"-P",
"--plugin",
type=CLIPluginValue,
action="append",
help=f"{self.get_plugin_help_messages()}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you've noticed, this isn't going to display well in --help output.

Let's add a function to organize all the current plugin names and short usage messages into a nice table, then emit that in a custom section of the help output for the commands, e.g.:

Options provided by plugins:
    foo=VAL      Set option foo to VAL.
    bar[=VAL]    Set a bar with optional VAL.

Also, the add_argument() should not go in this class if we're treating them this way. Instead it just be with all the other add_argument() calls in the cli base.py.

)

def get_plugin_options(self):
"""Return all plugin option keys from self.plugins"""
opts = []
for plugin in self.plugins[1:]:
opts.append(str(plugin.get_plugin_name()))
return opts

def get_plugin_help_messages(self):
"""Return a string that has all self.plugin help messages"""
## TODO: Utilize the HelpFormatter class in argparse to make
## these messages better
help = ""
for plugin in self.plugins[1:]:
help += str(plugin.get_plugin_name())
help += str(plugin.get_help_message())
help += "\n"
return help
Comment on lines +162 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be replaced with a function to pretty print the option+usage table as noted above.


def preinit(self, args, values):
"""Call all plugin ``preinit`` callbacks"""
for plugin in self.plugins:
plugin.preinit(args, values)

def modify_jobspec(self, args, jobspec, value):
"""Call all plugin ``modify_jobspec`` callbacks"""
for plugin in self.plugins:
plugin.modify_jobspec(args, jobspec, value)

def validate(self, jobspec):
"""Call any plugin validate callback"""
for plugin in self.plugins:
plugin.validate(jobspec)
6 changes: 3 additions & 3 deletions src/bindings/python/flux/job/Jobspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ def validate_jobspec(jobspec, require_version=None):
return (1, "Unable to parse JSON")
_validate_keys(Jobspec.top_level_keys, jobspec_obj.keys())
if require_version == 1 or jobspec_obj.get("version", 0) == 1:
JobspecV1(**jobspec_obj)
jobspec = JobspecV1(**jobspec_obj)
else:
Jobspec(**jobspec_obj)
return True
jobspec = Jobspec(**jobspec_obj)
return True, jobspec


class Jobspec(object):
Expand Down
11 changes: 10 additions & 1 deletion src/bindings/python/flux/job/validator/plugins/jobspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

import json

from flux.cli.plugin import CLIPluginRegistry
from flux.job import validate_jobspec
from flux.job.validator import ValidatorPlugin


class Validator(ValidatorPlugin):
def __init__(self, parser):
self.require_version = 1
self.plugins = []
parser.add_argument(
"--require-version",
metavar="V",
Expand All @@ -47,5 +49,12 @@ def configure(self, args):
raise ValueError(f"Invalid argument to --require-version")
self.require_version = None

self.plugins = CLIPluginRegistry().load_plugins("validate")

def validate(self, args):
validate_jobspec(json.dumps(args.jobspec), self.require_version)
result, jobspec = validate_jobspec(
json.dumps(args.jobspec), self.require_version
)

# validate with any submit cli plugin validate methods
self.plugins.validate(jobspec)
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ TESTSCRIPTS = \
t2713-python-cli-bulksubmit.t \
t2715-python-cli-cancel.t \
t2716-python-cli-batch-conf.t \
t2717-python-cli-plugins.t \
t2800-jobs-cmd.t \
t2800-jobs-cmd-multiuser.t \
t2800-jobs-recursive.t \
Expand Down
39 changes: 39 additions & 0 deletions t/cli-plugins/cli/plugins/bar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import flux
from flux.cli.plugin import CLIPlugin


class CustomFluxionPolicyPlugin(CLIPlugin):
"""Accept command-line option that updates fluxion policy in subinstance"""

def __init__(self, prog):
super().__init__(prog)
self.opt = "match-policy"
self.help = "Set the sched-fluxion-resource.match-policy for a subinstance. See sched-fluxion-resource(5) for available policies."

def preinit(self, args, values):
pol = None
try:
pol = str(values["match-policy"])
except KeyError:
pass
if self.prog in ("batch", "alloc") and pol:
args.conf.update(f'sched-fluxion-resource.match-policy="{pol}"')

def modify_jobspec(self, args, jobspec, values):
try:
pol = str(values["match-policy"])
except KeyError:
pol = None
else:
if pol:
jobspec.setattr("system.fluxion_match_policy", str(pol))

def validate(self, jobspec):
raise ValueError("hi hobbs")
try:
pol = jobspec.attributes["system"]["fluxion_match_policy"]
except KeyError:
return
if pol != "firstnodex":
raise ValueError("Invalid option for fluxion-match-policy")
return
35 changes: 35 additions & 0 deletions t/cli-plugins/cli/plugins/gpumode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from flux.cli.plugin import CLIPlugin


class RediscoverGPUPlugin(CLIPlugin):
"""Add --multi-user option to configure a multi-user subinstance"""

def __init__(self, prog):
super().__init__(prog)
self.opt = "gpumode"
self.usage = "set gpumode on elcap"
self.help = "Option for setting AMD SMI compute partitioning. Choices are CPX, TPX, or SPX."
self.version = str("0.1.0")

def preinit(self, args, value):
if self.prog in ("batch", "alloc"):
try:
gpumode = str(value["gpumode"])
except KeyError: ## necessary?
gpumode = ""
if gpumode == "TPX" or gpumode == "CPX":
args.conf.update("resource.rediscover=true")
elif gpumode == "SPX" or gpumode == "":
pass
else:
raise ValueError("--gpumode can only be set to CPX, TPX, or SPX")

def modify_jobspec(self, args, jobspec, value):
try:
if value["gpumode"]:
jobspec.setattr("gpumode", value["gpumode"])
except KeyError:
pass

def validate(self, jobspec):
return
Loading
Loading