Skip to content

Commit

Permalink
ForkProcess: Implement fd_pipes via send_handle
Browse files Browse the repository at this point in the history
Bug: https://bugs.gentoo.org/915896
Signed-off-by: Zac Medico <[email protected]>
  • Loading branch information
zmedico committed Oct 20, 2023
1 parent 7ac176d commit 4ac6b4a
Showing 1 changed file with 92 additions and 23 deletions.
115 changes: 92 additions & 23 deletions lib/portage/util/_async/ForkProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import portage
from portage import os
from portage.cache.mappings import slot_dict_class
from portage.util.futures import asyncio
from _emerge.SpawnProcess import SpawnProcess

Expand All @@ -26,29 +27,31 @@ class ForkProcess(SpawnProcess):
"_proc_join_task",
)

_file_names = ("connection", "slave_fd")
_files_dict = slot_dict_class(_file_names, prefix="")

# Number of seconds between poll attempts for process exit status
# (after the sentinel has become ready).
_proc_join_interval = 0.1

def _start(self):
if self.fd_pipes or self.logfile:
if self.fd_pipes:
if multiprocessing.get_start_method() != "fork":
raise NotImplementedError(
'fd_pipes only supported with multiprocessing start method "fork"'
)
super()._start()
return
_HAVE_SEND_HANDLE = getattr(multiprocessing.reduction, "HAVE_SEND_HANDLE", False)

if self.logfile:
if multiprocessing.get_start_method() == "fork":
# Use superclass pty support.
super()._start()
return
def _start(self):
if multiprocessing.get_start_method() == "fork":
# backward compatibility mode
super()._start()
return

if self.fd_pipes and not self._HAVE_SEND_HANDLE:
raise NotImplementedError(
'fd_pipes only supported with HAVE_SEND_HANDLE or multiprocessing start method "fork"'
)

# Log via multiprocessing.Pipe if necessary.
pr, pw = multiprocessing.Pipe(duplex=False)
self._child_connection = pw
if self.fd_pipes or self.logfile:
# Log via multiprocessing.Pipe if necessary.
connection, self._child_connection = multiprocessing.Pipe(
duplex=self._HAVE_SEND_HANDLE
)

retval = self._spawn(self.args, fd_pipes=self.fd_pipes)

Expand All @@ -59,11 +62,51 @@ def _start(self):
self._async_waitpid()
else:
self._child_connection.close()
self.fd_pipes = self.fd_pipes or {}
stdout_fd = None
if not self.background:
stdout_fd = os.dup(sys.__stdout__.fileno())
self.fd_pipes.setdefault(0, portage._get_stdin().fileno())
self.fd_pipes.setdefault(1, sys.__stdout__.fileno())
self.fd_pipes.setdefault(2, sys.__stderr__.fileno())
stdout_fd = os.dup(self.fd_pipes[1])

if self._HAVE_SEND_HANDLE:
master_fd, slave_fd = self._pipe(self.fd_pipes)
self.fd_pipes[1] = slave_fd
self.fd_pipes[2] = slave_fd
self._files = self._files_dict(connection=connection, slave_fd=slave_fd)
else:
master_fd = connection

self._start_main_task(
master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd
)

self._start_main_task(pr, log_file_path=self.logfile, stdout_fd=stdout_fd)
async def _main(self, build_logger, pipe_logger, loop=None):
    """
    Coordinate with the child process and then delegate to the
    superclass _main for log/pipe handling.

    When send_handle support is available (self._HAVE_SEND_HANDLE),
    first transmit the fd_pipes mapping plus the deduplicated list of
    file descriptor values to the child over self._files.connection,
    then send each descriptor itself via
    multiprocessing.reduction.send_handle so the child can receive
    real duplicates of them (see _bootstrap's recv side).

    The blocking connection.send and send_handle calls are run in an
    executor so this coroutine does not block the event loop.

    @param build_logger: logger passed through to the superclass _main
    @param pipe_logger: logger passed through to the superclass _main
    @param loop: optional event loop, forwarded to the superclass
    """
    try:
        if self._HAVE_SEND_HANDLE:
            # Deduplicate fd values; the child rebuilds the full
            # fd_pipes mapping from this list plus the mapping sent
            # alongside it.
            fd_list = list(set(self.fd_pipes.values()))
            await self.scheduler.run_in_executor(
                None,
                self._files.connection.send,
                (self.fd_pipes, fd_list),
            )
            for fd in fd_list:
                # send_handle duplicates fd into the process
                # identified by self.pid (the forked child).
                await self.scheduler.run_in_executor(
                    None,
                    multiprocessing.reduction.send_handle,
                    self._files.connection,
                    fd,
                    self.pid,
                )
    finally:
        # Always release our ends of the handshake pipe, even if the
        # sends above fail; the del statements drop the slots so the
        # fds cannot be closed twice later.
        if self._files is not None:
            self._files.connection.close()
            del self._files.connection
            os.close(self._files.slave_fd)
            del self._files.slave_fd
    # NOTE(review): scheduler may have been closed while the executor
    # jobs ran; only then is it safe to skip the superclass _main.
    if not self.scheduler.is_closed():
        await super()._main(build_logger, pipe_logger, loop=loop)

def _spawn(self, args, fd_pipes=None, **kwargs):
"""
Expand Down Expand Up @@ -102,6 +145,11 @@ def _spawn(self, args, fd_pipes=None, **kwargs):
stdin_dup, fcntl.F_SETFD, fcntl.fcntl(stdin_fd, fcntl.F_GETFD)
)
fd_pipes[0] = stdin_dup

if multiprocessing.get_start_method() != "fork":
# Handle fd_pipes in _main instead.
fd_pipes = None

self._proc = multiprocessing.Process(
target=self._bootstrap,
args=(self._child_connection, fd_pipes, target, args, kwargs),
Expand Down Expand Up @@ -205,10 +253,31 @@ def _bootstrap(child_connection, fd_pipes, target, args, kwargs):
portage.locks._close_fds()

if child_connection is not None:
fd_pipes = fd_pipes or {}
fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
fd_pipes[child_connection.fileno()] = child_connection.fileno()
if ForkProcess._HAVE_SEND_HANDLE:
fd_pipes, fd_list = child_connection.recv()
fd_pipes_map = {}
for fd in fd_list:
fd_pipes_map[fd] = multiprocessing.reduction.recv_handle(
child_connection
)
child_connection.close()
for k, v in list(fd_pipes.items()):
fd_pipes[k] = fd_pipes_map[v]
keys_map = {
0: portage._get_stdin().fileno(),
1: sys.stdout.fileno(),
2: sys.stderr.fileno(),
}
for k, v in list(fd_pipes.items()):
if keys_map.get(k, k) != k:
fd_pipes[keys_map[k]] = v
del fd_pipes[k]

else:
fd_pipes = fd_pipes or {}
fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
fd_pipes[child_connection.fileno()] = child_connection.fileno()

if fd_pipes:
# We don't exec, so use close_fds=False
Expand Down

0 comments on commit 4ac6b4a

Please sign in to comment.