Skip to content

Commit

Permalink
Merge pull request #12219 from vkuznet/fix-issue-12040-fix2
Browse files Browse the repository at this point in the history
Switch updateSiteList to updateElementsByWorkflow API
  • Loading branch information
amaltaro authored Jan 8, 2025
2 parents ede4d00 + 33f4549 commit ea263e8
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 30 deletions.
3 changes: 2 additions & 1 deletion src/python/WMComponent/WorkflowUpdater/SiteListPoller.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ def algorithm(self, parameters=None):
self.logger.info(" siteBlackList %s => %s", wmaBlackList, siteBlackList)
try:
# update local WorkQueue first
self.localWQ.updateSiteLists(wflow, siteWhiteList, siteBlackList)
params = {'SiteWhitelist': siteWhiteList, 'SiteBlacklist': siteBlackList}
self.localWQ.updateElementsByWorkflow(wHelper, params, status=['Available'])
self.logger.info("successfully updated workqueue elements for workflow %s", wflow)
except Exception as ex:
logging.exception("Unexpected exception while updating elements in local workqueue Details:\n%s", str(ex))
Expand Down
29 changes: 0 additions & 29 deletions src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,34 +237,6 @@ def cancelWorkflow(self, wf):
elements = [x['id'] for x in data.get('rows', []) if x['key'][1] not in nonCancelableElements]
return self.updateElements(*elements, Status='CancelRequested')

def updateSiteLists(self, wf, siteWhiteList=None, siteBlackList=None):
"""
Update site list parameters in elements matching a given workflow and a list of element statuse
:param wf: workflow name
:param siteWhiteList: optional list of strings, new site white list
:param siteBlackList: optional list of strings, new site black list
:return: None
"""
# Update elements in Available status
data = self.db.loadView('WorkQueue', 'jobStatusByRequest',
{'reduce': False})
states = ['Available']
elementsToUpdate = [x['id'] for x in data.get('rows', []) if x['key'][-1] in states and wf in x['key']]
if elementsToUpdate:
self.logger.info("Updating %d elements in status %s for workflow %s", len(elementsToUpdate), states, wf)
self.updateElements(*elementsToUpdate, SiteWhiteList=siteWhiteList, SiteBlackList=siteBlackList)
# Update the spec, if it exists
if self.db.documentExists(wf):
wmspec = WMWorkloadHelper()
# update local workqueue couchDB
wmspec.load(self.hostWithAuth + "/%s/%s/spec" % (self.db.name, wf))
wmspec.setSiteWhiteList(siteWhiteList)
wmspec.setSiteBlackList(siteBlackList)
dummy_values = {'name': wmspec.name()}
wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values)
return

def updatePriority(self, wf, priority):
"""Update priority of a workflow, this implies
updating the spec and the priority of the Available elements"""
Expand Down Expand Up @@ -318,7 +290,6 @@ def updateElementsByWorkflow(self, workload, updateParams, status=None):
workload.saveCouchUrl(workload.specUrl())
return


def getWorkflowNames(self, inboxFlag=False):
"""Get workflow names from workqueue db"""
if inboxFlag:
Expand Down

0 comments on commit ea263e8

Please sign in to comment.