diff --git a/bin/hgcs_master.py b/bin/hgcs_master.py index f81f0c3..378c3d4 100644 --- a/bin/hgcs_master.py +++ b/bin/hgcs_master.py @@ -20,18 +20,6 @@ # sys.path.insert(0, _LIB_PATH) -# =============================================================== - -LOG_LEVEL_MAP = { - "ERROR": logging.ERROR, - "WARNING": logging.WARNING, - "INFO": logging.INFO, - "DEBUG": logging.DEBUG, -} - -# =============================================================== - - def main(): """ main function @@ -91,15 +79,20 @@ def main(): "flush_period": getattr(section, "flush_period", None), "grace_period": getattr(section, "grace_period", None), "limit": getattr(section, "limit", None), + "logger_format_colored": logger_format_colored, + "log_level": log_level, + "log_file": log_file, } agent_instance = class_obj(**param_dict) - utils.setup_logger(agent_instance.logger, pid=agent_instance.get_pid, colored=logger_format_colored, to_file=log_file) - logging_log_level = LOG_LEVEL_MAP.get(log_level, logging.ERROR) - agent_instance.logger.setLevel(logging_log_level) thread_list.append(agent_instance) + # master log + main_logger = logging.getLogger("hgcs_main") + utils.setup_logger(main_logger, pid=os.getpid(), colored=logger_format_colored, to_file=log_file) + main_logger.info("This is HGCS") # run threads for thr in thread_list: print(f"Start thread of agent {thr.__class__.__name__}") + main_logger.info(f"Start thread of agent {thr.__class__.__name__}") thr.start() diff --git a/lib/hgcs/agents.py b/lib/hgcs/agents.py index d6fc72e..5fa185c 100644 --- a/lib/hgcs/agents.py +++ b/lib/hgcs/agents.py @@ -65,6 +65,8 @@ def __init__(self, flush_period=86400, retrieve_mode="copy", **kwarg): self.retrieve_mode = retrieve_mode def run(self): + self.set_logger() + self.logger.info("agent starts") self.logger.debug(f"startTimestamp: {self.start_timestamp}") already_handled_job_id_set = set() last_flush_timestamp = time.time() @@ -86,6 +88,7 @@ def run(self): else: self.logger.error(f"{exc} . No more retry. Exit") return + n_new_handled_jobs = 0 for job in schedd.xquery(constraint=self.requirements, projection=self.projection): job_id = get_condor_job_id(job) if job_id in already_handled_job_id_set: @@ -97,6 +100,7 @@ def run(self): ret_val = self.via_system(job) if ret_val: already_handled_job_id_set.add(job_id) + n_new_handled_jobs += 1 elif self.retrieve_mode == "condor": self.via_condor_retrieve(job) n_try = 3 @@ -112,7 +116,7 @@ def run(self): else: already_handled_job_id_set.clear() break - self.logger.info("run ends") + self.logger.info(f"run ends; handled {n_new_handled_jobs} jobs") time.sleep(self.sleep_period) def via_system(self, job, symlink_mode=False): @@ -204,6 +208,8 @@ def __init__(self, sleep_period=60, delay_time=7200): self.delay_time = delay_time def run(self): + self.set_logger() + self.logger.info("agent starts") self.logger.debug(f"startTimestamp: {self.start_timestamp}") while True: self.logger.info("run starts") @@ -267,6 +273,8 @@ def __init__(self, flush_period=86400, limit=6000, **kwarg): self.limit = 6000 def run(self): + self.set_logger() + self.logger.info("agent starts") self.logger.debug(f"startTimestamp: {self.start_timestamp}") already_handled_job_id_set = set() last_flush_timestamp = time.time() @@ -290,6 +298,8 @@ def run(self): return already_sdf_copied_job_id_set = set() to_skip_sdf_copied_job_id_set = set() + n_new_handled_jobs = 0 + n_new_skipped_jobs = 0 try: jobs_iter = schedd.xquery(constraint=self.requirements, projection=self.projection, limit=self.limit) for job in jobs_iter: @@ -300,8 +310,10 @@ def run(self): ret_val = self.via_system(job) if ret_val is True: already_sdf_copied_job_id_set.add(job_id) + n_new_handled_jobs += 1 elif ret_val is False: to_skip_sdf_copied_job_id_set.add(job_id) + n_new_skipped_jobs += 1 except RuntimeError as exc: self.logger.error(f"Failed to query jobs. Exit. RuntimeError: {exc} ") else: @@ -333,7 +345,7 @@ def run(self): already_handled_job_id_set.update(to_skip_sdf_copied_job_id_set) to_skip_sdf_copied_job_id_set.clear() break - self.logger.info("run ends") + self.logger.info(f"run ends; handled {n_new_handled_jobs} jobs, skipped {n_new_skipped_jobs} jobs") time.sleep(self.sleep_period) def via_system(self, job): @@ -397,6 +409,8 @@ def __init__(self, grace_period=86400, **kwarg): self.grace_period = grace_period def run(self): + self.set_logger() + self.logger.info("agent starts") self.logger.debug(f"startTimestamp: {self.start_timestamp}") while True: self.logger.info("run starts") @@ -412,6 +426,7 @@ def run(self): else: self.logger.error(f"{exc} . No more retry. Exit") return + res_str = str(None) try: requirements = self.requirements_template.format(grace_period=int(self.grace_period)) self.logger.debug("try to remove-x jobs") @@ -420,6 +435,6 @@ def run(self): except RuntimeError as exc: self.logger.error(f"Failed to remove-x jobs. Exit. RuntimeError: {exc} ") else: - self.logger.debug(f"act return : {str(dict(act_ret))}") - self.logger.info("run ends") + res_str = str(dict(act_ret)) + self.logger.info(f"run ends; return: {res_str}") time.sleep(self.sleep_period) diff --git a/lib/hgcs/utils.py b/lib/hgcs/utils.py index 1d40ec2..604d803 100644 --- a/lib/hgcs/utils.py +++ b/lib/hgcs/utils.py @@ -7,19 +7,28 @@ import threading import time -try: - from threading import get_ident -except ImportError: - from thread import get_ident +from threading import get_ident import htcondor + # =============================================================== global_lock = threading.Lock() # =============================================================== +# =============================================================== + +LOG_LEVEL_MAP = { + "ERROR": logging.ERROR, + "WARNING": logging.WARNING, + "INFO": logging.INFO, + "DEBUG": logging.DEBUG, +} + +# =============================================================== + def setup_logger(logger, pid=None, colored=True, to_file=None): """ @@ -68,14 +77,21 @@ class ThreadBase(threading.Thread): base class of thread to run HGCS agents """ - def __init__(self, sleep_period=60, **kwarg): + def __init__(self, sleep_period=60, **kwargs): threading.Thread.__init__(self) self.os_pid = os.getpid() self.logger = logging.getLogger(self.__class__.__name__) self.sleep_period = sleep_period self.start_timestamp = time.time() + self.logger_format_colored = kwargs.get("logger_format_colored") + self.log_level = kwargs.get("log_level") + self.log_file = kwargs.get("log_file") + + def set_logger(self): + setup_logger(self.logger, pid=self.get_pid(), colored=self.logger_format_colored, to_file=self.log_file) + logging_log_level = LOG_LEVEL_MAP.get(self.log_level, logging.ERROR) + self.logger.setLevel(logging_log_level) - @property def get_pid(self): """ get unique thread identifier including process ID (from OS) and thread ID (from python) diff --git a/pkg_info.py b/pkg_info.py index 5b7bb79..4e5ce2b 100644 --- a/pkg_info.py +++ b/pkg_info.py @@ -1 +1 @@ -release_version = "2.0.1" +release_version = "2.0.2" diff --git a/pyproject.toml b/pyproject.toml index 29d29c5..89a85df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,9 +18,7 @@ dependencies = [ ] [project.optional-dependencies] -kubernetes = ['kubernetes', 'pyyaml'] -mysql = ['mysqlclient'] -atlasgrid = ['uWSGI >= 2.0.20', 'htcondor >= 10.3.0', 'mysqlclient >= 2.1.1'] +atlasgrid = ['htcondor >= 10.3.0'] [project.urls] Homepage = "https://github.com/PanDAWMS/HGCS" diff --git a/temp/single_script.py b/temp/single_script.py index 6512f2f..8f26edb 100644 --- a/temp/single_script.py +++ b/temp/single_script.py @@ -7,14 +7,10 @@ import threading import time -try: - from threading import get_ident -except ImportError: - from thread import get_ident +from threading import get_ident import classad import htcondor -from six import configparser # =============================================================== @@ -66,10 +62,9 @@ def __init__(self): threading.Thread.__init__(self) self.os_pid = os.getpid() self.logger = logging.getLogger(self.__class__.__name__) - setupLogger(self.logger, pid=self.get_pid, colored=False) + setupLogger(self.logger, pid=self.get_pid(), colored=False) self.start_timestamp = time.time() - @property def get_pid(self): return f"{self.os_pid}-{get_ident()}" @@ -178,7 +173,7 @@ def via_system(self, job, symlink_mode=False): name = match.group(1) dest_path = os.path.normpath(match.group(2)) if name == src_log_name: - dest_log = osdest_path + dest_log = dest_path elif name == src_out_name: dest_out = dest_path elif name == src_err_name: