diff --git a/ganga/GangaND280/Highland/Highland.py b/ganga/GangaND280/Highland/Highland.py
index 8924b1127d..2f2d33e85d 100644
--- a/ganga/GangaND280/Highland/Highland.py
+++ b/ganga/GangaND280/Highland/Highland.py
@@ -16,6 +16,7 @@
 from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.files import expandfilename
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -82,19 +83,6 @@ class Highland(IPrepareApp):
     _name = 'Highland'
     _scriptname = None
     _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'exe', 'widget': 'File'},
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'outputfile', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'exe', 'widget': 'File'},
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'outputfile', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]

     def __init__(self):
         super(Highland, self).__init__()
@@ -192,8 +180,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
-
 allHandlers.add('Highland', 'LSF', RTHandler)
 allHandlers.add('Highland', 'Local', RTHandler)
 allHandlers.add('Highland', 'PBS', RTHandler)
diff --git a/ganga/GangaND280/ND280Checkers/ND280Checker.py b/ganga/GangaND280/ND280Checkers/ND280Checker.py
index f9e226ff7e..0d7bc0c65c 100644
--- a/ganga/GangaND280/ND280Checkers/ND280Checker.py
+++ b/ganga/GangaND280/ND280Checkers/ND280Checker.py
@@ -3,23 +3,20 @@
 #
 ################################################################################

-from GangaCore.GPIDev.Base import GangaObject
 from GangaCore.GPIDev.Adapters.IPostProcessor import PostProcessException
-from GangaCore.GPIDev.Adapters.IChecker import IChecker, IFileChecker
-from GangaCore.GPIDev.Base.Proxy import GPIProxyObject
-from GangaCore.GPIDev.Schema import ComponentItem, FileItem, Schema, SimpleItem, Version
-from GangaCore.Utility.Plugin import allPlugins
+from GangaCore.GPIDev.Adapters.IChecker import IFileChecker
+from GangaCore.GPIDev.Schema import SimpleItem
 from GangaCore.Utility.logging import getLogger
 import subprocess
-import copy
 import os
-import string
 import shutil

 # Simon's post_status - communicates to processing DB
 from . import post_status
-import urllib.request, urllib.error, urllib.parse
+import urllib.request
+import urllib.error
+import urllib.parse

 logger = getLogger()
@@ -178,7 +175,8 @@ def send_status(self):
         logger.info('Result for %s %s %s %s is: %s, %s, %s, %s, %s' % (self.RUN,self.SUBRUN,self.TRIGTYPE,self.STAGE,self.site,self.ReturnCode,self.Time,self.EventsIn,self.EventsOut))

-        if self.range == 0: return  # no remote status report for CosMC
+        if self.range == 0:
+            return  # no remote status report for CosMC

         if not self.path:
             logger.error("No monitoring info sent because MONDIR is not defined")
diff --git a/ganga/GangaND280/ND280Control/ND280Configs.py b/ganga/GangaND280/ND280Control/ND280Configs.py
index 16ee6a3b90..a8f64a211f 100755
--- a/ganga/GangaND280/ND280Control/ND280Configs.py
+++ b/ganga/GangaND280/ND280Control/ND280Configs.py
@@ -448,7 +448,7 @@ def CreateCosmicMCCF(self):
             logger.error('Please make sure all options stated above are entered')
             return ''

-        if not self.options['stage'] in ['base', 'fgd', 'tript', 'all']:
+        if self.options['stage'] not in ['base', 'fgd', 'tript', 'all']:
             logger.error('"stage" options should be one of', ['base', 'fgd', 'tript', 'all'])
             return ''

@@ -537,7 +537,7 @@ def CreateSandMCCF(self):
             logger.error('Please make sure all options stated above are entered')
             return ''

-        if not self.options['stage'] in ['neutMC', 'g4anal', 'neutSetup']:
+        if self.options['stage'] not in ['neutMC', 'g4anal', 'neutSetup']:
             logger.error('"stage" options should be one of', ['neutMC', 'g4anal', 'neutSetup'])
             return ''
diff --git a/ganga/GangaND280/ND280Control/runND280.py b/ganga/GangaND280/ND280Control/runND280.py
index ea892a11e4..99853306f3 100644
--- a/ganga/GangaND280/ND280Control/runND280.py
+++ b/ganga/GangaND280/ND280Control/runND280.py
@@ -17,6 +17,7 @@
 from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.files import expandfilename
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -80,19 +81,7 @@ class runND280(IPrepareApp):
     _category = 'applications'
     _name = 'runND280'
     _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'configfile', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'configfile', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
+    exe = 'runND280'

     def __init__(self):
@@ -229,7 +218,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers

 allHandlers.add('runND280', 'LSF', RTHandler)
 allHandlers.add('runND280', 'Local', RTHandler)
diff --git a/ganga/GangaND280/ND280Control/runND280CosMC.py b/ganga/GangaND280/ND280Control/runND280CosMC.py
index 1d107d36a5..f04114f337 100644
--- a/ganga/GangaND280/ND280Control/runND280CosMC.py
+++ b/ganga/GangaND280/ND280Control/runND280CosMC.py
@@ -17,6 +17,8 @@
 from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.files import expandfilename
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
+from . import ND280Configs

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -24,8 +26,6 @@
     getConfig('Configuration')['user'],
 )

-from . import ND280Configs
-

 class runND280CosMC(IPrepareApp):
     """
@@ -82,19 +82,6 @@ class runND280CosMC(IPrepareApp):
     _category = 'applications'
     _name = 'runND280CosMC'
     _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'confopts', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'confopts', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]

     def __init__(self):
         super(runND280CosMC, self).__init__()
@@ -219,8 +206,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
-
 allHandlers.add('runND280CosMC', 'LSF', RTHandler)
 allHandlers.add('runND280CosMC', 'Local', RTHandler)
 allHandlers.add('runND280CosMC', 'PBS', RTHandler)
diff --git a/ganga/GangaND280/ND280Control/runND280CtrlSmpl.py b/ganga/GangaND280/ND280Control/runND280CtrlSmpl.py
index 5f994d075b..c46c7a66cd 100644
--- a/ganga/GangaND280/ND280Control/runND280CtrlSmpl.py
+++ b/ganga/GangaND280/ND280Control/runND280CtrlSmpl.py
@@ -17,6 +17,7 @@
 from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.files import expandfilename
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -94,19 +95,7 @@ class runND280CtrlSmpl(IPrepareApp):
     _category = 'applications'
     _name = 'runND280CtrlSmpl'
     _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'configfile', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'configfile', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
+    exe = 'runND280CtrlSmpl'

     def __init__(self):
@@ -233,8 +222,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
-
 allHandlers.add('runND280CtrlSmpl', 'LSF', RTHandler)
 allHandlers.add('runND280CtrlSmpl', 'Local', RTHandler)
 allHandlers.add('runND280CtrlSmpl', 'PBS', RTHandler)
diff --git a/ganga/GangaND280/ND280Control/runND280Kin.py b/ganga/GangaND280/ND280Control/runND280Kin.py
index 3765413724..fdb2d4787b 100644
--- a/ganga/GangaND280/ND280Control/runND280Kin.py
+++ b/ganga/GangaND280/ND280Control/runND280Kin.py
@@ -17,6 +17,7 @@
 from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.files import expandfilename
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -79,19 +80,6 @@ class runND280Kin(IPrepareApp):
     _category = 'applications'
     _name = 'runND280Kin'
     _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'confopts', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'confopts', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]

     def __init__(self):
         super(runND280Kin, self).__init__()
@@ -199,8 +187,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
-
 allHandlers.add('runND280Kin', 'LSF', RTHandler)
 allHandlers.add('runND280Kin', 'Local', RTHandler)
 allHandlers.add('runND280Kin', 'PBS', RTHandler)
diff --git a/ganga/GangaND280/ND280Control/runND280RDP.py b/ganga/GangaND280/ND280Control/runND280RDP.py
index 41658fab07..d077eb3c11 100644
--- a/ganga/GangaND280/ND280Control/runND280RDP.py
+++ b/ganga/GangaND280/ND280Control/runND280RDP.py
@@ -17,6 +17,9 @@
 from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.files import expandfilename
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
+from GangaCore.Lib.Virtualization import Apptainer
+from . import ND280Configs

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -24,8 +27,6 @@
     getConfig('Configuration')['user'],
 )

-from . import ND280Configs
-

 class runND280RDP(IPrepareApp):
     """
@@ -75,24 +76,21 @@ class runND280RDP(IPrepareApp):
             comparable=1,
             doc='Location of shared resources. Presence of this attribute implies the application has been prepared.',
         ),
+        'container': SimpleItem(
+            defvalue=None, doc='Path to container', typelist=['type(None)', 'str']
+        ),
+        'mounts': SimpleItem(
+            defvalue=[],
+            doc='Container paths to mount',
+            typelist=['str'],
+            sequence=1,
+            strict_sequence=0,
+        ),
     },
 )
 _category = 'applications'
 _name = 'runND280RDP'
 _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'confopts', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'confopts', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]

     def __init__(self):
         super(runND280RDP, self).__init__()
@@ -101,6 +99,13 @@ def configure(self, masterappconfig):
         args = convertIntToStringArgs(self.args)

         job = self.getJobObject()
+        if self.container:
+            job.virtualization = Apptainer()
+            job.virtualization.image = self.container
+
+            # Only apply the mounts once the Apptainer virtualization above exists
+            if len(self.mounts) > 0:
+                job.virtualization.mounts = {mount: mount for mount in self.mounts}

         if self.cmtsetup == []:
             raise ApplicationConfigurationError('No cmt setup script given.')
@@ -219,8 +223,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
-
 allHandlers.add('runND280RDP', 'LSF', RTHandler)
 allHandlers.add('runND280RDP', 'Local', RTHandler)
 allHandlers.add('runND280RDP', 'PBS', RTHandler)
diff --git a/ganga/GangaND280/ND280Control/runND280SandMC.py b/ganga/GangaND280/ND280Control/runND280SandMC.py
index 62ef455c0d..dac2534a12 100644
--- a/ganga/GangaND280/ND280Control/runND280SandMC.py
+++ b/ganga/GangaND280/ND280Control/runND280SandMC.py
@@ -17,6 +17,8 @@
 from GangaCore.GPIDev.Schema import SimpleItem, Schema, Version
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.files import expandfilename
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
+from . import ND280Configs

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -24,8 +26,6 @@
     getConfig('Configuration')['user'],
 )

-from . import ND280Configs
-

 class runND280SandMC(IPrepareApp):
     """
@@ -82,19 +82,6 @@ class runND280SandMC(IPrepareApp):
     _category = 'applications'
     _name = 'runND280SandMC'
     _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'confopts', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'args', 'widget': 'String_List'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'confopts', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]

     def __init__(self):
         super(runND280SandMC, self).__init__()
@@ -222,8 +209,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
-
 allHandlers.add('runND280SandMC', 'LSF', RTHandler)
 allHandlers.add('runND280SandMC', 'Local', RTHandler)
 allHandlers.add('runND280SandMC', 'PBS', RTHandler)
diff --git a/ganga/GangaND280/ND280Dataset/ND280Dataset.py b/ganga/GangaND280/ND280Dataset/ND280Dataset.py
index 98a5a01586..7bbeb94d32 100755
--- a/ganga/GangaND280/ND280Dataset/ND280Dataset.py
+++ b/ganga/GangaND280/ND280Dataset/ND280Dataset.py
@@ -8,16 +8,18 @@
 It relies on the basic ND280Dataset class which is used through
 inheritance in other more specific classes.
 """
-from GangaCore.GPIDev.Schema import *
+from GangaCore.GPIDev.Schema import Schema, Version, SimpleItem
 from GangaCore.GPIDev.Lib.Dataset import Dataset
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.logging import getLogger
-import os, re, fnmatch
+import os
+import fnmatch
 import subprocess

 logger = getLogger()

+
 class ND280Dataset(Dataset):
     """
     Base class for ND280 Datasets.
@@ -26,88 +28,88 @@ class ND280Dataset(Dataset):
     You can define the list of files by hand directly if you want:
     dataset.names = ['/path/to/the/files/Input1.root','/path/to/the/files/Input2.root']
     """
-
-    _schema = Schema(Version(1,0), {
-        'names': SimpleItem(defvalue = [], typelist=['str'], sequence=1,doc='List of input files with full path'),
+
+    _schema = Schema(Version(1, 0), {
+        'names': SimpleItem(defvalue=[], typelist=['str'], sequence=1, doc='List of input files with full path'),
     })
-
+
     _category = 'datasets'
     _name = 'ND280Dataset'
     _hidden = 1

     def __init__(self):
         super(ND280Dataset, self).__init__()
-
-    def get_dataset_from_list(self,list_file):
-        """Get the dataset files as listed in a text file."""

-        logger.info('Reading list file %s ...',list_file)
+    def get_dataset_from_list(self, list_file):
+        """Get the dataset files as listed in a text file."""
+
+        logger.info('Reading list file %s ...', list_file)
+
+        if not os.path.exists(list_file):
+            logger.error('File %s does not exist', list_file)
+            return

-        if not os.path.exists(list_file):
-            logger.error('File %s does not exist',list_file)
-            return
+        f = open(list_file)
+        for ln in f.readlines():

-        f = open( list_file )
-        for ln in f.readlines():
+            # split the directory from the file and call get_dataset
+            if os.path.isdir(ln.strip()):
+                self.get_dataset(ln.strip())
+            else:
+                self.get_dataset(os.path.dirname(ln.strip()), os.path.basename(ln.strip()))

-            # split the directory from the file and call get_dataset
-            if os.path.isdir(ln.strip()):
-                self.get_dataset( ln.strip() )
-            else:
-                self.get_dataset( os.path.dirname(ln.strip()), os.path.basename( ln.strip() ) )
-
-    def set_dataset_into_list(self,list_file):
-        """Write the dataset files as a list in a text file."""
+    def set_dataset_into_list(self, list_file):
+        """Write the dataset files as a list in a text file."""

-        logger.info('Writing list file %s ...',list_file)
+        logger.info('Writing list file %s ...', list_file)

-        if not len(self.names):
-            logger.error('No input files found. The dataset is empty.')
-            return 0
+        if not len(self.names):
+            logger.error('No input files found. The dataset is empty.')
+            return 0

-        try:
-            fileList = open( list_file, 'w')
-        except IOError:
-            logger.error('File %s cannot be created',list_file)
-            return 0
+        try:
+            fileList = open(list_file, 'w')
+        except IOError:
+            logger.error('File %s cannot be created', list_file)
+            return 0

-        for inFile in self.names:
-            fileList.write(inFile+'\n')
+        for inFile in self.names:
+            fileList.write(inFile + '\n')

-        fileList.close()
-        return 1
+        fileList.close()
+        return 1

-    def get_dataset(self,directory,filter=None):
+    def get_dataset(self, directory, filter=None):
         """Get the list of files in the dataset's directory.
         This is not implemented in ND280Dataset but is defined
         in each class inheriting from ND280Dataset."""
-
-        raise NotImplementedError
+        raise NotImplementedError

     def get_dataset_filenames(self):
         """Simply returns a python list containing all the filenames in this ND280Dataset. """
         return self.names

-    def set_dataset_filenames(self,list_file):
-        """Copy the list of files given in input as the list of files in the ND280Dataset.
-        NOTE: This will not append the input list to the existing file list but instead replace it."""
+    def set_dataset_filenames(self, list_file):
+        """Copy the list of files given in input as the list of files in the ND280Dataset.
+        NOTE: This will not append the input list to the existing file list but instead replace it."""

-        logger.info('Writing list file %s ...',list_file)
+        logger.info('Writing list file %s ...', list_file)

-        self.names=list_file
+        self.names = list_file

     def get_filenames(app):
         """Retrieve the file names starting from an application object"""
-
-        job=app._getRoot()
+
+        job = app._getRoot()
         if not job:
             logger.warning('Application object is not associated to a job.')
             return []
-
+
         # Jobs without inputdata are allowed
-        if not job.inputdata: return []
-
+        if not job.inputdata:
+            return []
+
+        import re
         classnamematch = re.match(r"ND280.*Dataset", job.inputdata._name)
         if not classnamematch:
@@ -115,79 +117,76 @@ def get_filenames(app):
             return []

         return job.inputdata.names
-
-    get_filenames=staticmethod(get_filenames)
+
+    get_filenames = staticmethod(get_filenames)


 class ND280LocalDataset(ND280Dataset):
     """ND280LocalDataset manages files located in a local directory."""
-
-    _schema = Schema(Version(1,1), {
-        'names': SimpleItem(defvalue = [], typelist=['str'], sequence=1,doc='List of input files with full path'),
+
+    _schema = Schema(Version(1, 1), {
+        'names': SimpleItem(defvalue=[], typelist=['str'], sequence=1, doc='List of input files with full path'),
     })
-
+
     _category = 'datasets'
     _name = 'ND280LocalDataset'
-    _exportmethods = ['get_dataset', 'get_dataset_filenames', 'get_dataset_from_list', 'get_raw_from_list', 'get_kin_range','set_dataset_into_list', 'set_dataset_filenames']
+    _exportmethods = ['get_dataset', 'get_dataset_filenames', 'get_dataset_from_list',
+                      'get_raw_from_list', 'get_kin_range', 'set_dataset_into_list', 'set_dataset_filenames']

-    _GUIPrefs = [ { 'attribute' : 'names', 'attribute' : 'String_List' } ]
-
     def __init__(self):
         super(ND280LocalDataset, self).__init__()

+    def get_dataset(self, directory, filter=None):
+        """Get the list of files in the local directory given.
+        You can provide a wildcard filter such as 'oa_*.root'"""

-    def get_dataset(self,directory,filter=None):
-        """Get the list of files in the local directory given.
-        You can provide a wildcard filter such as 'oa_*.root'"""
-
-        logger.info('Reading %s ...',directory)
-
+        logger.info('Reading %s ...', directory)

-        if not os.path.isdir(directory):
-            logger.error('Path %s is no directory',directory)
-            return
+        if not os.path.isdir(directory):
+            logger.error('Path %s is not a directory', directory)
+            return

-        directory = os.path.abspath(directory)
-        if filter:
-            new_names = [ os.path.join(directory,name) for name in fnmatch.filter(sorted(os.listdir(directory)),filter) ]
-        else:
-            new_names = [ os.path.join(directory,name) for name in sorted(os.listdir(directory)) ]
+        directory = os.path.abspath(directory)
+        if filter:
+            new_names = [os.path.join(directory, name) for name in fnmatch.filter(sorted(os.listdir(directory)), filter)]
+        else:
+            new_names = [os.path.join(directory, name) for name in sorted(os.listdir(directory))]

-        self.names.extend( new_names )
+        self.names.extend(new_names)

-        self._setDirty()
+        self._setDirty()

-    def get_raw_from_list(self,prfx,list_file):
-        """Get the dataset of raw files as listed in a text file as run/subrun combinations."""
+    def get_raw_from_list(self, prfx, list_file, exp_name='nd280'):
+        """Get the dataset of raw files as listed in a text file as run/subrun combinations."""

-        logger.info('Reading list file %s ...',list_file)
+        logger.info('Reading list file %s ...', list_file)

-        if not os.path.isdir(prfx):
-            logger.error('Directory $s does not exist',prfx)
-            return
+        if not os.path.isdir(prfx):
+            logger.error('Directory %s does not exist', prfx)
+            return

-        if not os.path.exists(list_file):
-            logger.error('File %s does not exist',list_file)
-            return
+        if not os.path.exists(list_file):
+            logger.error('File %s does not exist', list_file)
+            return

-        f = open( list_file )
-        for ln in f.readlines():
+        f = open(list_file)
+        for ln in f.readlines():

-            chunks = ln.split()
-            run = "%08d" % int(chunks[0])
-            sub = "%04d" % int(chunks[1])
-            rang = run[:5]+'000_'+run[:5]+'999'
-            file = 'nd280_'+run+'_'+sub+'.daq.mid.gz'
-            self.get_dataset( os.path.join(prfx,rang), file)
+            chunks = ln.split()
+            run = "%08d" % int(chunks[0])
+            sub = "%04d" % int(chunks[1])
+            rang = run[:5] + '000_' + run[:5] + '999'
+            file = exp_name + '_' + run + '_' + sub + '.daq.mid.gz'
+            self.get_dataset(os.path.join(prfx, rang), file)

-    def get_kin_range(self,fr,to):
+    def get_kin_range(self, fr, to):
         """Get the dataset of kin file numbers"""
-        logger.info('Producing a list of kin file numbers in the range from %s to %s.',fr,to)
+        logger.info('Producing a list of kin file numbers in the range from %s to %s.', fr, to)

-        self.names.extend([j for j in range(fr,to+1)])
+        self.names.extend([j for j in range(fr, to + 1)])


 class ND280DCacheDataset(ND280Dataset):
@@ -197,17 +196,16 @@ class ND280DCacheDataset(ND280Dataset):
     And currently the only configured server is TRIUMF but later
     you will be able to use another server:
     dataset.server = 'TRIUMF'
     """
-
-    _schema = Schema(Version(1,0), {
-        'names': SimpleItem(defvalue = [], typelist=['str'], sequence=1,doc='List of input files with full path to get the file on the server, i.e. "dcap://the/path/thefile'),
-        'server': SimpleItem(defvalue = "TRIUMF", doc='Name of the dcache server used'),
+
+    _schema = Schema(Version(1, 0), {
+        'names': SimpleItem(defvalue=[], typelist=['str'], sequence=1, doc='List of input files with full path to get the file on the server, i.e. "dcap://the/path/thefile'),
"dcap://the/path/thefile'), + 'server': SimpleItem(defvalue="TRIUMF", doc='Name of the dcache server used'), }) - - _name = 'ND280DCacheDataset' - _exportmethods = ['get_dataset', 'get_dataset_filenames', 'get_dataset_from_list', 'set_dataset_into_list', 'set_dataset_filenames' ] + _name = 'ND280DCacheDataset' - _GUIPrefs = [ { 'attribute' : 'names', 'attribute' : 'server', 'widget' : 'String_List' } ] + _exportmethods = ['get_dataset', 'get_dataset_filenames', + 'get_dataset_from_list', 'set_dataset_into_list', 'set_dataset_filenames'] _commandstr = getConfig('ND280')['ND280DCacheDatasetCommandStr'] _filebasepath = getConfig('ND280')['ND280DCacheDatasetFileBasePath'] @@ -215,32 +213,31 @@ class ND280DCacheDataset(ND280Dataset): def __init__(self): super(ND280DCacheDataset, self).__init__() + def get_dataset(self, directory, filter=None): + """Get the list of files in the directory on the dCache server. + You can provide a wildcard filter such as 'oa_*.root'""" - def get_dataset(self,directory,filter=None): - """Get the list of files in the directory on the dCache server. - You can provide a wildcard filter such as 'oa_*.root'""" - - if not self.server in list(self._commandstr.keys()): - logger.error('DCache server %s is unknown.', self.server) - return + if self.server not in list(self._commandstr.keys()): + logger.error('DCache server %s is unknown.', self.server) + return - logger.info('Reading %s ...',directory) + logger.info('Reading %s ...', directory) - command = self._commandstr[self.server] % directory - rawoutput = subprocess.getoutput(command); - allfiles = rawoutput.split("\n") + command = self._commandstr[self.server] % directory + rawoutput = subprocess.getoutput(command) + allfiles = rawoutput.split("\n") - # TODO: return directory error when curl isn't happy - #if not os.path.isdir(directory): - # logger.error('Path %s is no directory',directory) - # return + # TODO: return directory error when curl isn't happy + # if not os.path.isdir(directory): + # logger.error('Path %s is no directory',directory) + # return - fullpath = os.path.join(self._filebasepath[self.server], directory ) - if filter: - new_names = [ os.path.join(fullpath,name) for name in fnmatch.filter(allfiles,filter) ] - else: - new_names = [ os.path.join(fullpath,name) for name in allfiles ] + fullpath = os.path.join(self._filebasepath[self.server], directory) + if filter: + new_names = [os.path.join(fullpath, name) for name in fnmatch.filter(allfiles, filter)] + else: + new_names = [os.path.join(fullpath, name) for name in allfiles] - self.names.extend( new_names ) + self.names.extend(new_names) - self._setDirty() + self._setDirty() diff --git a/ganga/GangaND280/ND280Executable/ND280Executable.py b/ganga/GangaND280/ND280Executable/ND280Executable.py index 40cf8098fa..f9beb209f4 100644 --- a/ganga/GangaND280/ND280Executable/ND280Executable.py +++ b/ganga/GangaND280/ND280Executable/ND280Executable.py @@ -15,6 +15,7 @@ from GangaCore.GPIDev.Schema import SimpleItem, Schema, Version from GangaCore.Utility.Config import getConfig from GangaCore.Utility.files import expandfilename +from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers shared_path = os.path.join( expandfilename(getConfig('Configuration')['gangadir']), @@ -82,19 +83,6 @@ class ND280Executable(IPrepareApp): _name = 'ND280Executable' _scriptname = None _exportmethods = ['prepare'] - _GUIPrefs = [ - {'attribute': 'exe', 'widget': 'File'}, - {'attribute': 'args', 'widget': 'String_List'}, - {'attribute': 'outputfile', 'widget': 
'String'}, - {'attribute': 'env', 'widget': 'DictOfString'}, - ] - - _GUIAdvancedPrefs = [ - {'attribute': 'exe', 'widget': 'File'}, - {'attribute': 'args', 'widget': 'String_List'}, - {'attribute': 'outputfile', 'widget': 'String'}, - {'attribute': 'env', 'widget': 'DictOfString'}, - ] def __init__(self): super(ND280Executable, self).__init__() @@ -201,8 +189,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig): ) -from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers - allHandlers.add('ND280Executable', 'LSF', RTHandler) allHandlers.add('ND280Executable', 'Local', RTHandler) allHandlers.add('ND280Executable', 'PBS', RTHandler) diff --git a/ganga/GangaND280/ND280RecoValidation/RecoPlusVFT.py b/ganga/GangaND280/ND280RecoValidation/RecoPlusVFT.py index c3895a4f31..998b1787f3 100644 --- a/ganga/GangaND280/ND280RecoValidation/RecoPlusVFT.py +++ b/ganga/GangaND280/ND280RecoValidation/RecoPlusVFT.py @@ -15,6 +15,7 @@ from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version from GangaCore.Utility.Config import getConfig from GangaCore.Utility.files import expandfilename +from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers shared_path = os.path.join( expandfilename(getConfig('Configuration')['gangadir']), @@ -112,27 +113,6 @@ class RecoPlusVFT(IPrepareApp): _name = 'RecoPlusVFT' _scriptname = None _exportmethods = ['prepare'] - _GUIPrefs = [ - {'attribute': 'reco_exe', 'widget': 'File'}, - {'attribute': 'reco_args', 'widget': 'String_List'}, - {'attribute': 'vft_exe', 'widget': 'File'}, - {'attribute': 'vft_args', 'widget': 'String_List'}, - {'attribute': 'filenamesubstr', 'widget': 'String'}, - {'attribute': 'reconewstr', 'widget': 'String'}, - {'attribute': 'vftnewstr', 'widget': 'String'}, - {'attribute': 'env', 'widget': 'DictOfString'}, - ] - - _GUIAdvancedPrefs = [ - {'attribute': 'reco_exe', 'widget': 'File'}, - {'attribute': 'reco_args', 'widget': 'String_List'}, - {'attribute': 'vft_exe', 'widget': 'File'}, - {'attribute': 'vft_args', 'widget': 'String_List'}, - {'attribute': 'filenamesubstr', 'widget': 'String'}, - {'attribute': 'reconewstr', 'widget': 'String'}, - {'attribute': 'vftnewstr', 'widget': 'String'}, - {'attribute': 'env', 'widget': 'DictOfString'}, - ] def __init__(self): super(RecoPlusVFT, self).__init__() @@ -273,8 +253,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig): ) -from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers - allHandlers.add('RecoPlusVFT', 'LSF', RTHandler) allHandlers.add('RecoPlusVFT', 'Local', RTHandler) allHandlers.add('RecoPlusVFT', 'PBS', RTHandler) diff --git a/ganga/GangaND280/ND280RecoValidation/VFT_make_ana.py b/ganga/GangaND280/ND280RecoValidation/VFT_make_ana.py index 6e3a0d1221..825182728e 100644 --- a/ganga/GangaND280/ND280RecoValidation/VFT_make_ana.py +++ b/ganga/GangaND280/ND280RecoValidation/VFT_make_ana.py @@ -4,6 +4,7 @@ # Created 16/12/2013 ################################################################################ +import os from GangaCore.GPIDev.Adapters.IPrepareApp import IPrepareApp from GangaCore.GPIDev.Adapters.IRuntimeHandler import IRuntimeHandler from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version @@ -12,8 +13,8 @@ from GangaCore.Core.exceptions import ApplicationConfigurationError -import os from GangaCore.Utility.files import expandfilename +from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers shared_path = os.path.join( 
expandfilename(getConfig('Configuration')['gangadir']), @@ -147,24 +148,6 @@ class VFT_make_ana(IPrepareApp): _name = 'VFT_make_ana' _scriptname = None _exportmethods = ['prepare'] - _GUIPrefs = [ - {'attribute': 'cmtsetup', 'widget': 'String'}, - {'attribute': 'tree', 'widget': 'String'}, - {'attribute': 'ana_custom', 'widget': 'String'}, - {'attribute': 'ana_output', 'widget': 'String'}, - {'attribute': 'ana_useropt', 'widget': 'String_List'}, - {'attribute': 'env', 'widget': 'DictOfString'}, - ] - - _GUIAdvancedPrefs = [ - {'attribute': 'cmtsetup', 'widget': 'String'}, - {'attribute': 'tree', 'widget': 'String'}, - {'attribute': 'ana_custom', 'widget': 'String'}, - {'attribute': 'ana_output', 'widget': 'String'}, - {'attribute': 'ana_useropt', 'widget': 'String_List'}, - {'attribute': 'env', 'widget': 'DictOfString'}, - ] - def __init__(self): super(VFT_make_ana, self).__init__() @@ -262,7 +245,7 @@ def configure(self, masterappconfig): # } for key in argDict: - if not getattr(self, argDict[key]) is None: + if getattr(self, argDict[key]) is not None: args.append(key + '=' + getattr(self, argDict[key])) for opt in self.pdf_options: @@ -334,7 +317,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig): ) -from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers allHandlers.add('VFT_make_ana', 'LSF', RTHandler) allHandlers.add('VFT_make_ana', 'Local', RTHandler) diff --git a/ganga/GangaND280/ND280RecoValidation/oaReconPlusoaAnalysis.py b/ganga/GangaND280/ND280RecoValidation/oaReconPlusoaAnalysis.py index 4230480662..07d31648d5 100644 --- a/ganga/GangaND280/ND280RecoValidation/oaReconPlusoaAnalysis.py +++ b/ganga/GangaND280/ND280RecoValidation/oaReconPlusoaAnalysis.py @@ -15,6 +15,7 @@ from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version from GangaCore.Utility.Config import getConfig from GangaCore.Utility.files import expandfilename +from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers shared_path = os.path.join( expandfilename(getConfig('Configuration')['gangadir']), @@ -100,23 +101,6 @@ class oaReconPlusoaAnalysis(IPrepareApp): _name = 'oaReconPlusoaAnalysis' _scriptname = None _exportmethods = ['prepare'] - _GUIPrefs = [ - {'attribute': 'reco_args', 'widget': 'String_List'}, - {'attribute': 'anal_args', 'widget': 'String_List'}, - {'attribute': 'filenamesubstr', 'widget': 'String'}, - {'attribute': 'reconewstr', 'widget': 'String'}, - {'attribute': 'analnewstr', 'widget': 'String'}, - {'attribute': 'env', 'widget': 'DictOfString'}, - ] - - _GUIAdvancedPrefs = [ - {'attribute': 'reco_args', 'widget': 'String_List'}, - {'attribute': 'anal_args', 'widget': 'String_List'}, - {'attribute': 'filenamesubstr', 'widget': 'String'}, - {'attribute': 'reconewstr', 'widget': 'String'}, - {'attribute': 'analnewstr', 'widget': 'String'}, - {'attribute': 'env', 'widget': 'DictOfString'}, - ] def __init__(self): super(oaReconPlusoaAnalysis, self).__init__() @@ -246,8 +230,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig): ) -from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers - allHandlers.add('oaReconPlusoaAnalysis', 'LSF', RTHandler) allHandlers.add('oaReconPlusoaAnalysis', 'Local', RTHandler) allHandlers.add('oaReconPlusoaAnalysis', 'PBS', RTHandler) diff --git a/ganga/GangaND280/ND280Skimmer/ND280Skimmer.py b/ganga/GangaND280/ND280Skimmer/ND280Skimmer.py index 13f911bdd3..63d013cdac 100644 --- a/ganga/GangaND280/ND280Skimmer/ND280Skimmer.py +++ 
@@ -7,6 +7,8 @@
 Ganga module with classes to skim from reco files
 a set of events listed in a CSV file.
 """
+from os.path import isfile
+import os
 from GangaCore.GPIDev.Adapters.IPrepareApp import IPrepareApp
 from GangaCore.GPIDev.Adapters.IRuntimeHandler import IRuntimeHandler
 from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version
@@ -15,13 +17,11 @@
 from GangaCore.Utility.logging import getLogger
 from GangaCore.Core.exceptions import ApplicationConfigurationError
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
+from GangaCore.Utility.files import expandfilename

 logger = getLogger()

-from os.path import isfile
-
-import os
-from GangaCore.Utility.files import expandfilename

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -76,19 +76,6 @@ class ND280RecoSkimmer(IPrepareApp):
     _name = 'ND280RecoSkimmer'
     _scriptname = None
     _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'csvfile', 'widget': 'String'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'outputfile', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'csvfile', 'widget': 'String'},
-        {'attribute': 'cmtsetup', 'widget': 'String'},
-        {'attribute': 'outputfile', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]

     def __init__(self):
         super(ND280RecoSkimmer, self).__init__()
@@ -236,8 +223,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
-
 allHandlers.add('ND280RecoSkimmer', 'LSF', RTHandler)
 allHandlers.add('ND280RecoSkimmer', 'Local', RTHandler)
 allHandlers.add('ND280RecoSkimmer', 'PBS', RTHandler)
diff --git a/ganga/GangaND280/ND280Splitter/ND280Splitter.py b/ganga/GangaND280/ND280Splitter/ND280Splitter.py
index a80884fc3e..f0226aa0b0 100755
--- a/ganga/GangaND280/ND280Splitter/ND280Splitter.py
+++ b/ganga/GangaND280/ND280Splitter/ND280Splitter.py
@@ -7,16 +7,17 @@

 This module contains various splitters for ND280 jobs.
""" -import inspect from GangaCore.GPIDev.Adapters.ISplitter import ISplitter from GangaCore.GPIDev.Base.Proxy import addProxy, stripProxy -from GangaCore.GPIDev.Schema import * +from GangaCore.GPIDev.Schema import Schema, Version, SimpleItem from GangaCore.Utility.logging import getLogger logger = getLogger() # First define the functions that can be used here or in the transforms + + def splitCSVFile(csvfile, nbevents): subsets = [] allLines = [] @@ -26,29 +27,29 @@ def splitCSVFile(csvfile, nbevents): line = line.rstrip('\r\n') row = line.split(",") if not len(row) == 3: - logger.info("Ignoring badly-formatted line: {}".format(",".join(row)) ) + logger.info("Ignoring badly-formatted line: {}".format(",".join(row))) continue allLines.append(line) csvfilebuf.close() if nbevents < 1: - raise Exception('Number of nbevents not set properly.') + raise Exception('Number of nbevents not set properly.') subsets = [] # Less lines than number of events per job wanted => easy if len(allLines) < nbevents: - subsets.append(allLines) + subsets.append(allLines) else: - nbfulljobs = len(allLines) // nbevents - for nb in range(nbfulljobs): - Low = nb * nbevents - High = (nb+1) * nbevents - subsets.append(allLines[Low:High]) + nbfulljobs = len(allLines) // nbevents + for nb in range(nbfulljobs): + Low = nb * nbevents + High = (nb + 1) * nbevents + subsets.append(allLines[Low:High]) - if len(allLines) % nbevents: - # If the number of lines is not divisible by nbevents - # then the last subjob which has less - subsets.append(allLines[High:]) + if len(allLines) % nbevents: + # If the number of lines is not divisible by nbevents + # then the last subjob which has less + subsets.append(allLines[High:]) return subsets @@ -60,19 +61,19 @@ def splitNbInputFile(infiles, nbfiles): subsets = [infiles] # Less files than number of jobs wanted => easy. 
elif len(infiles) < nbfiles: - for f in infiles: - subsets.append([f]) + for f in infiles: + subsets.append([f]) else: - nbfulljobs = len(infiles) // nbfiles - for nb in range(nbfulljobs): - Low = nb*nbfiles - High = (nb+1)*nbfiles - subsets.append(infiles[Low:High]) + nbfulljobs = len(infiles) // nbfiles + for nb in range(nbfulljobs): + Low = nb * nbfiles + High = (nb + 1) * nbfiles + subsets.append(infiles[Low:High]) - if len(infiles) % nbfiles: - # If the number of input files is not divisible by nbfiles - # then the last subjob which has less - subsets.append(infiles[High:]) + if len(infiles) % nbfiles: + # If the number of input files is not divisible by nbfiles + # then the last subjob which has less + subsets.append(infiles[High:]) return subsets @@ -95,47 +96,47 @@ class ND280SplitNbJobs(ISplitter): j = Job(splitter=S) j.submit() - """ + """ _name = "ND280SplitNbJobs" - _schema = Schema(Version(1,0), { - 'nbjobs' : SimpleItem(defvalue=-1,doc='The number of subjobs'), - } ) + _schema = Schema(Version(1, 0), { + 'nbjobs': SimpleItem(defvalue=-1, doc='The number of subjobs'), + }) + + def split(self, job): - def split(self,job): - subjobs = [] filenames = job.inputdata.get_dataset_filenames() - - logger.info('Creating %d subjobs ...',self.nbjobs) + + logger.info('Creating %d subjobs ...', self.nbjobs) if self.nbjobs < 1: - raise Exception('Number of nbjobs not set properly.') + raise Exception('Number of nbjobs not set properly.') subsets = [] # Less files than number of jobs wanted => easy if len(filenames) < self.nbjobs: - for f in filenames: - subsets.append([f]) + for f in filenames: + subsets.append([f]) else: - isPerfectSplit = (len(filenames) % self.nbjobs) == 0 - if isPerfectSplit: - # If the number of input files is divisible by nbjobs - # then all subjobs have the same number of input files - nbfulljobs = self.nbjobs - else: - # Otherwise all subjobs have the same number of input files - # except the last subjob which has less - nbfulljobs = self.nbjobs - 1 - - persub = int(len(filenames) / nbfulljobs) - for nb in range(nbfulljobs): - Low = nb*persub - High = (nb+1)*persub - subsets.append(filenames[Low:High]) - - if not isPerfectSplit: - subsets.append(filenames[High:]) + isPerfectSplit = (len(filenames) % self.nbjobs) == 0 + if isPerfectSplit: + # If the number of input files is divisible by nbjobs + # then all subjobs have the same number of input files + nbfulljobs = self.nbjobs + else: + # Otherwise all subjobs have the same number of input files + # except the last subjob which has less + nbfulljobs = self.nbjobs - 1 + + persub = int(len(filenames) / nbfulljobs) + for nb in range(nbfulljobs): + Low = nb * persub + High = (nb + 1) * persub + subsets.append(filenames[Low:High]) + + if not isPerfectSplit: + subsets.append(filenames[High:]) for sub in subsets: @@ -148,7 +149,6 @@ def split(self,job): return subjobs - class ND280SplitNbInputFiles(ISplitter): """ @@ -166,24 +166,24 @@ class ND280SplitNbInputFiles(ISplitter): j = Job(splitter=S) j.submit() - """ + """ _name = "ND280SplitNbInputFiles" - _schema = Schema(Version(1,0), { - 'nbfiles' : SimpleItem(defvalue=-1,doc='The number of input files for each subjobs'), - } ) + _schema = Schema(Version(1, 0), { + 'nbfiles': SimpleItem(defvalue=-1, doc='The number of input files for each subjobs'), + }) + + def split(self, job): - def split(self,job): - subjobs = [] filenames = job.inputdata.get_dataset_filenames() - + if self.nbfiles < 1: - raise Exception('Number of nbfiles not set properly.') + raise Exception('Number of 

         subsets = splitNbInputFile(filenames, self.nbfiles)

-        logger.info('Creating %d subjobs ...',len(subjobs))
+        logger.info('Creating %d subjobs ...', len(subsets))

         for sub in subsets:
@@ -196,8 +196,6 @@ def split(self,job):

         return subjobs

-
-
 class ND280SplitCSVByNbEvt(ISplitter):
     """
@@ -215,37 +213,37 @@ class ND280SplitCSVByNbEvt(ISplitter):
         j = Job(splitter=S)
         j.submit()

-    """
+    """
     _name = "ND280SplitCSVByNbEvt"
-    _schema = Schema(Version(1,0), {
-        'nbevents' : SimpleItem(defvalue=-1,doc='The number of events for each subjobs'),
-    } )
+    _schema = Schema(Version(1, 0), {
+        'nbevents': SimpleItem(defvalue=-1, doc='The number of events for each subjobs'),
+    })

-    def split(self,job):
+    def split(self, job):
         import os
-
+
         subjobs = []

         subsets = splitCSVFile(job.application.csvfile, self.nbevents)

         # Less files than number of jobs wanted => easy
-        logger.info('Creating %d subjobs ...',len(allLines))
+        logger.info('Creating %d subjobs ...', len(subsets))

         # Base for the naming of each subjob's CSV file
-        tmpname = os.path.basename(incsvfile)
+        tmpname = os.path.basename(job.application.csvfile)
         if len(tmpname.split('.')) > 1:
-            patterncsv = '.'.join(tmpname.split('.')[0:-1])+"_sub%d."+ tmpname.split('.')[-1]
+            patterncsv = '.'.join(tmpname.split('.')[0:-1]) + "_sub%d." + tmpname.split('.')[-1]
         else:
-            patterncsv = tmpname+"_sub%d"
+            patterncsv = tmpname + "_sub%d"

         # Base for the naming of each subjob's output file
         tmpname = os.path.basename(job.application.outputfile)
         if len(tmpname.split('.')) > 1:
-            patternout = '.'.join(tmpname.split('.')[0:-1])+"_sub%d."+ tmpname.split('.')[-1]
+            patternout = '.'.join(tmpname.split('.')[0:-1]) + "_sub%d." + tmpname.split('.')[-1]
         else:
-            patternout = tmpname+"_sub%d"
+            patternout = tmpname + "_sub%d"

-        for s,sub in enumerate(subsets):
+        for s, sub in enumerate(subsets):
             j = addProxy(self.createSubjob(job))
             j.inputdata = job.inputdata
@@ -256,19 +254,17 @@ def split(self,job):
             thiscsv = patterncsv % s

             # Save in the main job's inputdir now, then the file will be moved to
             # the inputdir of each subjobs.
-            job.getInputWorkspace().writefile(FileBuffer(thiscsv,subLines),executable=0)
-            j.application.csvfile = os.path.join(job.inputdir,thiscsv)
+            job.getInputWorkspace().writefile(FileBuffer(thiscsv, subLines), executable=0)
+            j.application.csvfile = os.path.join(job.inputdir, thiscsv)

             j.application.outputfile = patternout % s
             # Prepare the output filenames which must be unique

             subjobs.append(stripProxy(j))

-
         return subjobs

-
 class ND280SplitOneInputFile(ISplitter):
     """
@@ -283,20 +279,19 @@ class ND280SplitOneInputFile(ISplitter):
         j = Job(splitter=S)
         j.submit()

-    """
+    """
     _name = "ND280SplitOneInputFile"
-    _schema = Schema(Version(1,0), {
-    } )
+    _schema = Schema(Version(1, 0), {
+    })
+
+    def split(self, job):

-    def split(self,job):
-
         subjobs = []

         filenames = job.inputdata.get_dataset_filenames()
-
-        subsets = []
+
         # Less files than number of jobs wanted => easy
-        logger.info('Creating %d subjobs ...',len(filenames))
+        logger.info('Creating %d subjobs ...', len(filenames))

         for nb in range(len(filenames)):
             j = addProxy(self.createSubjob(job))
diff --git a/ganga/GangaND280/ND280TPCGasInteractions/TRExPlusOAAnalysis.py b/ganga/GangaND280/ND280TPCGasInteractions/TRExPlusOAAnalysis.py
index 507bfb983c..b8e4f89ac0 100644
--- a/ganga/GangaND280/ND280TPCGasInteractions/TRExPlusOAAnalysis.py
+++ b/ganga/GangaND280/ND280TPCGasInteractions/TRExPlusOAAnalysis.py
@@ -15,6 +15,7 @@
 from GangaCore.GPIDev.Schema import Schema, SimpleItem, Version
 from GangaCore.Utility.Config import getConfig
 from GangaCore.Utility.files import expandfilename
+from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers

 shared_path = os.path.join(
     expandfilename(getConfig('Configuration')['gangadir']),
@@ -88,19 +89,6 @@ class TRExPlusOAAnalysis(IPrepareApp):
     _name = 'TRExPlusOAAnalysis'
     _scriptname = None
     _exportmethods = ['prepare']
-    _GUIPrefs = [
-        {'attribute': 'trex_args', 'widget': 'String_List'},
-        {'attribute': 'oaana_args', 'widget': 'String_List'},
-        {'attribute': 'filenamesubstr', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]
-
-    _GUIAdvancedPrefs = [
-        {'attribute': 'trex_args', 'widget': 'String_List'},
-        {'attribute': 'oaana_args', 'widget': 'String_List'},
-        {'attribute': 'filenamesubstr', 'widget': 'String'},
-        {'attribute': 'env', 'widget': 'DictOfString'},
-    ]

     def __init__(self):
         super(TRExPlusOAAnalysis, self).__init__()
@@ -232,8 +220,6 @@ def prepare(self, app, appconfig, appmasterconfig, jobmasterconfig):
     )


-from GangaCore.GPIDev.Adapters.ApplicationRuntimeHandlers import allHandlers
-
 allHandlers.add('TRExPlusOAAnalysis', 'LSF', RTHandler)
 allHandlers.add('TRExPlusOAAnalysis', 'Local', RTHandler)
 allHandlers.add('TRExPlusOAAnalysis', 'PBS', RTHandler)
diff --git a/ganga/GangaND280/Tasks/ND280Task.py b/ganga/GangaND280/Tasks/ND280Task.py
index b6b80bc335..4304fb9bdc 100644
--- a/ganga/GangaND280/Tasks/ND280Task.py
+++ b/ganga/GangaND280/Tasks/ND280Task.py
@@ -1,10 +1,9 @@
 from GangaCore.GPIDev.Schema import *
 from GangaCore.GPIDev.Lib.Tasks.common import *
 from GangaCore.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy
-import time
 from GangaCore.GPIDev.Lib.Tasks import ITask

-########################################################################
+########################################################################

 class ND280Task(ITask):
     """T2K add-ons for the Task framework"""
diff --git a/ganga/GangaND280/Tasks/ND280Transform.py b/ganga/GangaND280/Tasks/ND280Transform.py
index 0685e7a5cc..a46bcca973 100644
--- a/ganga/GangaND280/Tasks/ND280Transform.py
+++ b/ganga/GangaND280/Tasks/ND280Transform.py
@@ -1,148 +1,139 @@
-from GangaCore.GPIDev.Schema import *
-from GangaCore.GPIDev.Lib.Tasks.common import *
-from GangaCore.GPIDev.Lib.Tasks.ITransform import ITransform
-from GangaCore.GPIDev.Lib.Job.Job import JobError
-from GangaCore.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy
-from GangaCore.Core.exceptions import ApplicationConfigurationError
+import os
+from GangaCore.GPIDev.Schema import Schema, Version, SimpleItem
 from GangaCore.GPIDev.Lib.Tasks.ITransform import ITransform
-from GangaCore.GPIDev.Lib.Tasks.TaskLocalCopy import TaskLocalCopy
-from GangaCore.GPIDev.Lib.File.MassStorageFile import SharedFile
 from GangaCore.Utility.Config import getConfig
 from .ND280Unit import ND280Unit
 from GangaND280.ND280Dataset.ND280Dataset import ND280LocalDataset, ND280DCacheDataset
 from GangaND280.ND280Splitter import splitNbInputFile
 import GangaCore.GPI as GPI
-import os


 class ND280Transform(ITransform):
-   _schema = Schema(Version(1,0), dict(list(ITransform._schema.datadict.items()) + list({
-       'nbinputfiles' : SimpleItem(defvalue=1,doc='The max number of files assigned to each unit. Use 0 to put all the available files in each given inputdata in one unit (i.e. N inputdata => N units).'),
-       'inputdatasubsets' : SimpleItem(defvalue=[], hidden=1,doc='List of subsets of files. The number of files in each subset is equal to nbinputfiles'),
+    _schema = Schema(Version(1, 0), dict(list(ITransform._schema.datadict.items()) + list({
+        'nbinputfiles': SimpleItem(defvalue=1, doc='The max number of files assigned to each unit. Use 0 to put all the available files in each given inputdata in one unit (i.e. N inputdata => N units).'),
+        'inputdatasubsets': SimpleItem(defvalue=[], hidden=1, doc='List of subsets of files. The number of files in each subset is equal to nbinputfiles'),
     }.items())))

-   _category = 'transforms'
-   _name = 'ND280Transform'
-   _exportmethods = ITransform._exportmethods + [ ]
-
-
-   def __init__(self):
-      super(ND280Transform,self).__init__()
-      self.inputdatasubsets = []
-
-   def createUnits(self):
-      """Create new units if required given the inputdata"""
-
-      # call parent for chaining
-      super(ND280Transform,self).createUnits()
-
-      # loop over input data and see if we need to create any more units
-      for idx,inds in enumerate(self.inputdata):
-
-         # currently only checking for Local Datasets and DCache
-         if inds._name == "ND280LocalDataset" or inds._name == "ND280DCacheDataset":
-            # First pass, create all the subsets
-            if len(self.inputdatasubsets) == idx:
-               subsets = splitNbInputFile(inds.names, self.nbinputfiles)
-               self.inputdatasubsets.append(subsets)
-            elif len(self.inputdatasubsets) < idx:
-               raise Exception('ND280Transform: The inputdata and inputdatasubsets are out of sync. This should not happen.')
-
-            ok = True
-
-            for subset in self.inputdatasubsets[idx]:
-               # check if this data is being run over by checking all the names listed
-               ok = False
-               for unit in self.units:
-                  if (set(unit.inputdata.names) == set(subset)):
-                     ok = True
-                     break
-
-               if not ok:
-                  # new unit required for this subset
-                  unit = ND280Unit()
-                  unit.name = "Unit %d" % len(self.units)
-                  self.addUnitToTRF( unit )
-                  if inds._name == "ND280LocalDataset":
-                     unit.inputdata = ND280LocalDataset()
-                  elif inds._name == "ND280DCacheDataset":
-                     unit.inputdata = ND280DCacheDataset()
-                  unit.inputdata.names = subset
-
-      # For special cases where there is no inputdata given,
-      # just create one unit.
-      if len(self.inputdata) == 0:
-         unit = ND280Unit()
-         unit.name = "Unit %d" % len(self.units)
-         self.addUnitToTRF( unit )
-
-
-   def addUnit(self, filelist):
-      """Create a new unit based on this file list"""
-      unit = ND280Unit()
-      unit.inputdata = ND280LocalDataset()
-      unit.inputdata.names = filelist
-      self.addUnitToTRF( unit )
-
-   def createChainUnit( self, parent_units, use_copy_output = True ):
-      """Create a chained unit using the output data from the given units"""
-
-      # check all parent units for copy_output
-      copy_output_ok = True
-      for parent in parent_units:
-         if not parent.copy_output:
-            copy_output_ok = False
-
-      # all parent units must be completed so the outputfiles are filled correctly
-      for parent in parent_units:
-         if parent.status != "completed":
-            return None
-
-      if len(parent_units) == 0:
-         return None
-
-      if not use_copy_output or not copy_output_ok:
-         unit = ND280Unit()
-         unit.inputdata = ND280LocalDataset()
-         for parent in parent_units:
-            # loop over the output files and add them to the ND280LocalDataset - THIS MIGHT NEED SOME WORK!
-            job = GPI.jobs(parent.active_job_ids[0])
-
-            # if TaskChainInput.include_file_mask is not used go old way (see below)
-            # otherwise add all file matching include_file_mask(s) to the unit.inputdata. DV.
-            inc_file_mask = False
-            for p in self.inputdata[0].include_file_mask:
-               unit.inputdata.get_dataset(job.outputdir, p)
-               inc_file_mask = True
-
-            if not inc_file_mask:
-               for f in job.outputfiles:
-                  # should check for different file types and add them as appropriate to the dataset
-                  # self.inputdata (== TaskChainInput).include/exclude_file_mask could help with this
-                  # This will be A LOT easier with Ganga 6.1 as you can easily map outputfiles -> inputfiles!
-                  # TODO: implement use of include/exclude_file_mask
-                  #
-                  try:
-                     outputfilenameformat = f.outputfilenameformat
-                  except:
-                     inputdir = job.outputdir
-                  else:
-                     #### WARNING: The following will work only if the SharedFile puts the files in local directories !
-                     inputdir = '/'.join([os.path.expanduser(os.path.expandvars(getConfig('Output')['SharedFile']['uploadOptions']['path'])), f.outputfilenameformat.replace('{fname}','')])
-                  unit.inputdata.get_dataset( inputdir, f.namePattern )
-      else:
-
-         unit = ND280Unit()
-         unit.inputdata = ND280LocalDataset()
-
-         for parent in parent_units:
-            # unit needs to have completed and downloaded before we can get file list
-            if parent.status != "completed":
-               return None
-
-            # we should be OK so copy all output to the dataset
-            for f in parent.copy_output.files:
-               unit.inputdata.names.append( os.path.join( parent.copy_output.local_location, f ) )
-
-         return unit
-
+    _category = 'transforms'
+    _name = 'ND280Transform'
+    _exportmethods = ITransform._exportmethods + []
+
+    def __init__(self):
+        super(ND280Transform, self).__init__()
+        self.inputdatasubsets = []
+
+    def createUnits(self):
+        """Create new units if required given the inputdata"""
+
+        # call parent for chaining
+        super(ND280Transform, self).createUnits()
+
+        # loop over input data and see if we need to create any more units
+        for idx, inds in enumerate(self.inputdata):
+
+            # currently only checking for Local Datasets and DCache
+            if inds._name == "ND280LocalDataset" or inds._name == "ND280DCacheDataset":
+                # First pass, create all the subsets
+                if len(self.inputdatasubsets) == idx:
+                    subsets = splitNbInputFile(inds.names, self.nbinputfiles)
+                    self.inputdatasubsets.append(subsets)
+                elif len(self.inputdatasubsets) < idx:
+                    raise Exception('ND280Transform: The inputdata and inputdatasubsets are out of sync. This should not happen.')
+
+                ok = True
+
+                for subset in self.inputdatasubsets[idx]:
+                    # check if this data is being run over by checking all the names listed
+                    ok = False
+                    for unit in self.units:
+                        if (set(unit.inputdata.names) == set(subset)):
+                            ok = True
+                            break
+
+                    if not ok:
+                        # new unit required for this subset
+                        unit = ND280Unit()
+                        unit.name = "Unit %d" % len(self.units)
+                        self.addUnitToTRF(unit)
+                        if inds._name == "ND280LocalDataset":
+                            unit.inputdata = ND280LocalDataset()
+                        elif inds._name == "ND280DCacheDataset":
+                            unit.inputdata = ND280DCacheDataset()
+                        unit.inputdata.names = subset
+
+        # For special cases where there is no inputdata given,
+        # just create one unit.
+        if len(self.inputdata) == 0:
+            unit = ND280Unit()
+            unit.name = "Unit %d" % len(self.units)
+            self.addUnitToTRF(unit)
+
+    def addUnit(self, filelist):
+        """Create a new unit based on this file list"""
+        unit = ND280Unit()
+        unit.inputdata = ND280LocalDataset()
+        unit.inputdata.names = filelist
+        self.addUnitToTRF(unit)
+
+    def createChainUnit(self, parent_units, use_copy_output=True):
+        """Create a chained unit using the output data from the given units"""
+
+        # check all parent units for copy_output
+        copy_output_ok = True
+        for parent in parent_units:
+            if not parent.copy_output:
+                copy_output_ok = False
+
+        # all parent units must be completed so the outputfiles are filled correctly
+        for parent in parent_units:
+            if parent.status != "completed":
+                return None
+
+        if len(parent_units) == 0:
+            return None
+
+        if not use_copy_output or not copy_output_ok:
+            unit = ND280Unit()
+            unit.inputdata = ND280LocalDataset()
+            for parent in parent_units:
+                # loop over the output files and add them to the ND280LocalDataset - THIS MIGHT NEED SOME WORK!
+                job = GPI.jobs(parent.active_job_ids[0])
+
+                # if TaskChainInput.include_file_mask is not used go old way (see below)
+                # otherwise add all file matching include_file_mask(s) to the unit.inputdata. DV.
+                inc_file_mask = False
+                for p in self.inputdata[0].include_file_mask:
+                    unit.inputdata.get_dataset(job.outputdir, p)
+                    inc_file_mask = True
+
+                if not inc_file_mask:
+                    for f in job.outputfiles:
+                        # should check for different file types and add them as appropriate to the dataset
+                        # self.inputdata (== TaskChainInput).include/exclude_file_mask could help with this
+                        # This will be A LOT easier with Ganga 6.1 as you can easily map outputfiles -> inputfiles!
+                        # TODO: implement use of include/exclude_file_mask
+                        #
+                        try:
+                            outputfilenameformat = f.outputfilenameformat
+                        except AttributeError:
+                            inputdir = job.outputdir
+                        else:
+                            # WARNING: The following will work only if the SharedFile puts the files in local directories !
+                            inputdir = '/'.join([os.path.expanduser(os.path.expandvars(getConfig('Output')['SharedFile']['uploadOptions']['path'])), f.outputfilenameformat.replace('{fname}', '')])
+                        unit.inputdata.get_dataset(inputdir, f.namePattern)
+        else:
+
+            unit = ND280Unit()
+            unit.inputdata = ND280LocalDataset()
+
+            for parent in parent_units:
+                # unit needs to have completed and downloaded before we can get file list
+                if parent.status != "completed":
+                    return None
+
+                # we should be OK so copy all output to the dataset
+                for f in parent.copy_output.files:
+                    unit.inputdata.names.append(os.path.join(parent.copy_output.local_location, f))
+
+        return unit
diff --git a/ganga/GangaND280/Tasks/ND280Transform_CSVEvtList.py b/ganga/GangaND280/Tasks/ND280Transform_CSVEvtList.py
index 3bc10f3124..a5a360fa90 100644
--- a/ganga/GangaND280/Tasks/ND280Transform_CSVEvtList.py
+++ b/ganga/GangaND280/Tasks/ND280Transform_CSVEvtList.py
@@ -1,11 +1,5 @@
-from GangaCore.GPIDev.Schema import *
-from GangaCore.GPIDev.Lib.Tasks.common import *
+from GangaCore.GPIDev.Schema import Schema, Version, SimpleItem
 from GangaCore.GPIDev.Lib.Tasks.ITransform import ITransform
-from GangaCore.GPIDev.Lib.Job.Job import JobError
-from GangaCore.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy
-from GangaCore.Core.exceptions import ApplicationConfigurationError
-from GangaCore.GPIDev.Lib.Tasks.ITransform import ITransform
-from GangaCore.GPIDev.Lib.Tasks.TaskLocalCopy import TaskLocalCopy
 from GangaCore.Utility.logging import getLogger
 from .ND280Unit_CSVEvtList import ND280Unit_CSVEvtList
 from GangaND280.ND280Dataset.ND280Dataset import ND280LocalDataset
diff --git a/ganga/GangaND280/Tasks/ND280Unit.py b/ganga/GangaND280/Tasks/ND280Unit.py
index 923719c787..05b6afa6bd 100644
--- a/ganga/GangaND280/Tasks/ND280Unit.py
+++ b/ganga/GangaND280/Tasks/ND280Unit.py
@@ -1,103 +1,98 @@
-from GangaCore.GPIDev.Schema import *
-from GangaCore.GPIDev.Lib.Tasks.common import *
+import os
+from copy import deepcopy
+from GangaCore.GPIDev.Schema import Schema, Version
 from GangaCore.GPIDev.Lib.Tasks.IUnit import IUnit
-from GangaCore.GPIDev.Lib.Job.Job import JobError
-from GangaCore.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy
-from GangaCore.Core.exceptions import ApplicationConfigurationError
-from GangaCore.GPIDev.Base.Proxy import addProxy, stripProxy
 from GangaCore.Utility.logging import getLogger
 import GangaCore.GPI as GPI
-import os
-from copy import deepcopy
-
 logger = getLogger()
+
 class ND280Unit(IUnit):
-    _schema = Schema(Version(1,0), dict(IUnit._schema.datadict.items()))
-
-    _category = 'units'
-    _name = 'ND280Unit'
-    _exportmethods = IUnit._exportmethods + [ ]
-
-    def __init__(self):
-        super(ND280Unit, self).__init__()
-
-    def createNewJob(self):
-        """Create any jobs required for this unit"""
-        j = GPI.Job()
-        j._impl.backend = self._getParent().backend.clone()
-        j._impl.application = self._getParent().application.clone()
-        if not self.inputdata is None:
-            j.inputdata = self.inputdata.clone()
-
-        trf = self._getParent()
-        task = trf._getParent()
-
-        # copy across the outputfiles
-        for f in trf.outputfiles:
-            j.outputfiles += [f.clone()]
-
-        j.inputsandbox = trf.inputsandbox
-
-        # Sort out the splitter
-        if trf.splitter:
-            j.splitter = trf.splitter.clone()
-
-        # Postprocessors
-        for pp in trf.postprocessors:
-            j.postprocessors.append(deepcopy(pp))
-
-        return j
-
-    def checkMajorResubmit(self, job):
-        """Check if a failed job shold be 'rebrokered' (new job created) rather than just resubmitted"""
-        return False
-
-    def majorResubmit(self, job):
-        """Do any bookkeeping before a major resubmit is done"""
-        super(ND280Unit,self).majorResubmit(job)
-
-    def reset(self):
-        """Reset the unit completely"""
-        super(ND280Unit,self).reset()
-
-    def updateStatus(self, status):
-        """Update status hook"""
-        super(ND280Unit,self).updateStatus(status)
-
-    def checkForSubmission(self):
-        """Additional checks for unit submission"""
-
-        # call the base class
-        if not super(ND280Unit,self).checkForSubmission():
-            return False
-
-        return True
-
-    def copyOutput(self):
-        """Copy the output data to local storage"""
-
-        job = GPI.jobs(self.active_job_ids[0])
-
-        if self.copy_output._name != "TaskLocalCopy" or job.outputdata._impl._name != "DQ2OutputDataset":
-            logger.error("Cannot transfer from DS type '%s' to '%s'. Please contact plugin developer." % (job.outputdata._name, self.copy_output._name))
-            return False
-
-        # check which fies still need downloading
-        to_download = []
-        for f in job.outputfiles:
-
-            # check for REs
-            if self.copy_output.isValid( os.path.join(f.localDir, f.namePattern)) and not self.copy_output.isDownloaded( os.path.join(f.localDir, f.namePattern)):
-                to_download.append(f)
-
-        # is everything downloaded?
-        if len(to_download) == 0:
-            return True
-
-        # nope, so pick the requested number and off we go
-        for f in to_download:
-            f.get()
-
-        return False
+    _schema = Schema(Version(1, 0), dict(IUnit._schema.datadict.items()))
+
+    _category = 'units'
+    _name = 'ND280Unit'
+    _exportmethods = IUnit._exportmethods + []
+
+    def __init__(self):
+        super(ND280Unit, self).__init__()
+
+    def createNewJob(self):
+        """Create any jobs required for this unit"""
+        j = GPI.Job()
+        j._impl.backend = self._getParent().backend.clone()
+        j._impl.application = self._getParent().application.clone()
+        if self.inputdata is not None:
+            j.inputdata = self.inputdata.clone()
+
+        trf = self._getParent()
+
+        # copy across the outputfiles
+        for f in trf.outputfiles:
+            j.outputfiles += [f.clone()]
+
+        j.inputsandbox = trf.inputsandbox
+
+        # Sort out the splitter
+        if trf.splitter:
+            j.splitter = trf.splitter.clone()
+
+        # Postprocessors
+        for pp in trf.postprocessors:
+            j.postprocessors.append(deepcopy(pp))
+
+        return j
+
+    def checkMajorResubmit(self, job):
+        """Check if a failed job should be 'rebrokered' (new job created) rather than just resubmitted"""
+        return False
+
+    def majorResubmit(self, job):
+        """Do any bookkeeping before a major resubmit is done"""
+        super(ND280Unit, self).majorResubmit(job)
+
+    def reset(self):
+        """Reset the unit completely"""
+        super(ND280Unit, self).reset()
+
+    def updateStatus(self, status):
+        """Update status hook"""
+        super(ND280Unit, self).updateStatus(status)
+
+    def checkForSubmission(self):
+        """Additional checks for unit submission"""
+
+        # call the base class
+        if not super(ND280Unit, self).checkForSubmission():
+            return False
+
+        return True
+
+    def copyOutput(self):
+        """Copy the output data to local storage"""
+
+        job = GPI.jobs(self.active_job_ids[0])
+
+        if self.copy_output._name != "TaskLocalCopy" or job.outputdata._impl._name != "DQ2OutputDataset":
+            logger.error("Cannot transfer from DS type '%s' to '%s'. Please contact plugin developer." %
+                         (job.outputdata._name, self.copy_output._name))
+            return False
+
+        # check which files still need downloading
+        to_download = []
+        for f in job.outputfiles:
+
+            # check for REs (regular-expression file masks)
+            if self.copy_output.isValid(os.path.join(f.localDir, f.namePattern)) and not self.copy_output.isDownloaded(os.path.join(f.localDir, f.namePattern)):
+                to_download.append(f)
+
+        # is everything downloaded?
+        if len(to_download) == 0:
+            return True
+
+        # nope, so pick the requested number and off we go
+        for f in to_download:
+            f.get()
+
+        return False
diff --git a/ganga/GangaND280/Tasks/ND280Unit_CSVEvtList.py b/ganga/GangaND280/Tasks/ND280Unit_CSVEvtList.py
index 67311a7aeb..4666f12309 100644
--- a/ganga/GangaND280/Tasks/ND280Unit_CSVEvtList.py
+++ b/ganga/GangaND280/Tasks/ND280Unit_CSVEvtList.py
@@ -1,128 +1,123 @@
-from GangaCore.GPIDev.Schema import *
-from GangaCore.GPIDev.Lib.Tasks.common import *
+import os
+from GangaCore.GPIDev.Schema import Schema, Version, SimpleItem
 from GangaCore.GPIDev.Lib.Tasks.IUnit import IUnit
-from GangaCore.GPIDev.Lib.Job.Job import JobError
-from GangaCore.GPIDev.Lib.Registry.JobRegistry import JobRegistrySlice, JobRegistrySliceProxy
-from GangaCore.Core.exceptions import ApplicationConfigurationError
-from GangaCore.GPIDev.Base.Proxy import addProxy, stripProxy
 from GangaCore.Utility.logging import getLogger
 import GangaCore.GPI as GPI
-import os
-
 logger = getLogger()
+
 class ND280Unit_CSVEvtList(IUnit):
-    _schema = Schema(Version(1,0), dict(list(IUnit._schema.datadict.items()) + list({
-        'subpartid' : SimpleItem(defvalue=-1,typelist=['int'],doc='Index of this unit which is important when the original CSV file was split by the transform. Otherwise leave it at -1'),
-        'eventswanted' : SimpleItem(defvalue='',typelist=['str','list'],doc='CSV list of run, subrun, and event numbers wanted.'),
+    _schema = Schema(Version(1, 0), dict(list(IUnit._schema.datadict.items()) + list({
+        'subpartid': SimpleItem(defvalue=-1, typelist=['int'], doc='Index of this unit which is important when the original CSV file was split by the transform. Otherwise leave it at -1'),
+        'eventswanted': SimpleItem(defvalue='', typelist=['str', 'list'], doc='CSV list of run, subrun, and event numbers wanted.'),
     }.items())))
-    _category = 'units'
-    _name = 'ND280Unit_CSVEvtList'
-    _exportmethods = IUnit._exportmethods + [ ]
-
-    def __init__(self):
-        super(ND280Unit_CSVEvtList, self).__init__()
-
-    def createNewJob(self):
-        """Create any jobs required for this unit"""
-        j = GPI.Job()
-        j._impl.backend = self._getParent().backend.clone()
-        j._impl.application = self._getParent().application.clone()
-        j.inputdata = self.inputdata.clone()
-
-        trf = self._getParent()
-        task = trf._getParent()
-
-        # copy across the outputfiles
-        for f in trf.outputfiles:
-            j.outputfiles += [f.clone()]
-
-        j.inputsandbox = trf.inputsandbox
-
-        if type(self.eventswanted) == type(''):
-            subLines = self.eventswanted
-        else:
-            subLines = '\n'.join(self.eventswanted)
-        # Base for the naming of each subjob's CSV file
-        incsvfile = j._impl.application.csvfile
-        tmpname = os.path.basename(incsvfile)
-        if len(tmpname.split('.')) > 1:
-            patterncsv = '.'.join(tmpname.split('.')[0:-1])+"_sub%d."+ tmpname.split('.')[-1]
-        else:
-            patterncsv = tmpname+"_sub%d"
-
-        from GangaCore.GPIDev.Lib.File import FileBuffer
-        thiscsv = patterncsv % self.subpartid
-
-        # Create the CSV file for this Unit
-        j._impl.getInputWorkspace().writefile(FileBuffer(thiscsv,subLines),executable=0)
-        j._impl.application.csvfile = j._impl.getInputWorkspace().getPath()+thiscsv
-        j.inputsandbox.append(j._impl.getInputWorkspace().getPath()+thiscsv)
-
-        # Base for the naming of each subjob's output file
-        tmpname = os.path.basename(j._impl.application.outputfile)
-        if len(tmpname.split('.')) > 1:
-            patternout = '.'.join(tmpname.split('.')[0:-1])+"_sub%d."+ tmpname.split('.')[-1]
-        else:
-            patternout = tmpname+"_sub%d"
-        j._impl.application.outputfile = patternout % self.subpartid
-
-        # Sort out the splitter
-        if trf.splitter:
-            j.splitter = trf.splitter.clone()
-
-        return j
-
-    def checkMajorResubmit(self, job):
-        """Check if a failed job shold be 'rebrokered' (new job created) rather than just resubmitted"""
-        return False
-
-    def majorResubmit(self, job):
-        """Do any bookkeeping before a major resubmit is done"""
-        super(ND280Unit_CSVEvtList,self).majorResubmit(job)
-
-    def reset(self):
-        """Reset the unit completely"""
-        super(ND280Unit_CSVEvtList,self).reset()
-
-    def updateStatus(self, status):
-        """Update status hook"""
-        super(ND280Unit_CSVEvtList,self).updateStatus(status)
-
-    def checkForSubmission(self):
-        """Additional checks for unit submission"""
-
-        # call the base class
-        if not super(ND280Unit_CSVEvtList,self).checkForSubmission():
-            return False
-
-        return True
-
-    def copyOutput(self):
-        """Copy the output data to local storage"""
-
-        job = GPI.jobs(self.active_job_ids[0])
-
-        if self.copy_output._name != "TaskLocalCopy" or job.outputdata._impl._name != "DQ2OutputDataset":
-            logger.error("Cannot transfer from DS type '%s' to '%s'. Please contact plugin developer." % (job.outputdata._name, self.copy_output._name))
-            return False
-
-        # check which fies still need downloading
-        to_download = []
-        for f in job.outputfiles:
-
-            # check for REs
-            if self.copy_output.isValid( os.path.join(f.localDir, f.namePattern)) and not self.copy_output.isDownloaded( os.path.join(f.localDir, f.namePattern)):
-                to_download.append(f)
-
-        # is everything downloaded?
-        if len(to_download) == 0:
-            return True
-
-        # nope, so pick the requested number and off we go
-        for f in to_download:
-            f.get()
-
-        return False
+    _category = 'units'
+    _name = 'ND280Unit_CSVEvtList'
+    _exportmethods = IUnit._exportmethods + []
+
+    def __init__(self):
+        super(ND280Unit_CSVEvtList, self).__init__()
+
+    def createNewJob(self):
+        """Create any jobs required for this unit"""
+        j = GPI.Job()
+        j._impl.backend = self._getParent().backend.clone()
+        j._impl.application = self._getParent().application.clone()
+        j.inputdata = self.inputdata.clone()
+
+        trf = self._getParent()
+
+        # copy across the outputfiles
+        for f in trf.outputfiles:
+            j.outputfiles += [f.clone()]
+
+        j.inputsandbox = trf.inputsandbox
+
+        if isinstance(self.eventswanted, str):
+            subLines = self.eventswanted
+        else:
+            subLines = '\n'.join(self.eventswanted)
+        # Base for the naming of each subjob's CSV file
+        incsvfile = j._impl.application.csvfile
+        tmpname = os.path.basename(incsvfile)
+        if len(tmpname.split('.')) > 1:
+            patterncsv = '.'.join(tmpname.split('.')[0:-1]) + "_sub%d." + tmpname.split('.')[-1]
+        else:
+            patterncsv = tmpname + "_sub%d"
+
+        from GangaCore.GPIDev.Lib.File import FileBuffer
+        thiscsv = patterncsv % self.subpartid
+
+        # Create the CSV file for this Unit
+        j._impl.getInputWorkspace().writefile(FileBuffer(thiscsv, subLines), executable=0)
+        j._impl.application.csvfile = j._impl.getInputWorkspace().getPath() + thiscsv
+        j.inputsandbox.append(j._impl.getInputWorkspace().getPath() + thiscsv)
+
+        # Base for the naming of each subjob's output file
+        tmpname = os.path.basename(j._impl.application.outputfile)
+        if len(tmpname.split('.')) > 1:
+            patternout = '.'.join(tmpname.split('.')[0:-1]) + "_sub%d." + tmpname.split('.')[-1]
+        else:
+            patternout = tmpname + "_sub%d"
+        j._impl.application.outputfile = patternout % self.subpartid
+
+        # Sort out the splitter
+        if trf.splitter:
+            j.splitter = trf.splitter.clone()
+
+        return j
+
+    def checkMajorResubmit(self, job):
+        """Check if a failed job should be 'rebrokered' (new job created) rather than just resubmitted"""
+        return False
+
+    def majorResubmit(self, job):
+        """Do any bookkeeping before a major resubmit is done"""
+        super(ND280Unit_CSVEvtList, self).majorResubmit(job)
+
+    def reset(self):
+        """Reset the unit completely"""
+        super(ND280Unit_CSVEvtList, self).reset()
+
+    def updateStatus(self, status):
+        """Update status hook"""
+        super(ND280Unit_CSVEvtList, self).updateStatus(status)
+
+    def checkForSubmission(self):
+        """Additional checks for unit submission"""
+
+        # call the base class
+        if not super(ND280Unit_CSVEvtList, self).checkForSubmission():
+            return False
+
+        return True
+
+    def copyOutput(self):
+        """Copy the output data to local storage"""
+
+        job = GPI.jobs(self.active_job_ids[0])
+
+        if self.copy_output._name != "TaskLocalCopy" or job.outputdata._impl._name != "DQ2OutputDataset":
+            logger.error("Cannot transfer from DS type '%s' to '%s'. Please contact plugin developer." %
+                         (job.outputdata._name, self.copy_output._name))
+            return False
+
+        # check which files still need downloading
+        to_download = []
+        for f in job.outputfiles:
+
+            # check for REs (regular-expression file masks)
+            if self.copy_output.isValid(os.path.join(f.localDir, f.namePattern)) and not self.copy_output.isDownloaded(os.path.join(f.localDir, f.namePattern)):
+                to_download.append(f)
+
+        # is everything downloaded?
+        if len(to_download) == 0:
+            return True
+
+        # nope, so pick the requested number and off we go
+        for f in to_download:
+            f.get()
+
+        return False
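
Reviewer note on the Tasks changes above: the ND280Transform/ND280Unit pair is normally driven from a Ganga GPI session. The following is a minimal sketch of that flow, not part of the patch; the task class (CoreTask), the application, backend, dataset path, pattern and nbinputfiles value are illustrative assumptions, and any ND280 application/backend pair registered with allHandlers should work the same way.

    # Minimal GPI sketch (assumes a Ganga session with the GangaND280 plugin loaded)
    t = CoreTask()
    trf = ND280Transform()
    trf.application = Highland()        # placeholder application choice
    trf.backend = Local()
    trf.nbinputfiles = 5                # createUnits() makes one ND280Unit per subset of 5 files

    ds = ND280LocalDataset()
    ds.get_dataset('/path/to/input', '*.root')   # placeholder directory and pattern
    trf.addInputData(ds)                # picked up by createUnits() via splitNbInputFile()

    t.appendTransform(trf)
    t.float = 1                         # number of units allowed to run at once
    t.run()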