Question: intercepting stdout / stderr / python logger on loky processes #306

Sikerdebaard opened this issue Nov 24, 2021 · 1 comment

Sikerdebaard commented Nov 24, 2021

Hi, I've been using Loky to do some batch-processing on a supercomputer. So far this has worked well. This library has saved me a significant amount of time. However, I'm now running into an issue.

As far as my understanding of Loky goes, each submitted job runs in a pool of worker processes outside of the main process.

I'm wondering if the stdout / stderr of each job submitted to the reusable worker pool can be captured to a separate file. Currently it's all spewed out on the main process, either on stdout/stderr or via the Python logger, which makes individual jobs hard to debug: I'm crunching through thousands of jobs over the course of a few days, and that produces gigabytes of logging in a single file.

Here's a short code example of what I'm trying to achieve.

import sys
import traceback

from loky import get_reusable_executor, wrap_non_picklable_objects
from loky.backend.context import get_context, set_start_method


class Job:
    def __init__(self, jid):
        self.id = jid

@wrap_non_picklable_objects
def _loky_run_job(job):
    # stdout / stderr produced when this code runs needs to be intercepted to separate files
    # and not dumped on the main process console or python logger

    errors = []
    try:
        print('job runs here')
    except Exception:
        # catch exceptions in case the job fails
        exc_type, _, trace = sys.exc_info()
        exc_info = traceback.format_exc()
        trace = traceback.extract_tb(trace, 1)[0]
        print('LokyExecution encountered exception ({}) during execution:\n{}'.format(exc_type.__name__, exc_info))
        errors.append((exc_type.__name__, exc_info, trace[0], trace[1]))

    return job.id, errors

nr_of_workers = 2
executor = get_reusable_executor(
    max_workers=nr_of_workers,
    timeout=3600,  # respawn workers after 1 hour of inactivity
    reuse=True,
)

jobs = [Job(jid) for jid in range(1, 1000)]

futures = [executor.submit(_loky_run_job, job) for job in jobs]
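
The closest I can get with plain Python is redirecting sys.stdout / sys.stderr inside the job function itself, roughly like the sketch below (the job-logs directory and file naming are just placeholders). That covers print() output, but not writes that go straight to the underlying file descriptors (e.g. from C extensions), and not logging handlers that already hold a reference to the original streams.

import contextlib
from pathlib import Path

from loky import get_reusable_executor

LOG_DIR = Path('job-logs')  # placeholder directory for per-job log files
LOG_DIR.mkdir(exist_ok=True)


def run_job_with_capture(jid):
    # everything printed while this job runs goes to its own file instead of
    # the main-process console
    log_path = LOG_DIR / f'job-{jid}.log'
    with open(log_path, 'w') as fh, \
            contextlib.redirect_stdout(fh), contextlib.redirect_stderr(fh):
        print(f'job {jid} runs here')
    return jid


executor = get_reusable_executor(max_workers=2)
futures = [executor.submit(run_job_with_capture, jid) for jid in range(1, 10)]
results = [f.result() for f in futures]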

1mt4y commented Feb 14, 2025

If you absolutely need to, you can capture individual task outputs by extending loky's implementation. Here's a solution that works by monkey-patching the process worker to redirect stdout/stderr at the file descriptor level. Note that since this modifies loky's internals, it could break with future updates, so I certainly don't like this approach.

The key part of this solution is intercepting the call queue's get() method (the point where each worker receives its _CallItem tasks), since loky reuses workers (meaning multiple tasks can run on the same worker process):

import os
import sys
from pathlib import Path
import tempfile
from contextlib import contextmanager
from joblib.externals.loky import process_executor
from joblib.externals.loky.process_executor import _CallItem
from joblib import Parallel, delayed


@contextmanager
def redirect_output(log_path):
    """redirection at the file descriptor level"""
    sys.stdout.flush()
    sys.stderr.flush()

    stdout_fd = sys.stdout.fileno()
    stderr_fd = sys.stderr.fileno()
    saved_stdout_fd = os.dup(stdout_fd)
    saved_stderr_fd = os.dup(stderr_fd)

    try:
        with open(log_path, "w", buffering=1) as log_file:
            os.dup2(log_file.fileno(), stdout_fd)
            os.dup2(log_file.fileno(), stderr_fd)
            yield
    finally:
        sys.stdout.flush()
        sys.stderr.flush()
        os.dup2(saved_stdout_fd, stdout_fd)
        os.dup2(saved_stderr_fd, stderr_fd)
        os.close(saved_stdout_fd)
        os.close(saved_stderr_fd)


class LoggingCallItem(_CallItem):
    def __call__(self):
        parent_pid = os.getenv("PARENT_PID", "0")  # optional; falls back to "0" when the env var is not set
        worker_pid = os.getpid()
        log_dir = Path(tempfile.gettempdir()) / "joblib_logs"
        log_dir.mkdir(exist_ok=True)
        log_path = log_dir / f"task_{parent_pid}_{worker_pid}_{self.work_id}.log"

        # call parent while capturing output
        with redirect_output(log_path):
            return super().__call__()


def setup_worker_logging():
    """monkey-patching process_executor._process_worker to be abel to track it"""
    original_worker = process_executor._process_worker

    def logging_process_worker(call_queue, result_queue, *args, **kwargs):
        # wrap the queue to convert _CallItems to LoggingCallItems
        class WrappedQueue:
            def __init__(self, queue):
                self.queue = queue

            def get(self, *args, **kwargs):
                item = self.queue.get(*args, **kwargs)
                if item is not None:
                    return LoggingCallItem(
                        work_id=item.work_id,
                        fn=item.fn,
                        args=item.args,
                        kwargs=item.kwargs,
                    )
                return item

            def __getattr__(self, name):
                return getattr(self.queue, name)

        # call original worker with wrapped queue
        return original_worker(WrappedQueue(call_queue), result_queue, *args, **kwargs)

    process_executor._process_worker = logging_process_worker


def get_logs():
    """retrieves and cleans up log files from worker processes."""
    log_dir = Path(tempfile.gettempdir()) / "joblib_logs"
    logs = {}

    if log_dir.exists():
        log_files = sorted(
            log_dir.glob("task_*.log"),
            key=lambda f: int(f.stem.split("_")[-1]),
        )

        for log_file in log_files:
            logs[log_file.name] = log_file.read_text()
            log_file.unlink()

    return logs


# example usage:
def process_item(x):
    print(f"Processing item {x}")
    import time

    time.sleep(1)
    print(f"Finished item {x}")
    return x * 2


def test():
    print("Starting parallel processing")
    results = Parallel(n_jobs=2)(delayed(process_item)(i) for i in range(4))
    print(f"Results: {results}")


if __name__ == "__main__":
    setup_worker_logging()

    # your code here
    test()

    logs = get_logs()
    print(logs)
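
Since the question uses loky directly rather than through joblib, the same patch should in principle also work with the imports at the top switched to the top-level loky package (from loky import process_executor and from loky.process_executor import _CallItem), though I haven't tested that. Usage with the reusable executor from the question might then look like the sketch below; PARENT_PID is set explicitly here because the code above reads it but never sets it.

# assumes the helpers above (setup_worker_logging, get_logs, process_item),
# with their loky imports pointing at the top-level loky package instead of
# joblib.externals.loky
import os

from loky import get_reusable_executor

if __name__ == "__main__":
    os.environ["PARENT_PID"] = str(os.getpid())  # used in the per-task log file names
    setup_worker_logging()  # patch before the executor spawns its workers

    executor = get_reusable_executor(max_workers=2, timeout=3600, reuse=True)
    futures = [executor.submit(process_item, i) for i in range(4)]
    print([f.result() for f in futures])

    # one log file per (parent pid, worker pid, work id)
    print(get_logs())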
