diff --git a/ZelBack/config/default.js b/ZelBack/config/default.js index 7fcee3da1..922584a65 100644 --- a/ZelBack/config/default.js +++ b/ZelBack/config/default.js @@ -74,6 +74,7 @@ module.exports = { rpcport: 16124, porttestnet: 26125, rpcporttestnet: 26124, + zmqport: 28332, }, minimumFluxBenchAllowedVersion: '4.0.0', minimumFluxOSAllowedVersion: '5.4.0', diff --git a/ZelBack/src/services/appsService.js b/ZelBack/src/services/appsService.js index 17320ca3b..1eeb85843 100644 --- a/ZelBack/src/services/appsService.js +++ b/ZelBack/src/services/appsService.js @@ -11817,8 +11817,7 @@ async function checkMyAppsAvailability() { throw err.message; }); await serviceHelper.delay(10 * 1000); - // eslint-disable-next-line no-await-in-loop - let askingIP = await fluxNetworkHelper.getRandomConnection(); + let askingIP = fluxNetworkHelper.getRandomConnection(); if (!askingIP) { checkMyAppsAvailability(); return; @@ -11993,11 +11992,9 @@ async function checkInstallingAppPortAvailable(portsToTest = []) { }); } await serviceHelper.delay(10 * 1000); - // eslint-disable-next-line no-await-in-loop - let askingIP = await fluxNetworkHelper.getRandomConnection(); + let askingIP = fluxNetworkHelper.getRandomConnection(); while (!askingIP || askingIP.split(':')[0] === myIP) { - // eslint-disable-next-line no-await-in-loop - askingIP = await fluxNetworkHelper.getRandomConnection(); + askingIP = fluxNetworkHelper.getRandomConnection(); } let askingIpPort = config.server.apiport; if (askingIP.includes(':')) { // has port specification diff --git a/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs.js b/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs.js index c5c8a4d30..cff70a74e 100644 --- a/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs.js +++ b/ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs.js @@ -207,13 +207,15 @@ async function startFluxNode(req, res) { async function viewDeterministicFluxNodeList(req, res) { let { filter } = req.params; filter = filter || req.query.filter; + const useCache = req.params.useCache ?? true; + const rpccall = 'viewdeterministiczelnodelist'; // viewdeterministicfluxnodelist const rpcparameters = []; if (filter) { rpcparameters.push(filter); } - response = await daemonServiceUtils.executeCall(rpccall, rpcparameters); + response = await daemonServiceUtils.executeCall(rpccall, rpcparameters, { useCache }); return res ? res.json(response) : response; } diff --git a/ZelBack/src/services/daemonService/daemonServiceUtils.js b/ZelBack/src/services/daemonService/daemonServiceUtils.js index d2180a0fa..754f363a4 100644 --- a/ZelBack/src/services/daemonService/daemonServiceUtils.js +++ b/ZelBack/src/services/daemonService/daemonServiceUtils.js @@ -39,7 +39,9 @@ let daemonCallRunning = false; * @param {string[]} params RPC parameters. * @returns {object} Message. */ -async function executeCall(rpc, params) { +async function executeCall(rpc, params, options = {}) { + const useCache = options.useCache ?? true; + const rpcparameters = params || []; try { let data; @@ -71,11 +73,11 @@ async function executeCall(rpc, params) { const randomDelay = Math.floor((Math.random() * 25)) + 10; await serviceHelper.delay(randomDelay); } - if (rpc === 'getBlock') { + if (useCache && rpc === 'getBlock') { data = blockCache.get(rpc + serviceHelper.ensureString(rpcparameters)); - } else if (rpc === 'getRawTransaction') { + } else if (useCache && rpc === 'getRawTransaction') { data = rawTxCache.get(rpc + serviceHelper.ensureString(rpcparameters)); - } else { + } else if (useCache) { data = cache.get(rpc + serviceHelper.ensureString(rpcparameters)); } if (!data) { diff --git a/ZelBack/src/services/enterpriseNodesService.js b/ZelBack/src/services/enterpriseNodesService.js index 83a0c55ae..d3d9545d6 100644 --- a/ZelBack/src/services/enterpriseNodesService.js +++ b/ZelBack/src/services/enterpriseNodesService.js @@ -1,5 +1,5 @@ const config = require('config'); -const fluxCommunicationUtils = require('./fluxCommunicationUtils'); +const networkStateService = require('./networkStateService'); const messageHelper = require('./messageHelper'); const dbHelper = require('./dbHelper'); const log = require('../lib/log'); @@ -12,7 +12,7 @@ const globalAppsInformation = config.database.appsglobal.collections.appsInforma */ async function getEnterpriseList() { try { - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); + const nodeList = networkStateService.networkState(); const enterpriseList = []; // txhash, outidx, pubkey, score, ip, payment_address?, tier, // user collateralization, 200k in flux nodes is most trusted, get 500 points // 200k flux in nodes, is most trusted, 5 * 40, 200 * 1, get 500 points diff --git a/ZelBack/src/services/fluxCommunication.js b/ZelBack/src/services/fluxCommunication.js index 884e5819c..dec23b02e 100644 --- a/ZelBack/src/services/fluxCommunication.js +++ b/ZelBack/src/services/fluxCommunication.js @@ -8,7 +8,7 @@ const serviceHelper = require('./serviceHelper'); const verificationHelper = require('./verificationHelper'); const daemonServiceMiscRpcs = require('./daemonService/daemonServiceMiscRpcs'); const fluxCommunicationMessagesSender = require('./fluxCommunicationMessagesSender'); -const fluxCommunicationUtils = require('./fluxCommunicationUtils'); +const networkStateService = require('./networkStateService'); const fluxNetworkHelper = require('./fluxNetworkHelper'); const messageHelper = require('./messageHelper'); const { @@ -71,7 +71,8 @@ async function handleAppMessages(message, fromIP, port) { // eslint-disable-next-line global-require const appsService = require('./appsService'); const rebroadcastToPeers = await appsService.storeAppTemporaryMessage(message.data, true); - if (rebroadcastToPeers === true) { + + if (rebroadcastToPeers) { const messageString = serviceHelper.ensureString(message); const wsListOut = []; outgoingConnections.forEach((client) => { @@ -112,30 +113,33 @@ async function handleAppRunningMessage(message, fromIP, port) { // eslint-disable-next-line global-require const appsService = require('./appsService'); const rebroadcastToPeers = await appsService.storeAppRunningMessage(message.data); - const currentTimeStamp = Date.now(); - const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); - if (rebroadcastToPeers === true && timestampOK) { - const messageString = serviceHelper.ensureString(message); - const wsListOut = []; - outgoingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsListOut.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); - await serviceHelper.delay(500); - const wsList = []; - incomingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsList.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); - } + + if (!rebroadcastToPeers) return; + + const stale = networkStateService.isBroadcastStale(message, 240_000); + + if (stale) return; + + const messageString = serviceHelper.ensureString(message); + const wsListOut = []; + outgoingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsListOut.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); + await serviceHelper.delay(500); + const wsList = []; + incomingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsList.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } catch (error) { log.error(error); } @@ -154,30 +158,33 @@ async function handleIPChangedMessage(message, fromIP, port) { // eslint-disable-next-line global-require const appsService = require('./appsService'); const rebroadcastToPeers = await appsService.storeIPChangedMessage(message.data); - const currentTimeStamp = Date.now(); - const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); - if (rebroadcastToPeers && timestampOK) { - const messageString = serviceHelper.ensureString(message); - const wsListOut = []; - outgoingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsListOut.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); - await serviceHelper.delay(500); - const wsList = []; - incomingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsList.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); - } + + if (!rebroadcastToPeers) return; + + const stale = networkStateService.isBroadcastStale(message, 240_000); + + if (stale) return; + + const messageString = serviceHelper.ensureString(message); + const wsListOut = []; + outgoingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsListOut.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); + await serviceHelper.delay(500); + const wsList = []; + incomingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsList.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } catch (error) { log.error(error); } @@ -196,30 +203,33 @@ async function handleAppRemovedMessage(message, fromIP, port) { // eslint-disable-next-line global-require const appsService = require('./appsService'); const rebroadcastToPeers = await appsService.storeAppRemovedMessage(message.data); - const currentTimeStamp = Date.now(); - const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); - if (rebroadcastToPeers && timestampOK) { - const messageString = serviceHelper.ensureString(message); - const wsListOut = []; - outgoingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsListOut.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); - await serviceHelper.delay(500); - const wsList = []; - incomingConnections.forEach((client) => { - if (client.ip === fromIP && client.port === port) { - // do not broadcast to this peer - } else { - wsList.push(client); - } - }); - fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); - } + + if (!rebroadcastToPeers) return; + + const stale = networkStateService.isBroadcastStale(message, 240_000); + + if (stale) return; + + const messageString = serviceHelper.ensureString(message); + const wsListOut = []; + outgoingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsListOut.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); + await serviceHelper.delay(500); + const wsList = []; + incomingConnections.forEach((client) => { + if (client.ip === fromIP && client.port === port) { + // do not broadcast to this peer + } else { + wsList.push(client); + } + }); + fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } catch (error) { log.error(error); } @@ -302,11 +312,10 @@ function handleIncomingConnection(websocket, req) { } */ const msgObj = serviceHelper.ensureObject(msg.data); - const { pubKey } = msgObj; - const { timestamp } = msgObj; - const { signature } = msgObj; - const { version } = msgObj; - const { data } = msgObj; + const { + pubKey, timestamp, signature, version, data, + } = msgObj; + if (!pubKey || !timestamp || !signature || !version || !data) { try { log.info(`Invalid received from incoming peer ${peer.ip}:${peer.port}. Closing incoming connection`); @@ -318,19 +327,19 @@ function handleIncomingConnection(websocket, req) { } // check if we have the message in cache. If yes, return false. If not, store it and continue - await serviceHelper.delay(Math.floor(Math.random() * 75 + 1)); // await max 75 miliseconds random, should jelp on processing duplicated messages received at same timestamp + // await max 75 miliseconds random, should help on processing duplicated messages received at same timestamp + await serviceHelper.delay(Math.floor(Math.random() * 75 + 1)); + const messageHash = hash(msgObj.data); if (myCacheTemp.has(messageHash)) { return; } + myCacheTemp.set(messageHash, messageHash); - // check rate limit + const rateOK = fluxNetworkHelper.lruRateLimit(`${ipv4Peer}:${port}`, 90); - if (!rateOK) { - return; // do not react to the message - } + if (!rateOK) return; - // check blocked list if (blockedPubKeysCache.has(pubKey)) { try { log.info('Closing incoming connection, peer is on blockedList'); @@ -340,39 +349,19 @@ function handleIncomingConnection(websocket, req) { } return; } - const currentTimeStamp = Date.now(); - const messageOK = await fluxCommunicationUtils.verifyFluxBroadcast(msgObj, undefined, currentTimeStamp); - if (messageOK === true) { - const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(msgObj, currentTimeStamp); - if (timestampOK === true) { - try { - if (msgObj.data.type === 'zelappregister' || msgObj.data.type === 'zelappupdate' || msgObj.data.type === 'fluxappregister' || msgObj.data.type === 'fluxappupdate') { - handleAppMessages(msgObj, peer.ip, peer.port); - } else if (msgObj.data.type === 'fluxapprequest') { - fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, ws); - } else if (msgObj.data.type === 'fluxapprunning') { - handleAppRunningMessage(msgObj, peer.ip, peer.port); - } else if (msgObj.data.type === 'fluxipchanged') { - handleIPChangedMessage(msgObj, peer.ip, peer.port); - } else if (msgObj.data.type === 'fluxappremoved') { - handleAppRemovedMessage(msgObj, peer.ip, peer.port); - } else { - log.warn(`Unrecognised message type of ${msgObj.data.type}`); - } - } catch (e) { - log.error(e); - } - } - } else { + + const messageOk = await networkStateService.verifyBroadcast(msgObj); + + if (!messageOk) { // we dont like this peer as it sent wrong message (wrong, or message belonging to node no longer on network). Lets close the connection // and add him to blocklist try { // check if message comes from IP belonging to the public Key - let zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient. + let zl = await networkStateService.getFluxnodesByPubkey(pubKey); // this itself is sufficient. let nodeFound = zl.find((n) => n.ip.split(':')[0] === peer.ip && (n.ip.split(':')[1] || 16127) === peer.port); if (!nodeFound) { // check if message comes from IP belonging to the public Key - zl = await fluxCommunicationUtils.deterministicFluxList(); // this itself is sufficient. + zl = networkStateService.networkState(); // this itself is sufficient. const possibleNodes = zl.filter((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level nodeFound = possibleNodes.find((n) => n.ip.split(':')[0] === peer.ip && (n.ip.split(':')[1] || 16127) === peer.port); if (!nodeFound) { @@ -391,8 +380,52 @@ function handleIncomingConnection(websocket, req) { } catch (e) { log.error(e); } + return; + } + + const stale = networkStateService.isBroadcastStale(msgObj); + + // I'm not really sure of the intent here. It seems like if a message is older + // than 5 minutes, we silently drop it. + + // Then, if it's a register or update message, we rebroadcast regardless. + + // However, if it's a running, changed or removed message, if it is older than + // 4 minutes, we silently drop it. + + // The reason I don't like this is we are checking the timestamp twice. Seems like + // each handler should just handle the timestamp itself. The running, changes and removed + // handlers are all basically the same anyway. The only difference is the store procedure called. + + // The store should be on the db, not on appsService. I'm assuming you were getting circular imports, + // hence the local require; if we moved store procedures to db, it wouldn't be a problem. + + if (stale) return; + + // pretty sure we can remove the zel ones, not used anymore + const appMessageTypes = ['fluxappregister', 'fluxappupdate', 'zelappupdate', 'zelappregister']; + + const { type: msgType } = data; + + try { + if (appMessageTypes.includes(msgType)) { + handleAppMessages(msgObj, peer.ip, peer.port); + } else if (msgType === 'fluxapprequest') { + fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, ws); + } else if (msgType === 'fluxapprunning') { + handleAppRunningMessage(msgObj, peer.ip, peer.port); + } else if (msgType === 'fluxipchanged') { + handleIPChangedMessage(msgObj, peer.ip, peer.port); + } else if (msgType === 'fluxappremoved') { + handleAppRemovedMessage(msgObj, peer.ip, peer.port); + } else { + log.warn(`Unrecognised message type of ${msgType}`); + } + } catch (e) { + log.error(e); } }; + ws.onclose = (msg) => { const { ip } = ws; log.info(`Incoming connection to ${ip}:${port} closed with code ${msg.code}`); @@ -613,11 +646,10 @@ async function initiateAndHandleConnection(connection) { messageNumber = 0; } */ const msgObj = serviceHelper.ensureObject(evt.data); - const { pubKey } = msgObj; - const { timestamp } = msgObj; - const { signature } = msgObj; - const { version } = msgObj; - const { data } = msgObj; + const { + pubKey, timestamp, signature, version, data, + } = msgObj; + if (!pubKey || !timestamp || !signature || !version || !data) { try { log.info(`Invalid received from outgoing peer ${ip}:${port}. Closing outgoing connection`); @@ -627,21 +659,20 @@ async function initiateAndHandleConnection(connection) { } return; } + // check if we have the message in cache. If yes, return false. If not, store it and continue await serviceHelper.delay(Math.floor(Math.random() * 75 + 1)); // await max 75 miliseconds random, should help processing duplicated messages received at same timestamp + const messageHash = hash(msgObj.data); if (myCacheTemp.has(messageHash)) { return; } + myCacheTemp.set(messageHash, messageHash); - // incoming messages from outgoing connections - const currentTimeStamp = Date.now(); // ms - // check rate limit + const rateOK = fluxNetworkHelper.lruRateLimit(`${ip}:${port}`, 90); - if (!rateOK) { - return; // do not react to the message - } - // check blocked list + if (!rateOK) return; + if (blockedPubKeysCache.has(pubKey)) { try { log.info('Closing outgoing connection, peer is on blockedList'); @@ -651,27 +682,19 @@ async function initiateAndHandleConnection(connection) { } return; } - const messageOK = await fluxCommunicationUtils.verifyOriginalFluxBroadcast(msgObj, undefined, currentTimeStamp); - if (messageOK === true) { - if (msgObj.data.type === 'zelappregister' || msgObj.data.type === 'zelappupdate' || msgObj.data.type === 'fluxappregister' || msgObj.data.type === 'fluxappupdate') { - handleAppMessages(msgObj, ip, port); - } else if (msgObj.data.type === 'fluxapprequest') { - fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, websocket); - } else if (msgObj.data.type === 'fluxapprunning') { - handleAppRunningMessage(msgObj, ip, port); - } else if (msgObj.data.type === 'fluxipchanged') { - handleIPChangedMessage(msgObj, ip, port); - } else if (msgObj.data.type === 'fluxappremoved') { - handleAppRemovedMessage(msgObj, ip, port); - } else { - log.warn(`Unrecognised message type of ${msgObj.data.type}`); - } - } else { + + const stale = networkStateService.isBroadcastStale(msgObj); + + if (stale) return; + + const messageOk = await networkStateService.verifyBroadcast(msgObj); + + if (!messageOk) { // we dont like this peer as it sent wrong message (wrong, or message belonging to node no longer on network). Lets close the connection // and add him to blocklist try { // check if message comes from IP belonging to the public Key - const zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient. + const zl = await networkStateService.getFluxnodesByPubkey(pubKey); // this itself is sufficient. const possibleNodes = zl.filter((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level const nodeFound = possibleNodes.find((n) => n.ip === connection); // connection is either ip or ip:port (if port is not 16127) if (!nodeFound) { @@ -685,6 +708,25 @@ async function initiateAndHandleConnection(connection) { } catch (e) { log.error(e); } + return; + } + + const appMessageTypes = ['fluxappregister', 'fluxappupdate', 'zelappupdate', 'zelappregister']; + + const { type: msgType } = data; + + if (appMessageTypes.includes(msgType)) { + handleAppMessages(msgObj, ip, port); + } else if (msgType === 'fluxapprequest') { + fluxCommunicationMessagesSender.respondWithAppMessage(msgObj, websocket); + } else if (msgType === 'fluxapprunning') { + handleAppRunningMessage(msgObj, ip, port); + } else if (msgType === 'fluxipchanged') { + handleIPChangedMessage(msgObj, ip, port); + } else if (msgType === 'fluxappremoved') { + handleAppRemovedMessage(msgObj, ip, port); + } else { + log.warn(`Unrecognised message type of ${msgType}`); } }; @@ -753,7 +795,7 @@ async function addPeer(req, res) { * @param {object} res Response. * @returns {object} Message. */ -async function addOutgoingPeer(req, res) { +function addOutgoingPeer(req, res) { try { let { ip } = req.params; ip = ip || req.query.ip; @@ -779,7 +821,7 @@ async function addOutgoingPeer(req, res) { return res.json(errMessage); } - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); + const nodeList = networkStateService.networkState(); const fluxNode = nodeList.find((node) => node.ip.split(':')[0] === ip.split(':')[0] && (node.ip.split(':')[1] || 16127) === port); if (!fluxNode) { const errMessage = messageHelper.createErrorMessage(`FluxNode ${ip.split(':')[0]}:${port} is not confirmed on the network.`); @@ -815,7 +857,7 @@ async function fluxDiscovery() { const myIP = await fluxNetworkHelper.getMyFluxIPandPort(); if (myIP) { - nodeList = await fluxCommunicationUtils.deterministicFluxList(); + nodeList = networkStateService.networkState(); numberOfFluxNodes = nodeList.length; const fluxNode = nodeList.find((node) => node.ip === myIP); if (!fluxNode) { @@ -858,7 +900,7 @@ async function fluxDiscovery() { // additional precaution const clientExists = outgoingConnections.find((client) => client.ip === ipInc && client.port === portInc); const clientIncomingExists = incomingConnections.find((client) => client.ip === ipInc && client.port === portInc); - if (!clientExists && !clientIncomingExists) { + if (ipInc && !clientExists && !clientIncomingExists) { deterministicPeerConnections = true; initiateAndHandleConnection(ip); // eslint-disable-next-line no-await-in-loop @@ -874,7 +916,7 @@ async function fluxDiscovery() { // additional precaution const clientExists = outgoingConnections.find((client) => client.ip === ipInc && client.port === portInc); const clientIncomingExists = incomingConnections.find((client) => client.ip === ipInc && client.port === portInc); - if (!clientExists && !clientIncomingExists) { + if (ipInc && !clientExists && !clientIncomingExists) { deterministicPeerConnections = true; // eslint-disable-next-line no-await-in-loop await serviceHelper.axiosGet(`http://${ipInc}:${portInc}/flux/addoutgoingpeer/${myIP}`).catch((error) => log.error(error)); @@ -889,7 +931,7 @@ async function fluxDiscovery() { while ((outgoingConnections.length < 14 || [...new Set(outgoingConnections.map((client) => client.ip))].length < 9) && index < 100) { // Max of 14 outgoing connections - 8 possible deterministic + min. 6 random index += 1; // eslint-disable-next-line no-await-in-loop - const connection = await fluxNetworkHelper.getRandomConnection(); + const connection = fluxNetworkHelper.getRandomConnection(); if (connection) { const ipInc = connection.split(':')[0]; const portInc = connection.split(':')[1] || '16127'; @@ -897,7 +939,7 @@ async function fluxDiscovery() { const sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc); const clientExists = outgoingConnections.find((client) => client.ip === ipInc && client.port === portInc); const clientIncomingExists = incomingConnections.find((client) => client.ip === ipInc && client.port === portInc); - if (!sameConnectedIp && !clientExists && !clientIncomingExists) { + if (ipInc && !sameConnectedIp && !clientExists && !clientIncomingExists) { log.info(`Adding random Flux peer: ${connection}`); currentIpsConnTried.push(connection); initiateAndHandleConnection(connection); @@ -910,7 +952,7 @@ async function fluxDiscovery() { while ((incomingConnections.length < 12 || [...new Set(incomingConnections.map((client) => client.ip))].length < 5) && index < 100) { // Max of 12 incoming connections - 8 possible deterministic + min. 4 random (we will get more random as others nodes have more random outgoing connections) index += 1; // eslint-disable-next-line no-await-in-loop - const connection = await fluxNetworkHelper.getRandomConnection(); + const connection = fluxNetworkHelper.getRandomConnection(); if (connection) { const ipInc = connection.split(':')[0]; const portInc = connection.split(':')[1] || '16127'; @@ -918,7 +960,7 @@ async function fluxDiscovery() { const sameConnectedIp = currentIpsConnTried.find((connectedIP) => connectedIP === ipInc); const clientExists = outgoingConnections.find((client) => client.ip === ipInc && client.port === portInc); const clientIncomingExists = incomingConnections.find((client) => client.ip === ipInc && client.port === portInc); - if (!sameConnectedIp && !clientExists && !clientIncomingExists) { + if (ipInc && !sameConnectedIp && !clientExists && !clientIncomingExists) { log.info(`Asking random Flux ${connection} to add us as a peer`); currentIpsConnTried.push(connection); // eslint-disable-next-line no-await-in-loop diff --git a/ZelBack/src/services/fluxCommunicationMessagesSender.js b/ZelBack/src/services/fluxCommunicationMessagesSender.js index a7ae1cf69..b48b626c9 100644 --- a/ZelBack/src/services/fluxCommunicationMessagesSender.js +++ b/ZelBack/src/services/fluxCommunicationMessagesSender.js @@ -10,6 +10,16 @@ const { outgoingConnections, outgoingPeers, incomingPeers, incomingConnections, } = require('./utils/establishedConnections'); +/** + * @typedef {{ + * version: number, + * timestamp: number, + * pubKey: string, + * signature: string, + * data : object, + * }} FluxNetworkMessage + */ + // default cache const LRUoptions = { max: 1000, diff --git a/ZelBack/src/services/fluxCommunicationUtils.js b/ZelBack/src/services/fluxCommunicationUtils.js deleted file mode 100644 index 4ebca9db4..000000000 --- a/ZelBack/src/services/fluxCommunicationUtils.js +++ /dev/null @@ -1,226 +0,0 @@ -/* eslint-disable no-underscore-dangle */ -const { LRUCache } = require('lru-cache'); -const log = require('../lib/log'); -const serviceHelper = require('./serviceHelper'); -const verificationHelper = require('./verificationHelper'); -const daemonServiceFluxnodeRpcs = require('./daemonService/daemonServiceFluxnodeRpcs'); -// default cache -const LRUoptions = { - max: 20000, // currently 20000 nodes - ttl: 1000 * 240, // 240 seconds, allow up to 2 blocks - maxAge: 1000 * 240, // 240 seconds, allow up to 2 blocks -}; - -const myCache = new LRUCache(LRUoptions); - -let addingNodesToCache = false; - -/** - * To constantly update deterministic Flux list every 2 minutes so we always trigger cache and have up to date value - */ -async function constantlyUpdateDeterministicFluxList() { - try { - while (addingNodesToCache) { - // prevent several instances filling the cache at the same time. - // eslint-disable-next-line no-await-in-loop - await serviceHelper.delay(100); - } - addingNodesToCache = true; - const request = { - params: {}, - query: {}, - }; - const daemonFluxNodesList = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList(request); - if (daemonFluxNodesList.status === 'success') { - const generalFluxList = daemonFluxNodesList.data || []; - myCache.set('fluxList', generalFluxList); - } - addingNodesToCache = false; - await serviceHelper.delay(2 * 60 * 1000); // 2 minutes - constantlyUpdateDeterministicFluxList(); - } catch (error) { - addingNodesToCache = false; - log.error(error); - await serviceHelper.delay(2 * 60 * 1000); // 2 minutes - constantlyUpdateDeterministicFluxList(); - } -} - -/** - * To get deterministc Flux list from cache. - * @param {string} filter Filter. Can only be a publicKey. - * @returns {(*|*)} Value of any type or an empty array of any type. - */ -async function deterministicFluxList(filter) { - try { - while (addingNodesToCache) { - // prevent several instances filling the cache at the same time. - // eslint-disable-next-line no-await-in-loop - await serviceHelper.delay(100); - } - let fluxList; - if (filter) { - fluxList = myCache.get(`fluxList${serviceHelper.ensureString(filter)}`); - } else { - fluxList = myCache.get('fluxList'); - } - if (!fluxList) { - let generalFluxList = myCache.get('fluxList'); - addingNodesToCache = true; - if (!generalFluxList) { - const request = { - params: {}, - query: {}, - }; - const daemonFluxNodesList = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList(request); - if (daemonFluxNodesList.status === 'success') { - generalFluxList = daemonFluxNodesList.data || []; - myCache.set('fluxList', generalFluxList); - if (filter) { - const filterFluxList = generalFluxList.filter((node) => node.pubkey === filter); - myCache.set(`fluxList${serviceHelper.ensureString(filter)}`, filterFluxList); - } - } - } else { // surely in filtered branch too - const filterFluxList = generalFluxList.filter((node) => node.pubkey === filter); - myCache.set(`fluxList${serviceHelper.ensureString(filter)}`, filterFluxList); - } - addingNodesToCache = false; - if (filter) { - fluxList = myCache.get(`fluxList${serviceHelper.ensureString(filter)}`); - } else { - fluxList = myCache.get('fluxList'); - } - } - return fluxList || []; - } catch (error) { - log.error(error); - return []; - } -} - -/** - * To verify Flux broadcast. - * @param {object} data Data containing public key, timestamp, signature and version. - * @param {object[]} obtainedFluxNodesList List of FluxNodes. - * @param {number} currentTimeStamp Current timestamp. - * @returns {boolean} False unless message is successfully verified. - */ -async function verifyFluxBroadcast(data, obtainedFluxNodesList, currentTimeStamp) { - const dataObj = serviceHelper.ensureObject(data); - const { pubKey } = dataObj; - const { timestamp } = dataObj; // ms - const { signature } = dataObj; - const { version } = dataObj; - // only version 1 is active - if (version !== 1) { - return false; - } - const message = serviceHelper.ensureString(dataObj.data); - // is timestamp valid ? - // eslint-disable-next-line no-param-reassign - currentTimeStamp = currentTimeStamp || Date.now(); // ms - if (currentTimeStamp < (timestamp - 120000)) { // message was broadcasted in the future. Allow 120 sec clock sync - log.error('Message from future'); - return false; - } - - let node = null; - if (obtainedFluxNodesList) { // for test purposes. - node = obtainedFluxNodesList.find((key) => key.pubkey === pubKey); - if (!node) { - return false; - } - } - if (!node) { - // node that broadcasted the message has to be on list - // pubkey of the broadcast has to be on the list - let zl = await deterministicFluxList(pubKey); - if (dataObj.data && dataObj.data.type === 'fluxapprunning') { - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - zl = await deterministicFluxList(); - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - log.warn(`Invalid fluxapprunning message, ip: ${dataObj.data.ip} pubkey: ${pubKey}`); // most of invalids are caused because our deterministic list is cached for couple of minutes - return false; - } - } - } else if (dataObj.data && dataObj.data.type === 'fluxipchanged') { - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.oldIP && dataObj.data.oldIP === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - zl = await deterministicFluxList(); - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.oldIP === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - log.warn(`Invalid fluxipchanged message, oldIP: ${dataObj.data.oldIP} pubkey: ${pubKey}`); - return false; - } - } - } else if (dataObj.data && dataObj.data.type === 'fluxappremoved') { - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - zl = await deterministicFluxList(); - node = zl.find((key) => key.pubkey === pubKey && dataObj.data.ip === key.ip); // check ip is on the network and belongs to broadcasted public key - if (!node) { - log.warn(`Invalid fluxappremoved message, ip: ${dataObj.data.ip} pubkey: ${pubKey}`); - return false; - } - } - } else { - node = zl.find((key) => key.pubkey === pubKey); - } - } - if (!node) { - log.warn(`No node belonging to ${pubKey} found`); - return false; - } - const messageToVerify = version + message + timestamp; - const verified = verificationHelper.verifyMessage(messageToVerify, pubKey, signature); - if (verified === true) { - return true; - } - return false; -} - -/** - * To verify timestamp in Flux broadcast. - * @param {object} data Data. - * @param {number} currentTimeStamp Current timestamp. - * @returns {boolean} False unless current timestamp is within 5 minutes of the data object's timestamp. - */ -function verifyTimestampInFluxBroadcast(data, currentTimeStamp, maxOld = 300000) { - // eslint-disable-next-line no-param-reassign - const dataObj = serviceHelper.ensureObject(data); - const { timestamp } = dataObj; // ms - // eslint-disable-next-line no-param-reassign - currentTimeStamp = currentTimeStamp || Date.now(); // ms - if (currentTimeStamp < (timestamp + maxOld)) { // not older than 5 mins - return true; - } - log.warn(`Timestamp ${timestamp} of message is too old ${currentTimeStamp}}`); - return false; -} - -/** - * To verify original Flux broadcast. Extends verifyFluxBroadcast by not allowing request older than 5 mins. - * @param {object} data Data. - * @param {object[]} obtainedFluxNodeList List of FluxNodes. - * @param {number} currentTimeStamp Current timestamp. - * @returns {boolean} False unless message is successfully verified. - */ -async function verifyOriginalFluxBroadcast(data, obtainedFluxNodeList, currentTimeStamp) { - const timeStampOK = verifyTimestampInFluxBroadcast(data, currentTimeStamp); - if (timeStampOK) { - const broadcastOK = await verifyFluxBroadcast(data, obtainedFluxNodeList, currentTimeStamp); - return broadcastOK; - } - return false; -} - -module.exports = { - constantlyUpdateDeterministicFluxList, - verifyTimestampInFluxBroadcast, - verifyOriginalFluxBroadcast, - deterministicFluxList, - verifyFluxBroadcast, -}; diff --git a/ZelBack/src/services/fluxNetworkHelper.js b/ZelBack/src/services/fluxNetworkHelper.js index bc5ad3eb1..a6a6ad5a6 100644 --- a/ZelBack/src/services/fluxNetworkHelper.js +++ b/ZelBack/src/services/fluxNetworkHelper.js @@ -18,7 +18,7 @@ const daemonServiceBenchmarkRpcs = require('./daemonService/daemonServiceBenchma const daemonServiceWalletRpcs = require('./daemonService/daemonServiceWalletRpcs'); const benchmarkService = require('./benchmarkService'); const verificationHelper = require('./verificationHelper'); -const fluxCommunicationUtils = require('./fluxCommunicationUtils'); +const networkStateService = require('./networkStateService'); const { outgoingConnections, outgoingPeers, incomingPeers, incomingConnections, } = require('./utils/establishedConnections'); @@ -268,7 +268,7 @@ async function checkAppAvailability(req, res) { const ipPort = processedBody.port; // pubkey of the message has to be on the list - const zl = await fluxCommunicationUtils.deterministicFluxList(pubKey); // this itself is sufficient. + const zl = await networkStateService.getFluxnodesByPubkey(pubKey); // this itself is sufficient. const node = zl.find((key) => key.pubkey === pubKey); // another check in case sufficient check failed on daemon level const dataToVerify = processedBody; delete dataToVerify.signature; @@ -406,8 +406,8 @@ async function getFluxNodePublicKey(privatekey) { * To get a random connection. * @returns {string} IP:Port or just IP if default. */ -async function getRandomConnection() { - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); +function getRandomConnection() { + const nodeList = networkStateService.networkState(); const zlLength = nodeList.length; if (zlLength === 0) { return null; @@ -682,7 +682,7 @@ async function checkMyFluxAvailability(retryNumber = 0) { if (!fluxBenchVersionAllowed) { return false; } - let askingIP = await getRandomConnection(); + let askingIP = getRandomConnection(); if (typeof askingIP !== 'string' || typeof myFluxIP !== 'string' || myFluxIP === askingIP) { return false; } @@ -764,7 +764,7 @@ async function checkMyFluxAvailability(retryNumber = 0) { } const measuredUptime = fluxUptime(); if (measuredUptime.status === 'success' && measuredUptime.data > config.fluxapps.minUpTime) { // node has been running for 30 minutes. Upon starting a node, there can be dos that needs resetting - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); + const nodeList = networkStateService.networkState(); // nodeList must include our fluxnode ip myIP let myCorrectIp = `${myIP}:${apiPort}`; if (apiPort === 16127 || apiPort === '16127') { @@ -903,7 +903,7 @@ async function checkDeterministicNodesCollisions() { }, 120 * 1000); return; } - const nodeList = await fluxCommunicationUtils.deterministicFluxList(); + const nodeList = networkStateService.networkState(); const result = nodeList.filter((node) => node.ip === myIP); const nodeStatus = await daemonServiceFluxnodeRpcs.getFluxNodeStatus(); if (nodeStatus.status === 'success') { // different scenario is caught elsewhere diff --git a/ZelBack/src/services/networkStateService.js b/ZelBack/src/services/networkStateService.js new file mode 100644 index 000000000..ec278f8e9 --- /dev/null +++ b/ZelBack/src/services/networkStateService.js @@ -0,0 +1,194 @@ +const config = require('config'); +const log = require('../lib/log'); +const serviceHelper = require('./serviceHelper'); +const verificationHelper = require('./verificationHelper'); +const daemonServiceFluxnodeRpcs = require('./daemonService/daemonServiceFluxnodeRpcs'); +const networkStateManager = require('./utils/networkStateManager'); + +/** + * @typedef {import('./utils/networkStateManager').Fluxnode} Fluxnode + * @typedef {import('./fluxCommunicationMessagesSender').FluxNetworkMessage} FluxNetworkMessage + */ + +/** + * The NetworkStateManager object. Responsible for fetching the nodelist, + * and maintaining indexes for fast access. + */ +let stateManager = null; + +/** + * Starts the network state service. It will either subscribe to the fluxd zmq + * endpoint if possible, or fallback to polling. + * @returns {void} + */ +async function start() { + const { daemon: { zmqport } } = config ?? { daemon: { zmqport: 16126 } }; + + const zmqEndpoint = `tcp://127.0.0.1:${zmqport}`; + + return new Promise((resolve, reject) => { + if (stateManager) resolve(); + + const fetcher = async (filter = null) => { + const options = { params: { useCache: false, filter }, query: { filter: null } }; + + const res = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList( + options, + ); + + const nodes = res.status === 'success' ? res.data : null; + + return nodes; + }; + + stateManager = new networkStateManager.NetworkStateManager(fetcher, { + intervalMs: 120_000, + zmqEndpoint, + }); + + const timeout = setTimeout( + () => reject(new Error('Unable To Start: Timeout of 300s reached')), + 300_000, + ); + + stateManager.once('populated', () => { + clearTimeout(timeout); + resolve(); + }); + + stateManager.start(); + }); +} + +async function stop() { + if (!stateManager) return; + + await stateManager.stop(); + stateManager = null; +} + +/** + * Returns the entire fluxnode network state + * @returns {Array} + */ +function networkState() { + if (!stateManager) return []; + + return stateManager.state; +} + +async function getFluxnodesByPubkey(pubkey) { + const nodes = await stateManager.search(pubkey, 'pubkey'); + // in the future, just return the map. + return nodes ? [...nodes.values()] : []; +} + +/** + * + * @param {FluxNetworkMessage} broadcast + * @param {{maxAge?: number}} options + * @returns {boolean} + */ +function isBroadcastStale(broadcast, options = {}) { + const maxAge = options.maxAge || 300_000; + + const { timestamp } = broadcast; + const now = Date.now(); + + if (now > timestamp + maxAge) { + log.warn( + `isBroadcastStale: Timestamp ${timestamp} of broadcast is too old ${now}}`, + ); + return true; + } + return false; +} + +/** + * To verify Flux broadcast. + * @param {object} broadcast Flux network layer message containing public key, timestamp, signature and version. + * @returns {Promise} False unless message is successfully verified. + */ +async function verifyBroadcast(broadcast) { + const { + pubKey, timestamp, signature, version, data: payload, + } = broadcast; + + if (version !== 1) return false; + + const message = serviceHelper.ensureString(payload); + + if (!message) return false; + + const { type: msgType } = payload; + + if (!msgType) return false; + + const now = Date.now(); + + // message was broadcasted in the future. Allow 120 sec clock sync + if (now < timestamp - 120_000) { + log.error('VerifyBroadcast: Message from future, rejecting'); + return false; + } + + const nodes = await stateManager.search(pubKey, 'pubkey'); + + let error = ''; + let target = ''; + + switch (msgType) { + case 'fluxapprunning': + target = payload.ip; + // most of invalids are caused because our deterministic list is cached for couple of minutes + error = `Invalid fluxapprunning message, ip: ${payload.ip} pubkey: ${pubKey}`; + break; + + case 'fluxipchanged': + target = payload.oldIP; + error = `Invalid fluxipchanged message, oldIP: ${payload.oldIP} pubkey: ${pubKey}`; + break; + + case 'fluxappremoved': + target = payload.ip; + error = `Invalid fluxappremoved message, ip: ${payload.ip} pubkey: ${pubKey}`; + break; + + // zelappregister zelappupdate fluxappregister fluxappupdate + default: + // we take the first node. Why??? What does this validate? + target = nodes.size ? nodes.keys().next().value : null; + error = `No node belonging to ${pubKey} found`; + } + + const node = nodes.get(target) || await stateManager.search(target, 'endpoint'); + + if (!node) { + log.warn(error); + return false; + } + + const messageToVerify = version + message + timestamp; + const verified = verificationHelper.verifyMessage( + messageToVerify, + pubKey, + signature, + ); + return verified; +} + +if (require.main === module) { + start(); + setInterval(() => { + console.log(stateManager.search('045ae66321cfc172086d79252323b6cd4b83460e580e88f220582affda8a83b3ec68078ad80f7e465c42c3ef9bc01b912b3663e2ba09057bc43fbedf0afa9f3864', 'pubkey')); + }, 5_000); +} + +module.exports = { + getFluxnodesByPubkey, + isBroadcastStale, + networkState, + start, + stop, + verifyBroadcast, +}; diff --git a/ZelBack/src/services/serviceManager.js b/ZelBack/src/services/serviceManager.js index bdea40a34..9637c69f7 100644 --- a/ZelBack/src/services/serviceManager.js +++ b/ZelBack/src/services/serviceManager.js @@ -4,7 +4,7 @@ const log = require('../lib/log'); const dbHelper = require('./dbHelper'); const explorerService = require('./explorerService'); const fluxCommunication = require('./fluxCommunication'); -const fluxCommunicationUtils = require('./fluxCommunicationUtils'); +const networkStateService = require('./networkStateService'); const fluxNetworkHelper = require('./fluxNetworkHelper'); const appsService = require('./appsService'); const daemonServiceMiscRpcs = require('./daemonService/daemonServiceMiscRpcs'); @@ -103,9 +103,9 @@ async function startFluxFunctions() { }); log.info('Mongodb zelnodetransactions dropped'); - setTimeout(() => { - fluxCommunicationUtils.constantlyUpdateDeterministicFluxList(); // updates deterministic flux list for communication every 2 minutes, so we always trigger cache and have up to date value - }, 15 * 1000); + // fetch the fluxnode list. This is probably broken - if daemon isn't running, will hang here + await networkStateService.start(); + setTimeout(async () => { log.info('Rechecking firewall app rules'); fluxNetworkHelper.purgeUFW(); diff --git a/ZelBack/src/services/systemService.js b/ZelBack/src/services/systemService.js index 940372e52..231429b7e 100644 --- a/ZelBack/src/services/systemService.js +++ b/ZelBack/src/services/systemService.js @@ -516,6 +516,15 @@ async function monitorSystem() { } } +async function restartSystemdService(service) { + const { error } = await serviceHelper.runCommand('systemctl', { + runAsRoot: true, + params: ['restart', service], + }); + + return !error; +} + async function mongoDBConfig() { log.info('MongoDB file config verification...'); try { @@ -601,11 +610,11 @@ async function mongodGpgKeyVeryfity() { } log.info('The key was updated successfully.'); return true; - // eslint-disable-next-line no-else-return + // eslint-disable-next-line no-else-return } else { throw new Error('MongoDB version not found.'); } - // eslint-disable-next-line no-else-return + // eslint-disable-next-line no-else-return } else { log.info('MongoDB GPG key is still valid.'); return true; @@ -631,6 +640,7 @@ module.exports = { monitorSyncthingPackage, queueAptGetCommand, resetTimers, + restartSystemdService, updateAptCache, upgradePackage, mongoDBConfig, diff --git a/ZelBack/src/services/utils/daemonrpcClient.js b/ZelBack/src/services/utils/daemonrpcClient.js index 73e89506b..a6e491adf 100644 --- a/ZelBack/src/services/utils/daemonrpcClient.js +++ b/ZelBack/src/services/utils/daemonrpcClient.js @@ -2,8 +2,12 @@ const daemonrpc = require('daemonrpc'); const fullnode = require('fullnode'); const config = require('config'); +userconfig = require('../../../../config/userconfig'); + +const { initial: isTestnet } = userconfig; + const fnconfig = new fullnode.Config(); -const isTestnet = userconfig.initial.testnet; + const rpcuser = fnconfig.rpcuser() || 'rpcuser'; const rpcpassword = fnconfig.rpcpassword() || 'rpcpassword'; const rpcport = fnconfig.rpcport() || (isTestnet === true ? config.daemon.rpcporttestnet : config.daemon.rpcport); diff --git a/ZelBack/src/services/utils/networkStateManager.js b/ZelBack/src/services/utils/networkStateManager.js new file mode 100644 index 000000000..6549f6ddb --- /dev/null +++ b/ZelBack/src/services/utils/networkStateManager.js @@ -0,0 +1,412 @@ +// const { Worker } = require('node:worker_threads'); + +const { EventEmitter } = require('node:events'); +// const { FluxController } = require('../zelflux/ZelBack/src/services/utils/fluxController'); + +const { FluxController } = require('./fluxController'); + +const zmq = require('zeromq'); + +/** + * The Fluxnode as returned by fluxd + * @typedef {{ + * collateral: string, + * txhash: string, + * outidx: number, + * ip: string, + * network: string, + * added_height: number, + * confirmed_height: number, + * last_confirmed_height: number, + * last_paid_height: number, + * tier: string, + * payment_address: string, + * pubkey: string, + * activesince: string, + * lastpaid: string, + * amount: string, + * rank: number + * }} Fluxnode + */ + +class NetworkStateManager extends EventEmitter { + #state = []; + + #pubkeyIndex = new Map(); + + #endpointIndex = new Map(); + + #controller = new FluxController(); + + #socket = zmq.socket('sub'); + + #socketConnected = null; + + #indexStart = null; + + #updatingState = false; + + /** + * @type { "polling" | "subscription" } + */ + #updateTrigger = 'subscription'; + + /** + * + * @param {()=>Promise>} stateFetcher + * @param {{intervalMs?: number, zmqEndpoint?: string, fallbackTimeout?: number}} options + */ + constructor(stateFetcher, options = {}) { + super(); + + this.stateFetcher = stateFetcher; + this.intervalMs = options.intervalMs || 120_000; + this.zmqEndpoint = options.zmqEndpoint || null; + + if (!this.zmqEndpoint) this.#updateTrigger = 'polling'; + + this.fallbackTimeout = options.fallbackTimeout || 120_000; + + this.#socket.setsockopt(zmq.ZMQ_RECONNECT_IVL, 500); + this.#socket.setsockopt(zmq.ZMQ_RECONNECT_IVL_MAX, 15_000); + this.#socket.setsockopt(zmq.ZMQ_HEARTBEAT_IVL, 20_000); + this.#socket.setsockopt(zmq.ZMQ_HEARTBEAT_TIMEOUT, 60_000); + this.#socket.setsockopt(zmq.ZMQ_CONNECT_TIMEOUT, 3_000); + this.#socket.monitor(); + } + + get state() { + // should probably return a complete copy + return this.#state; + } + + get updateTrigger() { + return this.#updateTrigger; + } + + get indexesReady() { + return this.#controller.lock.ready; + } + + /** + * Index map. Has to be a getter and not a field, as the field doesn't update + * the reference. + */ + get #indexes() { + return { pubkey: this.#pubkeyIndex, endpoint: this.#endpointIndex }; + } + + #setIndexes(pubkeyIndex, endpointIndex) { + this.#pubkeyIndex = pubkeyIndex; + this.#endpointIndex = endpointIndex; + } + + async #buildIndexes(nodes) { + // nodes.forEach((node) => { + // if (!this.#pubkeyIndex.has(node.pubkey)) { + // this.#pubkeyIndex.set(node.pubkey, new Map()); + // } + // this.#pubkeyIndex.get(node.pubkey).set(node.ip, node); + // this.#endpointIndex.set(node.ip, node); + // }); + + // if we are building an index already, just wait for it to finish. + // maybe look at cancelling it in future. + await this.#controller.lock.enable(); + + const nodeCount = nodes.length; + + const pubkeyIndex = new Map(); + const endpointIndex = new Map(); + + function iterIndexes(startIndex, callback) { + const endIndex = startIndex + 1000; + const chunk = nodes.slice(startIndex, endIndex); + + chunk.forEach((node) => { + const nodesByPubkey = pubkeyIndex.get(node.pubkey) + || pubkeyIndex.set(node.pubkey, new Map()).get(node.pubkey); + + nodesByPubkey.set(node.ip, node); + endpointIndex.set(node.ip, node); + }); + + if (endIndex >= nodeCount) { + callback(); + return; + } + + setImmediate(iterIndexes.bind(this, endIndex, callback)); + } + + // Yield to the event queue here, this way we are only ever doing O(1000), + // instead of O(n). With around 13k nodes, this was taking on average 8ms. + // I.e. the event queue was blocked for 8ms. Now we yield. I was using a + // worker here, but overkill for what we are doing. + + return new Promise((resolve) => { + iterIndexes(0, () => { + this.#setIndexes(pubkeyIndex, endpointIndex); + console.log('pubkeyIndexSize:', pubkeyIndex.size); + console.log('endpointIndexSize:', endpointIndex.size); + this.#controller.lock.disable(); + resolve(); + }); + }); + } + + reset() { + // recreate objects so they can't be mutated externally + this.#pubkeyIndex = new Map(); + this.#endpointIndex = new Map(); + this.#state = []; + } + + async fetchNetworkState() { + console.log('fetching state'); + this.#updatingState = true; + // always use monotonic clock for any elapsed times + const start = process.hrtime.bigint(); + + let state = []; + + while (!state.length) { + if (this.#controller.aborted) break; + + const fetchStart = process.hrtime.bigint(); + // eslint-disable-next-line no-await-in-loop + const res = await this.stateFetcher().catch(() => { + console.log('state fetcher error'); + return []; + }); + + console.log('Fetch finished, elapsed ms:', Number(process.hrtime.bigint() - fetchStart) / 1000000); + + // eslint-disable-next-line no-await-in-loop + state = res || await this.#controller.sleep(15_000); + } + + const populated = Boolean(this.#state.length); + + if (state.length) { + console.log('Nodes found:', state.length); + + this.#state = state; + + console.log('Setting state and indexes'); + this.#indexStart = process.hrtime.bigint(); + + await this.#buildIndexes(this.#state); + console.log('Indexes created, elapsed ms:', Number(process.hrtime.bigint() - this.#indexStart) / 1000000); + this.#indexStart = null; + + if (!populated) this.emit('populated'); + this.emit('updated'); + } + + this.#updatingState = false; + + const elapsed = Number(process.hrtime.bigint() - start) / 1000000; + return this.intervalMs - elapsed; + } + + #handleBlock(msgBuf, seqBuf) { + const blockId = msgBuf.toString('hex'); + const seq = seqBuf.readUInt32LE(0); + console.log(`hashblock: ${blockId}, sequence: ${seq}`); + + if (this.#updatingState) this.#controller.abort(); + this.fetchNetworkState(); + + // should we await fetch before emitting block? + this.emit('block', blockId); + } + + #handleMessage(topicBuf, msgBuf, seqBuf) { + const topic = topicBuf.toString(); + + switch (topic) { + case 'hashblock': + this.#handleBlock(msgBuf, seqBuf); + break; + default: + throw new Error(`Unknown topic: ${topic}`); + } + } + + /** + * Test if the zmqendpoint is connectable. Takes about 8 seconds to fail + */ + async #probeZmqEndpoint() { + const maxRetries = 10; + let retryCounter = 0; + + const probe = zmq.socket('sub'); + + probe.setsockopt(zmq.ZMQ_RECONNECT_IVL, 200); + probe.setsockopt(zmq.ZMQ_RECONNECT_IVL_MAX, 1000); + probe.setsockopt(zmq.ZMQ_CONNECT_TIMEOUT, 3_000); + + probe.monitor(); + + return new Promise((resolve) => { + probe.on('connect', () => { + console.log('probe connected'); + probe.unmonitor(); + probe.close(); + resolve(true); + }); + + probe.on('connect_retry', (retryTimer) => { + retryCounter += 1; + + if (retryCounter < maxRetries) { + console.log('probe retrying ms:', retryTimer); + return; + } + + console.log('max retries hit, bailing'); + probe.unmonitor(); + probe.close(); + resolve(false); + }); + + probe.connect(this.zmqEndpoint); + }); + } + + async #startSubscription() { + console.log('start subscription'); + await this.fetchNetworkState(); + + const zmqEnabled = await this.#probeZmqEndpoint(); + + if (!zmqEnabled) { + console.log('zmq not enabled'); + this.#updateTrigger = 'polling'; + this.#startPolling(); + return; + } + + console.log('probe connected, setting up subscription'); + + this.#socket.on('connect', () => { + if (this.#socketConnected === false) { + // State: previously connected, now reconnected. + clearTimeout(this.fallbackTimer); + this.fallbackTimer = null; + this.#controller.stopLoop(); + } + + this.#socketConnected = true; + this.emit('zmqConnected'); + }); + + this.#socket.on('disconnect', () => { + this.#socketConnected = false; + this.emit('zmqDisconnected'); + this.fallbackTimer = setTimeout(() => this.#startPolling(), this.fallbackTimeout); + }); + + this.#socket.on('message', (...args) => this.#handleMessage(...args)); + + this.#socket.connect(this.zmqEndpoint); + this.#socket.subscribe('hashblock'); + } + + #startPolling() { + this.#controller.startLoop(this.fetchNetworkState); + } + + async start() { + const starter = this.zmqEndpoint + ? this.#startSubscription + : this.#startPolling; + + starter.bind(this)(); + } + + async stop() { + // is this right? + await this.#controller.abort(); + } + + /** + * Find a node in the fluxnode network state by either pubkey or endpoint + * + * @param {string} filter Pubkey or endpoint (ip:port) + * @param {"pubkey"|"endpoint"} type + * @returns + */ + async search(filter, type) { + if (!filter) return null; + if (!Object.keys(this.#indexes).includes(type)) return null; + + // if we are mid stroke indexing, may as well wait the 10ms (max) and get the + // latest block + await this.indexesReady; + + const cached = this.#indexes[type].get(filter); + return cached || null; + } +} + +async function main() { + // eslint-disable-next-line global-require + const daemonServiceFluxnodeRpcs = require('../daemonService/daemonServiceFluxnodeRpcs'); + + const fetcher = async (filter = null) => { + const options = { params: { useCache: false, filter }, query: { filter: null } }; + + const res = await daemonServiceFluxnodeRpcs.viewDeterministicFluxNodeList(options); + + if (res.status === 'success') { + return res.data; + } + console.log('fetcher not success'); + return []; + }; + + const network = new NetworkStateManager(fetcher, { intervalMs: 120_000, zmqEndpoint: 'tcp://127.0.0.1:28332' }); + network.on('updated', () => { + console.log('received updated event'); + }); + network.on('populated', async () => { + console.log('received populated event'); + console.log('Search result populated:', await network.search('212.71.244.159:16137', 'endpoint')); + }); + network.start(); + setInterval(async () => { + // await network.search('212.71.244.159:16137', 'endpoint'); + console.log('Search pubkey:', await network.search('045ae66321cfc172086d79252323b6cd4b83460e580e88f220582affda8a83b3ec68078ad80f7e465c42c3ef9bc01b912b3663e2ba09057bc43fbedf0afa9f3864', 'pubkey')); + }, 5_000); +} + +if (require.main === module) { + main(); +} + +module.exports = { NetworkStateManager }; + +// interesting stuff: + +// 6 nodes with no ip address + +// ~ 420ms fetch time (on localhost) ~ 8.2Mb i/o. Not sure if this is time for fluxd +// to generate the list, or for the actual i/o on localhost. + +// ~ 20ms to build cache. This was 8ms under no load, so obviously, yielding +// to the event queue is a good thing as there is other work to be done. + +// if we need to search... we wait for indexes. What about if fetching? +// do we try for a search without waiting, then if a cache miss, we wait for +// the search to finish? + +// fetching state +// Fetch finished, elapsed ms: 418.639369 +// Nodes found: 13047 +// Setting state and indexes +// pubkeyIndexSize: 3011 +// endpointIndexSize: 13041 +// Indexes created, elapsed ms: 18.25089 +// New Flux App Removed message received. diff --git a/apiServer.js b/apiServer.js index d6ffa0dd2..5b52f717b 100644 --- a/apiServer.js +++ b/apiServer.js @@ -1,4 +1,4 @@ -global.userconfig = require('./config/userconfig'); +globalThis.userconfig = require('./config/userconfig'); if (typeof AbortController === 'undefined') { // polyfill for nodeJS 14.18.1 - without having to use experimental features diff --git a/helpers/repositories.json b/helpers/repositories.json index 78a1f9b08..392e14079 100644 --- a/helpers/repositories.json +++ b/helpers/repositories.json @@ -453,7 +453,6 @@ "ghcr.io/marctheshark3/sigmanaut-mining-pool-ui", "garyotto", "4haiko/dante-socks5-proxy", - "nyusternie", "jepbura/gaganode", "nyusternie", "moultor", diff --git a/package.json b/package.json index 3ca75a8ba..625d474a0 100644 --- a/package.json +++ b/package.json @@ -122,7 +122,8 @@ "tar-fs": "~3.0.6", "ws": "~7.5.9", "xterm": "~5.1.0", - "zeltrezjs": "~2.12.0" + "zeltrezjs": "~2.12.0", + "zeromq": "~5.3.1" }, "devDependencies": { "@babel/core": "~7.23.6", diff --git a/tests/unit/fluxCommunication.test.js b/tests/unit/fluxCommunication.test.js index 0a9cc0df5..83bc5847d 100644 --- a/tests/unit/fluxCommunication.test.js +++ b/tests/unit/fluxCommunication.test.js @@ -9,7 +9,7 @@ const fluxCommunicationMessagesSender = require('../../ZelBack/src/services/flux const fluxNetworkHelper = require('../../ZelBack/src/services/fluxNetworkHelper'); const dbHelper = require('../../ZelBack/src/services/dbHelper'); const verificationHelper = require('../../ZelBack/src/services/verificationHelper'); -const fluxCommunicationUtils = require('../../ZelBack/src/services/fluxCommunicationUtils'); +const networkStateService = require('../../ZelBack/src/services/networkStateService'); const daemonServiceMiscRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceMiscRpcs'); const appsService = require('../../ZelBack/src/services/appsService'); const generalService = require('../../ZelBack/src/services/generalService'); @@ -986,7 +986,7 @@ describe('fluxCommunication tests', () => { }, }); lruRateLimitStub.returns(false); - const checkObjectSpy = sinon.spy(fluxCommunicationUtils, 'verifyOriginalFluxBroadcast'); + const checkObjectSpy = sinon.spy(networkStateService, 'verifyBroadcast'); await fluxCommunication.initiateAndHandleConnection(ip); await waitForWsConnected(wsserver); @@ -1043,53 +1043,49 @@ describe('fluxCommunication tests', () => { sinon.assert.calledWith(logSpy, 'Connection 127.0.0.2:16127 removed from outgoingPeers'); }); - const appRequestCommands = ['fluxapprequest']; - // eslint-disable-next-line no-restricted-syntax - for (const command of appRequestCommands) { - // eslint-disable-next-line no-loop-func - it(`should handle the ${command} message properly`, async () => { - const message = JSON.stringify({ - timestamp: Date.now(), - pubKey: '1234asd', - signature: 'blabla', - version: 1, - data: { - type: `${command}`, - }, - }); - const waitForWsConnected = (wss) => new Promise((resolve, reject) => { - wss.on('connection', (ws) => { - ws.send(message); - resolve(); - }); - // eslint-disable-next-line no-param-reassign - wss.onerror = (err) => { - reject(err); - }; - }); - const ip = '127.0.0.2'; - wsserver = new WebSocket.Server({ host: '127.0.0.2', port: 16127 }); - lruRateLimitStub.returns(true); - sinon.stub(LRUCache.prototype, 'has').returns(false); - const verifyOriginalFluxBroadcastStub = sinon.stub(fluxCommunicationUtils, 'verifyOriginalFluxBroadcast').returns(true); - const respondWithAppMessageStub = sinon.stub(fluxCommunicationMessagesSender, 'respondWithAppMessage').returns(true); - daemonServiceMiscRpcsStub.returns({ - data: - { - synced: false, - height: 0, - }, + // eslint-disable-next-line no-loop-func + it('should handle the fluxapprequest message properly', async () => { + const message = JSON.stringify({ + timestamp: Date.now(), + pubKey: '1234asd', + signature: 'blabla', + version: 1, + data: { + type: 'fluxapprequest', + }, + }); + const waitForWsConnected = (wss) => new Promise((resolve, reject) => { + wss.on('connection', (ws) => { + ws.send(message); + resolve(); }); - await fluxCommunication.initiateAndHandleConnection(ip); + // eslint-disable-next-line no-param-reassign + wss.onerror = (err) => { + reject(err); + }; + }); + const ip = '127.0.0.2'; + wsserver = new WebSocket.Server({ host: '127.0.0.2', port: 16127 }); + lruRateLimitStub.returns(true); + sinon.stub(LRUCache.prototype, 'has').returns(false); + const verifyBroadcastStub = sinon.stub(networkStateService, 'verifyBroadcast').returns(true); + const respondWithAppMessageStub = sinon.stub(fluxCommunicationMessagesSender, 'respondWithAppMessage').returns(true); + daemonServiceMiscRpcsStub.returns({ + data: + { + synced: false, + height: 0, + }, + }); + await fluxCommunication.initiateAndHandleConnection(ip); - await waitForWsConnected(wsserver); - // slight delay to let onopen to be triggered - await serviceHelper.delay(100); + await waitForWsConnected(wsserver); + // slight delay to let onopen to be triggered + await serviceHelper.delay(100); - sinon.assert.calledOnceWithExactly(verifyOriginalFluxBroadcastStub, JSON.parse(message), undefined, sinon.match.number); - sinon.assert.calledWith(respondWithAppMessageStub, JSON.parse(message)); - }); - } + sinon.assert.calledOnceWithExactly(verifyBroadcastStub, JSON.parse(message)); + sinon.assert.calledWith(respondWithAppMessageStub, JSON.parse(message)); + }); const registerUpdateAppList = ['zelappregister', 'zelappupdate', 'fluxappregister', 'fluxappupdate']; // eslint-disable-next-line no-restricted-syntax @@ -1119,7 +1115,7 @@ describe('fluxCommunication tests', () => { wsserver = new WebSocket.Server({ host: '127.0.0.2', port: 16127 }); lruRateLimitStub.returns(true); sinon.stub(LRUCache.prototype, 'has').returns(false); - const verifyOriginalFluxBroadcastStub = sinon.stub(fluxCommunicationUtils, 'verifyOriginalFluxBroadcast').returns(true); + const verifyFluxBroadcast = sinon.stub(networkStateService, 'verifyBroadcast').returns(true); const storeAppTemporaryMessageStub = sinon.stub(appsService, 'storeAppTemporaryMessage').returns(false); daemonServiceMiscRpcsStub.returns({ data: @@ -1134,7 +1130,7 @@ describe('fluxCommunication tests', () => { // slight delay to let onopen to be triggered await serviceHelper.delay(100); - sinon.assert.calledOnceWithExactly(verifyOriginalFluxBroadcastStub, JSON.parse(message), undefined, sinon.match.number); + sinon.assert.calledOnceWithExactly(verifyFluxBroadcast, JSON.parse(message)); sinon.assert.calledOnceWithExactly(storeAppTemporaryMessageStub, JSON.parse(message).data, true); }); } @@ -1167,7 +1163,7 @@ describe('fluxCommunication tests', () => { wsserver = new WebSocket.Server({ host: '127.0.0.2', port: 16127 }); lruRateLimitStub.returns(true); sinon.stub(LRUCache.prototype, 'has').returns(false); - const verifyOriginalFluxBroadcastStub = sinon.stub(fluxCommunicationUtils, 'verifyOriginalFluxBroadcast').returns(true); + const verifyBroadcast = sinon.stub(networkStateService, 'verifyBroadcast').returns(true); const storeAppRunningMessageStub = sinon.stub(appsService, 'storeAppRunningMessage').returns(false); daemonServiceMiscRpcsStub.returns({ data: @@ -1182,7 +1178,7 @@ describe('fluxCommunication tests', () => { // slight delay to let onopen to be triggered await serviceHelper.delay(100); - sinon.assert.calledOnceWithExactly(verifyOriginalFluxBroadcastStub, JSON.parse(message), undefined, sinon.match.number); + sinon.assert.calledOnceWithExactly(verifyBroadcast, JSON.parse(message)); sinon.assert.calledOnceWithExactly(storeAppRunningMessageStub, JSON.parse(message).data); }); } @@ -1413,7 +1409,7 @@ describe('fluxCommunication tests', () => { ]; sinon.stub(fluxNetworkHelper, 'getMyFluxIPandPort').returns('44.192.51.11:16127'); fluxNetworkHelper.setMyFluxIp('44.192.51.11'); - sinon.stub(fluxCommunicationUtils, 'deterministicFluxList').returns(fluxNodeList); + sinon.stub(networkStateService, 'networkState').returns(fluxNodeList); sinon.stub(serviceHelper, 'delay').resolves(() => new Promise((resolve) => { setTimeout(resolve, 50); })); const infoSpy = sinon.spy(log, 'info'); daemonServiceStub.returns({ diff --git a/tests/unit/fluxNetworkHelper.test.js b/tests/unit/fluxNetworkHelper.test.js index 68fc44fa7..ddc59ad62 100644 --- a/tests/unit/fluxNetworkHelper.test.js +++ b/tests/unit/fluxNetworkHelper.test.js @@ -14,7 +14,7 @@ const daemonServiceUtils = require('../../ZelBack/src/services/daemonService/dae const daemonServiceBenchmarkRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceBenchmarkRpcs'); const daemonServiceWalletRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceWalletRpcs'); const daemonServiceFluxnodeRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs'); -const fluxCommunicationUtils = require('../../ZelBack/src/services/fluxCommunicationUtils'); +const networkStateService = require('../../ZelBack/src/services/networkStateService'); const benchmarkService = require('../../ZelBack/src/services/benchmarkService'); const verificationHelper = require('../../ZelBack/src/services/verificationHelper'); @@ -472,7 +472,7 @@ describe('fluxNetworkHelper tests', () => { rank: 1, }, ]; - deterministicFluxListStub = sinon.stub(fluxCommunicationUtils, 'deterministicFluxList'); + deterministicFluxListStub = sinon.stub(networkStateService, 'networkState'); fluxNetworkHelper.setMyFluxIp('83.52.214.240:16167'); }); @@ -1094,7 +1094,7 @@ describe('fluxNetworkHelper tests', () => { rank: 0, }, ]; - sinon.stub(fluxCommunicationUtils, 'deterministicFluxList').returns(deterministicFluxnodeListResponse); + sinon.stub(networkStateService, 'networkState').returns(deterministicFluxnodeListResponse); sinon.stub(daemonServiceWalletRpcs, 'createConfirmationTransaction').returns(true); sinon.stub(serviceHelper, 'delay').returns(true); }); @@ -1340,7 +1340,7 @@ describe('fluxNetworkHelper tests', () => { }]; getBenchmarksStub = sinon.stub(daemonServiceBenchmarkRpcs, 'getBenchmarks'); isDaemonSyncedStub = sinon.stub(daemonServiceMiscRpcs, 'isDaemonSynced'); - deterministicFluxListStub = sinon.stub(fluxCommunicationUtils, 'deterministicFluxList'); + deterministicFluxListStub = sinon.stub(networkStateService, 'networkState'); getFluxNodeStatusStub = sinon.stub(daemonServiceFluxnodeRpcs, 'getFluxNodeStatus'); fluxNetworkHelper.setDosMessage(null); fluxNetworkHelper.setDosStateValue(0); diff --git a/tests/unit/fluxCommunicationUtils.test.js b/tests/unit/networkStateService.test.js similarity index 87% rename from tests/unit/fluxCommunicationUtils.test.js rename to tests/unit/networkStateService.test.js index bcc586b02..f43b2ca66 100644 --- a/tests/unit/fluxCommunicationUtils.test.js +++ b/tests/unit/networkStateService.test.js @@ -1,16 +1,17 @@ const chai = require('chai'); -const { LRUCache } = require('lru-cache'); -const sinon = require('sinon'); -const proxyquire = require('proxyquire'); const { expect } = chai; -let fluxCommunicationUtils = require('../../ZelBack/src/services/fluxCommunicationUtils'); + +const sinon = require('sinon'); + const fluxCommunicationMessagesSender = require('../../ZelBack/src/services/fluxCommunicationMessagesSender'); const daemonServiceFluxnodeRpcs = require('../../ZelBack/src/services/daemonService/daemonServiceFluxnodeRpcs'); const fluxList = require('./data/listfluxnodes.json'); -describe('fluxCommunicationUtils tests', () => { - describe('deterministicFluxList tests', () => { +const networkStateService = require('../../ZelBack/src/services/networkStateService'); + +describe('networkStateService tests', () => { + describe('networkState tests', () => { const deterministicFluxnodeListResponseBase = { data: [ { @@ -75,35 +76,27 @@ describe('fluxCommunicationUtils tests', () => { daemonStub = sinon.stub(daemonServiceFluxnodeRpcs, 'viewDeterministicFluxNodeList'); }); - afterEach(() => { - daemonStub.restore(); + afterEach(async () => { sinon.restore(); + await networkStateService.stop(); }); it('should return the whole list if the filter was not provided', async () => { - // Start with clear cache - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': LRUCache }, - ); const deterministicFluxnodeListResponse = { ...deterministicFluxnodeListResponseBase, status: 'success', }; daemonStub.resolves(deterministicFluxnodeListResponse); - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(); + await networkStateService.start(); + + const deterministicFluxListResult = networkStateService.networkState(); expect(deterministicFluxListResult).to.eql(deterministicFluxnodeListResponse.data); sinon.assert.calledOnce(daemonStub); }); it('should return the list filtered out with proper public key', async () => { - // Start with clear cache - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': LRUCache }, - ); const filteredPubKey = '04d50620a31f045c61be42bad44b7a9424ffb6de37bf256b88f00e118e59736165255f2f4585b36c7e1f8f3e20db4fa4e55e61cc01dc7a5cd2b2ed0153627588dc'; const expectedResult = [{ collateral: 'COutPoint(46c9ae0313fc128d0fb4327f5babc7868fe557035b58e0a7cb475cfd8819f8c7, 0)', @@ -148,18 +141,15 @@ describe('fluxCommunicationUtils tests', () => { }; daemonStub.resolves(deterministicFluxnodeListResponse); - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(filteredPubKey); + await networkStateService.start(); + + const deterministicFluxListResult = await networkStateService.getFluxnodesByPubkey(filteredPubKey); expect(deterministicFluxListResult).to.eql(expectedResult); sinon.assert.calledOnce(daemonStub); }); it('should return an empty list if the public key does not match', async () => { - // Start with clear cache - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': LRUCache }, - ); const filteredPubKey = '04d50620a31f045c61be42bad44b7a9424asdfde37bf256b88f00e118e59736165255f2f4585b36c7e1f8f3e20db4fa4e55e61cc01dc7a5cd2b2ed0153627588dc'; const expectedResult = []; @@ -169,29 +159,30 @@ describe('fluxCommunicationUtils tests', () => { }; daemonStub.resolves(deterministicFluxnodeListResponse); - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(filteredPubKey); + await networkStateService.start(); + + const deterministicFluxListResult = await networkStateService.getFluxnodesByPubkey(filteredPubKey); expect(deterministicFluxListResult).to.eql(expectedResult); - sinon.assert.calledOnce(daemonStub); + // once at start, once again to find non existent pubkey + sinon.assert.calledTwice(daemonStub); }); it('should get list from cache with no filter applied', async () => { - // Stub cache to simulate the actual lru-cache called - const getCacheStub = sinon.stub(); - const stubCache = sinon.stub().callsFake(() => ({ - get: getCacheStub, - })); - const stubCacheB = { LRUCache: stubCache }; - getCacheStub.withArgs('fluxList').returns(deterministicFluxnodeListResponseBase.data); - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': stubCacheB }, - ); - - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(); + const deterministicFluxnodeListResponse = { + ...deterministicFluxnodeListResponseBase, + status: 'success', + }; + daemonStub.resolves(deterministicFluxnodeListResponse); + + await networkStateService.start(); + + sinon.assert.calledOnce(daemonStub); + + const deterministicFluxListResult = networkStateService.networkState(); expect(deterministicFluxListResult).to.eql(deterministicFluxnodeListResponseBase.data); - sinon.assert.calledOnceWithExactly(getCacheStub, 'fluxList'); + sinon.assert.calledOnce(daemonStub); }); it('should get list from cache with filter applied', async () => { @@ -232,22 +223,21 @@ describe('fluxCommunicationUtils tests', () => { amount: '2000.00', rank: 1, }]; - // Stub cache to simulate the actual lru-cache called - const getCacheStub = sinon.stub(); - const stubCache = sinon.stub().callsFake(() => ({ - get: getCacheStub, - })); - const stubCacheB = { LRUCache: stubCache }; - getCacheStub.withArgs(`fluxList${filteredPubKey}`).returns(expectedResult); - fluxCommunicationUtils = proxyquire( - '../../ZelBack/src/services/fluxCommunicationUtils', - { 'lru-cache': stubCacheB }, - ); - - const deterministicFluxListResult = await fluxCommunicationUtils.deterministicFluxList(filteredPubKey); + + const deterministicFluxnodeListResponse = { + ...deterministicFluxnodeListResponseBase, + status: 'success', + }; + daemonStub.resolves(deterministicFluxnodeListResponse); + + await networkStateService.start(); + + sinon.assert.calledOnce(daemonStub); + + const deterministicFluxListResult = await networkStateService.getFluxnodesByPubkey(filteredPubKey); expect(deterministicFluxListResult).to.eql(expectedResult); - sinon.assert.calledOnceWithExactly(getCacheStub, `fluxList${filteredPubKey}`); + sinon.assert.calledOnce(daemonStub); }); });