Releases: closeio/tasktiger

v0.16

17 Nov 14:36
754f3e9

Other changes

  • Handle hard timeout in parent process (f3b3e24)
  • Add queue name to logs (a090d00)

v0.15

21 Apr 06:15
5b54a12

Other changes

  • Populate Task.ts field in Task.from_id function (019bf18)
  • Add TaskTiger.would_process_configured_queue() function (217152d)

v0.14

16 Apr 07:46
3ba32e1

Other changes

  • Add Task.time_last_queued property getter (6d2285d)

v0.13

29 Mar 20:44
3821283

Breaking changes

Changing the locking mechanism

This new version of TaskTiger uses a new locking mechanism: the Lock provided by redis-py. It is incompatible with the old locking mechanism we were using, and several core functions in TaskTiger depend on locking to work correctly, so this warrants a careful migration process.
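
For reference, the new mechanism is the standard Lock from redis-py. Here is a minimal sketch of how such a lock behaves; the key name and timeouts are illustrative, not TaskTiger's actual internals:

from redis import Redis

conn = Redis()

# redis-py's Lock can be used as a context manager. `timeout` lets the
# lock expire automatically if its holder dies; `blocking_timeout` bounds
# how long we wait to acquire it.
with conn.lock("t:lock:example", timeout=60, blocking_timeout=5):
    print("holding the lock; only one holder at a time")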

You can perform this migration in two ways: via a live migration, or via a downtime migration. After the migration, there's an optional cleanup step.

The live migration
  1. Update your environment to TaskTiger 0.12 as usual.

  2. Deploy TaskTiger at commit cf600449d594ac22e6d8393dc1009a84b52be0c1. In pip parlance, that would be:

    -e git+ssh://git@github.com/closeio/tasktiger.git@cf600449d594ac22e6d8393dc1009a84b52be0c1#egg=tasktiger
    
  3. Let it run in production on all your TaskTiger workers for at least 2-3 minutes. This gives the old locks time to expire; after that, the new locks are fully in effect.

  4. Deploy TaskTiger 0.13. Your system is migrated.

The downtime migration
  1. Update your environment to TaskTiger 0.12 as usual.
  2. Scale your TaskTiger workers down to zero.
  3. Deploy TaskTiger 0.13. Your system is migrated.

The cleanup step

Run the script in scripts/redis_scan.py to delete the old lock keys from your Redis instance:

./scripts/redis_scan.py --host HOST --port PORT --db DB --print --match "t:lock:*" --ttl 300

The flags:

  • --host: The Redis host. Required.
  • --port: The port the Redis instance is listening on. Defaults to 6379.
  • --db: The Redis database. Defaults to 0.
  • --print: Print which keys the script is modifying.
  • --match: What pattern to look for. If you didn't change the default prefix TaskTiger uses for keys, this will be t:lock:*, otherwise it will be PREFIX:lock:*. By default, scans all keys.
  • --ttl: A TTL to set. A TTL of 300 will give you time to undo if you want to halt the migration for whatever reason. (Just call this command again with --ttl -1.) By default, does not change keys' TTLs.

Additionally:

  • --file: A log file that will receive the changes made. Defaults to redis-stats.log in the current working directory.
  • --delay: How long, in seconds, to wait between SCAN iterations. Defaults to 0.1.

v0.12

25 Mar 17:55
97d7642

Breaking changes

  • Drop support for redis-py 2 (#183)

Other changes

  • Make the TaskTiger instance available for the task via global state (#170)
  • Support for custom task runners (#175)
  • Add ability to configure a poll- vs push-method for task runners to discover new tasks (#176)
  • unique_key specifies the list of kwargs to use to construct the unique key (#180); a brief sketch follows this list
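
As a rough sketch of how unique_key might be used (the task function and kwarg names here are made up for illustration):

from redis import Redis
from tasktiger import TaskTiger

tiger = TaskTiger(Redis())

# Only the kwargs listed in unique_key are hashed into the unique task ID,
# so two delays with the same user_id collapse into a single queued task
# even if other kwargs differ.
@tiger.task(unique_key=["user_id"])
def sync_user(user_id, force=False):
    print("syncing", user_id, force)

sync_user.delay(user_id=42, force=False)
sync_user.delay(user_id=42, force=True)  # deduplicated against the first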

Bugfixes

  • Ensure task exists in the given queue when retrieving it (#184)
  • Clear retried executions from successful periodic tasks (#188)

v0.10.1

29 Oct 19:27
5caa497

Major Changes:

  • Breaking Change: Unique task IDs have changed slightly. See details below.
  • TaskTiger.purge_errored_tasks has been added
  • Breaking Change: Internal refactoring moved a lot of code out of __init__.py. This should be transparent unless calling code uses internal variables/constants. If this affects you, it should be obvious as only imports should break. The fix is just to change the import path.

Breaking Change: Unique Task IDs Changed

This breaking change only affects periodic tasks and scheduled unique tasks. See #146 for details.

Unique tasks rely on their ID to enforce uniqueness. The ID is a hash of function_name, args, and kwargs. In some cases, creating unique scheduled tasks manually using Task objects, or manually .delay()-ing a periodic task, would inconsistently use None for args/kwargs instead of [] and {}. With this release, args and kwargs are always normalized to []/{} no matter how the Task was created. Existing scheduled unique tasks have to be migrated to a consistent ID format.
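
To illustrate why the two forms collide, here is a small sketch using the same gen_unique_id helper the migration script below imports (the function path is made up):

from tasktiger._internal import gen_unique_id

# The old, inconsistent form left args/kwargs as None; the normalized form
# uses [] and {}. The two hash to different IDs for the same logical task.
old_style = gen_unique_id("mymodule.mytask", None, None)
new_style = gen_unique_id("mymodule.mytask", [], {})
print(old_style == new_style)  # False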

Here's a script that migrates task IDs:

#!/usr/bin/python
# -*- coding: utf-8 -*-

"""Re-schedule tasks that have malformed ids

Be sure to ``pip install click tasktiger limitlion tqdm`` as well.

Recommended steps:
    * stop tasktiger workers
    * upgrade tasktiger version
    * run this script (without --apply) and check the logs to make sure it's
      doing what you'd expect
    * run this script (with --apply), which corrects all unique scheduled task
      ids
    * start tasktiger workers
"""

from __future__ import absolute_import, print_function, unicode_literals

import datetime
import json

import click
import limitlion
import tqdm
from redis import Redis
from tasktiger import Task, TaskTiger
from tasktiger._internal import SCHEDULED, gen_unique_id, queue_matches

# Connect to Redis (defaults to localhost)
redis_connection = Redis(decode_responses=True)

# Initialize tasktiger
tiger = TaskTiger(redis_connection)

# Initialize limitlion (optional, see throttling comment below)
limitlion.throttle_configure(redis_connection)


class JSONLineLog(object):
    """Safe and convenient json logger

    Usage::

        with JSONLineLog("my_file.json") as logger:
            logger.write({'key': 'this is serialized as json'})
    """

    def __init__(self, filename):
        self.filename = filename

    def __enter__(self):
        self.file = open(self.filename, 'a')
        self.file.__enter__()
        return self

    def write(self, log_entry):
        # default=str so datetime values in the log entry serialize cleanly
        print(json.dumps(log_entry, default=str), file=self.file)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.file.__exit__(exc_type, exc_val, exc_tb)


@click.command()
@click.option(
    "--apply",
    is_flag=True,
    help="Actually make these changes. This is not a drill!",
    default=False,
)
@click.option("--limit", help="Limit to processing N tasks", default=None)
@click.option(
    "--only-queues",
    help="Only process these queues (comma delimited)",
    default=None,
)
@click.option(
    "--exclude-queues",
    help="Exclude these queues from processing (comma delimited)",
    default=None,
)
def main(apply=False, limit=None, only_queues=None, exclude_queues=None):
    # warn if not applying change
    if not apply:
        print('*** NO CHANGES WILL BE MADE')
        print('To apply this migration run with --apply.')
    else:
        print('*** CHANGES WILL BE APPLIED')
    print()

    # If we're actually running this on a production redis instance, we
    # probably don't want to iterate over all the keys as fast as we can.
    # limitlion is a simple token-bucket throttle that gets the job done.
    # This step is optional but recommended. If you don't want to use
    # limitlion, maybe do something simple like ``lambda: time.sleep(.1)``
    throttle = limitlion.throttle_wait('migrations', rps=10)

    # actually do the migration
    with JSONLineLog("task_id_migration.json") as migration_log:
        print("Writing log to {}".format(migration_log.filename))

        if limit:
            limit = int(limit)
        if only_queues:
            only_queues = only_queues.split(",")
        if exclude_queues:
            exclude_queues = exclude_queues.split(",")

        def unique_scheduled_tasks():
            """Yields all unique scheduled tasks"""
            queues_with_scheduled_tasks = tiger.connection.smembers(
                tiger._key(SCHEDULED)
            )
            for queue in queues_with_scheduled_tasks:
                if not queue_matches(
                    queue,
                    only_queues=only_queues,
                    exclude_queues=exclude_queues,
                ):
                    continue

                skip = 0
                total_tasks = None
                task_limit = 5000
                while total_tasks is None or skip < total_tasks:
                    throttle()
                    total_tasks, tasks = Task.tasks_from_queue(
                        tiger, queue, SCHEDULED, skip=skip, limit=task_limit
                    )
                    for task in tasks:
                        if task.unique:
                            yield task
                    skip += task_limit

        # note that tqdm is completely optional here, but shows a nice progress
        # bar.
        total_with_wrong_id = 0
        total_processed = 0
        for idx, task in enumerate(tqdm.tqdm(unique_scheduled_tasks())):
            # generate the new correct id
            correct_id = gen_unique_id(
                task.serialized_func, task.args, task.kwargs
            )
            if task.id != correct_id:
                total_with_wrong_id += 1
                when = datetime.datetime.fromtimestamp(
                    tiger.connection.zscore(
                        tiger._key(SCHEDULED, task.queue), task.id
                    )
                )
                migration_log.write(
                    {
                        "incorrect_task_id": task.id,
                        "correct_task_id": correct_id,
                        "serialized_func": task.serialized_func,
                        "queue": task.queue,
                        "ts": datetime.datetime.utcnow(),
                        "apply": apply,
                        "scheduled_at": when,
                    }
                )
                # Reschedule the task with the correct id. There's a 10 second
                # buffer here in case any tasktigers are still running so we're
                # not racing with them.
                if apply and (
                    when - datetime.datetime.utcnow()
                ) > datetime.timedelta(seconds=10):
                    new_task = task.clone()
                    new_task._data["id"] = correct_id
                    new_task.delay(when=when)
                    task.cancel()
                    throttle()

            total_processed = idx + 1
            if limit and total_processed >= limit:
                break

        print(
            'Processed {} tasks, found {} with incorrect ids'.format(
                total_processed, total_with_wrong_id
            )
        )

    print("Done")


if __name__ == "__main__":
    main()

Minor Changes

  • The codebase is now formatted with black
  • Some additional testing infrastructure has been added to make local development/testing easier

v0.9.5

30 Jul 11:20
6368eab

Major changes:

  • Support setting a maximum queue size with the max_queue_size parameter; QueueFullException is raised when it is reached (see the sketch after this list).
  • Refactor single-worker queues to use a new Semaphore lock that allows setting the maximum number of workers that may process a particular queue.
  • Added the CHILD_CONTEXT_MANAGERS configuration setting, which allows specifying context managers to be invoked before/after the forked child process runs.
  • Added the --no-store-tracebacks option to omit tracebacks from execution histories, which can reduce Redis storage requirements.
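
A rough sketch of the max_queue_size behavior described above (the queue name, task function, and the QueueFullException import path are assumptions; verify them against your installed version):

from redis import Redis
from tasktiger import TaskTiger
from tasktiger.exceptions import QueueFullException  # assumed import path

tiger = TaskTiger(Redis())

def send_email(address):
    print("sending to", address)

try:
    # max_queue_size caps how many tasks may sit in this queue; delaying
    # a task beyond that limit raises QueueFullException.
    tiger.delay(
        send_email,
        kwargs={"address": "user@example.com"},
        queue="email",
        max_queue_size=1000,
    )
except QueueFullException:
    # Back off, drop, or retry later, depending on the application.
    pass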