Feature Request: Pass cancellation signal to job functions so they can gracefully shut down if necessary #409
Hey @knpwrs, I love the idea. A couple questions came to mind about implementation:

```js
faktory.register("JobWithHeaders", (...args) => async ({ signal }) => {
  // pass the job's AbortSignal through so the request is canceled on shutdown
  await fetch("http://service:8080/some/long/running/endpoint", { signal });
});
```

Any objections?
That all seems right. Aborting a fetch should throw an
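Different APIs report an abort slightly differently: `fetch` rejects with a `DOMException` named `AbortError`, while Node's promise-based timers and `child_process` reject with an `Error` named `AbortError` whose `code` is `"ABORT_ERR"`. Checking the error's `name` is therefore a portable way for a job to tell an abort from a genuine failure. A minimal sketch, assuming Node 18+ for the global `fetch`; `isAbortError` and `describeAborts` are hypothetical helpers, not part of faktory-worker:

```typescript
import { setTimeout as delay } from "node:timers/promises";

// Hypothetical helper: true for any rejection caused by an AbortSignal.
// fetch rejects with a DOMException named "AbortError" (numeric `code` 20),
// while node:timers/promises rejects with an Error named "AbortError" whose
// `code` is "ABORT_ERR". Checking `name` covers both.
function isAbortError(e: unknown): boolean {
  return (
    typeof e === "object" &&
    e !== null &&
    (e as { name?: unknown }).name === "AbortError"
  );
}

type AbortInfo = { name: string; code: unknown };

// Collects the name/code pair each API attaches to its abort error.
async function describeAborts(): Promise<AbortInfo[]> {
  const results: AbortInfo[] = [];

  // A fetch whose signal is already aborted rejects without touching the network.
  try {
    await fetch("http://127.0.0.1:9/never-requested", { signal: AbortSignal.abort() });
  } catch (e) {
    if (!isAbortError(e)) throw e;
    results.push({ name: (e as Error).name, code: (e as { code?: unknown }).code });
  }

  // A pending promise-based timer rejects as soon as its signal aborts.
  const controller = new AbortController();
  const pending = delay(5000, undefined, { signal: controller.signal });
  controller.abort();
  try {
    await pending;
  } catch (e) {
    if (!isAbortError(e)) throw e;
    results.push({ name: (e as Error).name, code: (e as { code?: unknown }).code });
  }

  return results;
}

describeAborts().then((r) => console.log(r));
```

Inside a job's `catch`, `isAbortError(e)` can then gate cleanup regardless of which API was interrupted.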
Here's what I have in terms of documenting the behavior:

Shutdown and AbortSignal

A job may be interrupted when a worker shuts down. In this case there are two mechanisms to ensure graceful interruption: the shutdown timeout and the execution context AbortSignal. The shutdown timeout is configured in

Example - A long-running subprocess:

```js
import { execa } from "execa";

faktory.register("JobWithAbort", (...args) => async ({ signal }) => {
  try {
    await execa("ffmpeg", [/* arg1, arg2, ..., argN */], { cancelSignal: signal });
  } catch (e) {
    if (e.code === "ABORT_ERR") {
      // Remove some tempfiles or other type of cleanup...
      // Propagating the ABORT_ERR is not necessary; the job will be FAILed if it
      // was still in progress at the end of the shutdown timeout
      return;
    }
    // Any other error is a genuine failure: rethrow so the job is FAILed
    throw e;
  }
});
```

And a test case showing the behavior:

```ts
test("jobs are FAILed after the AbortSignal is sent during a hard shutdown and have time to clean up", async (t) => {
  const args = [1, 2, "three"];
  const queue = randQueue();
  const jobtype = "sleepy";
  const events: string[] = [];
  let exitCode: number | null = null;
  const jobs = [
    await push({ args, queue, jobtype }),
    await push({ args, queue, jobtype }),
  ];
  const fetched = jobs.slice();
  const failed: Map<string, JobPayload> = new Map();
  let started = 0;
  await mocked(async (server, port) => {
    await new Promise<void>(async (resolve) => {
      // stub process.exit so the test can observe the worker exiting
      const originalExit = process.exit;
      // @ts-ignore
      process.exit = (code: number = 0) => {
        events.push("EXIT");
        exitCode = code;
        process.exit = originalExit;
        resolve();
      };
      server
        .on("BEAT", mocked.beat())
        .on("FETCH", ({ socket }) => {
          events.push("FETCH");
          const job = fetched.pop();
          if (!job) {
            throw new Error("too many fetches");
          }
          return mocked.fetch(job)({ socket });
        })
        .on(
          "FAIL",
          ({
            data,
            socket,
          }: {
            data: JobPayload;
            socket: ServerControl["socket"];
          }) => {
            events.push("FAIL");
            failed.set(data.jid, data);
            return mocked.fail()({ socket });
          }
        )
        .on("ACK", ({ socket }: { socket: ServerControl["socket"] }) => {
          events.push("ACK");
          return mocked.ok()({ socket });
        });
      const worker = create({
        concurrency: 2,
        queues: [queue],
        port,
        timeout: 0.05,
        registry: {
          [jobtype]:
            async () =>
            async ({ signal }: MiddlewareContext) => {
              events.push("START");
              started += 1;
              if (started === 2) {
                // request stop after both are in progress (and sleeping)
                events.push("STOP");
                worker.stop();
              }
              try {
                // this setTimeout rejects with an AbortError once the worker aborts
                // in-progress jobs at the end of the shutdown timeout; the error is
                // caught and logged here, and the worker still FAILs the job
                // because it was in progress when the timeout elapsed
                await setTimeout(5000, undefined, { signal });
              } catch (e) {
                console.error(e);
              } finally {
                // this has a chance to run before exit
                events.push("FINALLY");
                // if something is placed on the event loop, it will occur after the job is FAILed
                await sleep(50);
                events.push("FINALLY AFTER ASYNC");
              }
            },
        },
      });
      events.push("WORK");
      await worker.work();
    });
    t.is(failed.size, 2);
    t.deepEqual([...failed.keys()].sort(), jobs.map((j) => j.jid).sort());
    t.deepEqual(events, [
      "WORK",
      "FETCH",
      "START",
      "FETCH",
      "START",
      "STOP",
      "FINALLY",
      "FINALLY",
      "FAIL",
      "FAIL",
      "FINALLY AFTER ASYNC",
      "FINALLY AFTER ASYNC",
      "EXIT",
    ]);
  });
});
```

Any comments, objections, missing features?
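The two-phase behavior the test pins down (wait up to the shutdown timeout, then abort, then FAIL whatever is still in progress without waiting for its cleanup) can be modeled in a few lines. This is a simplified, hypothetical sketch of the idea, not faktory-worker's actual shutdown code: `drainWithDeadline` and `JobFn` are invented names, and the ACK/FAIL calls to the server are reduced to two arrays.

```typescript
import { setTimeout as delay } from "node:timers/promises";

type JobFn = (ctx: { signal: AbortSignal }) => Promise<void>;

// Hypothetical model: start every job, then behave as if stop() had been called
// immediately. Jobs that settle before the deadline are reported normally; the
// rest are aborted and reported as failed without waiting for their cleanup.
async function drainWithDeadline(
  jobs: Map<string, JobFn>,
  timeoutMs: number
): Promise<{ acked: string[]; failed: string[] }> {
  const controller = new AbortController();
  const inProgress = new Set(jobs.keys());
  const acked: string[] = [];
  const failed: string[] = [];

  // Each wrapper never rejects; it records an outcome only if the job is
  // still considered in progress (i.e. it settled before the deadline).
  const running = [...jobs].map(([jid, fn]) =>
    fn({ signal: controller.signal }).then(
      () => {
        if (inProgress.delete(jid)) acked.push(jid);
      },
      () => {
        if (inProgress.delete(jid)) failed.push(jid);
      }
    )
  );

  // Phase one: race "all jobs settled" against the shutdown timeout.
  const timer = new AbortController();
  const outcome = await Promise.race([
    Promise.all(running).then(() => "settled" as const),
    delay(timeoutMs, "timeout" as const, { signal: timer.signal }).catch(
      () => "cancelled" as const
    ),
  ]);
  timer.abort(); // no-op if the timer already fired

  if (outcome === "timeout") {
    // Phase two: signal every in-progress job, then FAIL them right away.
    // Their catch/finally blocks still run, but only after this returns.
    controller.abort();
    for (const jid of inProgress) failed.push(jid);
    inProgress.clear();
  }
  return { acked, failed };
}
```

With a 10 ms job and a 5 s job that honors its `signal`, a 100 ms deadline should report the short job as acked and the long one as failed, mirroring the `FINALLY`, `FAIL`, `FINALLY AFTER ASYNC` ordering in the test.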
That all sounds really great! Would a job being canceled also trigger the signal?
I’m probably missing something obvious, but what does it mean to cancel a job? Where is that happening?
If my present understanding of how Faktory works is correct, then you can effectively cancel a running job by using the mutate API and sending a In my case, say a video is encoding with ffmpeg and then that video gets deleted, the system would send a
That would be a really cool feature for the server to support, but I don't think it does that today. The Mutate API only works on the Retries, Dead, and Scheduled Sets; the queues themselves and the jobs currently in progress are not mutable at this time. I think the best option is probably to either:
A. keep some state in a distributed database indicating whether the job should be canceled, poll it on a separate interval within your job, and use an AbortController manually to interrupt the subprocess, or
B. use some sort of message bus where you can publish/subscribe and listen for kill signals based on the JID or something similar.
I could see the Faktory server signalling a worker to interrupt the job with a particular JID when the worker heartbeats to the server, but heartbeats are not very frequent (normally a 15-second interval), and it would still have to be something the Faktory server exposed APIs for.
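Option A could be sketched as a per-job signal that aborts either when the worker's shutdown signal fires or when a periodic check reports the JID as canceled. This is a sketch under assumptions: `isCanceled` stands in for whatever lookup you use (a database row, a Redis key, and so on), and `cancellableSignal` is a hypothetical helper, not part of Faktory or faktory-worker. On runtimes that support `AbortSignal.any()`, that could replace the manual listener wiring.

```typescript
import { setTimeout as delay } from "node:timers/promises";

// Hypothetical lookup: resolves true once the JID has been marked as canceled
// in some shared store. Not a Faktory API.
type IsCanceled = (jid: string) => Promise<boolean>;

function cancellableSignal(
  jid: string,
  shutdown: AbortSignal,
  isCanceled: IsCanceled,
  intervalMs: number
): { signal: AbortSignal; dispose: () => void } {
  const controller = new AbortController();

  // Forward the worker's shutdown signal, keeping its reason.
  const onShutdown = () => controller.abort(shutdown.reason);

  // Poll the shared store; a failed lookup is simply retried on the next tick.
  const timer = setInterval(() => {
    isCanceled(jid).then(
      (canceled) => {
        if (canceled) controller.abort();
      },
      () => {}
    );
  }, intervalMs);

  const dispose = () => {
    clearInterval(timer);
    shutdown.removeEventListener("abort", onShutdown);
  };
  // Stop polling as soon as the derived signal fires, whatever the cause.
  controller.signal.addEventListener("abort", dispose, { once: true });

  if (shutdown.aborted) onShutdown();
  else shutdown.addEventListener("abort", onShutdown, { once: true });

  return { signal: controller.signal, dispose };
}
```

A job would pass the derived `signal` to `fetch` or execa exactly as in the earlier examples and call `dispose()` in a `finally` block so the polling interval does not outlive the job.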
A CANCEL <jid> command is possible. It would have to be asynchronous though. Faktory can’t interrupt jobs in progress. You’re welcome to open a feature request.
Cool. Given all of that, I think everything @jbielick wrote for the signal stuff makes sense.
There are two situations where I think an `AbortController` would be useful:

In those circumstances, it may be useful to cancel `fetch` requests, process executions, or anything else that supports graceful shutdown via `AbortController`.

Here is an example of canceling a long-running fetch:

And here is an example of what canceling a long-running process may look like:
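For comparison with the execa-based example earlier in the thread, Node's built-in `child_process.spawn` also accepts a `signal` option: when the signal aborts, Node kills the child (with `SIGTERM` by default) and emits an `error` whose `code` is `"ABORT_ERR"`. A minimal sketch, assuming Node 16+; `runUntilAborted` is a hypothetical helper name:

```typescript
import { spawn } from "node:child_process";

// Hypothetical helper: resolves with the exit code, or rejects with Node's
// AbortError (code "ABORT_ERR") if `signal` aborts while the child is running.
function runUntilAborted(
  command: string,
  args: string[],
  signal: AbortSignal
): Promise<number | null> {
  return new Promise((resolve, reject) => {
    const child = spawn(command, args, { signal, stdio: "ignore" });
    // On abort, Node kills the child and emits "error" before "exit";
    // the later resolve() is then a no-op because the promise already settled.
    child.once("error", reject);
    child.once("exit", (code) => resolve(code));
  });
}
```

In a job, `signal` would be the one from the execution context, e.g. `await runUntilAborted("ffmpeg", [/* ... */], signal)`, with any temp-file cleanup in a `catch` or `finally` block.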