Skip to content

Commit

Permalink
Worker page, with warnings.
Browse files Browse the repository at this point in the history
  • Loading branch information
guydavis committed Aug 24, 2021
1 parent 7ff47a5 commit edc6e51
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 40 deletions.
7 changes: 5 additions & 2 deletions api/commands/chiadog_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ def dispatch_action(job):
else:
raise Exception("Unsupported action {0} for monitoring.".format(action))

def start_chiadog():
def start_chiadog(chain = None):
#app.logger.info("Starting monitoring....")
blockchains = [ b.strip() for b in os.environ['blockchains'].split(',') ]
if chain:
blockchains = [ chain ]
else:
blockchains = [ b.strip() for b in os.environ['blockchains'].split(',') ]
for blockchain in blockchains:
try:
workdir = "/{0}dog".format(blockchain)
Expand Down
2 changes: 1 addition & 1 deletion common/models/farms.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ class Farm(db.Model):
total_flax = sa.Column(sa.REAL, nullable=True)
flax_netspace_size = sa.Column(sa.REAL, nullable=True) # GiB
flax_expected_time_to_win = sa.Column(sa.String(length=64), nullable=True)

created_at = sa.Column(sa.DateTime(), server_default=func.now())
updated_at = sa.Column(sa.DateTime(), onupdate=func.now())
8 changes: 6 additions & 2 deletions web/actions/chia.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ def load_farm_summary():
farms = db.session.query(f.Farm).order_by(f.Farm.hostname).all()
return FarmSummary(farms)

def load_plots_farming():
plots = db.session.query(p.Plot).order_by(p.Plot.created_at.desc()).all()
def load_plots_farming(hostname=None):
    """Load farmed plots, newest first, optionally restricted to one worker.

    :param hostname: when given, only plots reported by that worker host
        are included; otherwise plots from all workers are returned.
    :return: a FarmPlots view model wrapping the matching Plot rows.
    """
    query = db.session.query(p.Plot).order_by(p.Plot.created_at.desc())
    if hostname:
        # .all() so both branches hand FarmPlots a concrete list; the
        # original passed a lazy Query object in the filtered branch.
        plots = query.filter(p.Plot.hostname == hostname).all()
    else:
        plots = query.all()
    return FarmPlots(plots)

def recent_challenges():
Expand Down
8 changes: 6 additions & 2 deletions web/actions/plotman.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
# Don't query plotman unless at least this long since last time.
RELOAD_MINIMUM_SECS = 30

def load_plotting_summary():
plottings = db.session.query(pl.Plotting).all()
def load_plotting_summary(hostname=None):
    """Load active/recent plotting jobs, optionally for a single worker.

    :param hostname: when given, only jobs running on that worker host
        are included; otherwise jobs from all workers are returned.
    :return: a PlottingSummary view model wrapping the matching rows.
    """
    query = db.session.query(pl.Plotting)
    if hostname:
        # .all() so both branches hand PlottingSummary a concrete list; the
        # original passed a lazy Query object in the filtered branch.
        plottings = query.filter(pl.Plotting.hostname == hostname).all()
    else:
        plottings = query.all()
    return PlottingSummary(plottings)

def load_plotters():
Expand Down
49 changes: 25 additions & 24 deletions web/actions/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,23 @@ def plot_count_diff(since):
result = ''
try:
latest = db.session.query(StatPlotCount).order_by(StatPlotCount.created_at.desc()).limit(1).first()
#app.logger.info(latest.value)
#app.logger.debug(latest.value)
before = db.session.query(StatPlotCount).filter(StatPlotCount.created_at <= since).order_by(StatPlotCount.created_at.desc()).limit(1).first()
#app.logger.info(before.value)
#app.logger.debug(before.value)
if (latest.value - before.value) != 0:
result = "%+0g in last day." % (latest.value - before.value)
except Exception as ex:
app.logger.info("Failed to query for day diff of plot_count because {0}".format(str(ex)))
#app.logger.info("Result is: {0}".format(result))
app.logger.debug("Failed to query for day diff of plot_count because {0}".format(str(ex)))
#app.logger.debug("Result is: {0}".format(result))
return result

def plots_size_diff(since):
result = ''
try:
latest = db.session.query(StatPlotsSize).order_by(StatPlotsSize.created_at.desc()).limit(1).first()
#app.logger.info(latest.value)
#app.logger.debug(latest.value)
before = db.session.query(StatPlotsSize).filter(StatPlotsSize.created_at <= since).order_by(StatPlotsSize.created_at.desc()).limit(1).first()
#app.logger.info(before.value)
#app.logger.debug(before.value)
gibs = (latest.value - before.value)
fmtted = converters.gib_to_fmt(gibs)
if fmtted == "0.000 B":
Expand All @@ -73,31 +73,31 @@ def plots_size_diff(since):
else:
result = fmtted
except Exception as ex:
app.logger.info("Failed to query for day diff of plots_size because {0}".format(str(ex)))
#app.logger.info("Result is: {0}".format(result))
app.logger.debug("Failed to query for day diff of plots_size because {0}".format(str(ex)))
#app.logger.debug("Result is: {0}".format(result))
return result

def total_coin_diff(since, blockchain):
result = ''
try:
latest = db.session.query(StatTotalChia).filter(StatTotalChia.blockchain==blockchain).order_by(StatTotalChia.created_at.desc()).limit(1).first()
#app.logger.info(latest.value)
#app.logger.debug(latest.value)
before = db.session.query(StatTotalChia).filter(StatTotalChia.blockchain==blockchain, StatTotalChia.created_at <= since).order_by(StatTotalChia.created_at.desc()).limit(1).first()
#app.logger.info(before.value)
#app.logger.debug(before.value)
if (latest.value - before.value) != 0:
result = "%+6g in last day." % (latest.value - before.value)
except Exception as ex:
app.logger.info("Failed to query for day diff of total_chia because {0}".format(str(ex)))
#app.logger.info("Result is: {0}".format(result))
app.logger.debug("Failed to query for day diff of total_chia because {0}".format(str(ex)))
#app.logger.debug("Result is: {0}".format(result))
return result

def netspace_size_diff(since, blockchain):
result = ''
try:
latest = db.session.query(StatNetspaceSize).filter(StatNetspaceSize.blockchain==blockchain).order_by(StatNetspaceSize.created_at.desc()).limit(1).first()
#app.logger.info(latest.value)
#app.logger.debug(latest.value)
before = db.session.query(StatNetspaceSize).filter(StatNetspaceSize.blockchain==blockchain, StatNetspaceSize.created_at <= since).order_by(StatNetspaceSize.created_at.desc()).limit(1).first()
#app.logger.info(before.value)
#app.logger.debug(before.value)
gibs = (latest.value - before.value)
fmtted = converters.gib_to_fmt(gibs)
if fmtted == "0.000 B":
Expand All @@ -107,8 +107,8 @@ def netspace_size_diff(since, blockchain):
else:
result = "{0} in last day.".format(fmtted)
except Exception as ex:
app.logger.info("Failed to query for day diff of netspace_size because {0}".format(str(ex)))
#app.logger.info("Result is: {0}".format(result))
app.logger.debug("Failed to query for day diff of netspace_size because {0}".format(str(ex)))
#app.logger.debug("Result is: {0}".format(result))
return result

class DailyWorker:
Expand All @@ -129,7 +129,7 @@ def load_daily_farming_summaries():
def daily_summaries(since, hostname, blockchain):
result = None
try:
#app.logger.info(since)
#app.logger.debug(since)
result = db.session.query(Alert).filter(
Alert.hostname==hostname,
Alert.blockchain==blockchain,
Expand All @@ -138,7 +138,7 @@ def daily_summaries(since, hostname, blockchain):
Alert.service == "DAILY"
).order_by(Alert.created_at.desc()).first()
except Exception as ex:
app.logger.info("Failed to query for latest daily summary for {0} - {1} because {2}".format(
app.logger.debug("Failed to query for latest daily summary for {0} - {1} because {2}".format(
hostname, blockchain, str(ex)))
return result

Expand Down Expand Up @@ -172,18 +172,19 @@ def load_recent_disk_usage(disk_type):
else:
path_values.append('null')
summary_by_worker[hostname][path] = path_values
app.logger.info(summary_by_worker.keys())
app.logger.debug(summary_by_worker.keys())
return summary_by_worker

def load_current_disk_usage(disk_type):
def load_current_disk_usage(disk_type, hostname=None):
db = get_stats_db()
cur = db.cursor()
summary_by_worker = {}
value_factor = "" # Leave at GB for plotting disks
if disk_type == "plots":
value_factor = "/1024" # Divide to TB for plots disks
for wk in chia.load_farmers():
hostname = wk['hostname']
if hostname and hostname != wk['hostname']:
continue
paths = []
used = []
free = []
Expand All @@ -192,7 +193,7 @@ def load_current_disk_usage(disk_type):
sql = "select path, value{0}, created_at from stat_{1}_disk_free where hostname = ? group by path having max(created_at)".format(value_factor, disk_type)
free_result =cur.execute(sql, [ wk['hostname'], ]).fetchall()
if len(used_result) != len(free_result):
app.logger.info("Found mismatched count of disk used/free stats for {0}".format(disk_type))
app.logger.debug("Found mismatched count of disk used/free stats for {0}".format(disk_type))
else:
for used_row in used_result:
paths.append(used_row[0])
Expand All @@ -202,6 +203,6 @@ def load_current_disk_usage(disk_type):
free.append(free_row[1])
continue
if len(paths):
summary_by_worker[hostname] = { "paths": paths, "used": used, "free": free}
#app.logger.info(summary_by_worker.keys())
summary_by_worker[wk['hostname']] = { "paths": paths, "used": used, "free": free}
#app.logger.debug(summary_by_worker.keys())
return summary_by_worker
46 changes: 43 additions & 3 deletions web/actions/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Actions around managing distributed workers and their status.
#

import asyncio
import datetime
import os
import psutil
Expand All @@ -19,6 +20,7 @@
from common.models import workers as w
from common.config import globals
from web.models.worker import WorkerSummary
from web.rpc import chia

ALL_TABLES_BY_HOSTNAME = [
'alerts',
Expand All @@ -35,9 +37,12 @@
'workers'
]


def load_worker_summary():
workers = db.session.query(w.Worker).order_by(w.Worker.hostname).all()
def load_worker_summary(hostname=None):
    """Load worker status rows, ordered by hostname.

    :param hostname: when given, restrict the summary to that single
        worker host; otherwise all known workers are returned.
    :return: a WorkerSummary view model wrapping the matching Worker rows.
    """
    query = db.session.query(w.Worker).order_by(w.Worker.hostname)
    if hostname:
        # .all() so both branches hand WorkerSummary a concrete list; the
        # original passed a lazy Query object in the filtered branch.
        workers = query.filter(w.Worker.hostname == hostname).all()
    else:
        workers = query.all()
    return WorkerSummary(workers)

def get_worker_by_hostname(hostname):
Expand All @@ -51,3 +56,38 @@ def prune_workers_status(hostnames):
db.session.execute("DELETE FROM " + table + " WHERE hostname = :hostname OR hostname = :displayname",
{"hostname":hostname, "displayname":worker.displayname})
db.session.commit()

class WorkerWarning:
    """A single warning to display on a worker's detail page.

    Carries a title, an HTML message, and a Bootstrap icon name derived
    from the severity level.
    """

    def __init__(self, title, message, level="info"):
        self.title = title
        self.message = message
        if level == "error":
            self.icon = "exclamation-circle"
        else:
            # Default for "info" and fallback for any unrecognized level;
            # the original left self.icon unset for unknown levels, which
            # would raise AttributeError when the template rendered it.
            self.icon = "info-circle"

def plot_count_from_summary(hostname):
    """Return the plot count the farmer reports for this worker's harvester.

    Returns None when the host has no harvester entry in the farm summary,
    or when the RPC lookup fails for any reason.
    """
    try:
        # Harvester entries are keyed by connection IP, so resolve first.
        hostname = socket.gethostbyname(hostname)
        plots_by_host = asyncio.run(chia.load_plots_per_harvester())
        #app.logger.info(plots_by_host.keys())
        harvester_plots = plots_by_host.get(hostname)
        if harvester_plots is not None:
            return len(harvester_plots)
    except Exception as ex:
        app.logger.info("Failed to get harvester plot count for {0} due to {1}.".format(hostname, str(ex)))
    return None

def generate_warnings(worker, plots):
    """Build the list of WorkerWarning objects for a worker's detail page.

    Compares the plot count the farmer's summary reports for this worker
    against the number of plot files Machinaris found on disk.

    :param worker: the Worker row for this host (hostname attribute used).
    :param plots: a FarmPlots view model; len(plots.rows) is the on-disk count.
    :return: a list of WorkerWarning, possibly empty.
    """
    warnings = []
    worker_plot_file_count = len(plots.rows)
    worker_summary_plot_count = plot_count_from_summary(worker.hostname)
    # Distinguish "no harvester entry / RPC failure" (None) from a connected
    # harvester legitimately reporting zero plots; the original falsy test
    # conflated the two and mislabeled a 0-plot harvester as disconnected.
    if worker_summary_plot_count is None:
        warnings.append(WorkerWarning("Disconnected harvester!",
            "Farm summary reports no harvester for {0}, but Machinaris found {1} plots on disk. Further <a href='https://github.com/guydavis/machinaris/wiki/FAQ#farming-summary-and-file-listing-report-different-plot-counts' target='_blank' class='text-white'>investigation of the worker harvesting service</a> is recommended.".format(
                worker.hostname, worker_plot_file_count)))
    elif abs(worker_summary_plot_count - worker_plot_file_count) > 2:
        # Small discrepancies (<= 2 plots) are tolerated as transient.
        warnings.append(WorkerWarning("Mismatched plot counts!",
            "Farm summary reports {0} plots for {1}, but Machinaris found {2} plots on disk. Further <a href='https://github.com/guydavis/machinaris/wiki/FAQ#farming-summary-and-file-listing-report-different-plot-counts' target='_blank' class='text-white'>investigation of the worker harvesting service</a> is recommended.".format(
                worker_summary_plot_count, worker.hostname, worker_plot_file_count)))
    return warnings
18 changes: 16 additions & 2 deletions web/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,23 @@ def workers():
if request.method == 'POST':
if request.form.get('action') == "prune":
worker.prune_workers_status(request.form.getlist('hostname'))
workers = worker.load_worker_summary()
wkrs = worker.load_worker_summary()
return render_template('workers.html', reload_seconds=120,
workers=workers, global_config=gc, now=gc['now'])
workers=wkrs, global_config=gc, now=gc['now'])

@app.route('/worker', methods=['GET'])
def worker_route():
    """Render the detail page for a single worker host, with warnings."""
    gc = globals.load()
    hostname = request.args.get('hostname')
    wkr = worker.load_worker_summary(hostname=hostname).workers[0]
    farmed_plots = chia.load_plots_farming(hostname=hostname)
    plotting_summary = plotman.load_plotting_summary(hostname=hostname)
    disk_usage = {
        disk_type: stats.load_current_disk_usage(disk_type, hostname=hostname)
        for disk_type in ('plots', 'plotting')
    }
    worker_warnings = worker.generate_warnings(wkr, farmed_plots)
    return render_template(
        'worker.html',
        worker=wkr,
        plots=farmed_plots,
        plotting=plotting_summary,
        plots_disk_usage=disk_usage['plots'],
        plotting_disk_usage=disk_usage['plotting'],
        warnings=worker_warnings,
        global_config=gc)

@app.route('/network/blockchain')
def network_blockchain():
Expand Down
44 changes: 44 additions & 0 deletions web/rpc/chia.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# RPC interactions with Chia on controller via WebUI
#

import asyncio
import datetime

from chia.rpc.full_node_rpc_client import FullNodeRpcClient
from chia.rpc.farmer_rpc_client import FarmerRpcClient
from chia.util.default_root import DEFAULT_ROOT_PATH
from chia.util.ints import uint16
from chia.util.config import load_config as load_chia_config

from web import app

async def load_plots_per_harvester():
    """Query the local farmer over RPC for the plots on each harvester.

    :return: dict mapping each harvester's connection host (IP string) to a
        list of plot-detail dicts; empty dict on any RPC or config failure.
    """
    harvesters = {}
    try:
        config = load_chia_config(DEFAULT_ROOT_PATH, 'config.yaml')
        farmer_rpc_port = config["farmer"]["rpc_port"]
        farmer = await FarmerRpcClient.create(
            'localhost', uint16(farmer_rpc_port), DEFAULT_ROOT_PATH, config
        )
        try:
            result = await farmer.get_harvesters()
        finally:
            # Always release the RPC connection; the original leaked it
            # whenever get_harvesters() raised.
            farmer.close()
            await farmer.await_closed()
        for harvester in result["harvesters"]:
            host = harvester["connection"]["host"]
            harvester_plots = []
            for plot in harvester["plots"]:
                harvester_plots.append({
                    # No pool contract puzzle hash means a solo (OG) plot.
                    "type": "solo" if (plot["pool_contract_puzzle_hash"] is None) else "portable",
                    "plot_id": plot['plot_id'],
                    "file_size": plot['file_size'],  # bytes
                    "filename": plot['filename'],  # full path and name
                    "plot_public_key": plot['plot_public_key'],
                    "pool_contract_puzzle_hash": plot['pool_contract_puzzle_hash'],
                    "pool_public_key": plot['pool_public_key'],
                })
            harvesters[host] = harvester_plots
    except Exception as ex:
        app.logger.info("Error getting plots via RPC: {0}".format(str(ex)))
    return harvesters
1 change: 0 additions & 1 deletion web/templates/farming/plots.html
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ <h5 class="modal-title" id="analyzeModalLabel">Plotting Analysis</h5>
</div>

<div class="p-3 mb-4 bg-light border rounded-3">

<div class="table-responsive">
<table id="data" class="table table-dark">
<thead>
Expand Down
2 changes: 1 addition & 1 deletion web/templates/plots_check.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
function load_plots_check(first_load=false) {
var output = document.getElementById('output');
var xhr = new XMLHttpRequest();
xhr.open('GET', "{{ url_for('farming') }}?hostname="+ get('hostname') + "&check=plots&first_load="+first_load);
xhr.open('GET', "{{ url_for('farming_plots') }}?hostname="+ get('hostname') + "&check=plots&first_load="+first_load);
xhr.send();
xhr.onload = function () {
output.textContent = xhr.responseText;
Expand Down
Loading

0 comments on commit edc6e51

Please sign in to comment.