diff --git a/docs/config.rst b/docs/config.rst index 1ae5e0b27..0bfcc1620 100644 --- a/docs/config.rst +++ b/docs/config.rst @@ -244,7 +244,7 @@ Enable support of `X-Real-Ip` and `X-Scheme` headers tasks_columns ~~~~~~~~~~~~~ -Specifies list of comma-delimited columns on `/tasks/` page. `all` value +Specifies list of` comma-delimited columns on `/tasks/` page. `all` value enables all columns. Columns on the page can be reordered using drag and drop. (by default, `tasks_columns="name,uuid,state,args,kwargs,result,received,started,runtime,worker"`) @@ -313,6 +313,9 @@ See `Authentication` for usage examples purge_offline_workers ~~~~~~~~~~~~~~~~~~~~~ -Time (in seconds) after which offline workers are automatically removed from dashboard. +Time (in seconds) after which: -If omitted, offline workers remain on the dashboard. +- offline workers are automatically removed from dashboard. +- any prometheus metrics containing offline worker in the label values are removed. + +If omitted, offline workers remain on the dashboard/in prometheus metrics. diff --git a/flower/api/prometheus_metrics.py b/flower/api/prometheus_metrics.py new file mode 100644 index 000000000..970f4454e --- /dev/null +++ b/flower/api/prometheus_metrics.py @@ -0,0 +1,77 @@ +import logging +from typing import List, Tuple, Set + +from prometheus_client import Counter as PrometheusCounter, Histogram, Gauge +from prometheus_client.metrics import MetricWrapperBase + +logger = logging.getLogger(__name__) + + +class LabelNames: + WORKER = 'worker' + TYPE = 'type' + TASK = 'task' + + +class PrometheusMetrics(object): + events = PrometheusCounter( + 'flower_events_total', "Number of events", [LabelNames.WORKER, LabelNames.TYPE, LabelNames.TASK] + ) + runtime = Histogram('flower_task_runtime_seconds', "Task runtime", [LabelNames.WORKER, LabelNames.TASK]) + prefetch_time = Gauge( + 'flower_task_prefetch_time_seconds', + "The time the task spent waiting at the celery worker to be executed.", + [LabelNames.WORKER, LabelNames.TASK] + ) + number_of_prefetched_tasks = Gauge( + 'flower_worker_prefetched_tasks', + 'Number of tasks of given type prefetched at a worker', + [LabelNames.WORKER, LabelNames.TASK] + ) + worker_online = Gauge('flower_worker_online', "Worker online status", [LabelNames.WORKER]) + worker_number_of_currently_executing_tasks = Gauge( + 'flower_worker_number_of_currently_executing_tasks', + "Number of tasks currently executing at a worker", + [LabelNames.WORKER] + ) + + @property + def transient_metrics(self) -> List[MetricWrapperBase]: + return [ + self.events, + self.runtime, + self.prefetch_time, + self.number_of_prefetched_tasks, + self.worker_online, + self.worker_number_of_currently_executing_tasks + ] + + def remove_metrics_for_offline_workers(self, offline_workers: Set[str]): + for metric in self.transient_metrics: + labels_sets_for_offline_workers = self._get_label_sets_for_offline_workers( + metric=metric, offline_workers=offline_workers + ) + for label_set in labels_sets_for_offline_workers: + try: + metric.remove(*label_set) + logger.debug('Removed label set: %s for metric %s', label_set, metric) + except KeyError: + pass + + @staticmethod + def _get_label_sets_for_offline_workers( + metric: MetricWrapperBase, offline_workers: Set[str] + ) -> Set[Tuple[str, ...]]: + sampled_metrics = metric.collect() + + label_sets_for_offline_workers = set() + for sampled_metric in sampled_metrics: + for sample in sampled_metric.samples: + labels = sample.labels + worker = labels.get(LabelNames.WORKER) + if worker is None or worker not in offline_workers: + continue + + label_sets_for_offline_workers.add(tuple([labels[label_name] for label_name in metric._labelnames])) + + return label_sets_for_offline_workers diff --git a/flower/events.py b/flower/events.py index 38bc98f1f..cf57888c9 100644 --- a/flower/events.py +++ b/flower/events.py @@ -5,6 +5,7 @@ import collections from functools import partial +from typing import Dict, Any, Set from tornado.ioloop import IOLoop from tornado.ioloop import PeriodicCallback @@ -13,35 +14,14 @@ from celery.events.state import State from . import api +from .api.prometheus_metrics import PrometheusMetrics +from .options import options from collections import Counter -from prometheus_client import Counter as PrometheusCounter, Histogram, Gauge - logger = logging.getLogger(__name__) -class PrometheusMetrics(object): - events = PrometheusCounter('flower_events_total', "Number of events", ['worker', 'type', 'task']) - runtime = Histogram('flower_task_runtime_seconds', "Task runtime", ['worker', 'task']) - prefetch_time = Gauge( - 'flower_task_prefetch_time_seconds', - "The time the task spent waiting at the celery worker to be executed.", - ['worker', 'task'] - ) - number_of_prefetched_tasks = Gauge( - 'flower_worker_prefetched_tasks', - 'Number of tasks of given type prefetched at a worker', - ['worker', 'task'] - ) - worker_online = Gauge('flower_worker_online', "Worker online status", ['worker']) - worker_number_of_currently_executing_tasks = Gauge( - 'flower_worker_number_of_currently_executing_tasks', - "Number of tasks currently executing at a worker", - ['worker'] - ) - - class EventsState(State): # EventsState object is created and accessed only from ioloop thread @@ -103,6 +83,85 @@ def event(self, event): if cls: cls.send_message(event) + def get_online_workers(self) -> Dict[str, Any]: + workers = self._get_workers() + if options.purge_offline_workers is not None: + for name in self.get_offline_workers(workers=workers): + workers.pop(name) + + return workers + + def _get_workers(self) -> Dict[str, Any]: + workers = {} + for name, values in self.counter.items(): + if name not in self.workers: + continue + worker = self.workers[name] + info = dict(values) + info.update(self._as_dict(worker)) + info.update(status=worker.alive) + workers[name] = info + + return workers + + @staticmethod + def get_offline_workers(workers: Dict[str, Any]) -> Set[str]: + timestamp = int(time.time()) + offline_workers = set() + for name, info in workers.items(): + if info.get('status', True): + continue + + heartbeats = info.get('heartbeats', []) + last_heartbeat = int(max(heartbeats)) if heartbeats else None + if not last_heartbeat or timestamp - last_heartbeat > options.purge_offline_workers: + offline_workers.add(name) + + return offline_workers + + @classmethod + def _as_dict(cls, worker) -> Dict[str, Any]: + if hasattr(worker, '_fields'): + return dict((k, worker.__getattribute__(k)) for k in worker._fields) + else: + return cls._info(worker) + + @classmethod + def _info(cls, worker) -> Dict[str, Any]: + _fields = ( + 'hostname', + 'pid', + 'freq', + 'heartbeats', + 'clock', + 'active', + 'processed', + 'loadavg', + 'sw_ident', + 'sw_ver', + 'sw_sys' + ) + + def _keys(): + for key in _fields: + value = getattr(worker, key, None) + if value is not None: + yield key, value + + return dict(_keys()) + + def remove_metrics_for_offline_workers(self): + if options.purge_offline_workers is None: + return + + offline_workers = self.get_offline_workers(workers=self._get_workers()) + if not offline_workers: + return + + self.metrics.remove_metrics_for_offline_workers( + offline_workers=offline_workers + ) + class Events(threading.Thread): events_enable_interval = 5000 diff --git a/flower/options.py b/flower/options.py index 45e096ab2..106d0a62b 100644 --- a/flower/options.py +++ b/flower/options.py @@ -50,7 +50,7 @@ define("auto_refresh", default=True, help="refresh dashboards", type=bool) define("purge_offline_workers", default=None, type=int, - help="time (in seconds) after which offline workers are purged from dashboard") + help="time (in seconds) after which offline workers are purged from dashboard and prometheus metrics") define("cookie_secret", type=str, default=None, help="secure cookie secret") define("conf", default=DEFAULT_CONFIG_FILE, diff --git a/flower/views/dashboard.py b/flower/views/dashboard.py index efa3e36e0..d959d5e25 100644 --- a/flower/views/dashboard.py +++ b/flower/views/dashboard.py @@ -1,12 +1,10 @@ import logging -import time from tornado import web from tornado import gen +from ..events import EventsState from ..views import BaseHandler -from ..options import options -from ..api.workers import ListWorkers logger = logging.getLogger(__name__) @@ -19,7 +17,7 @@ def get(self): refresh = self.get_argument('refresh', default=False, type=bool) json = self.get_argument('json', default=False, type=bool) - events = self.application.events.state + events_state: EventsState = self.application.events.state if refresh: try: @@ -27,31 +25,7 @@ def get(self): except Exception as e: logger.exception('Failed to update workers: %s', e) - workers = {} - for name, values in events.counter.items(): - if name not in events.workers: - continue - worker = events.workers[name] - info = dict(values) - info.update(self._as_dict(worker)) - info.update(status=worker.alive) - workers[name] = info - - if options.purge_offline_workers is not None: - timestamp = int(time.time()) - offline_workers = [] - for name, info in workers.items(): - if info.get('status', True): - continue - - heartbeats = info.get('heartbeats', []) - last_heartbeat = int(max(heartbeats)) if heartbeats else None - if not last_heartbeat or timestamp - last_heartbeat > options.purge_offline_workers: - offline_workers.append(name) - - for name in offline_workers: - workers.pop(name) - + workers = events_state.get_online_workers() if json: self.write(dict(data=list(workers.values()))) else: @@ -59,24 +33,3 @@ def get(self): workers=workers, broker=self.application.capp.connection().as_uri(), autorefresh=1 if self.application.options.auto_refresh else 0) - - @classmethod - def _as_dict(cls, worker): - if hasattr(worker, '_fields'): - return dict((k, worker.__getattribute__(k)) for k in worker._fields) - else: - return cls._info(worker) - - @classmethod - def _info(cls, worker): - _fields = ('hostname', 'pid', 'freq', 'heartbeats', 'clock', - 'active', 'processed', 'loadavg', 'sw_ident', - 'sw_ver', 'sw_sys') - - def _keys(): - for key in _fields: - value = getattr(worker, key, None) - if value is not None: - yield key, value - - return dict(_keys()) diff --git a/flower/views/monitor.py b/flower/views/monitor.py index 302cc8589..84608b926 100644 --- a/flower/views/monitor.py +++ b/flower/views/monitor.py @@ -2,12 +2,16 @@ from tornado import gen +from ..events import EventsState from ..views import BaseHandler class Metrics(BaseHandler): @gen.coroutine def get(self): + events_state: EventsState = self.application.events.state + events_state.remove_metrics_for_offline_workers() + self.write(prometheus_client.generate_latest()) self.set_header("Content-Type", "text/plain") diff --git a/tests/unit/api/test_prometheus_metrics.py b/tests/unit/api/test_prometheus_metrics.py new file mode 100644 index 000000000..7b6c587d3 --- /dev/null +++ b/tests/unit/api/test_prometheus_metrics.py @@ -0,0 +1,35 @@ +from unittest.mock import Mock, call + +from flower.api.prometheus_metrics import PrometheusMetrics +from tests.unit import AsyncHTTPTestCase + + +class TestPrometheusMetrics(AsyncHTTPTestCase): + def test_remove_metrics_for_offline_workers_removes_label_sets_containing_offline_worker(self): + prometheus_metrics = PrometheusMetrics() + worker_online = 'worker_online' + worker_offline = 'worker_offline' + event_type = 'task-started' + task = 'tasks.add' + other_task = 'tasks.mul' + worker_online_label_set = (worker_online, event_type, task) + worker_offline_label_set = (worker_offline, event_type, task) + other_worker_offline_label_set = (worker_offline, event_type, other_task) + + prometheus_metrics.events.labels(*worker_online_label_set).inc() + prometheus_metrics.events.labels(*worker_offline_label_set).inc() + prometheus_metrics.events.labels(*other_worker_offline_label_set).inc() + + offline_workers = {worker_offline} + + mock_remove = Mock() + prometheus_metrics.events.remove = mock_remove + prometheus_metrics.remove_metrics_for_offline_workers(offline_workers=offline_workers) + + mock_remove.assert_has_calls( + [ + call(*worker_offline_label_set), + call(*other_worker_offline_label_set), + ], + any_order=True + ) diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py new file mode 100644 index 000000000..85c087e27 --- /dev/null +++ b/tests/unit/test_events.py @@ -0,0 +1,208 @@ +import time +from unittest.mock import patch, Mock + +from celery.events import Event + +from flower.events import EventsState +from tests.unit import AsyncHTTPTestCase + + +class TestGetOnlineWorkers(AsyncHTTPTestCase): + def test_returns_current_worker_info(self): + state = EventsState() + worker = 'worker1' + tasks_executing = 12 + state.get_or_create_worker(worker) + events = [ + Event('worker-online', hostname=worker), + Event('worker-heartbeat', hostname=worker, active=tasks_executing) + ] + + fake_event_time = time.time() + for i, e in enumerate(events): + e['clock'] = i + e['local_received'] = fake_event_time + i + state.event(e) + + expected_workers_info = { + worker: + { + 'active': tasks_executing, + 'clock': 1, + 'freq': 60, + 'heartbeats': [fake_event_time, fake_event_time + 1], + 'hostname': worker, + 'loadavg': None, + 'pid': None, + 'processed': None, + 'status': True, + 'sw_ident': None, + 'sw_sys': None, + 'sw_ver': None, + 'worker-heartbeat': 1, + 'worker-online': 1 + } + } + + self.assertEqual(state.get_online_workers(), expected_workers_info) + + def test_removes_info_for_offline_workers_if_purging_is_enabled(self): + state = EventsState() + worker_offline = 'worker1' + worker_online = 'worker2' + tasks_executing = 12 + state.get_or_create_worker(worker_offline) + events = [ + Event('worker-online', hostname=worker_offline), + Event('worker-heartbeat', hostname=worker_offline, active=tasks_executing), + Event('worker-offline', hostname=worker_offline), + Event('worker-online', hostname=worker_online), + ] + + fake_event_time = time.time() + for i, e in enumerate(events): + e['clock'] = i + e['local_received'] = fake_event_time + i + state.event(e) + + with patch('flower.events.options') as mock_options: + mock_options.purge_offline_workers = 0 + workers_info = state.get_online_workers() + + expected_workers_info = { + worker_online: + { + 'active': None, + 'clock': 3, + 'freq': 60, + 'heartbeats': [fake_event_time + 3], + 'hostname': worker_online, + 'loadavg': None, + 'pid': None, + 'processed': None, + 'status': True, + 'sw_ident': None, + 'sw_sys': None, + 'sw_ver': None, + 'worker-online': 1 + } + } + + self.assertEqual(workers_info, expected_workers_info) + + +class TestGetOfflineWorkers(AsyncHTTPTestCase): + def test_returns_empty_set_if_all_workers_are_online(self): + state = EventsState() + workers = { + 'worker1': { + 'status': True, + }, + 'worker2': { + 'status': True + } + } + + self.assertEqual(state.get_offline_workers(workers=workers), set()) + + def test_returns_offline_worker_missing_heartbeats(self): + state = EventsState() + worker = 'worker1' + workers = { + worker: { + 'status': False, + 'heartbeats': [] + }, + } + + self.assertEqual(state.get_offline_workers(workers=workers), {worker}) + + @patch('flower.events.time.time') + def test_does_not_return_worker_as_offline_if_last_heartbeat_within_purge_offline_workers_option(self, mock_time): + fake_timestamp = 100 + purge_offline_workers = 10 + mock_time.return_value = fake_timestamp + + state = EventsState() + worker = 'worker1' + workers = { + worker: { + 'status': False, + 'heartbeats': [fake_timestamp - purge_offline_workers - 2, fake_timestamp - purge_offline_workers] + }, + } + + with patch('flower.events.options') as mock_options: + mock_options.purge_offline_workers = purge_offline_workers + self.assertEqual(state.get_offline_workers(workers=workers), set()) + + @patch('flower.events.time.time') + def test_returns_offline_worker_if_last_heartbeat_too_old(self, mock_time): + fake_timestamp = 100 + purge_offline_workers = 10 + mock_time.return_value = fake_timestamp + + state = EventsState() + worker = 'worker1' + workers = { + worker: { + 'status': False, + 'heartbeats': [fake_timestamp - purge_offline_workers - 2, fake_timestamp - purge_offline_workers - 1] + }, + } + + with patch('flower.events.options') as mock_options: + mock_options.purge_offline_workers = purge_offline_workers + self.assertEqual(state.get_offline_workers(workers=workers), {worker}) + + +class TestRemoveMetricsForOfflineWorkers(AsyncHTTPTestCase): + def test_does_not_remove_metrics_if_purge_offline_workers_is_none(self): + state = EventsState() + mock_remove_metrics_for_offline_workers = Mock() + state.metrics.remove_metrics_for_offline_workers = mock_remove_metrics_for_offline_workers + + with patch('flower.events.options') as mock_options: + mock_options.purge_offline_workers = None + state.remove_metrics_for_offline_workers() + + mock_remove_metrics_for_offline_workers.assert_not_called() + + def test_does_not_remove_metrics_if_there_are_no_offline_workers(self): + state = EventsState() + mock_remove_metrics_for_offline_workers = Mock() + state.metrics.remove_metrics_for_offline_workers = mock_remove_metrics_for_offline_workers + + with patch('flower.events.options') as mock_options: + mock_options.purge_offline_workers = 0 + state.remove_metrics_for_offline_workers() + + mock_remove_metrics_for_offline_workers.assert_not_called() + + def test_removes_metrics_for_offline_workers_only(self): + state = EventsState() + worker_offline = 'worker1' + worker_online = 'worker2' + state.get_or_create_worker(worker_offline) + state.get_or_create_worker(worker_online) + events = [ + Event('worker-online', hostname=worker_offline), + Event('worker-heartbeat', hostname=worker_offline, active=1), + Event('worker-offline', hostname=worker_offline), + Event('worker-online', hostname=worker_online), + ] + + for i, e in enumerate(events): + e['clock'] = i + import time + e['local_received'] = time.time() + state.event(e) + + mock_remove_metrics_for_offline_workers = Mock() + state.metrics.remove_metrics_for_offline_workers = mock_remove_metrics_for_offline_workers + + with patch('flower.events.options') as mock_options: + mock_options.purge_offline_workers = 0 + state.remove_metrics_for_offline_workers() + + mock_remove_metrics_for_offline_workers.assert_called_once_with(offline_workers={worker_offline}) diff --git a/tests/unit/views/test_dashboard.py b/tests/unit/views/test_dashboard.py index 571257daa..b6ec29b16 100644 --- a/tests/unit/views/test_dashboard.py +++ b/tests/unit/views/test_dashboard.py @@ -16,7 +16,6 @@ from celery.utils import uuid from flower.events import EventsState -from flower.options import options class DashboardTests(AsyncHTTPTestCase): @@ -73,7 +72,7 @@ def test_purge_offline_workers(self): local_received=time.time())) self.app.events.state = state - with patch('flower.views.dashboard.options') as mock_options: + with patch('flower.events.options') as mock_options: mock_options.purge_offline_workers = 0 r = self.get('/dashboard') diff --git a/tests/unit/views/test_monitor.py b/tests/unit/views/test_monitor.py index b2041a700..643a85000 100644 --- a/tests/unit/views/test_monitor.py +++ b/tests/unit/views/test_monitor.py @@ -1,6 +1,7 @@ import re import time from datetime import datetime, timedelta +from unittest.mock import patch from celery.events import Event from kombu import uuid @@ -197,6 +198,55 @@ def test_worker_prefetched_tasks_metric(self): f'flower_worker_prefetched_tasks{{task="{task_name}",worker="{worker_name}"}} 1.0' in metrics ) + def test_metrics_are_removed_if_purging_is_enabled(self): + """ + This test generates metrics for all measured values and as a last event it has worker-offline event which + together with purge_offline_workers = 0 setting causes all those metrics to be removed from their Prometheus + metric objects. The result should be no metric values presented in the '/metrics' endpoint response. + """ + state = EventsState() + worker_name = 'worker1' + task_name = 'task1' + state.get_or_create_worker(worker_name) + events = task_succeeded_events(worker=worker_name, name=task_name, id='123') + events.append(Event('worker-offline', hostname=worker_name)) + + task_received = time.time() + task_started = task_received + 3 + for i, e in enumerate(events): + e['clock'] = i + e['local_received'] = time.time() + if e['type'] == 'task-received': + e['timestamp'] = task_received + if e['type'] == 'task-started': + e['timestamp'] = task_started + state.event(e) + self.app.events.state = state + + with patch('flower.events.options') as mock_options: + mock_options.purge_offline_workers = 0 + metrics = self.get('/metrics').body.decode('utf-8') + + self.assertTrue( + f'flower_task_runtime_seconds_count{{task="{task_name}",worker="{worker_name}"}} 1.0' not in metrics + ) + self.assertTrue( + f'flower_events_total{{task="{task_name}",type="task-received",worker="{worker_name}"}} 1.0' not in metrics + ) + self.assertTrue( + f'flower_events_total{{task="{task_name}",type="task-started",worker="{worker_name}"}} 1.0' not in metrics + ) + self.assertTrue( + f'flower_events_total{{task="{task_name}",type="task-succeeded",worker="{worker_name}"}} 1.0' not in metrics + ) + self.assertTrue(f'flower_worker_online{{worker="{worker_name}"}} 0.0' not in metrics) + self.assertTrue( + f'flower_worker_prefetched_tasks{{task="{task_name}",worker="{worker_name}"}} 0.0' not in metrics + ) + self.assertTrue( + f'flower_task_prefetch_time_seconds{{task="{task_name}",worker="{worker_name}"}} 0.0' not in metrics + ) + class HealthcheckTests(AsyncHTTPTestCase): def setUp(self):