Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

probe and diagnostic endpoints #763

Merged
merged 15 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions tdrive/backend/node/config/custom-environment-variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
"logger": {
"level": "LOG_LEVEL"
},
"diagnostics": {
"skipKeys": {
"__name": "DIAG_SKIP_KEYS",
"__format": "json"
},
"probeSecret": "DIAG_PROBE_SECRET",
shepilov marked this conversation as resolved.
Show resolved Hide resolved
"statsLogPeriodMs": "DIAG_STATS_PRINT_PERIOD_MS",
"statsFullStatsLogPeriodMs": "DIAG_FULL_STATS_PRINT_PERIOD_MS"
},
"webserver": {
"host": "TWAKE_DRIVE_HOST",
"logger": {
Expand Down
7 changes: 7 additions & 0 deletions tdrive/backend/node/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
"logger": {
"level": "debug"
},
"diagnostics": {
"skipKeys": [],
"probeSecret": "",
"statsLogPeriodMs": 120000,
"statsFullStatsLogPeriodMs": 600000
},
"tracker": {
"type": "segment",
"segment": {
Expand Down Expand Up @@ -226,6 +232,7 @@
},
"services": [
"auth",
"diagnostics",
"push",
"storage",
"webserver",
Expand Down
266 changes: 266 additions & 0 deletions tdrive/backend/node/src/core/platform/framework/api/diagnostics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
import assert from "node:assert";
import config from "../../../config";
import { getLogger } from "../logger";

const logger = getLogger("Diagnostics");

/**
* Values that can match a set of diagnostic providers.
*
* `startup`, `ready` and `live` are meant to match the meanings of the corresponding
* kubernetes probes:
* https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/
*/
export type TDiagnosticTag =
| "*" // Special value that includes everything, and is always included
| "startup" // Tests that are absolutely required (and light) to even begin the other tests
| "ready" // Tests required before traffic can be sent our way
| "live" // Tests required to prevent from being restarted
| "stats" // Expensive diagnostics that should not often be ran, but can be system wide
| "stats-full"; // Expensive diagnostics that should not often be ran, and then probably for a single key

/** Detail requested from platform service self-diagnostics */
export enum TServiceDiagnosticDepth {
/** Minimal cost information that tests functioning service */
alive = "alive",
/** Statistics that have a little impact enough for periodic tracking into a time series */
stats_track = "stats_track",
/** Statistics that should be included when looking specifically at general statistics */
stats_basic = "stats_basic",
/** Statistics possibly expensive and large to calculate, for occasional debug operations */
stats_deep = "stats_deep",
}

const serviceDiagnosticDepthToTags: { [depth in TServiceDiagnosticDepth]: TDiagnosticTag[] } = {
[TServiceDiagnosticDepth.alive]: ["startup", "live", "ready"],
[TServiceDiagnosticDepth.stats_track]: ["ready", "stats"],
[TServiceDiagnosticDepth.stats_basic]: ["stats", "stats-full"],
[TServiceDiagnosticDepth.stats_deep]: ["stats-full"],
};

interface IDiagnosticsConfig {
// Diagnostic keys that should be considered ok without evaluation
skipKeys?: string[];
// This secret must be provided to the diagnostic endpoints as a query param
// therefor it's likely to leak, through logs etc, for ex. and should not
// relied on for security because disabling diagnostics. At worst this
// provides access to the DB statistics.
probeSecret?: string;
// Period at which to log TDiagnosticTag `stats`. 0 to disable.
statsLogPeriodMs: number;
// Period at which to log TDiagnosticTag `stats-full`. 0 to disable.
statsFullStatsLogPeriodMs: number;
}

export const getConfig = (): IDiagnosticsConfig => {
let configSection = config.get("diagnostics") as IDiagnosticsConfig;
if (typeof configSection.skipKeys === "string")
configSection = {
...configSection,
skipKeys: (configSection.skipKeys as string)
.trim()
.split(/[,\s]+/g)
.filter(x => !!x),
};

const getNumberFromConfig = (value): number => {
if (typeof value == "number") return value;
if (typeof value == "string") return parseInt(value, 10) ?? 0;
return 0;
};
return {
...configSection,
statsLogPeriodMs: getNumberFromConfig(configSection.statsLogPeriodMs),
statsFullStatsLogPeriodMs: getNumberFromConfig(configSection.statsFullStatsLogPeriodMs),
};
};

/** Code-wide unique key for each provider */
export type TDiagnosticKey = string;

/** Each provider should return an object of this format. The key of the provider defines the schema. */
export type TDiagnosticResult = { ok: boolean; warn?: string; empty?: boolean } & {
[key: string]: unknown;
};

/** Implemented by objects that want to provide data to the diagnostic check */
export interface IDiagnosticProvider {
/** Code-wide unique key underwhich the result of `get` will be included */
key: TDiagnosticKey;

/** This result is present in any included request tag */
tags: TDiagnosticTag[] | "*";

/**
* If set, this provider will be polled at that interval.
* If `undefined`, this provider will be ran at each request.
*/
pollPeriodMs?: number;

/**
* Returns an object as presented to a diagnostic requester.
* Warning: this could be public and readable to the internet.
*/
get(): Promise<TDiagnosticResult>;
}

/**
* Platform services that can provide generic diagnostic implementations may use this interface.
*
* Matching from {@link TDiagnosticTag} to {@link TServiceDiagnosticDepth} is expected to
* be done by intermediary providers. This is because not all services are equally critical,
* or have the same tolerable down times.
*/
export interface IServiceDiagnosticProvider {
/** The return format is specific to each service, but should include a `{ok: boolean}` field. */
getDiagnostics(depth: TServiceDiagnosticDepth): Promise<TDiagnosticResult>;
}

const isProviderIncludedInTag = (
tag: TDiagnosticTag,
provider: IDiagnosticProvider,
config: IDiagnosticsConfig,
) =>
(provider.tags === "*" || provider.tags.indexOf(tag) >= 0 || provider.tags.indexOf("*") >= 0) &&
(!config.skipKeys?.length || !config.skipKeys.includes(provider.key));

// registered providers with `pollPeriodMs === undefined`
const immediateDiagnosticProviders: IDiagnosticProvider[] = [];
// registered providers with `pollPeriodMs !== undefined`
const periodicDiagnosticProviders: IDiagnosticProvider[] = [];

const now = () => Math.round(process.uptime() * 1000);

const isKeyAlreadyRegistered = (key: TDiagnosticKey) =>
immediateDiagnosticProviders.some(provider => key == provider.key) ||
periodicDiagnosticProviders.some(provider => key == provider.key);

// stores results of all the `pollPeriodMs` truthy providers
const latestPeriodicDiagnostics: { [key: TDiagnosticKey]: object } = {};
const recordDiagnostic = (startMs: number, key: TDiagnosticKey, data?: object, error?: object) =>
(latestPeriodicDiagnostics[key] = {
durationMs: Math.round(now() - startMs),
...(error ? { ok: false, error } : { ...data }),
});

const runProvider = async (provider, log) => {
const startMs = now();
try {
const result = await provider.get();
if (!result.ok)
logger.error(
{ diagnostic: provider.key, ...result },
"Got diagnostic provider result with ok=false",
);
else if (result.warn)
logger.warn(
{ diagnostic: provider.key, ...result },
"Got diagnostic provider result with ok=true but a warning",
);
else if (log && !result.empty)
logger.info({ diagnostic: provider.key, ...result }, "Diagnostic provider result");
return recordDiagnostic(startMs, provider.key, result);
} catch (err) {
logger.error({ err, provider: provider.key }, "Failed to read diagnostic provider");
return recordDiagnostic(startMs, provider.key, undefined, err);
}
};

const pendingTimeouts: number[] = []; // Pending return values from `setTimeout` calls
const forgetPendingTimeout = (timeoutId: number) => {
const index = pendingTimeouts.indexOf(timeoutId);
assert(index >= 0);
pendingTimeouts.splice(index, 1);
};

let hasShutdown = false;
const ensureHasntShutdown = () => {
if (hasShutdown) throw new Error("Diagnostics service already shutdown");
};

export default {
/** Add a provider to be included in diagnostics output */
registerProviders(...providers: IDiagnosticProvider[]) {
ensureHasntShutdown();
providers.forEach(provider => {
if (isKeyAlreadyRegistered(provider.key)) throw new Error("Provider with duplicate key");
if (provider.pollPeriodMs) {
periodicDiagnosticProviders.push(provider);
} else {
immediateDiagnosticProviders.push(provider);
return;
}
let triggerUpdate: () => void = () => undefined; // The empty function is for the linter. I love you linter <3
const updateProvider = (timeoutId: number) => async () => {
forgetPendingTimeout(timeoutId);
await runProvider(provider, false);
triggerUpdate();
};
triggerUpdate = () => pendingTimeouts.push(setTimeout(updateProvider, provider.pollPeriodMs));
triggerUpdate();
});
},

/** Create providers to match from {@link IServiceDiagnosticProvider} to multiple {@link IDiagnosticProvider}s */
registerServiceProviders(
name: string,
getService: () => IServiceDiagnosticProvider,
overrideTags: Partial<
typeof serviceDiagnosticDepthToTags | { [key in TServiceDiagnosticDepth]: false }
> = {},
) {
this.registerProviders(
...Object.values(TServiceDiagnosticDepth)
.map(depth => {
const defaultTags = serviceDiagnosticDepthToTags[depth];
if (!defaultTags) throw new Error(`Unknown depth ${JSON.stringify(depth)}`);
const tags = overrideTags[depth] ?? defaultTags;
if (tags === false) return null;
return {
key: `${name}-${depth}`,
tags,
get: () => getService().getDiagnostics(depth),
};
})
.filter(x => !!x),
);
},

/** Cancel all pending diagnostic updates */
shutdown() {
ensureHasntShutdown();
pendingTimeouts.forEach(timeout => clearTimeout(timeout));
hasShutdown = true;
},

/**
* Return the values of all providers which include the provided tag.
*
* @param log if `true`, print each individual log output even if succesful
*/
async get(
tag: TDiagnosticTag,
log: boolean,
): Promise<{ ok: boolean } | { [key: TDiagnosticKey]: TDiagnosticResult }> {
const config = getConfig();
const result = { ok: true };
let atLeastOneCheck = false;
periodicDiagnosticProviders.forEach(provider => {
if (!isProviderIncludedInTag(tag, provider, config)) return;
atLeastOneCheck = true;
result[provider.key] = latestPeriodicDiagnostics[provider.key];
if (!result[provider.key].ok) result.ok = false;
});
await Promise.all(
immediateDiagnosticProviders.map(async provider => {
if (!isProviderIncludedInTag(tag, provider, config)) return;
atLeastOneCheck = true;
const providerResult = await runProvider(provider, log);
if (!providerResult.ok) result.ok = false;
return (result[provider.key] = providerResult);
}),
);
if (!atLeastOneCheck) result.ok = false;
return result;
},
};
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { TdriveServiceProvider } from "../../framework";
import type { IServiceDiagnosticProvider } from "../../framework/api/diagnostics";
import { Connector } from "./services/orm/connectors";
import Manager from "./services/orm/manager";
import Repository from "./services/orm/repository/repository";
import { EntityTarget } from "./services/orm/types";

export interface DatabaseServiceAPI extends TdriveServiceProvider {
export interface DatabaseServiceAPI extends TdriveServiceProvider, IServiceDiagnosticProvider {
/**
* Get the database connector
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import { MongoConnectionOptions } from "./orm/connectors/mongodb/mongodb";
import { EntityTarget } from "./orm/types";
import { RepositoryManager } from "./orm/repository/manager";
import { PostgresConnectionOptions } from "./orm/connectors/postgres/postgres";
import type {
TDiagnosticResult,
TServiceDiagnosticDepth,
} from "../../../framework/api/diagnostics";

export default class DatabaseService implements DatabaseServiceAPI {
version = "1";
Expand Down Expand Up @@ -38,6 +42,15 @@ export default class DatabaseService implements DatabaseServiceAPI {
}
}

async getDiagnostics(depth: TServiceDiagnosticDepth): Promise<TDiagnosticResult> {
const connector = this.getConnector();
const result = await connector.getDiagnostics(depth);
return {
type: connector.getType(),
...result,
};
}

getManager(): Manager<unknown> {
return new Manager<unknown>(this.connector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import { ConnectionOptions, DatabaseType } from "../..";
import { FindOptions } from "../repository/repository";
import { ColumnDefinition, EntityDefinition } from "../types";
import { ListResult, Paginable, Pagination } from "../../../../../framework/api/crud-service";
import type {
TDiagnosticResult,
TServiceDiagnosticDepth,
} from "../../../../../framework/api/diagnostics";

export abstract class AbstractConnector<T extends ConnectionOptions> implements Connector {
constructor(protected type: DatabaseType, protected options: T, protected secret: string) {}

abstract connect(): Promise<this>;
abstract disconnect(): Promise<this>;
abstract getDiagnostics(depth: TServiceDiagnosticDepth): Promise<TDiagnosticResult>;

abstract drop(): Promise<this>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { ColumnDefinition, EntityDefinition } from "../types";
import { FindOptions } from "../repository/repository";
import { ListResult, Paginable, Pagination } from "../../../../../framework/api/crud-service";
import { PostgresConnectionOptions } from "./postgres/postgres";
import type { IServiceDiagnosticProvider } from "../../../../../framework/api/diagnostics";

export * from "./mongodb/mongodb";

Expand All @@ -14,7 +15,7 @@ export type UpsertOptions = {

export type RemoveOptions = any;

export interface Connector extends Initializable {
export interface Connector extends Initializable, IServiceDiagnosticProvider {
/**
* Connect to the database
*/
Expand Down
Loading
Loading