3.8.2.8 #144

Merged (10 commits), Sep 24, 2024
Changes from all commits
2 changes: 1 addition & 1 deletion PILOTVERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.8.1.66
3.8.2.8
2 changes: 1 addition & 1 deletion pilot/api/data.py
Original file line number Diff line number Diff line change
@@ -131,7 +131,7 @@ def __init__(self,
self.trace_report = trace_report if trace_report else TraceReport(pq=os.environ.get('PILOT_SITENAME', ''), ipv=self.ipv, workdir=self.workdir)

if not self.acopytools:
msg = f'failed to initilize StagingClient: no acopytools options found, acopytools={self.acopytools}'
msg = f'failed to initialize StagingClient: no acopytools options found, acopytools={self.acopytools}'
logger.error(msg)
self.trace_report.update(clientState='BAD_COPYTOOL', stateReason=msg)
self.trace_report.send()
6 changes: 5 additions & 1 deletion pilot/control/data.py
Original file line number Diff line number Diff line change
@@ -285,11 +285,13 @@ def _stage_in(args: object, job: JobData) -> bool:
logger.info('stage-in will not be done in a container')

client, activity = get_stagein_client(job, args, label)
logger.info(f'activity={activity}')
use_pcache = job.infosys.queuedata.use_pcache

logger.debug(f'use_pcache={use_pcache}')
# get the proper input file destination (normally job.workdir unless stager workflow)
jobworkdir = job.workdir # there is a distinction for mv copy tool on ND vs non-ATLAS
workdir = get_proper_input_destination(job.workdir, args.input_destination_dir)
logger.debug(f'workdir={workdir}')
kwargs = {'workdir': workdir,
'cwd': job.workdir,
'usecontainer': False,
@@ -301,7 +303,9 @@ def _stage_in(args: object, job: JobData) -> bool:
'rucio_host': args.rucio_host,
'jobworkdir': jobworkdir,
'args': args}
logger.debug(f'kwargs={kwargs}')
client.prepare_sources(job.indata)
logger.info('prepared sources - will now transfer files')
client.transfer(job.indata, activity=activity, **kwargs)
except PilotException as error:
error_msg = traceback.format_exc()
20 changes: 17 additions & 3 deletions pilot/control/payload.py
Original file line number Diff line number Diff line change
@@ -376,13 +376,21 @@ def extract_error_info(error: str) -> (int, str):
return error_code, diagnostics


def get_rtlogging() -> str:
def get_rtlogging(catchall: str) -> str:
"""
Return the proper rtlogging value from the experiment specific plug-in or the config file.
Return the proper rtlogging value from PQ.catchall, the experiment specific plug-in or the config file.

:param catchall: catchall field from queuedata (str)
:return: rtlogging (str).
"""
if catchall:
_rtlogging = findall(r'logging=([^,]+)', catchall)
if _rtlogging and ";" in _rtlogging[0]:
logger.info(f"found rtlogging in catchall: {_rtlogging[0]}")
return _rtlogging[0]

rtlogging = None

pilot_user = os.environ.get('PILOT_USER', 'generic').lower()
try:
user = __import__(f'pilot.user.{pilot_user}.common', globals(), locals(), [pilot_user], 0)
@@ -419,7 +427,13 @@ def get_logging_info(job: JobData, args: object) -> dict:
info_dic['logname'] = args.realtime_logname if args.realtime_logname else "pilot-log"
logserver = args.realtime_logging_server if args.realtime_logging_server else ""

info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', get_rtlogging())
try:
catchall = job.infosys.queuedata.catchall
except Exception as exc:
logger.warning(f'exception caught: {exc}')
catchall = ""

info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', get_rtlogging(catchall))
if not logserver and not info:
logger.warning(f"not enough info available for activating real-time logging (info='{info}', logserver='{logserver}')")
return {}
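For reference, a minimal sketch (not part of this pull request) of how the two regular expressions above work together: `get_rtlogging()` pulls the `logging=...` token out of PQ.catchall, and `get_logging_info()` then splits it into its components. The catchall value and server name below are invented for the illustration.

```python
# Illustrative sketch only; the catchall string and log-server name are made up.
from re import findall

catchall = "singularity,logging=logstash;http://some-logserver.example.org:8443"

# get_rtlogging(): pick out the 'logging=...' token from PQ.catchall
_rtlogging = findall(r'logging=([^,]+)', catchall)
rtlogging = _rtlogging[0] if _rtlogging and ";" in _rtlogging[0] else ""
print(rtlogging)  # logstash;http://some-logserver.example.org:8443

# get_logging_info(): split the value into (type, protocol, host, port)
info = findall(r'(\S+)\;(\S+)\:\/\/(\S+)\:(\d+)', rtlogging)
print(info)  # [('logstash', 'http', 'some-logserver.example.org', '8443')]
```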
22 changes: 14 additions & 8 deletions pilot/user/atlas/utilities.py
Original file line number Diff line number Diff line change
@@ -41,7 +41,10 @@
)
from pilot.util.parameters import convert_to_int
from pilot.util.processes import is_process_running
from pilot.util.psutils import get_command_by_pid
from pilot.util.psutils import (
get_command_by_pid,
find_process_by_jobid
)

from .setup import get_asetup

@@ -158,11 +161,9 @@ def get_proper_pid(pid: int, jobid: str, use_container: bool = True) -> int:
if not is_process_running(pid):
return -1

ps = get_ps_info()

# lookup the process id using ps aux
# lookup the process id using ps command or psutils
logger.debug(f'attempting to identify pid from job id ({jobid})')
_pid = get_pid_for_jobid(ps, jobid)
_pid = get_pid_for_jobid(jobid)
if _pid:
logger.debug(f'discovered pid={_pid} for job id {jobid}')
cmd = get_command_by_pid(_pid)
@@ -188,6 +189,8 @@ def get_ps_info(whoami: str = None, options: str = 'axfo pid,user,args') -> str:
"""
Return ps info for the given user.

Note: this is a fallback solution in case the pid cannot be found in the psutils lookup.

:param whoami: username (str)
:param options: ps options (str)
:return: ps aux for given user (str).
@@ -200,16 +203,19 @@ def get_ps_info(whoami: str = None, options: str = 'axfo pid,user,args') -> str:
return stdout


def get_pid_for_jobid(ps: str, jobid: str) -> int or None:
def get_pid_for_jobid(jobid: str) -> int or None:
"""
Return the process id for the ps entry that contains the job id.

:param ps: ps command output (str)
:param jobid: PanDA job id (str).
:return: pid (int) or None if no such process (int or None).
"""
pid = None
pid = find_process_by_jobid(jobid)
if pid:
return pid

# fallback to ps command
ps = get_ps_info()
for line in ps.split('\n'):
if jobid in line and 'xrootd' not in line:
# extract pid
4 changes: 2 additions & 2 deletions pilot/util/constants.py
Original file line number Diff line number Diff line change
@@ -27,8 +27,8 @@
# Pilot version
RELEASE = '3' # released number should be fixed at 3 for Pilot 3
VERSION = '8' # version number is '1' for first release, '0' until then, increased for bigger updates
REVISION = '1' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '66' # build number should be reset to '1' for every new development cycle
REVISION = '2' # revision number should be reset to '0' for every new version release, increased for small updates
BUILD = '8' # build number should be reset to '1' for every new development cycle

SUCCESS = 0
FAILURE = 1
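The constants above are what the PILOTVERSION change at the top of this diff reflects. Below is a small illustrative sketch of how the four components compose the dotted version string; the helper name is hypothetical, not necessarily the pilot's actual function.

```python
# Illustration only: composing the dotted pilot version from its four parts.
# The function name version_string() is hypothetical.
RELEASE, VERSION, REVISION, BUILD = '3', '8', '2', '8'

def version_string() -> str:
    """Compose the dotted pilot version string from its four components."""
    return '.'.join((RELEASE, VERSION, REVISION, BUILD))

assert version_string() == '3.8.2.8'  # matches the new PILOTVERSION content
```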
2 changes: 1 addition & 1 deletion pilot/util/https.py
Original file line number Diff line number Diff line change
@@ -646,7 +646,7 @@ def get_panda_server(url: str, port: int, update_server: bool = True) -> str:
if default in pandaserver:
try:
rnd = random.choice([socket.getfqdn(vv) for vv in set([v[-1][0] for v in socket.getaddrinfo(default, 25443, socket.AF_INET)])])
except socket.herror as exc:
except (socket.herror, socket.gaierror) as exc:
logger.warning(f'failed to get address from socket: {exc} - will use default server ({pandaserver})')
else:
pandaserver = pandaserver.replace(default, rnd)
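A minimal sketch of why the except clause is widened: `socket.getaddrinfo()` raises `socket.gaierror` (not `socket.herror`) when a hostname cannot be resolved, so the old clause let DNS failures escape. The hostname below is deliberately unresolvable.

```python
# Sketch: name-resolution failures surface as socket.gaierror.
import socket

try:
    socket.getaddrinfo('pandaserver-does-not-exist.example.invalid', 25443, socket.AF_INET)
except (socket.herror, socket.gaierror) as exc:
    print(f'failed to get address from socket: {exc} - will use the default server')
```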
26 changes: 23 additions & 3 deletions pilot/util/networking.py
Original file line number Diff line number Diff line change
@@ -35,23 +35,43 @@ def dump_ipv6_info() -> None:
"""Dump the IPv6 info to the log."""
cmd = 'ifconfig'
if not is_command_available(cmd):
_cmd = '/usr/sbin/ifconfig'
_cmd = '/usr/sbin/ifconfig -a'
if not is_command_available(_cmd):
logger.warning(f'command {cmd} is not available - this WN might not support IPv6')
return
cmd = _cmd

_, stdout, stderr = execute(cmd, timeout=10)
if stdout:
ipv6 = extract_ipv6(stdout)
ipv6 = extract_ipv6_addresses(stdout)
if ipv6:
logger.info(f'IPv6 addresses: {ipv6}')
else:
logger.warning('no IPv6 addresses found - this WN does not support IPv6')
logger.warning('no IPv6 addresses were found')
else:
logger.warning(f'failed to run ifconfig: {stderr}')


def extract_ipv6_addresses(ifconfig_output: str) -> list:
"""Extracts IPv6 addresses from ifconfig output.

Args:
ifconfig_output: The output of the ifconfig command.

Returns:
A list of IPv6 addresses.
"""

ipv6_addresses = []
for line in ifconfig_output.splitlines():
line = line.strip().replace("\t", " ").replace("\r", "").replace("\n", "")
match = re.search(r"inet6 (.*?)\s", line)
if match and match.group(1) != "::1": # skip loopback address
ipv6_addresses.append(match.group(1))

return ipv6_addresses


def extract_ipv6(ifconfig: str) -> str:
"""
Extract the IPv6 address from the ifconfig output.
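An illustration of what the new `extract_ipv6_addresses()` parser does, mirroring its loop on made-up `ifconfig -a` output (interface names and addresses are invented); the loopback address `::1` is skipped.

```python
# Standalone replica of the parsing loop; the sample output is invented.
import re

sample = """
lo: flags=73<UP,LOOPBACK,RUNNING>  mtu 65536
        inet6 ::1  prefixlen 128  scopeid 0x10<host>
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet6 fe80::f816:3eff:fe2c:1a2b  prefixlen 64  scopeid 0x20<link>
"""

ipv6_addresses = []
for line in sample.splitlines():
    line = line.strip().replace("\t", " ").replace("\r", "").replace("\n", "")
    match = re.search(r"inet6 (.*?)\s", line)
    if match and match.group(1) != "::1":  # skip loopback address
        ipv6_addresses.append(match.group(1))

print(ipv6_addresses)  # ['fe80::f816:3eff:fe2c:1a2b']
```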
24 changes: 24 additions & 0 deletions pilot/util/psutils.py
Original file line number Diff line number Diff line change
@@ -267,3 +267,27 @@ def get_command_by_pid(pid: int) -> str or None:
except psutil.NoSuchProcess:
logger.warning(f"process with PID {pid} not found")
return None


def find_process_by_jobid(jobid: int) -> int or None:
"""
Find the process ID of a process whose command arguments contain the given job ID.

:param jobid: the job ID to search for (int)
:return: the process ID of the matching process, or None if no match is found (int or None).
"""
if not _is_psutil_available:
logger.warning('find_process_by_jobid(): psutil not available - aborting')
return None

for proc in psutil.process_iter():
try:
cmd_line = proc.cmdline()
except psutil.NoSuchProcess:
continue

for arg in cmd_line:
if str(jobid) in arg and 'xrootd' not in arg:
return proc.pid

return None
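A minimal usage sketch for the new helper, assuming the pilot package is importable; the job id is made up. Although the parameter is annotated `int`, the comparison goes through `str(jobid)`, so the string job id passed in from `get_pid_for_jobid()` works as well.

```python
# Usage sketch with a hypothetical PanDA job id.
from pilot.util.psutils import find_process_by_jobid

pid = find_process_by_jobid('6384929103')  # invented job id
if pid:
    print(f'payload for this job id runs as pid {pid}')
else:
    print('no matching process found (or psutil is unavailable)')
```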
2 changes: 1 addition & 1 deletion pilot/util/realtimelogger.py
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ def __init__(self, args: Any, info_dic: dict, workdir: str, secrets: str, level:
if workdir: # bypass pylint warning - keep workdir for possible future development
pass
if not info_dic:
logger.warning('info dictionary not set - add \'logging=type:protocol://host:port\' to PQ.catchall)')
logger.warning('info dictionary not set - add \'logging=type;protocol://host:port\' to PQ.catchall)')
RealTimeLogger.glogger = None
return
