Prometheus: fix offline worker metrics #1135

Open · wants to merge 17 commits into master (changes shown from 8 of 17 commits)
77 changes: 77 additions & 0 deletions flower/api/prometheus_metrics.py
@@ -0,0 +1,77 @@
import logging
from typing import List, Tuple, Set

from prometheus_client import Counter as PrometheusCounter, Histogram, Gauge
from prometheus_client.metrics import MetricWrapperBase

logger = logging.getLogger(__name__)


class LabelNames:
    WORKER = 'worker'
    TYPE = 'type'
    TASK = 'task'


class PrometheusMetrics(object):
    events = PrometheusCounter(
        'flower_events_total', "Number of events", [LabelNames.WORKER, LabelNames.TYPE, LabelNames.TASK]
    )
    runtime = Histogram('flower_task_runtime_seconds', "Task runtime", [LabelNames.WORKER, LabelNames.TASK])
    prefetch_time = Gauge(
        'flower_task_prefetch_time_seconds',
        "The time the task spent waiting at the celery worker to be executed.",
        [LabelNames.WORKER, LabelNames.TASK]
    )
    number_of_prefetched_tasks = Gauge(
        'flower_worker_prefetched_tasks',
        'Number of tasks of given type prefetched at a worker',
        [LabelNames.WORKER, LabelNames.TASK]
    )
    worker_online = Gauge('flower_worker_online', "Worker online status", [LabelNames.WORKER])
    worker_number_of_currently_executing_tasks = Gauge(
        'flower_worker_number_of_currently_executing_tasks',
        "Number of tasks currently executing at a worker",
        [LabelNames.WORKER]
    )

    @property
    def transient_metrics(self) -> List[MetricWrapperBase]:
Review comment (contributor, PR author):

If we think that this should be configurable, maybe we could make a second PR and add it as an additional feature. For now I think it makes total sense just to wipe any existing records from metrics for label sets containing offline workers; otherwise the graphs are polluted with readings from days ago. (See the usage sketch after this file's diff.)

        return [
            self.events,
            self.runtime,
            self.prefetch_time,
            self.number_of_prefetched_tasks,
            self.worker_online,
            self.worker_number_of_currently_executing_tasks
        ]

    def remove_metrics_for_offline_workers(self, offline_workers: Set[str]):
        for metric in self.transient_metrics:
            labels_sets_for_offline_workers = self._get_label_sets_for_offline_workers(
                metric=metric, offline_workers=offline_workers
            )
            for label_set in labels_sets_for_offline_workers:
                try:
                    metric.remove(*label_set)
                    logger.debug('Removed label set: %s for metric %s', label_set, metric)
                except KeyError:
Review comment by Tomasz-Kluczkowski (contributor, PR author), Aug 14, 2021:

Since the metric could already have been removed in another thread, it's best to just pass.

                    pass

    @staticmethod
    def _get_label_sets_for_offline_workers(
        metric: MetricWrapperBase, offline_workers: Set[str]
    ) -> Set[Tuple[str, ...]]:
        sampled_metrics = metric.collect()
Review comment by Tomasz-Kluczkowski (contributor, PR author), Aug 15, 2021:

.collect() returns Metric objects with samples. Each sample has a labels dictionary mapping label name to label value. We are interested in getting those values and checking whether the one under the 'worker' key is in our offline_workers set.

Not the prettiest way of getting this data, but collect() is the only common, public method of all the metric types we use that provides any info like this...

I hope I can at some point bake it into the metrics themselves.
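
A minimal standalone sketch of the data collect() exposes (not part of this diff; the metric name and label values are made up so the example does not clash with the real flower metrics in the default registry):

```python
from prometheus_client import Counter

demo_events = Counter('demo_events_total', "Demo events", ['worker', 'task'])
demo_events.labels(worker='celery@host-1', task='tasks.add').inc()

for metric in demo_events.collect():        # a list of Metric objects
    for sample in metric.samples:           # typically the *_total and *_created samples
        # sample.labels is a plain dict, e.g. {'worker': 'celery@host-1', 'task': 'tasks.add'}
        print(sample.name, sample.labels, sample.value)
```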


        label_sets_for_offline_workers = set()
        for sampled_metric in sampled_metrics:
            for sample in sampled_metric.samples:
                labels = sample.labels
                worker = labels.get(LabelNames.WORKER)
Review comment (contributor, PR author):

I prefer to use named constants instead of bare strings to avoid typo problems, hence the LabelNames class in the metric definitions; I use it here as well to make sure we access the correct label. If you think it's too much, we can always go back to strings.

                if worker is None or worker not in offline_workers:
                    continue

                label_sets_for_offline_workers.add(tuple([labels[label_name] for label_name in metric._labelnames]))
Review comment by Tomasz-Kluczkowski (contributor, PR author), Aug 14, 2021:

It is important to generate the label values in the same order as in ._labelnames, otherwise we will not remove them from the metric (a KeyError will be raised, which we catch and pass). That's why I access this private attribute here. It is unlikely they will change the implementation and remove this attribute; I think it is a small enough smell to let it exist for now.

If I find the time, I would like to make this a read-only property of a metric in the prometheus_client project itself; then I will amend this here, or better yet add a method in prometheus_client to get the current label sets as well.
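
A small sketch of why the ordering matters for remove() (again with a throwaway metric; the label values are hypothetical):

```python
from prometheus_client import Gauge

demo_gauge = Gauge('demo_prefetch_seconds', "Demo gauge", ['worker', 'task'])
demo_gauge.labels('celery@host-1', 'tasks.add').set(1.5)

# remove() takes label values positionally, in the declaration order of the labels.
demo_gauge.remove('celery@host-1', 'tasks.add')    # correct order -> label set removed
# demo_gauge.remove('tasks.add', 'celery@host-1')  # wrong order -> KeyError (caught and ignored above)
```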


        return label_sets_for_offline_workers
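
Tying the comments above together, a hedged usage sketch of the new class (it assumes the module is importable as flower.api.prometheus_metrics, as added in this PR; the worker and task names are made up):

```python
from flower.api.prometheus_metrics import PrometheusMetrics

metrics = PrometheusMetrics()
metrics.events.labels(worker='celery@host-1', type='task-succeeded', task='tasks.add').inc()
metrics.worker_online.labels(worker='celery@host-1').set(1)

# Once 'celery@host-1' is considered offline, every label set mentioning it is
# dropped from all transient metrics, so /metrics stops exporting stale readings.
metrics.remove_metrics_for_offline_workers(offline_workers={'celery@host-1'})
```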
104 changes: 81 additions & 23 deletions flower/events.py
@@ -5,6 +5,7 @@
import collections

from functools import partial
from typing import Dict, Any, Set

from tornado.ioloop import IOLoop
from tornado.ioloop import PeriodicCallback
@@ -13,35 +14,14 @@
from celery.events.state import State

from . import api
from .api.prometheus_metrics import PrometheusMetrics
from .options import options

from collections import Counter

from prometheus_client import Counter as PrometheusCounter, Histogram, Gauge

logger = logging.getLogger(__name__)


class PrometheusMetrics(object):
    events = PrometheusCounter('flower_events_total', "Number of events", ['worker', 'type', 'task'])
    runtime = Histogram('flower_task_runtime_seconds', "Task runtime", ['worker', 'task'])
    prefetch_time = Gauge(
        'flower_task_prefetch_time_seconds',
        "The time the task spent waiting at the celery worker to be executed.",
        ['worker', 'task']
    )
    number_of_prefetched_tasks = Gauge(
        'flower_worker_prefetched_tasks',
        'Number of tasks of given type prefetched at a worker',
        ['worker', 'task']
    )
    worker_online = Gauge('flower_worker_online', "Worker online status", ['worker'])
    worker_number_of_currently_executing_tasks = Gauge(
        'flower_worker_number_of_currently_executing_tasks',
        "Number of tasks currently executing at a worker",
        ['worker']
    )


class EventsState(State):
    # EventsState object is created and accessed only from ioloop thread

@@ -103,6 +83,84 @@ def event(self, event):
        if cls:
            cls.send_message(event)

    def remove_metrics_for_offline_workers(self):
        if options.purge_offline_workers is not None:
            offline_workers = self.get_offline_workers(workers=self.get_workers())
            if not offline_workers:
                return

            self.metrics.remove_metrics_for_offline_workers(
                offline_workers=offline_workers
            )

    def get_online_workers(self) -> Dict[str, Any]:
Review comment (contributor, PR author):

You probably noticed I started introducing typing into the project.
The benefits are:

  • any modern IDE gives you nice autocompletion
  • easier reading/understanding of inputs and outputs, especially between multiple methods/objects

        workers = self.get_workers()
        if options.purge_offline_workers is not None:
            for name in self.get_offline_workers(workers=workers):
                workers.pop(name)

        return workers

    def get_workers(self) -> Dict[str, Any]:
        workers = {}
        for name, values in self.counter.items():
            if name not in self.workers:
                continue
            worker = self.workers[name]
            info = dict(values)
            info.update(self._as_dict(worker))
            info.update(status=worker.alive)
            workers[name] = info

        return workers

    @staticmethod
    def get_offline_workers(workers: Dict[str, Any]) -> Set[str]:
        timestamp = int(time.time())
        offline_workers = set()
        for name, info in workers.items():
            if info.get('status', True):
                continue

            heartbeats = info.get('heartbeats', [])
            last_heartbeat = int(max(heartbeats)) if heartbeats else None
            if not last_heartbeat or timestamp - last_heartbeat > options.purge_offline_workers:
                offline_workers.add(name)
                logger.debug('Found offline worker: %s', name)

        return offline_workers

    @classmethod
    def _as_dict(cls, worker) -> Dict[str, Any]:
        if hasattr(worker, '_fields'):
            return dict((k, worker.__getattribute__(k)) for k in worker._fields)
        else:
            return cls._info(worker)

    @classmethod
    def _info(cls, worker) -> Dict[str, Any]:
        _fields = (
            'hostname',
            'pid',
            'freq',
            'heartbeats',
            'clock',
            'active',
            'processed',
            'loadavg',
            'sw_ident',
            'sw_ver',
            'sw_sys'
        )

        def _keys():
            for key in _fields:
                value = getattr(worker, key, None)
                if value is not None:
                    yield key, value

        return dict(_keys())


class Events(threading.Thread):
    events_enable_interval = 5000
53 changes: 3 additions & 50 deletions flower/views/dashboard.py
@@ -1,12 +1,10 @@
import logging
import time

from tornado import web
from tornado import gen

from ..events import EventsState
from ..views import BaseHandler
from ..options import options
from ..api.workers import ListWorkers


logger = logging.getLogger(__name__)
@@ -19,64 +17,19 @@ def get(self):
        refresh = self.get_argument('refresh', default=False, type=bool)
        json = self.get_argument('json', default=False, type=bool)

        events = self.application.events.state
        events_state: EventsState = self.application.events.state

        if refresh:
            try:
                self.application.update_workers()
            except Exception as e:
                logger.exception('Failed to update workers: %s', e)

        workers = {}
        for name, values in events.counter.items():
            if name not in events.workers:
                continue
            worker = events.workers[name]
            info = dict(values)
            info.update(self._as_dict(worker))
            info.update(status=worker.alive)
            workers[name] = info

        if options.purge_offline_workers is not None:
            timestamp = int(time.time())
            offline_workers = []
            for name, info in workers.items():
                if info.get('status', True):
                    continue

                heartbeats = info.get('heartbeats', [])
                last_heartbeat = int(max(heartbeats)) if heartbeats else None
                if not last_heartbeat or timestamp - last_heartbeat > options.purge_offline_workers:
                    offline_workers.append(name)

            for name in offline_workers:
                workers.pop(name)

        workers = events_state.get_online_workers()
        if json:
            self.write(dict(data=list(workers.values())))
        else:
            self.render("dashboard.html",
                        workers=workers,
                        broker=self.application.capp.connection().as_uri(),
                        autorefresh=1 if self.application.options.auto_refresh else 0)

    @classmethod
    def _as_dict(cls, worker):
        if hasattr(worker, '_fields'):
            return dict((k, worker.__getattribute__(k)) for k in worker._fields)
        else:
            return cls._info(worker)

    @classmethod
    def _info(cls, worker):
        _fields = ('hostname', 'pid', 'freq', 'heartbeats', 'clock',
                   'active', 'processed', 'loadavg', 'sw_ident',
                   'sw_ver', 'sw_sys')

        def _keys():
            for key in _fields:
                value = getattr(worker, key, None)
                if value is not None:
                    yield key, value

        return dict(_keys())
4 changes: 4 additions & 0 deletions flower/views/monitor.py
@@ -2,12 +2,16 @@

from tornado import gen

from ..events import EventsState
from ..views import BaseHandler


class Metrics(BaseHandler):
    @gen.coroutine
    def get(self):
        events_state: EventsState = self.application.events.state
        events_state.remove_metrics_for_offline_workers()

        self.write(prometheus_client.generate_latest())
        self.set_header("Content-Type", "text/plain")

2 changes: 1 addition & 1 deletion tests/unit/views/test_dashboard.py
@@ -73,7 +73,7 @@ def test_purge_offline_workers(self):
local_received=time.time()))
        self.app.events.state = state

        with patch('flower.views.dashboard.options') as mock_options:
        with patch('flower.events.options') as mock_options:
            mock_options.purge_offline_workers = 0
            r = self.get('/dashboard')
