Skip to content

Commit

Permalink
Merge pull request #1166 from RunOnFlux/development
Browse files Browse the repository at this point in the history
v4.20.2
  • Loading branch information
TheTrunk authored Nov 30, 2023
2 parents d8b0a80 + 47d7e40 commit 4987248
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 16 deletions.
58 changes: 45 additions & 13 deletions ZelBack/src/services/fluxCommunication.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,18 @@ async function handleAppMessages(message, fromIP, port) {
const rebroadcastToPeers = await appsService.storeAppTemporaryMessage(message.data, true);
if (rebroadcastToPeers === true) {
const messageString = serviceHelper.ensureString(message);
const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port);
const wsListOut = outgoingConnections;
const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIP && client.port === port);
if (outPeerIndex >= 0) {
wsListOut.splice(outPeerIndex, 1);
}
fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut);
await serviceHelper.delay(100);
const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port);
await serviceHelper.delay(500);
const wsList = incomingConnections;
const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIP && client.port === port);
if (incPeerIndex >= 0) {
wsList.splice(incPeerIndex, 1);
}
fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList);
}
} catch (error) {
Expand All @@ -102,10 +110,18 @@ async function handleAppRunningMessage(message, fromIP, port) {
const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000);
if (rebroadcastToPeers === true && timestampOK) {
const messageString = serviceHelper.ensureString(message);
const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port);
const wsListOut = outgoingConnections;
const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIP && client.port === port);
if (outPeerIndex >= 0) {
wsListOut.splice(outPeerIndex, 1);
}
fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut);
await serviceHelper.delay(500);
const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port);
const wsList = incomingConnections;
const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIP && client.port === port);
if (incPeerIndex >= 0) {
wsList.splice(incPeerIndex, 1);
}
fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList);
}
} catch (error) {
Expand All @@ -130,10 +146,18 @@ async function handleIPChangedMessage(message, fromIP, port) {
const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000);
if (rebroadcastToPeers && timestampOK) {
const messageString = serviceHelper.ensureString(message);
const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port);
const wsListOut = outgoingConnections;
const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIP && client.port === port);
if (outPeerIndex >= 0) {
wsListOut.splice(outPeerIndex, 1);
}
fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut);
await serviceHelper.delay(500);
const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port);
const wsList = incomingConnections;
const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIP && client.port === port);
if (incPeerIndex >= 0) {
wsList.splice(incPeerIndex, 1);
}
fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList);
}
} catch (error) {
Expand All @@ -158,10 +182,18 @@ async function handleAppRemovedMessage(message, fromIP, port) {
const timestampOK = fluxCommunicationUtils.verifyTimestampInFluxBroadcast(message, currentTimeStamp, 240000);
if (rebroadcastToPeers && timestampOK) {
const messageString = serviceHelper.ensureString(message);
const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIP && client.port !== port);
const wsListOut = outgoingConnections;
const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIP && client.port === port);
if (outPeerIndex >= 0) {
wsListOut.splice(outPeerIndex, 1);
}
fluxCommunicationMessagesSender.sendToAllPeers(messageString, wsListOut);
await serviceHelper.delay(500);
const wsList = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIP && client.port !== port);
const wsList = incomingConnections;
const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIP && client.port === port);
if (incPeerIndex >= 0) {
wsList.splice(incPeerIndex, 1);
}
fluxCommunicationMessagesSender.sendToAllIncomingConnections(messageString, wsList);
}
} catch (error) {
Expand Down Expand Up @@ -302,9 +334,9 @@ function handleIncomingConnection(websocket, req, expressWS) {
}
});
ws.on('error', async (msg) => {
const ip = ws._socket.remoteAddress;
const ip = ws._socket.remoteAddress.replace('::ffff:', '');
log.warn(`Incoming connection error ${ip}:${port}`);
const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress && ws.port === incomingCon.port);
const ocIndex = incomingConnections.findIndex((incomingCon) => ip === incomingCon._socket.remoteAddress.replace('::ffff:', '') && ws.port === incomingCon.port);
const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip && mypeer.port === port);
if (ocIndex > -1) {
incomingConnections.splice(ocIndex, 1);
Expand All @@ -318,9 +350,9 @@ function handleIncomingConnection(websocket, req, expressWS) {
log.warn(`Incoming connection errored with: ${msg}`);
});
ws.on('close', async (msg) => {
const ip = ws._socket.remoteAddress;
const ip = ws._socket.remoteAddress.replace('::ffff:', '');
log.warn(`Incoming connection close ${ip}:${port}`);
const ocIndex = incomingConnections.findIndex((incomingCon) => ws._socket.remoteAddress === incomingCon._socket.remoteAddress && ws.port === incomingCon.port);
const ocIndex = incomingConnections.findIndex((incomingCon) => ip === incomingCon._socket.remoteAddress.replace('::ffff:', '') && ws.port === incomingCon.port);
const foundPeer = incomingPeers.find((mypeer) => mypeer.ip === ip && mypeer.port === port);
if (ocIndex > -1) {
incomingConnections.splice(ocIndex, 1);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "flux",
"version": "4.20.1",
"version": "4.20.2",
"description": "Flux, Your Gateway to a Decentralized World",
"repository": {
"type": "git",
Expand Down
12 changes: 10 additions & 2 deletions tests/unit/fluxCommunication.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,16 @@ describe('fluxCommunication tests', () => {
incomingConnections.push(wsIncoming);

const messageString = JSON.stringify(message);
const wsListOut = outgoingConnections.filter((client) => client._socket.remoteAddress !== fromIp && client.port !== port);
const wsListIn = incomingConnections.filter((client) => client._socket.remoteAddress.replace('::ffff:', '') !== fromIp && client.port !== port);
const wsListOut = outgoingConnections;
const outPeerIndex = outgoingConnections.findIndex((client) => client._socket.remoteAddress === fromIp && client.port === port);
if (outPeerIndex >= 0) {
wsListOut.splice(outPeerIndex, 1);
}
const wsListIn = incomingConnections;
const incPeerIndex = incomingConnections.findIndex((client) => client._socket.remoteAddress.replace('::ffff:', '') === fromIp && client.port === port);
if (incPeerIndex >= 0) {
wsListIn.splice(incPeerIndex, 1);
}

await fluxCommunication.handleAppMessages(message, fromIp, port);

Expand Down

0 comments on commit 4987248

Please sign in to comment.