diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml
new file mode 100644
index 0000000..2c2e794
--- /dev/null
+++ b/.github/workflows/codeql.yml
@@ -0,0 +1,77 @@
+# For most projects, this workflow file will not need changing; you simply need
+# to commit it to your repository.
+#
+# You may wish to alter this file to override the set of languages analyzed,
+# or to provide custom queries or build logic.
+#
+# ******** NOTE ********
+# We have attempted to detect the languages in your repository. Please check
+# the `language` matrix defined below to confirm you have the correct set of
+# supported CodeQL languages.
+#
+name: "CodeQL"
+
+on:
+  push:
+    branches: [ "master" ]
+  pull_request:
+    # The branches below must be a subset of the branches above
+    branches: [ "master" ]
+  schedule:
+    - cron: '33 1 * * 2'
+
+jobs:
+  analyze:
+    name: Analyze
+    runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }}
+    timeout-minutes: ${{ (matrix.language == 'swift' && 120) || 360 }}
+    permissions:
+      actions: read
+      contents: read
+      security-events: write
+
+    strategy:
+      fail-fast: false
+      matrix:
+        language: [ 'python' ]
+        # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby', 'swift' ]
+        # Use only 'java' to analyze code written in Java, Kotlin or both
+        # Use only 'javascript' to analyze code written in JavaScript, TypeScript or both
+        # Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
+
+    steps:
+    - name: Checkout repository
+      uses: actions/checkout@v3
+
+    # Initializes the CodeQL tools for scanning.
+    - name: Initialize CodeQL
+      uses: github/codeql-action/init@v2
+      with:
+        languages: ${{ matrix.language }}
+        # If you wish to specify custom queries, you can do so here or in a config file.
+        # By default, queries listed here will override any specified in a config file.
+        # Prefix the list here with "+" to use these queries and those in the config file.
+
+        # For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
+        # queries: security-extended,security-and-quality
+
+
+    # Autobuild attempts to build any compiled languages (C/C++, C#, Go, Java, or Swift).
+    # If this step fails, then you should remove it and run the build manually (see below)
+    - name: Autobuild
+      uses: github/codeql-action/autobuild@v2
+
+    # ℹī¸ Command-line programs to run using the OS shell.
+    # 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
+
+    #   If the Autobuild fails above, remove it, uncomment the following three lines,
+    #   and modify them (or add more) to build your code; refer to the EXAMPLE below for guidance.
+
+    # - run: |
+    #     echo "Run, Build Application using script"
+    #     ./location_of_script_within_repo/buildscript.sh
+
+    - name: Perform CodeQL Analysis
+      uses: github/codeql-action/analyze@v2
+      with:
+        category: "/language:${{matrix.language}}"
diff --git a/.github/workflows/pylint.yml b/.github/workflows/pylint.yml
new file mode 100644
index 0000000..4f8f2e8
--- /dev/null
+++ b/.github/workflows/pylint.yml
@@ -0,0 +1,31 @@
+name: Pylint
+
+on:
+  push:
+    branches: [ "master" ]
+  pull_request:
+    # The branches below must be a subset of the branches above
+    branches: [ "master" ]
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: ["3.9", "3.11"]
+    steps:
+    - uses: actions/checkout@v3
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v3
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install pylint dependencies
+      run: |
+        python -m pip install --upgrade pip
+        pip install pylint
+    - name: Install code dependencies
+      run: |
+        pip install --upgrade htcondor
+    - name: Analysing the code with pylint
+      run: |
+        pylint --fail-under=8.0 --extension-pkg-allow-list=htcondor $(git ls-files 'bin/*.py' 'lib/*.py')
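The next hunk renames bin/hgcs-master.py to bin/hgcs_master.py. A hyphenated file name is not a valid Python module name, so neither pylint nor the import machinery can treat such a script as a module; a quick illustration of what the rename enables (assumes bin/ is on sys.path):

```python
# A dashed name cannot even be referenced as a module:
#     import hgcs-master      # SyntaxError: invalid syntax
# The underscored name is importable:
import importlib

module = importlib.import_module('hgcs_master')
print(module.__name__)
```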
diff --git a/bin/hgcs-master.py b/bin/hgcs_master.py
similarity index 66%
rename from bin/hgcs-master.py
rename to bin/hgcs_master.py
index f8aaebd..ae27041 100644
--- a/bin/hgcs-master.py
+++ b/bin/hgcs_master.py
@@ -1,13 +1,11 @@
+"""
+main executable of HGCS
+"""
+
 import os
 import sys
 import inspect
-import errno
-import shutil
 import argparse
-
-import time
-import re
-
 import logging
 #
 # Get main directory path
@@ -32,38 +30,18 @@
 
 #===============================================================
 
-def testing():
-    """
-    Test function
-    """
-    # schedd = MySchedd()
-    # for job in schedd.xquery(projection=['ClusterId', 'ProcId', 'JobStatus']):
-    #     print(repr(job))
-    # requirements = (
-    #     'JobStatus == 4 '
-    #     '&& User == "atlpan@cern.ch" '
-    #     '&& ClusterId == 18769 '
-    #     )
-    # for job in schedd.xquery(constraint=requirements):
-    #     print(job.get('ClusterId'), job.get('JobStatus'), job.get('SUBMIT_UserLog', None), job.get('ffffff', None))
-    # sleep_period = 300
-    # thread_list = []
-    # thread_list.append(LogRetriever(sleep_period=sleep_period))
-    # # thread_list.append(CleanupDelayer(sleep_period=sleep_period))
-    # thread_list.append(SDFFetcher(sleep_period=sleep_period))
-    # [ thr.start() for thr in thread_list ]
-    pass
-
 def main():
     """
-    Main function
+    main function
     """
     # command argparse
     oparser = argparse.ArgumentParser(prog='hgcs', add_help=True)
-    subparsers = oparser.add_subparsers()
-    oparser.add_argument('-c', '--config', action='store', dest='config',
-                         metavar='', help='Configuration file')
-    oparser.add_argument('-F', '--foregroudlog', action='store_true', dest='foregroudlog',
+    # subparsers = oparser.add_subparsers()
+    oparser.add_argument('-c', '--config', action='store',
+                         dest='config', metavar='',
+                         help='Configuration file')
+    oparser.add_argument('-F', '--foregroudlog', action='store_true',
+                         dest='foregroudlog',
                          help='Print logs to foregroud')
     # start parsing
     if len(sys.argv) == 1:
@@ -74,7 +52,7 @@ def main():
     if os.path.isfile(arguments.config):
         config_file_path = arguments.config
     else:
-        print('Invalid configuration file: {0}'.format(arguments.config))
+        print(f'Invalid configuration file: {arguments.config}')
         sys.exit(1)
     # defaults
     log_file = '/tmp/hgcs.log'
@@ -83,11 +61,11 @@ def main():
     # load config
     try:
         config = hgcs_config.ConfigClass(config_file_path)
-    except IOError as e:
-        print('IOError: {0}'.format(e))
+    except IOError as exc:
+        print(f'IOError: {exc}')
         sys.exit(1)
-    except Exception as e:
-        print('Cannot load conifg: {0}'.format(e))
+    except Exception as exc:
+        print(f'Cannot load config: {exc}')
         sys.exit(1)
     # handle master part of config
     try:
@@ -118,19 +96,19 @@
                 'limit': getattr(section, 'limit', None),
                 }
             agent_instance = class_obj(**param_dict)
-            utils.setupLogger(agent_instance.logger,
+            utils.setup_logger(agent_instance.logger,
                               pid=agent_instance.get_pid,
                               colored=logger_format_colored,
                               to_file=log_file)
-            agent_instance.logger.setLevel(LOG_LEVEL_MAP.get(log_level, logging.ERROR))
+            logging_log_level = LOG_LEVEL_MAP.get(log_level, logging.ERROR)
+            agent_instance.logger.setLevel(logging_log_level)
             thread_list.append(agent_instance)
     # run threads
     for thr in thread_list:
-        print('Start thread of agent {0}'.format(thr.__class__.__name__))
+        print(f'Start thread of agent {thr.__class__.__name__}')
         thr.start()
 
 #===============================================================
 
 if __name__ == '__main__':
-    # testing()
     main()
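main() above looks up LOG_LEVEL_MAP, which is defined in an unchanged part of bin/hgcs_master.py and therefore not visible in this diff; presumably it maps config strings to logging levels, along these lines (a hypothetical reconstruction, not the file's actual definition):

```python
import logging

# Assumed shape of LOG_LEVEL_MAP; the real definition sits outside the hunks.
LOG_LEVEL_MAP = {
    'DEBUG': logging.DEBUG,
    'INFO': logging.INFO,
    'WARNING': logging.WARNING,
    'ERROR': logging.ERROR,
    'CRITICAL': logging.CRITICAL,
}

# main() falls back to logging.ERROR for unrecognized values:
level = LOG_LEVEL_MAP.get('VERBOSE', logging.ERROR)
```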
diff --git a/lib/hgcs/agents.py b/lib/hgcs/agents.py
index eb4fcdb..7be5e0c 100644
--- a/lib/hgcs/agents.py
+++ b/lib/hgcs/agents.py
@@ -1,5 +1,9 @@
+"""
+agents of HGCS
+"""
+
 import os
-import sys
+# import sys
 import errno
 import shutil
 import time
@@ -9,25 +13,31 @@
 
 #===============================================================
 
-# Get main directory path
-_MAIN_DIR = os.path.join( os.path.dirname(__file__), '..' )
+# # Get main directory path
+# _MAIN_DIR = os.path.join( os.path.dirname(__file__), '..' )
 
-# Setup lib path
-_LIB_PATH = os.path.join( _MAIN_DIR, 'lib' )
-sys.path.insert(0, _LIB_PATH)
+# # Setup lib path
+# _LIB_PATH = os.path.join( _MAIN_DIR, 'lib' )
+# sys.path.insert(0, _LIB_PATH)
 
 from hgcs.utils import ThreadBase, MySchedd, global_lock  # noqa: E402
 
 #===============================================================
 
 def get_condor_job_id(job):
-    ClusterId = job.get('ClusterId')
-    ProcId = job.get('ProcId')
-    return '{0}.{1}'.format(ClusterId, ProcId)
+    """
+    get full condor job ID as ClusterId.ProcId
+    """
+    cluster_id = job.get('ClusterId')
+    proc_id = job.get('ProcId')
+    return f'{cluster_id}.{proc_id}'
 
 #===============================================================
 
 class LogRetriever(ThreadBase):
+    """
+    agent to retrieve or copy logs from the external schedd host
+    """
 
     projection = [
             'ClusterId', 'ProcId', 'JobStatus',
@@ -51,7 +61,7 @@ def __init__(self, flush_period = 86400, retrieve_mode='copy', **kwarg):
         self.retrieve_mode = retrieve_mode
 
     def run(self):
-        self.logger.debug('startTimestamp: {0}'.format(self.startTimestamp))
+        self.logger.debug(f'startTimestamp: {self.start_timestamp}')
         already_handled_job_id_set = set()
         last_flush_timestamp = time.time()
         while True:
@@ -65,24 +75,24 @@ def run(self):
                 try:
                     schedd = MySchedd()
                     break
-                except RuntimeError as e:
+                except RuntimeError as exc:
                     if i_try < n_try:
-                        self.logger.warning('{0} . Retry...'.format(e))
+                        self.logger.warning(f'{exc} . Retry...')
                         time.sleep(3)
                     else:
-                        self.logger.error('{0} . No more retry. Exit'.format(e))
+                        self.logger.error(f'{exc} . No more retry. Exit')
                         return
             for job in schedd.xquery(constraint=self.requirements,
                                      projection=self.projection):
                 job_id = get_condor_job_id(job)
                 if job_id in already_handled_job_id_set:
                     continue
-                self.logger.debug('to retrieve for condor job {0}'.format(job_id))
+                self.logger.debug(f'to retrieve for condor job {job_id}')
                 if self.retrieve_mode == 'symlink':
                     self.via_system(job, symlink_mode=True)
                 elif self.retrieve_mode == 'copy':
-                    retVal = self.via_system(job)
-                    if retVal:
+                    ret_val = self.via_system(job)
+                    if ret_val:
                         already_handled_job_id_set.add(job_id)
                 elif self.retrieve_mode == 'condor':
                     self.via_condor_retrieve(job)
@@ -92,10 +102,10 @@ def run(self):
                     schedd.edit(list(already_handled_job_id_set), 'LeaveJobInQueue', 'false')
                 except RuntimeError:
                     if i_try < n_try:
-                        self.logger.warning('failed to edit job {0} . Retry: {1}'.format(job_id, i_try))
+                        self.logger.warning(f'failed to edit job {job_id} . Retry: {i_try}')
                         time.sleep(1)
                     else:
-                        self.logger.warning('failed to edit job {0} . Skipped...'.format(job_id))
+                        self.logger.warning(f'failed to edit job {job_id} . Skipped...')
                 else:
                     already_handled_job_id_set.clear()
                     break
@@ -103,7 +113,10 @@ def run(self):
             time.sleep(self.sleep_period)
 
     def via_system(self, job, symlink_mode=False):
-        retVal = True
+        """
+        symlink or copy logs when source and destination are on the same host
+        """
+        ret_val = True
         job_id = get_condor_job_id(job)
         src_dir = job.get('Iwd')
         src_err_name = job.get('Err')
@@ -117,15 +130,15 @@ def via_system(self, job, symlink_mode=False):
         dest_log = job.get('SUBMIT_UserLog')
         transfer_remap_list = str(job.get('SUBMIT_TransferOutputRemaps')).split(';')
         if not dest_log:
-            self.logger.debug('{0} has no attribute of spool. Skipped...'.format(job_id))
+            self.logger.debug(f'{job_id} has no attribute of spool. Skipped...')
             return True
         for _m in transfer_remap_list:
-            match = re.search('([a-zA-Z0-9_.\-]+)=([a-zA-Z0-9_.\-/]+)', _m)
+            match = re.search(r'([a-zA-Z0-9_.\-]+)=([a-zA-Z0-9_.\-/]+)', _m)
             if match:
                 name = match.group(1)
                 dest_path = os.path.normpath(match.group(2))
                 if name == src_log_name:
-                    dest_log = osdest_path
+                    dest_log = dest_path
                 elif name == src_out_name:
                     dest_out = dest_path
                 elif name == src_err_name:
@@ -134,44 +147,51 @@ def via_system(self, job, symlink_mode=False):
             if not os.path.isfile(src_path) or os.path.islink(src_path):
                 if job.get('JobStatus') != 4:
                     continue
-                retVal = False
-                self.logger.error('{0} is not a regular file. Skipped...'.format(src_path))
+                ret_val = False
+                self.logger.error(f'{src_path} is not a regular file. Skipped...')
                 continue
             if not dest_path:
-                retVal = False
-                self.logger.error('no destination path for {0} . Skipped...'.format(src_path))
+                ret_val = False
+                self.logger.error(f'no destination path for {src_path} . Skipped...')
                 continue
             try:
                 if symlink_mode:
                     os.symlink(src_path, dest_path)
                     if os.path.islink(dest_path):
-                        self.logger.debug('{0} symlink made'.format(dest_path))
+                        self.logger.debug(f'{dest_path} symlink made')
                     else:
-                        retVal = False
-                        self.logger.error('{0} made but not found'.format(dest_path))
+                        ret_val = False
+                        self.logger.error(f'{dest_path} made but not found')
                 else:
                     shutil.copy2(src_path, dest_path)
                     if os.path.isfile(dest_path):
-                        self.logger.debug('{0} copy made'.format(dest_path))
+                        self.logger.debug(f'{dest_path} copy made')
                     else:
-                        retVal = False
-                        self.logger.error('{0} made but not found'.format(dest_path))
-            except OSError as e:
-                if e.errno == errno.EEXIST:
-                    self.logger.debug('{0} file already exists. Skipped...'.format(dest_path))
+                        ret_val = False
+                        self.logger.error(f'{dest_path} made but not found')
+            except OSError as exc:
+                if exc.errno == errno.EEXIST:
+                    self.logger.debug(f'{dest_path} file already exists. Skipped...')
                 else:
-                    retVal = False
-                    self.logger.error(e)
-            except Exception as e:
-                retVal = False
-                self.logger.error(e)
-        return retVal
+                    ret_val = False
+                    self.logger.error(exc)
+            except Exception as exc:
+                ret_val = False
+                self.logger.error(exc)
+        return ret_val
 
     def via_condor_retrieve(self, job):
+        """
+        retrieve logs via condor_retrieve
+        Not implemented yet
+        """
         pass
 
 
 class CleanupDelayer(ThreadBase):
+    """
+    agent to adjust LeaveJobInQueue of jobs to delay their cleanup
+    """
 
     requirements = (
             'SUBMIT_UserLog is undefined '
@@ -187,7 +207,7 @@ def __init__(self, sleep_period=60, delay_time=7200):
         self.delay_time = delay_time
 
     def run(self):
-        self.logger.debug('startTimestamp: {0}'.format(self.startTimestamp))
+        self.logger.debug(f'startTimestamp: {self.start_timestamp}')
         while True:
             self.logger.info('run starts')
             n_try = 999
@@ -195,37 +215,39 @@ def run(self):
                 try:
                     schedd = MySchedd()
                     break
-                except RuntimeError as e:
+                except RuntimeError as exc:
                     if i_try < n_try:
-                        self.logger.warning('{0} . Retry...'.format(e))
+                        self.logger.warning(f'{exc} . Retry...')
                         time.sleep(3)
                     else:
-                        self.logger.error('{0} . No more retry. Exit'.format(e))
+                        self.logger.error(f'{exc} . No more retry. Exit')
                         return
-            # for job in schedd.xquery(constraint=self.requirements):
-            #     job_id = get_condor_job_id(job)
-            #     self.logger.debug('to adjust LeaveJobInQueue of condor job {0}'.format(job_id))
-            job_id_list = [ get_condor_job_id(job) for job in schedd.xquery(constraint=self.requirements) ]
+            job_id_list = [ get_condor_job_id(job) \
+                            for job in schedd.xquery(constraint=self.requirements) ]
             n_jobs = len(job_id_list)
             n_try = 3
             for i_try in range(1, n_try + 1):
                 try:
                     schedd.edit(job_id_list, 'LeaveJobInQueue',
-                                self.ad_LeaveJobInQueue_template.format(delay_time=self.delay_time))
+                                self.ad_LeaveJobInQueue_template.format(
+                                    delay_time=self.delay_time))
                 except RuntimeError:
                     if i_try < n_try:
-                        self.logger.warning('failed to edit {0} jobs . Retry: {1}'.format(n_jobs, i_try))
+                        self.logger.warning(f'failed to edit {n_jobs} jobs . Retry: {i_try}')
                         time.sleep(1)
                     else:
-                        self.logger.warning('failed to edit {0} jobs . Skipped...'.format(n_jobs))
+                        self.logger.warning(f'failed to edit {n_jobs} jobs . Skipped...')
                 else:
-                    self.logger.debug('adjusted LeaveJobInQueue of {0} condor jobs '.format(n_jobs))
+                    self.logger.debug(f'adjusted LeaveJobInQueue of {n_jobs} condor jobs ')
                     break
         self.logger.info('run ends')
         time.sleep(self.sleep_period)
 
 
 class SDFFetcher(ThreadBase):
+    """
+    agent to copy the submit description file (SDF) of the job to the same directory as the job logs
+    """
 
     projection = [
             'ClusterId', 'ProcId', 'JobStatus',
@@ -250,7 +272,7 @@ def __init__(self, flush_period=86400, limit=6000, **kwarg):
         self.limit = 6000
 
     def run(self):
-        self.logger.debug('startTimestamp: {0}'.format(self.startTimestamp))
+        self.logger.debug(f'startTimestamp: {self.start_timestamp}')
         already_handled_job_id_set = set()
         last_flush_timestamp = time.time()
         while True:
@@ -264,15 +286,15 @@ def run(self):
                 try:
                     schedd = MySchedd()
                     break
-                except RuntimeError as e:
+                except RuntimeError as exc:
                     if i_try < n_try:
-                        self.logger.warning('{0} . Retry...'.format(e))
+                        self.logger.warning(f'{exc} . Retry...')
                         time.sleep(3)
                     else:
-                        self.logger.error('{0} . No more retry. Exit'.format(e))
+                        self.logger.error(f'{exc} . No more retry. Exit')
                         return
             already_sdf_copied_job_id_set = set()
-            failed_and_to_skip_sdf_copied_job_id_set = set()
+            to_skip_sdf_copied_job_id_set = set()
             try:
                 jobs_iter = schedd.xquery(constraint=self.requirements,
                                           projection=self.projection,
@@ -281,14 +303,14 @@ def run(self):
                     job_id = get_condor_job_id(job)
                     if job_id in already_handled_job_id_set:
                         continue
-                    self.logger.debug('to copy sdf for condor job {0}'.format(job_id))
-                    retVal = self.via_system(job)
-                    if retVal is True:
+                    self.logger.debug(f'to copy sdf for condor job {job_id}')
+                    ret_val = self.via_system(job)
+                    if ret_val is True:
                         already_sdf_copied_job_id_set.add(job_id)
-                    elif retVal is False:
-                        failed_and_to_skip_sdf_copied_job_id_set.add(job_id)
-            except RuntimeError as e:
-                self.logger.error('Failed to query jobs. Exit. RuntimeError: {0} '.format(e))
+                    elif ret_val is False:
+                        to_skip_sdf_copied_job_id_set.add(job_id)
+            except RuntimeError as exc:
+                self.logger.error(f'Failed to query jobs. Exit. RuntimeError: {exc} ')
             else:
                 n_try = 3
                 for i_try in range(1, n_try + 1):
                     try:
@@ -296,10 +318,10 @@ def run(self):
                         schedd.edit(list(already_sdf_copied_job_id_set), 'sdfCopied', '1')
                     except RuntimeError:
                         if i_try < n_try:
-                            self.logger.warning('failed to edit job {0} . Retry: {1}'.format(job_id, i_try))
+                            self.logger.warning(f'failed to edit job {job_id} . Retry: {i_try}')
                             time.sleep(1)
                         else:
-                            self.logger.warning('failed to edit job {0} . Skipped...'.format(job_id))
+                            self.logger.warning(f'failed to edit job {job_id} . Skipped...')
                     else:
                         already_handled_job_id_set.update(already_sdf_copied_job_id_set)
                         already_sdf_copied_job_id_set.clear()
@@ -307,64 +329,70 @@ def run(self):
                 n_try = 3
                 for i_try in range(1, n_try + 1):
                     try:
-                        schedd.edit(list(failed_and_to_skip_sdf_copied_job_id_set), 'sdfCopied', '2')
+                        schedd.edit(list(to_skip_sdf_copied_job_id_set), 'sdfCopied', '2')
                     except RuntimeError:
                         if i_try < n_try:
-                            self.logger.warning('failed to edit job {0} . Retry: {1}'.format(job_id, i_try))
+                            self.logger.warning(f'failed to edit job {job_id} . Retry: {i_try}')
                             time.sleep(1)
                         else:
-                            self.logger.warning('failed to edit job {0} . Skipped...'.format(job_id))
+                            self.logger.warning(f'failed to edit job {job_id} . Skipped...')
                     else:
-                        already_handled_job_id_set.update(failed_and_to_skip_sdf_copied_job_id_set)
-                        failed_and_to_skip_sdf_copied_job_id_set.clear()
+                        already_handled_job_id_set.update(to_skip_sdf_copied_job_id_set)
+                        to_skip_sdf_copied_job_id_set.clear()
                         break
         self.logger.info('run ends')
         time.sleep(self.sleep_period)
 
     def via_system(self, job):
-        retVal = True
+        """
+        copy the submit description file when source and destination are on the same host
+        """
+        ret_val = True
         job_id = get_condor_job_id(job)
         src_path = job.get('sdfPath')
         dest_log = job.get('SUBMIT_UserLog')
         if not dest_log:
             dest_log = job.get('UserLog')
         if not dest_log:
-            self.logger.debug('{0} has no valid SUBMIT_UserLog nor UserLog. Skipped...'.format(job_id))
+            self.logger.debug(f'{job_id} has no valid SUBMIT_UserLog nor UserLog. Skipped...')
             return True
         dest_dir = os.path.dirname(dest_log)
         dest_filename = re.sub(r'.log$', '.jdl', os.path.basename(dest_log))
         dest_path = os.path.normpath(os.path.join(dest_dir, dest_filename))
         if not os.path.isfile(src_path):
-            retVal = False
-            self.logger.error('{0} is not a regular file. Skipped...'.format(src_path))
+            ret_val = False
+            self.logger.error(f'{src_path} is not a regular file. Skipped...')
         if not dest_path:
-            retVal = False
-            self.logger.error('no destination path for {0} . Skipped...'.format(src_path))
-        if retVal is True:
+            ret_val = False
+            self.logger.error(f'no destination path for {src_path} . Skipped...')
+        if ret_val is True:
            if os.path.isfile(dest_path):
-                self.logger.debug('{0} file already exists. Skipped...'.format(dest_path))
+                self.logger.debug(f'{dest_path} file already exists. Skipped...')
                 return True
             try:
                 shutil.copy2(src_path, dest_path)
                 if os.path.isfile(dest_path):
                     os.chmod(dest_path, 0o644)
-                    self.logger.debug('{0} copy made'.format(dest_path))
+                    self.logger.debug(f'{dest_path} copy made')
                 else:
-                    retVal = None
-                    self.logger.error('{0} made but not found'.format(dest_path))
-            except OSError as e:
-                if e.errno == errno.EEXIST:
-                    self.logger.debug('{0} file already exists. Skipped...'.format(dest_path))
+                    ret_val = None
+                    self.logger.error(f'{dest_path} made but not found')
+            except OSError as exc:
+                if exc.errno == errno.EEXIST:
+                    self.logger.debug(f'{dest_path} file already exists. Skipped...')
                 else:
-                    retVal = None
-                    self.logger.error(e)
-            except Exception as e:
-                retVal = None
-                self.logger.error(e)
-        return retVal
+                    ret_val = None
+                    self.logger.error(exc)
+            except Exception as exc:
+                ret_val = None
+                self.logger.error(exc)
+        return ret_val
 
 
 class XJobCleaner(ThreadBase):
+    """
+    agent to force-clean (remove-x) jobs in removed status in the queue
+    """
 
     requirements_template = (
             'JobStatus =?= 3 '
@@ -379,7 +407,7 @@ def __init__(self, grace_period=86400, **kwarg):
         self.grace_period = grace_period
 
     def run(self):
-        self.logger.debug('startTimestamp: {0}'.format(self.startTimestamp))
+        self.logger.debug(f'startTimestamp: {self.start_timestamp}')
         while True:
             self.logger.info('run starts')
             n_try = 999
@@ -387,21 +415,22 @@ def run(self):
                 try:
                     schedd = MySchedd()
                     break
-                except RuntimeError as e:
+                except RuntimeError as exc:
                     if i_try < n_try:
-                        self.logger.warning('{0} . Retry...'.format(e))
+                        self.logger.warning(f'{exc} . Retry...')
                         time.sleep(3)
                     else:
-                        self.logger.error('{0} . No more retry. Exit'.format(e))
+                        self.logger.error(f'{exc} . No more retry. Exit')
                         return
             try:
-                requirements = self.requirements_template.format(grace_period=int(self.grace_period))
+                requirements = self.requirements_template.format(
+                    grace_period=int(self.grace_period))
                 self.logger.debug('try to remove-x jobs')
                 with global_lock:
                     act_ret = schedd.act(htcondor.JobAction.RemoveX, requirements)
-            except RuntimeError as e:
-                self.logger.error('Failed to remove-x jobs. Exit. RuntimeError: {0} '.format(e))
+            except RuntimeError as exc:
+                self.logger.error(f'Failed to remove-x jobs. Exit. RuntimeError: {exc} ')
             else:
-                self.logger.debug('act return : {act_ret}'.format(act_ret=str(dict(act_ret))))
+                self.logger.debug(f'act return : {str(dict(act_ret))}')
             self.logger.info('run ends')
             time.sleep(self.sleep_period)
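Every agent above repeats the same inline retry idiom around MySchedd() and schedd.edit(): up to n_try attempts, break on success, warn and sleep between tries. A standalone sketch of that control flow (a hypothetical helper for illustration, not part of the codebase):

```python
import logging
import time

def retry_call(func, n_try=3, wait=1, logger=None):
    """Equivalent of the agents' inline for-loop retry idiom:
    try up to n_try times, return the result on success,
    warn and wait between attempts, give up after the last one."""
    logger = logger or logging.getLogger(__name__)
    for i_try in range(1, n_try + 1):
        try:
            return func()
        except RuntimeError as exc:
            if i_try < n_try:
                logger.warning('%s . Retry: %s', exc, i_try)
                time.sleep(wait)
            else:
                logger.warning('%s . Skipped...', exc)
    return None
```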
diff --git a/lib/hgcs/hgcs_config.py b/lib/hgcs/hgcs_config.py
index 577d8cb..d8f5a59 100644
--- a/lib/hgcs/hgcs_config.py
+++ b/lib/hgcs/hgcs_config.py
@@ -1,3 +1,7 @@
+"""
+configuration parser of HGCS
+"""
+
 import os
 import sys
 import re
@@ -9,63 +13,65 @@
 
 #===============================================================
 
-# dummy section class
-class _SectionClass(object):
+class _SectionClass():
+    """
+    dummy class for config section
+    """
     def __init__(self):
         pass
 
-
-# config class
-class ConfigClass(object):
+class ConfigClass():
+    """
+    class for HGCS configurations
+    """
     def __init__(self, config_file=None):
         # get ConfigParser
-        tmpConf = configparser.ConfigParser()
+        tmp_conf = configparser.ConfigParser()
         # default and env variable for config file path
-        configPath_specified = os.path.normpath(config_file)
-        configEnvVar = 'HGCS_CONFIG_PATH'
-        configPath_default = '/etc/hgcs.cfg'
-        if configPath_specified:
-            configPath = configPath_specified
-        elif configEnvVar in os.environ:
-            configPath = os.path.normpath(os.environ[configEnvVar])
+        config_path_specified = os.path.normpath(config_file)
+        config_env_var = 'HGCS_CONFIG_PATH'
+        config_path_default = '/etc/hgcs.cfg'
+        if config_path_specified:
+            config_path = config_path_specified
+        elif config_env_var in os.environ:
+            config_path = os.path.normpath(os.environ[config_env_var])
         else:
-            configPath = configPath_default
+            config_path = config_path_default
         # read
         try:
-            tmpConf.read(configPath)
-        except Exception as e:
-            print('Failed to read config file from {0}: {1}'.format(configPath, e))
-            raise e
-            return False
+            tmp_conf.read(config_path)
+        except Exception as exc:
+            print(f'Failed to read config file from {config_path}: {exc}')
+            raise exc
        # loop over all sections
-        for tmpSection in tmpConf.sections():
+        for tmp_section in tmp_conf.sections():
             # read section
-            tmpDict = tmpConf[tmpSection]
+            tmp_dict = tmp_conf[tmp_section]
             # make section class
-            tmpSelf = _SectionClass()
+            tmp_self = _SectionClass()
             # update module dict
-            setattr(self, tmpSection, tmpSelf)
+            setattr(self, tmp_section, tmp_self)
             # expand all values
-            for tmpKey, tmpVal in tmpDict.items():
+            for tmp_key, tmp_val in tmp_dict.items():
                 # use env vars
-                if tmpVal.startswith('$'):
-                    tmpMatch = re.search('\$\{*([^\}]+)\}*', tmpVal)
-                    envName = tmpMatch.group(1)
-                    if envName not in os.environ:
-                        raise KeyError('{0} in the cfg is an undefined environment variable.'.format(envName))
-                    tmpVal = os.environ[envName]
+                if tmp_val.startswith('$'):
+                    tmp_match = re.search(r'\$\{*([^\}]+)\}*', tmp_val)
+                    env_name = tmp_match.group(1)
+                    if env_name not in os.environ:
+                        raise KeyError(f'{env_name} in config is an undefined env variable')
+                    tmp_val = os.environ[env_name]
                 # convert string to bool/int
-                if tmpVal.lower() == 'true':
-                    tmpVal = True
-                elif tmpVal.lower() == 'false':
-                    tmpVal = False
-                elif tmpVal.lower() == 'none':
-                    tmpVal = None
-                elif re.match('^\d+$', tmpVal):
-                    tmpVal = int(tmpVal)
-                elif '\n' in tmpVal:
-                    tmpVal = tmpVal.split('\n')
+                if tmp_val.lower() == 'true':
+                    tmp_val = True
+                elif tmp_val.lower() == 'false':
+                    tmp_val = False
+                elif tmp_val.lower() == 'none':
+                    tmp_val = None
+                elif re.match(r'^\d+$', tmp_val):
+                    tmp_val = int(tmp_val)
+                elif '\n' in tmp_val:
+                    tmp_val = tmp_val.split('\n')
                     # remove empty
-                    tmpVal = [x.strip() for x in tmpVal if x.strip()]
+                    tmp_val = [x.strip() for x in tmp_val if x.strip()]
                 # update dict
-                setattr(tmpSelf, tmpKey, tmpVal)
+                setattr(tmp_self, tmp_key, tmp_val)
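ConfigClass exposes each INI section as an attribute object and coerces values: a value starting with $ is replaced wholesale by the named environment variable, 'true'/'false'/'none' become bool/None, digit strings become int, and multi-line values become lists. A minimal sketch with a hypothetical hgcs.cfg (section and key names are illustrative; a real deployment's layout is not shown in this diff):

```python
import os
import tempfile

from hgcs import hgcs_config

# Hypothetical config content, mirroring the keys hgcs_master reads.
SAMPLE = """\
[Master]
log_file = ${HGCS_LOG_FILE}
log_level = DEBUG

[LogRetriever]
sleep_period = 300
retrieve_mode = copy
"""

os.environ['HGCS_LOG_FILE'] = '/tmp/hgcs.log'
with tempfile.NamedTemporaryFile('w', suffix='.cfg', delete=False) as tmp:
    tmp.write(SAMPLE)

config = hgcs_config.ConfigClass(tmp.name)
print(config.Master.log_file)                  # '/tmp/hgcs.log', env expanded
print(type(config.LogRetriever.sleep_period))  # <class 'int'>, '300' coerced
```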
diff --git a/lib/hgcs/utils.py b/lib/hgcs/utils.py
index c6c3539..83ad876 100644
--- a/lib/hgcs/utils.py
+++ b/lib/hgcs/utils.py
@@ -1,3 +1,7 @@
+"""
+common utilities of HGCS
+"""
+
 import os
 import time
 import logging
@@ -16,34 +20,37 @@
 
 #===============================================================
 
-def setupLogger(logger, pid=None, colored=True, to_file=None):
+def setup_logger(logger, pid=None, colored=True, to_file=None):
+    """
+    set up the logger
+    """
     if to_file is not None:
         hdlr = logging.FileHandler(to_file)
         colored = False
     else:
         hdlr = logging.StreamHandler()
-    def emit_decorator(fn):
+    def emit_decorator(orig_func):
         def func(*args):
+            _fstr = f'[%(asctime)s %(levelname)s]({pid})(%(name)s.%(funcName)s) %(message)s'
+            format_str = _fstr
             if colored:
                 levelno = args[0].levelno
-                if(levelno >= logging.CRITICAL):
+                if levelno >= logging.CRITICAL:
                     color = '\033[35;1m'
-                elif(levelno >= logging.ERROR):
+                elif levelno >= logging.ERROR:
                     color = '\033[31;1m'
-                elif(levelno >= logging.WARNING):
+                elif levelno >= logging.WARNING:
                     color = '\033[33;1m'
-                elif(levelno >= logging.INFO):
+                elif levelno >= logging.INFO:
                     color = '\033[32;1m'
-                elif(levelno >= logging.DEBUG):
+                elif levelno >= logging.DEBUG:
                     color = '\033[36;1m'
                 else:
                     color = '\033[0m'
-                # formatter = logging.Formatter('{0}%(asctime)s %(levelname)s in %(filename)s:%(funcName)s:%(lineno)d [%(message)s]\033[0m'.format(color))
-                formatter = logging.Formatter('{0}[%(asctime)s %(levelname)s]({1})(%(name)s.%(funcName)s) %(message)s\033[0m'.format(color, pid))
-            else:
-                formatter = logging.Formatter('%(asctime)s %(levelname)s]({0})(%(name)s.%(funcName)s) %(message)s'.format(pid))
+                format_str = f'{color}{_fstr}\033[0m'
+            formatter = logging.Formatter(format_str)
             hdlr.setFormatter(formatter)
-            return fn(*args)
+            return orig_func(*args)
         return func
     hdlr.emit = emit_decorator(hdlr.emit)
     logger.addHandler(hdlr)
@@ -51,20 +58,38 @@ def func(*args):
 
 #===============================================================
 
 class ThreadBase(threading.Thread):
+    """
+    base class of thread to run HGCS agents
+    """
+
     def __init__(self, sleep_period=60, **kwarg):
         threading.Thread.__init__(self)
         self.os_pid = os.getpid()
         self.logger = logging.getLogger(self.__class__.__name__)
         self.sleep_period = sleep_period
-        self.startTimestamp = time.time()
+        self.start_timestamp = time.time()
 
     @property
     def get_pid(self):
-        return '{0}-{1}'.format(self.os_pid, get_ident())
+        """
+        get unique thread identifier including process ID (from OS) and thread ID (from python)
+        """
+        return f'{self.os_pid}-{get_ident()}'
+
+    def run(self):
+        """
+        run the agent
+        """
+        pass
 
 
 class MySchedd(htcondor.Schedd):
+    """
+    Schedd class as a singleton
+    """
+
     __instance = None
 
     def __new__(cls, *args, **kwargs):
         if not isinstance(cls.__instance, cls):
             cls.__instance = super(MySchedd, cls).__new__(cls, *args, **kwargs)
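MySchedd overrides __new__ so that every construction returns the same htcondor.Schedd instance, meaning all agents share one schedd object. A short demonstration (requires a reachable local HTCondor schedd; otherwise construction raises):

```python
from hgcs.utils import MySchedd

schedd_a = MySchedd()  # first call creates the singleton
schedd_b = MySchedd()  # second call returns the same object
assert schedd_a is schedd_b
```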
diff --git a/pkg_info.py b/pkg_info.py
index bc1219b..7d3221b 100644
--- a/pkg_info.py
+++ b/pkg_info.py
@@ -1 +1 @@
-release_version = '0.1'
+release_version = '1.0.0'
diff --git a/setup.py b/setup.py
index 7546e9d..b2bf218 100644
--- a/setup.py
+++ b/setup.py
@@ -19,7 +19,7 @@
     packages=find_packages(where='lib'),
     package_dir = {'': 'lib'},
     install_requires=[
-        'htcondor >= 8.9.0',
+        'htcondor >= 9.6.0',
         ],
 
     # optional pip dependencies
@@ -41,6 +41,6 @@
             ),
         ],
 
-    scripts=['bin/hgcs-master.py',
+    scripts=['bin/hgcs_master.py',
              ]
     )
diff --git a/temp/hgcs.service.template b/temp/hgcs.service.template
index 462ff2d..c822c81 100644
--- a/temp/hgcs.service.template
+++ b/temp/hgcs.service.template
@@ -9,7 +9,7 @@ User=atlpan
 Group=zp
 LimitSTACK=1073741824
 Restart=on-abnormal
-ExecStart=/opt/HGCS/bin/python /opt/HGCS/bin/hgcs-master.py -c /opt/hgcs.cfg
+ExecStart=/opt/HGCS/bin/python /opt/HGCS/bin/hgcs_master.py -c /opt/hgcs.cfg
 
 [Install]
 WantedBy=multi-user.target
diff --git a/temp/single_script.py b/temp/single_script.py
index 666717e..c0ce748 100644
--- a/temp/single_script.py
+++ b/temp/single_script.py
@@ -27,15 +27,15 @@ def emit_decorator(fn):
     def func(*args):
         if colored:
             levelno = args[0].levelno
-            if(levelno >= logging.CRITICAL):
+            if (levelno >= logging.CRITICAL):
                 color = '\033[35;1m'
-            elif(levelno >= logging.ERROR):
+            elif (levelno >= logging.ERROR):
                 color = '\033[31;1m'
-            elif(levelno >= logging.WARNING):
+            elif (levelno >= logging.WARNING):
                 color = '\033[33;1m'
-            elif(levelno >= logging.INFO):
+            elif (levelno >= logging.INFO):
                 color = '\033[32;1m'
-            elif(levelno >= logging.DEBUG):
+            elif (levelno >= logging.DEBUG):
                 color = '\033[36;1m'
             else:
                 color = '\033[0m'
@@ -51,9 +51,9 @@ def func(*args):
 
 
 def get_condor_job_id(job):
-    ClusterId = job.get('ClusterId')
-    ProcId = job.get('ProcId')
-    return '{0}.{1}'.format(ClusterId, ProcId)
+    cluster_id = job.get('ClusterId')
+    proc_id = job.get('ProcId')
+    return '{0}.{1}'.format(cluster_id, proc_id)
 
 #===============================================================
 
@@ -63,7 +63,7 @@ def __init__(self):
         self.os_pid = os.getpid()
         self.logger = logging.getLogger(self.__class__.__name__)
         setupLogger(self.logger, pid=self.get_pid, colored=False)
-        self.startTimestamp = time.time()
+        self.start_timestamp = time.time()
 
     @property
     def get_pid(self):
@@ -100,7 +100,7 @@ def __init__(self, retrieve_mode='copy', sleep_period=60, flush_period=86400):
         self.flush_period = flush_period
 
     def run(self):
-        self.logger.debug('startTimestamp: {0}'.format(self.startTimestamp))
+        self.logger.debug('startTimestamp: {0}'.format(self.start_timestamp))
         already_handled_job_id_set = set()
         last_flush_timestamp = time.time()
         while True:
@@ -237,7 +237,7 @@ def __init__(self, sleep_period=60, delay_time=7200):
         self.delay_time = delay_time
 
     def run(self):
-        self.logger.debug('startTimestamp: {0}'.format(self.startTimestamp))
+        self.logger.debug('startTimestamp: {0}'.format(self.start_timestamp))
         while True:
             self.logger.info('run starts')
             n_try = 999
@@ -296,7 +296,7 @@ def __init__(self, sleep_period=60, flush_period=86400):
         self.flush_period = flush_period
 
     def run(self):
-        self.logger.debug('startTimestamp: {0}'.format(self.startTimestamp))
+        self.logger.debug('startTimestamp: {0}'.format(self.start_timestamp))
         already_handled_job_id_set = set()
         last_flush_timestamp = time.time()
        while True:
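For reference, a minimal foreground bootstrap that mirrors what bin/hgcs_master.py does for each configured agent section. Parameter values here are illustrative rather than taken from a real hgcs.cfg, the keyword pass-through to ThreadBase via **kwarg is assumed from the constructor signatures shown above, and a local HTCondor schedd is required:

```python
import logging

from hgcs import agents, utils

# Illustrative values; hgcs_master normally reads these from hgcs.cfg.
retriever = agents.LogRetriever(sleep_period=300, retrieve_mode='copy')
utils.setup_logger(retriever.logger, pid=retriever.get_pid,
                   colored=True, to_file=None)
retriever.logger.setLevel(logging.DEBUG)
retriever.start()  # run() loops: query the schedd, copy logs, sleep
```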