diff --git a/PILOTVERSION b/PILOTVERSION index 4b3244f5..e88f7664 100644 --- a/PILOTVERSION +++ b/PILOTVERSION @@ -1 +1 @@ -3.8.1.66 \ No newline at end of file +3.8.2.8 \ No newline at end of file diff --git a/pilot/api/data.py b/pilot/api/data.py index 76be4e73..93cc76f1 100644 --- a/pilot/api/data.py +++ b/pilot/api/data.py @@ -131,7 +131,7 @@ def __init__(self, self.trace_report = trace_report if trace_report else TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), ipv=self.ipv, workdir=self.workdir) if not self.acopytools: - msg = f'failed to initilize StagingClient: no acopytools options found, acopytools={self.acopytools}' + msg = f'failed to initialize StagingClient: no acopytools options found, acopytools={self.acopytools}' logger.error(msg) self.trace_report.update(clientState='BAD_COPYTOOL', stateReason=msg) self.trace_report.send() diff --git a/pilot/control/data.py b/pilot/control/data.py index 506ce68a..28db36a3 100644 --- a/pilot/control/data.py +++ b/pilot/control/data.py @@ -285,11 +285,13 @@ def _stage_in(args: object, job: JobData) -> bool: logger.info('stage-in will not be done in a container') client, activity = get_stagein_client(job, args, label) + logger.info(f'activity={activity}') use_pcache = job.infosys.queuedata.use_pcache - + logger.debug(f'use_pcache={use_pcache}') # get the proper input file destination (normally job.workdir unless stager workflow) jobworkdir = job.workdir # there is a distinction for mv copy tool on ND vs non-ATLAS workdir = get_proper_input_destination(job.workdir, args.input_destination_dir) + logger.debug(f'workdir={workdir}') kwargs = {'workdir': workdir, 'cwd': job.workdir, 'usecontainer': False, @@ -301,7 +303,9 @@ def _stage_in(args: object, job: JobData) -> bool: 'rucio_host': args.rucio_host, 'jobworkdir': jobworkdir, 'args': args} + logger.debug(f'kwargs={kwargs}') client.prepare_sources(job.indata) + logger.info('prepared sources - will now transfer files') client.transfer(job.indata, activity=activity, 
**kwargs) except PilotException as error: error_msg = traceback.format_exc() diff --git a/pilot/control/payload.py b/pilot/control/payload.py index 25adce15..7d091073 100644 --- a/pilot/control/payload.py +++ b/pilot/control/payload.py @@ -376,13 +376,21 @@ def extract_error_info(error: str) -> (int, str): return error_code, diagnostics -def get_rtlogging() -> str: +def get_rtlogging(catchall: str) -> str: """ - Return the proper rtlogging value from the experiment specific plug-in or the config file. + Return the proper rtlogging value from PQ.catchall, the experiment specific plug-in or the config file. + :param catchall: catchall field from queuedata (str) :return: rtlogging (str). """ + if catchall: + _rtlogging = findall(r'logging=([^,]+)', catchall) + if _rtlogging and ";" in _rtlogging[0]: + logger.info(f"found rtlogging in catchall: {_rtlogging[0]}") + return _rtlogging[0] + rtlogging = None + pilot_user = os.environ.get('PILOT_USER', 'generic').lower() try: user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0) @@ -419,7 +427,13 @@ def get_logging_info(job: JobData, args: object) -> dict: info_dic['logname'] = args.realtime_logname if args.realtime_logname else "pilot-log" logserver = args.realtime_logging_server if args.realtime_logging_server else "" - info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', get_rtlogging()) + try: + catchall = job.infosys.queuedata.catchall + except Exception as exc: + logger.warning(f'exception caught: {exc}') + catchall = "" + + info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', get_rtlogging(catchall)) if not logserver and not info: logger.warning(f"not enough info available for activating real-time logging (info='{info}', logserver='{logserver}')") return {} diff --git a/pilot/user/atlas/utilities.py b/pilot/user/atlas/utilities.py index c73df92a..27d91384 100644 --- a/pilot/user/atlas/utilities.py +++ b/pilot/user/atlas/utilities.py @@ -41,7 +41,10 @@ ) from pilot.util.parameters import 
convert_to_int from pilot.util.processes import is_process_running -from pilot.util.psutils import get_command_by_pid +from pilot.util.psutils import ( + get_command_by_pid, + find_process_by_jobid +) from .setup import get_asetup @@ -158,11 +161,9 @@ def get_proper_pid(pid: int, jobid: str, use_container: bool = True) -> int: if not is_process_running(pid): return -1 - ps = get_ps_info() - - # lookup the process id using ps aux + # lookup the process id using ps command or psutils logger.debug(f'attempting to identify pid from job id ({jobid})') - _pid = get_pid_for_jobid(ps, jobid) + _pid = get_pid_for_jobid(jobid) if _pid: logger.debug(f'discovered pid={_pid} for job id {jobid}') cmd = get_command_by_pid(_pid) @@ -188,6 +189,8 @@ def get_ps_info(whoami: str = None, options: str = 'axfo pid,user,args') -> str: """ Return ps info for the given user. + Note: this is a fallback solution in case the pid cannot be found in the psutils lookup. + :param whoami: username (str) :param options: ps options (str) :return: ps aux for given user (str). @@ -200,16 +203,19 @@ def get_ps_info(whoami: str = None, options: str = 'axfo pid,user,args') -> str: return stdout -def get_pid_for_jobid(ps: str, jobid: str) -> int or None: +def get_pid_for_jobid(jobid: str) -> int or None: """ Return the process id for the ps entry that contains the job id. - :param ps: ps command output (str) :param jobid: PanDA job id (str). :return: pid (int) or None if no such process (int or None). 
""" - pid = None + pid = find_process_by_jobid(jobid) + if pid: + return pid + # fallback to ps command + ps = get_ps_info() for line in ps.split('\n'): if jobid in line and 'xrootd' not in line: # extract pid diff --git a/pilot/util/constants.py b/pilot/util/constants.py index 0a4003b3..e497600c 100644 --- a/pilot/util/constants.py +++ b/pilot/util/constants.py @@ -27,8 +27,8 @@ # Pilot version RELEASE = '3' # released number should be fixed at 3 for Pilot 3 VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates -REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates -BUILD = '66' # build number should be reset to '1' for every new development cycle +REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates +BUILD = '8' # build number should be reset to '1' for every new development cycle SUCCESS = 0 FAILURE = 1 diff --git a/pilot/util/https.py b/pilot/util/https.py index 9a2da0d2..a26c75ce 100644 --- a/pilot/util/https.py +++ b/pilot/util/https.py @@ -646,7 +646,7 @@ def get_panda_server(url: str, port: int, update_server: bool = True) -> str: if default in pandaserver: try: rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])]) - except socket.herror as exc: + except (socket.herror, socket.gaierror) as exc: logger.warning(f'failed to get address from socket: {exc} - will use default server ({pandaserver})') else: pandaserver = pandaserver.replace(default, rnd) diff --git a/pilot/util/networking.py b/pilot/util/networking.py index 5e03368d..d081b784 100644 --- a/pilot/util/networking.py +++ b/pilot/util/networking.py @@ -35,7 +35,7 @@ def dump_ipv6_info() -> None: """Dump the IPv6 info to the log.""" cmd = 'ifconfig' if not is_command_available(cmd): - _cmd = '/usr/sbin/ifconfig' + _cmd = '/usr/sbin/ifconfig -a' if not 
# --- pilot/util/networking.py ---

def extract_ipv6_addresses(ifconfig_output: str) -> list:
    """
    Extract the non-loopback IPv6 addresses from ifconfig output.

    Both net-tools layouts are understood:
      modern: 'inet6 fe80::1  prefixlen 64  scopeid 0x20<link>'
      legacy: 'inet6 addr: fe80::1/64 Scope:Link'
    A trailing '/prefixlen' is never part of the returned address, while a
    '%zone' suffix (e.g. 'fe80::1%lo0') is kept.

    :param ifconfig_output: output of the ifconfig command (str)
    :return: IPv6 addresses in order of appearance, at most one per line (list).
    """
    # The capture must contain at least one ':' so that the literal 'addr:' label of the
    # legacy layout can never be mistaken for an address (its letters are valid hex digits).
    pattern = re.compile(
        r"\binet6\s+(?:addr:\s*)?([0-9A-Fa-f.]*:[0-9A-Fa-f:.]*(?:%[^\s/]+)?)"
    )

    ipv6_addresses = []
    for line in ifconfig_output.splitlines():
        match = pattern.search(line)
        if not match:
            continue
        address = match.group(1)
        if address != "::1":  # skip loopback address
            ipv6_addresses.append(address)

    return ipv6_addresses


# --- pilot/util/psutils.py ---

def find_process_by_jobid(jobid: int) -> int or None:
    """
    Find the process ID of a process whose command arguments contain the given job ID.

    Processes whose command line mentions 'xrootd' are ignored, which matches the
    ps-based fallback that tests the entire ps line rather than individual arguments.

    :param jobid: the job ID to search for (int or str; it is compared as a string)
    :return: the process ID of the first matching process, or None if no match is found
             or psutil is not available (int or None).
    """
    if not _is_psutil_available:
        logger.warning('find_process_by_jobid(): psutil not available - aborting')
        return None

    needle = str(jobid)  # convert once instead of once per argument
    for proc in psutil.process_iter():
        try:
            cmd_line = proc.cmdline()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # the process vanished (or is a zombie), or belongs to another user and
            # may not be inspected - neither case should abort the whole scan
            continue

        # exclude on the whole command line, not per argument: for 'xrootd ... <jobid>'
        # the argument holding the job id does not itself contain 'xrootd'
        if any('xrootd' in arg for arg in cmd_line):
            continue

        if any(needle in arg for arg in cmd_line):
            return proc.pid

    return None