Expose celery queue metrics #1275

Open · wants to merge 4 commits into master
4 changes: 2 additions & 2 deletions flower/__init__.py
@@ -1,2 +1,2 @@
-VERSION = (2, 0, 0)
-__version__ = '.'.join(map(str, VERSION)) + '-dev'
+VERSION = (2, 0, 1)
+__version__ = '.'.join(map(str, VERSION))
12 changes: 1 addition & 11 deletions flower/api/tasks.py
@@ -390,17 +390,7 @@ async def get(self):
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
"""
-        app = self.application
-
-        http_api = None
-        if app.transport == 'amqp' and app.options.broker_api:
-            http_api = app.options.broker_api
-
-        broker = Broker(app.capp.connection().as_uri(include_password=True),
-                        http_api=http_api, broker_options=self.capp.conf.broker_transport_options,
-                        broker_use_ssl=self.capp.conf.broker_use_ssl)
-
-        queues = await broker.queues(self.get_active_queue_names())
+        queues = await self.get_active_queue_lengths()
self.write({'active_queues': queues})


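The handler above now delegates to the shared get_active_queue_lengths helper instead of constructing a Broker inline. For orientation, here is a hedged sketch of how a client might consume the endpoint this handler serves; the /api/queues/length path and the exact response fields are inferred from the handler and from flower/app.py below, not spelled out by this diff:

import json
import urllib.request

def fetch_active_queues(base_url="http://localhost:5555"):
    # The handler writes {'active_queues': [...]}; judging by flower/app.py in
    # this diff, each entry carries at least a queue 'name' and a 'messages'
    # count. The endpoint path and port are assumptions.
    with urllib.request.urlopen(f"{base_url}/api/queues/length") as resp:
        return json.load(resp)["active_queues"]

for queue in fetch_active_queues():
    print(queue["name"], queue.get("messages"))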
53 changes: 49 additions & 4 deletions flower/app.py
@@ -8,16 +8,20 @@

from tornado import ioloop
from tornado.httpserver import HTTPServer
from tornado.ioloop import PeriodicCallback, IOLoop
from tornado.web import url

from .urls import handlers as default_handlers
-from .events import Events
+from .events import Events, get_prometheus_metrics
from .inspector import Inspector
from .options import default_options

from .utils.broker import get_active_queue_lengths

logger = logging.getLogger(__name__)

# TODO: should this be configurable via options?
BROKER_METRICS_UPDATE_INTERVAL_SECONDS = 10
# The main dashboard view updates independently of this interval, because it subscribes to live celery events.
WORKER_DETAILS_UPDATE_INTERVAL = 120

if sys.version_info[0] == 3 and sys.version_info[1] >= 8 and sys.platform.startswith('win'):
import asyncio
@@ -79,7 +83,9 @@ def start(self):
server.add_socket(socket)

self.started = True
-        self.update_workers()
+        self.io_loop.spawn_callback(self.update_broker_metrics)
+        # Otherwise self.workers is only updated on UI events and the metrics go stale over time.
+        self.io_loop.spawn_callback(self.update_worker_details)
self.io_loop.start()

def stop(self):
@@ -101,3 +107,42 @@ def workers(self):

def update_workers(self, workername=None):
return self.inspector.inspect(workername)

async def update_broker_metrics(self):
logger.debug("Updating broker metrics.")

def is_worker_alive(worker_name):
worker = self.events.state.workers.data.get(worker_name)
if not worker:
return None
return worker.alive

while True:
next_call = tornado.gen.sleep(BROKER_METRICS_UPDATE_INTERVAL_SECONDS)
try:
active_queues = await get_active_queue_lengths(self)
metrics = get_prometheus_metrics()
# Clear old data so we do not keep exporting metrics for queues that are no longer active.
metrics.queue_online_workers.clear()
metrics.queue_length.clear()
for queue_entry in active_queues:
queue = queue_entry["name"]
metrics.queue_length.labels(queue).set(queue_entry["messages"])
nr_of_workers = sum(
1 for name, data in self.workers.items() if
is_worker_alive(name) and any(q["name"] == queue for q in data.get("active_queues", []))
)
metrics.queue_online_workers.labels(queue).set(nr_of_workers)
except Exception as e:
logger.warning("Updating broker metrics failed with %s", repr(e))
else:
logger.debug("Done updating metrics.")
await next_call

async def update_worker_details(self):
while True:
next_call = tornado.gen.sleep(WORKER_DETAILS_UPDATE_INTERVAL)
try:
self.update_workers()
except Exception as e:
logger.warning("Failed to update workers list from celery %s", repr(e))
await next_call
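Both background loops use the same pattern: the sleep future is created before the work runs, so each iteration starts on a fixed cadence even when the body is slow or raises. A minimal standalone sketch of that pattern, with illustrative names that are not part of the diff:

import logging

from tornado import gen, ioloop

logger = logging.getLogger(__name__)
INTERVAL_SECONDS = 10  # stands in for BROKER_METRICS_UPDATE_INTERVAL_SECONDS

async def periodic(work):
    while True:
        # gen.sleep starts its timer when called, not when awaited, so the
        # duration of work() does not stretch the interval.
        next_call = gen.sleep(INTERVAL_SECONDS)
        try:
            await work()
        except Exception as exc:
            logger.warning("Periodic task failed with %r", exc)
        await next_call

async def tick():
    print("updating metrics...")

loop = ioloop.IOLoop.current()
loop.spawn_callback(periodic, tick)
loop.start()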
2 changes: 2 additions & 0 deletions flower/events.py
@@ -52,6 +52,8 @@ def __init__(self):
"Number of tasks currently executing at a worker",
['worker']
)
self.queue_length = Gauge('flower_broker_queue_length', "Broker queue length", ['queue'])
self.queue_online_workers = Gauge('flower_broker_queue_online_workers', "Workers online per queue", ['queue'])


class EventsState(State):
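These gauges are labelled per queue, which is why update_broker_metrics in flower/app.py clears them before each refresh: clear() drops every label set, so queues that disappear stop being exported. A small self-contained illustration of that prometheus_client behaviour (registry and values invented for the example):

from prometheus_client import CollectorRegistry, Gauge, generate_latest

registry = CollectorRegistry()
queue_length = Gauge('flower_broker_queue_length', "Broker queue length",
                     ['queue'], registry=registry)

queue_length.labels('celery').set(42)
queue_length.labels('emails').set(7)
print(generate_latest(registry).decode())  # exports both label sets

queue_length.clear()                   # drops all label sets
queue_length.labels('celery').set(40)  # only 'celery' is exported now
print(generate_latest(registry).decode())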
42 changes: 41 additions & 1 deletion flower/utils/broker.py
@@ -89,8 +89,9 @@ class RedisBase(BrokerBase):
DEFAULT_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]

-    def __init__(self, broker_url, *_, **kwargs):
+    def __init__(self, broker_url, io_loop=None, *args, **kwargs):
super().__init__(broker_url)
self.io_loop = io_loop or ioloop.IOLoop.instance()
self.redis = None

if not redis:
@@ -109,6 +110,11 @@ def _q_for_pri(self, queue, pri):
return '{0}{1}{2}'.format(*((queue, self.sep, pri) if pri else (queue, '', '')))

async def queues(self, names):
# TODO: use redis.asyncio instead of synchronous client with ThreadPoolExecutor
queue_sizes = await self.io_loop.run_in_executor(None, self._queues_synchronous, names)
return queue_sizes

def _queues_synchronous(self, names):
queue_stats = []
for name in names:
priority_names = [self.broker_prefix + self._q_for_pri(
@@ -244,6 +250,40 @@ async def queues(self, names):
raise NotImplementedError


def get_active_queue_names(application):
queues = set([])
for _, info in application.workers.items():
for q in info.get('active_queues', []):
queues.add(q['name'])
return queues


async def get_active_queue_lengths(application):
app = application
capp = application.capp
broker_options = capp.conf.BROKER_TRANSPORT_OPTIONS

http_api = None
if app.transport == 'amqp' and app.options.broker_api:
http_api = app.options.broker_api

broker_use_ssl = None
if capp.conf.BROKER_USE_SSL:
broker_use_ssl = capp.conf.BROKER_USE_SSL

broker = Broker(app.capp.connection().as_uri(include_password=True),
http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl)

queue_names = get_active_queue_names(application)

if not queue_names:
queue_names = set([capp.conf.CELERY_DEFAULT_QUEUE]) | \
set([q.name for q in capp.conf.CELERY_QUEUES or [] if q.name])

queues = await broker.queues(sorted(queue_names))
return queues


async def main():
broker_url = sys.argv[1] if len(sys.argv) > 1 else 'amqp://'
queue_name = sys.argv[2] if len(sys.argv) > 2 else 'celery'
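RedisBase.queues now pushes the blocking redis calls onto a thread pool so the Tornado event loop is not stalled while queue lengths are read. A minimal sketch of that offloading pattern; llen_all is a placeholder for _queues_synchronous, which in the real code aggregates lengths over each queue's priority-suffixed keys:

from tornado import ioloop

def llen_all(names):
    # Placeholder for the blocking redis work; returns one entry per queue.
    return [{'name': name, 'messages': 0} for name in names]

async def queue_lengths(names):
    loop = ioloop.IOLoop.current()
    # Passing None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, llen_all, names)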
16 changes: 7 additions & 9 deletions flower/views/__init__.py
@@ -10,6 +10,7 @@
import tornado

from ..utils import template, bugreport, strtobool
from ..utils.broker import get_active_queue_names, get_active_queue_lengths

logger = logging.getLogger(__name__)

@@ -125,12 +126,9 @@ def format_task(self, task):
return task

def get_active_queue_names(self):
-        queues = set([])
-        for _, info in self.application.workers.items():
-            for queue in info.get('active_queues', []):
-                queues.add(queue['name'])
-
-        if not queues:
-            queues = set([self.capp.conf.task_default_queue]) |\
-                {q.name for q in self.capp.conf.task_queues or [] if q.name}
-        return sorted(queues)
+        return get_active_queue_names(self.application)


async def get_active_queue_lengths(self):
queues = await get_active_queue_lengths(self.application)
return queues
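Taken together, the change surfaces queue depth and per-queue online worker counts to Prometheus. A hedged end-to-end check, assuming Flower serves its metrics at /metrics on the default port (deployment details, not part of this diff):

import urllib.request

def read_queue_metrics(base_url="http://localhost:5555"):
    with urllib.request.urlopen(f"{base_url}/metrics") as resp:
        text = resp.read().decode()
    wanted = ("flower_broker_queue_length", "flower_broker_queue_online_workers")
    return [line for line in text.splitlines() if line.startswith(wanted)]

for line in read_queue_metrics():
    print(line)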