Skip to content

Commit

Permalink
Merge branch 'develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
egede authored Feb 14, 2024
2 parents 2af7ca1 + dd0dc7d commit 88c68ae
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 59 deletions.
4 changes: 1 addition & 3 deletions bin/ganga
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ def standardSetup():

#On CVMFS we need to point to the site-packages directory as we don't start the virtualenv
if exeDir.startswith('/cvmfs/ganga.cern.ch', 0, 20):
envDir = exeDir[:-3]
envDir = os.path.join(envDir, 'lib/python3.8/site-packages')
sys.path.insert(0, envDir)
sys.path.insert(0, '/cvmfs/ganga.cern.ch/Ganga/install/miniconda3/envs/ganga/lib/python3.11/site-packages/')

#This function is needed to add the individual ganga modules to the sys path - awful hack but saved rewriting all the code. This is needed for pip installs
pathsToAdd = filter(lambda p : 'ganga' in os.listdir(p),
Expand Down
5 changes: 5 additions & 0 deletions doc/UserGuide/InstallAndBasicUsage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ The best is to install inside a virtual environment
# Install Ganga
python3 -m pip install ganga
Note that if you want to use the Dirac or LHCb plugins you need to request the necessary dependencies when installing. i.e. for LHCb:

.. code-block:: bash
python3 -m pip install [Dirac,LHCb]ganga
To install pip locally if it's not on your system and you don't have admin access please consult: https://pip.pypa.io/en/stable/installing/

Now each time you want to use Ganga in a new shell, you have to activate the virtual environment:
Expand Down
10 changes: 1 addition & 9 deletions ganga/GangaCore/Runtime/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,6 @@ def parseOptions(self):

parser = OptionParser(usage, version=_gangaVersion)

parser.add_option("-i", dest="force_interactive", action="store_true",
help='enter interactive mode after running script')

parser.add_option("--config", dest="config_file", action="store", metavar="FILE", default=None,
help=('read user configuration from FILE, overrides the GANGA_CONFIG_FILE environment variable. '
'Default: ~/.gangarc'))
Expand Down Expand Up @@ -320,7 +317,7 @@ def parseOptions(self):
parser.add_option("--daemon", dest='daemon', action="store_true", default=False,
help='run Ganga as service.')

parser.set_defaults(force_interactive=False, config_file=None,
parser.set_defaults(config_file=None,
force_loglevel=None, rexec=1, monitoring=1, prompt=1, generate_config=None)
parser.disable_interspersed_args()

Expand Down Expand Up @@ -353,11 +350,6 @@ def file_opens(f, message):
open_file = file_opens(
self.options.config_file, 'reading configuration file')
open_file.close()
# we run in the batch mode if a script has been specified and other
# options (such as -i) do not force it
if len(self.args) > 0:
if not self.options.force_interactive:
self.interactive = False

# Can't check here if the file is readable, because the path isn't known
# file_opens(self.args[0],'reading script')
Expand Down
54 changes: 41 additions & 13 deletions ganga/GangaDirac/Lib/Backends/DiracBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1094,13 +1094,20 @@ async def _internal_job_finalisation(job, updated_dirac_status):
DiracBase.finalise_jobs(job.master.subjobs, job.master.backend.downloadSandbox)

if updated_dirac_status == 'completed':
await DiracBase.complete_dirac_job(job)
try:
await DiracBase.complete_dirac_job(job)
except Exception as err:
raise GangaDiracError(f"Error while finalising job {job.id}: {str(err)}")

elif updated_dirac_status == 'failed':
await DiracBase.finalise_failed_job(job)
if updated_dirac_status == 'failed':
try:
await DiracBase.finalise_failed_job(job)
except Exception as err:
raise GangaDiracError(f"Error while finalising failed job {job.id}: {str(err)}")

else:
logger.error("Job #%s Unexpected dirac status '%s' encountered" % (job.getFQID('.'), updated_dirac_status))
if updated_dirac_status not in ['completing', 'failed', 'killed', 'removed', 'completed']:
logger.error("Job #%s Unexpected dirac status '%s' encountered" % (job.getFQID('.'), updated_dirac_status))

@staticmethod
async def finalise_failed_job(job):
Expand All @@ -1116,12 +1123,20 @@ async def finalise_failed_job(job):

# if requested try downloading outputsandbox anyway
if configDirac['failed_sandbox_download'] and not job.backend.status == 'Killed':
dm = AsyncDiracManager()
await dm.execute(getOutputSandbox, args_dict={
'id': job.backend.id,
'outputDir': job.getOutputWorkspace().getPath(),
'unpack': job.backend.unpackOutputSandbox
}, cred_req=job.backend.credential_requirements)
try:
dm = AsyncDiracManager()
await dm.execute(
getOutputSandbox,
args_dict={
"id": job.backend.id,
"outputDir": job.getOutputWorkspace().getPath(),
"unpack": job.backend.unpackOutputSandbox,
},
cred_req=job.backend.credential_requirements,
)
except Exception as err:
raise GangaDiracError(str(err))


@staticmethod
async def complete_dirac_job(job):
Expand Down Expand Up @@ -1312,7 +1327,12 @@ async def job_finalisation(job, updated_dirac_status):
job.been_queued = True
task = monitoring_component.loop.create_task(
DiracBase._internal_job_finalisation(job, updated_dirac_status))
await task

try:
await task
except GangaDiracError as err:
logger.error("Error in Monitoring Loop, jobs on the DIRAC backend may not update")
logger.error(err)
err = task.exception()

if not err:
Expand All @@ -1335,7 +1355,7 @@ async def job_finalisation(job, updated_dirac_status):
logger.error("Unable to finalise job %s after %s retries due to error:\n%s" %
(job.getFQID('.'), str(count), str(err)))
job.force_status('failed')
raise
raise

time.sleep(sleep_length)

Expand Down Expand Up @@ -1381,6 +1401,13 @@ def finalise_jobs(allJobs, downloadSandbox=True):
'downloadSandbox': downloadSandbox
}))

@staticmethod
def job_exception_handler(job_id, task):
exception = task.exception()
if exception:
# Handle the exception, e.g., log it
logger.error(f"Error in DIRAC job {job_id}: {exception}")

@staticmethod
def requeue_dirac_finished_jobs(requeue_jobs, finalised_statuses):
from GangaCore.Core import monitoring_component
Expand All @@ -1405,8 +1432,9 @@ def requeue_dirac_finished_jobs(requeue_jobs, finalised_statuses):
continue
else:
j.been_queued = True
monitoring_component.loop.create_task(
task = monitoring_component.loop.create_task(
DiracBase.job_finalisation(j, finalised_statuses[j.backend.status]))
task.add_done_callback(lambda task, job_id=j.id: DiracBase.job_exception_handler(job_id, task))

# @trace_and_save
@staticmethod
Expand Down
2 changes: 1 addition & 1 deletion ganga/GangaDirac/Lib/RTHandlers/ExeDiracRTHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def prepare(self, app, appsubconfig, appmasterconfig, jobmasterconfig):


def exe_script_template():
script_template = """#!/usr/bin/env python2
script_template = """#!/usr/bin/env python3
'''Script to run Executable application'''
import sys, os
import subprocess
Expand Down
40 changes: 20 additions & 20 deletions ganga/GangaDirac/Lib/Server/DiracCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ def getJobGroupJobs(jg):
@diracCommand
def kill(id):
''' Kill a given DIRAC Job ID within DIRAC '''
stat_results = dirac.getJobStatus(id)
stat = stat_results['Value'][id].get('Status', None)
if stat == 'Waiting':
return dirac.deleteJob(id)
else:
return dirac.killJob(id)
return dirac.deleteJob(id)


@diracCommand
Expand Down Expand Up @@ -52,9 +47,9 @@ def ping(system, service):
def removeFile(lfn):
''' Remove a given LFN from the DFC'''
ret = {}
if type(lfn) is list:
for l in lfn:
ret.update(dirac.removeFile(l))
if isinstance(lfn, list):
for _l in lfn:
ret.update(dirac.removeFile(_l))
else:
ret.update(dirac.removeFile(lfn))
return ret
Expand Down Expand Up @@ -165,7 +160,8 @@ def getOutputSandbox(id, outputDir=os.getcwd(), unpack=True, oversized=True, noJ

os.system(
'for file in $(ls %s/*Ganga_*.log); do ln -s ${file} %s/stdout; break; done' % (outputDir, outputDir))
# So the download failed. Maybe the sandbox was oversized and stored on the grid. Check in the job parameters and download it
# So the download failed. Maybe the sandbox was oversized and stored on
# the grid. Check in the job parameters and download it
else:
parameters = dirac.getJobParameters(id)
if parameters is not None and parameters.get('OK', False):
Expand Down Expand Up @@ -222,7 +218,7 @@ def getOutputDataLFNs(id, pipe_out=True):
if 'UploadedOutputData' in parameters:
lfn_list = parameters['UploadedOutputData']
import re
lfns = re.split(',\s*', lfn_list)
lfns = re.split(',\\s*', lfn_list)
if sandbox is not None and sandbox in lfns:
lfns.remove(sandbox)
ok = True
Expand Down Expand Up @@ -252,8 +248,10 @@ def normCPUTime(id, pipe_out=True):

@diracCommand
def finished_job(id, outputDir=os.getcwd(), unpack=True, oversized=True, noJobDir=True, downloadSandbox=True):
''' Nesting function to reduce number of calls made against DIRAC when finalising a job, takes arguments such as getOutputSandbox
Returns the CPU time of the job as a dict, the output sandbox information in another dict and a dict of the LFN of any uploaded data'''
''' Nesting function to reduce number of calls made against DIRAC when finalising a job,
takes arguments such as getOutputSandbox
Returns the CPU time of the job as a dict, the output sandbox information in another dict
and a dict of the LFN of any uploaded data'''
out_cpuTime = normCPUTime(id, pipe_out=False)
if downloadSandbox:
out_sandbox = getOutputSandbox(id, outputDir, unpack, oversized, noJobDir, pipe_out=False)
Expand All @@ -266,7 +264,8 @@ def finished_job(id, outputDir=os.getcwd(), unpack=True, oversized=True, noJobDi

@diracCommand
def finaliseJobs(inputDict, downloadSandbox=True, oversized=True, noJobDir=True):
''' A function to get the necessaries to finalise a whole bunch of jobs. Returns a dict of job information and a dict of stati.'''
''' A function to get the necessaries to finalise a whole bunch of jobs.
Returns a dict of job information and a dict of stati.'''
returnDict = {}
statusList = dirac.getJobStatus(list(inputDict))
for diracID in inputDict:
Expand All @@ -284,7 +283,8 @@ def finaliseJobs(inputDict, downloadSandbox=True, oversized=True, noJobDir=True)

@diracCommand
def status(job_ids, statusmapping, pipe_out=True):
'''Function to check the statuses and return the Ganga status of a job after looking it's DIRAC status against a Ganga one'''
'''Function to check the statuses and return the Ganga status of a job after looking
it's DIRAC status against a Ganga one'''
# Translate between the many statuses in DIRAC and the few in Ganga

# return {'OK':True, 'Value':[['WIP', 'WIP', 'WIP', 'WIP', 'WIP']]}
Expand Down Expand Up @@ -312,7 +312,7 @@ def status(job_ids, statusmapping, pipe_out=True):
from DIRAC.Core.DISET.RPCClient import RPCClient
monitoring = RPCClient('WorkloadManagement/JobMonitoring')
app_status = monitoring.getJobAttributes(_id)['Value']['ApplicationStatus']
except:
except BaseException:
app_status = "unknown ApplicationStatus"

status_list.append([minor_status, dirac_status, dirac_site, ganga_status, app_status])
Expand Down Expand Up @@ -344,9 +344,9 @@ def getStateTime(id, status, pipe_out=True):
print("%s" % None)
return

for l in L:
if checkstr in l[0]:
T = datetime.datetime(*(time.strptime(l[3], "%Y-%m-%d %H:%M:%S")[0:6]))
for _l in L:
if checkstr in _l[0]:
T = datetime.datetime(*(time.strptime(_l[3], "%Y-%m-%d %H:%M:%S")[0:6]))
return T

return None
Expand Down Expand Up @@ -475,7 +475,7 @@ def listFiles(baseDir, minAge=None):
withMetaData = False
cutoffTime = datetime.utcnow()
import re
r = re.compile('\d:\d:\d')
r = re.compile('\\d:\\d:\\d')
if r.match(minAge):
withMetaData = True
timeList = minAge.split(':')
Expand Down
18 changes: 5 additions & 13 deletions ganga/GangaRelease/tools/ganga-cvmfs-install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,22 @@

cvmfs_server transaction ganga.cern.ch

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/cvmfs/sft.cern.ch/lcg/releases/LCG_100/Python/3.8.6/x86_64-centos7-gcc9-opt/lib
conda activate ganga

cd /cvmfs/ganga.cern.ch/Ganga/install

/cvmfs/sft.cern.ch/lcg/releases/LCG_100/Python/3.8.6/x86_64-centos7-gcc9-opt/bin/python3 -m venv $1
python -m venv $1

. $1/bin/activate

pip install --upgrade pip setuptools

pip install ganga[LHCb]@git+https://github.com/ganga-devs/ganga.git@$1

sed -i "23i\
lib_string = '/cvmfs/sft.cern.ch/lcg/views/LCG_100/x86_64-centos7-gcc9-opt/lib64:/cvmfs/sft.cern.ch/lcg/views/LCG_100/x86_64-centos7-gcc9-opt/lib:/cvmfs/sft.cern.ch/lcg/releases/gcc/9.2.0-afc57/x86_64-centos7/lib:/cvmfs/sft.cern.ch/lcg/releases/gcc/9.2.0-afc57/x86_64-centos7/lib64'\n\
sys.path.append('/cvmfs/sft.cern.ch/lcg/views/LCG_100/x86_64-centos7-gcc9-opt/lib/python3.8/site-packages')\n\
if not 'LD_LIBRARY_PATH' in os.environ.keys():\n\
os.environ['LD_LIBRARY_PATH'] = lib_string\n\
os.execv(sys.argv[0], sys.argv)\n\
elif not lib_string in os.environ['LD_LIBRARY_PATH']:\n\
os.environ['LD_LIBRARY_PATH'] += ':'+lib_string\n\
os.execv(sys.argv[0], sys.argv)" $1/bin/ganga
pip install ganga[LHCb,Dirac]@git+https://github.com/ganga-devs/ganga.git@$1

deactivate

conda deactivate

rm -f /cvmfs/ganga.cern.ch/Ganga/install/LATEST

ln -s /cvmfs/ganga.cern.ch/Ganga/install/$1 /cvmfs/ganga.cern.ch/Ganga/install/LATEST
Expand Down

0 comments on commit 88c68ae

Please sign in to comment.