From 1fc0027237af83c3f4813d2842858e76d25673d8 Mon Sep 17 00:00:00 2001 From: Ronny Esterluss Date: Thu, 11 Jul 2024 09:11:40 +0200 Subject: [PATCH] websocket improvements and standalone mode (#14) * removed isomorphic-ws as this is a clear nodejs app * adding websocket heartbeat * report heartbeat errors * report quota only in DP mode * added changesets * fixed time comments --- .changeset/fast-donkeys-notice.md | 5 + .changeset/odd-eagles-wave.md | 5 + package.json | 4 +- src/index.ts | 159 ++++++++++++++++++++---------- yarn.lock | 14 ++- 5 files changed, 134 insertions(+), 53 deletions(-) create mode 100644 .changeset/fast-donkeys-notice.md create mode 100644 .changeset/odd-eagles-wave.md diff --git a/.changeset/fast-donkeys-notice.md b/.changeset/fast-donkeys-notice.md new file mode 100644 index 0000000..a5f146b --- /dev/null +++ b/.changeset/fast-donkeys-notice.md @@ -0,0 +1,5 @@ +--- +'uhttp-exit-app': minor +--- + +Enable exit application startup without connecting to discovery platform diff --git a/.changeset/odd-eagles-wave.md b/.changeset/odd-eagles-wave.md new file mode 100644 index 0000000..fd04d99 --- /dev/null +++ b/.changeset/odd-eagles-wave.md @@ -0,0 +1,5 @@ +--- +'uhttp-exit-app': patch +--- + +Implemented client side heartbeat to determine websocket disconnects diff --git a/package.json b/package.json index 214da0e..5827c65 100644 --- a/package.json +++ b/package.json @@ -21,6 +21,7 @@ "@types/debug": "^4.1.12", "@types/jest": "^29.5.12", "@types/node": "^20.12.12", + "@types/ws": "^8.5.10", "@typescript-eslint/eslint-plugin": "^7.11.0", "@typescript-eslint/parser": "^7.11.0", "eslint": "8", @@ -34,8 +35,7 @@ "dependencies": { "@hoprnet/uhttp-lib": "^3.0.2", "debug": "^4.3.4", - "isomorphic-ws": "^5.0.0", "sqlite3": "^5.1.7", - "ws": "^8.17.0" + "ws": "^8.18.0" } } diff --git a/src/index.ts b/src/index.ts index b536fe0..0a6740a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ -import WS from 'isomorphic-ws'; +import WebSocket from 'ws'; import { DpApi, EndpointApi, @@ -17,14 +17,21 @@ import log from './logger'; import * as RequestStore from './request-store'; import Version from './version'; -const SocketReconnectTimeout = 1e3; // 1sek -const RequestPurgeTimeout = 60e3; // 60sek -const ValidCounterPeriod = 1e3 * 60 * 60; // 1hour +// WebSocket heartbeats +const HeartBeatInterval = 30e3; // 30 sec +// Hoprd nodes version to be considered as valid relays const RelayNodesCompatVersions = ['2.1']; +// Removing segments from incomplete requests after this grace period +const RequestPurgeTimeout = 60e3; // 60 sec +// base interval for checking relays const SetupRelayPeriod = 1e3 * 60 * 15; // 15 min +// reconnect timeout for the websocket after closure +const SocketReconnectTimeout = 3e3; // 3 sec +// Time period in which counters for crypto are considered valid +const ValidCounterPeriod = 1e3 * 60 * 60; // 1hour type State = { - socket?: WS.WebSocket; + socket?: WebSocket; privateKey: Uint8Array; publicKey: Uint8Array; peerId: string; @@ -32,6 +39,12 @@ type State = { deleteTimer: Map>; // deletion timer of requests in segment cache requestStore: RequestStore.RequestStore; relays: string[]; + heartbeatInterval?: ReturnType; +}; + +type OpsDP = { + endpoint: string; + nodeAccessToken: string; }; type Ops = { @@ -39,8 +52,7 @@ type Ops = { publicKey: string; apiEndpoint: URL; accessToken: string; - discoveryPlatformEndpoint: string; - nodeAccessToken: string; + discoveryPlatform?: OpsDP; dbFile: string; }; @@ -87,11 +99,13 @@ async function setup(ops: Ops): Promise { const cache = SegmentCache.init(); const deleteTimer = new Map(); - const logOpts = { + const logOpts: Record = { publicKey: ops.publicKey, - apiEndpoint: ops.apiEndpoint, - discoveryPlatformEndpoint: ops.discoveryPlatformEndpoint, + apiEndpoint: ops.apiEndpoint.href, }; + if (ops.discoveryPlatform) { + logOpts.discoveryPlatformEndpoint = ops.discoveryPlatform.endpoint; + } log.info('%s started with %o', ExitNode.prettyPrint(peerId, Version, Date.now(), []), logOpts); return { @@ -106,7 +120,7 @@ async function setup(ops: Ops): Promise { } function setupSocket(state: State, ops: Ops) { - const socket = NodeApi.connectWS(ops); + const socket = connectWS(ops); if (!socket) { log.error('error opening websocket'); process.exit(3); @@ -116,18 +130,27 @@ function setupSocket(state: State, ops: Ops) { socket.on('error', (err: Error) => { log.error('error on socket: %o', err); - socket.onmessage = false; + socket.onmessage = null; socket.close(); }); - socket.on('close', (evt: WS.CloseEvent) => { + socket.on('close', (evt: WebSocket.CloseEvent) => { log.warn('closing socket %o - attempting reconnect', evt); + clearInterval(state.heartbeatInterval); // attempt reconnect setTimeout(() => setupSocket(state, ops), SocketReconnectTimeout); }); socket.on('open', () => { log.verbose('opened websocket listener'); + clearInterval(state.heartbeatInterval); + state.heartbeatInterval = setInterval(() => { + socket.ping((err?: Error) => { + if (err) { + log.error('error on ping: %o', err); + } + }); + }, HeartBeatInterval); }); state.socket = socket; @@ -147,7 +170,7 @@ function removeExpired(state: State) { } function scheduleRemoveExpired(state: State) { - // schdule next run somehwere between 1h and 1h and 10m + // schedule next run somehwere between 1h and 1h and 10m const next = ValidCounterPeriod + Math.floor(Math.random() * 10 * 60e3); const logH = Math.floor(next / 1000 / 60 / 60); const logM = Math.round(next / 1000 / 60) - logH * 60; @@ -201,7 +224,7 @@ function scheduleSetupRelays(state: State, ops: Ops) { } function onMessage(state: State, ops: Ops) { - return function (evt: WS.MessageEvent) { + return function (evt: WebSocket.MessageEvent) { const recvAt = performance.now(); const raw = evt.data.toString(); const msg = JSON.parse(raw) as Msg; @@ -499,35 +522,57 @@ function sendResponse( }); }); - // inform DP non blocking - setTimeout(() => { - const lastReqSeg = cacheEntry.segments.get(cacheEntry.count - 1) as Segment.Segment; - const rpcMethod = determineRPCmethod(reqPayload.body); - const quotaRequest: DpApi.QuotaParams = { - clientId: reqPayload.clientId, - rpcMethod, - segmentCount: cacheEntry.count, - lastSegmentLength: lastReqSeg.body.length, - chainId: reqPayload.chainId, - type: 'request', - }; + if (ops.discoveryPlatform) { + reportToDiscoveryPlatform({ + cacheEntry, + opsDP: ops.discoveryPlatform, + reqPayload, + segments, + }); + } +} - const lastRespSeg = segments[segments.length - 1]; - const quotaResponse: DpApi.QuotaParams = { - clientId: reqPayload.clientId, - rpcMethod, - segmentCount: segments.length, - lastSegmentLength: lastRespSeg.body.length, - chainId: reqPayload.chainId, - type: 'response', - }; +async function reportToDiscoveryPlatform({ + cacheEntry, + opsDP, + reqPayload, + segments, +}: { + cacheEntry: SegmentCache.Entry; + opsDP: OpsDP; + reqPayload: Payload.ReqPayload; + segments: Segment.Segment[]; +}) { + const lastReqSeg = cacheEntry.segments.get(cacheEntry.count - 1) as Segment.Segment; + const rpcMethod = determineRPCmethod(reqPayload.body); + const quotaRequest: DpApi.QuotaParams = { + clientId: reqPayload.clientId, + rpcMethod, + segmentCount: cacheEntry.count, + lastSegmentLength: lastReqSeg.body.length, + chainId: reqPayload.chainId, + type: 'request', + }; - DpApi.postQuota(ops, quotaRequest).catch((err) => { - log.error('error recording request quota: %o', err); - }); - DpApi.postQuota(ops, quotaResponse).catch((err) => { - log.error('error recording response quota: %o', err); - }); + const lastRespSeg = segments[segments.length - 1]; + const quotaResponse: DpApi.QuotaParams = { + clientId: reqPayload.clientId, + rpcMethod, + segmentCount: segments.length, + lastSegmentLength: lastRespSeg.body.length, + chainId: reqPayload.chainId, + type: 'response', + }; + + const conn = { + discoveryPlatformEndpoint: opsDP.endpoint, + nodeAccessToken: opsDP.nodeAccessToken, + }; + DpApi.postQuota(conn, quotaRequest).catch((err) => { + log.error('error recording request quota: %o', err); + }); + DpApi.postQuota(conn, quotaResponse).catch((err) => { + log.error('error recording response quota: %o', err); }); } @@ -547,6 +592,13 @@ function determineRPCmethod(body?: string) { } } +function connectWS({ apiEndpoint, accessToken }: Ops): WebSocket { + const wsURL = new URL('/api/v3/messages/websocket', apiEndpoint); + wsURL.protocol = apiEndpoint.protocol === 'https:' ? 'wss:' : 'ws:'; + wsURL.search = `?apiToken=${accessToken}`; + return new WebSocket(wsURL); +} + // if this file is the entrypoint of the nodejs process if (require.main === module) { if (!process.env.UHTTP_EA_PRIVATE_KEY) { @@ -561,23 +613,30 @@ if (require.main === module) { if (!process.env.UHTTP_EA_HOPRD_ACCESS_TOKEN) { throw new Error("Missing 'UHTTP_EA_HOPRD_ACCESS_TOKEN' env var."); } - if (!process.env.UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT) { - throw new Error("Missing 'UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT' env var."); - } - if (!process.env.UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN) { - throw new Error("Missing 'UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN' env var."); - } if (!process.env.UHTTP_EA_DATABASE_FILE) { throw new Error("Missing 'UHTTP_EA_DATABASE_FILE' env var."); } + const dpEndpoint = process.env.UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT; + let discoveryPlatform; + if (dpEndpoint) { + if (!process.env.UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN) { + throw new Error( + "Missing 'UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN' env var alongside provided UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT.", + ); + } + discoveryPlatform = { + endpoint: dpEndpoint, + nodeAccessToken: process.env.UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN, + }; + } + start({ privateKey: process.env.UHTTP_EA_PRIVATE_KEY, publicKey: process.env.UHTTP_EA_PUBLIC_KEY, apiEndpoint: new URL(process.env.UHTTP_EA_HOPRD_ENDPOINT), accessToken: process.env.UHTTP_EA_HOPRD_ACCESS_TOKEN, - discoveryPlatformEndpoint: process.env.UHTTP_EA_DISCOVERY_PLATFORM_ENDPOINT, - nodeAccessToken: process.env.UHTTP_EA_DISCOVERY_PLATFORM_ACCESS_TOKEN, + discoveryPlatform, dbFile: process.env.UHTTP_EA_DATABASE_FILE, }); } diff --git a/yarn.lock b/yarn.lock index 6e6f4ea..092206a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1155,6 +1155,13 @@ resolved "https://registry.yarnpkg.com/@types/stack-utils/-/stack-utils-2.0.3.tgz#6209321eb2c1712a7e7466422b8cb1fc0d9dd5d8" integrity sha512-9aEbYZ3TbYMznPdcdr3SmIrLXwC/AKZXQeCf9Pgao5CKb8CyHuEX5jzWPTkvregvhRJHcpRO6BFoGW9ycaOkYw== +"@types/ws@^8.5.10": + version "8.5.10" + resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.10.tgz#4acfb517970853fa6574a3a6886791d04a396787" + integrity sha512-vmQSUcfalpIq0R9q7uTo2lXs6eGIpt9wtnLdMv9LVpIjCA/+ufZRozlVoVelIYixx1ugCBKDhn89vnsEGOCx9A== + dependencies: + "@types/node" "*" + "@types/yargs-parser@*": version "21.0.3" resolved "https://registry.yarnpkg.com/@types/yargs-parser/-/yargs-parser-21.0.3.tgz#815e30b786d2e8f0dcd85fd5bcf5e1a04d008f15" @@ -5142,11 +5149,16 @@ write-file-atomic@^4.0.2: imurmurhash "^0.1.4" signal-exit "^3.0.7" -ws@^8.16.0, ws@^8.17.0: +ws@^8.16.0: version "8.17.0" resolved "https://registry.yarnpkg.com/ws/-/ws-8.17.0.tgz#d145d18eca2ed25aaf791a183903f7be5e295fea" integrity sha512-uJq6108EgZMAl20KagGkzCKfMEjxmKvZHG7Tlq0Z6nOky7YF7aq4mOx6xK8TJ/i1LeK4Qus7INktacctDgY8Ow== +ws@^8.18.0: + version "8.18.0" + resolved "https://registry.yarnpkg.com/ws/-/ws-8.18.0.tgz#0d7505a6eafe2b0e712d232b42279f53bc289bbc" + integrity sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw== + y18n@^4.0.0: version "4.0.3" resolved "https://registry.yarnpkg.com/y18n/-/y18n-4.0.3.tgz#b5f259c82cd6e336921efd7bfd8bf560de9eeedf"