Releases: closeio/tasktiger
v0.16
v0.15
v0.14
v0.13
Breaking changes
Changing the locking mechanism
This new version of TaskTiger uses a new locking mechanism: the `Lock` provided by redis-py. It is incompatible with the old locking mechanism we were using, and several core functions in TaskTiger depend on locking to work correctly, so this warrants a careful migration process.
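For context on the new mechanism, here is a minimal sketch of the redis-py `Lock` in use (the key name is illustrative, not TaskTiger's actual lock key):

```python
from redis import Redis

conn = Redis()

# redis-py's Lock writes a unique token into the key to track ownership,
# which is part of why keys from the old mechanism are incompatible.
with conn.lock("t:lock:example", timeout=60):
    ...  # critical section runs while the lock is held
```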
You can perform this migration in two ways: via a live migration, or via a downtime migration. After the migration, there's an optional cleanup step.
The live migration
- Update your environment to TaskTiger 0.12 as usual.
- Deploy TaskTiger as it is in commit SHA `cf600449d594ac22e6d8393dc1009a84b52be0c1`. In pip parlance, that is: `-e git+ssh://git@github.com/closeio/tasktiger.git@cf600449d594ac22e6d8393dc1009a84b52be0c1#egg=tasktiger`
- Wait at least 2-3 minutes with it running in production in all your TaskTiger workers. This gives the old locks time to expire; after that, the new locks will be fully in effect.
- Deploy TaskTiger 0.13. Your system is migrated.
The downtime migration
- Update your environment to TaskTiger 0.12 as usual.
- Scale your TaskTiger workers down to zero.
- Deploy TaskTiger 0.13. Your system is migrated.
The cleanup step
Run the script in `scripts/redis_scan.py` to delete the old lock keys from your Redis instance:
./scripts/redis_scan.py --host HOST --port PORT --db DB --print --match "t:lock:*" --ttl 300
The flags:
- `--host`: The Redis host. Required.
- `--port`: The port the Redis instance is listening on. Defaults to `6379`.
- `--db`: The Redis database. Defaults to `0`.
- `--print`: If you want the script to print which keys it is modifying, use this.
- `--match`: What pattern to look for. If you didn't change the default prefix TaskTiger uses for keys, this will be `t:lock:*`; otherwise it will be `PREFIX:lock:*`. By default, scans all keys.
- `--ttl`: A TTL to set. A TTL of 300 will give you time to undo if you want to halt the migration for whatever reason. (Just call this command again with `--ttl -1`.) By default, does not change keys' TTLs.

Plus, there is:
- `--file`: A log file that will receive the changes made. Defaults to `redis-stats.log` in the current working directory.
- `--delay`: How long, in seconds, to wait between `SCAN` iterations. Defaults to `0.1`.
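For a sense of what the cleanup does, here is a rough sketch of the scan-and-expire loop in plain redis-py (a simplification, not the script's actual code; use `scripts/redis_scan.py` for real runs):

```python
import time

from redis import Redis

conn = Redis(host="localhost", port=6379, db=0)

# Walk matching keys incrementally with SCAN and put a TTL on each one,
# mirroring --match "t:lock:*" --ttl 300. (The real script sleeps between
# SCAN batches rather than after every single key.)
for key in conn.scan_iter(match="t:lock:*"):
    conn.expire(key, 300)  # undoing later means clearing the TTL, e.g. conn.persist(key)
    time.sleep(0.1)
```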
v0.12
Breaking changes
- Drop support for redis-py 2 (#183)
Other changes
- Make the `TaskTiger` instance available for the task via global state (#170)
- Support for custom task runners (#175)
- Add ability to configure a poll- vs push-method for task runners to discover new tasks (#176)
- `unique_key` specifies the list of kwargs to use to construct the unique key (#180); see the sketch after this list
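A hedged sketch of `unique_key` (the task, queue, and kwargs are illustrative; whether `unique=True` must be passed alongside `unique_key` is an assumption here):

```python
from tasktiger import TaskTiger

tiger = TaskTiger()

# Only user_id feeds the uniqueness hash, so enqueueing the same user
# with different force values collapses into a single queued task.
@tiger.task(unique=True, unique_key=("user_id",))
def sync_user(user_id=None, force=False):
    ...

sync_user.delay(user_id=42, force=True)
sync_user.delay(user_id=42, force=False)  # deduplicated against the first
```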
Bugfixes
v0.10.1
Major Changes:
- Breaking Change: Unique task ids have changed slightly. See more details below.
- `TaskTiger.purge_errored_tasks` has been added (see the sketch below).
- Breaking Change: Internal refactoring moved a lot of code out of `__init__.py`. This should be transparent unless calling code uses internal variables/constants. If this affects you, it should be obvious, as only imports should break. The fix is just to change the import path.
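A quick sketch of the new purge helper (its exact signature isn't documented in these notes, so the bare call below is an assumption):

```python
from tasktiger import TaskTiger

tiger = TaskTiger()

# Remove tasks that have ended up in the error state. Any filtering
# parameters the method accepts are not documented here, hence the
# no-argument call.
tiger.purge_errored_tasks()
```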
Breaking Change: Unique Task Ids Changed
This breaking change only affects periodic tasks and scheduled unique tasks. See #146 for details.
Unique tasks rely on their ID to enforce uniqueness. The id is a hash of `function_name`, `args`, and `kwargs`. There were some cases where creating unique scheduled tasks manually using `Task` objects or manually `.delay()`-ing a periodic task would inconsistently use `None` for `args`/`kwargs` instead of `[]` and `{}`. With this release, `args` and `kwargs` will always be normalized to `[]`/`{}` no matter how the `Task` was created. Existing scheduled unique tasks will have to be migrated to use a consistent id format.
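To make the divergence concrete, here is a small sketch using the same internal helper the migration script below imports (the task name is illustrative):

```python
from tasktiger._internal import gen_unique_id

# The same task hashed two ways: None vs. the normalized empty containers.
id_with_none = gen_unique_id("mymodule.mytask", None, None)
id_normalized = gen_unique_id("mymodule.mytask", [], {})

# Before this release both forms could occur in the wild, yielding two
# different "unique" ids for what is logically the same task.
assert id_with_none != id_normalized
```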
Here's a script that migrates task ids:
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Re-schedule tasks that have malformed ids

Be sure to ``pip install click tasktiger limitlion tqdm`` as well.

Recommended steps:

* stop tasktiger workers
* upgrade tasktiger version
* run this script (without --apply) and check the logs to make sure it's
  doing what you'd expect
* run this script (with --apply), which corrects all unique scheduled task
  ids
* start tasktiger workers
"""
from __future__ import absolute_import, print_function, unicode_literals

import datetime
import json

import click
import limitlion
import tqdm
from redis import Redis

from tasktiger import Task, TaskTiger
from tasktiger._internal import SCHEDULED, gen_unique_id, queue_matches

# Connect to Redis (defaults to localhost)
redis_connection = Redis(decode_responses=True)

# Initialize tasktiger
tiger = TaskTiger(redis_connection)

# Initialize limitlion (optional, see throttling comment below)
limitlion.throttle_configure(redis_connection)


class JSONLineLog(object):
    """Safe and convenient json logger

    Usage::

        with JSONLineLog("my_file.json") as logger:
            logger.write({'key': 'this is serialized as json'})
    """

    def __init__(self, filename):
        self.filename = filename

    def __enter__(self):
        self.file = open(self.filename, 'a')
        self.file.__enter__()
        return self

    def write(self, log_entry):
        # default=str so datetime values ("ts", "scheduled_at") are
        # serialized instead of raising TypeError
        print(json.dumps(log_entry, default=str), file=self.file)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.__exit__(exc_type, exc_val, exc_tb)


@click.command()
@click.option(
    "--apply",
    is_flag=True,
    help="Actually make these changes. This is not a drill!",
    default=False,
)
@click.option("--limit", help="Limit to processing N tasks", default=None)
@click.option(
    "--only-queues",
    help="Only process these queues (comma delimited)",
    default=None,
)
@click.option(
    "--exclude-queues",
    help="Exclude these queues from processing (comma delimited)",
    default=None,
)
def main(apply=False, limit=None, only_queues=None, exclude_queues=None):
    # warn if not applying change
    if not apply:
        print('*** NO CHANGES WILL BE MADE')
        print('To apply this migration run with --apply.')
    else:
        print('*** CHANGES WILL BE APPLIED')
    print()

    # If we're actually running this on a production redis instance, we
    # probably don't want to iterate over all the keys as fast as we can.
    # limitlion is a simple token-bucket throttle that gets the job done.
    # This step is optional but recommended. If you don't want to use
    # limitlion, maybe do something simple like ``lambda: time.sleep(.1)``
    throttle = limitlion.throttle_wait('migrations', rps=10)

    # actually do the migration
    with JSONLineLog("task_id_migration.json") as migration_log:
        print("Writing log to {}".format(migration_log.filename))
        if limit:
            limit = int(limit)
        if only_queues:
            only_queues = only_queues.split(",")
        if exclude_queues:
            exclude_queues = exclude_queues.split(",")

        def unique_scheduled_tasks():
            """Yields all unique scheduled tasks"""
            queues_with_scheduled_tasks = tiger.connection.smembers(
                tiger._key(SCHEDULED)
            )
            for queue in queues_with_scheduled_tasks:
                if not queue_matches(
                    queue,
                    only_queues=only_queues,
                    exclude_queues=exclude_queues,
                ):
                    continue
                skip = 0
                total_tasks = None
                task_limit = 5000
                while total_tasks is None or skip < total_tasks:
                    throttle()
                    total_tasks, tasks = Task.tasks_from_queue(
                        tiger, queue, SCHEDULED, skip=skip, limit=task_limit
                    )
                    for task in tasks:
                        if task.unique:
                            yield task
                    skip += task_limit

        # note that tqdm is completely optional here, but shows a nice
        # progress bar.
        total_with_wrong_id = 0
        total_processed = 0
        for idx, task in enumerate(tqdm.tqdm(unique_scheduled_tasks())):
            # generate the new correct id
            correct_id = gen_unique_id(
                task.serialized_func, task.args, task.kwargs
            )
            if task.id != correct_id:
                total_with_wrong_id += 1
                # The zset score is a UTC epoch; utcfromtimestamp keeps
                # the comparison against utcnow() below consistent.
                when = datetime.datetime.utcfromtimestamp(
                    tiger.connection.zscore(
                        tiger._key(SCHEDULED, task.queue), task.id
                    )
                )
                migration_log.write(
                    {
                        "incorrect_task_id": task.id,
                        "correct_task_id": correct_id,
                        "serialized_func": task.serialized_func,
                        "queue": task.queue,
                        "ts": datetime.datetime.utcnow(),
                        "apply": apply,
                        "scheduled_at": when,
                    }
                )
                # Reschedule the task with the correct id. There's a 10
                # second buffer here in case any tasktigers are still
                # running so we're not racing with them.
                if apply and (
                    when - datetime.datetime.utcnow()
                ) > datetime.timedelta(seconds=10):
                    new_task = task.clone()
                    new_task._data["id"] = correct_id
                    new_task.delay(when=when)
                    task.cancel()
            throttle()
            total_processed = idx + 1
            if limit and total_processed >= limit:
                break
        print(
            'Processed {} tasks, found {} with incorrect ids'.format(
                total_processed, total_with_wrong_id
            )
        )
    print("Done")


if __name__ == "__main__":
    main()
Minor Changes
- The codebase is now formatted with `black`
- Some additional testing infrastructure has been added to make local development/testing easier
v0.9.5
Major changes:
- Support setting max queue size with the `max_queue_size` parameter and raise `QueueFullException` when it is reached (see the sketch after this list).
- Refactor single worker queues to use a new Semaphore lock that allows setting the maximum number of workers that should process a particular queue.
- Added `CHILD_CONTEXT_MANAGERS` configuration setting that allows specifying context managers that will be invoked before/after the forked child process runs.
- Added `--no-store-tracebacks` option to not include tracebacks in execution histories. This can be used to reduce Redis storage requirements.
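A hedged sketch of the queue size cap (the queue name and task are illustrative, and the import path for `QueueFullException` is an assumption):

```python
from tasktiger import TaskTiger
from tasktiger.exceptions import QueueFullException

tiger = TaskTiger()

# Cap the "emails" queue at 10000 queued tasks; past that, delaying raises.
@tiger.task(queue="emails", max_queue_size=10000)
def send_email(user_id=None):
    ...

try:
    send_email.delay(user_id=42)
except QueueFullException:
    pass  # back off and retry later, or drop the enqueue
```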