-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Prometheus: fix offline worker metrics #1135
base: master
Are you sure you want to change the base?
Changes from 8 commits
1620e28
230d8f7
9ec035a
ffba617
10efad5
a7d967a
c1ccc40
83e3915
542caee
a8c2e2f
8356853
fb74e95
9c157d1
9bcd382
f45c64d
1a48a12
e0fc2d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import logging | ||
from typing import List, Tuple, Set | ||
|
||
from prometheus_client import Counter as PrometheusCounter, Histogram, Gauge | ||
from prometheus_client.metrics import MetricWrapperBase | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class LabelNames:
    """Names of the labels attached to the Prometheus metrics.

    Using these constants instead of repeating bare string literals keeps
    metric definitions and label lookups in agreement and avoids typos.
    """

    WORKER = 'worker'
    TYPE = 'type'
    TASK = 'task'
|
||
|
||
class PrometheusMetrics(object):
    """Prometheus metrics published by Flower, plus cleanup of stale label sets.

    Every metric is declared as a class attribute and is labelled by worker
    (and, where relevant, by event type and task name).
    """

    events = PrometheusCounter(
        'flower_events_total',
        "Number of events",
        [LabelNames.WORKER, LabelNames.TYPE, LabelNames.TASK],
    )
    runtime = Histogram(
        'flower_task_runtime_seconds',
        "Task runtime",
        [LabelNames.WORKER, LabelNames.TASK],
    )
    prefetch_time = Gauge(
        'flower_task_prefetch_time_seconds',
        "The time the task spent waiting at the celery worker to be executed.",
        [LabelNames.WORKER, LabelNames.TASK],
    )
    number_of_prefetched_tasks = Gauge(
        'flower_worker_prefetched_tasks',
        'Number of tasks of given type prefetched at a worker',
        [LabelNames.WORKER, LabelNames.TASK],
    )
    worker_online = Gauge(
        'flower_worker_online',
        "Worker online status",
        [LabelNames.WORKER],
    )
    worker_number_of_currently_executing_tasks = Gauge(
        'flower_worker_number_of_currently_executing_tasks',
        "Number of tasks currently executing at a worker",
        [LabelNames.WORKER],
    )

    @property
    def transient_metrics(self) -> List[MetricWrapperBase]:
        """Return the metrics whose label sets are purged for offline workers."""
        return [
            self.events,
            self.runtime,
            self.prefetch_time,
            self.number_of_prefetched_tasks,
            self.worker_online,
            self.worker_number_of_currently_executing_tasks,
        ]

    def remove_metrics_for_offline_workers(self, offline_workers: Set[str]):
        """Remove every label set that refers to one of ``offline_workers``.

        Args:
            offline_workers: Names of the workers whose samples should be
                dropped from all transient metrics.
        """
        for metric in self.transient_metrics:
            stale_label_sets = self._get_label_sets_for_offline_workers(
                metric=metric, offline_workers=offline_workers
            )
            for label_set in stale_label_sets:
                try:
                    metric.remove(*label_set)
                except KeyError:
                    # The label set could already have been removed in another
                    # thread. That is the desired end state, so this is only
                    # recorded at debug level instead of being raised.
                    logger.debug('Label set: %s already removed for metric %s', label_set, metric)
                else:
                    logger.debug('Removed label set: %s for metric %s', label_set, metric)

    @staticmethod
    def _get_label_sets_for_offline_workers(
        metric: MetricWrapperBase, offline_workers: Set[str]
    ) -> Set[Tuple[str, ...]]:
        """Collect the label-value tuples of ``metric`` that belong to offline workers.

        Args:
            metric: The metric whose current samples are inspected.
            offline_workers: Names of the workers considered offline.

        Returns:
            A set of label-value tuples, each ordered like
            ``metric._labelnames`` so it can be passed to ``metric.remove``.
        """
        # NOTE(review): sampling the metric is not the prettiest way to learn
        # its current label sets; ideally the metric would expose them itself.
        label_sets_for_offline_workers = set()
        for sampled_metric in metric.collect():
            for sample in sampled_metric.samples:
                labels = sample.labels
                # LabelNames constants are used rather than bare strings so
                # the lookup cannot drift from the metric definitions.
                worker = labels.get(LabelNames.WORKER)
                if worker is None or worker not in offline_workers:
                    continue

                # The values must follow the order of ``metric._labelnames``;
                # otherwise ``remove`` does not find the label set and raises
                # KeyError. That is why the private attribute is read here.
                label_sets_for_offline_workers.add(
                    tuple(labels[label_name] for label_name in metric._labelnames)
                )

        return label_sets_for_offline_workers
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
import collections | ||
|
||
from functools import partial | ||
from typing import Dict, Any, Set | ||
|
||
from tornado.ioloop import IOLoop | ||
from tornado.ioloop import PeriodicCallback | ||
|
@@ -13,35 +14,14 @@ | |
from celery.events.state import State | ||
|
||
from . import api | ||
from .api.prometheus_metrics import PrometheusMetrics | ||
from .options import options | ||
|
||
from collections import Counter | ||
|
||
from prometheus_client import Counter as PrometheusCounter, Histogram, Gauge | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class PrometheusMetrics(object): | ||
events = PrometheusCounter('flower_events_total', "Number of events", ['worker', 'type', 'task']) | ||
runtime = Histogram('flower_task_runtime_seconds', "Task runtime", ['worker', 'task']) | ||
prefetch_time = Gauge( | ||
'flower_task_prefetch_time_seconds', | ||
"The time the task spent waiting at the celery worker to be executed.", | ||
['worker', 'task'] | ||
) | ||
number_of_prefetched_tasks = Gauge( | ||
'flower_worker_prefetched_tasks', | ||
'Number of tasks of given type prefetched at a worker', | ||
['worker', 'task'] | ||
) | ||
worker_online = Gauge('flower_worker_online', "Worker online status", ['worker']) | ||
worker_number_of_currently_executing_tasks = Gauge( | ||
'flower_worker_number_of_currently_executing_tasks', | ||
"Number of tasks currently executing at a worker", | ||
['worker'] | ||
) | ||
|
||
|
||
class EventsState(State): | ||
# EventsState object is created and accessed only from ioloop thread | ||
|
||
|
@@ -103,6 +83,84 @@ def event(self, event): | |
if cls: | ||
cls.send_message(event) | ||
|
||
def remove_metrics_for_offline_workers(self):
    """Purge metric label sets of offline workers when purging is enabled.

    Does nothing when ``options.purge_offline_workers`` is ``None`` or when
    no worker is currently considered offline.
    """
    if options.purge_offline_workers is None:
        return

    offline_workers = self.get_offline_workers(workers=self.get_workers())
    if not offline_workers:
        return

    self.metrics.remove_metrics_for_offline_workers(
        offline_workers=offline_workers
    )
|
||
def get_online_workers(self) -> Dict[str, Any]:
    """Return worker info keyed by name, without purged offline workers.

    When ``options.purge_offline_workers`` is ``None`` every known worker is
    returned; otherwise the workers reported by ``get_offline_workers`` are
    dropped from the result.
    """
    # NOTE(review): type hints are being introduced to the project starting
    # with these methods.
    workers = self.get_workers()
    if options.purge_offline_workers is not None:
        for name in self.get_offline_workers(workers=workers):
            workers.pop(name)

    return workers
|
||
def get_workers(self) -> Dict[str, Any]:
    """Return an info dict for every counted worker that is still known.

    Each entry merges the worker's counters from ``self.counter`` with the
    fields produced by ``self._as_dict`` and a ``status`` key holding
    ``worker.alive``. Names that are missing from ``self.workers`` are
    skipped.
    """
    workers = {}
    for name, values in self.counter.items():
        if name not in self.workers:
            continue

        worker = self.workers[name]
        info = dict(values)
        info.update(self._as_dict(worker))
        info.update(status=worker.alive)
        workers[name] = info

    return workers
|
||
@staticmethod
def get_offline_workers(workers: Dict[str, Any]) -> Set[str]:
    """Return the names of workers that are offline and due for purging.

    A worker is selected when its ``status`` entry is falsy and either it
    has no usable last heartbeat or that heartbeat is more than
    ``options.purge_offline_workers`` seconds older than the current time.

    Args:
        workers: Worker info dicts keyed by worker name.

    Returns:
        The set of worker names considered offline.
    """
    timestamp = int(time.time())
    offline_workers = set()
    for name, info in workers.items():
        # A missing status is treated as "online".
        if info.get('status', True):
            continue

        heartbeats = info.get('heartbeats', [])
        last_heartbeat = int(max(heartbeats)) if heartbeats else None
        if not last_heartbeat or timestamp - last_heartbeat > options.purge_offline_workers:
            offline_workers.add(name)
            logger.debug('Found offline worker: %s', name)

    return offline_workers
|
||
@classmethod
def _as_dict(cls, worker) -> Dict[str, Any]:
    """Convert ``worker`` to a plain dict.

    Objects exposing ``_fields`` are converted field by field; any other
    object is handled by ``cls._info``.
    """
    if hasattr(worker, '_fields'):
        return {k: getattr(worker, k) for k in worker._fields}
    return cls._info(worker)
|
||
@classmethod
def _info(cls, worker) -> Dict[str, Any]:
    """Return the known attributes of ``worker`` that are set and not ``None``.

    Only the attribute names listed in ``_fields`` are considered; missing
    attributes and attributes equal to ``None`` are left out of the result.
    """
    _fields = (
        'hostname',
        'pid',
        'freq',
        'heartbeats',
        'clock',
        'active',
        'processed',
        'loadavg',
        'sw_ident',
        'sw_ver',
        'sw_sys',
    )

    candidates = ((key, getattr(worker, key, None)) for key in _fields)
    return {key: value for key, value in candidates if value is not None}
|
||
|
||
class Events(threading.Thread): | ||
events_enable_interval = 5000 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we think this should be configurable, we could open a second PR and add it as an additional feature. For now I think it makes total sense to simply wipe any existing metric records for label sets that contain offline workers. Otherwise the graphs are polluted with readings from several days ago.