diff --git a/README.md b/README.md index 9d3330447..0a2b8b5cd 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,8 @@ _Launch a new scan of an event_ | `"worker_memory"` | str | default: `8G` | how much memory per client worker to request | `"worker_disk"` | str | default: `1G` | how much disk per client worker to request | `"debug_mode"` | str or list | default: None | what debug mode(s) to use: `"client-logs"` collects the scanner clients' stderr/stdout including icetray logs (scans are limited in # of workers) -| `"predictive_scanning_threshold"` | float | default: `1.0` | the predictive scanning threshold [0.1, 1.0] (see [Skymap Scanner](https://github.com/icecube/skymap_scanner)) +| `"predictive_scanning_threshold"` | float | default: `1.0` | the predictive scanning threshold `[0.1, 1.0]` (see [Skymap Scanner](https://github.com/icecube/skymap_scanner)) +| `"priority"` | int | default: `0` | the relative priority of this scan -- higher values indicate higher priority. **NOTE: Values `>= 10` are reserved for Realtime alert scans (these scan requests are not throttled).** Also, see [HTCondor jobs](https://htcondor.readthedocs.io/en/latest/users-manual/priorities-and-preemption.html#job-priority) | `"classifiers"` | dict[str, str | bool | float | int] | default: `{}` | a user-defined collection of labels, attributes, etc. 
-- this is constrained in size and is intended for user-defined metadata only | `"manifest_projection"` | list | default: all fields but [these](#manifest-fields-excluded-by-default-in-response) | which `Manifest` fields to include in the response (include `*` to include all fields) @@ -347,6 +348,8 @@ Pseudo-code: event_i3live_json_dict: dict, scanner_server_args: str, + priority: int, + classifiers: dict[str, str | bool | float | int], event_i3live_json_dict__hash: str, # a deterministic hash of the event json diff --git a/clientmanager/clientmanager.py b/clientmanager/clientmanager.py index 4e7b33d0e..37997b51b 100644 --- a/clientmanager/clientmanager.py +++ b/clientmanager/clientmanager.py @@ -177,6 +177,11 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path: type=int, help="how long each worker is allowed to run -- condor only", # TODO - set for k8s? ) + sub_parser.add_argument( + "--priority", + required=True, + help="relative priority of this job/jobs -- condor only", # TODO - set for k8s? 
+ ) # client args sub_parser.add_argument( diff --git a/clientmanager/condor/act.py b/clientmanager/condor/act.py index 3067dc41e..5a8d3ab90 100644 --- a/clientmanager/condor/act.py +++ b/clientmanager/condor/act.py @@ -41,6 +41,7 @@ def _act(args: argparse.Namespace, schedd_obj: htcondor.Schedd) -> None: worker_disk_bytes=args.worker_disk_bytes, n_cores=args.n_cores, max_worker_runtime=args.max_worker_runtime, + priority=args.priority, # starter CL args -- client client_args=args.client_args, client_startup_json_s3=utils.s3ify(args.client_startup_json), diff --git a/clientmanager/condor/starter.py b/clientmanager/condor/starter.py index 26ced2871..2babec157 100644 --- a/clientmanager/condor/starter.py +++ b/clientmanager/condor/starter.py @@ -26,6 +26,7 @@ def make_condor_job_description( worker_disk_bytes: int, n_cores: int, max_worker_runtime: int, + priority: int, # skymap scanner args image: str, client_startup_json_s3: S3File, @@ -86,6 +87,7 @@ def make_condor_job_description( "request_disk": humanfriendly.format_size( # 1073741824 -> "1 GiB" -> "1 GB" worker_disk_bytes, binary=True ).replace("i", ""), + "priority": int(priority), "+WantIOProxy": "true", # for HTChirp "+OriginalTime": max_worker_runtime, # Execution time limit -- 1 hour default on OSG } @@ -129,6 +131,7 @@ def prep( worker_disk_bytes: int, n_cores: int, max_worker_runtime: int, + priority: int, # starter CL args -- client client_args: list[tuple[str, str]], client_startup_json_s3: S3File, @@ -156,6 +159,7 @@ def prep( worker_disk_bytes, n_cores, max_worker_runtime, + priority, # skymap scanner args image, client_startup_json_s3, diff --git a/dependencies-from-Dockerfile.log b/dependencies-from-Dockerfile.log index 53a216a17..efb59ab83 100644 --- a/dependencies-from-Dockerfile.log +++ b/dependencies-from-Dockerfile.log @@ -7,8 +7,8 @@ # pip freeze ######################################################################## backoff==2.2.1 -boto3==1.33.9 -botocore==1.33.9 +boto3==1.34.1 
+botocore==1.34.1 cachetools==5.3.2 certifi==2023.11.17 cffi==1.16.0 @@ -18,9 +18,9 @@ cryptography==41.0.7 dacite==1.8.1 Deprecated==1.2.14 dnspython==2.4.2 -google-auth==2.25.1 +google-auth==2.25.2 googleapis-common-protos==1.59.1 -grpcio==1.59.3 +grpcio==1.60.0 htcondor==23.2.0 humanfriendly==10.0 idna==3.6 @@ -52,12 +52,12 @@ requests==2.31.0 requests-futures==1.0.1 requests-oauthlib==1.3.1 rsa==4.9 -s3transfer==0.8.2 +s3transfer==0.9.0 six==1.16.0 thrift==0.16.0 tornado==6.4 typeguard==4.1.5 -typing_extensions==4.8.0 +typing_extensions==4.9.0 urllib3==1.26.18 websocket-client==1.7.0 wipac-dev-tools==1.8.2 @@ -75,15 +75,15 @@ pip==23.2.1 pipdeptree==2.13.1 setuptools==65.5.1 skydriver-clientmanager -├── boto3 [required: Any, installed: 1.33.9] -│ ├── botocore [required: >=1.33.9,<1.34.0, installed: 1.33.9] +├── boto3 [required: Any, installed: 1.34.1] +│ ├── botocore [required: >=1.34.1,<1.35.0, installed: 1.34.1] │ │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] │ │ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2] │ │ │ └── six [required: >=1.5, installed: 1.16.0] │ │ └── urllib3 [required: >=1.25.4,<2.1, installed: 1.26.18] │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] -│ └── s3transfer [required: >=0.8.2,<0.9.0, installed: 0.8.2] -│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.33.9] +│ └── s3transfer [required: >=0.9.0,<0.10.0, installed: 0.9.0] +│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.34.1] │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] │ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2] │ │ └── six [required: >=1.5, installed: 1.16.0] @@ -95,7 +95,7 @@ skydriver-clientmanager ├── humanfriendly [required: Any, installed: 10.0] ├── kubernetes [required: Any, installed: 28.1.0] │ ├── certifi [required: >=14.05.14, installed: 2023.11.17] -│ ├── google-auth [required: >=1.0.1, installed: 2.25.1] +│ ├── google-auth [required: >=1.0.1, installed: 2.25.2] │ │ ├── 
cachetools [required: >=2.0.0,<6.0, installed: 5.3.2] │ │ ├── pyasn1-modules [required: >=0.2.1, installed: 0.3.0] │ │ │ └── pyasn1 [required: >=0.4.6,<0.6.0, installed: 0.5.1] @@ -132,20 +132,20 @@ skydriver-clientmanager │ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18] ├── tornado [required: Any, installed: 6.4] ├── typeguard [required: Any, installed: 4.1.5] -│ └── typing-extensions [required: >=4.7.0, installed: 4.8.0] +│ └── typing-extensions [required: >=4.7.0, installed: 4.9.0] ├── wipac-dev-tools [required: Any, installed: 1.8.2] │ ├── requests [required: Any, installed: 2.31.0] │ │ ├── certifi [required: >=2017.4.17, installed: 2023.11.17] │ │ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2] │ │ ├── idna [required: >=2.5,<4, installed: 3.6] │ │ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18] -│ └── typing-extensions [required: Any, installed: 4.8.0] +│ └── typing-extensions [required: Any, installed: 4.9.0] └── wipac-rest-tools [required: Any, installed: 1.5.2] ├── cachetools [required: Any, installed: 5.3.2] ├── PyJWT [required: !=2.6.0, installed: 2.8.0] ├── qrcode [required: Any, installed: 7.4.2] │ ├── pypng [required: Any, installed: 0.20220715.0] - │ └── typing-extensions [required: Any, installed: 4.8.0] + │ └── typing-extensions [required: Any, installed: 4.9.0] ├── requests [required: Any, installed: 2.31.0] │ ├── certifi [required: >=2017.4.17, installed: 2023.11.17] │ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2] @@ -164,7 +164,7 @@ skydriver-clientmanager │ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2] │ ├── idna [required: >=2.5,<4, installed: 3.6] │ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18] - └── typing-extensions [required: Any, installed: 4.8.0] + └── typing-extensions [required: Any, installed: 4.9.0] wheel==0.42.0 wipac-telemetry==0.3.0 ├── coloredlogs [required: Any, installed: 15.0.1] @@ -178,7 +178,7 @@ wipac-telemetry==0.3.0 │ ├── 
opentelemetry-exporter-jaeger-proto-grpc [required: ==1.21.0, installed: 1.21.0] │ │ ├── googleapis-common-protos [required: ~=1.52,<1.60.0, installed: 1.59.1] │ │ │ └── protobuf [required: >=3.19.5,<5.0.0.dev0,!=4.21.5,!=4.21.4,!=4.21.3,!=4.21.2,!=4.21.1,!=3.20.1,!=3.20.0, installed: 4.25.1] -│ │ ├── grpcio [required: >=1.0.0,<2.0.0, installed: 1.59.3] +│ │ ├── grpcio [required: >=1.0.0,<2.0.0, installed: 1.60.0] │ │ ├── opentelemetry-api [required: ~=1.3, installed: 1.21.0] │ │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] │ │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0] @@ -191,7 +191,7 @@ wipac-telemetry==0.3.0 │ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ │ └── zipp [required: >=0.5, installed: 3.17.0] │ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0] -│ │ └── typing-extensions [required: >=3.7.4, installed: 4.8.0] +│ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0] │ └── opentelemetry-exporter-jaeger-thrift [required: ==1.21.0, installed: 1.21.0] │ ├── opentelemetry-api [required: ~=1.3, installed: 1.21.0] │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14] @@ -205,7 +205,7 @@ wipac-telemetry==0.3.0 │ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ │ └── zipp [required: >=0.5, installed: 3.17.0] │ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0] -│ │ └── typing-extensions [required: >=3.7.4, installed: 4.8.0] +│ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0] │ └── thrift [required: >=0.10.0, installed: 0.16.0] │ └── six [required: >=1.7.2, installed: 1.16.0] ├── opentelemetry-exporter-otlp-proto-http [required: Any, installed: 1.21.0] @@ -232,7 +232,7 @@ wipac-telemetry==0.3.0 │ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ │ └── zipp [required: >=0.5, installed: 3.17.0] │ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0] -│ │ └── 
typing-extensions [required: >=3.7.4, installed: 4.8.0] +│ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0] │ └── requests [required: ~=2.7, installed: 2.31.0] │ ├── certifi [required: >=2017.4.17, installed: 2023.11.17] │ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2] @@ -245,13 +245,13 @@ wipac-telemetry==0.3.0 │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0] │ │ └── zipp [required: >=0.5, installed: 3.17.0] │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0] -│ └── typing-extensions [required: >=3.7.4, installed: 4.8.0] +│ └── typing-extensions [required: >=3.7.4, installed: 4.9.0] ├── protobuf [required: Any, installed: 4.25.1] -├── typing-extensions [required: Any, installed: 4.8.0] +├── typing-extensions [required: Any, installed: 4.9.0] └── wipac-dev-tools [required: Any, installed: 1.8.2] ├── requests [required: Any, installed: 2.31.0] │ ├── certifi [required: >=2017.4.17, installed: 2023.11.17] │ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2] │ ├── idna [required: >=2.5,<4, installed: 3.6] │ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18] - └── typing-extensions [required: Any, installed: 4.8.0] + └── typing-extensions [required: Any, installed: 4.9.0] diff --git a/skydriver/config.py b/skydriver/config.py index 1cba6d788..4f3f08128 100644 --- a/skydriver/config.py +++ b/skydriver/config.py @@ -30,6 +30,8 @@ TMS_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED = 1 * 60 * 60 TMS_STOPPER_K8S_JOB_N_RETRIES = 6 +SCAN_MIN_PRIORITY_TO_START_NOW = 10 + @enum.unique class DebugMode(enum.Enum): diff --git a/skydriver/database/interface.py b/skydriver/database/interface.py index 28cd00ac1..fa6949c43 100644 --- a/skydriver/database/interface.py +++ b/skydriver/database/interface.py @@ -7,7 +7,7 @@ from typing import Any, AsyncIterator from motor.motor_asyncio import AsyncIOMotorClient -from pymongo import ASCENDING, ReturnDocument +from pymongo import ASCENDING, DESCENDING, 
ReturnDocument from tornado import web from ..config import ENV @@ -62,6 +62,7 @@ async def post( tms_args_list: list[str], env_vars: schema.EnvVars, classifiers: dict[str, str | bool | float | int], + priority: int, ) -> schema.Manifest: """Create `schema.Manifest` doc.""" LOGGER.debug("creating new manifest") @@ -78,6 +79,7 @@ async def post( env_vars=env_vars, ), classifiers=classifiers, + priority=priority, ) # db @@ -339,7 +341,10 @@ async def fetch_next_as_pending(self) -> schema.ScanBacklogEntry: "$set": {"pending_timestamp": time.time()}, "$inc": {"next_attempt": 1}, }, - sort=[("timestamp", ASCENDING)], + sort=[ + ("priority", DESCENDING), # highest first + ("timestamp", ASCENDING), # then, oldest + ], return_document=ReturnDocument.AFTER, return_dclass=schema.ScanBacklogEntry, ) diff --git a/skydriver/database/schema.py b/skydriver/database/schema.py index f43e13fbb..ad72c735e 100644 --- a/skydriver/database/schema.py +++ b/skydriver/database/schema.py @@ -47,6 +47,7 @@ class ScanBacklogEntry(ScanIDDataclass): timestamp: float pickled_k8s_job: bytes + priority: int = 0 pending_timestamp: float = 0.0 next_attempt: int = 0 @@ -249,7 +250,9 @@ class Manifest(ScanIDDataclass): ewms_task: EWMSTaskDirective # args placed in k8s job obj - scanner_server_args: str # TODO - move to TMS??? 
+ scanner_server_args: str + + priority: int = 0 # same as https://htcondor.readthedocs.io/en/latest/users-manual/priorities-and-preemption.html#job-priority # open to requestor classifiers: dict[str, str | bool | float | int] = dc.field(default_factory=dict) diff --git a/skydriver/database/utils.py b/skydriver/database/utils.py index da6d9f4d4..e3e7a12cf 100644 --- a/skydriver/database/utils.py +++ b/skydriver/database/utils.py @@ -44,6 +44,11 @@ async def ensure_indexes(motor_client: AsyncIOMotorClient) -> None: # type: ign name="timestamp_index", unique=False, ) + await motor_client[_DB_NAME][_SCAN_BACKLOG_COLL_NAME].create_index( # type: ignore[index] + [("priority", DESCENDING)], + name="priority_index", + unique=False, + ) await motor_client[_DB_NAME][_SCAN_BACKLOG_COLL_NAME].create_index( # type: ignore[index] "scan_id", name="scan_id_index", diff --git a/skydriver/k8s/__init__.py b/skydriver/k8s/__init__.py index a72fa9df6..f32426701 100644 --- a/skydriver/k8s/__init__.py +++ b/skydriver/k8s/__init__.py @@ -2,11 +2,11 @@ import logging -import kubernetes.client # type: ignore[import] +import kubernetes.client # type: ignore[import-untyped] from kubernetes import config -from kubernetes.client.rest import ApiException # type: ignore[import] +from kubernetes.client.rest import ApiException # type: ignore[import-untyped] -from . import scanner_instance, utils # noqa: F401 # export +from . import scan_backlog, scanner_instance, utils # noqa: F401 # export LOGGER = logging.getLogger(__name__) diff --git a/skydriver/k8s/scan_backlog.py b/skydriver/k8s/scan_backlog.py index 2a30b9987..59eaabee6 100644 --- a/skydriver/k8s/scan_backlog.py +++ b/skydriver/k8s/scan_backlog.py @@ -7,7 +7,7 @@ import time import bson -import kubernetes.client # type: ignore[import] +import kubernetes.client # type: ignore[import-untyped] from motor.motor_asyncio import AsyncIOMotorClient from .. 
import database @@ -21,12 +21,14 @@ async def enqueue( scan_id: str, job_obj: kubernetes.client.V1Job, scan_backlog: database.interface.ScanBacklogClient, + priority: int, ) -> None: """Enqueue k8s job to be started by job-starter thread.""" entry = database.schema.ScanBacklogEntry( scan_id=scan_id, timestamp=time.time(), pickled_k8s_job=bson.Binary(pickle.dumps(job_obj)), + priority=priority, ) await scan_backlog.insert(entry) diff --git a/skydriver/k8s/scanner_instance.py b/skydriver/k8s/scanner_instance.py index e3564b276..cc6d1647c 100644 --- a/skydriver/k8s/scanner_instance.py +++ b/skydriver/k8s/scanner_instance.py @@ -9,7 +9,7 @@ import kubernetes.client # type: ignore[import-untyped] from rest_tools.client import ClientCredentialsAuth -from .. import database, images +from .. import images from ..config import ( ENV, K8S_CONTAINER_MEMORY_TMS_STARTER_BYTES, @@ -20,7 +20,6 @@ DebugMode, ) from ..database import schema -from . import scan_backlog from .utils import KubeAPITools @@ -57,13 +56,11 @@ def get_tms_s3_v1envvars() -> list[kubernetes.client.V1EnvVar]: ] -class SkymapScannerJob: +class SkymapScannerK8sWrapper: """Wraps a Skymap Scanner Kubernetes job with tools to start and manage.""" def __init__( self, - k8s_batch_api: kubernetes.client.BatchV1Api, - scan_backlog: database.interface.ScanBacklogClient, # docker_tag: str, scan_id: str, @@ -79,6 +76,7 @@ def __init__( request_clusters: list[schema.Cluster], max_pixel_reco_time: int, max_worker_runtime: int, + priority: int, # universal debug_mode: list[DebugMode], # env @@ -86,8 +84,6 @@ def __init__( skyscan_mq_client_timeout_wait_for_first_message: int | None, ): LOGGER.info(f"making k8s job for {scan_id=}") - self.k8s_batch_api = k8s_batch_api - self.scan_backlog = scan_backlog self.scan_id = scan_id self.env_dict = {} @@ -138,6 +134,7 @@ def __init__( request_cluster=cluster, debug_mode=debug_mode, max_worker_runtime=max_worker_runtime, + priority=priority, ), cpu=0.125, 
volumes={common_space_volume_path.name: common_space_volume_path}, @@ -193,6 +190,7 @@ def get_tms_starter_args( request_cluster: schema.Cluster, debug_mode: list[DebugMode], max_worker_runtime: int, + priority: int, ) -> list[str]: """Make the starter container args. @@ -230,6 +228,7 @@ def get_tms_starter_args( f" --client-startup-json {common_space_volume_path/'startup.json'} " # f" --client-args {client_args} " # only potentially relevant arg is --debug-directory f" --max-worker-runtime {max_worker_runtime}" + f" --priority {priority}" ) if DebugMode.CLIENT_LOGS in debug_mode: @@ -312,12 +311,12 @@ def make_skyscan_server_v1envvars( # 4. generate & add auth tokens tokens = { - "SKYSCAN_BROKER_AUTH": SkymapScannerJob._get_token_from_keycloak( + "SKYSCAN_BROKER_AUTH": SkymapScannerK8sWrapper._get_token_from_keycloak( ENV.KEYCLOAK_OIDC_URL, ENV.KEYCLOAK_CLIENT_ID_BROKER, ENV.KEYCLOAK_CLIENT_SECRET_BROKER, ), - "SKYSCAN_SKYDRIVER_AUTH": SkymapScannerJob._get_token_from_keycloak( + "SKYSCAN_SKYDRIVER_AUTH": SkymapScannerK8sWrapper._get_token_from_keycloak( ENV.KEYCLOAK_OIDC_URL, ENV.KEYCLOAK_CLIENT_ID_SKYDRIVER_REST, ENV.KEYCLOAK_CLIENT_SECRET_SKYDRIVER_REST, @@ -400,12 +399,12 @@ def make_tms_starter_v1envvars( # 4. 
generate & add auth tokens tokens = { - "SKYSCAN_BROKER_AUTH": SkymapScannerJob._get_token_from_keycloak( + "SKYSCAN_BROKER_AUTH": SkymapScannerK8sWrapper._get_token_from_keycloak( ENV.KEYCLOAK_OIDC_URL, ENV.KEYCLOAK_CLIENT_ID_BROKER, ENV.KEYCLOAK_CLIENT_SECRET_BROKER, ), - "SKYSCAN_SKYDRIVER_AUTH": SkymapScannerJob._get_token_from_keycloak( + "SKYSCAN_SKYDRIVER_AUTH": SkymapScannerK8sWrapper._get_token_from_keycloak( ENV.KEYCLOAK_OIDC_URL, ENV.KEYCLOAK_CLIENT_ID_SKYDRIVER_REST, ENV.KEYCLOAK_CLIENT_SECRET_SKYDRIVER_REST, @@ -420,13 +419,8 @@ def make_tms_starter_v1envvars( return env - async def enqueue_job(self) -> Any: - """Enqueue the k8s job onto the Scan Backlog.""" - LOGGER.info(f"enqueuing k8s job for {self.scan_id=}") - await scan_backlog.enqueue(self.scan_id, self.job_obj, self.scan_backlog) - -class SkymapScannerWorkerStopper: +class SkymapScannerWorkerStopperK8sWrapper: """Wraps K8s logic to stop workers of a Skymap Scanner instance.""" def __init__( @@ -492,7 +486,7 @@ def go(self) -> Any: # f"requesting removal of Skymap Scanner Job (server & tms starters) -- {self.scan_id=}..." 
# ) # resp = self.k8s_batch_api.delete_namespaced_job( - # name=SkymapScannerJob.get_job_name(self.scan_id), + # name=SkymapScannerK8sWrapper.get_job_name(self.scan_id), # namespace=ENV.K8S_NAMESPACE, # body=kubernetes.client.V1DeleteOptions( # propagation_policy="Foreground", grace_period_seconds=5 diff --git a/skydriver/rest_handlers.py b/skydriver/rest_handlers.py index 2e9d49a9c..c40e78892 100644 --- a/skydriver/rest_handlers.py +++ b/skydriver/rest_handlers.py @@ -1,6 +1,5 @@ """Handlers for the SkyDriver REST API server interface.""" - import asyncio import dataclasses as dc import json @@ -23,6 +22,7 @@ ENV, KNOWN_CLUSTERS, LOGGER, + SCAN_MIN_PRIORITY_TO_START_NOW, DebugMode, is_testing, ) @@ -426,6 +426,11 @@ async def post(self) -> None: type=_classifiers_validator, default={}, ) + priority = self.get_argument( + "priority", + type=int, + default=0, + ) # response args manifest_projection = self.get_argument( @@ -441,10 +446,7 @@ async def post(self) -> None: scan_id = uuid.uuid4().hex # get the container info ready - k8s_job = k8s.scanner_instance.SkymapScannerJob( - k8s_batch_api=self.k8s_batch_api, - scan_backlog=self.scan_backlog, - # + scanner_wrapper = k8s.scanner_instance.SkymapScannerK8sWrapper( docker_tag=docker_tag, scan_id=scan_id, # server @@ -459,6 +461,7 @@ async def post(self) -> None: worker_disk_bytes=worker_disk_bytes, max_pixel_reco_time=max_pixel_reco_time, max_worker_runtime=max_worker_runtime, + priority=priority, # universal debug_mode=debug_mode, # env @@ -470,21 +473,46 @@ async def post(self) -> None: manifest = await self.manifests.post( event_i3live_json_dict, scan_id, - k8s_job.scanner_server_args, - k8s_job.tms_args_list, - from_dict(database.schema.EnvVars, k8s_job.env_dict), + scanner_wrapper.scanner_server_args, + scanner_wrapper.tms_args_list, + from_dict(database.schema.EnvVars, scanner_wrapper.env_dict), classifiers, + priority, ) - # enqueue skymap scanner instance to be started in-time - try: - await 
k8s_job.enqueue_job() - except Exception as e: - LOGGER.exception(e) - raise web.HTTPError( - 500, - log_message="Failed to enqueue Kubernetes job for Scanner instance", - ) + enqueue = True + + # start now? + if priority >= SCAN_MIN_PRIORITY_TO_START_NOW: + try: + resp = k8s.utils.KubeAPITools.start_job( + self.k8s_batch_api, + scanner_wrapper.job_obj, + ) + LOGGER.info(resp) + except kubernetes.client.exceptions.ApiException as e: + # job (entry) will be enqueued and tried again per priority + LOGGER.exception(e) + else: + enqueue = False + + # start later? + if enqueue: + # enqueue skymap scanner instance to be started in-time + try: + LOGGER.info(f"enqueuing k8s job for {scan_id=}") + await k8s.scan_backlog.enqueue( + scan_id, + scanner_wrapper.job_obj, + self.scan_backlog, + manifest.priority, + ) + except Exception as e: + LOGGER.exception(e) + raise web.HTTPError( + 400, + log_message="Failed to enqueue Kubernetes job for Scanner instance", + ) self.write( dict_projection(dc.asdict(manifest), manifest_projection), @@ -504,18 +532,18 @@ async def stop_scanner_instance( if manifest.ewms_task.complete: # workforce is done return manifest - stopper = k8s.scanner_instance.SkymapScannerWorkerStopper( + stopper_wrapper = k8s.scanner_instance.SkymapScannerWorkerStopperK8sWrapper( k8s_batch_api, scan_id, manifest.ewms_task.clusters, ) try: - stopper.go() + stopper_wrapper.go() except kubernetes.client.exceptions.ApiException as e: LOGGER.exception(e) raise web.HTTPError( - 500, + 400, log_message="Failed to stop Scanner instance", ) @@ -843,7 +871,7 @@ async def get(self, scan_id: str) -> None: try: pods_411["pod_status"] = k8s.utils.KubeAPITools.get_pod_status( self.k8s_batch_api, - k8s.scanner_instance.SkymapScannerJob.get_job_name(scan_id), + k8s.scanner_instance.SkymapScannerK8sWrapper.get_job_name(scan_id), ENV.K8S_NAMESPACE, ) pods_411["pod_message"] = "retrieved" @@ -887,7 +915,7 @@ async def get(self, scan_id: str) -> None: try: pod_container_logs = 
k8s.utils.KubeAPITools.get_container_logs( self.k8s_batch_api, - k8s.scanner_instance.SkymapScannerJob.get_job_name(scan_id), + k8s.scanner_instance.SkymapScannerK8sWrapper.get_job_name(scan_id), ENV.K8S_NAMESPACE, ) pod_container_logs_message = "retrieved" diff --git a/tests/integration/test_backlog_runner.py b/tests/integration/test_backlog_runner.py index 3e96ffa3b..a422c78ac 100644 --- a/tests/integration/test_backlog_runner.py +++ b/tests/integration/test_backlog_runner.py @@ -74,8 +74,10 @@ async def test_01(kapitsj_mock: Mock, server: Callable[[], RestClient]) -> None: print_it(await rc.request("GET", "/scans/backlog")) -# mock skydriver.k8s.scanner_instance.SkymapScannerWorkerStopper.go b/c it calls start_job -@mock.patch("skydriver.k8s.scanner_instance.SkymapScannerWorkerStopper.go", new=Mock()) +# mock skydriver.k8s.scanner_instance.SkymapScannerWorkerStopperK8sWrapper.go b/c it calls start_job +@mock.patch( + "skydriver.k8s.scanner_instance.SkymapScannerWorkerStopperK8sWrapper.go", new=Mock() +) @mock.patch("skydriver.k8s.utils.KubeAPITools.start_job") async def test_10( kapitsj_mock: Mock, diff --git a/tests/integration/test_rest_routes.py b/tests/integration/test_rest_routes.py index 7276b435d..fe9cac503 100644 --- a/tests/integration/test_rest_routes.py +++ b/tests/integration/test_rest_routes.py @@ -107,6 +107,7 @@ async def _launch_scan( ), classifiers=post_scan_body["classifiers"], last_updated=resp["last_updated"], # see below + priority=0, # TODO: check more fields in future (hint: ctrl+F this comment) ) assert RE_UUID4HEX.fullmatch(resp["scan_id"]) @@ -299,6 +300,7 @@ async def _do_patch( ), classifiers=CLASSIFIERS, last_updated=resp["last_updated"], # see below + priority=0, # TODO: check more fields in future (hint: ctrl+F this comment) ) assert 0.0 < resp["timestamp"] < now < resp["last_updated"] < time.time() @@ -660,6 +662,7 @@ def get_tms_args( f" --image {image} " f" --client-startup-json /common-space/startup.json " f" 
--max-worker-runtime {4 * 60 * 60} " + f" --priority 0 " f" --spool " ]