
Feature Request: Pass cancellation signal to job functions so they can gracefully shut down if necessary #409

Open · knpwrs opened this issue Dec 4, 2024 · 9 comments · May be fixed by #412


knpwrs commented Dec 4, 2024

There are two situations where I think an AbortController would be useful:

  1. Worker shutdown
  2. Job cancellation (job moved to dead)

In those circumstances, it may be useful to cancel fetch requests, process executions, or anything else that supports graceful shutdown via AbortController.

Here is an example of canceling a long-running fetch:

faktory.register("JobWithHeaders", (...args) => async ({ job }) => {
  await fetch("service:8080/some/long/running/endpoint", { signal: job.signal });
});

And here is an example of what canceling a long-running process may look like:

import { execa } from 'execa';

faktory.register("JobWithHeaders", (...args) => async ({ job }) => {
  await execa("ffmpeg", [/* arg1, arg2, ..., argN */], { cancelSignal: job.signal });
});
jbielick (Owner) commented Dec 5, 2024

Hey @knpwrs

I love the idea. A couple questions came to mind about implementation:

  1. When using the AbortController to shut down workers, would this be a graceful shutdown (allow jobs to complete with a timeout) or abrupt (FAIL the jobs immediately)?
  2. If the abort signal is sent and that signal is also given to a fetch or execa async task, I can imagine it throwing/rejecting from the fetch or execa call, which would fail the job. What would the desired behavior be, given the answer to #1 above?
  3. context.job is the job payload itself, which would not have additional properties according to the Faktory schema, so we may want signal to be a top-level key on the context object like so:
faktory.register("JobWithHeaders", (...args) => async ({ signal }) => {
  await fetch("service:8080/some/long/running/endpoint", { signal });
});

Any objections?

knpwrs (Author) commented Dec 5, 2024

That all seems right. Aborting a fetch should throw an AbortError, if I'm not mistaken. This would also allow the user to implement additional cleanup logic in a catch block.
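
(For illustration, a minimal sketch of that catch-based cleanup, assuming signal lands on the handler context as proposed above; the LongFetch jobtype and the URL are placeholders:)

faktory.register("LongFetch", (...args) => async ({ signal }) => {
  try {
    await fetch("http://service:8080/some/long/running/endpoint", { signal });
  } catch (e) {
    // fetch rejects with an AbortError when the signal fires
    if (e.name === "AbortError") {
      // additional cleanup logic goes here
      return;
    }
    throw e; // unrelated errors should still FAIL the job
  }
});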

jbielick (Owner) commented

Here's what I have in terms of documenting the behavior:

Shutdown and AbortSignal

A job may be interrupted when a worker shuts down. In this case, two mechanisms ensure graceful interruption: the shutdown timeout and the execution context's AbortSignal. The shutdown timeout is configured in WorkerOptions. When a worker is instructed to stop (via process signal or server message), it stops accepting new work (i.e. it quiets) and waits the configured duration for any in-progress jobs to complete uninterrupted. If this duration elapses and jobs are still in progress, those jobs receive an AbortSignal via context.signal and are FAILed on the Faktory server, allowing them to be retried later. The abort signal can be used to interrupt asynchronous processes and perform cleanup tasks before an abrupt exit (process.exit). After the abort signal is sent, a job has 3 seconds to perform cleanup before the process exits abruptly.

Example - A long-running subprocess:

faktory.register("JobWithAbort", (...args) => async ({ signal }) => {
  try {
    await execa("ffmpeg", [/* arg1, arg2, ..., argN */], { cancelSignal: signal });
  } catch (e) {
    if (e.code === "ABORT_ERR") {
      // Remove some tempfiles or other type of cleanup...
      // Propagating the ABORT_ERR is not necessary, the job will be FAILed if it was in-progress 
      // at the end of the shutdown timeout
    }
  }
});

And a test case showing the behavior:

test("jobs are FAILed after the AbortSignal is sent during a hard shutdown and have time to clean up", async (t) => {
  const args = [1, 2, "three"];
  const queue = randQueue();
  const jobtype = "sleepy";
  const events: string[] = [];
  let exitCode: number | null = null;
  const jobs = [
    await push({ args, queue, jobtype }),
    await push({ args, queue, jobtype }),
  ];
  const fetched = jobs.slice();
  let failed: Map<string, JobPayload> = new Map();
  let started = 0;

  await mocked(async (server, port) => {
    await new Promise<void>(async (resolve) => {
      const originalExit = process.exit;
      // @ts-ignore
      process.exit = (code: number = 0) => {
        events.push("EXIT");
        exitCode = code;
        process.exit = originalExit;
        resolve();
      };
      server
        .on("BEAT", mocked.beat())
        .on("FETCH", ({ socket }) => {
          events.push("FETCH");
          const job = fetched.pop();
          if (!job) {
            throw new Error("too many fetches");
          }
          return mocked.fetch(job)({ socket });
        })
        .on(
          "FAIL",
          ({
            data,
            socket,
          }: {
            data: JobPayload;
            socket: ServerControl["socket"];
          }) => {
            events.push("FAIL");
            failed.set(data.jid, data);
            return mocked.fail()({ socket });
          }
        )
        .on("ACK", ({ socket }: { socket: ServerControl["socket"] }) => {
          events.push("ACK");
          return mocked.ok()({ socket });
        });

      const worker = create({
        concurrency: 2,
        queues: [queue],
        port,
        timeout: 0.05,
        registry: {
          [jobtype]:
            async () =>
            async ({ signal }: MiddlewareContext) => {
              events.push("START");
              started += 1;
              if (started === 2) {
                // request stop after both are in progress (and sleeping)
                events.push("STOP");
                worker.stop();
              }
              try {
                // this settimeout will be interrupted with an ABORT_ERR
                // it is not caught, so it is propagated to the handler
                // and naturally FAILs
                await setTimeout(5000, undefined, { signal });
              } catch (e) {
                console.error(e);
              } finally {
                // this has a chance to run before exit
                events.push("FINALLY");
                // if something is placed on the event loop, it will occur after the job is FAILed
                await sleep(50);
                events.push("FINALLY AFTER ASYNC");
              }
            },
        },
      });
      events.push("WORK");
      await worker.work();
    });
    t.is(failed.size, 2);
    t.deepEqual([...failed.keys()].sort(), jobs.map((j) => j.jid).sort());
    t.deepEqual(events, [
      "WORK",
      "FETCH",
      "START",
      "FETCH",
      "START",
      "STOP",
      "FINALLY",
      "FINALLY",
      "FAIL",
      "FAIL",
      "FINALLY AFTER ASYNC",
      "FINALLY AFTER ASYNC",
      "EXIT",
    ]);
  });
});

Any comments, objections, missing features?

knpwrs (Author) commented Dec 19, 2024

That all sounds really great! Would a job being canceled also trigger the signal?

jbielick (Owner) commented

I'm probably missing something obvious: what does it mean to cancel a job? Where does that happen?

knpwrs (Author) commented Dec 20, 2024

If my present understanding of how Faktory works is correct, you can effectively cancel a running job by using the Mutate API and sending a kill command: https://github.com/contribsys/faktory/wiki/Mutate-API#commands

In my case, say a video is encoding with ffmpeg and that video then gets deleted: the system would send a kill to any running processing job for that video, and the worker would then free up to grab another video.
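
(For reference, a kill command in the Mutate API's wire format looks roughly like this, per the wiki page above; the JID is a placeholder, and the target must be one of the retries, scheduled, or dead sets:)

MUTATE {"cmd":"kill","target":"retries","filter":{"jids":["<jid>"]}}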

jbielick (Owner) commented

That would be a really cool feature for the server to support, but I don't think it does that today. The Mutate API only works on the Retries, Dead, and Scheduled Sets. The queues themselves and the jobs currently in progress are not mutable at this time.

I think the best option is probably to either:

  A. keep some state in a distributed database indicating whether the job should be canceled, poll it on an interval within your job, and use an AbortController manually to interrupt the subprocess, or
  B. use some sort of message bus where you can publish/subscribe and listen for kill signals keyed by the JID.

A rough sketch of option A follows.
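
(This is a minimal sketch only, assuming an ioredis client and a hypothetical cancel:<jid> key that some other part of the system sets when the video is deleted; the EncodeVideo jobtype and the ffmpeg arguments are likewise placeholders:)

import Redis from "ioredis";
import { execa } from "execa";

const redis = new Redis();

faktory.register("EncodeVideo", (videoPath) => async ({ job }) => {
  const controller = new AbortController();
  // poll the hypothetical cancellation flag once per second
  const poll = setInterval(async () => {
    if (await redis.get(`cancel:${job.jid}`)) controller.abort();
  }, 1000);
  try {
    await execa("ffmpeg", ["-i", videoPath /* ...more args */], {
      cancelSignal: controller.signal,
    });
  } catch (e) {
    // swallow the cancellation so the worker ACKs and frees up;
    // rethrow anything else so the job FAILs normally
    if (!controller.signal.aborted) throw e;
  } finally {
    clearInterval(poll);
  }
});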

I could see the Faktory server signalling a worker with a particular JID to interrupt that specific job when the worker heartbeats to the server, but heartbeats are not very frequent (normally a 15-second interval), and it would still have to be something the Faktory server exposes APIs for.


mperham commented Dec 20, 2024

A CANCEL <jid> command is possible. It would have to be asynchronous though. Faktory can’t interrupt jobs in progress. You’re welcome to open a feature request.

knpwrs (Author) commented Dec 20, 2024

Cool. Given all of that, I think everything @jbielick wrote for the signal support makes sense.
