feat(allocate): Allocate wms resources at end of daemon iteration
tcjennings committed Jan 30, 2025
1 parent df58e0f commit 7116838
Showing 3 changed files with 145 additions and 12 deletions.
112 changes: 111 additions & 1 deletion src/lsst/cmservice/common/daemon.py
@@ -1,12 +1,16 @@
import importlib.util
import os
import sys
from datetime import datetime, timedelta

from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.future import select

from ..common.logging import LOGGER
from ..config import config
from ..db.queue import Queue
from ..db.script import Script
from .htcondor import build_htcondor_submit_environment
from .logging import LOGGER

logger = LOGGER.bind(module=__name__)

@@ -20,6 +24,7 @@ async def daemon_iteration(session: async_scoped_session) -> None:
# TODO: should the daemon check any campaigns with a state == prepared that
# do not have queues? Queue creation should not be a manual step.
queue_entry: Queue
processed_nodes = 0
for (queue_entry,) in queue_entries:
try:
queued_node = await queue_entry.get_node(session)
@@ -30,9 +35,11 @@ async def daemon_iteration(session: async_scoped_session) -> None:
):
logger.info("Processing queue_entry %s", queued_node.fullname)
await queue_entry.process_node(session)
processed_nodes += 1
sleep_time = await queue_entry.node_sleep_time(session)
else:
# Put this entry to sleep for a while
logger.debug("Not processing queue_entry %s", queued_node.fullname)
sleep_time = config.daemon.processing_interval
time_next_check = iteration_start + timedelta(seconds=sleep_time)
queue_entry.time_next_check = time_next_check
@@ -41,3 +48,106 @@ async def daemon_iteration(session: async_scoped_session) -> None:
logger.exception()
continue
await session.commit()

# Try to allocate resources at the end of the loop, but do not crash if it
# doesn't work.
# FIXME this could be run async
try:
if config.daemon.allocate_resources and processed_nodes > 0:
allocate_resources()
except Exception:
logger.exception()
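One way the FIXME above could be addressed (a hypothetical sketch, not part of this commit) is to push the blocking htcondor calls off the event loop, for example with asyncio.to_thread:

import asyncio

# Hypothetical variant of the block above: run the blocking htcondor work in
# a worker thread so the daemon's event loop is not held up.
if config.daemon.allocate_resources and processed_nodes > 0:
    try:
        await asyncio.to_thread(allocate_resources)
    except Exception:
        logger.exception()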


def allocate_resources() -> None:
"""Allocate resources for htcondor jobs submitted during the daemon
iteration.
"""
if (htcondor := sys.modules.get("htcondor")) is not None:
pass
elif (importlib.util.find_spec("htcondor")) is not None:
htcondor = importlib.import_module("htcondor")

if htcondor is None:
logger.warning("HTcondor not available, will not allocate resources")
return

# Ensure environment is configured for htcondor operations
# FIXME: the python process needs the correct condor env set up. An
# alternative to setting these values JIT in os.environ would be to make the
# config.htcondor submodel's validation_alias match its serialization_alias,
# e.g., "_CONDOR_value"
condor_environment = config.htcondor.model_dump(by_alias=True)
os.environ |= condor_environment

coll = htcondor.Collector(config.htcondor.collector_host)

# Do we need to allocate resources? i.e., are there idle condor jobs for
# which we are responsible?

# TODO condor query for idle jobs with our batch_name
# FIXME we should round-robin submits to available schedds and approximate
# a global query for our idle jobs.

# schedds = coll.locateAll(htcondor.DaemonTypes.Schedd)

# Mapping of schedd ad to a list of its idle jobs
# idle_jobs = {
# ad: htcondor.Schedd(ad).query(
# projection=["ClusterId"],
# constraint="(JobStatus == 1)",
# opts=htcondor.QueryOpts.DefaultMyJobsOnly,
# )
# for ad in schedds
# }

# # Filter query result to those schedds with idle jobs
# idle_job_schedds = [k for k, v in idle_jobs.items() if v]

# if not idle_job_schedds:
# return

# The schedd to which we submit this job should be one where idle jobs are
# waiting. Pick one per daemon iteration; if multiple schedds have idle jobs,
# the next iteration will pick them up.
# schedd = htcondor.Schedd(idle_job_schedds.pop())

# FIXME only queries the single schedd to which we are submitting jobs
schedd_ad = coll.locate(htcondor.DaemonTypes.Schedd, name=config.htcondor.schedd_host)
schedd = htcondor.Schedd(schedd_ad)

idle_jobs = schedd.query(
projection=["ClusterId"],
constraint="(JobStatus == 1)",
opts=htcondor.QueryOpts.DefaultMyJobsOnly,
)
if not idle_jobs:
return
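The TODO above about querying for idle jobs with our batch_name could be prototyped roughly as below; this is a hypothetical refinement (not part of this commit) and assumes the JobBatchName ClassAd attribute that condor_submit's batch_name maps onto:

# Hypothetical refinement: restrict the idle-job query to this service's own
# submissions rather than all of the user's idle jobs.
idle_jobs = schedd.query(
    projection=["ClusterId"],
    constraint=f'(JobStatus == 1) && (JobBatchName == "{config.htcondor.batch_name}")',
    opts=htcondor.QueryOpts.DefaultMyJobsOnly,
)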

# Set the htcondor config in the submission environment
# The environment command in the submit file is a double-quoted,
# whitespace-delimited list of name=value pairs where literal quote marks
# are doubled ("" or '').
submission_environment = " ".join([f"{k}={v}" for k, v in build_htcondor_submit_environment().items()])

# The minimum necessary submission spec executes a resource allocation
# script in the local universe and does not preserve the output.
submission_spec = {
"executable": f"{config.htcondor.remote_user_home}/.local/bin/allocateNodes.py",
"arguments": (
f"--auto --account {config.slurm.account} -n 50 -m 4-00:00:00 "
f"-q {config.slurm.partition} -g 240 {config.slurm.platform}"
),
"environment": f'"{submission_environment}"',
"initialdir": config.htcondor.working_directory,
"batch_name": config.htcondor.batch_name,
"universe": "local",
# "output": "allocate_resources.out",
# "error": "allocate_resources.err",
}
submit = htcondor.Submit(submission_spec)

# job cluster id of our resource allocation script; fire and forget
cluster_id = schedd.submit(submit)
logger.info("Allocating Resources with condor job %s", cluster_id.cluster())
logger.debug(cluster_id)
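The submit-file environment format described in the comment above (a double-quoted, whitespace-delimited list of name=value pairs in which literal quote marks are doubled) could be factored into a small helper. A hypothetical sketch; format_submit_environment is not a function in this commit:

def format_submit_environment(env: dict[str, str]) -> str:
    # Double any literal double quotes ("") so condor_submit does not treat
    # them as the end of the environment list, then wrap the whole string.
    pairs = " ".join(k + "=" + v.replace('"', '""') for k, v in env.items())
    return '"' + pairs + '"'

# e.g. submission_spec["environment"] = format_submit_environment(
#     build_htcondor_submit_environment())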
16 changes: 10 additions & 6 deletions src/lsst/cmservice/common/htcondor.py
@@ -186,23 +186,27 @@ def build_htcondor_submit_environment() -> Mapping[str, str]:
should more closely match the environment of an interactive sdfianaXXX user at
SLAC.
"""
# TODO use all configured htcondor config settings
# condor_environment = config.htcondor.model_dump(by_alias=True)
# TODO we should not always use the same schedd host. We could get a list
# of all schedds from the collector and pick one at random.
return dict(
CONDOR_CONFIG="ONLY_ENV",
CONDOR_CONFIG=config.htcondor.config_source,
_CONDOR_CONDOR_HOST=config.htcondor.collector_host,
_CONDOR_COLLECTOR_HOST=config.htcondor.collector_host,
_CONDOR_SCHEDD_HOST=config.htcondor.schedd_host,
_CONDOR_SEC_CLIENT_AUTHENTICATION_METHODS=config.htcondor.authn_methods,
_CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV=str(config.htcondor.dagman_job_append_get_env),
DAF_BUTLER_REPOSITORY_INDEX=config.butler.repository_index,
_CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV="True",
FS_REMOTE_DIR=config.htcondor.fs_remote_dir,
HOME=config.htcondor.user_home,
DAF_BUTLER_REPOSITORY_INDEX=config.butler.repository_index,
HOME=config.htcondor.remote_user_home,
LSST_VERSION=config.bps.lsst_version,
LSST_DISTRIB_DIR=config.bps.lsst_distrib_dir,
# FIX: because there is no db-auth.yaml in lsstsvc1's home directory
PGPASSFILE=f"{config.htcondor.user_home}/.lsst/postgres-credentials.txt",
PGPASSFILE=f"{config.htcondor.remote_user_home}/.lsst/postgres-credentials.txt",
PGUSER=config.butler.default_username,
PATH=(
f"{config.htcondor.user_home}/.local/bin:{config.htcondor.user_home}/bin:{config.slurm.home}:"
f"{config.htcondor.remote_user_home}/.local/bin:{config.htcondor.remote_user_home}/bin:{config.slurm.home}:"
f"/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin"
),
)
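The TODO near the top of this function (do not always use the same schedd host) could be prototyped along these lines. A hypothetical sketch: locateAll is the same collector call referenced in the commented-out daemon.py code above, and picking a schedd at random is an assumed policy, not the project's:

import random

import htcondor

from ..config import config

# Hypothetical: ask the collector for every schedd ad and pick one at random
# instead of always targeting config.htcondor.schedd_host.
coll = htcondor.Collector(config.htcondor.collector_host)
schedd_ads = coll.locateAll(htcondor.DaemonTypes.Schedd)
schedd = htcondor.Schedd(random.choice(schedd_ads))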
29 changes: 24 additions & 5 deletions src/lsst/cmservice/config.py
@@ -114,9 +114,16 @@ class HTCondorConfiguration(BaseModel):
their serialization alias.
"""

user_home: str = Field(
config_source: str = Field(
description="Source of htcondor configuration",
default="ONLY_ENV",
serialization_alias="CONDOR_CONFIG",
)

remote_user_home: str = Field(
description=("Path to the user's home directory, as resolvable from an htcondor access node."),
default="/sdf/home/l/lsstsvc1",
exclude=True,
)
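For orientation, a minimal self-contained sketch (a hypothetical toy model, not the project's class) of the serialization-alias pattern these fields rely on: dumping by alias yields environment-variable-shaped keys, while exclude=True keeps fields such as remote_user_home out of the dump that daemon.py merges into os.environ:

from pydantic import BaseModel, Field

class HTCondorDemo(BaseModel):
    config_source: str = Field(default="ONLY_ENV", serialization_alias="CONDOR_CONFIG")
    collector_host: str = Field(default="localhost", serialization_alias="_CONDOR_COLLECTOR_HOST")
    remote_user_home: str = Field(default="/sdf/home/l/lsstsvc1", exclude=True)

print(HTCondorDemo().model_dump(by_alias=True))
# {'CONDOR_CONFIG': 'ONLY_ENV', '_CONDOR_COLLECTOR_HOST': 'localhost'}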

condor_home: str = Field(
@@ -207,10 +214,12 @@ class HTCondorConfiguration(BaseModel):
serialization_alias="FS_REMOTE_DIR",
)

# FIXME: unclear if this is at all necessary
dagman_job_append_get_env: bool = Field(
description="...", default=True, serialization_alias="_CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV"
)
# FIXME: unclear if this is necessary or specific to bps submit jobs
# dagman_job_append_get_env: str = Field(
# description="...",
# default="true",
# serialization_alias="_CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV",
# )


# TODO deprecate and remove "slurm"-specific logic from cm-service; it is
@@ -249,6 +258,11 @@ class SlurmConfiguration(BaseModel):
default="milano",
)

platform: str = Field(
description="Platform requested when submitting a slurm job.",
default="s3df",
)


class AsgiConfiguration(BaseModel):
"""Configuration for the application's ASGI web server."""
@@ -309,6 +323,11 @@ class DaemonConfiguration(BaseModel):
Set according to DAEMON__FIELD environment variables.
"""

allocate_resources: bool = Field(
default=False,
description="Whether the daemon should try to allocate its own htcondor or slurm resources.",
)
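Per the class docstring ("Set according to DAEMON__FIELD environment variables"), the new flag would be switched on in the service environment before settings are read; a hypothetical illustration, assuming the double-underscore nested-settings convention the docstring implies:

import os

# Hypothetical: enable end-of-iteration resource allocation.
os.environ["DAEMON__ALLOCATE_RESOURCES"] = "true"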

processing_interval: int = Field(
default=30,
description=(
