Skip to content

Commit

Permalink
websocket improvements and standalone mode (#14)
Browse files Browse the repository at this point in the history
* removed isomorphic-ws as this is a clear nodejs app

* adding websocket heartbeat

* report heartbeat errors

* report quota only in DP mode

* added changesets

* fixed time comments
  • Loading branch information
esterlus authored Jul 11, 2024
1 parent 23748fc commit 1fc0027
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 53 deletions.
5 changes: 5 additions & 0 deletions .changeset/fast-donkeys-notice.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'uhttp-exit-app': minor
---

Enable exit application startup without connecting to discovery platform
5 changes: 5 additions & 0 deletions .changeset/odd-eagles-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'uhttp-exit-app': patch
---

Implemented client side heartbeat to determine websocket disconnects
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"@types/debug": "^4.1.12",
"@types/jest": "^29.5.12",
"@types/node": "^20.12.12",
"@types/ws": "^8.5.10",
"@typescript-eslint/eslint-plugin": "^7.11.0",
"@typescript-eslint/parser": "^7.11.0",
"eslint": "8",
Expand All @@ -34,8 +35,7 @@
"dependencies": {
"@hoprnet/uhttp-lib": "^3.0.2",
"debug": "^4.3.4",
"isomorphic-ws": "^5.0.0",
"sqlite3": "^5.1.7",
"ws": "^8.17.0"
"ws": "^8.18.0"
}
}
159 changes: 109 additions & 50 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import WS from 'isomorphic-ws';
import WebSocket from 'ws';
import {
DpApi,
EndpointApi,
Expand All @@ -17,30 +17,42 @@ import log from './logger';
import * as RequestStore from './request-store';
import Version from './version';

const SocketReconnectTimeout = 1e3; // 1sek
const RequestPurgeTimeout = 60e3; // 60sek
const ValidCounterPeriod = 1e3 * 60 * 60; // 1hour
// WebSocket heartbeats
const HeartBeatInterval = 30e3; // 30 sec
// Hoprd nodes version to be considered as valid relays
const RelayNodesCompatVersions = ['2.1'];
// Removing segments from incomplete requests after this grace period
const RequestPurgeTimeout = 60e3; // 60 sec
// base interval for checking relays
const SetupRelayPeriod = 1e3 * 60 * 15; // 15 min
// reconnect timeout for the websocket after closure
const SocketReconnectTimeout = 3e3; // 3 sec
// Time period in which counters for crypto are considered valid
const ValidCounterPeriod = 1e3 * 60 * 60; // 1hour

type State = {
socket?: WS.WebSocket;
socket?: WebSocket;
privateKey: Uint8Array;
publicKey: Uint8Array;
peerId: string;
cache: SegmentCache.Cache;
deleteTimer: Map<string, ReturnType<typeof setTimeout>>; // deletion timer of requests in segment cache
requestStore: RequestStore.RequestStore;
relays: string[];
heartbeatInterval?: ReturnType<typeof setInterval>;
};

type OpsDP = {
endpoint: string;
nodeAccessToken: string;
};

type Ops = {
privateKey: string;
publicKey: string;
apiEndpoint: URL;
accessToken: string;
discoveryPlatformEndpoint: string;
nodeAccessToken: string;
discoveryPlatform?: OpsDP;
dbFile: string;
};

Expand Down Expand Up @@ -87,11 +99,13 @@ async function setup(ops: Ops): Promise<State> {
const cache = SegmentCache.init();
const deleteTimer = new Map();

const logOpts = {
const logOpts: Record<string, string> = {
publicKey: ops.publicKey,
apiEndpoint: ops.apiEndpoint,
discoveryPlatformEndpoint: ops.discoveryPlatformEndpoint,
apiEndpoint: ops.apiEndpoint.href,
};
if (ops.discoveryPlatform) {
logOpts.discoveryPlatformEndpoint = ops.discoveryPlatform.endpoint;
}
log.info('%s started with %o', ExitNode.prettyPrint(peerId, Version, Date.now(), []), logOpts);

return {
Expand All @@ -106,7 +120,7 @@ async function setup(ops: Ops): Promise<State> {
}

function setupSocket(state: State, ops: Ops) {
const socket = NodeApi.connectWS(ops);
const socket = connectWS(ops);
if (!socket) {
log.error('error opening websocket');
process.exit(3);
Expand All @@ -116,18 +130,27 @@ function setupSocket(state: State, ops: Ops) {

socket.on('error', (err: Error) => {
log.error('error on socket: %o', err);
socket.onmessage = false;
socket.onmessage = null;
socket.close();
});

socket.on('close', (evt: WS.CloseEvent) => {
socket.on('close', (evt: WebSocket.CloseEvent) => {
log.warn('closing socket %o - attempting reconnect', evt);
clearInterval(state.heartbeatInterval);
// attempt reconnect
setTimeout(() => setupSocket(state, ops), SocketReconnectTimeout);
});

socket.on('open', () => {
log.verbose('opened websocket listener');
clearInterval(state.heartbeatInterval);
state.heartbeatInterval = setInterval(() => {
socket.ping((err?: Error) => {
if (err) {
log.error('error on ping: %o', err);
}
});
}, HeartBeatInterval);
});

state.socket = socket;
Expand All @@ -147,7 +170,7 @@ function removeExpired(state: State) {
}

function scheduleRemoveExpired(state: State) {
// schdule next run somehwere between 1h and 1h and 10m
// schedule next run somehwere between 1h and 1h and 10m
const next = ValidCounterPeriod + Math.floor(Math.random() * 10 * 60e3);
const logH = Math.floor(next / 1000 / 60 / 60);
const logM = Math.round(next / 1000 / 60) - logH * 60;
Expand Down Expand Up @@ -201,7 +224,7 @@ function scheduleSetupRelays(state: State, ops: Ops) {
}

function onMessage(state: State, ops: Ops) {
return function (evt: WS.MessageEvent) {
return function (evt: WebSocket.MessageEvent) {
const recvAt = performance.now();
const raw = evt.data.toString();
const msg = JSON.parse(raw) as Msg;
Expand Down Expand Up @@ -499,35 +522,57 @@ function sendResponse(
});
});

// inform DP non blocking
setTimeout(() => {
const lastReqSeg = cacheEntry.segments.get(cacheEntry.count - 1) as Segment.Segment;
const rpcMethod = determineRPCmethod(reqPayload.body);
const quotaRequest: DpApi.QuotaParams = {
clientId: reqPayload.clientId,
rpcMethod,
segmentCount: cacheEntry.count,
lastSegmentLength: lastReqSeg.body.length,
chainId: reqPayload.chainId,
type: 'request',
};
if (ops.discoveryPlatform) {
reportToDiscoveryPlatform({
cacheEntry,
opsDP: ops.discoveryPlatform,
reqPayload,
segments,
});
}
}

const lastRespSeg = segments[segments.length - 1];
const quotaResponse: DpApi.QuotaParams = {
clientId: reqPayload.clientId,
rpcMethod,
segmentCount: segments.length,
lastSegmentLength: lastRespSeg.body.length,
chainId: reqPayload.chainId,
type: 'response',
};
async function reportToDiscoveryPlatform({
cacheEntry,
opsDP,
reqPayload,
segments,
}: {
cacheEntry: SegmentCache.Entry;
opsDP: OpsDP;
reqPayload: Payload.ReqPayload;
segments: Segment.Segment[];
}) {
const lastReqSeg = cacheEntry.segments.get(cacheEntry.count - 1) as Segment.Segment;
const rpcMethod = determineRPCmethod(reqPayload.body);
const quotaRequest: DpApi.QuotaParams = {
clientId: reqPayload.clientId,
rpcMethod,
segmentCount: cacheEntry.count,
lastSegmentLength: lastReqSeg.body.length,
chainId: reqPayload.chainId,
type: 'request',
};

DpApi.postQuota(ops, quotaRequest).catch((err) => {
log.error('error recording request quota: %o', err);
});
DpApi.postQuota(ops, quotaResponse).catch((err) => {
log.error('error recording response quota: %o', err);
});
const lastRespSeg = segments[segments.length - 1];
const quotaResponse: DpApi.QuotaParams = {
clientId: reqPayload.clientId,
rpcMethod,
segmentCount: segments.length,
lastSegmentLength: lastRespSeg.body.length,
chainId: reqPayload.chainId,
type: 'response',
};

const conn = {
discoveryPlatformEndpoint: opsDP.endpoint,
nodeAccessToken: opsDP.nodeAccessToken,
};
DpApi.postQuota(conn, quotaRequest).catch((err) => {
log.error('error recording request quota: %o', err);
});
DpApi.postQuota(conn, quotaResponse).catch((err) => {
log.error('error recording response quota: %o', err);
});
}

Expand All @@ -547,6 +592,13 @@ function determineRPCmethod(body?: string) {
}
}

function connectWS({ apiEndpoint, accessToken }: Ops): WebSocket {
const wsURL = new URL('/api/v3/messages/websocket', apiEndpoint);
wsURL.protocol = apiEndpoint.protocol === 'https:' ? 'wss:' : 'ws:';
wsURL.search = `?apiToken=${accessToken}`;
return new WebSocket(wsURL);
}

// if this file is the entrypoint of the nodejs process
if (require.main === module) {
if (!process.env.UHTTP_EA_PRIVATE_KEY) {
Expand All @@ -561,23 +613,30 @@ if (require.main === module) {
if (!process.env.UHTTP_EA_HOPRD_ACCESS_TOKEN) {
throw new Error("Missing 'UHTTP_EA_HOPRD_ACCESS_TOKEN' env var.");
}
if (!process.env.UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT) {
throw new Error("Missing 'UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT' env var.");
}
if (!process.env.UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN) {
throw new Error("Missing 'UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN' env var.");
}
if (!process.env.UHTTP_EA_DATABASE_FILE) {
throw new Error("Missing 'UHTTP_EA_DATABASE_FILE' env var.");
}

const dpEndpoint = process.env.UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT;
let discoveryPlatform;
if (dpEndpoint) {
if (!process.env.UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN) {
throw new Error(
"Missing 'UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN' env var alongside provided UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT.",
);
}
discoveryPlatform = {
endpoint: dpEndpoint,
nodeAccessToken: process.env.UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN,
};
}

start({
privateKey: process.env.UHTTP_EA_PRIVATE_KEY,
publicKey: process.env.UHTTP_EA_PUBLIC_KEY,
apiEndpoint: new URL(process.env.UHTTP_EA_HOPRD_ENDPOINT),
accessToken: process.env.UHTTP_EA_HOPRD_ACCESS_TOKEN,
discoveryPlatformEndpoint: process.env.UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT,
nodeAccessToken: process.env.UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN,
discoveryPlatform,
dbFile: process.env.UHTTP_EA_DATABASE_FILE,
});
}
14 changes: 13 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,13 @@
resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-2.0.3.tgz#6209321eb2c1712a7e7466422b8cb1fc0d9dd5d8"
integrity sha512-9aEbYZ3TbYMznPdcdr3SmIrLXwC/AKZXQeCf9Pgao5CKb8CyHuEX5jzWPTkvregvhRJHcpRO6BFoGW9ycaOkYw==

"@types/ws@^8.5.10":
version "8.5.10"
resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.10.tgz#4acfb517970853fa6574a3a6886791d04a396787"
integrity sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A==
dependencies:
"@types/node" "*"

"@types/yargs-parser@*":
version "21.0.3"
resolved "https://registry.yarnpkg.com/@types/yargs-parser/-/yargs-parser-21.0.3.tgz#815e30b786d2e8f0dcd85fd5bcf5e1a04d008f15"
Expand Down Expand Up @@ -5142,11 +5149,16 @@ write-file-atomic@^4.0.2:
imurmurhash "^0.1.4"
signal-exit "^3.0.7"

ws@^8.16.0, ws@^8.17.0:
ws@^8.16.0:
version "8.17.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.17.0.tgz#d145d18eca2ed25aaf791a183903f7be5e295fea"
integrity sha512-uJq6108EgZMAl20KagGkzCKfMEjxmKvZHG7Tlq0Z6nOky7YF7aq4mOx6xK8TJ/i1LeK4Qus7INktacctDgY8Ow==

ws@^8.18.0:
version "8.18.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.18.0.tgz#0d7505a6eafe2b0e712d232b42279f53bc289bbc"
integrity sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==

y18n@^4.0.0:
version "4.0.3"
resolved "https://registry.yarnpkg.com/y18n/-/y18n-4.0.3.tgz#b5f259c82cd6e336921efd7bfd8bf560de9eeedf"
Expand Down

0 comments on commit 1fc0027

Please sign in to comment.