Restructure grabber.py #19

Merged 4 commits on Jan 8, 2025
274 changes: 164 additions & 110 deletions grabber.py
@@ -7,18 +7,44 @@
import urllib3
from urllib.parse import urljoin, urlencode
import logging
import traceback
import dqmsquare_cfg
from custom_logger import custom_formatter, set_log_handler
from db import DQM2MirrorDB
import threading
from collections.abc import Callable

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger(__name__)
cfg = dqmsquare_cfg.load_cfg()


def _request_wrapper(
method: Callable, url: str, *args, **kwargs
) -> requests.Response | None:
"""
Simple wrapper to be used on request methods to handle common exceptions.
On exception, None is returned; otherwise, the response is returned.
"""
try:
response = method(url, *args, **kwargs)
except urllib3.exceptions.ReadTimeoutError:
logger.warning(
f"Timeout when running {method.__name__} on {url}", exc_info=True
)
return None
except requests.exceptions.ConnectionError:
logger.warning(f"Could not connect to {url}", exc_info=True)
return None
except Exception:
logger.warning(
f"Unhandled exception when running {method.__name__} on {url}",
exc_info=True,
)
return None
return response
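
For reviewers, a minimal sketch of how _request_wrapper is meant to be called; the URL is a placeholder, not a value from this PR:

# Hypothetical usage: any requests method can be passed as the callable.
response = _request_wrapper(requests.get, "https://example.cern.ch/health", timeout=30)
if response is not None:
    logger.debug(f"Got status {response.status_code}")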


def get_documents_from_fff(host: str, port: int = cfg["FFF_PORT"], runs_ids: list = []):
"""
Given a host and the port where fff_dqmtools is listening, request documents for runs
@@ -42,14 +68,44 @@ def get_documents_from_fff(host: str, port: int = cfg["FFF_PORT"], runs_ids: list = []):
cert=cert_path,
verify=False,
headers={},
cookies=cookies,
cookies=_get_cookies(),
timeout=30,
)
logger.debug(f"Got {len(r.content)} byte response.")
return r.content


def get_headers_from_fff(host: str, port: int = cfg["FFF_PORT"], revision: int = 0):
def process_headers_and_store_data(host: str, db: DQM2MirrorDB, headers: list[dict]):
"""
Process headers as received from the DQM cluster, triage them, and then store the data
in the database.
"""
for i, header in enumerate(headers):
id = header["_id"]
logger.info(f"Processing header {str(id)} ({i+1}/{len(headers)})")

is_bu = host.startswith("bu") or host.startswith("dqmrubu")
if is_bu and "analyze_files" not in id:
logger.debug("Skipping, no 'analyze_files' key found")
continue
document_answer = get_documents_from_fff(host, runs_ids=[id])
document = json.loads(json.loads(document_answer)["messages"][0])["documents"][
0
]
logger.debug("Filling info into DB ... ")

# The BU sends us file delivery graph info; FUs send us logs and event processing rates.
answer = (
db.fill_graph(header, document) if is_bu else db.fill_run(header, document)
)
# TODO: Decide what kind of errors should be returned from the db fill functions.
# if answer:
# bad_rvs += [answer]
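
The nested json.loads above mirrors the fff_dqmtools message envelope: the response body is JSON whose "messages" entries are themselves JSON-encoded strings. A sketch with a fabricated payload (values are illustrative only):

import json

# Fake envelope shaped like a fff_dqmtools document answer.
inner = json.dumps({"documents": [{"_id": "run123_analyze_files"}]})
outer = json.dumps({"messages": [inner]})
# Unwrap exactly as process_headers_and_store_data does above.
document = json.loads(json.loads(outer)["messages"][0])["documents"][0]
assert document["_id"] == "run123_analyze_files"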


def get_headers_from_fff(
host: str, port: int = cfg["FFF_PORT"], revision: int = 0
) -> bytes | None:
"""
Given a host and the port where fff_dqmtools is listening, connect to the proxy
(i.e. indirectly the SERVER_FFF_MACHINE) and request headers, starting from the
@@ -70,18 +126,26 @@ def get_headers_from_fff(host: str, port: int = cfg["FFF_PORT"], revision: int =
jsn = {"event": "sync_request", "known_rev": str(revision)}
data = json.dumps({"messages": [json.dumps(jsn)]})
logger.debug(f"POSTing to '{url}' with data: {jsn}")
r = requests.post(

r = _request_wrapper(
requests.post,
url,
data=data,
cert=cert_path,
verify=False,
headers={},
cookies=cookies,
cookies=_get_cookies(),
timeout=30,
)
logger.debug(f"Got response of length {len(r.content)}.")

return r.content
if r:
if r.status_code != 200:
logger.warning(
f"Got response {r.status_code} when POSTing to {url}: {r.content}."
)
return None
else:
return r.content
return None


# # Global list storing number of errors returned from fill/fill_graph(??)
@@ -90,57 +154,35 @@ def get_headers_from_fff(host: str, port: int = cfg["FFF_PORT"], revision: int =
# bad_rvs = []


def get_latest_info_from_host(host: str, rev: int, db: DQM2MirrorDB) -> None:
"""
Given a host and a revision, it gets the latest headers from the host specified,
and for each one it gets the appropriate "documents", storing them in the
database.
def process_cluster_status(cluster_status: requests.Response) -> dict | None:
"""
# global bad_rvs
logger.info(f"Updating host {host}, starting from revision {str(rev)}")
if not rev:
rev = 0
The response is expected to be a JSON like:

headers_answer = get_headers_from_fff(host, revision=rev)
{
'dqmrubu-c2a06-03...cms': {'up': True, 'msg': None},
'dqmfu-c2b01-45...cms': {'up': True, 'msg': None},
'dqmfu-c2b02-45...cms': {'up': True, 'msg': None}
}
"""
return_value = None
try:
headers_answer = json.loads(json.loads(headers_answer)["messages"][0])
headers = headers_answer["headers"]
rev = headers_answer["rev"]
logger.debug(
f"Got {headers_answer['total_sent']} headers, from {rev[0]} to {rev[1]}."
return_value = cluster_status.json()
assert all(
"up" in return_value[host] and "msg" in return_value[host]
for host in return_value
)
except Exception as e:
logger.warning(f"Error when getting headers: {repr(e)}")
logger.warning(f"Got response: {headers_answer}")
logger.warning(traceback.format_exc())
return

if not len(headers):
return
for i, header in enumerate(headers):
id = header["_id"]
logger.info(f"Processing header {str(id)} ({i+1}/{len(headers)})")

is_bu = host.startswith("bu") or host.startswith("dqmrubu")
if is_bu and "analyze_files" not in id:
logger.debug("Skip, no 'analyze_files' key")
continue
document_answer = get_documents_from_fff(host, runs_ids=[id])
document = json.loads(json.loads(document_answer)["messages"][0])["documents"][
0
]
logger.debug("Filling info into DB ... ")

# The BU sends us file delivery graph info; FUs send us logs and event processing rates.
answer = (
db.fill_graph(header, document) if is_bu else db.fill_run(header, document)
except AssertionError:
logger.error(f"Unexpected status response received: {return_value}")
return_value = None
except Exception:
logger.error(
f"Unhandled exception when parsing {cluster_status} status: {cluster_status.text}",
exc_info=True,
)
# TODO: Decide what kind of errors should be returned from the db fill functions.
# if answer:
# bad_rvs += [answer]
return return_value
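
To show what the assertion guards against, a hedged sketch exercising process_cluster_status with a stand-in response object (the class and its payload are hypothetical, not part of this PR):

import json

class FakeResponse:
    # Minimal stand-in for requests.Response; .json() and .text are all
    # that process_cluster_status touches.
    text = '{"dqmrubu-c2a06-03": {"up": true, "msg": null}}'

    def json(self):
        return json.loads(self.text)

status = process_cluster_status(cluster_status=FakeResponse())
# -> {'dqmrubu-c2a06-03': {'up': True, 'msg': None}}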


def get_cluster_status(db: DQM2MirrorDB, cluster: str = "playback"):
def get_cluster_status(cluster: str = "playback") -> dict | None:
"""
Function that queries the gateway playback machine periodically to get the status of the
production or playback cluster machines.
@@ -150,28 +192,53 @@ def get_cluster_status(db: DQM2MirrorDB, cluster: str = "playback"):
cfg["CMSWEB_FRONTEND_PROXY_URL"] + "/",
"cr/exe?" + urlencode({"cluster": cluster, "what": "get_cluster_status"}),
)
response = requests.get(
response = _request_wrapper(
requests.get,
url,
cookies={str(cfg["FFF_SECRET_NAME"]): os.environ.get("DQM_FFF_SECRET").strip()},
cookies=_get_cookies(),
verify=False,
cert=([cfg["SERVER_GRID_CERT_PATH"], cfg["SERVER_GRID_KEY_PATH"]]),
)
if response.status_code != 200:
logger.error(
f"fff_dqmtools ({url}) returned {response.status_code}. Response: "
f"{response.text}"
)
raise Exception(
f"Failed to fetch {cluster} status. Got ({response.status_code}) {response.text}"
)
logger.debug(f"Got {cluster} cluster status.")
if response:
if response.status_code != 200:
logger.error(
f"fff_dqmtools ({url}) returned {response.status_code}. Response: "
f"{response.text}"
)
raise Exception(
f"Failed to fetch {cluster} status. Got ({response.status_code}) {response.text}"
)
logger.debug(f"Got {cluster} cluster status.")
return process_cluster_status(cluster_status=response)
return None

try:
response = response.json()
except Exception as e:
logger.error(f"Exception {e} when parsing: {response.text}")
raise Exception(f"Failed to parse {cluster} status. Got {response.text}")
db.fill_cluster_status(response)
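
For reference, the urljoin/urlencode pair at the top of get_cluster_status builds a URL of this shape; the base here is a placeholder for cfg["CMSWEB_FRONTEND_PROXY_URL"]:

from urllib.parse import urljoin, urlencode

base = "https://cmsweb.example.cern.ch/dqm-square"  # placeholder
url = urljoin(
    base + "/",
    "cr/exe?" + urlencode({"cluster": "playback", "what": "get_cluster_status"}),
)
# -> https://cmsweb.example.cern.ch/dqm-square/cr/exe?cluster=playback&what=get_cluster_status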

def get_latest_info_from_host(host: str, rev: int) -> list[dict]:
"""
Given a host and a revision, get the latest headers from the specified host,
starting from the given revision, and return them as a list.
"""
# global bad_rvs
logger.info(f"Updating host {host}, starting from revision {str(rev)}")
if not rev:
rev = 0
headers = []
headers_answer = get_headers_from_fff(host, revision=rev)
if headers_answer:
try:
headers_answer = json.loads(json.loads(headers_answer)["messages"][0])
headers = headers_answer["headers"]
rev = headers_answer["rev"]
logger.debug(
f"Got {headers_answer['total_sent']} headers, from {rev[0]} to {rev[1]}."
)
except Exception:
logger.warning("Error when getting headers: ", exc_info=True)
logger.warning(f"Got response: {headers_answer}")
return []

return headers


def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:
@@ -182,7 +249,16 @@ def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:
for host in hosts:
logger.debug(f"Getting latest rev for {host} from DB.")
rev = db.get_latest_revision(host)
get_latest_info_from_host(host=host, rev=rev, db=db)
headers = get_latest_info_from_host(host=host, rev=rev)
process_headers_and_store_data(host=host, db=db, headers=headers)


def _get_cookies() -> dict[str, str]:
env_secret: str = os.environ.get("DQM_FFF_SECRET", "")
assert env_secret, "No secret found in environment variables"

# Trailing whitespace in secret leads to crashes, strip it
return {str(cfg["FFF_SECRET_NAME"]): env_secret.strip()}
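
_get_cookies centralizes the secret lookup that the old __main__ block (removed lines further down) did inline. A usage sketch, assuming DQM_FFF_SECRET is exported:

import os

os.environ["DQM_FFF_SECRET"] = "example-secret\n"  # illustrative value only
cookies = _get_cookies()
# Trailing whitespace is stripped, so the cookie value is "example-secret".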


if __name__ == "__main__":
@@ -229,19 +305,11 @@ def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:
cmsweb_proxy_url: str = cfg["CMSWEB_FRONTEND_PROXY_URL"]
cert_path: list[str] = [cfg["SERVER_GRID_CERT_PATH"], cfg["SERVER_GRID_KEY_PATH"]]

env_secret: str = os.environ.get("DQM_FFF_SECRET")
if env_secret:
fff_secret = env_secret
logger.debug("Found secret in environmental variables")
else:
logger.warning("No secret found in environmental variables")

# Trailing whitespace in secret leads to crashes, strip it
cookies: dict[str, str] = {str(cfg["FFF_SECRET_NAME"]): env_secret.strip()}
cookies = _get_cookies()

# DB CONNECTION
db_playback: DQM2MirrorDB = None
db_production: DQM2MirrorDB = None
db_playback: DQM2MirrorDB
db_production: DQM2MirrorDB

if "playback" in run_modes:
db_playback = DQM2MirrorDB(
@@ -264,7 +332,7 @@ def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:

logger.info("Starting loop for modes " + str(run_modes))

# Main Loop.
# Loop for fetching job information from the cluster.
# Fetches data for CMSSW jobs from DQM^2, and stores it into the database.
# For each mode (production/playback), loop over each available host FU/BU machine.
# For each one, request "headers" (TODO: what are they?). This will return at most 1000
@@ -275,17 +343,15 @@ def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:
# host. Using the latest revision on each header, it asks for 1000 more headers on the next
# iteration. This goes on forever, until the latest documents are fetched.
def loop_info(
machines: list[str],
hosts: list[str],
db: DQM2MirrorDB,
timeout: int = cfg["GRABBER_SLEEP_TIME_INFO"],
):
while True:
try:
get_latest_info_from_hosts(machines, db)
except Exception as error:
logger.warning(f"Crashed in info loop with error: {repr(error)}")
logger.warning(f"Traceback: {traceback.format_exc()}")
continue
get_latest_info_from_hosts(hosts, db)
except Exception:
logger.warning("Unhandled exception in info loop", exc_info=True)
logger.debug(f"Sleeping for {timeout}s")
time.sleep(timeout)

@@ -297,11 +363,11 @@ def loop_status(
):
while True:
try:
get_cluster_status(db, cluster)
except Exception as error:
logger.warning(f"Crashed in status loop with error: {repr(error)}")
logger.warning(f"Traceback: {traceback.format_exc()}")
continue
cluster_status = get_cluster_status(cluster)
if cluster_status:
db.fill_cluster_status(cluster_status)
except Exception:
logger.warning("Unhandled exception in status loop", exc_info=True)
logger.debug(f"Sleeping for {timeout}s")
time.sleep(timeout)
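
The wiring of these loops into threads is elided from this diff; presumably each enabled mode gets a pair of threads, roughly as sketched below (argument lists are assumptions based on the loop signatures, not the PR's actual code):

# Hypothetical wiring; playback_machines and db_playback come from elided code.
threads = []
if "playback" in run_modes:
    threads.append(
        threading.Thread(target=loop_info, args=(playback_machines, db_playback))
    )
    threads.append(
        threading.Thread(target=loop_status, args=(db_playback, "playback"))
    )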

@@ -344,23 +410,11 @@ def loop_status(
logger.info(f"Starting thread {thread._name}")
thread.start()
while True:
time.sleep(1)
# try:
# ### get content from active sites
# if "playback" in run_modes:
# # get_latest_info_from_hosts(playback_machines, db_playback)
# get_cluster_status(db_playback, "playback")
# if "production" in run_modes:
# # get_latest_info_from_hosts(production_machines, db_production)
# get_cluster_status(db_production, "production")
# except KeyboardInterrupt:
# break
# except Exception as error:
# logger.warning(f"Crashed in loop with error: {repr(error)}")
# logger.warning(f"Traceback: {traceback.format_exc()}")
# continue

# time.sleep(int(cfg["GRABBER_SLEEP_TIME_INFO"]))
try:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Keyboard interrupt detected, exiting")
break

# if len(bad_rvs):
# log.info(f"BAD REVISIONS: {bad_rvs}")