From 9bd2142ac5e027ff8b6299ff1df7143753e929da Mon Sep 17 00:00:00 2001 From: Hugh <90424587+HughParry@users.noreply.github.com> Date: Wed, 23 Oct 2024 17:03:13 +0100 Subject: [PATCH] Stop captcha storage job from killing express (#1470) Co-authored-by: Chris Taylor --- packages/cli/src/cli.ts | 2 + packages/cli/src/start.ts | 2 +- packages/env/src/provider.ts | 3 +- .../src/schedulers/captchaScheduler.ts | 57 ++++++++--------- .../provider/src/tasks/client/clientTasks.ts | 47 ++++++++------ .../schedulers/captchaScheduler.unit.test.ts | 43 ++++++++++--- .../tasks/client/clientTasks.unit.test.ts | 6 +- .../src/workers/storeCaptchaWorker.ts | 63 +++++++++++++++++++ 8 files changed, 166 insertions(+), 57 deletions(-) create mode 100644 packages/provider/src/workers/storeCaptchaWorker.ts diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 1f88fdbed3..af37b6449f 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -32,6 +32,8 @@ async function main() { unsolved: { count: 0 }, }); + log.info(config); + if (config.devOnlyWatchEvents) { log.warn( ` diff --git a/packages/cli/src/start.ts b/packages/cli/src/start.ts index 28b3dd475b..e9cf91968b 100644 --- a/packages/cli/src/start.ts +++ b/packages/cli/src/start.ts @@ -95,7 +95,7 @@ export async function start( // Start the scheduled jobs if (env.pair) { - storeCaptchasExternally(env.pair, env.config).catch((err) => { + storeCaptchasExternally(env.config).catch((err) => { console.error("Failed to start scheduler:", err); }); getClientList(env.pair, env.config).catch((err) => { diff --git a/packages/env/src/provider.ts b/packages/env/src/provider.ts index 4f5f22ba1a..7d4ce5bdc1 100644 --- a/packages/env/src/provider.ts +++ b/packages/env/src/provider.ts @@ -1,4 +1,3 @@ -import type { ProsopoConfigOutput } from "@prosopo/types"; // Copyright 2021-2024 Prosopo (UK) Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,6 +11,8 @@ import type { ProsopoConfigOutput } from "@prosopo/types"; // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +import type { ProsopoConfigOutput } from "@prosopo/types"; import { Environment } from "./env.js"; export class ProviderEnvironment extends Environment { diff --git a/packages/provider/src/schedulers/captchaScheduler.ts b/packages/provider/src/schedulers/captchaScheduler.ts index afa2eaef3d..0a57a2ee0d 100644 --- a/packages/provider/src/schedulers/captchaScheduler.ts +++ b/packages/provider/src/schedulers/captchaScheduler.ts @@ -12,21 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -import type { KeyringPair } from "@polkadot/keyring/types"; -import { ProviderEnvironment } from "@prosopo/env"; -import { type ProsopoConfigOutput, ScheduledTaskNames } from "@prosopo/types"; +import { Worker, isMainThread, threadId } from "node:worker_threads"; +import { getLoggerDefault } from "@prosopo/common"; +import type { ProsopoConfigOutput } from "@prosopo/types"; import { CronJob } from "cron"; -import { Tasks } from "../tasks/tasks.js"; -import { checkIfTaskIsRunning } from "../util.js"; -export async function storeCaptchasExternally( - pair: KeyringPair, - config: ProsopoConfigOutput, -) { - const env = new ProviderEnvironment(config, pair); - await env.isReady(); - - const tasks = new Tasks(env); +export async function storeCaptchasExternally(config: ProsopoConfigOutput) { + const logger = getLoggerDefault(); + logger.log( + `Main script - isMainThread: ${isMainThread}, threadId: ${threadId}, pid: ${process.pid}`, + ); // Set the cron schedule to run on user configured schedule or every hour const defaultSchedule = "0 * * * *"; @@ -36,22 +31,28 @@ export async function storeCaptchasExternally( : defaultSchedule : defaultSchedule; - const job = new CronJob(cronSchedule, async () => { - const taskRunning = await checkIfTaskIsRunning( - ScheduledTaskNames.StoreCommitmentsExternal, - env.getDb(), - ); - env.logger.info( - `${ScheduledTaskNames.StoreCommitmentsExternal} task running: ${taskRunning}`, + const job = new CronJob(cronSchedule, () => { + logger.log(`Creating worker - from main thread: ${threadId}`); + const worker = new Worker( + new URL("../workers/storeCaptchaWorker.js", import.meta.url), + { + workerData: { config }, + }, ); - if (!taskRunning) { - env.logger.info( - `${ScheduledTaskNames.StoreCommitmentsExternal} task....`, - ); - await tasks.clientTaskManager.storeCommitmentsExternal().catch((err) => { - env.logger.error(err); - }); - } + + worker.on("message", (message) => { + logger.log(`Worker message: ${message}`); + }); + + worker.on("error", (error) => { + logger.error(`Worker error: ${error}`); + }); + + worker.on("exit", (code) => { + if (code !== 0) { + logger.error(`Worker stopped with exit code ${code}`); + } + }); }); job.start(); diff --git a/packages/provider/src/tasks/client/clientTasks.ts b/packages/provider/src/tasks/client/clientTasks.ts index d248ee8e05..6a14bd14f4 100644 --- a/packages/provider/src/tasks/client/clientTasks.ts +++ b/packages/provider/src/tasks/client/clientTasks.ts @@ -20,7 +20,12 @@ import { ScheduledTaskNames, ScheduledTaskStatus, } from "@prosopo/types"; -import type { ClientRecord, IProviderDatabase } from "@prosopo/types-database"; +import type { + ClientRecord, + IProviderDatabase, + PoWCaptchaStored, + UserCommitment, +} from "@prosopo/types-database"; export class ClientTaskManager { config: ProsopoConfigOutput; @@ -63,7 +68,7 @@ export class ClientTaskManager { await this.providerDB.getUnstoredDappUserPoWCommitments(); // filter to only get records that have been updated since the last task - if (lastTask) { + if (lastTask?.updated) { this.logger.info( `Filtering records to only get updated records: ${JSON.stringify(lastTask)}`, ); @@ -74,24 +79,30 @@ export class ClientTaskManager { taskID, ); - commitments = commitments.filter( - (commitment) => - lastTask.updated && - commitment.lastUpdatedTimestamp && - (commitment.lastUpdatedTimestamp > lastTask.updated || - !commitment.lastUpdatedTimestamp), - ); - - powRecords = powRecords.filter((commitment) => { + const isCommitmentUpdated = ( + commitment: UserCommitment | PoWCaptchaStored, + ): boolean => { + const { lastUpdatedTimestamp, storedAtTimestamp } = commitment; return ( - lastTask.updated && - commitment.lastUpdatedTimestamp && - // either the update stamp is more recent than the last time this task ran or there is no update stamp, - // so it is a new record - (commitment.lastUpdatedTimestamp > lastTask.updated || - !commitment.lastUpdatedTimestamp) + !lastUpdatedTimestamp || + !storedAtTimestamp || + lastUpdatedTimestamp > storedAtTimestamp ); - }); + }; + + const commitmentUpdated = ( + commitment: UserCommitment | PoWCaptchaStored, + ): boolean => { + return !!lastTask.updated && isCommitmentUpdated(commitment); + }; + + commitments = commitments.filter((commitment) => + commitmentUpdated(commitment), + ); + + powRecords = powRecords.filter((commitment) => + commitmentUpdated(commitment), + ); } if (commitments.length || powRecords.length) { diff --git a/packages/provider/src/tests/unit/schedulers/captchaScheduler.unit.test.ts b/packages/provider/src/tests/unit/schedulers/captchaScheduler.unit.test.ts index 2e7d16ac48..6e7acd9346 100644 --- a/packages/provider/src/tests/unit/schedulers/captchaScheduler.unit.test.ts +++ b/packages/provider/src/tests/unit/schedulers/captchaScheduler.unit.test.ts @@ -14,11 +14,12 @@ import type { KeyringPair } from "@polkadot/keyring/types"; import { ProviderEnvironment } from "@prosopo/env"; -import { type ProsopoConfigOutput, ScheduledTaskNames } from "@prosopo/types"; +import type { ProsopoConfigOutput } from "@prosopo/types"; import { CronJob } from "cron"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { storeCaptchasExternally } from "../../../schedulers/captchaScheduler.js"; import { Tasks } from "../../../tasks/tasks.js"; +import { storeCaptchaWorker } from "../../../workers/storeCaptchaWorker.js"; vi.mock("@prosopo/env", () => ({ ProviderEnvironment: vi.fn().mockImplementation(() => ({ @@ -56,6 +57,30 @@ vi.mock("../../../util.js", () => ({ checkIfTaskIsRunning: vi.fn().mockResolvedValue(false), })); +vi.mock("node:worker_threads", () => ({ + isMainThread: vi.fn().mockReturnValue(true), + threadId: vi.fn().mockReturnValue(0), + parentPort: { + postMessage: vi.fn(), + }, + Worker: vi.fn().mockImplementation((url, options) => ({ + on: vi.fn(), + })), + workerData: { + config: { + account: { + secret: + "puppy cream effort carbon despair leg pyramid cotton endorse immense drill peasant", + }, + scheduledTasks: { + captchaScheduler: { + schedule: "0 * * * *", + }, + }, + }, + }, +})); + describe("storeCaptchasExternally", () => { let mockPair: KeyringPair; let mockConfig: ProsopoConfigOutput; @@ -72,19 +97,23 @@ describe("storeCaptchasExternally", () => { }); it("should initialize environment and start cron job", async () => { - await storeCaptchasExternally(mockPair, mockConfig); + await storeCaptchasExternally(mockConfig); - expect(ProviderEnvironment).toHaveBeenCalledWith(mockConfig, mockPair); - expect(Tasks).toHaveBeenCalled(); expect(CronJob).toHaveBeenCalledWith("0 * * * *", expect.any(Function)); }); - it("should log message when cron job runs", async () => { - await storeCaptchasExternally(mockPair, mockConfig); + it("should initialize environment and start cron job", async () => { + await storeCaptchaWorker(); + expect(ProviderEnvironment).toHaveBeenCalled(); + expect(Tasks).toHaveBeenCalled(); + }); + it("should log message when cron job runs", async () => { + await storeCaptchaWorker(); // biome-ignore lint/suspicious/noExplicitAny: TODO fix const envInstance = (ProviderEnvironment as any).mock.results[0].value; - expect(envInstance.logger.info).toHaveBeenCalledWith( + expect(envInstance.logger.info).toHaveBeenNthCalledWith( + 1, "StoreCommitmentsExternal task running: false", ); }); diff --git a/packages/provider/src/tests/unit/tasks/client/clientTasks.unit.test.ts b/packages/provider/src/tests/unit/tasks/client/clientTasks.unit.test.ts index c29237341e..d53d0015b5 100644 --- a/packages/provider/src/tests/unit/tasks/client/clientTasks.unit.test.ts +++ b/packages/provider/src/tests/unit/tasks/client/clientTasks.unit.test.ts @@ -225,23 +225,25 @@ describe("ClientTaskManager", () => { it("should not store commitments externally if they have been stored", async () => { const mockCommitments: Pick< UserCommitment, - "id" | "lastUpdatedTimestamp" + "id" | "lastUpdatedTimestamp" | "storedAtTimestamp" >[] = [ { id: "commitment1", // Image commitments were stored at time 1 lastUpdatedTimestamp: 1, + storedAtTimestamp: 1, }, ]; const mockPoWCommitments: Pick< PoWCaptchaStored, - "challenge" | "lastUpdatedTimestamp" + "challenge" | "lastUpdatedTimestamp" | "storedAtTimestamp" >[] = [ { challenge: "1234567___userAccount___dappAccount", // PoW commitments were stored at time 3 lastUpdatedTimestamp: 3, + storedAtTimestamp: 1, }, ]; diff --git a/packages/provider/src/workers/storeCaptchaWorker.ts b/packages/provider/src/workers/storeCaptchaWorker.ts new file mode 100644 index 0000000000..aa86ad3399 --- /dev/null +++ b/packages/provider/src/workers/storeCaptchaWorker.ts @@ -0,0 +1,63 @@ +// Copyright 2021-2024 Prosopo (UK) Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +import { + isMainThread, + parentPort, + threadId, + workerData, +} from "node:worker_threads"; +import { getPairAsync } from "@prosopo/contract"; +import { ProviderEnvironment } from "@prosopo/env"; +import { ScheduledTaskNames } from "@prosopo/types"; +import { Tasks } from "../tasks/tasks.js"; +import { checkIfTaskIsRunning } from "../util.js"; + +export const storeCaptchaWorker = async () => { + const config = workerData.config; + const secret = config.account.secret; + + const pair = await getPairAsync(secret); + + const env = new ProviderEnvironment(config, pair); + await env.isReady(); + + env.logger.log( + `Worker script - isMainThread: ${isMainThread}, threadId: ${threadId}, pid: ${process.pid}`, + ); + + const tasks = new Tasks(env); + + const taskRunning = await checkIfTaskIsRunning( + ScheduledTaskNames.StoreCommitmentsExternal, + env.getDb(), + ); + env.logger.info( + `${ScheduledTaskNames.StoreCommitmentsExternal} task running: ${taskRunning}`, + ); + if (!taskRunning) { + env.logger.info(`${ScheduledTaskNames.StoreCommitmentsExternal} task....`); + await tasks.clientTaskManager.storeCommitmentsExternal().catch((err) => { + env.logger.error(err); + }); + } + + if (parentPort) + parentPort.postMessage( + `Store Captcha task completed in worker thread: ${threadId}`, + ); +}; + +if (!isMainThread) { + storeCaptchaWorker(); +}