Skip to content

Commit

Permalink
Stop captcha storage job from killing express (#1470)
Browse files Browse the repository at this point in the history
Co-authored-by: Chris Taylor <[email protected]>
  • Loading branch information
HughParry and forgetso authored Oct 23, 2024
1 parent 9202bb7 commit 9bd2142
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 57 deletions.
2 changes: 2 additions & 0 deletions packages/cli/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ async function main() {
unsolved: { count: 0 },
});

log.info(config);

if (config.devOnlyWatchEvents) {
log.warn(
`
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export async function start(

// Start the scheduled jobs
if (env.pair) {
storeCaptchasExternally(env.pair, env.config).catch((err) => {
storeCaptchasExternally(env.config).catch((err) => {
console.error("Failed to start scheduler:", err);
});
getClientList(env.pair, env.config).catch((err) => {
Expand Down
3 changes: 2 additions & 1 deletion packages/env/src/provider.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { ProsopoConfigOutput } from "@prosopo/types";
// Copyright 2021-2024 Prosopo (UK) Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -12,6 +11,8 @@ import type { ProsopoConfigOutput } from "@prosopo/types";
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import type { ProsopoConfigOutput } from "@prosopo/types";
import { Environment } from "./env.js";

export class ProviderEnvironment extends Environment {
Expand Down
57 changes: 29 additions & 28 deletions packages/provider/src/schedulers/captchaScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import type { KeyringPair } from "@polkadot/keyring/types";
import { ProviderEnvironment } from "@prosopo/env";
import { type ProsopoConfigOutput, ScheduledTaskNames } from "@prosopo/types";
import { Worker, isMainThread, threadId } from "node:worker_threads";
import { getLoggerDefault } from "@prosopo/common";
import type { ProsopoConfigOutput } from "@prosopo/types";
import { CronJob } from "cron";
import { Tasks } from "../tasks/tasks.js";
import { checkIfTaskIsRunning } from "../util.js";

export async function storeCaptchasExternally(
pair: KeyringPair,
config: ProsopoConfigOutput,
) {
const env = new ProviderEnvironment(config, pair);
await env.isReady();

const tasks = new Tasks(env);
export async function storeCaptchasExternally(config: ProsopoConfigOutput) {
const logger = getLoggerDefault();
logger.log(
`Main script - isMainThread: ${isMainThread}, threadId: ${threadId}, pid: ${process.pid}`,
);

// Set the cron schedule to run on user configured schedule or every hour
const defaultSchedule = "0 * * * *";
Expand All @@ -36,22 +31,28 @@ export async function storeCaptchasExternally(
: defaultSchedule
: defaultSchedule;

const job = new CronJob(cronSchedule, async () => {
const taskRunning = await checkIfTaskIsRunning(
ScheduledTaskNames.StoreCommitmentsExternal,
env.getDb(),
);
env.logger.info(
`${ScheduledTaskNames.StoreCommitmentsExternal} task running: ${taskRunning}`,
const job = new CronJob(cronSchedule, () => {
logger.log(`Creating worker - from main thread: ${threadId}`);
const worker = new Worker(
new URL("../workers/storeCaptchaWorker.js", import.meta.url),
{
workerData: { config },
},
);
if (!taskRunning) {
env.logger.info(
`${ScheduledTaskNames.StoreCommitmentsExternal} task....`,
);
await tasks.clientTaskManager.storeCommitmentsExternal().catch((err) => {
env.logger.error(err);
});
}

worker.on("message", (message) => {
logger.log(`Worker message: ${message}`);
});

worker.on("error", (error) => {
logger.error(`Worker error: ${error}`);
});

worker.on("exit", (code) => {
if (code !== 0) {
logger.error(`Worker stopped with exit code ${code}`);
}
});
});

job.start();
Expand Down
47 changes: 29 additions & 18 deletions packages/provider/src/tasks/client/clientTasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import {
ScheduledTaskNames,
ScheduledTaskStatus,
} from "@prosopo/types";
import type { ClientRecord, IProviderDatabase } from "@prosopo/types-database";
import type {
ClientRecord,
IProviderDatabase,
PoWCaptchaStored,
UserCommitment,
} from "@prosopo/types-database";

export class ClientTaskManager {
config: ProsopoConfigOutput;
Expand Down Expand Up @@ -63,7 +68,7 @@ export class ClientTaskManager {
await this.providerDB.getUnstoredDappUserPoWCommitments();

// filter to only get records that have been updated since the last task
if (lastTask) {
if (lastTask?.updated) {
this.logger.info(
`Filtering records to only get updated records: ${JSON.stringify(lastTask)}`,
);
Expand All @@ -74,24 +79,30 @@ export class ClientTaskManager {
taskID,
);

commitments = commitments.filter(
(commitment) =>
lastTask.updated &&
commitment.lastUpdatedTimestamp &&
(commitment.lastUpdatedTimestamp > lastTask.updated ||
!commitment.lastUpdatedTimestamp),
);

powRecords = powRecords.filter((commitment) => {
const isCommitmentUpdated = (
commitment: UserCommitment | PoWCaptchaStored,
): boolean => {
const { lastUpdatedTimestamp, storedAtTimestamp } = commitment;
return (
lastTask.updated &&
commitment.lastUpdatedTimestamp &&
// either the update stamp is more recent than the last time this task ran or there is no update stamp,
// so it is a new record
(commitment.lastUpdatedTimestamp > lastTask.updated ||
!commitment.lastUpdatedTimestamp)
!lastUpdatedTimestamp ||
!storedAtTimestamp ||
lastUpdatedTimestamp > storedAtTimestamp
);
});
};

const commitmentUpdated = (
commitment: UserCommitment | PoWCaptchaStored,
): boolean => {
return !!lastTask.updated && isCommitmentUpdated(commitment);
};

commitments = commitments.filter((commitment) =>
commitmentUpdated(commitment),
);

powRecords = powRecords.filter((commitment) =>
commitmentUpdated(commitment),
);
}

if (commitments.length || powRecords.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@

import type { KeyringPair } from "@polkadot/keyring/types";
import { ProviderEnvironment } from "@prosopo/env";
import { type ProsopoConfigOutput, ScheduledTaskNames } from "@prosopo/types";
import type { ProsopoConfigOutput } from "@prosopo/types";
import { CronJob } from "cron";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { storeCaptchasExternally } from "../../../schedulers/captchaScheduler.js";
import { Tasks } from "../../../tasks/tasks.js";
import { storeCaptchaWorker } from "../../../workers/storeCaptchaWorker.js";

vi.mock("@prosopo/env", () => ({
ProviderEnvironment: vi.fn().mockImplementation(() => ({
Expand Down Expand Up @@ -56,6 +57,30 @@ vi.mock("../../../util.js", () => ({
checkIfTaskIsRunning: vi.fn().mockResolvedValue(false),
}));

vi.mock("node:worker_threads", () => ({
isMainThread: vi.fn().mockReturnValue(true),
threadId: vi.fn().mockReturnValue(0),
parentPort: {
postMessage: vi.fn(),
},
Worker: vi.fn().mockImplementation((url, options) => ({
on: vi.fn(),
})),
workerData: {
config: {
account: {
secret:
"puppy cream effort carbon despair leg pyramid cotton endorse immense drill peasant",
},
scheduledTasks: {
captchaScheduler: {
schedule: "0 * * * *",
},
},
},
},
}));

describe("storeCaptchasExternally", () => {
let mockPair: KeyringPair;
let mockConfig: ProsopoConfigOutput;
Expand All @@ -72,19 +97,23 @@ describe("storeCaptchasExternally", () => {
});

it("should initialize environment and start cron job", async () => {
await storeCaptchasExternally(mockPair, mockConfig);
await storeCaptchasExternally(mockConfig);

expect(ProviderEnvironment).toHaveBeenCalledWith(mockConfig, mockPair);
expect(Tasks).toHaveBeenCalled();
expect(CronJob).toHaveBeenCalledWith("0 * * * *", expect.any(Function));
});

it("should log message when cron job runs", async () => {
await storeCaptchasExternally(mockPair, mockConfig);
it("should initialize environment and start cron job", async () => {
await storeCaptchaWorker();
expect(ProviderEnvironment).toHaveBeenCalled();
expect(Tasks).toHaveBeenCalled();
});

it("should log message when cron job runs", async () => {
await storeCaptchaWorker();
// biome-ignore lint/suspicious/noExplicitAny: TODO fix
const envInstance = (ProviderEnvironment as any).mock.results[0].value;
expect(envInstance.logger.info).toHaveBeenCalledWith(
expect(envInstance.logger.info).toHaveBeenNthCalledWith(
1,
"StoreCommitmentsExternal task running: false",
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,23 +225,25 @@ describe("ClientTaskManager", () => {
it("should not store commitments externally if they have been stored", async () => {
const mockCommitments: Pick<
UserCommitment,
"id" | "lastUpdatedTimestamp"
"id" | "lastUpdatedTimestamp" | "storedAtTimestamp"
>[] = [
{
id: "commitment1",
// Image commitments were stored at time 1
lastUpdatedTimestamp: 1,
storedAtTimestamp: 1,
},
];

const mockPoWCommitments: Pick<
PoWCaptchaStored,
"challenge" | "lastUpdatedTimestamp"
"challenge" | "lastUpdatedTimestamp" | "storedAtTimestamp"
>[] = [
{
challenge: "1234567___userAccount___dappAccount",
// PoW commitments were stored at time 3
lastUpdatedTimestamp: 3,
storedAtTimestamp: 1,
},
];

Expand Down
63 changes: 63 additions & 0 deletions packages/provider/src/workers/storeCaptchaWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2021-2024 Prosopo (UK) Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
isMainThread,
parentPort,
threadId,
workerData,
} from "node:worker_threads";
import { getPairAsync } from "@prosopo/contract";
import { ProviderEnvironment } from "@prosopo/env";
import { ScheduledTaskNames } from "@prosopo/types";
import { Tasks } from "../tasks/tasks.js";
import { checkIfTaskIsRunning } from "../util.js";

export const storeCaptchaWorker = async () => {
const config = workerData.config;
const secret = config.account.secret;

const pair = await getPairAsync(secret);

const env = new ProviderEnvironment(config, pair);
await env.isReady();

env.logger.log(
`Worker script - isMainThread: ${isMainThread}, threadId: ${threadId}, pid: ${process.pid}`,
);

const tasks = new Tasks(env);

const taskRunning = await checkIfTaskIsRunning(
ScheduledTaskNames.StoreCommitmentsExternal,
env.getDb(),
);
env.logger.info(
`${ScheduledTaskNames.StoreCommitmentsExternal} task running: ${taskRunning}`,
);
if (!taskRunning) {
env.logger.info(`${ScheduledTaskNames.StoreCommitmentsExternal} task....`);
await tasks.clientTaskManager.storeCommitmentsExternal().catch((err) => {
env.logger.error(err);
});
}

if (parentPort)
parentPort.postMessage(
`Store Captcha task completed in worker thread: ${threadId}`,
);
};

if (!isMainThread) {
storeCaptchaWorker();
}

0 comments on commit 9bd2142

Please sign in to comment.