Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
RunOnFluxBot committed Jan 20, 2024
1 parent 3f1befe commit 00dd6e8
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 124 deletions.
127 changes: 27 additions & 100 deletions services/appsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ let masterSlaveAppsRunning = false;

const hashesNumberOfSearchs = new Map();

const appsThatMightBeUsingOldGatewayIpAssignment = ['HNSDoH', 'dane', 'fdm', 'Jetpack2', 'fdmdedicated', 'isokosse', 'ChainBraryDApp', 'health', 'ethercalc'];

const nodeSpecs = {
cpuCores: 0,
ram: 0,
Expand Down Expand Up @@ -3366,10 +3364,11 @@ async function registerAppLocally(appSpecs, componentSpecs, res) {
}

if (!isComponent) {
let dockerNetworkAddrValue = Math.floor(Math.random() * (254 - 0 + 1) + 0);
if (appsThatMightBeUsingOldGatewayIpAssignment.includes(appName)) {
dockerNetworkAddrValue = appName.charCodeAt(appName.length - 1);
}
// last character of appName determines gateway
const lastCharCode = appName.charCodeAt(appName.length - 1);

const dockerNetworkAddrValue = lastCharCode;

const fluxNetworkStatus = {
status: `Checking Flux App network of ${appName}...`,
};
Expand Down Expand Up @@ -3739,10 +3738,11 @@ async function softRegisterAppLocally(appSpecs, componentSpecs, res) {
}

if (!isComponent) {
let dockerNetworkAddrValue = Math.floor(Math.random() * (254 - 0 + 1) + 0);
if (appsThatMightBeUsingOldGatewayIpAssignment.includes(appName)) {
dockerNetworkAddrValue = appName.charCodeAt(appName.length - 1);
}
// last character of appName determines gateway
const lastCharCode = appName.charCodeAt(appName.length - 1);

const dockerNetworkAddrValue = lastCharCode;

const fluxNetworkStatus = {
status: `Checking Flux App network of ${appName}...`,
};
Expand Down Expand Up @@ -6585,7 +6585,7 @@ async function storeIPChangedMessage(message) {
const db = dbHelper.databaseConnection();
const database = db.db(config.database.appsglobal.database);
const query = { ip: message.oldIP };
const update = { $set: { ip: message.newIP, broadcastedAt: new Date(message.broadcastedAt) } };
const update = { $set: { ip: message.newIP } };
await dbHelper.updateInDatabase(database, globalAppsLocations, query, update);

// all stored, rebroadcast
Expand Down Expand Up @@ -8986,32 +8986,8 @@ async function trySpawningGlobalApplication() {
/**
* To check and notify peers of running apps. Checks if apps are installed, stopped or running.
*/
let nodeConfirmedOnLastCheck = true;
async function checkAndNotifyPeersOfRunningApps() {
try {
const isNodeConfirmed = await generalService.isNodeStatusConfirmed();
if (!isNodeConfirmed) {
if (!nodeConfirmedOnLastCheck) {
const installedAppsRes = await installedApps();
if (installedAppsRes.status !== 'success') {
throw new Error('Failed to get installed Apps');
}
const appsInstalled = installedAppsRes.data;
// eslint-disable-next-line no-restricted-syntax
for (const installedApp of appsInstalled) {
log.info(`Application ${installedApp.name} going to be removed from node as the node is not confirmed on the network for more than 2 hours..`);
log.warn(`Removing application ${installedApp.name} locally`);
// eslint-disable-next-line no-await-in-loop
await removeAppLocally(installedApp.name, null, false, true, true);
log.warn(`Application ${installedApp.name} locally removed`);
// eslint-disable-next-line no-await-in-loop
await serviceHelper.delay(config.fluxapps.removal.delay * 1000); // wait for 6 mins so we don't have more removals at the same time
}
}
nodeConfirmedOnLastCheck = false;
return;
}
nodeConfirmedOnLastCheck = true;
// get my external IP and check that it is longer than 5 in length.
const benchmarkResponse = await daemonServiceBenchmarkRpcs.getBenchmarks();
let myIP = null;
Expand Down Expand Up @@ -9118,6 +9094,8 @@ async function checkAndNotifyPeersOfRunningApps() {
}
});
installedAndRunning.push(...masterSlaveAppsInstalled);
const syncStatus = daemonServiceMiscRpcs.isDaemonSynced();
const daemonHeight = syncStatus.data.height || 0;
const apps = [];
try {
// eslint-disable-next-line no-restricted-syntax
Expand All @@ -9141,7 +9119,7 @@ async function checkAndNotifyPeersOfRunningApps() {
// store it in local database first
// eslint-disable-next-line no-await-in-loop
await storeAppRunningMessage(newAppRunningMessage);
if (installedAndRunning.length === 1) {
if (daemonHeight < config.fluxapps.apprunningv2 || installedAndRunning.length === 1) {
// eslint-disable-next-line no-await-in-loop
await fluxCommunicationMessagesSender.broadcastMessageToOutgoing(newAppRunningMessage);
// eslint-disable-next-line no-await-in-loop
Expand All @@ -9151,7 +9129,7 @@ async function checkAndNotifyPeersOfRunningApps() {
// broadcast messages about running apps to all peers
}
}
if (installedAndRunning.length > 1) {
if (daemonHeight >= config.fluxapps.apprunningv2 && installedAndRunning.length > 1) {
// send v2 unique message instead
const newAppRunningMessageV2 = {
type: 'fluxapprunning',
Expand All @@ -9172,6 +9150,7 @@ async function checkAndNotifyPeersOfRunningApps() {
log.error(err);
// removeAppLocally(stoppedApp);
}

log.info('Running Apps broadcasted');
} catch (error) {
log.error(error);
Expand Down Expand Up @@ -9329,43 +9308,21 @@ async function checkAndRemoveApplicationInstance() {
// eslint-disable-next-line no-await-in-loop
const runningAppList = await getRunningAppList(installedApp.name);
const minInstances = installedApp.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs
if (runningAppList.length > minInstances) {
if (runningAppList.length > (minInstances + config.fluxapps.maximumAdditionalInstances)) {
// eslint-disable-next-line no-await-in-loop
const appDetails = await getApplicationGlobalSpecifications(installedApp.name);
if (appDetails) {
log.info(`Application ${installedApp.name} is already spawned on ${runningAppList.length} instances. Checking if should be unninstalled from the FluxNode..`);
runningAppList.sort((a, b) => {
if (!a.runningSince && b.runningSince) {
return 1;
}
if (a.runningSince && !b.runningSince) {
return -1;
}
if (a.runningSince < b.runningSince) {
return 1;
}
if (a.runningSince > b.runningSince) {
return -1;
}
if (a.ip < b.ip) {
return 1;
}
if (a.ip > b.ip) {
return -1;
}
return 0;
});
// eslint-disable-next-line no-await-in-loop
const myIP = await fluxNetworkHelper.getMyFluxIPandPort();
const index = runningAppList.findIndex((x) => x.ip === myIP);
if (index === 0) {
log.info(`Application ${installedApp.name} going to be removed from node as it was the latest one running it to install it..`);
log.info(`Application ${installedApp.name} is already spawned on ${runningAppList.length} instances. Checking removal availability..`);
const randomNumber = Math.floor((Math.random() * config.fluxapps.removal.probability));
if (randomNumber === 0) {
log.warn(`Removing application ${installedApp.name} locally`);
// eslint-disable-next-line no-await-in-loop
await removeAppLocally(installedApp.name, null, false, true, true);
log.warn(`Application ${installedApp.name} locally removed`);
// eslint-disable-next-line no-await-in-loop
await serviceHelper.delay(config.fluxapps.removal.delay * 1000); // wait for 6 mins so we don't have more removals at the same time
} else {
log.info(`Other Fluxes are evaluating application ${installedApp.name} removal.`);
}
}
}
Expand Down Expand Up @@ -10374,18 +10331,6 @@ async function syncthingApps() {
// eslint-disable-next-line no-await-in-loop
const runningAppList = await getRunningAppList(installedApp.name);
runningAppList.sort((a, b) => {
if (!a.runningSince && b.runningSince) {
return -1;
}
if (a.runningSince && !b.runningSince) {
return 1;
}
if (a.runningSince < b.runningSince) {
return -1;
}
if (a.runningSince > b.runningSince) {
return 1;
}
if (a.broadcastedAt < b.broadcastedAt) {
return -1;
}
Expand Down Expand Up @@ -10417,13 +10362,10 @@ async function syncthingApps() {
if (cache.numberOfExecutions === cache.numberOfExecutionsRequired) {
syncthingFolder.type = 'sendreceive';
} else if (cache.numberOfExecutions === cache.numberOfExecutionsRequired + 1) {
log.info(`SyncthingApps changing syncthing type to sendreceive for appIdentifier ${appId}`);
log.info(`SyncthingApps starting appIdentifier ${appId}`);
syncthingFolder.type = 'sendreceive';
if (containerDataFlags.includes('r')) {
log.info(`SyncthingApps starting appIdentifier ${appId}`);
// eslint-disable-next-line no-await-in-loop
await appDockerRestart(id);
}
// eslint-disable-next-line no-await-in-loop
await appDockerRestart(id);
cache.restarted = true;
}
receiveOnlySyncthingAppsCache.set(appId, cache);
Expand Down Expand Up @@ -10561,18 +10503,6 @@ async function syncthingApps() {
const runningAppList = await getRunningAppList(installedApp.name);
log.info(`SyncthingApps appIdentifier ${appId} is running on nodes ${JSON.stringify(runningAppList)}`);
runningAppList.sort((a, b) => {
if (!a.runningSince && b.runningSince) {
return -1;
}
if (a.runningSince && !b.runningSince) {
return 1;
}
if (a.runningSince < b.runningSince) {
return -1;
}
if (a.runningSince > b.runningSince) {
return 1;
}
if (a.broadcastedAt < b.broadcastedAt) {
return -1;
}
Expand Down Expand Up @@ -10608,11 +10538,8 @@ async function syncthingApps() {
} else if (cache.numberOfExecutions === cache.numberOfExecutionsRequired + 1) {
log.info(`SyncthingApps starting appIdentifier ${appId}`);
syncthingFolder.type = 'sendreceive';
if (containerDataFlags.includes('r')) {
log.info(`SyncthingApps starting appIdentifier ${appId}`);
// eslint-disable-next-line no-await-in-loop
await appDockerRestart(id);
}
// eslint-disable-next-line no-await-in-loop
await appDockerRestart(id);
cache.restarted = true;
}
receiveOnlySyncthingAppsCache.set(appId, cache);
Expand Down
20 changes: 0 additions & 20 deletions services/fluxNetworkHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -1293,9 +1293,6 @@ async function adjustFirewall() {
}
}

/**
* To clean a firewall deny policies, and delete them from it.
*/
async function purgeUFW() {
try {
const cmdAsync = util.promisify(nodecmd.get);
Expand Down Expand Up @@ -1331,22 +1328,6 @@ async function purgeUFW() {
}
}

/**
* This fix a docker security issue where docker containers can access host network, for example to create port forwarding on hosts
*/
async function removeDockerContainerAccessToHost() {
try {
const cmdAsync = util.promisify(nodecmd.get);
const dropAccessToHostNetwork = "sudo iptables -I DOCKER-USER -d $(ip route | grep \"src $(ip addr show dev $(ip route | awk '/default/ {print $5}') | grep \"inet\" | awk 'NR==1{print $2}' | cut -d'/' -f 1)\" | awk '{print $1}') -j DROP";
await cmdAsync(dropAccessToHostNetwork).catch((error) => log.error(`Error executing dropAccessToHostNetwork command:${error}`));
const giveHostAccessToDockerNetwork = "sudo iptables -I FORWARD -i DOCKER-USER -d $(ip route | grep \"src $(ip addr show dev $(ip route | awk '/default/ {print $5}') | grep \"inet\" | awk 'NR==1{print $2}' | cut -d'/' -f 1)\" | awk '{print $1}') -m state --state ESTABLISHED,RELATED -j ACCEPT";
await cmdAsync(giveHostAccessToDockerNetwork).catch((error) => log.error(`Error executing giveHostAccessToDockerNetwork command:${error}`));
log.info('Access to host from containers removed');
} catch (error) {
log.error(error);
}
}

const lruRateOptions = {
max: 500,
ttl: 1000 * 15, // 15 seconds
Expand Down Expand Up @@ -1477,5 +1458,4 @@ module.exports = {
isPortUserBlocked,
allowNodeToBindPrivilegedPorts,
installNetcat,
removeDockerContainerAccessToHost,
};
6 changes: 2 additions & 4 deletions services/serviceManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ async function startFluxFunctions() {
await databaseTemp.collection(config.database.appsglobal.collections.appsTemporaryMessages).createIndex({ receivedAt: 1 }, { expireAfterSeconds: 3600 }); // todo longer time? dropIndexes()
log.info('Temporary database prepared');
log.info('Preparing Flux Apps locations');
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).dropIndex({ broadcastedAt: 1 });
// more than 2 hours and 5m. Meaning we have not received status message for a long time. So that node is no longer on a network or app is down.
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ broadcastedAt: 1 }, { expireAfterSeconds: 7500 });
// more than 1 hour. Meaning we have not received status message for a long time. So that node is no longer on a network or app is down.
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ broadcastedAt: 1 }, { expireAfterSeconds: 3900 });
log.info('Flux Apps locations prepared');
fluxNetworkHelper.adjustFirewall();
log.info('Firewalls checked');
Expand All @@ -94,7 +93,6 @@ async function startFluxFunctions() {
setTimeout(() => {
log.info('Rechecking firewall app rules');
fluxNetworkHelper.purgeUFW();
fluxNetworkHelper.removeDockerContainerAccessToHost();
appsService.testAppMount(); // test if our node can mount a volume
}, 30 * 1000);
setTimeout(() => {
Expand Down

0 comments on commit 00dd6e8

Please sign in to comment.