Skip to content

Commit

Permalink
Expand cluster package API
Browse files Browse the repository at this point in the history
  • Loading branch information
t-ski committed Sep 10, 2024
1 parent 8ae36de commit 1e87830
Show file tree
Hide file tree
Showing 60 changed files with 1,048 additions and 735 deletions.
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@
"prepare": "npx husky",
"clean:buildinfo": "find ./packages -name 'tsconfig.debug.tsbuildinfo' -delete",
"clean:builds": "find ./packages -type d -name 'build' -exec rm -rf {} \\;",
"clean": "npm run clean:buildinfo && npm run clean:builds",
"debug": "npm run debug -ws",
"clean:logs": "rm -rf ./test-app/.logs",
"clean": "npm run clean:buildinfo && npm run clean:builds && npm run clean:logs",
"debug": "npm run clean && npm run debug -ws",
"build:w:rjs-core": "npm run build -w @rapidjs.org/rjs-core",
"build:w:rjs-cluster": "npm run build -w @rapidjs.org/rjs-cluster",
"build:w:rjs": "npm run build -w @rapidjs.org/rjs",
"build:w:rjs-proxy": "npm run build -w @rapidjs.org/rjs-proxy",
"build:w:rjs-cli": "npm run build -w @rapidjs.org/rjs-cli",
"build:all": "npm run build:w:rjs-core && npm run build:w:rjs-cluster && npm run build:w:rjs && npm run build:w:rjs-proxy && npm run build:w:rjs-cli",
"build": "npm run clean:buildinfo && npm run build:all",
"build": "npm run build:w:rjs-core && npm run build:w:rjs-cluster && npm run build:w:rjs && npm run build:w:rjs-proxy && npm run build:w:rjs-cli",
"test:app": "./rjs.sh start -W ./test-app/",
"test:app:dev": "./rjs.sh start -W ./test-app/ -D",
"test:w:rjs": "npm run test -w @rapidjs.org/rjs",
"test:w:rjs-proxy": "npm run test -w @rapidjs.org/rjs-proxy",
"test:w:rjs-cli": "npm run test -w @rapidjs.org/rjs-cli",
"test": "npm run test:w:rjs && npm run test:w:rjs-proxy && npm run test:w:rjs-cli",
"test-app": "node ./test-app/app",
"lint": "npx eslint ./packages/*/src/**/*.ts",
"lint:fix": "npx eslint --fix ./packages/*/src/**/*.ts",
"format": "npx prettier --check ./packages/*/src/**/*.ts",
Expand Down
1 change: 1 addition & 0 deletions packages/rjs-cli/src/registry.generate/Template.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { join, resolve } from "path";
import { Args } from "../Args";
import { Printer } from "../Printer";

// TODO: From remote?
export class Template {
constructor(templateName: string) {
const sourcePath: string = join(__dirname, "../../templates/", templateName);
Expand Down
204 changes: 204 additions & 0 deletions packages/rjs-cluster/src/AWorkerCluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import { EventEmitter } from "events";
import { cpus } from "os";

import { TStatus } from "./.shared/global.types";
import { ISerialRequest, ISerialResponse } from "./.shared/global.interfaces";
import { Options } from "./.shared/Options";
import { Logger } from "./.shared/Logger";
import { WORKER_ERROR_CODE } from "./local.constants";
import { IErrorLimiterOptions, ErrorLimiter } from "./ErrorLimiter";


// errors approach
interface IWorker {
resolve: (sRes: ISerialResponse) => void;
reject: (sRes: ISerialResponse) => void;
}

interface IActiveWorker extends IWorker {
timeout: NodeJS.Timeout;
}

interface IPendingAssignment<T> extends IWorker {
sReq: ISerialRequest;

data?: T;
}


export interface IAdapterConfiguration {
modulePath: string;

options?: unknown;
}

export interface IClusterOptions {
baseSize: number;
timeout: number;
maxPending: number;

logsDirPath?: string; // To: CWD
silent?: boolean;
errorLimiterOptions?: Partial<IErrorLimiterOptions>;
}

export abstract class AWorkerCluster<Worker extends EventEmitter, T = void> extends EventEmitter {
private readonly activeWorkers: Map<Worker, IActiveWorker> = new Map();
private readonly idleWorkers: Worker[] = [];
private readonly pendingAssignments: IPendingAssignment<T>[] = [];
private readonly options: IClusterOptions;
private readonly logger: Logger;

protected readonly errorLimiter: ErrorLimiter;
protected readonly workerModulePath: string;
protected readonly adapterConfig: IAdapterConfiguration;

constructor(workerModulePath: string, adapterConfig: IAdapterConfiguration, options?: Partial<IClusterOptions>) {
super();

this.options = new Options<IClusterOptions>(options, {
baseSize: Math.min(Math.max((options ?? {}).baseSize ?? Infinity, 1), cpus().length - 1),
timeout: 30000,
maxPending: Infinity,
silent: false
}).object;

this.logger = new Logger(
this.options.logsDirPath ? Logger.defaultPath(this.options.logsDirPath) : null,
this.options.silent
); // TODO: Log intercept adapter out?

this.errorLimiter = new ErrorLimiter(options.errorLimiterOptions)
.on("feed", (err: Error) => {
this.logger.error(err);

this.emit("error", err);
})
.on("terminate", (onInit: boolean) => {
this.logger.error(new Error(
onInit
? "Error in worker scope"
: "Dense aggregate of errors in worker scope"
));

process.exit(WORKER_ERROR_CODE);
});
this.workerModulePath = workerModulePath;
this.adapterConfig = adapterConfig;

setImmediate(async () => {
for(let i = 0; i < this.options.baseSize; i++) {
await this.spawnWorker();
}

this.emit("online", this);
});
}

protected abstract createWorker(): Worker|Promise<Worker>;
protected abstract destroyWorker(worker: Worker): void;
protected abstract activateWorker(worker: Worker, sReq: ISerialRequest, reqData?: T): void;

private activate() {
if(!this.pendingAssignments.length
|| !this.idleWorkers.length) return;

const worker: Worker = this.idleWorkers.shift()!;
const assignment: IPendingAssignment<T> = this.pendingAssignments.shift()!;

this.activateWorker(worker, assignment.sReq, assignment.data);

this.activeWorkers.set(worker, {
resolve: assignment.resolve,
reject: assignment.reject,

timeout: setTimeout(() => {
this.deactivateWorkerWithError(worker, 408);
}, this.options.timeout)
});
}

protected getWorkerId(worker: Worker): number {
const optimisticWorkerCast = worker as unknown as {
threadId: number;
pid: number;
};

return optimisticWorkerCast.threadId ?? optimisticWorkerCast.pid;
}

protected async spawnWorker(): Promise<Worker> {
const worker: Worker = await this.createWorker();

(worker as Worker & { stdout: EventEmitter; })
.stdout.on("data", (message: string) => this.logger.info(message));
(worker as Worker & { stderr: EventEmitter; })
.stderr.on("data", (message: string) => this.logger.error(message));

this.idleWorkers.push(worker);

return worker;
}

protected deactivateWorker(worker: Worker, sRes?: ISerialResponse) {
const activeWorker: IActiveWorker = this.activeWorkers.get(worker)!;

if(!activeWorker) return;

clearTimeout(activeWorker.timeout);

activeWorker.resolve({
status: 200,
headers: {},

...sRes
});

this.idleWorkers.push(worker);

this.activeWorkers.delete(worker);

this.activate();
}

protected deactivateWorkerWithError(worker: Worker, err?: TStatus) {
const activeWorker: IActiveWorker = this.activeWorkers.get(worker)!;

if(!activeWorker) return;

activeWorker.resolve({
status: isNaN(err) ? err : 500,
headers: {},
});

this.deactivateWorker(worker);
}

public handleRequest(sReq: ISerialRequest, data?: T): Promise<ISerialResponse> {
return new Promise((resolve, reject) => {
if(this.pendingAssignments.length >= (this.options.maxPending ?? Infinity)) {
reject();

return;
}

this.pendingAssignments
.push({
sReq, data,
resolve, reject
});

this.activate();
});
}

public destroy() {
Array.from(this.activeWorkers.keys())
.concat(this.idleWorkers)
.forEach((worker: Worker) => {
this.destroyWorker(worker);
});
}

// TODO: Elastic size
}
Loading

0 comments on commit 1e87830

Please sign in to comment.