Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
tcjennings committed Feb 5, 2025
1 parent 9aa9c07 commit e86c1dc
Show file tree
Hide file tree
Showing 18 changed files with 244 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ cython_debug/
.pypirc

# Local Ignores
outputs/
output/
prod_area/
build/

Expand Down
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ migrate: export DB__URL=postgresql://${PGHOST}/${PGDATABASE}
migrate: run-compose
alembic upgrade head

# Roll back all alembic migrations (the inverse of the `migrate` target):
# downgrades the compose-managed Postgres database to the base (empty)
# revision. Exports mirror `migrate` exactly; DB__PORT is discovered from
# the running `docker compose` postgresql service.
.PHONY: unmigrate
unmigrate: export PGUSER=cm-service
unmigrate: export PGDATABASE=cm-service
unmigrate: export PGHOST=localhost
unmigrate: export DB__PORT=$(shell docker compose port postgresql 5432 | cut -d: -f2)
unmigrate: export DB__PASSWORD=INSECURE-PASSWORD
unmigrate: export DB__URL=postgresql://${PGHOST}/${PGDATABASE}
unmigrate: run-compose
	alembic downgrade base


#------------------------------------------------------------------------------
# Targets for developers to debug running against local sqlite. Can be used on
Expand Down
3 changes: 0 additions & 3 deletions examples/example_standard_scripts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
name: bps_panda_submit_script
handler: lsst.cmservice.handlers.jobs.PandaScriptHandler
data:
wms: panda
bps_wms_yaml_file: "${CTRL_BPS_PANDA_DIR}/config/bps_usdf.yaml"
# Run a bps report script
- SpecBlock:
Expand All @@ -59,8 +58,6 @@
- SpecBlock:
name: bps_htcondor_submit_script
handler: lsst.cmservice.handlers.jobs.HTCondorScriptHandler
data:
wms: htcondor
# Run a bps report script
- SpecBlock:
name: bps_htcondor_report_script
Expand Down
14 changes: 0 additions & 14 deletions examples/stack_files/bps_htcondor_usdf.yaml

This file was deleted.

9 changes: 0 additions & 9 deletions examples/templates/example_bps_core_script_template.yaml

This file was deleted.

8 changes: 0 additions & 8 deletions examples/templates/example_bps_htcondor_script_template.yaml

This file was deleted.

14 changes: 12 additions & 2 deletions src/lsst/cmservice/common/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class WmsMethodEnum(enum.Enum):
panda = 1
Runs under PanDA
ht_condor = 2
htcondor = 2
Runs under HTCondor
More methods to come...
Expand All @@ -257,4 +257,14 @@ class WmsMethodEnum(enum.Enum):
default = -1
bash = 0
panda = 1
ht_condor = 2
htcondor = 2


class WmsComputeSite(enum.Enum):
    """Define a potential compute site.

    ``default`` (-1) is a sentinel meaning "no explicit site chosen";
    consumers resolve it against the service configuration (see
    ``Configuration.compute_site``, which defaults to ``usdf``).
    The remaining members name specific facilities — TODO confirm the
    exact facility each abbreviation refers to with the operations team.
    """

    default = -1
    usdf = 1
    lanc = 2
    ral = 3
    in2p3 = 4
22 changes: 21 additions & 1 deletion src/lsst/cmservice/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from .common.enums import ScriptMethodEnum, StatusEnum
from .common.enums import ScriptMethodEnum, StatusEnum, WmsComputeSite

__all__ = ["Configuration", "config"]

Expand Down Expand Up @@ -373,6 +373,11 @@ class Configuration(BaseSettings):
default=ScriptMethodEnum.htcondor,
)

compute_site: WmsComputeSite = Field(
description="The default WMS compute site",
default=WmsComputeSite.usdf,
)

mock_status: StatusEnum | None = Field(
description="A fake status to return from all operations",
default=None,
Expand All @@ -389,6 +394,7 @@ def validate_mock_status_by_name(cls, value: str | StatusEnum) -> StatusEnum | N
warn(f"Invalid mock status ({value}) provided to config, using default.")
return None

# TODO refactor these identical field validators with type generics
@field_validator("script_handler", mode="before")
@classmethod
def validate_script_method_by_name(cls, value: str | ScriptMethodEnum) -> ScriptMethodEnum:
Expand All @@ -403,6 +409,20 @@ def validate_script_method_by_name(cls, value: str | ScriptMethodEnum) -> Script
warn(f"Invalid script handler ({value}) provided to config, using default.")
return ScriptMethodEnum.htcondor

@field_validator("compute_site", mode="before")
@classmethod
def validate_compute_site_by_name(cls, value: str | WmsComputeSite) -> WmsComputeSite:
    """Use a string value to resolve a ``WmsComputeSite`` by its name,
    falling back to the default value (``usdf``) if an invalid input is
    provided.

    Parameters
    ----------
    value
        Either an already-resolved enum member (returned unchanged) or a
        string naming a member, e.g. ``"usdf"``.
    """
    if isinstance(value, WmsComputeSite):
        return value
    try:
        return WmsComputeSite[value]
    except KeyError:
        # Bug fix: this message previously said "Invalid script handler",
        # copy-pasted from validate_script_method_by_name; report the
        # field this validator actually guards.
        warn(f"Invalid compute site ({value}) provided to config, using default.")
        return WmsComputeSite.usdf


config = Configuration()
"""Configuration for cm-service."""
11 changes: 11 additions & 0 deletions src/lsst/cmservice/db/script.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from ..common.enums import LevelEnum, NodeTypeEnum, ScriptMethodEnum, StatusEnum
from ..common.errors import CMBadEnumError, CMMissingRowCreateInputError
from ..config import config
from .base import Base
from .campaign import Campaign
from .element import ElementMixin
Expand Down Expand Up @@ -89,6 +90,16 @@ class Script(Base, NodeMixin):
"superseded",
]

@property
def run_method(self) -> ScriptMethodEnum:
    """Resolved execution method for this script.

    A script whose ``method`` is ``ScriptMethodEnum.default`` defers to
    the service-wide ``config.script_handler`` setting; any other value
    is returned as-is.
    """
    resolved = self.method
    if resolved is ScriptMethodEnum.default:
        resolved = config.script_handler
    return resolved

@property
def level(self) -> LevelEnum:
"""Returns LevelEnum.script"""
Expand Down
96 changes: 55 additions & 41 deletions src/lsst/cmservice/handlers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,46 @@ class BpsScriptHandler(ScriptHandler):
`parent.collections['run']`
"""

wms_method = WmsMethodEnum.default

async def _write_script(
self,
session: async_scoped_session,
script: Script,
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
resolved_cols = await script.resolve_collections(session)
# Database operations
await session.refresh(parent, attribute_names=["c_", "p_"])
data_dict = await script.data_dict(session)
resolved_cols = await script.resolve_collections(session)

# Resolve mandatory data element inputs. All of these values must be
# provided somewhere along the SpecBlock chain.
try:
prod_area = os.path.expandvars(data_dict["prod_area"])
butler_repo = os.path.expandvars(data_dict["butler_repo"])
lsst_version = os.path.expandvars(data_dict["lsst_version"])
lsst_distrib_dir = os.path.expandvars(data_dict["lsst_distrib_dir"])
pipeline_yaml = os.path.expandvars(data_dict["pipeline_yaml"])
run_coll = resolved_cols["run"]
input_colls = resolved_cols["inputs"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg

# optional stuff from data_dict
rescue = data_dict.get("rescue", False)
skip_colls = data_dict.get("skip_colls", "")
custom_lsst_setup = data_dict.get("custom_lsst_setup", None)
bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None)
bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None)
bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None)
data_query = data_dict.get("data_query", None)
extra_qgraph_options = data_dict.get("extra_qgraph_options", None)
# workflow_config is the values dictionary to use while rendering a
# yaml template, NOT the yaml template itself!
workflow_config: dict[str, Any] = {}
workflow_config["project"] = parent.p_.name # type: ignore
workflow_config["campaign"] = parent.c_.name # type: ignore
workflow_config["pipeline_yaml"] = pipeline_yaml
workflow_config["lsst_version"] = lsst_version
workflow_config["lsst_distrib_dir"] = lsst_distrib_dir
workflow_config["wms"] = self.wms_method.name
workflow_config["script_method"] = script.run_method.name
workflow_config["compute_site"] = data_dict.get("compute_site", self.default_compute_site.name)
workflow_config["custom_lsst_setup"] = data_dict.get("custom_lsst_setup", None)
workflow_config["extra_qgraph_options"] = data_dict.get("extra_qgraph_options", None)

# Get the output file paths
script_url = await self._set_script_files(session, script, prod_area)
Expand All @@ -94,39 +106,39 @@ async def _write_script(
os.path.expandvars(f"{prod_area}/{script.fullname}_bps_config.yaml")
).resolve()
log_url = await Path(os.path.expandvars(f"{prod_area}/{script.fullname}.log")).resolve()

config_path = await Path(config_url).resolve()
submit_path = await Path(f"{prod_area}/{parent.fullname}/submit").resolve()
workflow_config["submit_path"] = str(submit_path)

try:
await run_in_threadpool(shutil.rmtree, submit_path)
except FileNotFoundError:
pass

# build up the bps wrapper script
command = f"{config.bps.bps_bin} --log-file {json_url} --no-log-tty submit {config_path} > {log_url}"
await write_bash_script(script_url, command, values=workflow_config)

await write_bash_script(script_url, command, values=data_dict)

# Collect values for and render bps submit yaml from template
await session.refresh(parent, attribute_names=["c_", "p_"])
# FIXME at this point, how could the following path *not* exist?
# is this meant to be `config_url` instead?
await Path(script_url).parent.mkdir(parents=True, exist_ok=True)

# workflow_config becomes values dictionary to use while rendering a
# yaml template, NOT the yaml template itself!
workflow_config: dict[str, Any] = {}
workflow_config["project"] = parent.p_.name # type: ignore
workflow_config["campaign"] = parent.c_.name # type: ignore
workflow_config["submit_path"] = str(submit_path)
workflow_config["lsst_version"] = os.path.expandvars(lsst_version)
workflow_config["pipeline_yaml"] = pipeline_yaml
workflow_config["custom_lsst_setup"] = custom_lsst_setup
workflow_config["extra_qgraph_options"] = extra_qgraph_options
workflow_config["extra_yaml_literals"] = []

include_configs = []
for to_include_ in [bps_wms_yaml_file, bps_wms_clustering_file, bps_wms_resources_file]:

# FIXME `bps_wms_*_file` should be added to the generic list of
# `bps_wms_extra_files` instead of being specific keywords. The only
# reason they are kept separate is to support overrides of their
# specific role
bps_wms_extra_files = data_dict.get("bps_wms_extra_files", [])
bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None)
bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None)
bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None)
for to_include_ in [
bps_wms_yaml_file,
bps_wms_clustering_file,
bps_wms_resources_file,
*bps_wms_extra_files,
]:
if to_include_:
# We want abspaths, but we need to be careful about
# envvars that are not yet expanded
Expand All @@ -141,17 +153,14 @@ async def _write_script(
# Otherwise, instead of including it we should render it out
# because it's a path we understand but the bps runtime won't
to_include_ = await Path(to_include_).resolve()
# async load the text of the file and if it is valid yaml
# append it to the extra_yaml_literals

try:
include_yaml_ = yaml.dump(yaml.safe_load(await to_include_.read_text()))
workflow_config["extra_yaml_literals"].append(include_yaml_)
except yaml.YAMLError:
logger.exception()
raise

# FIXME include this in the list of potential include files above
# include_configs += bps_wms_extra_files
workflow_config["include_configs"] = include_configs

if isinstance(input_colls, list): # pragma: no cover
Expand All @@ -160,14 +169,14 @@ async def _write_script(
in_collection = input_colls

payload = {
"payloadName": parent.c_.name, # type: ignore
"butlerConfig": butler_repo,
"outputRun": run_coll,
"inCollection": in_collection,
"name": parent.c_.name, # type: ignore
"butler_config": butler_repo,
"output_run_collection": run_coll,
"input_collection": in_collection,
"data_query": data_dict.get("data_query", None),
}
if data_query:
payload["dataQuery"] = data_query.replace("\n", " ").strip()
if rescue: # pragma: no cover
if data_dict.get("rescue", False): # pragma: no cover
skip_colls = data_dict.get("skip_colls", "")
payload["extra_args"] = f"--skip-existing-in {skip_colls}"

workflow_config["payload"] = payload
Expand Down Expand Up @@ -425,7 +434,7 @@ def get_job_id(cls, bps_dict: dict) -> str:
class HTCondorScriptHandler(BpsScriptHandler):
"""Class to handle running Bps for ht_condor jobs"""

wms_method = WmsMethodEnum.ht_condor
wms_method = WmsMethodEnum.htcondor

@classmethod
def get_job_id(cls, bps_dict: dict) -> str:
Expand Down Expand Up @@ -465,10 +474,15 @@ async def _write_script(
graph_url = await Path(f"{prod_area}/{parent.fullname}/submit/{qgraph_file}").resolve()
report_url = await Path(f"{prod_area}/{parent.fullname}/submit/manifest_report.yaml").resolve()

template_values = {
"script_method": script.run_method.name,
**data_dict,
}

command = (
f"{config.bps.pipetask_bin} report --full-output-filename {report_url} {butler_repo} {graph_url}"
)
await write_bash_script(script_url, command, values=data_dict)
await write_bash_script(script_url, command, values=template_values)

return StatusEnum.prepared

Expand Down
1 change: 1 addition & 0 deletions src/lsst/cmservice/handlers/script_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ class ScriptHandler(BaseScriptHandler):
"""SubClass of Handler to deal with script operations using real scripts"""

default_method = config.script_handler
default_compute_site = config.compute_site

@staticmethod
async def _check_stamp_file(
Expand Down
Loading

0 comments on commit e86c1dc

Please sign in to comment.