From 0cf411066ce5cb1ce36122aad512ea3f7f54ebca Mon Sep 17 00:00:00 2001 From: Mher Movsisyan Date: Sat, 17 Jun 2023 18:44:18 -0400 Subject: [PATCH 1/3] Set release version --- flower/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flower/__init__.py b/flower/__init__.py index 24c8e6f7..2c96ce3c 100644 --- a/flower/__init__.py +++ b/flower/__init__.py @@ -1,2 +1,2 @@ VERSION = (2, 0, 0) -__version__ = '.'.join(map(str, VERSION)) + '-dev' +__version__ = '.'.join(map(str, VERSION)) From cf39575472e84648aa2d00c73826a60b1f6d828e Mon Sep 17 00:00:00 2001 From: Mher Movsisyan Date: Sun, 13 Aug 2023 10:38:29 -0400 Subject: [PATCH 2/3] Set release version --- flower/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flower/__init__.py b/flower/__init__.py index 2c96ce3c..1d024efd 100644 --- a/flower/__init__.py +++ b/flower/__init__.py @@ -1,2 +1,2 @@ -VERSION = (2, 0, 0) +VERSION = (2, 0, 1) __version__ = '.'.join(map(str, VERSION)) From 18121a17584331b9ad393039ce2132d6ab217ed9 Mon Sep 17 00:00:00 2001 From: Rafal Semik Date: Fri, 14 Apr 2023 11:29:18 +0200 Subject: [PATCH 3/3] Expose queue length as a prometheus metric Implements #1173 * Quick fix synchronous redis calls. Redis broker calls were synchronous and therefore blocked the main thread. This moves them to ThreadPoolExecutor * Update workers list regularly for up-to-date metrics Previously self.workers was updated only on UI events. 
This is no longer fine as we use the data to produce up-to-date metrics --- flower/api/tasks.py | 12 +-------- flower/app.py | 53 +++++++++++++++++++++++++++++++++++++--- flower/events.py | 2 ++ flower/utils/broker.py | 42 ++++++++++++++++++++++++++++++- flower/views/__init__.py | 16 ++++++------ 5 files changed, 100 insertions(+), 25 deletions(-) diff --git a/flower/api/tasks.py b/flower/api/tasks.py index 730c290e..b8943c70 100644 --- a/flower/api/tasks.py +++ b/flower/api/tasks.py @@ -390,17 +390,7 @@ async def get(self): :statuscode 401: unauthorized request :statuscode 503: result backend is not configured """ - app = self.application - - http_api = None - if app.transport == 'amqp' and app.options.broker_api: - http_api = app.options.broker_api - - broker = Broker(app.capp.connection().as_uri(include_password=True), - http_api=http_api, broker_options=self.capp.conf.broker_transport_options, - broker_use_ssl=self.capp.conf.broker_use_ssl) - - queues = await broker.queues(self.get_active_queue_names()) + queues = await self.get_active_queue_lengths() self.write({'active_queues': queues}) diff --git a/flower/app.py b/flower/app.py index 3427e098..336d2c46 100644 --- a/flower/app.py +++ b/flower/app.py @@ -8,16 +8,20 @@ from tornado import ioloop from tornado.httpserver import HTTPServer +from tornado.ioloop import PeriodicCallback, IOLoop from tornado.web import url from .urls import handlers as default_handlers -from .events import Events +from .events import Events, get_prometheus_metrics from .inspector import Inspector from .options import default_options - +from .utils.broker import get_active_queue_lengths logger = logging.getLogger(__name__) - +# TODO: does this need to be configuration from options? +BROKER_METRICS_UPDATE_INTERVAL_SECONDS = 10 +# Main dashboard view is updated regardless of this, because it subscribes to live events from celery. 
+WORKER_DETAILS_UPDATE_INTERVAL = 120 if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'): import asyncio @@ -79,7 +83,9 @@ def start(self): server.add_socket(socket) self.started = True - self.update_workers() + self.io_loop.spawn_callback(self.update_broker_metrics) + # otherwise self.workers are only updated on UI events and metrics get outdated after some time + self.io_loop.spawn_callback(self.update_worker_details) self.io_loop.start() def stop(self): @@ -101,3 +107,42 @@ def workers(self): def update_workers(self, workername=None): return self.inspector.inspect(workername) + + async def update_broker_metrics(self): + logger.debug("Updating broker metrics.") + + def is_worker_alive(worker_name): + worker = self.events.state.workers.data.get(worker_name) + if not worker: + return None + return worker.alive + while True: + next_call = tornado.gen.sleep(BROKER_METRICS_UPDATE_INTERVAL_SECONDS) + try: + active_queues = await get_active_queue_lengths(self) + metrics = get_prometheus_metrics() + # clear old data to not leave metrics for queues no longer active + metrics.queue_online_workers.clear() + metrics.queue_length.clear() + for queue_entry in active_queues: + queue = queue_entry["name"] + metrics.queue_length.labels(queue).set(queue_entry["messages"]) + nr_of_workers = sum( + 1 for name, data in self.workers.items() if + is_worker_alive(name) and any(q["name"] == queue for q in data.get("active_queues", [])) + ) + metrics.queue_online_workers.labels(queue).set(nr_of_workers) + except Exception as e: + logger.warning("Updating broker metrics failed with %s", repr(e)) + else: + logger.debug("Done updating metrics.") + await next_call + + async def update_worker_details(self): + while True: + next_call = tornado.gen.sleep(WORKER_DETAILS_UPDATE_INTERVAL) + try: + self.update_workers() + except Exception as e: + logger.warning("Failed to update workers list from celery %s", repr(e)) + await next_call diff --git 
a/flower/events.py b/flower/events.py index cd15d7a2..88c4ef08 100644 --- a/flower/events.py +++ b/flower/events.py @@ -52,6 +52,8 @@ def __init__(self): "Number of tasks currently executing at a worker", ['worker'] ) + self.queue_length = Gauge('flower_broker_queue_length', "Broker queue length", ['queue']) + self.queue_online_workers = Gauge('flower_broker_queue_online_workers', "Workers online per queue", ['queue']) class EventsState(State): diff --git a/flower/utils/broker.py b/flower/utils/broker.py index 51255c4a..85c8436e 100644 --- a/flower/utils/broker.py +++ b/flower/utils/broker.py @@ -89,8 +89,9 @@ class RedisBase(BrokerBase): DEFAULT_SEP = '\x06\x16' DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9] - def __init__(self, broker_url, *_, **kwargs): + def __init__(self, broker_url, io_loop=None, *args, **kwargs): super().__init__(broker_url) + self.io_loop = io_loop or ioloop.IOLoop.instance() self.redis = None if not redis: @@ -109,6 +110,11 @@ def _q_for_pri(self, queue, pri): return '{0}{1}{2}'.format(*((queue, self.sep, pri) if pri else (queue, '', ''))) async def queues(self, names): + # TODO: use redis.asyncio instead of synchronous client with ThreadPoolExecutor + queue_sizes = await self.io_loop.run_in_executor(None, self._queues_synchronous, names) + return queue_sizes + + def _queues_synchronous(self, names): queue_stats = [] for name in names: priority_names = [self.broker_prefix + self._q_for_pri( @@ -244,6 +250,40 @@ async def queues(self, names): raise NotImplementedError +def get_active_queue_names(application): + queues = set([]) + for _, info in application.workers.items(): + for q in info.get('active_queues', []): + queues.add(q['name']) + return queues + + +async def get_active_queue_lengths(application): + app = application + capp = application.capp + broker_options = capp.conf.BROKER_TRANSPORT_OPTIONS + + http_api = None + if app.transport == 'amqp' and app.options.broker_api: + http_api = app.options.broker_api + + broker_use_ssl = None + if 
capp.conf.BROKER_USE_SSL: + broker_use_ssl = capp.conf.BROKER_USE_SSL + + broker = Broker(app.capp.connection().as_uri(include_password=True), + http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl) + + queue_names = get_active_queue_names(application) + + if not queue_names: + queue_names = set([capp.conf.CELERY_DEFAULT_QUEUE]) | \ + set([q.name for q in capp.conf.CELERY_QUEUES or [] if q.name]) + + queues = await broker.queues(sorted(queue_names)) + return queues + + async def main(): broker_url = sys.argv[1] if len(sys.argv) > 1 else 'amqp://' queue_name = sys.argv[2] if len(sys.argv) > 2 else 'celery' diff --git a/flower/views/__init__.py b/flower/views/__init__.py index fbd80b01..31994da2 100644 --- a/flower/views/__init__.py +++ b/flower/views/__init__.py @@ -10,6 +10,7 @@ import tornado from ..utils import template, bugreport, strtobool +from ..utils.broker import get_active_queue_names, get_active_queue_lengths logger = logging.getLogger(__name__) @@ -125,12 +126,9 @@ def format_task(self, task): return task def get_active_queue_names(self): - queues = set([]) - for _, info in self.application.workers.items(): - for queue in info.get('active_queues', []): - queues.add(queue['name']) - - if not queues: - queues = set([self.capp.conf.task_default_queue]) |\ - {q.name for q in self.capp.conf.task_queues or [] if q.name} - return sorted(queues) + return get_active_queue_names(self.application) + + + async def get_active_queue_lengths(self): + queues = await get_active_queue_lengths(self.application) + return queues