Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
RunOnFluxBot committed Jan 18, 2024
1 parent fa68d5e commit 7362d83
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 97 deletions.
120 changes: 26 additions & 94 deletions services/appsService.js
Original file line number Diff line number Diff line change
Expand Up @@ -3364,8 +3364,11 @@ async function registerAppLocally(appSpecs, componentSpecs, res) {
}

if (!isComponent) {
// eslint-disable-next-line no-use-before-define
const dockerNetworkAddrValue = Math.floor(Math.random() * (254 - 0 + 1) + 0);
// last character of appName determines gateway
const lastCharCode = appName.charCodeAt(appName.length - 1);

const dockerNetworkAddrValue = lastCharCode;

const fluxNetworkStatus = {
status: `Checking Flux App network of ${appName}...`,
};
Expand Down Expand Up @@ -3735,8 +3738,10 @@ async function softRegisterAppLocally(appSpecs, componentSpecs, res) {
}

if (!isComponent) {
// eslint-disable-next-line no-use-before-define
const dockerNetworkAddrValue = Math.floor(Math.random() * (254 - 0 + 1) + 0);
// last character of appName determines gateway
const lastCharCode = appName.charCodeAt(appName.length - 1);

const dockerNetworkAddrValue = lastCharCode;

const fluxNetworkStatus = {
status: `Checking Flux App network of ${appName}...`,
Expand Down Expand Up @@ -6580,7 +6585,7 @@ async function storeIPChangedMessage(message) {
const db = dbHelper.databaseConnection();
const database = db.db(config.database.appsglobal.database);
const query = { ip: message.oldIP };
const update = { $set: { ip: message.newIP, broadcastedAt: new Date(message.broadcastedAt) } };
const update = { $set: { ip: message.newIP } };
await dbHelper.updateInDatabase(database, globalAppsLocations, query, update);

// all stored, rebroadcast
Expand Down Expand Up @@ -8981,32 +8986,8 @@ async function trySpawningGlobalApplication() {
/**
* To check and notify peers of running apps. Checks if apps are installed, stopped or running.
*/
let nodeConfirmedOnLastCheck = true;
async function checkAndNotifyPeersOfRunningApps() {
try {
const isNodeConfirmed = await generalService.isNodeStatusConfirmed();
if (!isNodeConfirmed) {
if (!nodeConfirmedOnLastCheck) {
const installedAppsRes = await installedApps();
if (installedAppsRes.status !== 'success') {
throw new Error('Failed to get installed Apps');
}
const appsInstalled = installedAppsRes.data;
// eslint-disable-next-line no-restricted-syntax
for (const installedApp of appsInstalled) {
log.info(`Application ${installedApp.name} going to be removed from node as the node is not confirmed on the network for more than 2 hours..`);
log.warn(`Removing application ${installedApp.name} locally`);
// eslint-disable-next-line no-await-in-loop
await removeAppLocally(installedApp.name, null, false, true, true);
log.warn(`Application ${installedApp.name} locally removed`);
// eslint-disable-next-line no-await-in-loop
await serviceHelper.delay(config.fluxapps.removal.delay * 1000); // wait for 6 mins so we don't have more removals at the same time
}
}
nodeConfirmedOnLastCheck = false;
return;
}
nodeConfirmedOnLastCheck = true;
// get my external IP and check that it is longer than 5 in length.
const benchmarkResponse = await daemonServiceBenchmarkRpcs.getBenchmarks();
let myIP = null;
Expand Down Expand Up @@ -9113,6 +9094,8 @@ async function checkAndNotifyPeersOfRunningApps() {
}
});
installedAndRunning.push(...masterSlaveAppsInstalled);
const syncStatus = daemonServiceMiscRpcs.isDaemonSynced();
const daemonHeight = syncStatus.data.height || 0;
const apps = [];
try {
// eslint-disable-next-line no-restricted-syntax
Expand All @@ -9136,7 +9119,7 @@ async function checkAndNotifyPeersOfRunningApps() {
// store it in local database first
// eslint-disable-next-line no-await-in-loop
await storeAppRunningMessage(newAppRunningMessage);
if (installedAndRunning.length === 1) {
if (daemonHeight < config.fluxapps.apprunningv2 || installedAndRunning.length === 1) {
// eslint-disable-next-line no-await-in-loop
await fluxCommunicationMessagesSender.broadcastMessageToOutgoing(newAppRunningMessage);
// eslint-disable-next-line no-await-in-loop
Expand All @@ -9146,7 +9129,7 @@ async function checkAndNotifyPeersOfRunningApps() {
// broadcast messages about running apps to all peers
}
}
if (installedAndRunning.length > 1) {
if (daemonHeight >= config.fluxapps.apprunningv2 && installedAndRunning.length > 1) {
// send v2 unique message instead
const newAppRunningMessageV2 = {
type: 'fluxapprunning',
Expand All @@ -9167,6 +9150,7 @@ async function checkAndNotifyPeersOfRunningApps() {
log.error(err);
// removeAppLocally(stoppedApp);
}

log.info('Running Apps broadcasted');
} catch (error) {
log.error(error);
Expand Down Expand Up @@ -9324,43 +9308,21 @@ async function checkAndRemoveApplicationInstance() {
// eslint-disable-next-line no-await-in-loop
const runningAppList = await getRunningAppList(installedApp.name);
const minInstances = installedApp.instances || config.fluxapps.minimumInstances; // introduced in v3 of apps specs
if (runningAppList.length > minInstances) {
if (runningAppList.length > (minInstances + config.fluxapps.maximumAdditionalInstances)) {
// eslint-disable-next-line no-await-in-loop
const appDetails = await getApplicationGlobalSpecifications(installedApp.name);
if (appDetails) {
log.info(`Application ${installedApp.name} is already spawned on ${runningAppList.length} instances. Checking if should be unninstalled from the FluxNode..`);
runningAppList.sort((a, b) => {
if (!a.runningSince && b.runningSince) {
return 1;
}
if (a.runningSince && !b.runningSince) {
return -1;
}
if (a.runningSince < b.runningSince) {
return 1;
}
if (a.runningSince > b.runningSince) {
return -1;
}
if (a.ip < b.ip) {
return 1;
}
if (a.ip > b.ip) {
return -1;
}
return 0;
});
// eslint-disable-next-line no-await-in-loop
const myIP = await fluxNetworkHelper.getMyFluxIPandPort();
const index = runningAppList.findIndex((x) => x.ip === myIP);
if (index === 0) {
log.info(`Application ${installedApp.name} going to be removed from node as it was the latest one running it to install it..`);
log.info(`Application ${installedApp.name} is already spawned on ${runningAppList.length} instances. Checking removal availability..`);
const randomNumber = Math.floor((Math.random() * config.fluxapps.removal.probability));
if (randomNumber === 0) {
log.warn(`Removing application ${installedApp.name} locally`);
// eslint-disable-next-line no-await-in-loop
await removeAppLocally(installedApp.name, null, false, true, true);
log.warn(`Application ${installedApp.name} locally removed`);
// eslint-disable-next-line no-await-in-loop
await serviceHelper.delay(config.fluxapps.removal.delay * 1000); // wait for 6 mins so we don't have more removals at the same time
} else {
log.info(`Other Fluxes are evaluating application ${installedApp.name} removal.`);
}
}
}
Expand Down Expand Up @@ -10369,18 +10331,6 @@ async function syncthingApps() {
// eslint-disable-next-line no-await-in-loop
const runningAppList = await getRunningAppList(installedApp.name);
runningAppList.sort((a, b) => {
if (!a.runningSince && b.runningSince) {
return -1;
}
if (a.runningSince && !b.runningSince) {
return 1;
}
if (a.runningSince < b.runningSince) {
return -1;
}
if (a.runningSince > b.runningSince) {
return 1;
}
if (a.broadcastedAt < b.broadcastedAt) {
return -1;
}
Expand Down Expand Up @@ -10412,13 +10362,10 @@ async function syncthingApps() {
if (cache.numberOfExecutions === cache.numberOfExecutionsRequired) {
syncthingFolder.type = 'sendreceive';
} else if (cache.numberOfExecutions === cache.numberOfExecutionsRequired + 1) {
log.info(`SyncthingApps changing syncthing type to sendreceive for appIdentifier ${appId}`);
log.info(`SyncthingApps starting appIdentifier ${appId}`);
syncthingFolder.type = 'sendreceive';
if (containerDataFlags.includes('r')) {
log.info(`SyncthingApps starting appIdentifier ${appId}`);
// eslint-disable-next-line no-await-in-loop
await appDockerRestart(id);
}
// eslint-disable-next-line no-await-in-loop
await appDockerRestart(id);
cache.restarted = true;
}
receiveOnlySyncthingAppsCache.set(appId, cache);
Expand Down Expand Up @@ -10556,18 +10503,6 @@ async function syncthingApps() {
const runningAppList = await getRunningAppList(installedApp.name);
log.info(`SyncthingApps appIdentifier ${appId} is running on nodes ${JSON.stringify(runningAppList)}`);
runningAppList.sort((a, b) => {
if (!a.runningSince && b.runningSince) {
return -1;
}
if (a.runningSince && !b.runningSince) {
return 1;
}
if (a.runningSince < b.runningSince) {
return -1;
}
if (a.runningSince > b.runningSince) {
return 1;
}
if (a.broadcastedAt < b.broadcastedAt) {
return -1;
}
Expand Down Expand Up @@ -10603,11 +10538,8 @@ async function syncthingApps() {
} else if (cache.numberOfExecutions === cache.numberOfExecutionsRequired + 1) {
log.info(`SyncthingApps starting appIdentifier ${appId}`);
syncthingFolder.type = 'sendreceive';
if (containerDataFlags.includes('r')) {
log.info(`SyncthingApps starting appIdentifier ${appId}`);
// eslint-disable-next-line no-await-in-loop
await appDockerRestart(id);
}
// eslint-disable-next-line no-await-in-loop
await appDockerRestart(id);
cache.restarted = true;
}
receiveOnlySyncthingAppsCache.set(appId, cache);
Expand Down
5 changes: 2 additions & 3 deletions services/serviceManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ async function startFluxFunctions() {
await databaseTemp.collection(config.database.appsglobal.collections.appsTemporaryMessages).createIndex({ receivedAt: 1 }, { expireAfterSeconds: 3600 }); // todo longer time? dropIndexes()
log.info('Temporary database prepared');
log.info('Preparing Flux Apps locations');
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).dropIndex({ broadcastedAt: 1 });
// more than 2 hours and 5m. Meaning we have not received status message for a long time. So that node is no longer on a network or app is down.
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ broadcastedAt: 1 }, { expireAfterSeconds: 7500 });
// more than 1 hour. Meaning we have not received status message for a long time. So that node is no longer on a network or app is down.
await databaseTemp.collection(config.database.appsglobal.collections.appsLocations).createIndex({ broadcastedAt: 1 }, { expireAfterSeconds: 3900 });
log.info('Flux Apps locations prepared');
fluxNetworkHelper.adjustFirewall();
log.info('Firewalls checked');
Expand Down

0 comments on commit 7362d83

Please sign in to comment.