Skip to content

Commit

Permalink
limit tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
denis-olyanyuk committed Aug 1, 2023
1 parent 066cf49 commit f3080a6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
4 changes: 3 additions & 1 deletion flower/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ def __init__(self, options=None, capp=None, events=None,
enable_events=self.options.enable_events,
io_loop=self.io_loop,
max_workers_in_memory=self.options.max_workers,
max_tasks_in_memory=self.options.max_tasks)
max_tasks_in_memory=self.options.max_tasks,
limit_tasks_by_type=self.options.limit_tasks_by_type
)
self.started = False

def start(self):
Expand Down
27 changes: 26 additions & 1 deletion flower/events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import collections
import datetime
import logging
import shelve
import threading
Expand Down Expand Up @@ -115,7 +116,7 @@ class Events(threading.Thread):

# pylint: disable=too-many-arguments
def __init__(self, capp, io_loop, db=None, persistent=False,
enable_events=True, state_save_interval=0,
enable_events=True, state_save_interval=0, limit_tasks_by_type=None,
**kwargs):
threading.Thread.__init__(self)
self.daemon = True
Expand All @@ -128,6 +129,7 @@ def __init__(self, capp, io_loop, db=None, persistent=False,
self.enable_events = enable_events
self.state = None
self.state_save_timer = None
self.limit_tasks_by_type = limit_tasks_by_type

if self.persistent:
logger.debug("Loading state from '%s'...", self.db)
Expand All @@ -140,6 +142,10 @@ def __init__(self, capp, io_loop, db=None, persistent=False,
self.state_save_timer = PeriodicCallback(self.save_state,
state_save_interval)

if self.limit_tasks_by_type:
self.clear_tasks_by_type_timer = PeriodicCallback(self.clear_tasks_by_type,
1000 * 60)

if not self.state:
self.state = EventsState(**kwargs)

Expand All @@ -156,6 +162,10 @@ def start(self):
logger.debug("Starting state save timer...")
self.state_save_timer.start()

if self.clear_tasks_by_type:
logger.debug("Starting clear tasks by type timer...")
self.clear_tasks_by_type_timer.start()

def stop(self):
if self.enable_events:
logger.debug("Stopping enable events timer...")
Expand Down Expand Up @@ -208,3 +218,18 @@ def on_enable_events(self):
def on_event(self, event):
# Call EventsState.event in ioloop thread to avoid synchronization
self.io_loop.add_callback(partial(self.state.event, event))

def clear_tasks_by_type(self):
now = datetime.datetime.now()
for obj in self.limit_tasks_by_type:
timedelta, max_count = obj.get('timedelta'), obj.get('max_count')

# self.state.tasks_by_type are weakSet, so we could get task after deletion.
for count, (uuid, task) in enumerate(self.state._tasks_by_type(obj.get('type')), start=1):
if task.state != 'SUCCESS' or task.state == 'FAILURE' and obj.get('clear_failed'):
continue

if timedelta and task.timestamp <= (now - timedelta).timestamp() or max_count and max_count < count:
del self.state.tasks[task.uuid]
self.state.rebuild_taskheap()

2 changes: 2 additions & 0 deletions flower/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
help="maximum number of workers to keep in memory")
define("max_tasks", type=int, default=100000,
help="maximum number of tasks to keep in memory")
define("limit_tasks_by_type", type=list, default=[],
help="limits number of tasks to keep in memory by type (by max number or timedelta)")
define("db", type=str, default='flower',
help="flower database file")
define("persistent", type=bool, default=False,
Expand Down

0 comments on commit f3080a6

Please sign in to comment.