From 9c5b32683878792a55276b9c5c51e3a5734b4e81 Mon Sep 17 00:00:00 2001
From: Dimitris Papagiannis
Date: Wed, 8 Jan 2025 10:55:27 +0100
Subject: [PATCH 1/4] Sleep after unhandled exceptions

Also cleans up logging and edits a docstring.
---
 grabber.py | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/grabber.py b/grabber.py
index 59db391..f91eb0d 100644
--- a/grabber.py
+++ b/grabber.py
@@ -7,7 +7,6 @@
 import urllib3
 from urllib.parse import urljoin, urlencode
 import logging
-import traceback
 import dqmsquare_cfg
 from custom_logger import custom_formatter, set_log_handler
 from db import DQM2MirrorDB
@@ -264,7 +263,7 @@ def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:

     logger.info("Starting loop for modes " + str(run_modes))

-    # Main Loop.
+    # Loop for fetching job information from the cluster.
     # Fetches data for CMSSW jobs from DQM^2, and stores it into the database.
     # For each mode (production/playback), loop over each available host FU/BU machine.
     # For each one, request "headers" (TODO: what are they?). This will return at most 1000
@@ -282,10 +281,8 @@ def loop_info(
     while True:
         try:
             get_latest_info_from_hosts(machines, db)
-        except Exception as error:
-            logger.warning(f"Crashed in info loop with error: {repr(error)}")
-            logger.warning(f"Traceback: {traceback.format_exc()}")
-            continue
+        except Exception:
+            logger.warning("Unhandled exception in info loop", exc_info=True)
         logger.debug(f"Sleeping for {timeout}s")
         time.sleep(timeout)

@@ -298,10 +295,8 @@ def loop_status(
     while True:
         try:
             get_cluster_status(db, cluster)
-        except Exception as error:
-            logger.warning(f"Crashed in status loop with error: {repr(error)}")
-            logger.warning(f"Traceback: {traceback.format_exc()}")
-            continue
+        except Exception:
+            logger.warning("Unhandled exception in status loop", exc_info=True)
         logger.debug(f"Sleeping for {timeout}s")
         time.sleep(timeout)


From 32215702fdf5ef5a24aa3393c1225d821fe8613e Mon Sep 17 00:00:00 2001
From: Dimitris Papagiannis
Date: Wed, 8 Jan 2025 11:02:42 +0100
Subject: [PATCH 2/4] Handle errors in get_headers_from_fff

---
 grabber.py | 64 ++++++++++++++++++++++++++++++++----------------------
 1 file changed, 38 insertions(+), 26 deletions(-)

diff --git a/grabber.py b/grabber.py
index f91eb0d..61caa98 100644
--- a/grabber.py
+++ b/grabber.py
@@ -48,7 +48,9 @@ def get_documents_from_fff(host: str, port: int = cfg["FFF_PORT"], runs_ids: lis
     return r.content


-def get_headers_from_fff(host: str, port: int = cfg["FFF_PORT"], revision: int = 0):
+def get_headers_from_fff(
+    host: str, port: int = cfg["FFF_PORT"], revision: int = 0
+) -> bytes | None:
     """
     Given a host and the port where fff_dqmtools is listening, connect to the proxy (i.e.
     indirectly the SERVER_FFF_MACHINE) and request headers, starting from the
@@ -69,17 +71,27 @@ def get_headers_from_fff(host: str, port: int = cfg["FFF_PORT"], revision: int =
     jsn = {"event": "sync_request", "known_rev": str(revision)}
     data = json.dumps({"messages": [json.dumps(jsn)]})
     logger.debug(f"POSTing to '{url}' with data: {jsn}")
-    r = requests.post(
-        url,
-        data=data,
-        cert=cert_path,
-        verify=False,
-        headers={},
-        cookies=cookies,
-        timeout=30,
-    )
-    logger.debug(f"Got response of length {len(r.content)}.")
-
+    try:
+        r = requests.post(
+            url,
+            data=data,
+            cert=cert_path,
+            verify=False,
+            headers={},
+            cookies=cookies,
+            timeout=30,
+        )
+    except requests.exceptions.Timeout:
+        logger.warning(f"Timeout when POSTing to {url}", exc_info=True)
+        return None
+    except requests.exceptions.ConnectionError:
+        logger.warning(f"Could not connect to {url}", exc_info=True)
+        return None
+    if r.status_code != 200:
+        logger.warning(
+            f"Got response {r.status_code} when POSTing to {url}: {r.content}."
+        )
+        return None
     return r.content


@@ -99,22 +111,22 @@
     logger.info(f"Updating host {host}, starting from revision {str(rev)}")
     if not rev:
         rev = 0
-
+    headers = []
     headers_answer = get_headers_from_fff(host, revision=rev)
-    try:
-        headers_answer = json.loads(json.loads(headers_answer)["messages"][0])
-        headers = headers_answer["headers"]
-        rev = headers_answer["rev"]
-        logger.debug(
-            f"Got {headers_answer['total_sent']} headers, from {rev[0]} to {rev[1]}."
-        )
-    except Exception as e:
-        logger.warning(f"Error when getting headers: {repr(e)}")
-        logger.warning(f"Got response: {headers_answer}")
-        logger.warning(traceback.format_exc())
-        return
+    if headers_answer:
+        try:
+            headers_answer = json.loads(json.loads(headers_answer)["messages"][0])
+            headers = headers_answer["headers"]
+            rev = headers_answer["rev"]
+            logger.debug(
+                f"Got {headers_answer['total_sent']} headers, from {rev[0]} to {rev[1]}."
+            )
+        except Exception:
+            logger.warning("Error when getting headers", exc_info=True)
+            logger.warning(f"Got response: {headers_answer}")
+            return

-    if not len(headers):
+    if not headers:
         return
     for i, header in enumerate(headers):
         id = header["_id"]

From a6fb70b5ee35f60ba7f8d31f3e78018623382dd5 Mon Sep 17 00:00:00 2001
From: Dimitris Papagiannis
Date: Wed, 8 Jan 2025 15:09:54 +0100
Subject: [PATCH 3/4] Restructure grabber.py

- Split code into more functions to isolate functionality
- Added more exception handling for cases where the cluster is unreachable.
---
 grabber.py | 281 +++++++++++++++++++++++++++++++----------------------
 1 file changed, 167 insertions(+), 114 deletions(-)

diff --git a/grabber.py b/grabber.py
index 61caa98..eb9219f 100644
--- a/grabber.py
+++ b/grabber.py
@@ -11,6 +11,7 @@
 from custom_logger import custom_formatter, set_log_handler
 from db import DQM2MirrorDB
 import threading
+from collections.abc import Callable

 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

@@ -18,6 +19,38 @@
 cfg = dqmsquare_cfg.load_cfg()


+def _request_wrapper(
+    method: Callable, url: str, *args, **kwargs
+) -> requests.Response | None:
+    """
+    Simple wrapper to be used on request methods to handle common exceptions.
+    On exception, None is returned; otherwise the response is returned.
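+
+    Illustrative usage (a hypothetical call, not taken from this module):
+
+        response = _request_wrapper(requests.get, "https://example.com", timeout=5)
+        if response is not None:
+            logger.debug(f"Got status {response.status_code}")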
+    """
+    try:
+        response = method(url, *args, **kwargs)
+    except requests.exceptions.Timeout:
+        logger.warning(
+            f"Timeout when running {method.__name__} on {url}", exc_info=True
+        )
+        return None
+    except requests.exceptions.ConnectionError:
+        logger.warning(f"Could not connect to {url}", exc_info=True)
+        return None
+    except Exception:
+        logger.warning(
+            f"Unhandled exception when running {method.__name__} on {url}",
+            exc_info=True,
+        )
+        return None
+    return response
+
+
 def get_documents_from_fff(host: str, port: int = cfg["FFF_PORT"], runs_ids: list = []):
     """
     Given a host and the port where fff_dqmtools is listening, request documents for runs
@@ -41,13 +74,41 @@ def get_documents_from_fff(host: str, port: int = cfg["FFF_PORT"], runs_ids: lis
         cert=cert_path,
         verify=False,
         headers={},
-        cookies=cookies,
+        cookies=_get_cookies(),
         timeout=30,
     )
     logger.debug(f"Got {len(r.content)} byte response.")
     return r.content


+def process_headers_and_store_data(host: str, db: DQM2MirrorDB, headers: list[dict]):
+    """
+    Process headers as received from the DQM cluster, triage them, and store the data
+    in the database.
+    """
+    for i, header in enumerate(headers):
+        id = header["_id"]
+        logger.info(f"Processing header {str(id)} ({i+1}/{len(headers)})")
+
+        is_bu = host.startswith("bu") or host.startswith("dqmrubu")
+        if is_bu and "analyze_files" not in id:
+            logger.debug("Skipping, no 'analyze_files' key found")
+            continue
+        document_answer = get_documents_from_fff(host, runs_ids=[id])
+        document = json.loads(json.loads(document_answer)["messages"][0])["documents"][
+            0
+        ]
+        logger.debug("Filling info into DB ... ")
+
+        # BU sends us file delivery graph info, FUs send us logs and event processing rates.
+        answer = (
+            db.fill_graph(header, document) if is_bu else db.fill_run(header, document)
+        )
+        # TODO: Decide what kind of errors should be returned from the db fill functions.
+        # if answer:
+        #     bad_rvs += [answer]
+
+
 def get_headers_from_fff(
     host: str, port: int = cfg["FFF_PORT"], revision: int = 0
 ) -> bytes | None:
@@ -71,28 +132,26 @@ def get_headers_from_fff(
     jsn = {"event": "sync_request", "known_rev": str(revision)}
     data = json.dumps({"messages": [json.dumps(jsn)]})
     logger.debug(f"POSTing to '{url}' with data: {jsn}")
-    try:
-        r = requests.post(
-            url,
-            data=data,
-            cert=cert_path,
-            verify=False,
-            headers={},
-            cookies=cookies,
-            timeout=30,
-        )
-    except requests.exceptions.Timeout:
-        logger.warning(f"Timeout when POSTing to {url}", exc_info=True)
-        return None
-    except requests.exceptions.ConnectionError:
-        logger.warning(f"Could not connect to {url}", exc_info=True)
-        return None
-    if r.status_code != 200:
-        logger.warning(
-            f"Got response {r.status_code} when POSTing to {url}: {r.content}."
-        )
-        return None
-    return r.content
+
+    r = _request_wrapper(
+        requests.post,
+        url,
+        data=data,
+        cert=cert_path,
+        verify=False,
+        headers={},
+        cookies=_get_cookies(),
+        timeout=30,
+    )
+    if r is not None:
+        if r.status_code != 200:
+            logger.warning(
+                f"Got response {r.status_code} when POSTing to {url}: {r.content}."
+            )
+            return None
+        else:
+            return r.content
+    return None


 # # Global list storing number of errors returned from fill/fill_graph(??)
@@ -101,7 +160,66 @@ def get_headers_from_fff(
 # bad_rvs = []


-def get_latest_info_from_host(host: str, rev: int, db: DQM2MirrorDB) -> None:
+def process_cluster_status(cluster_status: requests.Response) -> dict | None:
+    """
+    The response is expected to be a JSON object like:
+
+    {
+        'dqmrubu-c2a06-03-01.cms': {'up': True, 'msg': None},
+        'dqmfu-c2b01-45-01.cms': {'up': True, 'msg': None},
+        'dqmfu-c2b02-45-01.cms': {'up': True, 'msg': None}
+    }
+    """
+    return_value = None
+    try:
+        return_value = cluster_status.json()
+        assert all(
+            "up" in return_value[host] and "msg" in return_value[host]
+            for host in return_value
+        )
+    except AssertionError:
+        logger.error(f"Unexpected status response received: {return_value}")
+        return_value = None
+    except Exception:
+        logger.error(
+            f"Unhandled exception when parsing cluster status: {cluster_status.text}",
+            exc_info=True,
+        )
+    return return_value
+
+
+def get_cluster_status(cluster: str = "playback") -> dict | None:
+    """
+    Function that queries the gateway playback machine periodically to get the status of the
+    production or playback cluster machines.
+    """
+    logger.debug(f"Requesting {cluster} cluster status.")
+    url = urljoin(
+        cfg["CMSWEB_FRONTEND_PROXY_URL"] + "/",
+        "cr/exe?" + urlencode({"cluster": cluster, "what": "get_cluster_status"}),
+    )
+    response = _request_wrapper(
+        requests.get,
+        url,
+        cookies=_get_cookies(),
+        verify=False,
+        cert=([cfg["SERVER_GRID_CERT_PATH"], cfg["SERVER_GRID_KEY_PATH"]]),
+    )
+    if response is not None:
+        if response.status_code != 200:
+            logger.error(
+                f"fff_dqmtools ({url}) returned {response.status_code}. Response: "
+                f"{response.text}"
+            )
+            raise Exception(
+                f"Failed to fetch {cluster} status. Got ({response.status_code}) {response.text}"
+            )
+        logger.debug(f"Got {cluster} cluster status.")
+        return process_cluster_status(cluster_status=response)
+    return None
+
+
+def get_latest_info_from_host(host: str, rev: int) -> list[dict]:
     """
     Given a host and a revision, it gets the latest headers from the host specified,
     and for each one it gets the appropriate "documents", storing them in the
@@ -124,65 +242,9 @@ def get_latest_info_from_host(host: str, rev: int, db: DQM2MirrorDB) -> None:
         except Exception:
             logger.warning("Error when getting headers", exc_info=True)
             logger.warning(f"Got response: {headers_answer}")
-            return
-
-    if not headers:
-        return
-    for i, header in enumerate(headers):
-        id = header["_id"]
-        logger.info(f"Processing header {str(id)} ({i+1}/{len(headers)})")
-
-        is_bu = host.startswith("bu") or host.startswith("dqmrubu")
-        if is_bu and "analyze_files" not in id:
-            logger.debug("Skip, no 'analyze_files' key")
-            continue
-        document_answer = get_documents_from_fff(host, runs_ids=[id])
-        document = json.loads(json.loads(document_answer)["messages"][0])["documents"][
-            0
-        ]
-        logger.debug("Filling info into DB ... ")
-
-        # BU sends us file delivery graph info, FUs sends us logs and event processing rates.
-        answer = (
-            db.fill_graph(header, document) if is_bu else db.fill_run(header, document)
-        )
-        # TODO: Decide what kind of errors should be returned from the db fill functions.
-        # if answer:
-        #     bad_rvs += [answer]
+            return []

-
-def get_cluster_status(db: DQM2MirrorDB, cluster: str = "playback"):
-    """
-    Function that queries the gateway playback machine periodically to get the status of the
-    production or playback cluster machines.
-    """
-    logger.debug(f"Requesting {cluster} cluster status.")
-    url = urljoin(
-        cfg["CMSWEB_FRONTEND_PROXY_URL"] + "/",
-        "cr/exe?" + urlencode({"cluster": cluster, "what": "get_cluster_status"}),
-    )
-    response = requests.get(
-        url,
-        cookies={str(cfg["FFF_SECRET_NAME"]): os.environ.get("DQM_FFF_SECRET").strip()},
-        verify=False,
-        cert=([cfg["SERVER_GRID_CERT_PATH"], cfg["SERVER_GRID_KEY_PATH"]]),
-    )
-    if response.status_code != 200:
-        logger.error(
-            f"fff_dqmtools ({url}) returned {response.status_code}. Response: "
-            f"{response.text}"
-        )
-        raise Exception(
-            f"Failed to fetch {cluster} status. Got ({response.status_code}) {response.text}"
-        )
-    logger.debug(f"Got {cluster} cluster status.")
-
-    try:
-        response = response.json()
-    except Exception as e:
-        logger.error(f"Exception {e} when parsing: {response.text}")
-        raise Exception(f"Failed to parse {cluster} status. Got {response.text}")
-    db.fill_cluster_status(response)
+    return headers


 def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:
@@ -193,7 +255,16 @@ def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:
     for host in hosts:
         logger.debug(f"Getting latest rev for {host} from DB.")
         rev = db.get_latest_revision(host)
-        get_latest_info_from_host(host=host, rev=rev, db=db)
+        headers = get_latest_info_from_host(host=host, rev=rev)
+        process_headers_and_store_data(host=host, db=db, headers=headers)
+
+
+def _get_cookies() -> dict[str, str]:
+    env_secret: str = os.environ.get("DQM_FFF_SECRET", "")
+    assert env_secret, "No secret found in environment variables"
+
+    # Trailing whitespace in secret leads to crashes, strip it
+    return {str(cfg["FFF_SECRET_NAME"]): env_secret.strip()}


 if __name__ == "__main__":
@@ -240,19 +311,11 @@ def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:
     cmsweb_proxy_url: str = cfg["CMSWEB_FRONTEND_PROXY_URL"]
     cert_path: list[str] = [cfg["SERVER_GRID_CERT_PATH"], cfg["SERVER_GRID_KEY_PATH"]]

-    env_secret: str = os.environ.get("DQM_FFF_SECRET")
-    if env_secret:
-        fff_secret = env_secret
-        logger.debug("Found secret in environmental variables")
-    else:
-        logger.warning("No secret found in environmental variables")
-
-    # Trailing whitespace in secret leads to crashes, strip it
-    cookies: dict[str, str] = {str(cfg["FFF_SECRET_NAME"]): env_secret.strip()}
+    cookies = _get_cookies()

     # DB CONNECTION
-    db_playback: DQM2MirrorDB = None
-    db_production: DQM2MirrorDB = None
+    db_playback: DQM2MirrorDB
+    db_production: DQM2MirrorDB

     if "playback" in run_modes:
         db_playback = DQM2MirrorDB(
@@ -286,13 +349,13 @@ def get_latest_info_from_hosts(hosts: list[str], db: DQM2MirrorDB) -> None:
     # host. Using the latest revision on each header, it asks for 1000 more headers on the next
     # iteration. This goes on forever, until the latest documents are fetched.
 def loop_info(
-    machines: list[str],
+    hosts: list[str],
     db: DQM2MirrorDB,
     timeout: int = cfg["GRABBER_SLEEP_TIME_INFO"],
 ):
     while True:
         try:
-            get_latest_info_from_hosts(machines, db)
+            get_latest_info_from_hosts(hosts, db)
         except Exception:
             logger.warning("Unhandled exception in info loop", exc_info=True)
         logger.debug(f"Sleeping for {timeout}s")
         time.sleep(timeout)
@@ -306,7 +369,9 @@ def loop_status(
 ):
     while True:
         try:
-            get_cluster_status(db, cluster)
+            cluster_status = get_cluster_status(cluster)
+            if cluster_status:
+                db.fill_cluster_status(cluster_status)
         except Exception:
             logger.warning("Unhandled exception in status loop", exc_info=True)
         logger.debug(f"Sleeping for {timeout}s")
         time.sleep(timeout)
@@ -351,23 +416,11 @@ def loop_status(
         logger.info(f"Starting thread {thread._name}")
         thread.start()
     while True:
-        time.sleep(1)
-        # try:
-        #     ### get content from active sites
-        #     if "playback" in run_modes:
-        #         # get_latest_info_from_hosts(playback_machines, db_playback)
-        #         get_cluster_status(db_playback, "playback")
-        #     if "production" in run_modes:
-        #         # get_latest_info_from_hosts(production_machines, db_production)
-        #         get_cluster_status(db_production, "production")
-        # except KeyboardInterrupt:
-        #     break
-        # except Exception as error:
-        #     logger.warning(f"Crashed in loop with error: {repr(error)}")
-        #     logger.warning(f"Traceback: {traceback.format_exc()}")
-        #     continue
-
-        # time.sleep(int(cfg["GRABBER_SLEEP_TIME_INFO"]))
+        try:
+            time.sleep(1)
+        except KeyboardInterrupt:
+            logger.info("Keyboard interrupt detected, exiting")
+            break

     # if len(bad_rvs):
     #     log.info(f"BAD REVISIONS: {bad_rvs}")

From 503194710144c698849fdff1ec6cfa2f751a67ec Mon Sep 17 00:00:00 2001
From: Dimitris Papagiannis
Date: Wed, 8 Jan 2025 15:10:23 +0100
Subject: [PATCH 4/4] Cleanup requirements.txt

---
 grabber.py       | 6 +++---
 requirements.txt | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/grabber.py b/grabber.py
index eb9219f..ea57bc5 100644
--- a/grabber.py
+++ b/grabber.py
@@ -165,9 +165,9 @@ def process_cluster_status(cluster_status: requests.Response) -> dict | None:
     The response is expected to be a JSON object like:

     {
-        'dqmrubu-c2a06-03-01.cms': {'up': True, 'msg': None},
-        'dqmfu-c2b01-45-01.cms': {'up': True, 'msg': None},
-        'dqmfu-c2b02-45-01.cms': {'up': True, 'msg': None}
+        'dqmrubu-c2a06-03...cms': {'up': True, 'msg': None},
+        'dqmfu-c2b01-45...cms': {'up': True, 'msg': None},
+        'dqmfu-c2b02-45...cms': {'up': True, 'msg': None}
     }
     """
     return_value = None
diff --git a/requirements.txt b/requirements.txt
index 7d621fb..2134674 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,10 @@
 requests==2.31.0
 cernrequests
 Flask<3.0
-SQLAlchemy<2.0 
+SQLAlchemy<2.0
 sqlalchemy-utils
 gunicorn<21.0.0
 psycopg2-binary
 werkzeug
 python-dotenv<2.0.0
 pytz
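
The retry pattern these patches converge on (helpers log failures and return None; the loops always sleep before retrying) can be sketched in isolation. The snippet below is a minimal illustration rather than code from grabber.py; fetch_status, status_loop, the URL, and the timeout value are placeholders invented for the example:

import logging
import time

import requests

logger = logging.getLogger(__name__)


def fetch_status(url: str) -> dict | None:
    # Mirrors the helpers above: catch request errors, log them, return None.
    try:
        response = requests.get(url, timeout=30)
    except requests.exceptions.RequestException:
        logger.warning(f"Request to {url} failed", exc_info=True)
        return None
    if response.status_code != 200:
        logger.warning(f"Got response {response.status_code} from {url}")
        return None
    return response.json()


def status_loop(url: str, timeout: int = 30) -> None:
    # Mirrors loop_status: failures surface as None rather than exceptions,
    # so the loop always reaches time.sleep and cannot busy-spin on errors.
    while True:
        status = fetch_status(url)
        if status:
            logger.info(f"Cluster status: {status}")
        logger.debug(f"Sleeping for {timeout}s")
        time.sleep(timeout)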