Skip to content

Commit

Permalink
Update grid workflow and sandbox tasks to support new RWTH htcondor CE.
Browse files Browse the repository at this point in the history
  • Loading branch information
yrath committed Sep 3, 2020
1 parent 24bbe0f commit 910fc4b
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 87 deletions.
159 changes: 91 additions & 68 deletions analysis/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,58 @@ def req(dataset, shift):
return collections.OrderedDict([(params, req(*params)) for params in params_list])


class GridWorkflow(AnalysisTask, law.glite.GLiteWorkflow, law.arc.ARCWorkflow):
class HTCondorWorkflow(law.htcondor.HTCondorWorkflow):
    """
    Base HTCondor workflow that adds parameters for selecting the target pool,
    scheduler and computing element(s) of a submission.
    """

    htcondor_pool = luigi.Parameter(default="", significant=False, description="target "
        "htcondor pool")
    htcondor_scheduler = luigi.Parameter(default="", significant=False, description="target "
        "htcondor scheduler")

    # fix: the help text previously said "target arc computing element(s)" — a copy-paste
    # leftover from the ARC parameter; this parameter selects HTCondor CEs
    htcondor_ce = law.CSVParameter(default=(), significant=False, description="target htcondor "
        "computing element(s), default: ()")

    def htcondor_use_local_scheduler(self):
        # use a local scheduler rather than contacting a remote scheduler for submission
        return True


class AnalysisSandboxTask(law.SandboxTask):
    """
    Task mixin that runs its payload inside a (singularity) sandbox and prepares the
    JTSF environment within it.
    """

    # an unset/empty sandbox parameter is permitted; the task then runs unsandboxed
    allow_empty_sandbox = True
    # sandbox key, selects the singularity/setup_<key>.sh script to source
    req_sandbox = "slc7"

    def __init__(self, *args, **kwargs):
        # disable law environment forwarding and automatic bind mounts into the
        # container; required binds are passed explicitly via singularity_args()
        self.singularity_forward_law = lambda: False
        self.singularity_allow_binds = lambda: False
        super(AnalysisSandboxTask, self).__init__(*args, **kwargs)

    def singularity_args(self):
        """Return additional command line arguments for the singularity invocation."""
        # fix: previously checked os.environ.get("JTSF_ON_GRID", 0) == "1", comparing an
        # int default against a string; it worked only by accident and was inconsistent
        # with the check in sandbox_setup_cmds()
        if os.environ.get("JTSF_ON_GRID") == "1":
            # on the grid: make cvmfs visible and use the law job dir as home
            return ["--bind", "/cvmfs", "-H", os.environ["LAW_JOB_HOME"]]
        else:
            return []

    def sandbox_setup_cmds(self):
        """Return shell commands executed inside the sandbox before the task payload."""
        cmds = super(AnalysisSandboxTask, self).sandbox_setup_cmds()

        if os.environ.get("JTSF_ON_GRID") == "1":
            # variables forwarded unchanged from the outer (grid job) environment
            for var in ["JTSF_DATA", "JTSF_STORE", "JTSF_LOCAL_CACHE",
                    "JTSF_GRID_USER", "JTSF_ON_GRID", "TMP", "LAW_JOB_HOME"]:
                cmds.append('export {}="{}"'.format(var, os.environ[var]))
            # environment variables that may differ between sandbox and outer layer
            for var in ["JTSF_SOFTWARE", "CMSSW_VERSION", "CMSSW_BASE", "X509_USER_PROXY"]:
                cmds.append('export {}="{}"'.format(var, os.environ["SANDBOX_" + var]))

        # NOTE(review): indentation was lost in this view; these are assumed to run for
        # every sandboxed execution (not only on the grid) — confirm against the repo
        cmds.append('export JTSF_CMSSW_SETUP="{}"'.format(os.environ["JTSF_CMSSW_SETUP"]))
        cmds.append("source {}".format(os.path.join(os.environ["JTSF_BASE"], "setup.sh")))
        cmds.append("source {}".format(os.path.join(
            os.environ["JTSF_BASE"], "singularity", "setup_{}.sh".format(self.req_sandbox)))
        )
        return cmds


class GridWorkflow(AnalysisTask, AnalysisSandboxTask, law.glite.GLiteWorkflow, law.arc.ARCWorkflow, HTCondorWorkflow):

glite_ce_map = {
"RWTH": "grid-ce.physik.rwth-aachen.de:8443/cream-pbs-cms",
"RWTH_short": "grid-ce.physik.rwth-aachen.de:8443/cream-pbs-short",
"CNAF": [
"ce04-lcg.cr.cnaf.infn.it:8443/cream-lsf-cms",
"ce05-lcg.cr.cnaf.infn.it:8443/cream-lsf-cms",
Expand All @@ -238,21 +285,34 @@ class GridWorkflow(AnalysisTask, law.glite.GLiteWorkflow, law.arc.ARCWorkflow):
"DESY": "grid-arcce0.desy.de",
"KIT": ["arc-{}-kit.gridka.de".format(i) for i in range(1, 6 + 1)],
}
htcondor_ce_map = {
"RWTH": "grid-ce-1-rwth.gridka.de grid-ce-1-rwth.gridka.de:9619", # input to grid_resource
}

sl_distribution_map = collections.defaultdict(lambda: "slc7", {"RWTH": "slc6"})
req_sandbox = "slc7" # sandbox to run in on the grid

sl_distribution_map = collections.defaultdict(lambda: "slc7")
grid_ce = law.CSVParameter(default=["RWTH"], significant=False, description="target computing "
"element(s)")

sandbox = "singularity::/cvmfs/singularity.opensciencegrid.org/cmssw/cms:rhel7-m20200612"

exclude_params_branch = {"grid_ce"}

@classmethod
def modify_param_values(cls, params):
params = AnalysisTask.modify_param_values(params)
if "workflow" in params and law.is_no_param(params["workflow"]):
grid_ce = params["grid_ce"]
workflow = "arc" if grid_ce[0] in cls.arc_ce_map else "glite"

# figure out ce version
if grid_ce[0] in cls.arc_ce_map:
workflow = "arc"
elif grid_ce[0] in cls.glite_ce_map:
workflow = "glite"
elif grid_ce[0] in cls.htcondor_ce_map:
workflow = "htcondor"
else:
raise ValueError("Unknown computing element type {}".format(grid_ce[0]))

ces = []
for ce in grid_ce:
ces.append(getattr(cls, workflow + "_ce_map").get(ce, ce))
Expand All @@ -261,20 +321,16 @@ def modify_param_values(cls, params):
return params

def _setup_workflow_requires(self, reqs):
# figure out if the CE runs the same operating system as we are
# if not, upload the software and cmssw from an appropriate sandbox
self.sl_dist_version = os.getenv("JTSF_DIST_VERSION")

if not len(set([self.sl_distribution_map[ce] for ce in self.grid_ce])) == 1:
raise Exception("Cannot submit to multiple CEs running different distributions.")
self.remote_sl_dist_version = self.sl_distribution_map[self.grid_ce[0]]

reqs["cmssw"] = UploadCMSSW.req(self, replicas=10, _prefer_cli=["replicas"],
sandbox=self.config_inst.get_aux("sandboxes")[self.req_sandbox])
sandbox=self.sandbox)
reqs["software"] = UploadSoftware.req(self, replicas=10, _prefer_cli=["replicas"],
sandbox=self.config_inst.get_aux("sandboxes")[self.remote_sl_dist_version])
reqs["sandbox_software"] = UploadSoftware.req(self, replicas=10, _prefer_cli=["replicas"],
sandbox=self.config_inst.get_aux("sandboxes")[self.req_sandbox])
sandbox=self.sandbox)
reqs["repo"] = UploadRepo.req(self, replicas=10, _prefer_cli=["replicas"])

def _setup_render_variables(self, config, reqs):
Expand All @@ -289,6 +345,7 @@ def _setup_render_variables(self, config, reqs):
config.render_variables["repo_checksum"] = reqs["repo"].checksum
config.render_variables["repo_base"] = reqs["repo"].output().dir.uri()

# GLITE
def glite_workflow_requires(self):
reqs = law.glite.GLiteWorkflow.glite_workflow_requires(self)
self._setup_workflow_requires(reqs)
Expand All @@ -309,6 +366,7 @@ def glite_job_config(self, config, job_num, branches):
config.vo = "cms:/cms/dcms"
return config

# ARC
def arc_workflow_requires(self):
reqs = law.arc.ARCWorkflow.arc_workflow_requires(self)
self._setup_workflow_requires(reqs)
Expand All @@ -330,37 +388,36 @@ def arc_job_config(self, config, job_num, branches):
def arc_stageout_file(self):
    # shell script executed at the end of an ARC job to copy outputs to remote storage
    return law.util.rel_path(__file__, "files", "arc_stageout.sh")


class HTCondorWorkflow(law.htcondor.HTCondorWorkflow):
"""
Batch systems are typically very heterogeneous by design, and so is HTCondor. Law does not aim
to "magically" adapt to all possible HTCondor setups which would certainly end in a mess.
Therefore we have to configure the base HTCondor workflow in law.contrib.htcondor to work with
the VISPA environment. In most cases, like in this example, only a minimal amount of
configuration is required.
"""
htcondor_logs = luigi.BoolParameter()
htcondor_gpus = luigi.IntParameter(default=2, significant=False, description="number "
"of GPUs to request on the VISPA cluster")
# HTCONDOR
def htcondor_workflow_requires(self):
    # extend the base HTCondor workflow requirements with the shared upload requirements
    # (software, CMSSW bundle, repo) via _setup_workflow_requires
    reqs = law.htcondor.HTCondorWorkflow.htcondor_workflow_requires(self)
    self._setup_workflow_requires(reqs)
    return reqs

def htcondor_output_directory(self):
# the directory where submission meta data should be stored
return law.LocalDirectoryTarget(self.local_path())
return self.glite_output_directory()

def htcondor_output_uri(self):
    # reuse the glite output location so all grid flavors stage out to the same place
    return self.glite_output_uri()

def htcondor_bootstrap_file(self):
    # same job bootstrap script as used for glite submission
    return self.glite_bootstrap_file()

def htcondor_job_config(self, config, job_num, branches):
# copy the entire environment
config.custom_content.append(("getenv", "true"))
# condor logs
self._setup_render_variables(config, self.htcondor_workflow_requires())
config.render_variables["output_uri"] = self.htcondor_output_uri()
config.universe = "grid"
config.stdout = "out.txt"
config.stderr = "err.txt"
config.log = "log.txt"
config.custom_content.append(("grid_resource", "condor {}".format(self.htcondor_ce[0])))
config.custom_content.append(("use_x509userproxy", "true"))

if self.htcondor_gpus > 0:
config.custom_content.append(("request_gpus", self.htcondor_gpus))

config.custom_content.append(("RequestMemory", "16000"))
return config

def htcondor_stageout_file(self):
    # reuse the ARC stageout script for HTCondor jobs
    return self.arc_stageout_file()


class InstallCMSSWCode(AnalysisTask):

Expand Down Expand Up @@ -403,40 +460,6 @@ def run(self):
output.touch(self.checksum)


class AnalysisSandboxTask(law.SandboxTask):
    """
    Task mixin that runs its payload inside a (singularity) sandbox and prepares the
    JTSF environment within it.
    """

    # an unset/empty sandbox parameter is permitted; the task then runs unsandboxed
    allow_empty_sandbox = True

    def __init__(self, *args, **kwargs):
        # disable law environment forwarding and automatic bind mounts into the
        # container; required binds are passed explicitly via singularity_args()
        self.singularity_forward_law = lambda: False
        self.singularity_allow_binds = lambda: False
        super(AnalysisSandboxTask, self).__init__(*args, **kwargs)

    def singularity_args(self):
        # NOTE(review): the int default 0 is compared against the string "1", so this
        # behaves like a plain .get("JTSF_ON_GRID"); the int default is misleading and
        # inconsistent with the check in sandbox_setup_cmds()
        if os.environ.get("JTSF_ON_GRID", 0) == "1":
            # on the grid, make cvmfs visible inside the container
            return ["--bind", "/cvmfs"]
        else:
            return []

    def sandbox_setup_cmds(self):
        # shell commands executed inside the sandbox before the actual task payload
        cmds = super(AnalysisSandboxTask, self).sandbox_setup_cmds()

        if os.environ.get("JTSF_ON_GRID") == "1":
            # variables forwarded unchanged from the outer (grid job) environment
            for var in ["JTSF_DATA", "JTSF_STORE", "JTSF_LOCAL_CACHE",
                    "JTSF_GRID_USER", "JTSF_ON_GRID", "TMP", "LAW_JOB_HOME"]:
                cmds.append('export {}="{}"'.format(var, os.environ[var]))
            # environment variables that may differ between sandbox and outer layer
            for var in ["JTSF_SOFTWARE", "CMSSW_VERSION", "CMSSW_BASE", "X509_USER_PROXY"]:
                cmds.append('export {}="{}"'.format(var, os.environ["SANDBOX_" + var]))

        # NOTE(review): indentation was lost in this view; these are assumed to run for
        # every sandboxed execution (not only on the grid) — confirm against the repo
        cmds.append('export JTSF_CMSSW_SETUP="{}"'.format(os.environ["JTSF_CMSSW_SETUP"]))
        cmds.append("source {}".format(os.path.join(os.environ["JTSF_BASE"], "setup.sh")))
        cmds.append("source {}".format(os.path.join(
            os.environ["JTSF_BASE"], "singularity", "setup_slc7.sh"))
        )
        return cmds


class UploadCMSSW(AnalysisTask, law.tasks.TransferLocalFile, AnalysisSandboxTask,
law.cms.BundleCMSSW):

Expand Down
10 changes: 5 additions & 5 deletions analysis/tasks/files/arc_stageout.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ action() {

if [ -f "$src_dir/$file_name" ]; then
echo "stageout $msg_name $src_dir/$file_name"
PATH="$PATH_ORIG" \
PYTHONPATH="$PYTHONPATH_ORIG" \
LD_LIBRARY_PATH="$LD_LIBRARY_PATH_ORIG" \
GFAL_PLUGIN_DIR="$GFAL_PLUGIN_DIR_ORIG" \
gfal-copy "$src_dir/$file_name" "$output_uri/$file_name"
python -c "import gfal2;\
ctx = gfal2.creat_context();\
params = ctx.transfer_parameters();\
params.overwrite = True;\
ctx.filecopy(params, 'file://$src_dir/$file_name', '$output_uri/$file_name')"
else
2>&1 echo "cannot stageout missing $msg_name $src_dir/$file_name"
fi
Expand Down
19 changes: 14 additions & 5 deletions analysis/tasks/hists.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
from collections import defaultdict

from analysis.config.jet_tagging_sf import CategoryGetter
from analysis.tasks.base import AnalysisTask, DatasetTask, ShiftTask, WrapperTask, GridWorkflow, HTCondorWorkflow
from analysis.tasks.base import AnalysisTask, DatasetTask, ShiftTask, WrapperTask, GridWorkflow, AnalysisSandboxTask
from analysis.tasks.trees import MergeTrees, MergeMetaData
from analysis.tasks.external import CalculatePileupWeights
from analysis.util import TreeExtender, walk_categories, format_shifts


class WriteHistograms(DatasetTask, GridWorkflow, law.LocalWorkflow, HTCondorWorkflow):
class WriteHistograms(DatasetTask, GridWorkflow, law.LocalWorkflow):

iteration = luigi.IntParameter(default=0, description="iteration of the scale factor "
"calculation, starting at zero, default: 0")
Expand Down Expand Up @@ -113,8 +113,10 @@ def requires(self):

def store_parts(self):
binning_part = "optimized" if self.optimize_binning else "default"
variable_part = self.variable_tag if self.variable_tag else "all"
shift_part = "_".join(self.used_shifts) if self.used_shifts else "all"
return super(WriteHistograms, self).store_parts() + (self.b_tagger,) + (self.iteration,) \
+ (binning_part,)
+ (variable_part,) + (shift_part,) + (binning_part,)

def output(self):
return self.wlcg_target("hists_{}.root".format(self.branch))
Expand Down Expand Up @@ -329,7 +331,7 @@ def run(self):
for shift in self.shifts:
nominal_sfs = inp["sf"]["nominal"]["sf"] if shift.startswith("c_stat") \
else None
weighters.append(self.get_scale_factor_weighter( # TODO
weighters.append(self.get_scale_factor_weighter(
inp["sf"], shift,
nominal_sfs=nominal_sfs)
)
Expand Down Expand Up @@ -480,9 +482,14 @@ class MergeHistograms(GridWorkflow, law.tasks.CascadeMerge):

b_tagger = WriteHistograms.b_tagger
category_tags = WriteHistograms.category_tags
variable_tag = WriteHistograms.variable_tag
used_shifts = WriteHistograms.used_shifts

merge_factor = 13

sandbox = "singularity::/cvmfs/singularity.opensciencegrid.org/cmssw/cms:rhel6-m20200612"
req_sandbox = "slc6"

def create_branch_map(self):
return law.tasks.CascadeMerge.create_branch_map(self)

Expand Down Expand Up @@ -518,8 +525,10 @@ def cascade_requires(self, start_leaf, end_leaf):

def store_parts(self):
binning_part = "optimized" if self.optimize_binning else "default"
variable_part = self.variable_tag if self.variable_tag else "all"
shift_part = "_".join(self.used_shifts) if self.used_shifts else "all"
return super(MergeHistograms, self).store_parts() + (self.b_tagger,) + (self.iteration,) \
+ (binning_part,)
+ (variable_part,) + (shift_part,) + (binning_part,)

def cascade_output(self):
return self.wlcg_target("hists.root")
Expand Down
8 changes: 3 additions & 5 deletions analysis/tasks/trees.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@
import luigi
import six

from analysis.tasks.base import AnalysisTask, DatasetTask, WrapperTask, GridWorkflow, HTCondorWorkflow, AnalysisSandboxTask
from analysis.tasks.base import AnalysisTask, DatasetTask, WrapperTask, GridWorkflow
from analysis.tasks.external import GetDatasetLFNs, DownloadSetupFiles
from analysis.util import wget, determine_xrd_redirector
from analysis.config.jet_tagging_sf import xrd_redirectors


class WriteTrees(DatasetTask, AnalysisSandboxTask, GridWorkflow, law.LocalWorkflow):
class WriteTrees(DatasetTask, GridWorkflow, law.LocalWorkflow):

max_events = luigi.IntParameter(default=law.NO_INT)
workflow_run_decorators = [law.decorator.notify]

stream_input_file = False
xrdcp_attempts = 3

sandbox = "singularity::/cvmfs/singularity.opensciencegrid.org/cmssw/cms:rhel7-m20200612"

def workflow_requires(self):
if self.cancel_jobs or self.cleanup_jobs:
return {}
Expand Down Expand Up @@ -166,7 +164,7 @@ class WriteTreesWrapper(WrapperTask):
wrapped_task = WriteTrees


class MergeTrees(DatasetTask, AnalysisSandboxTask, law.tasks.CascadeMerge, GridWorkflow):
class MergeTrees(DatasetTask, law.tasks.CascadeMerge, GridWorkflow):

merge_factor = 25

Expand Down
2 changes: 1 addition & 1 deletion cmssw/setup_Legacy18.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ action() {
# E-gamma
git cms-addpkg EgammaAnalysis/ElectronTools #check out the package otherwise code accessing it will crash
rm EgammaAnalysis/ElectronTools/data -rf #delete the data directory so we can populate it ourselves
git clone git@github.com:cms-data/EgammaAnalysis-ElectronTools.git EgammaAnalysis/ElectronTools/data
git clone https://github.com/cms-data/EgammaAnalysis-ElectronTools.git EgammaAnalysis/ElectronTools/data

# fix for new JER version
# git cms-merge-topic ahinzmann:resolutionSmearingFix102
Expand Down
7 changes: 4 additions & 3 deletions setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ action() {
fi

# other defaults
[ -z "$JTSF_SOFTWARE" ] && export JTSF_SOFTWARE="$JTSF_DATA/$JTSF_DIST_VERSION/software/$( whoami )"
[ -z "$JTSF_STORE" ] && export JTSF_STORE="$JTSF_DATA/store"
[ -z "$JTSF_LOCAL_CACHE" ] && export JTSF_LOCAL_CACHE="$JTSF_DATA/cache"
[ -z "$JTSF_CMSSW_SETUP" ] && export JTSF_CMSSW_SETUP="Legacy18"
[ -z "$JTSF_CAMPAIGN" ] && export JTSF_CAMPAIGN="Run2_pp_13TeV_$JTSF_CMSSW_SETUP"
[ -z "$JTSF_SOFTWARE" ] && export JTSF_SOFTWARE="$JTSF_DATA/$JTSF_CMSSW_SETUP/$JTSF_DIST_VERSION/software/$( whoami )"
[ -z "$JTSF_STORE" ] && export JTSF_STORE="$JTSF_DATA/store"
[ -z "$JTSF_LOCAL_CACHE" ] && export JTSF_LOCAL_CACHE="$JTSF_DATA/cache"


# default CMSSW setup when on VISPA or otherwise set
[ "$JTSF_ON_VISPA" = "1" ] && export JTSF_CMSSW_SETUP="NONE"
Expand Down

0 comments on commit 910fc4b

Please sign in to comment.