Skip to content

Commit

Permalink
EWMS/TMS Prep: Schema (#103)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <[email protected]>
  • Loading branch information
ric-evans and github-actions authored Dec 6, 2023
1 parent 0eb016d commit bb22c99
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 229 deletions.
78 changes: 39 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,50 +295,54 @@ Pseudo-code:
event_i3live_json_dict: dict,
scanner_server_args: str,
tms_args: list[str],
env_vars: dict[str, dict[str, Any]],
classifiers: dict[str, str | bool | float | int]
event_i3live_json_dict__hash: str, # a deterministic hash of the event json
clusters: [ # 2 types: condor & k8s -- different 'location' sub-fields
{
orchestrator: 'condor',
location: {
collector: str,
schedd: str,
},
cluster_id: int,
n_workers: int,
starter_info: dict,
statuses: {
'Completed': { # condor job status
'FatalError': int, # pilot status value -> # of jobs
'Done': int, # pilot status value -> # of jobs
...
ewms_task: {
tms_args: list[str],
env_vars: dict[str, dict[str, Any]],
clusters: [ # 2 types: condor & k8s -- different 'location' sub-fields
{
orchestrator: 'condor',
location: {
collector: str,
schedd: str,
},
'Running': {
'Tasking': int,
cluster_id: int,
n_workers: int,
starter_info: dict,
statuses: {
'Completed': { # condor job status
'FatalError': int, # pilot status value -> # of jobs
'Done': int, # pilot status value -> # of jobs
...
},
'Running': {
'Tasking': int,
...
}
...
}
...
},
top_task_errors: dict[str, int], # error message -> # of jobs
},
top_task_errors: dict[str, int], # error message -> # of jobs
},
...
{
orchestrator: 'k8s',
location: {
host: str,
namespace: str,
...
{
orchestrator: 'k8s',
location: {
host: str,
namespace: str,
},
cluster_id: int,
n_workers: int,
starter_info: dict,
},
cluster_id: int,
n_workers: int,
starter_info: dict,
},
...
],
...
],
# signifies scanner is done (server and worker cluster(s))
complete: bool,
},
# found/created during first few seconds of scanning
event_metadata: {
Expand Down Expand Up @@ -367,9 +371,6 @@ Pseudo-code:
last_updated: str,
},
# signifies scanner is done (server and worker cluster(s))
complete: bool,
# timestamp of any update to manifest -- also see `progress.last_updated`
last_updated: float,
}
Expand All @@ -379,7 +380,6 @@ Pseudo-code:
##### Manifest Fields Excluded by Default in Response
Some routes/methods respond with the scan's manifest. Because this is a large dictionary, every route except [GET @ `/scan/SCAN_ID/manifest`](#scanscan_idmanifest---get) excludes the following fields by default:
- `event_i3live_json_dict`
- `env_vars`

See https://github.com/search?q=repo%3AWIPACrepo%2FSkyDriver+DEFAULT_EXCLUDED_MANIFEST_FIELDS&type=code

Expand Down
14 changes: 7 additions & 7 deletions dependencies-from-Dockerfile.log
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
# pip freeze
########################################################################
backoff==2.2.1
boto3==1.33.6
botocore==1.33.6
boto3==1.33.8
botocore==1.33.8
cachetools==5.3.2
certifi==2023.11.17
cffi==1.16.0
Expand All @@ -18,7 +18,7 @@ cryptography==41.0.7
dacite==1.8.1
Deprecated==1.2.14
dnspython==2.4.2
google-auth==2.24.0
google-auth==2.25.1
googleapis-common-protos==1.59.1
grpcio==1.59.3
htcondor==23.2.0
Expand Down Expand Up @@ -75,15 +75,15 @@ pip==23.2.1
pipdeptree==2.13.1
setuptools==65.5.1
skydriver-clientmanager
├── boto3 [required: Any, installed: 1.33.6]
│ ├── botocore [required: >=1.33.6,<1.34.0, installed: 1.33.6]
├── boto3 [required: Any, installed: 1.33.8]
│ ├── botocore [required: >=1.33.8,<1.34.0, installed: 1.33.8]
│ │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
│ │ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2]
│ │ │ └── six [required: >=1.5, installed: 1.16.0]
│ │ └── urllib3 [required: >=1.25.4,<2.1, installed: 1.26.18]
│ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
│ └── s3transfer [required: >=0.8.2,<0.9.0, installed: 0.8.2]
│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.33.6]
│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.33.8]
│ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
│ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2]
│ │ └── six [required: >=1.5, installed: 1.16.0]
Expand All @@ -95,7 +95,7 @@ skydriver-clientmanager
├── humanfriendly [required: Any, installed: 10.0]
├── kubernetes [required: Any, installed: 28.1.0]
│ ├── certifi [required: >=14.05.14, installed: 2023.11.17]
│ ├── google-auth [required: >=1.0.1, installed: 2.24.0]
│ ├── google-auth [required: >=1.0.1, installed: 2.25.1]
│ │ ├── cachetools [required: >=2.0.0,<6.0, installed: 5.3.2]
│ │ ├── pyasn1-modules [required: >=0.2.1, installed: 0.3.0]
│ │ │ └── pyasn1 [required: >=0.4.6,<0.6.0, installed: 0.5.1]
Expand Down
44 changes: 43 additions & 1 deletion skydriver/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,35 @@
from .config import ENV, LOGGER, config_logging


async def database_schema_migration(motor_client) -> None:  # type: ignore[no-untyped-def]
    """Move legacy top-level manifest fields under an `ewms_task` sub-document.

    Every manifest document that has no `ewms_task` field is rewritten so
    that its `tms_args`, `env_vars`, `clusters`, and `complete` values live
    inside a new `ewms_task` dict. Each rewritten document is validated
    against `database.schema.Manifest` before it replaces the stored one.

    Args:
        motor_client: client object that is indexed by database name
            (presumably a motor client -- confirm against the caller).

    Raises:
        KeyError: if a legacy document lacks one of the four moved fields.

    Validation and database errors are not caught here; they propagate to
    the caller.
    """
    from dacite import from_dict
    from motor.motor_asyncio import AsyncIOMotorCollection

    collection = AsyncIOMotorCollection(
        motor_client[database.utils._DB_NAME],
        database.utils._MANIFEST_COLL_NAME,
    )

    # Materialize the matches up front so the documents replaced below are
    # not re-read through a live cursor.
    all_documents = [
        d async for d in collection.find({"ewms_task": {"$exists": False}})
    ]

    n_migrated = 0
    for doc in all_documents:
        LOGGER.info(f"migrating {doc}...")
        doc["ewms_task"] = {
            "tms_args": doc.pop("tms_args"),
            "env_vars": doc.pop("env_vars"),
            "clusters": doc.pop("clusters"),
            "complete": doc.pop("complete"),
        }
        # `_id` is removed before validation and used only as the filter
        # for the replacement.
        _id = doc.pop("_id")
        from_dict(database.schema.Manifest, doc)  # validate
        await collection.find_one_and_replace({"_id": _id}, doc)
        n_migrated += 1
        LOGGER.info(f"migrated {doc}")

    # Report the count from the list, not from a loop variable: a loop
    # variable is unbound when nothing needs migrating, which used to raise
    # UnboundLocalError on an already-migrated (or empty) collection.
    LOGGER.info(
        f"total migrated: {n_migrated} (looked at {len(all_documents)} docs)"
    )


async def main() -> None:
"""Establish connections and start components."""

Expand All @@ -16,14 +45,27 @@ async def main() -> None:
await asyncio.sleep(0) # start up previous task
LOGGER.info("Mongo client connected.")

try:
await database_schema_migration(mongo_client)
except Exception as e: # noqa: E722
# if this triggers, Ric is working on a fix :-)
LOGGER.exception(e)
LOGGER.error("failed to migrate database data...")

import time

time.sleep(60 * 60 * 24) # one day

# K8s client
LOGGER.info("Setting up k8s client...")
k8s_batch_api = k8s.setup_k8s_batch_api()
LOGGER.info("K8s client connected.")

# Scan Backlog Runner
LOGGER.info("Starting scan backlog runner...")
backlog_task = asyncio.create_task(k8s.scan_backlog.startup(mongo_client, k8s_batch_api))
backlog_task = asyncio.create_task(
k8s.scan_backlog.startup(mongo_client, k8s_batch_api)
)
await asyncio.sleep(0) # start up previous task

# REST Server
Expand Down
53 changes: 32 additions & 21 deletions skydriver/database/interface.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Database interface for persisted scan data."""

import copy
import dataclasses as dc
import logging
import time
Expand Down Expand Up @@ -72,8 +73,10 @@ async def post(
is_deleted=False,
event_i3live_json_dict=event_i3live_json_dict,
scanner_server_args=scanner_server_args,
tms_args=tms_args_list,
env_vars=env_vars,
ewms_task=schema.EWMSTaskDirective(
tms_args=tms_args_list,
env_vars=env_vars,
),
classifiers=classifiers,
)

Expand Down Expand Up @@ -146,25 +149,33 @@ async def patch(
reason=msg,
)

# cluster / clusters
# TODO - when TMS is up and running, it will handle cluster updating--remove then
# NOTE - there is a race condition inherent with list attributes, don't do this in TMS
if not cluster:
pass # don't put in DB
else:
try: # find by uuid -> replace
idx = next(
i for i, c in enumerate(in_db.clusters) if cluster.uuid == c.uuid
)
upserting["clusters"] = (
in_db.clusters[:idx] + [cluster] + in_db.clusters[idx + 1 :]
)
except StopIteration: # not found -> append
upserting["clusters"] = in_db.clusters + [cluster]

# complete # workforce is done
if complete is not None:
upserting["complete"] = complete # workforce is done
# tms
if cluster or complete is not None:
upserting["ewms_task"] = copy.deepcopy(in_db.ewms_task)
# cluster / clusters
# TODO - when TMS is up and running, it will handle cluster updating--remove then
# NOTE - there is a race condition inherent with list attributes, don't do this in TMS
if not cluster:
pass # don't put in DB
else:
try: # find by uuid -> replace
idx = next(
i
for i, c in enumerate(in_db.ewms_task.clusters)
if cluster.uuid == c.uuid
)
upserting["ewms_task"].clusters = (
in_db.ewms_task.clusters[:idx]
+ [cluster]
+ in_db.ewms_task.clusters[idx + 1 :]
)
except StopIteration: # not found -> append
upserting["ewms_task"].clusters = in_db.ewms_task.clusters + [
cluster
]
# complete # workforce is done
if complete is not None:
upserting["ewms_task"].complete = complete # workforce is done

# progress
if progress:
Expand Down
Loading

0 comments on commit bb22c99

Please sign in to comment.