From 32f7d3c1c0b566aadd7d4b01b2d51320976d8c62 Mon Sep 17 00:00:00 2001 From: David White Date: Fri, 28 Jun 2024 11:20:13 +0100 Subject: [PATCH 01/16] First commit (no tests) --- ZelBack/src/services/appsService.js | 9 +- .../src/services/enterpriseNodesService.js | 4 +- ZelBack/src/services/fluxCommunication.js | 331 ++++++++++-------- .../fluxCommunicationMessagesSender.js | 10 + .../src/services/fluxCommunicationUtils.js | 226 ------------ ZelBack/src/services/fluxNetworkHelper.js | 14 +- ZelBack/src/services/networkStateService.js | 176 ++++++++++ ZelBack/src/services/serviceManager.js | 6 +- .../src/services/utils/networkStateManager.js | 191 ++++++++++ 9 files changed, 579 insertions(+), 388 deletions(-) delete mode 100644 ZelBack/src/services/fluxCommunicationUtils.js create mode 100644 ZelBack/src/services/networkStateService.js create mode 100644 ZelBack/src/services/utils/networkStateManager.js diff --git a/ZelBack/src/services/appsService.js b/ZelBack/src/services/appsService.js index 17320ca3b..1eeb85843 100644 --- a/ZelBack/src/services/appsService.js +++ b/ZelBack/src/services/appsService.js @@ -11817,8 +11817,7 @@ async function checkMyAppsAvailability() { throw err.message; }); await serviceHelper.delay(10 * 1000); - // eslint-disable-next-line no-await-in-loop - let askingIP = await fluxNetworkHelper.getRandomConnection(); + let askingIP = fluxNetworkHelper.getRandomConnection(); if (!askingIP) { checkMyAppsAvailability(); return; @@ -11993,11 +11992,9 @@ async function checkInstallingAppPortAvailable(portsToTest = []) { }); } await serviceHelper.delay(10 * 1000); - // eslint-disable-next-line no-await-in-loop - let askingIP = await fluxNetworkHelper.getRandomConnection(); + let askingIP = fluxNetworkHelper.getRandomConnection(); while (!askingIP || askingIP.split(':')[0] === myIP) { - // eslint-disable-next-line no-await-in-loop - askingIP = await fluxNetworkHelper.getRandomConnection(); + askingIP = fluxNetworkHelper.getRandomConnection(); } let askingIpPort = config.server.apiport; if (askingIP.includes(':')) { // has port specification diff --git a/ZelBack/src/services/enterpriseNodesService.js b/ZelBack/src/services/enterpriseNodesService.js index 83a0c55ae..d3d9545d6 100644 --- a/ZelBack/src/services/enterpriseNodesService.js +++ b/ZelBack/src/services/enterpriseNodesService.js @@ -1,5 +1,5 @@ const config = require('config'); -const fluxCommunicationUtils = require('./fluxCommunicationUtils'); +const networkStateService = require('./networkStateService'); const messageHelper = require('./messageHelper'); const dbHelper = require('./dbHelper'); const log = require('../lib/log'); @@ -12,7 +12,7 @@ const globalAppsInformation = config.database.appsglobal.collections.appsInforma */ async function getEnterpriseList() { try { - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); + const nodeList = networkStateService.networkState(); const enterpriseList = []; // txhash, outidx, pubkey, score, ip, payment_address?, tier, // user collateralization, 200k in flux nodes is most trusted, get 500 points // 200k flux in nodes, is most trusted, 5 * 40, 200 * 1, get 500 points diff --git a/ZelBack/src/services/fluxCommunication.js b/ZelBack/src/services/fluxCommunication.js index 884e5819c..3d6f8d37e 100644 --- a/ZelBack/src/services/fluxCommunication.js +++ b/ZelBack/src/services/fluxCommunication.js @@ -8,7 +8,7 @@ const serviceHelper = require('./serviceHelper'); const verificationHelper = require('./verificationHelper'); const daemonServiceMiscRpcs = require('./daemonService/daemonServiceMiscRpcs'); const fluxCommunicationMessagesSender = require('./fluxCommunicationMessagesSender'); -const fluxCommunicationUtils = require('./fluxCommunicationUtils'); +const networkStateService = require('./networkStateService'); const fluxNetworkHelper = require('./fluxNetworkHelper'); const messageHelper = require('./messageHelper'); const { @@ -71,7 +71,8 @@ async function handleAppMessages(message, fromIP, port) { // eslint-disable-next-line global-require const appsService = require('./appsService'); const rebroadcastToPeers = await appsService.storeAppTemporaryMessage(message.data, true); - if (rebroadcastToPeers === true) { + + if (rebroadcastToPeers) { const messageString = serviceHelper.ensureString(message); const wsListOut = []; outgoingConnections.forEach((client) => { @@ -112,30 +113,33 @@ async function handleAppRunningMessage(message, fromIP, port) { // eslint-disable-next-line global-require const appsService = require('./appsService'); const rebroadcastToPeers = await appsService.storeAppRunningMessage(message.data); - const currentTimeStamp = Date.now(); - const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); - if (rebroadcastToPeers === true && timestampOK) { - const messageString = serviceHelper.ensureString(message); - const wsListOut = []; - outgoingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsListOut.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); - await serviceHelper.delay(500); - const wsList = []; - incomingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsList.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); - } + + if (!rebroadcastToPeers) return; + + const stale = networkStateService.isBroadcastStale(message, 240_000); + + if (stale) return; + + const messageString = serviceHelper.ensureString(message); + const wsListOut = []; + outgoingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsListOut.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); + await serviceHelper.delay(500); + const wsList = []; + incomingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsList.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } catch (error) { log.error(error); } @@ -154,30 +158,33 @@ async function handleIPChangedMessage(message, fromIP, port) { // eslint-disable-next-line global-require const appsService = require('./appsService'); const rebroadcastToPeers = await appsService.storeIPChangedMessage(message.data); - const currentTimeStamp = Date.now(); - const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); - if (rebroadcastToPeers && timestampOK) { - const messageString = serviceHelper.ensureString(message); - const wsListOut = []; - outgoingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsListOut.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); - await serviceHelper.delay(500); - const wsList = []; - incomingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsList.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); - } + + if (!rebroadcastToPeers) return; + + const stale = networkStateService.isBroadcastStale(message, 240_000); + + if (stale) return; + + const messageString = serviceHelper.ensureString(message); + const wsListOut = []; + outgoingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsListOut.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); + await serviceHelper.delay(500); + const wsList = []; + incomingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsList.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } catch (error) { log.error(error); } @@ -196,30 +203,33 @@ async function handleAppRemovedMessage(message, fromIP, port) { // eslint-disable-next-line global-require const appsService = require('./appsService'); const rebroadcastToPeers = await appsService.storeAppRemovedMessage(message.data); - const currentTimeStamp = Date.now(); - const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); - if (rebroadcastToPeers && timestampOK) { - const messageString = serviceHelper.ensureString(message); - const wsListOut = []; - outgoingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsListOut.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); - await serviceHelper.delay(500); - const wsList = []; - incomingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsList.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); - } + + if (!rebroadcastToPeers) return; + + const stale = networkStateService.isBroadcastStale(message, 240_000); + + if (stale) return; + + const messageString = serviceHelper.ensureString(message); + const wsListOut = []; + outgoingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsListOut.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); + await serviceHelper.delay(500); + const wsList = []; + incomingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsList.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } catch (error) { log.error(error); } @@ -302,11 +312,10 @@ function handleIncomingConnection(websocket, req) { } */ const msgObj = serviceHelper.ensureObject(msg.data); - const { pubKey } = msgObj; - const { timestamp } = msgObj; - const { signature } = msgObj; - const { version } = msgObj; - const { data } = msgObj; + const { + pubKey, timestamp, signature, version, data, + } = msgObj; + if (!pubKey || !timestamp || !signature || !version || !data) { try { log.info(`Invalid received from incoming peer ${peer.ip}:${peer.port}. Closing incoming connection`); @@ -318,19 +327,19 @@ function handleIncomingConnection(websocket, req) { } // check if we have the message in cache. If yes, return false. If not, store it and continue - await serviceHelper.delay(Math.floor(Math.random() * 75 + 1)); // await max 75 miliseconds random, should jelp on processing duplicated messages received at same timestamp + // await max 75 miliseconds random, should help on processing duplicated messages received at same timestamp + await serviceHelper.delay(Math.floor(Math.random() * 75 + 1)); + const messageHash = hash(msgObj.data); if (myCacheTemp.has(messageHash)) { return; } + myCacheTemp.set(messageHash, messageHash); - // check rate limit + const rateOK = fluxNetworkHelper.lruRateLimit(`${ipv4Peer}:${port}`, 90); - if (!rateOK) { - return; // do not react to the message - } + if (!rateOK) return; - // check blocked list if (blockedPubKeysCache.has(pubKey)) { try { log.info('Closing incoming connection, peer is on blockedList'); @@ -340,39 +349,19 @@ function handleIncomingConnection(websocket, req) { } return; } - const currentTimeStamp = Date.now(); - const messageOK = await fluxCommunicationUtils.verifyFluxBroadcast(msgObj, undefined, currentTimeStamp); - if (messageOK === true) { - const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(msgObj, currentTimeStamp); - if (timestampOK === true) { - try { - if (msgObj.data.type === 'zelappregister' || msgObj.data.type === 'zelappupdate' || msgObj.data.type === 'fluxappregister' || msgObj.data.type === 'fluxappupdate') { - handleAppMessages(msgObj, peer.ip, peer.port); - } else if (msgObj.data.type === 'fluxapprequest') { - fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, ws); - } else if (msgObj.data.type === 'fluxapprunning') { - handleAppRunningMessage(msgObj, peer.ip, peer.port); - } else if (msgObj.data.type === 'fluxipchanged') { - handleIPChangedMessage(msgObj, peer.ip, peer.port); - } else if (msgObj.data.type === 'fluxappremoved') { - handleAppRemovedMessage(msgObj, peer.ip, peer.port); - } else { - log.warn(`Unrecognised message type of ${msgObj.data.type}`); - } - } catch (e) { - log.error(e); - } - } - } else { + + const messageOk = await networkStateService.verifyBroadcast(msgObj); + + if (!messageOk) { // we dont like this peer as it sent wrong message (wrong, or message belonging to node no longer on network). Lets close the connection // and add him to blocklist try { // check if message comes from IP belonging to the public Key - let zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient. + let zl = await networkStateService.getFluxnodesByPubkey(pubKey); // this itself is sufficient. let nodeFound = zl.find((n) => n.ip.split(':')[0] === peer.ip && (n.ip.split(':')[1] || 16127) === peer.port); if (!nodeFound) { // check if message comes from IP belonging to the public Key - zl = await fluxCommunicationUtils.deterministicFluxList(); // this itself is sufficient. + zl = networkStateService.networkState(); // this itself is sufficient. const possibleNodes = zl.filter((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level nodeFound = possibleNodes.find((n) => n.ip.split(':')[0] === peer.ip && (n.ip.split(':')[1] || 16127) === peer.port); if (!nodeFound) { @@ -391,8 +380,52 @@ function handleIncomingConnection(websocket, req) { } catch (e) { log.error(e); } + return; + } + + const stale = networkStateService.isBroadcastStale(msgObj); + + // I'm not really sure of the intent here. It seems like if a message is older + // than 5 minutes, we silently drop it. + + // Then, if it's a register or update message, we rebroadcast regardless. + + // However, if it's a running, changed or removed message, if it is older than + // 4 minutes, we silently drop it. + + // The reason I don't like this is we are checking the timestamp twice. Seems like + // each handler should just handle the timestamp itself. The running, changes and removed + // handlers are all basically the same anyway. The only difference is the store procedure called. + + // The store should be on the db, not on appsService. I'm assuming you were getting circular imports, + // hence the local require; if we moved store procedures to db, it wouldn't be a problem. + + if (stale) return; + + // pretty sure we can remove the zel ones, not used anymore + const appMessageTypes = ['fluxappregister', 'fluxappupdate', 'zelappupdate', 'zelappregister']; + + const { type: msgType } = data; + + try { + if (appMessageTypes.includes(msgType)) { + handleAppMessages(msgObj, peer.ip, peer.port); + } else if (msgType === 'fluxapprequest') { + fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, ws); + } else if (msgType === 'fluxapprunning') { + handleAppRunningMessage(msgObj, peer.ip, peer.port); + } else if (msgType === 'fluxipchanged') { + handleIPChangedMessage(msgObj, peer.ip, peer.port); + } else if (msgType === 'fluxappremoved') { + handleAppRemovedMessage(msgObj, peer.ip, peer.port); + } else { + log.warn(`Unrecognised message type of ${msgType}`); + } + } catch (e) { + log.error(e); } }; + ws.onclose = (msg) => { const { ip } = ws; log.info(`Incoming connection to ${ip}:${port} closed with code ${msg.code}`); @@ -613,11 +646,10 @@ async function initiateAndHandleConnection(connection) { messageNumber = 0; } */ const msgObj = serviceHelper.ensureObject(evt.data); - const { pubKey } = msgObj; - const { timestamp } = msgObj; - const { signature } = msgObj; - const { version } = msgObj; - const { data } = msgObj; + const { + pubKey, timestamp, signature, version, data, + } = msgObj; + if (!pubKey || !timestamp || !signature || !version || !data) { try { log.info(`Invalid received from outgoing peer ${ip}:${port}. Closing outgoing connection`); @@ -629,19 +661,17 @@ async function initiateAndHandleConnection(connection) { } // check if we have the message in cache. If yes, return false. If not, store it and continue await serviceHelper.delay(Math.floor(Math.random() * 75 + 1)); // await max 75 miliseconds random, should help processing duplicated messages received at same timestamp + const messageHash = hash(msgObj.data); if (myCacheTemp.has(messageHash)) { return; } + myCacheTemp.set(messageHash, messageHash); - // incoming messages from outgoing connections - const currentTimeStamp = Date.now(); // ms - // check rate limit + const rateOK = fluxNetworkHelper.lruRateLimit(`${ip}:${port}`, 90); - if (!rateOK) { - return; // do not react to the message - } - // check blocked list + if (!rateOK) return; + if (blockedPubKeysCache.has(pubKey)) { try { log.info('Closing outgoing connection, peer is on blockedList'); @@ -651,27 +681,19 @@ async function initiateAndHandleConnection(connection) { } return; } - const messageOK = await fluxCommunicationUtils.verifyOriginalFluxBroadcast(msgObj, undefined, currentTimeStamp); - if (messageOK === true) { - if (msgObj.data.type === 'zelappregister' || msgObj.data.type === 'zelappupdate' || msgObj.data.type === 'fluxappregister' || msgObj.data.type === 'fluxappupdate') { - handleAppMessages(msgObj, ip, port); - } else if (msgObj.data.type === 'fluxapprequest') { - fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, websocket); - } else if (msgObj.data.type === 'fluxapprunning') { - handleAppRunningMessage(msgObj, ip, port); - } else if (msgObj.data.type === 'fluxipchanged') { - handleIPChangedMessage(msgObj, ip, port); - } else if (msgObj.data.type === 'fluxappremoved') { - handleAppRemovedMessage(msgObj, ip, port); - } else { - log.warn(`Unrecognised message type of ${msgObj.data.type}`); - } - } else { + + const stale = networkStateService.isBroadcastStale(msgObj); + + if (stale) return; + + const messageOk = await networkStateService.verifyBroadcast(msgObj); + + if (!messageOk) { // we dont like this peer as it sent wrong message (wrong, or message belonging to node no longer on network). Lets close the connection // and add him to blocklist try { // check if message comes from IP belonging to the public Key - const zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient. + const zl = await networkStateService.getFluxnodesByPubkey(pubKey); // this itself is sufficient. const possibleNodes = zl.filter((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level const nodeFound = possibleNodes.find((n) => n.ip === connection); // connection is either ip or ip:port (if port is not 16127) if (!nodeFound) { @@ -685,6 +707,25 @@ async function initiateAndHandleConnection(connection) { } catch (e) { log.error(e); } + return; + } + + const appMessageTypes = ['fluxappregister', 'fluxappupdate', 'zelappupdate', 'zelappregister']; + + const { type: msgType } = data; + + if (appMessageTypes.includes(msgType)) { + handleAppMessages(msgObj, ip, port); + } else if (msgType === 'fluxapprequest') { + fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, websocket); + } else if (msgType === 'fluxapprunning') { + handleAppRunningMessage(msgObj, ip, port); + } else if (msgType === 'fluxipchanged') { + handleIPChangedMessage(msgObj, ip, port); + } else if (msgType === 'fluxappremoved') { + handleAppRemovedMessage(msgObj, ip, port); + } else { + log.warn(`Unrecognised message type of ${msgType}`); } }; @@ -753,7 +794,7 @@ async function addPeer(req, res) { * @param {object} res Response. * @returns {object} Message. */ -async function addOutgoingPeer(req, res) { +function addOutgoingPeer(req, res) { try { let { ip } = req.params; ip = ip || req.query.ip; @@ -779,7 +820,7 @@ async function addOutgoingPeer(req, res) { return res.json(errMessage); } - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); + const nodeList = networkStateService.networkState(); const fluxNode = nodeList.find((node) => node.ip.split(':')[0] === ip.split(':')[0] && (node.ip.split(':')[1] || 16127) === port); if (!fluxNode) { const errMessage = messageHelper.createErrorMessage(`FluxNode ${ip.split(':')[0]}:${port} is not confirmed on the network.`); @@ -815,7 +856,7 @@ async function fluxDiscovery() { const myIP = await fluxNetworkHelper.getMyFluxIPandPort(); if (myIP) { - nodeList = await fluxCommunicationUtils.deterministicFluxList(); + nodeList = networkStateService.networkState(); numberOfFluxNodes = nodeList.length; const fluxNode = nodeList.find((node) => node.ip === myIP); if (!fluxNode) { @@ -889,7 +930,7 @@ async function fluxDiscovery() { while ((outgoingConnections.length < 14 || [...new Set(outgoingConnections.map((client) => client.ip))].length < 9) && index < 100) { // Max of 14 outgoing connections - 8 possible deterministic + min. 6 random index += 1; // eslint-disable-next-line no-await-in-loop - const connection = await fluxNetworkHelper.getRandomConnection(); + const connection = fluxNetworkHelper.getRandomConnection(); if (connection) { const ipInc = connection.split(':')[0]; const portInc = connection.split(':')[1] || '16127'; @@ -910,7 +951,7 @@ async function fluxDiscovery() { while ((incomingConnections.length < 12 || [...new Set(incomingConnections.map((client) => client.ip))].length < 5) && index < 100) { // Max of 12 incoming connections - 8 possible deterministic + min. 4 random (we will get more random as others nodes have more random outgoing connections) index += 1; // eslint-disable-next-line no-await-in-loop - const connection = await fluxNetworkHelper.getRandomConnection(); + const connection = fluxNetworkHelper.getRandomConnection(); if (connection) { const ipInc = connection.split(':')[0]; const portInc = connection.split(':')[1] || '16127'; diff --git a/ZelBack/src/services/fluxCommunicationMessagesSender.js b/ZelBack/src/services/fluxCommunicationMessagesSender.js index a7ae1cf69..b48b626c9 100644 --- a/ZelBack/src/services/fluxCommunicationMessagesSender.js +++ b/ZelBack/src/services/fluxCommunicationMessagesSender.js @@ -10,6 +10,16 @@ const { outgoingConnections, outgoingPeers, incomingPeers, incomingConnections, } = require('./utils/establishedConnections'); +/** + * @typedef {{ + * version: number, + * timestamp: number, + * pubKey: string, + * signature: string, + * data : object, + * }} FluxNetworkMessage + */ + // default cache const LRUoptions = { max: 1000, diff --git a/ZelBack/src/services/fluxCommunicationUtils.js b/ZelBack/src/services/fluxCommunicationUtils.js deleted file mode 100644 index 4ebca9db4..000000000 --- a/ZelBack/src/services/fluxCommunicationUtils.js +++ /dev/null @@ -1,226 +0,0 @@ -/* eslint-disable no-underscore-dangle */ -const { LRUCache } = require('lru-cache'); -const log = require('../lib/log'); -const serviceHelper = require('./serviceHelper'); -const verificationHelper = require('./verificationHelper'); -const daemonServiceFluxnodeRpcs = require('./daemonService/daemonServiceFluxnodeRpcs'); -// default cache -const LRUoptions = { - max: 20000, // currently 20000 nodes - ttl: 1000 * 240, // 240 seconds, allow up to 2 blocks - maxAge: 1000 * 240, // 240 seconds, allow up to 2 blocks -}; - -const myCache = new LRUCache(LRUoptions); - -let addingNodesToCache = false; - -/** - * To constantly update deterministic Flux list every 2 minutes so we always trigger cache and have up to date value - */ -async function constantlyUpdateDeterministicFluxList() { - try { - while (addingNodesToCache) { - // prevent several instances filling the cache at the same time. - // eslint-disable-next-line no-await-in-loop - await serviceHelper.delay(100); - } - addingNodesToCache = true; - const request = { - params: {}, - query: {}, - }; - const daemonFluxNodesList = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList(request); - if (daemonFluxNodesList.status === 'success') { - const generalFluxList = daemonFluxNodesList.data || []; - myCache.set('fluxList', generalFluxList); - } - addingNodesToCache = false; - await serviceHelper.delay(2 * 60 * 1000); // 2 minutes - constantlyUpdateDeterministicFluxList(); - } catch (error) { - addingNodesToCache = false; - log.error(error); - await serviceHelper.delay(2 * 60 * 1000); // 2 minutes - constantlyUpdateDeterministicFluxList(); - } -} - -/** - * To get deterministc Flux list from cache. - * @param {string} filter Filter. Can only be a publicKey. - * @returns {(*|*)} Value of any type or an empty array of any type. - */ -async function deterministicFluxList(filter) { - try { - while (addingNodesToCache) { - // prevent several instances filling the cache at the same time. - // eslint-disable-next-line no-await-in-loop - await serviceHelper.delay(100); - } - let fluxList; - if (filter) { - fluxList = myCache.get(`fluxList${serviceHelper.ensureString(filter)}`); - } else { - fluxList = myCache.get('fluxList'); - } - if (!fluxList) { - let generalFluxList = myCache.get('fluxList'); - addingNodesToCache = true; - if (!generalFluxList) { - const request = { - params: {}, - query: {}, - }; - const daemonFluxNodesList = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList(request); - if (daemonFluxNodesList.status === 'success') { - generalFluxList = daemonFluxNodesList.data || []; - myCache.set('fluxList', generalFluxList); - if (filter) { - const filterFluxList = generalFluxList.filter((node) => node.pubkey === filter); - myCache.set(`fluxList${serviceHelper.ensureString(filter)}`, filterFluxList); - } - } - } else { // surely in filtered branch too - const filterFluxList = generalFluxList.filter((node) => node.pubkey === filter); - myCache.set(`fluxList${serviceHelper.ensureString(filter)}`, filterFluxList); - } - addingNodesToCache = false; - if (filter) { - fluxList = myCache.get(`fluxList${serviceHelper.ensureString(filter)}`); - } else { - fluxList = myCache.get('fluxList'); - } - } - return fluxList || []; - } catch (error) { - log.error(error); - return []; - } -} - -/** - * To verify Flux broadcast. - * @param {object} data Data containing public key, timestamp, signature and version. - * @param {object[]} obtainedFluxNodesList List of FluxNodes. - * @param {number} currentTimeStamp Current timestamp. - * @returns {boolean} False unless message is successfully verified. - */ -async function verifyFluxBroadcast(data, obtainedFluxNodesList, currentTimeStamp) { - const dataObj = serviceHelper.ensureObject(data); - const { pubKey } = dataObj; - const { timestamp } = dataObj; // ms - const { signature } = dataObj; - const { version } = dataObj; - // only version 1 is active - if (version !== 1) { - return false; - } - const message = serviceHelper.ensureString(dataObj.data); - // is timestamp valid ? - // eslint-disable-next-line no-param-reassign - currentTimeStamp = currentTimeStamp || Date.now(); // ms - if (currentTimeStamp < (timestamp - 120000)) { // message was broadcasted in the future. Allow 120 sec clock sync - log.error('Message from future'); - return false; - } - - let node = null; - if (obtainedFluxNodesList) { // for test purposes. - node = obtainedFluxNodesList.find((key) => key.pubkey === pubKey); - if (!node) { - return false; - } - } - if (!node) { - // node that broadcasted the message has to be on list - // pubkey of the broadcast has to be on the list - let zl = await deterministicFluxList(pubKey); - if (dataObj.data && dataObj.data.type === 'fluxapprunning') { - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - zl = await deterministicFluxList(); - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - log.warn(`Invalid fluxapprunning message, ip: ${dataObj.data.ip} pubkey: ${pubKey}`); // most of invalids are caused because our deterministic list is cached for couple of minutes - return false; - } - } - } else if (dataObj.data && dataObj.data.type === 'fluxipchanged') { - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.oldIP && dataObj.data.oldIP === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - zl = await deterministicFluxList(); - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.oldIP === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - log.warn(`Invalid fluxipchanged message, oldIP: ${dataObj.data.oldIP} pubkey: ${pubKey}`); - return false; - } - } - } else if (dataObj.data && dataObj.data.type === 'fluxappremoved') { - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - zl = await deterministicFluxList(); - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - log.warn(`Invalid fluxappremoved message, ip: ${dataObj.data.ip} pubkey: ${pubKey}`); - return false; - } - } - } else { - node = zl.find((key) => key.pubkey === pubKey); - } - } - if (!node) { - log.warn(`No node belonging to ${pubKey} found`); - return false; - } - const messageToVerify = version + message + timestamp; - const verified = verificationHelper.verifyMessage(messageToVerify, pubKey, signature); - if (verified === true) { - return true; - } - return false; -} - -/** - * To verify timestamp in Flux broadcast. - * @param {object} data Data. - * @param {number} currentTimeStamp Current timestamp. - * @returns {boolean} False unless current timestamp is within 5 minutes of the data object's timestamp. - */ -function verifyTimestampInFluxBroadcast(data, currentTimeStamp, maxOld = 300000) { - // eslint-disable-next-line no-param-reassign - const dataObj = serviceHelper.ensureObject(data); - const { timestamp } = dataObj; // ms - // eslint-disable-next-line no-param-reassign - currentTimeStamp = currentTimeStamp || Date.now(); // ms - if (currentTimeStamp < (timestamp + maxOld)) { // not older than 5 mins - return true; - } - log.warn(`Timestamp ${timestamp} of message is too old ${currentTimeStamp}}`); - return false; -} - -/** - * To verify original Flux broadcast. Extends verifyFluxBroadcast by not allowing request older than 5 mins. - * @param {object} data Data. - * @param {object[]} obtainedFluxNodeList List of FluxNodes. - * @param {number} currentTimeStamp Current timestamp. - * @returns {boolean} False unless message is successfully verified. - */ -async function verifyOriginalFluxBroadcast(data, obtainedFluxNodeList, currentTimeStamp) { - const timeStampOK = verifyTimestampInFluxBroadcast(data, currentTimeStamp); - if (timeStampOK) { - const broadcastOK = await verifyFluxBroadcast(data, obtainedFluxNodeList, currentTimeStamp); - return broadcastOK; - } - return false; -} - -module.exports = { - constantlyUpdateDeterministicFluxList, - verifyTimestampInFluxBroadcast, - verifyOriginalFluxBroadcast, - deterministicFluxList, - verifyFluxBroadcast, -}; diff --git a/ZelBack/src/services/fluxNetworkHelper.js b/ZelBack/src/services/fluxNetworkHelper.js index bc5ad3eb1..a6a6ad5a6 100644 --- a/ZelBack/src/services/fluxNetworkHelper.js +++ b/ZelBack/src/services/fluxNetworkHelper.js @@ -18,7 +18,7 @@ const daemonServiceBenchmarkRpcs = require('./daemonService/daemonServiceBenchma const daemonServiceWalletRpcs = require('./daemonService/daemonServiceWalletRpcs'); const benchmarkService = require('./benchmarkService'); const verificationHelper = require('./verificationHelper'); -const fluxCommunicationUtils = require('./fluxCommunicationUtils'); +const networkStateService = require('./networkStateService'); const { outgoingConnections, outgoingPeers, incomingPeers, incomingConnections, } = require('./utils/establishedConnections'); @@ -268,7 +268,7 @@ async function checkAppAvailability(req, res) { const ipPort = processedBody.port; // pubkey of the message has to be on the list - const zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient. + const zl = await networkStateService.getFluxnodesByPubkey(pubKey); // this itself is sufficient. const node = zl.find((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level const dataToVerify = processedBody; delete dataToVerify.signature; @@ -406,8 +406,8 @@ async function getFluxNodePublicKey(privatekey) { * To get a random connection. * @returns {string} IP:Port or just IP if default. */ -async function getRandomConnection() { - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); +function getRandomConnection() { + const nodeList = networkStateService.networkState(); const zlLength = nodeList.length; if (zlLength === 0) { return null; @@ -682,7 +682,7 @@ async function checkMyFluxAvailability(retryNumber = 0) { if (!fluxBenchVersionAllowed) { return false; } - let askingIP = await getRandomConnection(); + let askingIP = getRandomConnection(); if (typeof askingIP !== 'string' || typeof myFluxIP !== 'string' || myFluxIP === askingIP) { return false; } @@ -764,7 +764,7 @@ async function checkMyFluxAvailability(retryNumber = 0) { } const measuredUptime = fluxUptime(); if (measuredUptime.status === 'success' && measuredUptime.data > config.fluxapps.minUpTime) { // node has been running for 30 minutes. Upon starting a node, there can be dos that needs resetting - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); + const nodeList = networkStateService.networkState(); // nodeList must include our fluxnode ip myIP let myCorrectIp = `${myIP}:${apiPort}`; if (apiPort === 16127 || apiPort === '16127') { @@ -903,7 +903,7 @@ async function checkDeterministicNodesCollisions() { }, 120 * 1000); return; } - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); + const nodeList = networkStateService.networkState(); const result = nodeList.filter((node) => node.ip === myIP); const nodeStatus = await daemonServiceFluxnodeRpcs.getFluxNodeStatus(); if (nodeStatus.status === 'success') { // different scenario is caught elsewhere diff --git a/ZelBack/src/services/networkStateService.js b/ZelBack/src/services/networkStateService.js new file mode 100644 index 000000000..b8ca5cead --- /dev/null +++ b/ZelBack/src/services/networkStateService.js @@ -0,0 +1,176 @@ +const log = require('../lib/log'); +const serviceHelper = require('./serviceHelper'); +const verificationHelper = require('./verificationHelper'); +const daemonServiceFluxnodeRpcs = require('./daemonService/daemonServiceFluxnodeRpcs'); +const networkStateManager = require('./utils/networkStateManager'); + +/** + * @typedef {import('./utils/networkStateManager').Fluxnode} Fluxnode + * @typedef {import('./fluxCommunicationMessagesSender').FluxNetworkMessage} FluxNetworkMessage + */ + +/** + * The NetworkStateManager object. Responsible for fetching the nodelist, + * and maintaining indexes for fast access. + */ +let stateManager = null; + +async function start() { + return new Promise((resolve, reject) => { + if (stateManager) resolve(); + + const fetcher = async (filter = null) => { + const options = { params: { filter }, query: { filter: null } }; + + const res = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList( + options, + ); + + if (res.status === 'success') { + return res.data; + } + return []; + }; + + stateManager = new networkStateManager.NetworkStateManager(fetcher, { + intervalMs: 120_000, + }); + const timeout = setTimeout( + () => reject(new Error('Unable To Start: Timeout of 300s reached')), + 300_000, + ); + + stateManager.once('populated', () => { + clearTimeout(timeout); + resolve(); + }); + + stateManager.start(); + }); +} + +async function stop() { + if (!stateManager) return; + + await stateManager.stop(); +} + +/** + * Returns the entire fluxnode network state + * @returns {Array} + */ +function networkState() { + if (!stateManager) return []; + + return stateManager.state; +} + +async function getFluxnodesByPubkey(pubkey) { + const nodes = await stateManager.search(pubkey, 'pubkey'); + // in the future, just return the map. + return [...nodes.values()]; +} + +/** + * + * @param {FluxNetworkMessage} broadcast + * @param {{maxAge?: number}} options + * @returns {boolean} + */ +function isBroadcastStale(broadcast, options = {}) { + const maxAge = options.maxAge || 300_000; + + const { timestamp } = broadcast; + const now = Date.now(); + + if (now > timestamp + maxAge) { + log.warn( + `isBroadcastStale: Timestamp ${timestamp} of broadcast is too old ${now}}`, + ); + return true; + } + return false; +} + +/** + * To verify Flux broadcast. + * @param {object} broadcast Flux network layer message containing public key, timestamp, signature and version. + * @param {{maxAge?: number}} options + * @returns {Promise} False unless message is successfully verified. + */ +async function verifyBroadcast(broadcast) { + const { + pubKey, timestamp, signature, version, data: payload, + } = broadcast; + + if (version !== 1) return false; + + const message = serviceHelper.ensureString(payload); + + if (!message) return false; + + const { type: msgType } = payload; + + if (!msgType) return false; + + const now = Date.now(); + + // message was broadcasted in the future. Allow 120 sec clock sync + if (now < timestamp - 120_000) { + log.error('VerifyBroadcast: Message from future, rejecting'); + return false; + } + + const nodes = await stateManager.search(pubKey, 'pubkey'); + + let error = ''; + let target = ''; + + switch (msgType) { + case 'fluxapprunning': + target = payload.ip; + // most of invalids are caused because our deterministic list is cached for couple of minutes + error = `Invalid fluxapprunning message, ip: ${payload.ip} pubkey: ${pubKey}`; + break; + + case 'fluxipchanged': + target = payload.oldIP; + error = `Invalid fluxipchanged message, oldIP: ${payload.oldIP} pubkey: ${pubKey}`; + break; + + case 'fluxappremoved': + target = payload.ip; + error = `Invalid fluxappremoved message, ip: ${payload.ip} pubkey: ${pubKey}`; + break; + + // zelappregister zelappupdate fluxappregister fluxappupdate + default: + // we take the first node. Why??? What does this validate? + target = nodes.size ? nodes.keys().next().value.ip : null; + error = `No node belonging to ${pubKey} found`; + } + + const node = nodes.get(target) || (await stateManager.search(target, 'endpoint')); + + if (!node) { + log.warn(error); + return false; + } + + const messageToVerify = version + message + timestamp; + const verified = verificationHelper.verifyMessage( + messageToVerify, + pubKey, + signature, + ); + return verified; +} + +module.exports = { + getFluxnodesByPubkey, + isBroadcastStale, + networkState, + start, + stop, + verifyBroadcast, +}; diff --git a/ZelBack/src/services/serviceManager.js b/ZelBack/src/services/serviceManager.js index bdea40a34..b4022a500 100644 --- a/ZelBack/src/services/serviceManager.js +++ b/ZelBack/src/services/serviceManager.js @@ -4,7 +4,7 @@ const log = require('../lib/log'); const dbHelper = require('./dbHelper'); const explorerService = require('./explorerService'); const fluxCommunication = require('./fluxCommunication'); -const fluxCommunicationUtils = require('./fluxCommunicationUtils'); +const networkStateService = require('./networkStateService'); const fluxNetworkHelper = require('./fluxNetworkHelper'); const appsService = require('./appsService'); const daemonServiceMiscRpcs = require('./daemonService/daemonServiceMiscRpcs'); @@ -104,7 +104,9 @@ async function startFluxFunctions() { log.info('Mongodb zelnodetransactions dropped'); setTimeout(() => { - fluxCommunicationUtils.constantlyUpdateDeterministicFluxList(); // updates deterministic flux list for communication every 2 minutes, so we always trigger cache and have up to date value + // updates deterministic flux list for communication every 2 minutes. + // ToDo: subscribe to fluxd block notifications + networkStateService.start(); }, 15 * 1000); setTimeout(async () => { log.info('Rechecking firewall app rules'); diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js new file mode 100644 index 000000000..43c62914c --- /dev/null +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -0,0 +1,191 @@ +/** + * This should really subscribe to fluxd blocks using zmq. + * It doesn't make sense to poll every 2 minutes when you often get + * multiple blocks in a short timeframe. + */ + +globalThis.userconfig = { initial: { testnet: false } }; + +const { EventEmitter } = require('node:events'); +const { FluxController } = require('./fluxController'); + +/** + * The Fluxnode as returned by fluxd + * @typedef {{ + * collateral: string, + * txhash: string, + * outidx: number, + * ip: string, + * network: string, + * added_height: number, + * confirmed_height: number, + * last_confirmed_height: number, + * last_paid_height: number, + * tier: string, + * payment_address: string, + * pubkey: string, + * activesince: string, + * lastpaid: string, + * amount: string, + * rank: number + * }} Fluxnode + */ + +class NetworkStateManager extends EventEmitter { + #state = []; + + #pubkeyIndex = new Map(); + + #endpointIndex = new Map(); + + #indexes = { pubkey: this.#pubkeyIndex, endpoint: this.#endpointIndex }; + + #running = false; + + #controller = new FluxController(); + + /** + * + * @param {()=>Promise>} stateFetcher + * @param {{}} options + */ + constructor(stateFetcher, options = {}) { + super(); + + this.stateFetcher = stateFetcher; + this.intervalMs = options.intervalMs || 0; + } + + get state() { + // should probably return a complete copy + return this.#state; + } + + get running() { + return this.#running; + } + + #setIndexes(nodes, filter, type) { + // storing a reference is 64 bits, same as a number, so may + // as well just store the reference, instead of array index + + const remove = Boolean(nodes.length); + + if (remove) { + if (type === 'pubkey') { + const toDelete = this.#indexes.pubkey.get(filter); + this.#indexes.pubkey.delete(filter); + toDelete.forEach((node) => { + this.#indexes.endpoint.delete(node.ip); + }); + } else if (type === 'endpoint') { + this.#indexes.endpoint.delete(filter); + } + + return; + } + + let pubkeyMap = null; + + if (type === 'pubkey') { + pubkeyMap = new Map(); + this.#pubkeyIndex.set(nodes[0].pubkey, pubkeyMap); + } + + nodes.forEach((node) => { + if (type === 'pubkey') pubkeyMap.set(node.ip, node); + + this.#endpointIndex.set(node.ip, node); + }); + } + + #recreateIndexes() { + this.#pubkeyIndex.clear(); + this.#endpointIndex.clear(); + + this.#setIndexes(this.#state); + } + + async start() { + const runner = async () => { + const start = Date.now(); + const state = await this.stateFetcher().catch(() => []); + const populated = Boolean(this.#state.length); + + if (state.length) { + this.#state = state; + this.#recreateIndexes(); + if (!populated) this.emit('populated'); + this.emit('updated'); + } + + const elapsed = Date.now() - start; + return this.intervalMs - elapsed; + }; + + this.#controller.startLoop(runner); + } + + async stop() { + await this.#controller.abort(); + } + + /** + * Find a node in the fluxnode list by either pubkey or endpoint, it + * first looks up the node locally, using O(1) from a Map; failing that, + * it goes out to the api and fetches it, and updates the local cache. + * + * @param {string} filter Pubkey or endpoint (ip:port) + * @param {"pubkey"|"endpoint"} type + * @returns + */ + async search(filter, type) { + if (!filter) return null; + if (!Object.keys(this.#indexes).includes(type)) return null; + + const cached = this.#indexes[type].get(filter); + if (cached) return cached; + + // this is a reference to the object in the LRU cache (daemonServiceutils), + // if you modify this object, you are actually modifying the object in the cache. + // Any LRU cache should not return a direct object, it should return a copy. + const nodes = await this.stateFetcher(filter).catch(() => []); + + this.#setIndexes(nodes, filter, type); + + return this.#indexes[type].get(filter); + } +} + +const daemonServiceFluxnodeRpcs = require('../daemonService/daemonServiceFluxnodeRpcs'); + +async function main() { + const fetcher = async (filter = null) => { + const options = { params: { filter }, query: { filter: null } }; + + const res = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList(options); + + if (res.status === 'success') { + return res.data; + } + return []; + }; + + const network = new NetworkStateManager(fetcher, { intervalMs: 120_000 }); + network.on('updated', () => { + console.log('received updated event'); + }); + network.on('populated', async () => { + console.log('received populated event'); + console.log(await network.search('212.71.244.159:16137', 'endpoint')); + }); + // network.start(); + setInterval(async () => { + // await network.search('212.71.244.159:16137', 'endpoint'); + console.log(await network.search('0404bccaf5d3108439b4897697bf7ce4d045950264e118596e31cc579028a7f808870d6ac59b9c00412d2f354610a9d18b47db80b08ba6536f0ae093c08a3aaccb', 'pubkey')); + }, 5_000); +} + +main(); + +module.exports = { NetworkStateManager }; From ad042dd52d3e36fb379a08c7b63b19159f752473 Mon Sep 17 00:00:00 2001 From: David White Date: Fri, 28 Jun 2024 12:12:43 +0100 Subject: [PATCH 02/16] Fix indexes, add debug --- ZelBack/src/services/networkStateService.js | 9 +++++---- ZelBack/src/services/utils/networkStateManager.js | 12 +++++++----- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/ZelBack/src/services/networkStateService.js b/ZelBack/src/services/networkStateService.js index b8ca5cead..64be52998 100644 --- a/ZelBack/src/services/networkStateService.js +++ b/ZelBack/src/services/networkStateService.js @@ -26,10 +26,11 @@ async function start() { options, ); - if (res.status === 'success') { - return res.data; - } - return []; + const nodes = res.status === 'success' ? res.data : []; + + // testing + log.info('FLUXNODE LIST LENGTH', nodes.length); + return nodes; }; stateManager = new networkStateManager.NetworkStateManager(fetcher, { diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index 43c62914c..1e8df31d1 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -65,9 +65,9 @@ class NetworkStateManager extends EventEmitter { return this.#running; } - #setIndexes(nodes, filter, type) { - // storing a reference is 64 bits, same as a number, so may - // as well just store the reference, instead of array index + #setIndexes(nodes, options = {}) { + const filter = options.filter || null; + const type = options.type || 'pubkey'; const remove = Boolean(nodes.length); @@ -151,7 +151,7 @@ class NetworkStateManager extends EventEmitter { // Any LRU cache should not return a direct object, it should return a copy. const nodes = await this.stateFetcher(filter).catch(() => []); - this.#setIndexes(nodes, filter, type); + this.#setIndexes(nodes, { filter, type }); return this.#indexes[type].get(filter); } @@ -186,6 +186,8 @@ async function main() { }, 5_000); } -main(); +if (require.main === module) { + main(); +} module.exports = { NetworkStateManager }; From a9fb985d6199873174f36b91d695f88e7f857f38 Mon Sep 17 00:00:00 2001 From: David White Date: Fri, 28 Jun 2024 12:16:39 +0100 Subject: [PATCH 03/16] Remove test userconfig --- ZelBack/src/services/utils/networkStateManager.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index 1e8df31d1..4a4eb596a 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -4,8 +4,6 @@ * multiple blocks in a short timeframe. */ -globalThis.userconfig = { initial: { testnet: false } }; - const { EventEmitter } = require('node:events'); const { FluxController } = require('./fluxController'); From ff1cb1185f0dcf3296ee4b8af5003775e4145d7e Mon Sep 17 00:00:00 2001 From: David White Date: Fri, 28 Jun 2024 12:23:09 +0100 Subject: [PATCH 04/16] Fix inverted match --- ZelBack/src/services/utils/networkStateManager.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index 4a4eb596a..a4a66d863 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -67,9 +67,7 @@ class NetworkStateManager extends EventEmitter { const filter = options.filter || null; const type = options.type || 'pubkey'; - const remove = Boolean(nodes.length); - - if (remove) { + if (!nodes.length) { if (type === 'pubkey') { const toDelete = this.#indexes.pubkey.get(filter); this.#indexes.pubkey.delete(filter); From a2cbbe27fb8db4f1ea24e55ccdb5c8d963fd9d53 Mon Sep 17 00:00:00 2001 From: David White Date: Fri, 28 Jun 2024 12:48:18 +0100 Subject: [PATCH 05/16] Retry if daemon is down --- ZelBack/src/services/networkStateService.js | 4 +--- ZelBack/src/services/serviceManager.js | 8 +++----- ZelBack/src/services/utils/networkStateManager.js | 15 ++++++++++++++- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/ZelBack/src/services/networkStateService.js b/ZelBack/src/services/networkStateService.js index 64be52998..08f10371f 100644 --- a/ZelBack/src/services/networkStateService.js +++ b/ZelBack/src/services/networkStateService.js @@ -26,10 +26,8 @@ async function start() { options, ); - const nodes = res.status === 'success' ? res.data : []; + const nodes = res.status === 'success' ? res.data : null; - // testing - log.info('FLUXNODE LIST LENGTH', nodes.length); return nodes; }; diff --git a/ZelBack/src/services/serviceManager.js b/ZelBack/src/services/serviceManager.js index b4022a500..9637c69f7 100644 --- a/ZelBack/src/services/serviceManager.js +++ b/ZelBack/src/services/serviceManager.js @@ -103,11 +103,9 @@ async function startFluxFunctions() { }); log.info('Mongodb zelnodetransactions dropped'); - setTimeout(() => { - // updates deterministic flux list for communication every 2 minutes. - // ToDo: subscribe to fluxd block notifications - networkStateService.start(); - }, 15 * 1000); + // fetch the fluxnode list. This is probably broken - if daemon isn't running, will hang here + await networkStateService.start(); + setTimeout(async () => { log.info('Rechecking firewall app rules'); fluxNetworkHelper.purgeUFW(); diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index a4a66d863..863ea06a9 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -104,8 +104,21 @@ class NetworkStateManager extends EventEmitter { async start() { const runner = async () => { + // should use monotonic clock for any elapsed times const start = Date.now(); - const state = await this.stateFetcher().catch(() => []); + + let state = null; + + while (!state) { + if (this.#controller.aborted) break; + + // eslint-disable-next-line no-await-in-loop + const res = await this.stateFetcher().catch(() => null); + + // eslint-disable-next-line no-await-in-loop + state = res || await this.#controller.sleep(15_000); + } + const populated = Boolean(this.#state.length); if (state.length) { From 5a26b9c318636300da6ee482840c09ef676a0aa4 Mon Sep 17 00:00:00 2001 From: David White Date: Fri, 28 Jun 2024 13:15:48 +0100 Subject: [PATCH 06/16] Fix up default case --- ZelBack/src/services/networkStateService.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ZelBack/src/services/networkStateService.js b/ZelBack/src/services/networkStateService.js index 08f10371f..488c2f61f 100644 --- a/ZelBack/src/services/networkStateService.js +++ b/ZelBack/src/services/networkStateService.js @@ -145,11 +145,11 @@ async function verifyBroadcast(broadcast) { // zelappregister zelappupdate fluxappregister fluxappupdate default: // we take the first node. Why??? What does this validate? - target = nodes.size ? nodes.keys().next().value.ip : null; + target = nodes.size ? nodes.keys().next().value : null; error = `No node belonging to ${pubKey} found`; } - const node = nodes.get(target) || (await stateManager.search(target, 'endpoint')); + const node = nodes.get(target) || await stateManager.search(target, 'endpoint'); if (!node) { log.warn(error); From e66ef265bdaa8cc97d7003eb47fcd976cd1bc545 Mon Sep 17 00:00:00 2001 From: David White Date: Sun, 30 Jun 2024 10:49:45 +0100 Subject: [PATCH 07/16] Use fluxd pub/sub --- ZelBack/config/default.js | 1 + .../daemonServiceFluxnodeRpcs.js | 4 +- .../daemonService/daemonServiceUtils.js | 36 +++- ZelBack/src/services/fluxCommunication.js | 1 + ZelBack/src/services/networkStateService.js | 78 ++++++- ZelBack/src/services/systemService.js | 14 +- .../src/services/utils/networkStateManager.js | 199 +++++++++++++++--- .../src/services/utils/networkStateWorker.js | 24 +++ lib/fullnode/index.js | 48 +++-- package.json | 3 +- tests/unit/fluxCommunication.test.js | 98 +++++---- tests/unit/fluxNetworkHelper.test.js | 8 +- ...ls.test.js => networkStateService.test.js} | 100 ++++----- 13 files changed, 449 insertions(+), 165 deletions(-) create mode 100644 ZelBack/src/services/utils/networkStateWorker.js rename tests/unit/{fluxCommunicationUtils.test.js => networkStateService.test.js} (87%) diff --git a/ZelBack/config/default.js b/ZelBack/config/default.js index 7fcee3da1..922584a65 100644 --- a/ZelBack/config/default.js +++ b/ZelBack/config/default.js @@ -74,6 +74,7 @@ module.exports = { rpcport: 16124, porttestnet: 26125, rpcporttestnet: 26124, + zmqport: 28332, }, minimumFluxBenchAllowedVersion: '4.0.0', minimumFluxOSAllowedVersion: '5.4.0', diff --git a/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs.js b/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs.js index c5c8a4d30..cff70a74e 100644 --- a/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs.js +++ b/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs.js @@ -207,13 +207,15 @@ async function startFluxNode(req, res) { async function viewDeterministicFluxNodeList(req, res) { let { filter } = req.params; filter = filter || req.query.filter; + const useCache = req.params.useCache ?? true; + const rpccall = 'viewdeterministiczelnodelist'; // viewdeterministicfluxnodelist const rpcparameters = []; if (filter) { rpcparameters.push(filter); } - response = await daemonServiceUtils.executeCall(rpccall, rpcparameters); + response = await daemonServiceUtils.executeCall(rpccall, rpcparameters, { useCache }); return res ? res.json(response) : response; } diff --git a/ZelBack/src/services/daemonService/daemonServiceUtils.js b/ZelBack/src/services/daemonService/daemonServiceUtils.js index d2180a0fa..7b53b06db 100644 --- a/ZelBack/src/services/daemonService/daemonServiceUtils.js +++ b/ZelBack/src/services/daemonService/daemonServiceUtils.js @@ -39,7 +39,9 @@ let daemonCallRunning = false; * @param {string[]} params RPC parameters. * @returns {object} Message. */ -async function executeCall(rpc, params) { +async function executeCall(rpc, params, options = {}) { + const useCache = options.useCache ?? true; + const rpcparameters = params || []; try { let data; @@ -71,11 +73,11 @@ async function executeCall(rpc, params) { const randomDelay = Math.floor((Math.random() * 25)) + 10; await serviceHelper.delay(randomDelay); } - if (rpc === 'getBlock') { + if (useCache && rpc === 'getBlock') { data = blockCache.get(rpc + serviceHelper.ensureString(rpcparameters)); - } else if (rpc === 'getRawTransaction') { + } else if (useCache && rpc === 'getRawTransaction') { data = rawTxCache.get(rpc + serviceHelper.ensureString(rpcparameters)); - } else { + } else if (useCache) { data = cache.get(rpc + serviceHelper.ensureString(rpcparameters)); } if (!data) { @@ -177,9 +179,35 @@ function getConfigValue(parameter) { return value; } +/** + * To set a value for a specified key from the configuration file. + * @param {string} parameter Config key. + * @param {string} value Config key value. + * @param {{write?: boolean, replace?: boolean}} options + * @returns {string} Config value. + */ +function setConfigValue(parameter, value, options = {}) { + const write = options.write || false; + const replace = options.replace || false; + + fnconfig.set(parameter, value, replace); + if (write) fnconfig.write(); +} + +/** + * To get the fluxd config dir. + * @returns {string} Config directory path. + */ +function getConfigDir() { + const value = fnconfig.defaultFolderPath; + return value; +} + module.exports = { executeCall, + getConfigDir, getConfigValue, + setConfigValue, // exports for testing purposes setStandardCache, diff --git a/ZelBack/src/services/fluxCommunication.js b/ZelBack/src/services/fluxCommunication.js index 3d6f8d37e..c65e7a903 100644 --- a/ZelBack/src/services/fluxCommunication.js +++ b/ZelBack/src/services/fluxCommunication.js @@ -659,6 +659,7 @@ async function initiateAndHandleConnection(connection) { } return; } + // check if we have the message in cache. If yes, return false. If not, store it and continue await serviceHelper.delay(Math.floor(Math.random() * 75 + 1)); // await max 75 miliseconds random, should help processing duplicated messages received at same timestamp diff --git a/ZelBack/src/services/networkStateService.js b/ZelBack/src/services/networkStateService.js index 488c2f61f..fea35daa8 100644 --- a/ZelBack/src/services/networkStateService.js +++ b/ZelBack/src/services/networkStateService.js @@ -1,7 +1,13 @@ +const fs = require('node:fs/promises'); +const path = require('node:path'); + +const config = require('config'); const log = require('../lib/log'); const serviceHelper = require('./serviceHelper'); +const systemService = require('./systemService'); const verificationHelper = require('./verificationHelper'); const daemonServiceFluxnodeRpcs = require('./daemonService/daemonServiceFluxnodeRpcs'); +const daemonServiceUtils = require('./daemonService/daemonServiceUtils'); const networkStateManager = require('./utils/networkStateManager'); /** @@ -15,12 +21,69 @@ const networkStateManager = require('./utils/networkStateManager'); */ let stateManager = null; +async function enableZmq(zmqEndpoint) { + const fluxConfigDir = daemonServiceUtils.getConfigDir(); + const zmqEnabledPath = path.join(fluxConfigDir, '.zmqEnabled'); + + const exists = Boolean(await fs.stat(zmqEnabledPath).catch(() => false)); + + if (exists) return true; + + let parseError = false; + + try { + const { protocol, hostname, port } = new URL(zmqEndpoint); + + if (!protocol === 'tcp' || !hostname || !port) parseError = true; + } catch { + parseError = true; + } + + if (parseError) { + log.error(`Error parsing zmqEndpoint: ${zmqEndpoint}. Unable to start zmq publisher`); + return false; + } + + const topics = [ + 'zmqpubhashtx', + 'zmqpubhashblock', + 'zmqpubrawblock', + 'zmqpubrawtx', + 'zmqpubsequence', + ]; + + topics.forEach((topic, index) => { + const write = index + 1 === topics.length; + daemonServiceUtils.setConfigValue(topic, zmqEndpoint, { + write, + replace: true, + }); + }); + + await fs.writeFile(zmqEnabledPath, '').catch(() => { }); + + const { error: restartError } = await systemService.restartSystemdService('zelcash.service'); + + if (restartError) { + log.error('Error restarting zelcash.service after config update'); + return false; + } + + return true; +} + async function start() { + const { daemon: { zmqport } } = config ?? { daemon: { zmqport: 28332 } }; + + let zmqEndpoint = `tcp://127.0.0.1:${zmqport}`; + const zmqEnabled = await enableZmq(zmqEndpoint); + zmqEndpoint = zmqEnabled ? zmqEndpoint : null; + return new Promise((resolve, reject) => { if (stateManager) resolve(); const fetcher = async (filter = null) => { - const options = { params: { filter }, query: { filter: null } }; + const options = { params: { useCache: false, filter }, query: { filter: null } }; const res = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList( options, @@ -33,7 +96,9 @@ async function start() { stateManager = new networkStateManager.NetworkStateManager(fetcher, { intervalMs: 120_000, + zmqEndpoint, }); + const timeout = setTimeout( () => reject(new Error('Unable To Start: Timeout of 300s reached')), 300_000, @@ -52,6 +117,7 @@ async function stop() { if (!stateManager) return; await stateManager.stop(); + stateManager = null; } /** @@ -67,7 +133,7 @@ function networkState() { async function getFluxnodesByPubkey(pubkey) { const nodes = await stateManager.search(pubkey, 'pubkey'); // in the future, just return the map. - return [...nodes.values()]; + return nodes ? [...nodes.values()] : []; } /** @@ -94,7 +160,6 @@ function isBroadcastStale(broadcast, options = {}) { /** * To verify Flux broadcast. * @param {object} broadcast Flux network layer message containing public key, timestamp, signature and version. - * @param {{maxAge?: number}} options * @returns {Promise} False unless message is successfully verified. */ async function verifyBroadcast(broadcast) { @@ -165,6 +230,13 @@ async function verifyBroadcast(broadcast) { return verified; } +if (require.main === module) { + start(); + setInterval(() => { + console.log(stateManager.search('045ae66321cfc172086d79252323b6cd4b83460e580e88f220582affda8a83b3ec68078ad80f7e465c42c3ef9bc01b912b3663e2ba09057bc43fbedf0afa9f3864', 'pubkey')); + }, 5_000); +} + module.exports = { getFluxnodesByPubkey, isBroadcastStale, diff --git a/ZelBack/src/services/systemService.js b/ZelBack/src/services/systemService.js index 940372e52..231429b7e 100644 --- a/ZelBack/src/services/systemService.js +++ b/ZelBack/src/services/systemService.js @@ -516,6 +516,15 @@ async function monitorSystem() { } } +async function restartSystemdService(service) { + const { error } = await serviceHelper.runCommand('systemctl', { + runAsRoot: true, + params: ['restart', service], + }); + + return !error; +} + async function mongoDBConfig() { log.info('MongoDB file config verification...'); try { @@ -601,11 +610,11 @@ async function mongodGpgKeyVeryfity() { } log.info('The key was updated successfully.'); return true; - // eslint-disable-next-line no-else-return + // eslint-disable-next-line no-else-return } else { throw new Error('MongoDB version not found.'); } - // eslint-disable-next-line no-else-return + // eslint-disable-next-line no-else-return } else { log.info('MongoDB GPG key is still valid.'); return true; @@ -631,6 +640,7 @@ module.exports = { monitorSyncthingPackage, queueAptGetCommand, resetTimers, + restartSystemdService, updateAptCache, upgradePackage, mongoDBConfig, diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index 863ea06a9..0c50df895 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -4,9 +4,13 @@ * multiple blocks in a short timeframe. */ +const { Worker } = require('node:worker_threads'); + const { EventEmitter } = require('node:events'); const { FluxController } = require('./fluxController'); +const zmq = require('zeromq'); + /** * The Fluxnode as returned by fluxd * @typedef {{ @@ -42,6 +46,23 @@ class NetworkStateManager extends EventEmitter { #controller = new FluxController(); + #socket = new zmq.Subscriber({ + reconnectInterval: 500, + reconnectMaxInterval: 15_000, + heartbeatInterval: 20_000, + heartbeatTimeout: 60_000, + connectTimeout: 3_000, + tcpMaxRetransmitTimeout: 120_000, + }); + + #socketConnected = null; + + #updatingState = false; + + #indexWorker = null; + + #indexStart = null; + /** * * @param {()=>Promise>} stateFetcher @@ -52,6 +73,13 @@ class NetworkStateManager extends EventEmitter { this.stateFetcher = stateFetcher; this.intervalMs = options.intervalMs || 0; + this.zmqEndpoint = options.zmqEndpoint || null; + this.fallbackTimeout = options.fallbackTimeout || 120_000; + this.useIndexWorker = options.useIndexWorker || false; + + if (this.useIndexWorker) { + this.#createIndexWorker(); + } } get state() { @@ -63,9 +91,51 @@ class NetworkStateManager extends EventEmitter { return this.#running; } - #setIndexes(nodes, options = {}) { + #createIndexWorker() { + this.#indexWorker = new Worker('./ZelBack/src/services/utils/networkStateWorker.js'); + this.#indexWorker.on('message', (indexes) => { + console.log('Indexes created, elapsed ms:', Number(process.hrtime.bigint() - this.#indexStart) / 1000000); + this.#indexStart = null; + + const { pubkeyIndex, endpointIndex } = indexes; + this.#setIndexes(pubkeyIndex, endpointIndex); + this.emit('updated'); + }); + this.#indexWorker.on('error', (err) => { console.log(err); }); + this.#indexWorker.on('exit', (code) => { console.log(code); }); + } + + #setIndexes(pubkeyIndex, endpointIndex) { + this.#pubkeyIndex = pubkeyIndex; + this.#endpointIndex = endpointIndex; + } + + #buildIndexes(nodes, options = {}) { const filter = options.filter || null; - const type = options.type || 'pubkey'; + const type = options.type || 'stateUpdate'; + + if (type === 'stateUpdate') { + // nodes.forEach((node) => { + // if (!this.#pubkeyIndex.has(node.pubkey)) { + // this.#pubkeyIndex.set(node.pubkey, new Map()); + // } + // this.#pubkeyIndex.get(node.pubkey).set(node.ip, node); + // this.#endpointIndex.set(node.ip, node); + // }); + nodes.forEach((node) => { + const nodesByPubkey = this.#pubkeyIndex.get(node.pubkey) + || this.#pubkeyIndex.set(node.pubkey, new Map()).get(node.pubkey); + + nodesByPubkey.set(node.ip, node); + this.#endpointIndex.set(node.ip, node); + }); + // nodes.forEach((node) => { + // const pubkeyIndex = this.#pubkeyIndex.get(node.pubkey) || new Map(); + // pubkeyIndex.set(node.ip, node); + // this.#endpointIndex.set(node.ip, node); + // }); + return; + } if (!nodes.length) { if (type === 'pubkey') { @@ -99,43 +169,115 @@ class NetworkStateManager extends EventEmitter { this.#pubkeyIndex.clear(); this.#endpointIndex.clear(); - this.#setIndexes(this.#state); + this.#buildIndexes(this.#state); } - async start() { - const runner = async () => { - // should use monotonic clock for any elapsed times - const start = Date.now(); + reset() { + // recreate objects so they can't be mutated externally + this.#pubkeyIndex = new Map(); + this.#endpointIndex = new Map(); + this.#state = []; + } - let state = null; + async fetchNetworkState() { + this.#updatingState = true; + // should use monotonic clock for any elapsed times + const start = process.hrtime.bigint(); - while (!state) { - if (this.#controller.aborted) break; + let state = null; - // eslint-disable-next-line no-await-in-loop - const res = await this.stateFetcher().catch(() => null); + while (!state) { + if (this.#controller.aborted) break; - // eslint-disable-next-line no-await-in-loop - state = res || await this.#controller.sleep(15_000); - } + const fetchStart = process.hrtime.bigint(); + // eslint-disable-next-line no-await-in-loop + const res = await this.stateFetcher().catch(() => null); + console.log('Fetch finished, elapsed ms:', Number(process.hrtime.bigint() - fetchStart) / 1000000); + + // eslint-disable-next-line no-await-in-loop + state = res || await this.#controller.sleep(15_000); + } - const populated = Boolean(this.#state.length); + const populated = Boolean(this.#state.length); - if (state.length) { - this.#state = state; + if (state.length) { + console.log('Nodes found:', state.length); + console.log('Setting state and indexes'); + this.#indexStart = process.hrtime.bigint(); + this.#state = state; + + if (!populated) this.emit('populated'); + + if (this.useIndexWorker) { + this.#indexWorker.postMessage(state); + } else { this.#recreateIndexes(); - if (!populated) this.emit('populated'); + console.log('Indexes created, elapsed ms:', Number(process.hrtime.bigint() - this.#indexStart) / 1000000); + this.#indexStart = null; + this.emit('updated'); } + } + + this.#updatingState = false; + + const elapsed = Number(process.hrtime.bigint() - start) / 1000000; + return this.intervalMs - elapsed; + } + + async #handleBlocks() { + await this.fetchNetworkState(); + // eslint-disable-next-line no-restricted-syntax + for await (const [topicBuf, msgBuf, seqBuf] of this.#socket) { + const topic = topicBuf.toString(); + const blockId = msgBuf.toString('hex'); + const seq = seqBuf.readUInt32LE(0); + console.log(`${topic}: ${blockId}, sequence: ${seq}`); + this.emit('block', blockId); + + if (this.#updatingState) this.#controller.abort(); + this.fetchNetworkState(); + } + } + + #startSubscription() { + this.#socket.events.on('connect', () => { + if (this.#socketConnected === false) { + clearTimeout(this.fallbackTimer); + this.fallbackTimer = null; + this.#controller.stopLoop(); + } + + this.#socketConnected = true; + this.emit('zmqConnected'); + }); + + this.#socket.events.on('disconnect', () => { + this.#socketConnected = false; + this.emit('zmqDisconnected'); + this.fallbackTimer = setTimeout(() => this.#startPolling(), this.fallbackTimeout); + }); + + this.#socket.connect(`tcp://${this.zmqEndpoint}`); + this.#socket.subscribe('hashblock'); - const elapsed = Date.now() - start; - return this.intervalMs - elapsed; - }; + this.#handleBlocks(); + } + + #startPolling() { + this.#controller.startLoop(this.fetchNetworkState); + } + + async start() { + const starter = this.zmqEndpoint + ? this.#startSubscription + : this.#startPolling; - this.#controller.startLoop(runner); + starter.bind(this)(); } async stop() { + // is this right? await this.#controller.abort(); } @@ -148,21 +290,22 @@ class NetworkStateManager extends EventEmitter { * @param {"pubkey"|"endpoint"} type * @returns */ - async search(filter, type) { + search(filter, type) { if (!filter) return null; if (!Object.keys(this.#indexes).includes(type)) return null; const cached = this.#indexes[type].get(filter); - if (cached) return cached; + // if (cached) return cached; + return cached || null; // this is a reference to the object in the LRU cache (daemonServiceutils), // if you modify this object, you are actually modifying the object in the cache. // Any LRU cache should not return a direct object, it should return a copy. - const nodes = await this.stateFetcher(filter).catch(() => []); + // const nodes = await this.stateFetcher(filter).catch(() => []); - this.#setIndexes(nodes, { filter, type }); + // this.#setIndexes(nodes, { filter, type }); - return this.#indexes[type].get(filter); + // return this.#indexes[type].get(filter); } } diff --git a/ZelBack/src/services/utils/networkStateWorker.js b/ZelBack/src/services/utils/networkStateWorker.js new file mode 100644 index 000000000..1e41a22cd --- /dev/null +++ b/ZelBack/src/services/utils/networkStateWorker.js @@ -0,0 +1,24 @@ +const { parentPort } = require('node:worker_threads'); + +parentPort.on('message', (nodes) => { + const pubkeyIndex = new Map(); + const endpointIndex = new Map(); + + nodes.forEach((node) => { + const nodesByPubkey = pubkeyIndex.get(node.pubkey) + || pubkeyIndex.set(node.pubkey, new Map()).get(node.pubkey); + + nodesByPubkey.set(node.ip, node); + endpointIndex.set(node.ip, node); + }); + + parentPort.postMessage({ pubkeyIndex, endpointIndex }); +}); + +// nodes.forEach((node) => { +// if (!this.#pubkeyIndex.has(node.pubkey)) { +// this.#pubkeyIndex.set(node.pubkey, new Map()); +// } +// this.#pubkeyIndex.get(node.pubkey).set(node.ip, node); +// this.#endpointIndex.set(node.ip, node); +// }); diff --git a/lib/fullnode/index.js b/lib/fullnode/index.js index 51150e486..c9f8a6c64 100644 --- a/lib/fullnode/index.js +++ b/lib/fullnode/index.js @@ -60,23 +60,18 @@ function Config() { this.defaultConfigPath = confFilePath; this.defaultFolderPath = folderPath; - // Read all lines from config file, remove commented lines and empty lines. - this.filteredlines = fs.readFileSync(confFilePath, 'utf-8').split('\n').filter((line) => line.trim() && !line.trim().startsWith('#')); + this.lines = fs.readFileSync(confFilePath, 'utf-8').split('\n').map((line) => line.trim()); + // Create a dictionary from keys and values - // Maximum of 6 = signs can be in config line - this.keysvalues = this.filteredlines.reduce((map, line) => { - const sp = line.split('=', 6); - if (sp.length === 2) { - map[sp[0].trim()] = sp[1].trim(); - } else if (sp.length === 3) { - map[sp[0].trim()] = `${sp[1].trim()}=${sp[2].trim()}`; - } else if (sp.length === 4) { - map[sp[0].trim()] = `${sp[1].trim()}=${sp[2].trim()}=${sp[3].trim()}`; - } else if (sp.length === 5) { - map[sp[0].trim()] = `${sp[1].trim()}=${sp[2].trim()}=${sp[3].trim()}=${sp[4].trim()}`; - } else if (sp.length === 6) { - map[sp[0].trim()] = `${sp[1].trim()}=${sp[2].trim()}=${sp[3].trim()}=${sp[4].trim()}=${sp[5].trim()}`; - } + this.keysvalues = this.lines.reduce((map, line) => { + const [key, value] = line.split(/=(.*)/, 2); + + if (!key && !value) return map; + + if (!(key in map)) map[key] = value; + else if (typeof map[key] === 'string') map[key] = [map[key], value]; + else map[key].push(value); + return map; }, {}); } @@ -90,6 +85,27 @@ Config.prototype.defaultConfig = function () { Config.prototype.get = function (name) { return this.keysvalues[name]; }; +Config.prototype.set = function (key, value, replace = false) { + // strip whitespace? + const map = this.keysvalues; + if (replace || !(key in map)) map[key] = value; + else if (typeof map[key] === 'string') map[key] = [map[key], value]; + else { + const index = map[key].indexOf(value); + if (index === -1) map[key].push(value); + } +}; +Config.prototype.write = function () { + const lines = Object.entries(this.keysvalues).flatMap((entry) => { + const [key, value] = entry; + + if (typeof value === 'string') return `${key}=${value}`; + return value.map((item) => `${key}=${item}`); + }); + + const fileData = `${lines.join('\n')}\n`; + fs.writeFileSync(this.defaultConfigPath, fileData); +}; Config.prototype.rpcuser = function () { return this.keysvalues.rpcuser; }; diff --git a/package.json b/package.json index 3ca75a8ba..6955c72e4 100644 --- a/package.json +++ b/package.json @@ -122,7 +122,8 @@ "tar-fs": "~3.0.6", "ws": "~7.5.9", "xterm": "~5.1.0", - "zeltrezjs": "~2.12.0" + "zeltrezjs": "~2.12.0", + "zeromq": "^6.0.0-beta.20" }, "devDependencies": { "@babel/core": "~7.23.6", diff --git a/tests/unit/fluxCommunication.test.js b/tests/unit/fluxCommunication.test.js index 0a9cc0df5..83bc5847d 100644 --- a/tests/unit/fluxCommunication.test.js +++ b/tests/unit/fluxCommunication.test.js @@ -9,7 +9,7 @@ const fluxCommunicationMessagesSender = require('../../ZelBack/src/services/flux const fluxNetworkHelper = require('../../ZelBack/src/services/fluxNetworkHelper'); const dbHelper = require('../../ZelBack/src/services/dbHelper'); const verificationHelper = require('../../ZelBack/src/services/verificationHelper'); -const fluxCommunicationUtils = require('../../ZelBack/src/services/fluxCommunicationUtils'); +const networkStateService = require('../../ZelBack/src/services/networkStateService'); const daemonServiceMiscRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceMiscRpcs'); const appsService = require('../../ZelBack/src/services/appsService'); const generalService = require('../../ZelBack/src/services/generalService'); @@ -986,7 +986,7 @@ describe('fluxCommunication tests', () => { }, }); lruRateLimitStub.returns(false); - const checkObjectSpy = sinon.spy(fluxCommunicationUtils, 'verifyOriginalFluxBroadcast'); + const checkObjectSpy = sinon.spy(networkStateService, 'verifyBroadcast'); await fluxCommunication.initiateAndHandleConnection(ip); await waitForWsConnected(wsserver); @@ -1043,53 +1043,49 @@ describe('fluxCommunication tests', () => { sinon.assert.calledWith(logSpy, 'Connection 127.0.0.2:16127 removed from outgoingPeers'); }); - const appRequestCommands = ['fluxapprequest']; - // eslint-disable-next-line no-restricted-syntax - for (const command of appRequestCommands) { - // eslint-disable-next-line no-loop-func - it(`should handle the ${command} message properly`, async () => { - const message = JSON.stringify({ - timestamp: Date.now(), - pubKey: '1234asd', - signature: 'blabla', - version: 1, - data: { - type: `${command}`, - }, - }); - const waitForWsConnected = (wss) => new Promise((resolve, reject) => { - wss.on('connection', (ws) => { - ws.send(message); - resolve(); - }); - // eslint-disable-next-line no-param-reassign - wss.onerror = (err) => { - reject(err); - }; - }); - const ip = '127.0.0.2'; - wsserver = new WebSocket.Server({ host: '127.0.0.2', port: 16127 }); - lruRateLimitStub.returns(true); - sinon.stub(LRUCache.prototype, 'has').returns(false); - const verifyOriginalFluxBroadcastStub = sinon.stub(fluxCommunicationUtils, 'verifyOriginalFluxBroadcast').returns(true); - const respondWithAppMessageStub = sinon.stub(fluxCommunicationMessagesSender, 'respondWithAppMessage').returns(true); - daemonServiceMiscRpcsStub.returns({ - data: - { - synced: false, - height: 0, - }, + // eslint-disable-next-line no-loop-func + it('should handle the fluxapprequest message properly', async () => { + const message = JSON.stringify({ + timestamp: Date.now(), + pubKey: '1234asd', + signature: 'blabla', + version: 1, + data: { + type: 'fluxapprequest', + }, + }); + const waitForWsConnected = (wss) => new Promise((resolve, reject) => { + wss.on('connection', (ws) => { + ws.send(message); + resolve(); }); - await fluxCommunication.initiateAndHandleConnection(ip); + // eslint-disable-next-line no-param-reassign + wss.onerror = (err) => { + reject(err); + }; + }); + const ip = '127.0.0.2'; + wsserver = new WebSocket.Server({ host: '127.0.0.2', port: 16127 }); + lruRateLimitStub.returns(true); + sinon.stub(LRUCache.prototype, 'has').returns(false); + const verifyBroadcastStub = sinon.stub(networkStateService, 'verifyBroadcast').returns(true); + const respondWithAppMessageStub = sinon.stub(fluxCommunicationMessagesSender, 'respondWithAppMessage').returns(true); + daemonServiceMiscRpcsStub.returns({ + data: + { + synced: false, + height: 0, + }, + }); + await fluxCommunication.initiateAndHandleConnection(ip); - await waitForWsConnected(wsserver); - // slight delay to let onopen to be triggered - await serviceHelper.delay(100); + await waitForWsConnected(wsserver); + // slight delay to let onopen to be triggered + await serviceHelper.delay(100); - sinon.assert.calledOnceWithExactly(verifyOriginalFluxBroadcastStub, JSON.parse(message), undefined, sinon.match.number); - sinon.assert.calledWith(respondWithAppMessageStub, JSON.parse(message)); - }); - } + sinon.assert.calledOnceWithExactly(verifyBroadcastStub, JSON.parse(message)); + sinon.assert.calledWith(respondWithAppMessageStub, JSON.parse(message)); + }); const registerUpdateAppList = ['zelappregister', 'zelappupdate', 'fluxappregister', 'fluxappupdate']; // eslint-disable-next-line no-restricted-syntax @@ -1119,7 +1115,7 @@ describe('fluxCommunication tests', () => { wsserver = new WebSocket.Server({ host: '127.0.0.2', port: 16127 }); lruRateLimitStub.returns(true); sinon.stub(LRUCache.prototype, 'has').returns(false); - const verifyOriginalFluxBroadcastStub = sinon.stub(fluxCommunicationUtils, 'verifyOriginalFluxBroadcast').returns(true); + const verifyFluxBroadcast = sinon.stub(networkStateService, 'verifyBroadcast').returns(true); const storeAppTemporaryMessageStub = sinon.stub(appsService, 'storeAppTemporaryMessage').returns(false); daemonServiceMiscRpcsStub.returns({ data: @@ -1134,7 +1130,7 @@ describe('fluxCommunication tests', () => { // slight delay to let onopen to be triggered await serviceHelper.delay(100); - sinon.assert.calledOnceWithExactly(verifyOriginalFluxBroadcastStub, JSON.parse(message), undefined, sinon.match.number); + sinon.assert.calledOnceWithExactly(verifyFluxBroadcast, JSON.parse(message)); sinon.assert.calledOnceWithExactly(storeAppTemporaryMessageStub, JSON.parse(message).data, true); }); } @@ -1167,7 +1163,7 @@ describe('fluxCommunication tests', () => { wsserver = new WebSocket.Server({ host: '127.0.0.2', port: 16127 }); lruRateLimitStub.returns(true); sinon.stub(LRUCache.prototype, 'has').returns(false); - const verifyOriginalFluxBroadcastStub = sinon.stub(fluxCommunicationUtils, 'verifyOriginalFluxBroadcast').returns(true); + const verifyBroadcast = sinon.stub(networkStateService, 'verifyBroadcast').returns(true); const storeAppRunningMessageStub = sinon.stub(appsService, 'storeAppRunningMessage').returns(false); daemonServiceMiscRpcsStub.returns({ data: @@ -1182,7 +1178,7 @@ describe('fluxCommunication tests', () => { // slight delay to let onopen to be triggered await serviceHelper.delay(100); - sinon.assert.calledOnceWithExactly(verifyOriginalFluxBroadcastStub, JSON.parse(message), undefined, sinon.match.number); + sinon.assert.calledOnceWithExactly(verifyBroadcast, JSON.parse(message)); sinon.assert.calledOnceWithExactly(storeAppRunningMessageStub, JSON.parse(message).data); }); } @@ -1413,7 +1409,7 @@ describe('fluxCommunication tests', () => { ]; sinon.stub(fluxNetworkHelper, 'getMyFluxIPandPort').returns('44.192.51.11:16127'); fluxNetworkHelper.setMyFluxIp('44.192.51.11'); - sinon.stub(fluxCommunicationUtils, 'deterministicFluxList').returns(fluxNodeList); + sinon.stub(networkStateService, 'networkState').returns(fluxNodeList); sinon.stub(serviceHelper, 'delay').resolves(() => new Promise((resolve) => { setTimeout(resolve, 50); })); const infoSpy = sinon.spy(log, 'info'); daemonServiceStub.returns({ diff --git a/tests/unit/fluxNetworkHelper.test.js b/tests/unit/fluxNetworkHelper.test.js index 68fc44fa7..ddc59ad62 100644 --- a/tests/unit/fluxNetworkHelper.test.js +++ b/tests/unit/fluxNetworkHelper.test.js @@ -14,7 +14,7 @@ const daemonServiceUtils = require('../../ZelBack/src/services/daemonService/dae const daemonServiceBenchmarkRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceBenchmarkRpcs'); const daemonServiceWalletRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceWalletRpcs'); const daemonServiceFluxnodeRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs'); -const fluxCommunicationUtils = require('../../ZelBack/src/services/fluxCommunicationUtils'); +const networkStateService = require('../../ZelBack/src/services/networkStateService'); const benchmarkService = require('../../ZelBack/src/services/benchmarkService'); const verificationHelper = require('../../ZelBack/src/services/verificationHelper'); @@ -472,7 +472,7 @@ describe('fluxNetworkHelper tests', () => { rank: 1, }, ]; - deterministicFluxListStub = sinon.stub(fluxCommunicationUtils, 'deterministicFluxList'); + deterministicFluxListStub = sinon.stub(networkStateService, 'networkState'); fluxNetworkHelper.setMyFluxIp('83.52.214.240:16167'); }); @@ -1094,7 +1094,7 @@ describe('fluxNetworkHelper tests', () => { rank: 0, }, ]; - sinon.stub(fluxCommunicationUtils, 'deterministicFluxList').returns(deterministicFluxnodeListResponse); + sinon.stub(networkStateService, 'networkState').returns(deterministicFluxnodeListResponse); sinon.stub(daemonServiceWalletRpcs, 'createConfirmationTransaction').returns(true); sinon.stub(serviceHelper, 'delay').returns(true); }); @@ -1340,7 +1340,7 @@ describe('fluxNetworkHelper tests', () => { }]; getBenchmarksStub = sinon.stub(daemonServiceBenchmarkRpcs, 'getBenchmarks'); isDaemonSyncedStub = sinon.stub(daemonServiceMiscRpcs, 'isDaemonSynced'); - deterministicFluxListStub = sinon.stub(fluxCommunicationUtils, 'deterministicFluxList'); + deterministicFluxListStub = sinon.stub(networkStateService, 'networkState'); getFluxNodeStatusStub = sinon.stub(daemonServiceFluxnodeRpcs, 'getFluxNodeStatus'); fluxNetworkHelper.setDosMessage(null); fluxNetworkHelper.setDosStateValue(0); diff --git a/tests/unit/fluxCommunicationUtils.test.js b/tests/unit/networkStateService.test.js similarity index 87% rename from tests/unit/fluxCommunicationUtils.test.js rename to tests/unit/networkStateService.test.js index bcc586b02..f43b2ca66 100644 --- a/tests/unit/fluxCommunicationUtils.test.js +++ b/tests/unit/networkStateService.test.js @@ -1,16 +1,17 @@ const chai = require('chai'); -const { LRUCache } = require('lru-cache'); -const sinon = require('sinon'); -const proxyquire = require('proxyquire'); const { expect } = chai; -let fluxCommunicationUtils = require('../../ZelBack/src/services/fluxCommunicationUtils'); + +const sinon = require('sinon'); + const fluxCommunicationMessagesSender = require('../../ZelBack/src/services/fluxCommunicationMessagesSender'); const daemonServiceFluxnodeRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs'); const fluxList = require('./data/listfluxnodes.json'); -describe('fluxCommunicationUtils tests', () => { - describe('deterministicFluxList tests', () => { +const networkStateService = require('../../ZelBack/src/services/networkStateService'); + +describe('networkStateService tests', () => { + describe('networkState tests', () => { const deterministicFluxnodeListResponseBase = { data: [ { @@ -75,35 +76,27 @@ describe('fluxCommunicationUtils tests', () => { daemonStub = sinon.stub(daemonServiceFluxnodeRpcs, 'viewDeterministicFluxNodeList'); }); - afterEach(() => { - daemonStub.restore(); + afterEach(async () => { sinon.restore(); + await networkStateService.stop(); }); it('should return the whole list if the filter was not provided', async () => { - // Start with clear cache - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': LRUCache }, - ); const deterministicFluxnodeListResponse = { ...deterministicFluxnodeListResponseBase, status: 'success', }; daemonStub.resolves(deterministicFluxnodeListResponse); - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(); + await networkStateService.start(); + + const deterministicFluxListResult = networkStateService.networkState(); expect(deterministicFluxListResult).to.eql(deterministicFluxnodeListResponse.data); sinon.assert.calledOnce(daemonStub); }); it('should return the list filtered out with proper public key', async () => { - // Start with clear cache - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': LRUCache }, - ); const filteredPubKey = '04d50620a31f045c61be42bad44b7a9424ffb6de37bf256b88f00e118e59736165255f2f4585b36c7e1f8f3e20db4fa4e55e61cc01dc7a5cd2b2ed0153627588dc'; const expectedResult = [{ collateral: 'COutPoint(46c9ae0313fc128d0fb4327f5babc7868fe557035b58e0a7cb475cfd8819f8c7, 0)', @@ -148,18 +141,15 @@ describe('fluxCommunicationUtils tests', () => { }; daemonStub.resolves(deterministicFluxnodeListResponse); - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(filteredPubKey); + await networkStateService.start(); + + const deterministicFluxListResult = await networkStateService.getFluxnodesByPubkey(filteredPubKey); expect(deterministicFluxListResult).to.eql(expectedResult); sinon.assert.calledOnce(daemonStub); }); it('should return an empty list if the public key does not match', async () => { - // Start with clear cache - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': LRUCache }, - ); const filteredPubKey = '04d50620a31f045c61be42bad44b7a9424asdfde37bf256b88f00e118e59736165255f2f4585b36c7e1f8f3e20db4fa4e55e61cc01dc7a5cd2b2ed0153627588dc'; const expectedResult = []; @@ -169,29 +159,30 @@ describe('fluxCommunicationUtils tests', () => { }; daemonStub.resolves(deterministicFluxnodeListResponse); - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(filteredPubKey); + await networkStateService.start(); + + const deterministicFluxListResult = await networkStateService.getFluxnodesByPubkey(filteredPubKey); expect(deterministicFluxListResult).to.eql(expectedResult); - sinon.assert.calledOnce(daemonStub); + // once at start, once again to find non existent pubkey + sinon.assert.calledTwice(daemonStub); }); it('should get list from cache with no filter applied', async () => { - // Stub cache to simulate the actual lru-cache called - const getCacheStub = sinon.stub(); - const stubCache = sinon.stub().callsFake(() => ({ - get: getCacheStub, - })); - const stubCacheB = { LRUCache: stubCache }; - getCacheStub.withArgs('fluxList').returns(deterministicFluxnodeListResponseBase.data); - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': stubCacheB }, - ); - - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(); + const deterministicFluxnodeListResponse = { + ...deterministicFluxnodeListResponseBase, + status: 'success', + }; + daemonStub.resolves(deterministicFluxnodeListResponse); + + await networkStateService.start(); + + sinon.assert.calledOnce(daemonStub); + + const deterministicFluxListResult = networkStateService.networkState(); expect(deterministicFluxListResult).to.eql(deterministicFluxnodeListResponseBase.data); - sinon.assert.calledOnceWithExactly(getCacheStub, 'fluxList'); + sinon.assert.calledOnce(daemonStub); }); it('should get list from cache with filter applied', async () => { @@ -232,22 +223,21 @@ describe('fluxCommunicationUtils tests', () => { amount: '2000.00', rank: 1, }]; - // Stub cache to simulate the actual lru-cache called - const getCacheStub = sinon.stub(); - const stubCache = sinon.stub().callsFake(() => ({ - get: getCacheStub, - })); - const stubCacheB = { LRUCache: stubCache }; - getCacheStub.withArgs(`fluxList${filteredPubKey}`).returns(expectedResult); - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': stubCacheB }, - ); - - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(filteredPubKey); + + const deterministicFluxnodeListResponse = { + ...deterministicFluxnodeListResponseBase, + status: 'success', + }; + daemonStub.resolves(deterministicFluxnodeListResponse); + + await networkStateService.start(); + + sinon.assert.calledOnce(daemonStub); + + const deterministicFluxListResult = await networkStateService.getFluxnodesByPubkey(filteredPubKey); expect(deterministicFluxListResult).to.eql(expectedResult); - sinon.assert.calledOnceWithExactly(getCacheStub, `fluxList${filteredPubKey}`); + sinon.assert.calledOnce(daemonStub); }); }); From 171550a7f71da05b64d256a9425f334cef14a156 Mon Sep 17 00:00:00 2001 From: David White Date: Sun, 30 Jun 2024 10:53:40 +0100 Subject: [PATCH 08/16] Fix zmq uri --- ZelBack/src/services/utils/networkStateManager.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index 0c50df895..6d3526a3a 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -258,7 +258,7 @@ class NetworkStateManager extends EventEmitter { this.fallbackTimer = setTimeout(() => this.#startPolling(), this.fallbackTimeout); }); - this.#socket.connect(`tcp://${this.zmqEndpoint}`); + this.#socket.connect(this.zmqEndpoint); this.#socket.subscribe('hashblock'); this.#handleBlocks(); From 7c0715a3b5640d22859b2f921a75519fd1cb692f Mon Sep 17 00:00:00 2001 From: David White Date: Sun, 30 Jun 2024 18:02:30 +0100 Subject: [PATCH 09/16] fix indexes to yield to event queue --- ZelBack/src/services/utils/daemonrpcClient.js | 6 +- .../src/services/utils/networkStateManager.js | 111 ++++++++---------- apiServer.js | 2 +- 3 files changed, 58 insertions(+), 61 deletions(-) diff --git a/ZelBack/src/services/utils/daemonrpcClient.js b/ZelBack/src/services/utils/daemonrpcClient.js index 73e89506b..a6e491adf 100644 --- a/ZelBack/src/services/utils/daemonrpcClient.js +++ b/ZelBack/src/services/utils/daemonrpcClient.js @@ -2,8 +2,12 @@ const daemonrpc = require('daemonrpc'); const fullnode = require('fullnode'); const config = require('config'); +userconfig = require('../../../../config/userconfig'); + +const { initial: isTestnet } = userconfig; + const fnconfig = new fullnode.Config(); -const isTestnet = userconfig.initial.testnet; + const rpcuser = fnconfig.rpcuser() || 'rpcuser'; const rpcpassword = fnconfig.rpcpassword() || 'rpcpassword'; const rpcport = fnconfig.rpcport() || (isTestnet === true ? config.daemon.rpcporttestnet : config.daemon.rpcport); diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index 6d3526a3a..8496ebd21 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -40,8 +40,6 @@ class NetworkStateManager extends EventEmitter { #endpointIndex = new Map(); - #indexes = { pubkey: this.#pubkeyIndex, endpoint: this.#endpointIndex }; - #running = false; #controller = new FluxController(); @@ -91,6 +89,10 @@ class NetworkStateManager extends EventEmitter { return this.#running; } + get #indexes() { + return { pubkey: this.#pubkeyIndex, endpoint: this.#endpointIndex }; + } + #createIndexWorker() { this.#indexWorker = new Worker('./ZelBack/src/services/utils/networkStateWorker.js'); this.#indexWorker.on('message', (indexes) => { @@ -110,68 +112,55 @@ class NetworkStateManager extends EventEmitter { this.#endpointIndex = endpointIndex; } - #buildIndexes(nodes, options = {}) { - const filter = options.filter || null; - const type = options.type || 'stateUpdate'; - - if (type === 'stateUpdate') { - // nodes.forEach((node) => { - // if (!this.#pubkeyIndex.has(node.pubkey)) { - // this.#pubkeyIndex.set(node.pubkey, new Map()); - // } - // this.#pubkeyIndex.get(node.pubkey).set(node.ip, node); - // this.#endpointIndex.set(node.ip, node); - // }); - nodes.forEach((node) => { - const nodesByPubkey = this.#pubkeyIndex.get(node.pubkey) - || this.#pubkeyIndex.set(node.pubkey, new Map()).get(node.pubkey); + async #buildIndexes(nodes) { + // nodes.forEach((node) => { + // if (!this.#pubkeyIndex.has(node.pubkey)) { + // this.#pubkeyIndex.set(node.pubkey, new Map()); + // } + // this.#pubkeyIndex.get(node.pubkey).set(node.ip, node); + // this.#endpointIndex.set(node.ip, node); + // }); + + const nodeCount = nodes.length; + + const pubkeyIndex = new Map(); + const endpointIndex = new Map(); + + function iterIndexes(startIndex, callback) { + const endIndex = startIndex + 1000; + const chunk = nodes.slice(startIndex, endIndex); + + chunk.forEach((node) => { + const nodesByPubkey = pubkeyIndex.get(node.pubkey) + || pubkeyIndex.set(node.pubkey, new Map()).get(node.pubkey); nodesByPubkey.set(node.ip, node); - this.#endpointIndex.set(node.ip, node); + endpointIndex.set(node.ip, node); }); - // nodes.forEach((node) => { - // const pubkeyIndex = this.#pubkeyIndex.get(node.pubkey) || new Map(); - // pubkeyIndex.set(node.ip, node); - // this.#endpointIndex.set(node.ip, node); - // }); - return; - } - if (!nodes.length) { - if (type === 'pubkey') { - const toDelete = this.#indexes.pubkey.get(filter); - this.#indexes.pubkey.delete(filter); - toDelete.forEach((node) => { - this.#indexes.endpoint.delete(node.ip); - }); - } else if (type === 'endpoint') { - this.#indexes.endpoint.delete(filter); + if (endIndex >= nodeCount) { + callback(); + return; } - return; - } - - let pubkeyMap = null; - - if (type === 'pubkey') { - pubkeyMap = new Map(); - this.#pubkeyIndex.set(nodes[0].pubkey, pubkeyMap); + setImmediate(iterIndexes.bind(this, endIndex, callback)); } - nodes.forEach((node) => { - if (type === 'pubkey') pubkeyMap.set(node.ip, node); - - this.#endpointIndex.set(node.ip, node); + // Yield to the event queue here, this way we are only ever doing O(1000), + // instead of O(n). With around 13k nodes, this was taking on average 8ms. + // I.e. the event queue was blocked for 8ms. Now we yield. I was using a + // worker here, but overkill for what we are doing. + + return new Promise((resolve) => { + iterIndexes(0, () => { + this.#setIndexes(pubkeyIndex, endpointIndex); + console.log('pubkeyIndexSize:', pubkeyIndex.size); + console.log('endpointIndexSize:', endpointIndex.size); + resolve(); + }); }); } - #recreateIndexes() { - this.#pubkeyIndex.clear(); - this.#endpointIndex.clear(); - - this.#buildIndexes(this.#state); - } - reset() { // recreate objects so they can't be mutated externally this.#pubkeyIndex = new Map(); @@ -180,18 +169,19 @@ class NetworkStateManager extends EventEmitter { } async fetchNetworkState() { + console.log('fetching state'); this.#updatingState = true; // should use monotonic clock for any elapsed times const start = process.hrtime.bigint(); - let state = null; + let state = []; - while (!state) { + while (!state.length) { if (this.#controller.aborted) break; const fetchStart = process.hrtime.bigint(); // eslint-disable-next-line no-await-in-loop - const res = await this.stateFetcher().catch(() => null); + const res = await this.stateFetcher().catch(() => []); console.log('Fetch finished, elapsed ms:', Number(process.hrtime.bigint() - fetchStart) / 1000000); // eslint-disable-next-line no-await-in-loop @@ -202,16 +192,19 @@ class NetworkStateManager extends EventEmitter { if (state.length) { console.log('Nodes found:', state.length); - console.log('Setting state and indexes'); - this.#indexStart = process.hrtime.bigint(); + this.#state = state; if (!populated) this.emit('populated'); + console.log('Setting state and indexes'); + this.#indexStart = process.hrtime.bigint(); + if (this.useIndexWorker) { this.#indexWorker.postMessage(state); } else { - this.#recreateIndexes(); + // this.#recreateIndexes(); + await this.#buildIndexes(this.#state); console.log('Indexes created, elapsed ms:', Number(process.hrtime.bigint() - this.#indexStart) / 1000000); this.#indexStart = null; diff --git a/apiServer.js b/apiServer.js index d6ffa0dd2..5b52f717b 100644 --- a/apiServer.js +++ b/apiServer.js @@ -1,4 +1,4 @@ -global.userconfig = require('./config/userconfig'); +globalThis.userconfig = require('./config/userconfig'); if (typeof AbortController === 'undefined') { // polyfill for nodeJS 14.18.1 - without having to use experimental features From 16834621785fe42ad3e527446a35c996c190ef5c Mon Sep 17 00:00:00 2001 From: David White Date: Thu, 4 Jul 2024 09:57:15 +0100 Subject: [PATCH 10/16] mid update - commiting to switch branch --- ZelBack/src/services/networkStateService.js | 67 +---- .../src/services/utils/networkStateManager.js | 235 ++++++++++-------- helpers/repositories.json | 1 - package.json | 2 +- 4 files changed, 144 insertions(+), 161 deletions(-) diff --git a/ZelBack/src/services/networkStateService.js b/ZelBack/src/services/networkStateService.js index fea35daa8..ec278f8e9 100644 --- a/ZelBack/src/services/networkStateService.js +++ b/ZelBack/src/services/networkStateService.js @@ -1,13 +1,8 @@ -const fs = require('node:fs/promises'); -const path = require('node:path'); - const config = require('config'); const log = require('../lib/log'); const serviceHelper = require('./serviceHelper'); -const systemService = require('./systemService'); const verificationHelper = require('./verificationHelper'); const daemonServiceFluxnodeRpcs = require('./daemonService/daemonServiceFluxnodeRpcs'); -const daemonServiceUtils = require('./daemonService/daemonServiceUtils'); const networkStateManager = require('./utils/networkStateManager'); /** @@ -21,63 +16,15 @@ const networkStateManager = require('./utils/networkStateManager'); */ let stateManager = null; -async function enableZmq(zmqEndpoint) { - const fluxConfigDir = daemonServiceUtils.getConfigDir(); - const zmqEnabledPath = path.join(fluxConfigDir, '.zmqEnabled'); - - const exists = Boolean(await fs.stat(zmqEnabledPath).catch(() => false)); - - if (exists) return true; - - let parseError = false; - - try { - const { protocol, hostname, port } = new URL(zmqEndpoint); - - if (!protocol === 'tcp' || !hostname || !port) parseError = true; - } catch { - parseError = true; - } - - if (parseError) { - log.error(`Error parsing zmqEndpoint: ${zmqEndpoint}. Unable to start zmq publisher`); - return false; - } - - const topics = [ - 'zmqpubhashtx', - 'zmqpubhashblock', - 'zmqpubrawblock', - 'zmqpubrawtx', - 'zmqpubsequence', - ]; - - topics.forEach((topic, index) => { - const write = index + 1 === topics.length; - daemonServiceUtils.setConfigValue(topic, zmqEndpoint, { - write, - replace: true, - }); - }); - - await fs.writeFile(zmqEnabledPath, '').catch(() => { }); - - const { error: restartError } = await systemService.restartSystemdService('zelcash.service'); - - if (restartError) { - log.error('Error restarting zelcash.service after config update'); - return false; - } - - return true; -} - +/** + * Starts the network state service. It will either subscribe to the fluxd zmq + * endpoint if possible, or fallback to polling. + * @returns {void} + */ async function start() { - const { daemon: { zmqport } } = config ?? { daemon: { zmqport: 28332 } }; + const { daemon: { zmqport } } = config ?? { daemon: { zmqport: 16126 } }; - let zmqEndpoint = `tcp://127.0.0.1:${zmqport}`; - const zmqEnabled = await enableZmq(zmqEndpoint); - zmqEndpoint = zmqEnabled ? zmqEndpoint : null; + const zmqEndpoint = `tcp://127.0.0.1:${zmqport}`; return new Promise((resolve, reject) => { if (stateManager) resolve(); diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index 8496ebd21..d1c8a9149 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -1,13 +1,7 @@ -/** - * This should really subscribe to fluxd blocks using zmq. - * It doesn't make sense to poll every 2 minutes when you often get - * multiple blocks in a short timeframe. - */ - -const { Worker } = require('node:worker_threads'); +// const { Worker } = require('node:worker_threads'); const { EventEmitter } = require('node:events'); -const { FluxController } = require('./fluxController'); +const { FluxController } = require('../zelflux/ZelBack/src/services/utils/fluxController'); const zmq = require('zeromq'); @@ -40,44 +34,43 @@ class NetworkStateManager extends EventEmitter { #endpointIndex = new Map(); - #running = false; - #controller = new FluxController(); - #socket = new zmq.Subscriber({ - reconnectInterval: 500, - reconnectMaxInterval: 15_000, - heartbeatInterval: 20_000, - heartbeatTimeout: 60_000, - connectTimeout: 3_000, - tcpMaxRetransmitTimeout: 120_000, - }); + #socket = zmq.socket('sub'); #socketConnected = null; - #updatingState = false; + #indexStart = null; - #indexWorker = null; + #updatingState = false; - #indexStart = null; + /** + * @type { "polling" | "subscription" } + */ + #updateTrigger = 'subscription'; /** * * @param {()=>Promise>} stateFetcher - * @param {{}} options + * @param {{intervalMs?: number, zmqEndpoint?: string, fallbackTimeout?: number}} options */ constructor(stateFetcher, options = {}) { super(); this.stateFetcher = stateFetcher; - this.intervalMs = options.intervalMs || 0; + this.intervalMs = options.intervalMs || 120_000; this.zmqEndpoint = options.zmqEndpoint || null; + + if (!this.zmqEndpoint) this.#updateTrigger = 'polling'; + this.fallbackTimeout = options.fallbackTimeout || 120_000; - this.useIndexWorker = options.useIndexWorker || false; - if (this.useIndexWorker) { - this.#createIndexWorker(); - } + this.#socket.setsockopt(zmq.ZMQ_RECONNECT_IVL, 500); + this.#socket.setsockopt(zmq.ZMQ_RECONNECT_IVL_MAX, 15_000); + this.#socket.setsockopt(zmq.ZMQ_HEARTBEAT_IVL, 20_000); + this.#socket.setsockopt(zmq.ZMQ_HEARTBEAT_TIMEOUT, 60_000); + this.#socket.setsockopt(zmq.ZMQ_CONNECT_TIMEOUT, 3_000); + this.#socket.monitor(); } get state() { @@ -85,28 +78,18 @@ class NetworkStateManager extends EventEmitter { return this.#state; } - get running() { - return this.#running; + get updateTrigger() { + return this.#updateTrigger; } + /** + * Index map. Has to be a getter and not a field, as the field doesn't update + * the reference. + */ get #indexes() { return { pubkey: this.#pubkeyIndex, endpoint: this.#endpointIndex }; } - #createIndexWorker() { - this.#indexWorker = new Worker('./ZelBack/src/services/utils/networkStateWorker.js'); - this.#indexWorker.on('message', (indexes) => { - console.log('Indexes created, elapsed ms:', Number(process.hrtime.bigint() - this.#indexStart) / 1000000); - this.#indexStart = null; - - const { pubkeyIndex, endpointIndex } = indexes; - this.#setIndexes(pubkeyIndex, endpointIndex); - this.emit('updated'); - }); - this.#indexWorker.on('error', (err) => { console.log(err); }); - this.#indexWorker.on('exit', (code) => { console.log(code); }); - } - #setIndexes(pubkeyIndex, endpointIndex) { this.#pubkeyIndex = pubkeyIndex; this.#endpointIndex = endpointIndex; @@ -171,7 +154,7 @@ class NetworkStateManager extends EventEmitter { async fetchNetworkState() { console.log('fetching state'); this.#updatingState = true; - // should use monotonic clock for any elapsed times + // always use monotonic clock for any elapsed times const start = process.hrtime.bigint(); let state = []; @@ -181,7 +164,11 @@ class NetworkStateManager extends EventEmitter { const fetchStart = process.hrtime.bigint(); // eslint-disable-next-line no-await-in-loop - const res = await this.stateFetcher().catch(() => []); + const res = await this.stateFetcher().catch(() => { + console.log('state fetcher error'); + return []; + }); + console.log('Fetch finished, elapsed ms:', Number(process.hrtime.bigint() - fetchStart) / 1000000); // eslint-disable-next-line no-await-in-loop @@ -195,21 +182,15 @@ class NetworkStateManager extends EventEmitter { this.#state = state; - if (!populated) this.emit('populated'); - console.log('Setting state and indexes'); this.#indexStart = process.hrtime.bigint(); - if (this.useIndexWorker) { - this.#indexWorker.postMessage(state); - } else { - // this.#recreateIndexes(); - await this.#buildIndexes(this.#state); - console.log('Indexes created, elapsed ms:', Number(process.hrtime.bigint() - this.#indexStart) / 1000000); - this.#indexStart = null; + await this.#buildIndexes(this.#state); + console.log('Indexes created, elapsed ms:', Number(process.hrtime.bigint() - this.#indexStart) / 1000000); + this.#indexStart = null; - this.emit('updated'); - } + if (!populated) this.emit('populated'); + this.emit('updated'); } this.#updatingState = false; @@ -218,24 +199,89 @@ class NetworkStateManager extends EventEmitter { return this.intervalMs - elapsed; } - async #handleBlocks() { - await this.fetchNetworkState(); - // eslint-disable-next-line no-restricted-syntax - for await (const [topicBuf, msgBuf, seqBuf] of this.#socket) { - const topic = topicBuf.toString(); - const blockId = msgBuf.toString('hex'); - const seq = seqBuf.readUInt32LE(0); - console.log(`${topic}: ${blockId}, sequence: ${seq}`); - this.emit('block', blockId); - - if (this.#updatingState) this.#controller.abort(); - this.fetchNetworkState(); + #handleBlock(msgBuf, seqBuf) { + const blockId = msgBuf.toString('hex'); + const seq = seqBuf.readUInt32LE(0); + console.log(`hashblock: ${blockId}, sequence: ${seq}`); + + if (this.#updatingState) this.#controller.abort(); + this.fetchNetworkState(); + + // should we await fetch before emitting block? + this.emit('block', blockId); + } + + #handleMessage(topicBuf, msgBuf, seqBuf) { + const topic = topicBuf.toString(); + + switch (topic) { + case 'hashblock': + this.#handleBlock(msgBuf, seqBuf); + break; + default: + throw new Error(`Unknown topic: ${topic}`); } } - #startSubscription() { - this.#socket.events.on('connect', () => { + /** + * Test if the zmqendpoint is connectable. Takes about 8 seconds to fail + */ + async #probeZmqEndpoint() { + const maxRetries = 10; + let retryCounter = 0; + + const probe = zmq.socket('sub'); + + probe.setsockopt(zmq.ZMQ_RECONNECT_IVL, 200); + probe.setsockopt(zmq.ZMQ_RECONNECT_IVL_MAX, 1000); + probe.setsockopt(zmq.ZMQ_CONNECT_TIMEOUT, 3_000); + + probe.monitor(); + + return new Promise((resolve) => { + probe.on('connect', () => { + console.log('probe connected'); + probe.unmonitor(); + probe.close(); + resolve(true); + }); + + probe.on('connect_retry', (retryTimer) => { + retryCounter += 1; + + if (retryCounter < maxRetries) { + console.log('probe retrying ms:', retryTimer); + return; + } + + console.log('max retries hit, bailing'); + probe.unmonitor(); + probe.close(); + resolve(false); + }); + + probe.connect(this.zmqEndpoint); + }); + } + + async #startSubscription() { + console.log('start subscription'); + await this.fetchNetworkState(); + + const zmqEnabled = await this.#probeZmqEndpoint(); + + if (!zmqEnabled) { + console.log('zmq not enabled'); + this.#updateTrigger = 'polling'; + this.#startPolling(); + return; + } + + console.log('probe connected, setting up subscription'); + + this.#socket.on('connect', () => { if (this.#socketConnected === false) { + // State: previously connected, now reconnected. clearTimeout(this.fallbackTimer); this.fallbackTimer = null; this.#controller.stopLoop(); @@ -245,16 +291,16 @@ class NetworkStateManager extends EventEmitter { this.emit('zmqConnected'); }); - this.#socket.events.on('disconnect', () => { + this.#socket.on('disconnect', () => { this.#socketConnected = false; this.emit('zmqDisconnected'); this.fallbackTimer = setTimeout(() => this.#startPolling(), this.fallbackTimeout); }); + this.#socket.on('message', (...args) => this.#handleMessage(...args)); + this.#socket.connect(this.zmqEndpoint); this.#socket.subscribe('hashblock'); - - this.#handleBlocks(); } #startPolling() { @@ -275,59 +321,50 @@ class NetworkStateManager extends EventEmitter { } /** - * Find a node in the fluxnode list by either pubkey or endpoint, it - * first looks up the node locally, using O(1) from a Map; failing that, - * it goes out to the api and fetches it, and updates the local cache. - * - * @param {string} filter Pubkey or endpoint (ip:port) - * @param {"pubkey"|"endpoint"} type - * @returns - */ + * Find a node in the fluxnode network state by either pubkey or endpoint + * + * @param {string} filter Pubkey or endpoint (ip:port) + * @param {"pubkey"|"endpoint"} type + * @returns + */ search(filter, type) { if (!filter) return null; if (!Object.keys(this.#indexes).includes(type)) return null; const cached = this.#indexes[type].get(filter); - // if (cached) return cached; return cached || null; - - // this is a reference to the object in the LRU cache (daemonServiceutils), - // if you modify this object, you are actually modifying the object in the cache. - // Any LRU cache should not return a direct object, it should return a copy. - // const nodes = await this.stateFetcher(filter).catch(() => []); - - // this.#setIndexes(nodes, { filter, type }); - - // return this.#indexes[type].get(filter); } } -const daemonServiceFluxnodeRpcs = require('../daemonService/daemonServiceFluxnodeRpcs'); - async function main() { + // eslint-disable-next-line global-require + const daemonServiceFluxnodeRpcs = require('../zelflux/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs'); + const fetcher = async (filter = null) => { - const options = { params: { filter }, query: { filter: null } }; + const options = { params: { useCache: false, filter }, query: { filter: null } }; const res = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList(options); + console.log(res); if (res.status === 'success') { return res.data; } + console.log('fetcher not success'); return []; }; - const network = new NetworkStateManager(fetcher, { intervalMs: 120_000 }); + const network = new NetworkStateManager(fetcher, { intervalMs: 120_000, zmqEndpoint: 'tcp://127.0.0.1:28332' }); network.on('updated', () => { console.log('received updated event'); }); - network.on('populated', async () => { + network.on('populated', () => { console.log('received populated event'); - console.log(await network.search('212.71.244.159:16137', 'endpoint')); + console.log('Search result populated:', network.search('212.71.244.159:16137', 'endpoint')); }); - // network.start(); + network.start(); setInterval(async () => { // await network.search('212.71.244.159:16137', 'endpoint'); - console.log(await network.search('0404bccaf5d3108439b4897697bf7ce4d045950264e118596e31cc579028a7f808870d6ac59b9c00412d2f354610a9d18b47db80b08ba6536f0ae093c08a3aaccb', 'pubkey')); + console.log('Search pubkey:', network.search('045ae66321cfc172086d79252323b6cd4b83460e580e88f220582affda8a83b3ec68078ad80f7e465c42c3ef9bc01b912b3663e2ba09057bc43fbedf0afa9f3864', 'pubkey')); }, 5_000); } diff --git a/helpers/repositories.json b/helpers/repositories.json index 78a1f9b08..392e14079 100644 --- a/helpers/repositories.json +++ b/helpers/repositories.json @@ -453,7 +453,6 @@ "ghcr.io/marctheshark3/sigmanaut-mining-pool-ui", "garyotto", "4haiko/dante-socks5-proxy", - "nyusternie", "jepbura/gaganode", "nyusternie", "moultor", diff --git a/package.json b/package.json index 6955c72e4..625d474a0 100644 --- a/package.json +++ b/package.json @@ -123,7 +123,7 @@ "ws": "~7.5.9", "xterm": "~5.1.0", "zeltrezjs": "~2.12.0", - "zeromq": "^6.0.0-beta.20" + "zeromq": "~5.3.1" }, "devDependencies": { "@babel/core": "~7.23.6", From 82ab922b55a4dd839dd08cc19c5c7e5579e3026f Mon Sep 17 00:00:00 2001 From: David White Date: Thu, 4 Jul 2024 12:04:46 +0100 Subject: [PATCH 11/16] Wait for index to complete on search --- .../src/services/utils/networkStateManager.js | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index d1c8a9149..d7b6dd4c4 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -82,6 +82,10 @@ class NetworkStateManager extends EventEmitter { return this.#updateTrigger; } + get indexesReady() { + return this.#controller.lock.ready; + } + /** * Index map. Has to be a getter and not a field, as the field doesn't update * the reference. @@ -104,6 +108,10 @@ class NetworkStateManager extends EventEmitter { // this.#endpointIndex.set(node.ip, node); // }); + // if we are building an index already, just wait for it to finish. + // maybe look at cancelling it in future. + await this.#controller.lock.enable(); + const nodeCount = nodes.length; const pubkeyIndex = new Map(); @@ -139,6 +147,7 @@ class NetworkStateManager extends EventEmitter { this.#setIndexes(pubkeyIndex, endpointIndex); console.log('pubkeyIndexSize:', pubkeyIndex.size); console.log('endpointIndexSize:', endpointIndex.size); + this.#controller.lock.disable(); resolve(); }); }); @@ -327,10 +336,14 @@ class NetworkStateManager extends EventEmitter { * @param {"pubkey"|"endpoint"} type * @returns */ - search(filter, type) { + async search(filter, type) { if (!filter) return null; if (!Object.keys(this.#indexes).includes(type)) return null; + // if we are mid stroke indexing, may as well wait the 10ms (max) and get the + // latest block + await this.indexesReady; + const cached = this.#indexes[type].get(filter); return cached || null; } @@ -345,7 +358,6 @@ async function main() { const res = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList(options); - console.log(res); if (res.status === 'success') { return res.data; } @@ -357,14 +369,14 @@ async function main() { network.on('updated', () => { console.log('received updated event'); }); - network.on('populated', () => { + network.on('populated', async () => { console.log('received populated event'); - console.log('Search result populated:', network.search('212.71.244.159:16137', 'endpoint')); + console.log('Search result populated:', await network.search('212.71.244.159:16137', 'endpoint')); }); network.start(); setInterval(async () => { // await network.search('212.71.244.159:16137', 'endpoint'); - console.log('Search pubkey:', network.search('045ae66321cfc172086d79252323b6cd4b83460e580e88f220582affda8a83b3ec68078ad80f7e465c42c3ef9bc01b912b3663e2ba09057bc43fbedf0afa9f3864', 'pubkey')); + console.log('Search pubkey:', await network.search('045ae66321cfc172086d79252323b6cd4b83460e580e88f220582affda8a83b3ec68078ad80f7e465c42c3ef9bc01b912b3663e2ba09057bc43fbedf0afa9f3864', 'pubkey')); }, 5_000); } From 3ab451a0dd443738dc1950a1db46faa8f0507f75 Mon Sep 17 00:00:00 2001 From: David White Date: Thu, 4 Jul 2024 12:10:35 +0100 Subject: [PATCH 12/16] Fix paths --- ZelBack/src/services/utils/networkStateManager.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index d7b6dd4c4..3a01b84bc 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -1,7 +1,9 @@ // const { Worker } = require('node:worker_threads'); const { EventEmitter } = require('node:events'); -const { FluxController } = require('../zelflux/ZelBack/src/services/utils/fluxController'); +// const { FluxController } = require('../zelflux/ZelBack/src/services/utils/fluxController'); + +const { FluxController } = require('./fluxController'); const zmq = require('zeromq'); From 1eabcd00082c1b92341260e7284c7ea17d1bb7eb Mon Sep 17 00:00:00 2001 From: David White Date: Thu, 4 Jul 2024 12:51:31 +0100 Subject: [PATCH 13/16] Dirty fix for broken fluxnodelist (don't connect if no ip) --- ZelBack/src/services/fluxCommunication.js | 4 +-- .../src/services/utils/networkStateManager.js | 25 ++++++++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/ZelBack/src/services/fluxCommunication.js b/ZelBack/src/services/fluxCommunication.js index c65e7a903..c036b454e 100644 --- a/ZelBack/src/services/fluxCommunication.js +++ b/ZelBack/src/services/fluxCommunication.js @@ -916,7 +916,7 @@ async function fluxDiscovery() { // additional precaution const clientExists = outgoingConnections.find((client) => client.ip === ipInc && client.port === portInc); const clientIncomingExists = incomingConnections.find((client) => client.ip === ipInc && client.port === portInc); - if (!clientExists && !clientIncomingExists) { + if (ipInc && !clientExists && !clientIncomingExists) { deterministicPeerConnections = true; // eslint-disable-next-line no-await-in-loop await serviceHelper.axiosGet(`http://${ipInc}:${portInc}/flux/addoutgoingpeer/${myIP}`).catch((error) => log.error(error)); @@ -939,7 +939,7 @@ async function fluxDiscovery() { const sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc); const clientExists = outgoingConnections.find((client) => client.ip === ipInc && client.port === portInc); const clientIncomingExists = incomingConnections.find((client) => client.ip === ipInc && client.port === portInc); - if (!sameConnectedIp && !clientExists && !clientIncomingExists) { + if (ipInc && !sameConnectedIp && !clientExists && !clientIncomingExists) { log.info(`Adding random Flux peer: ${connection}`); currentIpsConnTried.push(connection); initiateAndHandleConnection(connection); diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js index 3a01b84bc..6549f6ddb 100644 --- a/ZelBack/src/services/utils/networkStateManager.js +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -353,7 +353,7 @@ class NetworkStateManager extends EventEmitter { async function main() { // eslint-disable-next-line global-require - const daemonServiceFluxnodeRpcs = require('../zelflux/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs'); + const daemonServiceFluxnodeRpcs = require('../daemonService/daemonServiceFluxnodeRpcs'); const fetcher = async (filter = null) => { const options = { params: { useCache: false, filter }, query: { filter: null } }; @@ -387,3 +387,26 @@ if (require.main === module) { } module.exports = { NetworkStateManager }; + +// interesting stuff: + +// 6 nodes with no ip address + +// ~ 420ms fetch time (on localhost) ~ 8.2Mb i/o. Not sure if this is time for fluxd +// to generate the list, or for the actual i/o on localhost. + +// ~ 20ms to build cache. This was 8ms under no load, so obviously, yielding +// to the event queue is a good thing as there is other work to be done. + +// if we need to search... we wait for indexes. What about if fetching? +// do we try for a search without waiting, then if a cache miss, we wait for +// the search to finish? + +// fetching state +// Fetch finished, elapsed ms: 418.639369 +// Nodes found: 13047 +// Setting state and indexes +// pubkeyIndexSize: 3011 +// endpointIndexSize: 13041 +// Indexes created, elapsed ms: 18.25089 +// New Flux App Removed message received. From fb4280747c13b99191b15ad69dc20a579e07fd79 Mon Sep 17 00:00:00 2001 From: David White Date: Thu, 4 Jul 2024 13:10:13 +0100 Subject: [PATCH 14/16] Add missing vars --- ZelBack/src/services/fluxCommunication.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ZelBack/src/services/fluxCommunication.js b/ZelBack/src/services/fluxCommunication.js index c036b454e..dec23b02e 100644 --- a/ZelBack/src/services/fluxCommunication.js +++ b/ZelBack/src/services/fluxCommunication.js @@ -900,7 +900,7 @@ async function fluxDiscovery() { // additional precaution const clientExists = outgoingConnections.find((client) => client.ip === ipInc && client.port === portInc); const clientIncomingExists = incomingConnections.find((client) => client.ip === ipInc && client.port === portInc); - if (!clientExists && !clientIncomingExists) { + if (ipInc && !clientExists && !clientIncomingExists) { deterministicPeerConnections = true; initiateAndHandleConnection(ip); // eslint-disable-next-line no-await-in-loop @@ -960,7 +960,7 @@ async function fluxDiscovery() { const sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc); const clientExists = outgoingConnections.find((client) => client.ip === ipInc && client.port === portInc); const clientIncomingExists = incomingConnections.find((client) => client.ip === ipInc && client.port === portInc); - if (!sameConnectedIp && !clientExists && !clientIncomingExists) { + if (ipInc && !sameConnectedIp && !clientExists && !clientIncomingExists) { log.info(`Asking random Flux ${connection} to add us as a peer`); currentIpsConnTried.push(connection); // eslint-disable-next-line no-await-in-loop From 99bee196d1f43a50fd64d578ed6f8df180b74ed0 Mon Sep 17 00:00:00 2001 From: David White Date: Thu, 4 Jul 2024 13:30:08 +0100 Subject: [PATCH 15/16] Remove code that is in another pull now --- .../daemonService/daemonServiceUtils.js | 26 ------------------- .../src/services/utils/networkStateWorker.js | 24 ----------------- 2 files changed, 50 deletions(-) delete mode 100644 ZelBack/src/services/utils/networkStateWorker.js diff --git a/ZelBack/src/services/daemonService/daemonServiceUtils.js b/ZelBack/src/services/daemonService/daemonServiceUtils.js index 7b53b06db..754f363a4 100644 --- a/ZelBack/src/services/daemonService/daemonServiceUtils.js +++ b/ZelBack/src/services/daemonService/daemonServiceUtils.js @@ -179,35 +179,9 @@ function getConfigValue(parameter) { return value; } -/** - * To set a value for a specified key from the configuration file. - * @param {string} parameter Config key. - * @param {string} value Config key value. - * @param {{write?: boolean, replace?: boolean}} options - * @returns {string} Config value. - */ -function setConfigValue(parameter, value, options = {}) { - const write = options.write || false; - const replace = options.replace || false; - - fnconfig.set(parameter, value, replace); - if (write) fnconfig.write(); -} - -/** - * To get the fluxd config dir. - * @returns {string} Config directory path. - */ -function getConfigDir() { - const value = fnconfig.defaultFolderPath; - return value; -} - module.exports = { executeCall, - getConfigDir, getConfigValue, - setConfigValue, // exports for testing purposes setStandardCache, diff --git a/ZelBack/src/services/utils/networkStateWorker.js b/ZelBack/src/services/utils/networkStateWorker.js deleted file mode 100644 index 1e41a22cd..000000000 --- a/ZelBack/src/services/utils/networkStateWorker.js +++ /dev/null @@ -1,24 +0,0 @@ -const { parentPort } = require('node:worker_threads'); - -parentPort.on('message', (nodes) => { - const pubkeyIndex = new Map(); - const endpointIndex = new Map(); - - nodes.forEach((node) => { - const nodesByPubkey = pubkeyIndex.get(node.pubkey) - || pubkeyIndex.set(node.pubkey, new Map()).get(node.pubkey); - - nodesByPubkey.set(node.ip, node); - endpointIndex.set(node.ip, node); - }); - - parentPort.postMessage({ pubkeyIndex, endpointIndex }); -}); - -// nodes.forEach((node) => { -// if (!this.#pubkeyIndex.has(node.pubkey)) { -// this.#pubkeyIndex.set(node.pubkey, new Map()); -// } -// this.#pubkeyIndex.get(node.pubkey).set(node.ip, node); -// this.#endpointIndex.set(node.ip, node); -// }); From b5143fd42abab54362d385a1b35b5b2b6118a67e Mon Sep 17 00:00:00 2001 From: David White Date: Thu, 4 Jul 2024 13:32:33 +0100 Subject: [PATCH 16/16] Revert fullnode changes (in another pull) --- lib/fullnode/index.js | 48 +++++++++++++++---------------------------- 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/lib/fullnode/index.js b/lib/fullnode/index.js index c9f8a6c64..51150e486 100644 --- a/lib/fullnode/index.js +++ b/lib/fullnode/index.js @@ -60,18 +60,23 @@ function Config() { this.defaultConfigPath = confFilePath; this.defaultFolderPath = folderPath; - this.lines = fs.readFileSync(confFilePath, 'utf-8').split('\n').map((line) => line.trim()); - + // Read all lines from config file, remove commented lines and empty lines. + this.filteredlines = fs.readFileSync(confFilePath, 'utf-8').split('\n').filter((line) => line.trim() && !line.trim().startsWith('#')); // Create a dictionary from keys and values - this.keysvalues = this.lines.reduce((map, line) => { - const [key, value] = line.split(/=(.*)/, 2); - - if (!key && !value) return map; - - if (!(key in map)) map[key] = value; - else if (typeof map[key] === 'string') map[key] = [map[key], value]; - else map[key].push(value); - + // Maximum of 6 = signs can be in config line + this.keysvalues = this.filteredlines.reduce((map, line) => { + const sp = line.split('=', 6); + if (sp.length === 2) { + map[sp[0].trim()] = sp[1].trim(); + } else if (sp.length === 3) { + map[sp[0].trim()] = `${sp[1].trim()}=${sp[2].trim()}`; + } else if (sp.length === 4) { + map[sp[0].trim()] = `${sp[1].trim()}=${sp[2].trim()}=${sp[3].trim()}`; + } else if (sp.length === 5) { + map[sp[0].trim()] = `${sp[1].trim()}=${sp[2].trim()}=${sp[3].trim()}=${sp[4].trim()}`; + } else if (sp.length === 6) { + map[sp[0].trim()] = `${sp[1].trim()}=${sp[2].trim()}=${sp[3].trim()}=${sp[4].trim()}=${sp[5].trim()}`; + } return map; }, {}); } @@ -85,27 +90,6 @@ Config.prototype.defaultConfig = function () { Config.prototype.get = function (name) { return this.keysvalues[name]; }; -Config.prototype.set = function (key, value, replace = false) { - // strip whitespace? - const map = this.keysvalues; - if (replace || !(key in map)) map[key] = value; - else if (typeof map[key] === 'string') map[key] = [map[key], value]; - else { - const index = map[key].indexOf(value); - if (index === -1) map[key].push(value); - } -}; -Config.prototype.write = function () { - const lines = Object.entries(this.keysvalues).flatMap((entry) => { - const [key, value] = entry; - - if (typeof value === 'string') return `${key}=${value}`; - return value.map((item) => `${key}=${item}`); - }); - - const fileData = `${lines.join('\n')}\n`; - fs.writeFileSync(this.defaultConfigPath, fileData); -}; Config.prototype.rpcuser = function () { return this.keysvalues.rpcuser; };