diff --git a/ZelBack/src/services/fluxCommunication.js b/ZelBack/src/services/fluxCommunication.js index ca269b576..75f2042f2 100644 --- a/ZelBack/src/services/fluxCommunication.js +++ b/ZelBack/src/services/fluxCommunication.js @@ -73,10 +73,18 @@ async function handleAppMessages(message, fromIP, port) { const rebroadcastToPeers = await appsService.storeAppTemporaryMessage(message.data, true); if (rebroadcastToPeers === true) { const messageString = serviceHelper.ensureString(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port); + const wsListOut = outgoingConnections; + const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIP && client.port === port); + if (outPeerIndex >= 0) { + wsListOut.splice(outPeerIndex, 1); + } fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); - await serviceHelper.delay(100); - const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port); + await serviceHelper.delay(500); + const wsList = incomingConnections; + const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIP && client.port === port); + if (incPeerIndex >= 0) { + wsList.splice(incPeerIndex, 1); + } fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } } catch (error) { @@ -102,10 +110,18 @@ async function handleAppRunningMessage(message, fromIP, port) { const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); if (rebroadcastToPeers === true && timestampOK) { const messageString = serviceHelper.ensureString(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port); + const wsListOut = outgoingConnections; + const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIP && client.port === port); + if (outPeerIndex >= 0) { + wsListOut.splice(outPeerIndex, 1); + } fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); await serviceHelper.delay(500); - const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port); + const wsList = incomingConnections; + const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIP && client.port === port); + if (incPeerIndex >= 0) { + wsList.splice(incPeerIndex, 1); + } fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } } catch (error) { @@ -130,10 +146,18 @@ async function handleIPChangedMessage(message, fromIP, port) { const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); if (rebroadcastToPeers && timestampOK) { const messageString = serviceHelper.ensureString(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port); + const wsListOut = outgoingConnections; + const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIP && client.port === port); + if (outPeerIndex >= 0) { + wsListOut.splice(outPeerIndex, 1); + } fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); await serviceHelper.delay(500); - const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port); + const wsList = incomingConnections; + const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIP && client.port === port); + if (incPeerIndex >= 0) { + wsList.splice(incPeerIndex, 1); + } fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } } catch (error) { @@ -158,10 +182,18 @@ async function handleAppRemovedMessage(message, fromIP, port) { const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000); if (rebroadcastToPeers && timestampOK) { const messageString = serviceHelper.ensureString(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port); + const wsListOut = outgoingConnections; + const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIP && client.port === port); + if (outPeerIndex >= 0) { + wsListOut.splice(outPeerIndex, 1); + } fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut); await serviceHelper.delay(500); - const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port); + const wsList = incomingConnections; + const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIP && client.port === port); + if (incPeerIndex >= 0) { + wsList.splice(incPeerIndex, 1); + } fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList); } } catch (error) { @@ -302,9 +334,9 @@ function handleIncomingConnection(websocket, req, expressWS) { } }); ws.on('error', async (msg) => { - const ip = ws._socket.remoteAddress; + const ip = ws._socket.remoteAddress.replace('::ffff:', ''); log.warn(`Incoming connection error ${ip}:${port}`); - const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress && ws.port === incomingCon.port); + const ocIndex = incomingConnections.findIndex((incomingCon) => ip === incomingCon._socket.remoteAddress.replace('::ffff:', '') && ws.port === incomingCon.port); const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip && mypeer.port === port); if (ocIndex > -1) { incomingConnections.splice(ocIndex, 1); @@ -318,9 +350,9 @@ function handleIncomingConnection(websocket, req, expressWS) { log.warn(`Incoming connection errored with: ${msg}`); }); ws.on('close', async (msg) => { - const ip = ws._socket.remoteAddress; + const ip = ws._socket.remoteAddress.replace('::ffff:', ''); log.warn(`Incoming connection close ${ip}:${port}`); - const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress && ws.port === incomingCon.port); + const ocIndex = incomingConnections.findIndex((incomingCon) => ip === incomingCon._socket.remoteAddress.replace('::ffff:', '') && ws.port === incomingCon.port); const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip && mypeer.port === port); if (ocIndex > -1) { incomingConnections.splice(ocIndex, 1); diff --git a/package.json b/package.json index c824e4ba8..a5cee18cf 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "flux", - "version": "4.20.1", + "version": "4.20.2", "description": "Flux, Your Gateway to a Decentralized World", "repository": { "type": "git", diff --git a/tests/unit/fluxCommunication.test.js b/tests/unit/fluxCommunication.test.js index 240248f31..0d92ea2a4 100644 --- a/tests/unit/fluxCommunication.test.js +++ b/tests/unit/fluxCommunication.test.js @@ -123,8 +123,16 @@ describe('fluxCommunication tests', () => { incomingConnections.push(wsIncoming); const messageString = JSON.stringify(message); - const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIp && client.port !== port); - const wsListIn = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIp && client.port !== port); + const wsListOut = outgoingConnections; + const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIp && client.port === port); + if (outPeerIndex >= 0) { + wsListOut.splice(outPeerIndex, 1); + } + const wsListIn = incomingConnections; + const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIp && client.port === port); + if (incPeerIndex >= 0) { + wsListIn.splice(incPeerIndex, 1); + } await fluxCommunication.handleAppMessages(message, fromIp, port);