Skip to content

Commit

Permalink
Add Priorities for Scans [major] (#107)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <[email protected]>
  • Loading branch information
ric-evans and github-actions authored Dec 15, 2023
1 parent a9ff273 commit 14308e7
Show file tree
Hide file tree
Showing 15 changed files with 129 additions and 72 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ _Launch a new scan of an event_
| `"worker_memory"` | str | default: `8G` | how much memory per client worker to request
| `"worker_disk"` | str | default: `1G` | how much disk per client worker to request
| `"debug_mode"` | str or list | default: None | what debug mode(s) to use: `"client-logs"` collects the scanner clients' stderr/stdout including icetray logs (scans are limited in # of workers)
| `"predictive_scanning_threshold"` | float | default: `1.0` | the predictive scanning threshold [0.1, 1.0] (see [Skymap Scanner](https://github.com/icecube/skymap_scanner))
| `"predictive_scanning_threshold"` | float | default: `1.0` | the predictive scanning threshold `[0.1, 1.0]` (see [Skymap Scanner](https://github.com/icecube/skymap_scanner))
| `"priority"` | int | default: `0` | the relative priority of this scan -- higher values indicate higher priority. **NOTE: Values `>= 10` are reserved for Realtime alert scans (these scan requests are not throttled).** See also [HTCondor job priority](https://htcondor.readthedocs.io/en/latest/users-manual/priorities-and-preemption.html#job-priority)
| `"classifiers"` | <code>dict[str, str &#124; bool &#124; float &#124; int]</code> | default: `{}` | a user-defined collection of labels, attributes, etc. -- this is constrained in size and is intended for user-defined metadata only
| `"manifest_projection"` | list | default: all fields but [these](#manifest-fields-excluded-by-default-in-response) | which `Manifest` fields to include in the response (include `*` to include all fields)

Expand Down Expand Up @@ -347,6 +348,8 @@ Pseudo-code:
event_i3live_json_dict: dict,
scanner_server_args: str,
priority: int,
classifiers: dict[str, str | bool | float | int],
event_i3live_json_dict__hash: str, # a deterministic hash of the event json
Expand Down
5 changes: 5 additions & 0 deletions clientmanager/clientmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path:
type=int,
help="how long each worker is allowed to run -- condor only", # TODO - set for k8s?
)
sub_parser.add_argument(
"--priority",
required=True,
help="relative priority of this job/jobs -- condor only", # TODO - set for k8s?
)

# client args
sub_parser.add_argument(
Expand Down
1 change: 1 addition & 0 deletions clientmanager/condor/act.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def _act(args: argparse.Namespace, schedd_obj: htcondor.Schedd) -> None:
worker_disk_bytes=args.worker_disk_bytes,
n_cores=args.n_cores,
max_worker_runtime=args.max_worker_runtime,
priority=args.priority,
# starter CL args -- client
client_args=args.client_args,
client_startup_json_s3=utils.s3ify(args.client_startup_json),
Expand Down
4 changes: 4 additions & 0 deletions clientmanager/condor/starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def make_condor_job_description(
worker_disk_bytes: int,
n_cores: int,
max_worker_runtime: int,
priority: int,
# skymap scanner args
image: str,
client_startup_json_s3: S3File,
Expand Down Expand Up @@ -86,6 +87,7 @@ def make_condor_job_description(
"request_disk": humanfriendly.format_size( # 1073741824 -> "1 GiB" -> "1 GB"
worker_disk_bytes, binary=True
).replace("i", ""),
"priority": int(priority),
"+WantIOProxy": "true", # for HTChirp
"+OriginalTime": max_worker_runtime, # Execution time limit -- 1 hour default on OSG
}
Expand Down Expand Up @@ -129,6 +131,7 @@ def prep(
worker_disk_bytes: int,
n_cores: int,
max_worker_runtime: int,
priority: int,
# starter CL args -- client
client_args: list[tuple[str, str]],
client_startup_json_s3: S3File,
Expand Down Expand Up @@ -156,6 +159,7 @@ def prep(
worker_disk_bytes,
n_cores,
max_worker_runtime,
priority,
# skymap scanner args
image,
client_startup_json_s3,
Expand Down
44 changes: 22 additions & 22 deletions dependencies-from-Dockerfile.log
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
# pip freeze
########################################################################
backoff==2.2.1
boto3==1.33.9
botocore==1.33.9
boto3==1.34.1
botocore==1.34.1
cachetools==5.3.2
certifi==2023.11.17
cffi==1.16.0
Expand All @@ -18,9 +18,9 @@ cryptography==41.0.7
dacite==1.8.1
Deprecated==1.2.14
dnspython==2.4.2
google-auth==2.25.1
google-auth==2.25.2
googleapis-common-protos==1.59.1
grpcio==1.59.3
grpcio==1.60.0
htcondor==23.2.0
humanfriendly==10.0
idna==3.6
Expand Down Expand Up @@ -52,12 +52,12 @@ requests==2.31.0
requests-futures==1.0.1
requests-oauthlib==1.3.1
rsa==4.9
s3transfer==0.8.2
s3transfer==0.9.0
six==1.16.0
thrift==0.16.0
tornado==6.4
typeguard==4.1.5
typing_extensions==4.8.0
typing_extensions==4.9.0
urllib3==1.26.18
websocket-client==1.7.0
wipac-dev-tools==1.8.2
Expand All @@ -75,15 +75,15 @@ pip==23.2.1
pipdeptree==2.13.1
setuptools==65.5.1
skydriver-clientmanager
├── boto3 [required: Any, installed: 1.33.9]
│ ├── botocore [required: >=1.33.9,<1.34.0, installed: 1.33.9]
├── boto3 [required: Any, installed: 1.34.1]
│ ├── botocore [required: >=1.34.1,<1.35.0, installed: 1.34.1]
│ │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
│ │ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2]
│ │ │ └── six [required: >=1.5, installed: 1.16.0]
│ │ └── urllib3 [required: >=1.25.4,<2.1, installed: 1.26.18]
│ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
│ └── s3transfer [required: >=0.8.2,<0.9.0, installed: 0.8.2]
│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.33.9]
│ └── s3transfer [required: >=0.9.0,<0.10.0, installed: 0.9.0]
│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.34.1]
│ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
│ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2]
│ │ └── six [required: >=1.5, installed: 1.16.0]
Expand All @@ -95,7 +95,7 @@ skydriver-clientmanager
├── humanfriendly [required: Any, installed: 10.0]
├── kubernetes [required: Any, installed: 28.1.0]
│ ├── certifi [required: >=14.05.14, installed: 2023.11.17]
│ ├── google-auth [required: >=1.0.1, installed: 2.25.1]
│ ├── google-auth [required: >=1.0.1, installed: 2.25.2]
│ │ ├── cachetools [required: >=2.0.0,<6.0, installed: 5.3.2]
│ │ ├── pyasn1-modules [required: >=0.2.1, installed: 0.3.0]
│ │ │ └── pyasn1 [required: >=0.4.6,<0.6.0, installed: 0.5.1]
Expand Down Expand Up @@ -132,20 +132,20 @@ skydriver-clientmanager
│ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18]
├── tornado [required: Any, installed: 6.4]
├── typeguard [required: Any, installed: 4.1.5]
│ └── typing-extensions [required: >=4.7.0, installed: 4.8.0]
│ └── typing-extensions [required: >=4.7.0, installed: 4.9.0]
├── wipac-dev-tools [required: Any, installed: 1.8.2]
│ ├── requests [required: Any, installed: 2.31.0]
│ │ ├── certifi [required: >=2017.4.17, installed: 2023.11.17]
│ │ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2]
│ │ ├── idna [required: >=2.5,<4, installed: 3.6]
│ │ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18]
│ └── typing-extensions [required: Any, installed: 4.8.0]
│ └── typing-extensions [required: Any, installed: 4.9.0]
└── wipac-rest-tools [required: Any, installed: 1.5.2]
├── cachetools [required: Any, installed: 5.3.2]
├── PyJWT [required: !=2.6.0, installed: 2.8.0]
├── qrcode [required: Any, installed: 7.4.2]
│ ├── pypng [required: Any, installed: 0.20220715.0]
│ └── typing-extensions [required: Any, installed: 4.8.0]
│ └── typing-extensions [required: Any, installed: 4.9.0]
├── requests [required: Any, installed: 2.31.0]
│ ├── certifi [required: >=2017.4.17, installed: 2023.11.17]
│ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2]
Expand All @@ -164,7 +164,7 @@ skydriver-clientmanager
│ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2]
│ ├── idna [required: >=2.5,<4, installed: 3.6]
│ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18]
└── typing-extensions [required: Any, installed: 4.8.0]
└── typing-extensions [required: Any, installed: 4.9.0]
wheel==0.42.0
wipac-telemetry==0.3.0
├── coloredlogs [required: Any, installed: 15.0.1]
Expand All @@ -178,7 +178,7 @@ wipac-telemetry==0.3.0
│ ├── opentelemetry-exporter-jaeger-proto-grpc [required: ==1.21.0, installed: 1.21.0]
│ │ ├── googleapis-common-protos [required: ~=1.52,<1.60.0, installed: 1.59.1]
│ │ │ └── protobuf [required: >=3.19.5,<5.0.0.dev0,!=4.21.5,!=4.21.4,!=4.21.3,!=4.21.2,!=4.21.1,!=3.20.1,!=3.20.0, installed: 4.25.1]
│ │ ├── grpcio [required: >=1.0.0,<2.0.0, installed: 1.59.3]
│ │ ├── grpcio [required: >=1.0.0,<2.0.0, installed: 1.60.0]
│ │ ├── opentelemetry-api [required: ~=1.3, installed: 1.21.0]
│ │ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14]
│ │ │ │ └── wrapt [required: >=1.10,<2, installed: 1.16.0]
Expand All @@ -191,7 +191,7 @@ wipac-telemetry==0.3.0
│ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0]
│ │ │ └── zipp [required: >=0.5, installed: 3.17.0]
│ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0]
│ │ └── typing-extensions [required: >=3.7.4, installed: 4.8.0]
│ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0]
│ └── opentelemetry-exporter-jaeger-thrift [required: ==1.21.0, installed: 1.21.0]
│ ├── opentelemetry-api [required: ~=1.3, installed: 1.21.0]
│ │ ├── Deprecated [required: >=1.2.6, installed: 1.2.14]
Expand All @@ -205,7 +205,7 @@ wipac-telemetry==0.3.0
│ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0]
│ │ │ └── zipp [required: >=0.5, installed: 3.17.0]
│ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0]
│ │ └── typing-extensions [required: >=3.7.4, installed: 4.8.0]
│ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0]
│ └── thrift [required: >=0.10.0, installed: 0.16.0]
│ └── six [required: >=1.7.2, installed: 1.16.0]
├── opentelemetry-exporter-otlp-proto-http [required: Any, installed: 1.21.0]
Expand All @@ -232,7 +232,7 @@ wipac-telemetry==0.3.0
│ │ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0]
│ │ │ └── zipp [required: >=0.5, installed: 3.17.0]
│ │ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0]
│ │ └── typing-extensions [required: >=3.7.4, installed: 4.8.0]
│ │ └── typing-extensions [required: >=3.7.4, installed: 4.9.0]
│ └── requests [required: ~=2.7, installed: 2.31.0]
│ ├── certifi [required: >=2017.4.17, installed: 2023.11.17]
│ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2]
Expand All @@ -245,13 +245,13 @@ wipac-telemetry==0.3.0
│ │ └── importlib-metadata [required: >=6.0,<7.0, installed: 6.11.0]
│ │ └── zipp [required: >=0.5, installed: 3.17.0]
│ ├── opentelemetry-semantic-conventions [required: ==0.42b0, installed: 0.42b0]
│ └── typing-extensions [required: >=3.7.4, installed: 4.8.0]
│ └── typing-extensions [required: >=3.7.4, installed: 4.9.0]
├── protobuf [required: Any, installed: 4.25.1]
├── typing-extensions [required: Any, installed: 4.8.0]
├── typing-extensions [required: Any, installed: 4.9.0]
└── wipac-dev-tools [required: Any, installed: 1.8.2]
├── requests [required: Any, installed: 2.31.0]
│ ├── certifi [required: >=2017.4.17, installed: 2023.11.17]
│ ├── charset-normalizer [required: >=2,<4, installed: 3.3.2]
│ ├── idna [required: >=2.5,<4, installed: 3.6]
│ └── urllib3 [required: >=1.21.1,<3, installed: 1.26.18]
└── typing-extensions [required: Any, installed: 4.8.0]
└── typing-extensions [required: Any, installed: 4.9.0]
2 changes: 2 additions & 0 deletions skydriver/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
TMS_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED = 1 * 60 * 60
TMS_STOPPER_K8S_JOB_N_RETRIES = 6

SCAN_MIN_PRIORITY_TO_START_NOW = 10


@enum.unique
class DebugMode(enum.Enum):
Expand Down
9 changes: 7 additions & 2 deletions skydriver/database/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Any, AsyncIterator

from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import ASCENDING, ReturnDocument
from pymongo import ASCENDING, DESCENDING, ReturnDocument
from tornado import web

from ..config import ENV
Expand Down Expand Up @@ -62,6 +62,7 @@ async def post(
tms_args_list: list[str],
env_vars: schema.EnvVars,
classifiers: dict[str, str | bool | float | int],
priority: int,
) -> schema.Manifest:
"""Create `schema.Manifest` doc."""
LOGGER.debug("creating new manifest")
Expand All @@ -78,6 +79,7 @@ async def post(
env_vars=env_vars,
),
classifiers=classifiers,
priority=priority,
)

# db
Expand Down Expand Up @@ -339,7 +341,10 @@ async def fetch_next_as_pending(self) -> schema.ScanBacklogEntry:
"$set": {"pending_timestamp": time.time()},
"$inc": {"next_attempt": 1},
},
sort=[("timestamp", ASCENDING)],
sort=[
("priority", DESCENDING), # highest first
("timestamp", ASCENDING), # then, oldest
],
return_document=ReturnDocument.AFTER,
return_dclass=schema.ScanBacklogEntry,
)
Expand Down
5 changes: 4 additions & 1 deletion skydriver/database/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ScanBacklogEntry(ScanIDDataclass):

timestamp: float
pickled_k8s_job: bytes
priority: int = 0
pending_timestamp: float = 0.0
next_attempt: int = 0

Expand Down Expand Up @@ -249,7 +250,9 @@ class Manifest(ScanIDDataclass):
ewms_task: EWMSTaskDirective

# args placed in k8s job obj
scanner_server_args: str # TODO - move to TMS???
scanner_server_args: str

priority: int = 0 # same as https://htcondor.readthedocs.io/en/latest/users-manual/priorities-and-preemption.html#job-priority

# open to requestor
classifiers: dict[str, str | bool | float | int] = dc.field(default_factory=dict)
Expand Down
5 changes: 5 additions & 0 deletions skydriver/database/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ async def ensure_indexes(motor_client: AsyncIOMotorClient) -> None: # type: ign
name="timestamp_index",
unique=False,
)
await motor_client[_DB_NAME][_SCAN_BACKLOG_COLL_NAME].create_index( # type: ignore[index]
[("priority", DESCENDING)],
name="priority_index",
unique=False,
)
await motor_client[_DB_NAME][_SCAN_BACKLOG_COLL_NAME].create_index( # type: ignore[index]
"scan_id",
name="scan_id_index",
Expand Down
6 changes: 3 additions & 3 deletions skydriver/k8s/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import logging

import kubernetes.client # type: ignore[import]
import kubernetes.client # type: ignore[import-untyped]
from kubernetes import config
from kubernetes.client.rest import ApiException # type: ignore[import]
from kubernetes.client.rest import ApiException # type: ignore[import-untyped]

from . import scanner_instance, utils # noqa: F401 # export
from . import scan_backlog, scanner_instance, utils # noqa: F401 # export

LOGGER = logging.getLogger(__name__)

Expand Down
4 changes: 3 additions & 1 deletion skydriver/k8s/scan_backlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time

import bson
import kubernetes.client # type: ignore[import]
import kubernetes.client # type: ignore[import-untyped]
from motor.motor_asyncio import AsyncIOMotorClient

from .. import database
Expand All @@ -21,12 +21,14 @@ async def enqueue(
scan_id: str,
job_obj: kubernetes.client.V1Job,
scan_backlog: database.interface.ScanBacklogClient,
priority: int,
) -> None:
"""Enqueue k8s job to be started by job-starter thread."""
entry = database.schema.ScanBacklogEntry(
scan_id=scan_id,
timestamp=time.time(),
pickled_k8s_job=bson.Binary(pickle.dumps(job_obj)),
priority=priority,
)
await scan_backlog.insert(entry)

Expand Down
Loading

0 comments on commit 14308e7

Please sign in to comment.