Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid triggering the DeprecationWarning on os.fork #429

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1d53ebf
Non-regression test for the deprecation warning on os.fork
ogrisel Feb 27, 2025
e62fc4b
Extend multiprocessing.resource_tracker instead of implementing from …
ogrisel Feb 28, 2025
8786ac6
Skip test_no_deprecation_warning_is_raised_on_fork on older Python ve…
ogrisel Feb 28, 2025
9da7a43
Restore windows support for resource tracker
ogrisel Feb 28, 2025
de4d2ea
Restore windows support for resource tracker (part 2)
ogrisel Feb 28, 2025
5134e90
Merge branch 'master' into fix-warning-on-fork
ogrisel Feb 28, 2025
1eea3d0
Bump Python version on github actions
ogrisel Feb 28, 2025
8b9c7e2
Add setuptools to the test env, required by install_coverage_subproce…
ogrisel Feb 28, 2025
4533573
FIX use _posixsubprocess.fork_exec instead of os.fork + os.execve
ogrisel Feb 28, 2025
1dab63a
Trying to debug remaingin keep_fds problems
ogrisel Feb 28, 2025
776dcd8
Explain why we do not close stdin
ogrisel Feb 28, 2025
3af5e03
Ignore /dev/null
ogrisel Feb 28, 2025
8fb36f4
Merge branch 'master' into fix-warning-on-fork
ogrisel Feb 28, 2025
998e8a3
cosmetics
ogrisel Feb 28, 2025
01756dc
Typo in comment
ogrisel Feb 28, 2025
6342be8
Maybe the default way to clean the fds is actually fine in the end
ogrisel Feb 28, 2025
274ddfe
Fix older Python versions
ogrisel Feb 28, 2025
4f5cfd8
Fix in Python version specific args
ogrisel Feb 28, 2025
6f4ea81
Cosmit
ogrisel Feb 28, 2025
296759e
Do not rely on setuptools/distutils in tests
ogrisel Feb 28, 2025
c82b475
Officially support Python 3.13
ogrisel Feb 28, 2025
0f41687
Non regression test for #420
ogrisel Feb 28, 2025
449eab6
Grammar fix in CHANGES.md
ogrisel Mar 3, 2025
3cd336e
Merge branch 'master' into fix-warning-on-fork
ogrisel Mar 3, 2025
274c5a2
Trigger CI
ogrisel Mar 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,24 @@ jobs:
matrix:
include:

- name: windows-py311
- name: windows-py313
os: windows-latest
PYTHON_VERSION: "3.11"
PYTHON_VERSION: "3.13"
- name: windows-py39
os: windows-latest
PYTHON_VERSION: "3.9"
LOKY_TEST_NO_CIM: "true"

- name: macos-py311
- name: macos-py313
os: macos-latest
PYTHON_VERSION: "3.11"
PYTHON_VERSION: "3.13"
- name: macos-py39
os: macos-latest
PYTHON_VERSION: "3.9"

- name: linux-py311
- name: linux-py313
os: ubuntu-latest
PYTHON_VERSION: "3.11"
PYTHON_VERSION: "3.13"
LOKY_TEST_NO_LSCPU: "true"
- name: linux-py39-joblib-tests
os: ubuntu-latest
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
### 3.5.0 - in development

- Avoid raising `DeprecationWarning` related to `os.fork` when running in a
natively multi-threaded process. (#435).

- Fix a crash when calling commands that access `stdin` via `subprocess.run` in
worker processes on POSIX systems. (#435).

- Fix detection of the number of physical cores in
`cpu_count(only_physical_cores=True)` on some Linux systems and recent
Expand Down
4 changes: 2 additions & 2 deletions continuous_integration/install_coverage_subprocess_pth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
# http://coverage.readthedocs.io/en/latest/subprocess.html

import os.path as op
from distutils.sysconfig import get_python_lib
from sysconfig import get_path

FILE_CONTENT = """\
import coverage; coverage.process_startup()
"""

filename = op.join(get_python_lib(), "coverage_subprocess.pth")
filename = op.join(get_path("platlib"), "coverage_subprocess.pth")
with open(filename, mode="w") as f:
f.write(FILE_CONTENT)

Expand Down
83 changes: 54 additions & 29 deletions loky/backend/fork_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,65 @@
#
# author: Thomas Moreau and Olivier Grisel
#
import os
import sys
import os
import subprocess


def close_fds(keep_fds): # pragma: no cover
"""Close all the file descriptors except those in keep_fds."""

# Make sure to keep stdout and stderr open for logging purpose
keep_fds = {*keep_fds, 1, 2}

# We try to retrieve all the open fds
try:
open_fds = {int(fd) for fd in os.listdir("/proc/self/fd")}
except FileNotFoundError:
import resource

max_nfds = resource.getrlimit(resource.RLIMIT_NOFILE)[0]
open_fds = {*range(max_nfds)}

for i in open_fds - keep_fds:
try:
os.close(i)
except OSError:
pass
def fork_exec(cmd, keep_fds, env=None):
import _posixsubprocess

# Encoded command args as bytes:
cmd = [os.fsencode(arg) for arg in cmd]

def fork_exec(cmd, keep_fds, env=None):
# copy the environment variables to set in the child process
# Copy the environment variables to set in the child process (also encoded
# as bytes).
env = env or {}
child_env = {**os.environ, **env}
env = {**os.environ, **env}
encoded_env = []
for key, value in env.items():
encoded_env.append(f"{key}={value}".encode("utf-8"))

pid = os.fork()
if pid == 0: # pragma: no cover
close_fds(keep_fds)
os.execve(sys.executable, cmd, child_env)
# Fds with fileno 3 or larger (stdin=0, stdout=1, stderr=2 are kept open) are
# closed in the child process, except for those passed in keep_fds.
keep_fds = tuple(sorted(map(int, keep_fds)))
errpipe_read, errpipe_write = os.pipe()

# VFORK is not supported on older Python versions.
if hasattr(subprocess, "_USE_VFORK"):
# Python 3.11 and later
pgid_to_set = [-1]
allow_vfork = [subprocess._USE_VFORK]
else:
return pid
pgid_to_set = []
allow_vfork = []

try:
return _posixsubprocess.fork_exec(
cmd,
cmd[0:1],
True,
keep_fds,
None,
encoded_env,
-1,
-1,
-1,
-1,
-1,
-1,
errpipe_read,
errpipe_write,
False,
False,
*pgid_to_set,
None,
None,
None,
-1,
None,
*allow_vfork,
)
finally:
os.close(errpipe_read)
os.close(errpipe_write)
86 changes: 17 additions & 69 deletions loky/backend/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@
#
# author: Thomas Moreau
#
# adapted from multiprocessing/semaphore_tracker.py (17/02/2017)
# * include custom spawnv_passfds to start the process
# * add some VERBOSE logging
#
# TODO: multiprocessing.resource_tracker was contributed to Python 3.8 so
# once loky drops support for Python 3.7 it might be possible to stop
# maintaining this loky-specific fork. As a consequence, it might also be
# possible to stop maintaining the loky.backend.synchronize fork of
# multiprocessing.synchronize.

# Adapted from multiprocessing/resource_tracker.py
# * add some VERBOSE logging,
# * add support to track folders,
# * add Windows support,
# * refcounting scheme to avoid unlinking resources still in use.
#
# On Unix we run a server process which keeps track of unlinked
# resources. The server ignores SIGINT and SIGTERM and reads from a
Expand All @@ -40,17 +35,19 @@
# Note that this behavior differs from CPython's resource_tracker, which only
# implements list of shared resources, and not a proper refcounting scheme.
# Also, CPython's resource tracker will only attempt to cleanup those shared
# resources once all procsses connected to the resouce tracker have exited.
# resources once all processes connected to the resource tracker have exited.


import os
import shutil
import sys
import signal
import warnings
import threading
from _multiprocessing import sem_unlink
from multiprocessing import util
from multiprocessing.resource_tracker import (
ResourceTracker as _ResourceTracker,
)

from . import spawn

Expand All @@ -74,15 +71,11 @@
VERBOSE = False


class ResourceTracker:
def __init__(self):
self._lock = threading.Lock()
self._fd = None
self._pid = None

def getfd(self):
class ResourceTracker(_ResourceTracker):
def maybe_unlink(self, name, rtype):
"""Decrement the refcount of a resource, and delete it if it hits 0"""
self.ensure_running()
return self._fd
self._send("MAYBE_UNLINK", name, rtype)

def ensure_running(self):
"""Make sure that resource tracker process is running.
Expand Down Expand Up @@ -164,39 +157,6 @@
else:
os.close(r)

def _check_alive(self):
"""Check for the existence of the resource tracker process."""
try:
self._send("PROBE", "", "")
except BrokenPipeError:
return False
else:
return True

def register(self, name, rtype):
"""Register a named resource, and increment its refcount."""
self.ensure_running()
self._send("REGISTER", name, rtype)

def unregister(self, name, rtype):
"""Unregister a named resource with resource tracker."""
self.ensure_running()
self._send("UNREGISTER", name, rtype)

def maybe_unlink(self, name, rtype):
"""Decrement the refcount of a resource, and delete it if it hits 0"""
self.ensure_running()
self._send("MAYBE_UNLINK", name, rtype)

def _send(self, cmd, name, rtype):
if len(name) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError("name too long")
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
nbytes = os.write(self._fd, msg)
assert nbytes == len(msg)


_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
Expand Down Expand Up @@ -348,25 +308,13 @@
util.debug("resource tracker shut down")


#
# Start a program with only specified fds kept open
#


def spawnv_passfds(path, args, passfds):
passfds = sorted(passfds)
if sys.platform != "win32":
errpipe_read, errpipe_write = os.pipe()
try:
from .reduction import _mk_inheritable
from .fork_exec import fork_exec

_pass = [_mk_inheritable(fd) for fd in passfds]
return fork_exec(args, _pass)
finally:
os.close(errpipe_read)
os.close(errpipe_write)
args = [arg.encode("utf-8") for arg in args]
path = path.encode("utf-8")
return util.spawnv_passfds(path, args, passfds)
else:
passfds = sorted(passfds)

Check warning on line 317 in loky/backend/resource_tracker.py

View check run for this annotation

Codecov / codecov/patch

loky/backend/resource_tracker.py#L317

Added line #L317 was not covered by tests
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lack of coverage is a false negative. This branch is actually executed when running the resource manager on Windows (to track temporary folders), but we probably do not configure coverage tracking in the resource manager process on our CI.

cmd = " ".join(f'"{x}"' for x in args)
try:
_, ht, pid, _ = _winapi.CreateProcess(
Expand Down
2 changes: 2 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def run(self):
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Scientific/Engineering",
"Topic :: Utilities",
"Topic :: Software Development :: Libraries",
Expand Down
27 changes: 26 additions & 1 deletion tests/_test_process_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import pytest
import weakref
import tempfile
import traceback
import threading
import faulthandler
import warnings
Expand Down Expand Up @@ -1187,6 +1186,32 @@ def raise_value_error():

executor.shutdown(wait=True)

def test_no_deprecation_warning_is_raised_on_fork(self):
    """Check that starting workers emits no fork-related DeprecationWarning.

    On POSIX, recent versions of Python detect if the process has native
    threads running when calling `os.fork` and similar. All supported
    process executors should not trigger this warning by not calling
    `os.fork` (and `os.execve`) manually but instead using the compound
    `_posixsubprocess.fork_exec`.

    The test is skipped when the private `_testcapi` helpers used to start
    a native thread cannot be imported.
    """
    try:
        from _testcapi import _spawn_pthread_waiter, _end_spawned_pthread
    except ImportError:
        pytest.skip("Cannot test without _testcapi module")

    # Start a native (non-Python) thread so that the interpreter considers
    # the process multi-threaded for the duration of the test.
    _spawn_pthread_waiter()
    try:
        # `catch_warnings(category=...)` is ignored unless `action=` is also
        # given, and is rejected before Python 3.11, so the filter is
        # installed explicitly with `simplefilter` instead.
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always", category=DeprecationWarning)
            executor = self.executor_type(max_workers=2)
            try:
                list(executor.map(sqrt, range(10)))
            finally:
                executor.shutdown(wait=True)

        # Only DeprecationWarnings are relevant here: warnings of other
        # categories recorded in the same block must not fail this test.
        deprecations = [
            warning.message
            for warning in caught
            if issubclass(warning.category, DeprecationWarning)
        ]
        assert len(deprecations) == 0, deprecations
    finally:
        # Always release the native thread, even if the assertion fails.
        _end_spawned_pthread()


def _custom_initializer():
"""_custom_initializer is module function to be picklable
Expand Down
Loading
Loading