diff --git a/src/python/WMCore/ReqMgr/Service/Request.py b/src/python/WMCore/ReqMgr/Service/Request.py index caa9d4ed67..eaedf171e6 100644 --- a/src/python/WMCore/ReqMgr/Service/Request.py +++ b/src/python/WMCore/ReqMgr/Service/Request.py @@ -423,31 +423,19 @@ def _handleNoStatusUpdate(self, workload, request_args, dn): cherrypy.log('Updated workqueue statistics of "{}", with: {}'.format(workload.name(), reqArgs)) return report - reqArgsNothandled = [] - for reqArg in reqArgs: - if reqArg == 'RequestPriority': - validate_request_priority(reqArgs) - # must update three places: GQ elements, workload_cache and workload spec - self.gq_service.updatePriority(workload.name(), reqArgs['RequestPriority']) - workload.setPriority(reqArgs['RequestPriority']) - cherrypy.log('Updated priority of "{}" to: {}'.format(workload.name(), reqArgs['RequestPriority'])) - elif reqArg == "SiteWhitelist": - workload.setSiteWhitelist(reqArgs["SiteWhitelist"]) - cherrypy.log('Updated SiteWhitelist of "{}", with: {}'.format(workload.name(), reqArgs['SiteWhitelist'])) - elif reqArg == "SiteBlacklist": - workload.setSiteBlacklist(reqArgs["SiteBlacklist"]) - cherrypy.log('Updated SiteBlacklist of "{}", with: {}'.format(workload.name(), reqArgs['SiteBlacklist'])) - else: - reqArgsNothandled.append(reqArg) - cherrypy.log("Unhandled argument for no-status update: %s" % reqArg) + # Update all workload parameters based on the full reqArgs dictionary + workload.updateWorkloadArgs(reqArgs) - if reqArgsNothandled: - msg = "There were unhandled arguments left for no-status update: %s" % reqArgsNothandled - raise InvalidSpecParameterValue(msg) + # Commit the changes of the current workload object to the database: + workload.saveCouchUrl(workload.specUrl()) # Commit the changes of the current workload object to the database: workload.saveCouchUrl(workload.specUrl()) + # Commit all Global WorkQueue changes per workflow in a single go: + self.gq_service.updateElementsByWorkflow(workload.name(), reqArgs) + + # Finally update ReqMgr Database report = self.reqmgr_db_service.updateRequestProperty(workload.name(), reqArgs, dn) return report diff --git a/src/python/WMCore/Services/WorkQueue/WorkQueue.py b/src/python/WMCore/Services/WorkQueue/WorkQueue.py index 24bac2584d..e4b37efc0e 100644 --- a/src/python/WMCore/Services/WorkQueue/WorkQueue.py +++ b/src/python/WMCore/Services/WorkQueue/WorkQueue.py @@ -256,6 +256,41 @@ def updatePriority(self, wf, priority): wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values) return + def updateElementsByWorkflow(self, wf, updateParams, status=None): + """ + Update all available WorkQueue elements of a given workflow with a set + of arguments provided through the `updateParams` dictionary + :param wf: The workflow name + :param updateParams: A dictionary with parameters to be updated + :param status: A list of allowed WorkQueue elements statuses to be considered for updating + Default: None - do not filter by status + :return: No value, raises exceptions from internal methods in case of errors. + """ + # Fetch the whole view with Workqueue elements per given workflow + data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus', + {'startkey': [wf], 'endkey': [wf, {}], + 'reduce': False}) + + # Fetch only a list of WorkQueue element Ids && Filter them by allowed status + if status: + elementsToUpdate = [x['id'] for x in data.get('rows', []) if x['value']['Status'] in status] + else: + elementsToUpdate = [x['id'] for x in data.get('rows', [])] + + # Update all WorkQueue elements with the parameters provided in a single push + if elementsToUpdate: + self.updateElements(*elementsToUpdate, **updateParams) + + # Update the spec, if it exists + if self.db.documentExists(wf): + wmspec = WMWorkloadHelper() + wmspec.load(self.hostWithAuth + "/%s/%s/spec" % (self.db.name, wf)) + wmspec.updateWorkloadArgs(updateParams) + dummy_values = {'name': wmspec.name()} + wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values) + return + + def getWorkflowNames(self, inboxFlag=False): """Get workflow names from workqueue db""" if inboxFlag: diff --git a/src/python/WMCore/WMSpec/WMWorkload.py b/src/python/WMCore/WMSpec/WMWorkload.py index 7aee473d81..b220c7a6e9 100644 --- a/src/python/WMCore/WMSpec/WMWorkload.py +++ b/src/python/WMCore/WMSpec/WMWorkload.py @@ -9,6 +9,9 @@ from builtins import next, range from future.utils import viewitems, viewvalues +from collections import namedtuple +import inspect + from Utils.Utilities import strToBool from WMCore.Configuration import ConfigSection @@ -59,6 +62,8 @@ class WMWorkloadException(WMException): pass +setterTuple = namedtuple('SetterTuple', ['reqArg', 'setterFunc', 'setterSignature']) + class WMWorkloadHelper(PersistencyHelper): """ _WMWorkloadHelper_ @@ -68,6 +73,52 @@ class WMWorkloadHelper(PersistencyHelper): def __init__(self, wmWorkload=None): self.data = wmWorkload + self.settersMap = {} + + def updateWorkloadArgs(self, reqArgs): + """ + Method to take a dictionary of arguments of the type: + {reqArg1: value, + reqArg2: value, + ...} + and update the workload by a predefined map of reqArg to setter methods. + :param reqArgs: A Dictionary of request arguments to be updated + :return: Nothing, Raises an error of type WMWorkloadException if + fails to apply the proper setter method + """ + # NOTE: So far we support only a single argument setter methods, like + # setSiteWhitelist or setPriority. This may change in the future, + # but it will require a change in the logic of how we validate and + # call the proper setter methods bellow. + + # populate the current instance settersMap + self.settersMap['RequestPriority'] = setterTuple('RequestPriority', self.setPriority, inspect.signature(self.setPriority)) + self.settersMap['SiteBlacklist'] = setterTuple('SiteBlacklist', self.setSiteBlacklist, inspect.signature(self.setSiteBlacklist)) + self.settersMap['SiteWhitelist'] = setterTuple('SiteWhitelist', self.setSiteWhitelist, inspect.signature(self.setSiteWhitelist)) + + # First validate if we can properly call the setter function given the reqArgs passed. + for reqArg, argValue in reqArgs.items(): + if not self.settersMap.get(reqArg, None): + msg = f"Unsupported or missing setter method for updating reqArg: {reqArg}." + raise WMWorkloadException(msg) + try: + self.settersMap[reqArg].setterSignature.bind(argValue) + except TypeError as ex: + msg = f"Setter's method signature does not match the method calls we currently support: Error: req{str(ex)}" + raise WMWorkloadException(msg) from None + + # Now go through the reqArg again and call every setter method according to the map + for reqArg, argValue in reqArgs.items(): + try: + self.settersMap[reqArg].setterFunc(argValue) + except Exception as ex: + currFrame = inspect.currentframe() + argsInfo = inspect.getargvalues(currFrame) + argVals = {arg: argsInfo.locals.get(arg) for arg in argsInfo.args} + msg = f"Failure while calling setter method {self.settersMap[reqArg].setterFunc.__name__} " + msg += f"With arguments: {argVals}" + msg += f"Full exception string: {str(ex)}" + raise WMWorkloadException(msg) from None def setSpecUrl(self, url): self.data.persistency.specUrl = sanitizeURL(url)["url"]