Question: intercepting stdout / stderr / python logger on loky processes #306
If you absolutely need to, you can capture individual task outputs by extending loky's implementation. Here's a solution that works by monkey-patching the process worker to redirect stdout/stderr at the file descriptor level. Note that since this modifies loky's internals, it could break with future updates, so I certainly don't like this. The key part of the solution is intercepting the call queue's `get()` so that each `_CallItem` is replaced with a subclass that redirects output around the actual call:
import sys
from pathlib import Path
import tempfile
from contextlib import contextmanager
from joblib.externals.loky import process_executor
from joblib.externals.loky.process_executor import _CallItem
from joblib import Parallel, delayed
@contextmanager
def redirect_output(log_path):
"""redirection at the file descriptor level"""
sys.stdout.flush()
sys.stderr.flush()
stdout_fd = sys.stdout.fileno()
stderr_fd = sys.stderr.fileno()
saved_stdout_fd = os.dup(stdout_fd)
saved_stderr_fd = os.dup(stderr_fd)
try:
with open(log_path, "w", buffering=1) as log_file:
os.dup2(log_file.fileno(), stdout_fd)
os.dup2(log_file.fileno(), stderr_fd)
yield
finally:
sys.stdout.flush()
sys.stderr.flush()
os.dup2(saved_stdout_fd, stdout_fd)
os.dup2(saved_stderr_fd, stderr_fd)
os.close(saved_stdout_fd)
os.close(saved_stderr_fd)
class LoggingCallItem(_CallItem):
def __call__(self):
parent_pid = os.getenv("PARENT_PID", "0")
worker_pid = os.getpid()
log_dir = Path(tempfile.gettempdir()) / "joblib_logs"
log_dir.mkdir(exist_ok=True)
log_path = log_dir / f"task_{parent_pid}_{worker_pid}_{self.work_id}.log"
# call parent while capturing output
with redirect_output(log_path):
return super().__call__()
def setup_worker_logging():
"""monkey-patching process_executor._process_worker to be abel to track it"""
original_worker = process_executor._process_worker
def logging_process_worker(call_queue, result_queue, *args, **kwargs):
# wrap the queue to convert _CallItems to LoggingCallItems
class WrappedQueue:
def __init__(self, queue):
self.queue = queue
def get(self, *args, **kwargs):
item = self.queue.get(*args, **kwargs)
if item is not None:
return LoggingCallItem(
work_id=item.work_id,
fn=item.fn,
args=item.args,
kwargs=item.kwargs,
)
return item
def __getattr__(self, name):
return getattr(self.queue, name)
# call original worker with wrapped queue
return original_worker(WrappedQueue(call_queue), result_queue, *args, **kwargs)
process_executor._process_worker = logging_process_worker
def get_logs():
"""retrieves and cleans up log files from worker processes."""
log_dir = Path(tempfile.gettempdir()) / "joblib_logs"
logs = {}
if log_dir.exists():
log_files = sorted(
log_dir.glob(f"task_*.log"),
key=lambda f: int(f.stem.split("_")[-1]),
)
for log_file in log_files:
logs[log_file.name] = log_file.read_text()
log_file.unlink()
return logs
# example usage :
def process_item(x):
print(f"Processing item {x}")
import time
time.sleep(1)
print(f"Finished item {x}")
return x * 2
def test():
print("Starting parallel processing")
results = Parallel(n_jobs=2)(delayed(process_item)(i) for i in range(4))
print(f"Results: {results}")
if __name__ == "__main__":
setup_worker_logging()
# your code here
test()
logs = get_logs()
print(logs) |
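If patching loky internals feels too fragile, a less invasive variant is to reuse the `redirect_output` helper above and wrap the task function itself, so nothing inside loky is touched. This is only a sketch under that assumption; the `task_logged` wrapper and its log-file naming are placeholders, not part of loky's or joblib's API:

```python
# Sketch: per-task redirection without monkey-patching loky.
# Assumes the redirect_output() context manager defined above is available
# in the same module.
import os
import tempfile
from pathlib import Path

from joblib import Parallel, delayed


def task_logged(task_id, fn, *args, **kwargs):
    """Run fn(*args, **kwargs) with stdout/stderr sent to a per-task log file."""
    log_dir = Path(tempfile.gettempdir()) / "joblib_logs"
    log_dir.mkdir(exist_ok=True)
    log_path = log_dir / f"task_{os.getpid()}_{task_id}.log"
    with redirect_output(log_path):
        return fn(*args, **kwargs)


# usage:
# results = Parallel(n_jobs=2)(
#     delayed(task_logged)(i, process_item, i) for i in range(4)
# )
```

The trade-off is that every submitted task has to go through the wrapper, but the approach keeps working across loky upgrades since no internals are patched.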
Hi, I've been using Loky to do some batch processing on a supercomputer. So far this has worked well, and the library has saved me a significant amount of time. However, I'm now running into an issue.
As far as I understand Loky, each submitted job runs in a pool of worker processes outside of the main process.
I'm wondering whether the stdout/stderr of each job submitted to the reusable worker pool can be captured to a separate file. Currently it is all emitted by the main process, either on stdout/stderr or through the Python logger, which makes individual jobs hard to debug: I'm crunching through thousands of jobs over the course of a few days, producing gigabytes of logging in a single file.
Here's a short code example of what I'm trying to achieve.
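Roughly, the goal looks like the following (the function name, worker count, and log file naming here are just placeholders for illustration):

```python
# Desired behaviour (placeholder names): each task's prints should end up in
# its own log file instead of the shared stdout/stderr of the main process.
from joblib import Parallel, delayed


def crunch(job_id):
    print(f"starting job {job_id}")   # should go to e.g. job_{job_id}.log
    # ... actual work ...
    print(f"finished job {job_id}")
    return job_id


results = Parallel(n_jobs=8)(delayed(crunch)(i) for i in range(10_000))
```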