Documentation for version: 0.33.5

Description

This package intends to offer a priority-based remote task queue solution using Redis as the transport and persistence layer, and JSON for a common interchange format.

Semantically, this module implements a 0/1 or 1+ queue with optional retries. That is, it attempts to execute every task once by default, or >1 manually, or >1 automatically with ‘visibility timeouts’.

If a ‘manual’ retry task raises an exception, it will not automatically retry, but you can manually retry the task and specify the maximum attempts. Similarly, for tasks with visibility timeouts, if the task raises an exception or doesn’t complete, it will be retried up to the limit of retries provided.

See the Retries section below.

Full documentation is available: https://josiahcarlson.github.io/rpqueue/

Getting started

To execute tasks, you must do three things: make sure rpqueue knows about the tasks that can be executed (by importing the modules that define them), configure rpqueue to connect to your Redis server, and start the task execution daemon:

from mytasks import usertasks1, usertasks2, ...
import rpqueue

rpqueue.set_redis_connection_settings(host, port, db)
rpqueue.execute_tasks()

Alternatively, rpqueue offers a command-line interface to do the same, though you must provide the name of a module or package that imports all modules or packages that define tasks that you want to run. For example:

# tasks.py
from tasks import accounting, cleanup, ...
# any other imports or configuration necessary, put them here

# run from the command-line
python -m rpqueue.run --module=tasks --host=... --port=... --db=...

Example uses

Say that you have a module usertasks1 with a task to be executed called echo_to_stdout. Your module may look like the following:

from rpqueue import task

@task
def echo_to_stdout(message):
    print(message)

To call the above task, you would use:

echo_to_stdout.execute(...)
echo_to_stdout.execute(..., delay=delay_in_seconds)

You can also schedule a task to be repeatedly executed with the periodic_task decorator:

@periodic_task(25, queue="low")
def function1():
    # Will be executed every 25 seconds from within the 'low' queue.
    pass

Retries

Tasks may be provided an optional attempts argument, which specifies the total number of times the task will try to be executed before failing. By default, all tasks have attempts set at 1, unless otherwise specified:

@task(attempts=3)
def fail_until_zero(value, **kwargs):
    try:
        if value != 0:
            value -= 1
            raise Exception
    except Exception:
        # retry() returns without retrying once the attempts are used up
        fail_until_zero.retry(value, **kwargs)
    else:
        print("succeeded")

If passed the value 3, “succeeded” will never be printed. Why? The first try has value=3, attempts=3, and fails. The second pass has value=2, attempts=2, and fails. The third pass has value=1, attempts=1, fails, and the retry returns without retrying. The attempts value is the total number of attempts, including the first, and all retries.
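
The countdown described above can be traced without Redis. The following is a minimal pure-Python sketch, not rpqueue's implementation: simulate_attempts is a hypothetical helper that models only the bookkeeping, assuming each pass consumes one attempt and a retry with no attempts left is dropped.

```python
def simulate_attempts(value, attempts):
    """Model the retry countdown of fail_until_zero (hypothetical helper).

    Returns (succeeded, trace), where trace lists the (value, attempts)
    pair seen on each pass.
    """
    trace = []
    while attempts > 0:
        trace.append((value, attempts))
        if value == 0:
            return True, trace      # the task body would print "succeeded"
        value -= 1                  # the task decrements before retrying
        attempts -= 1               # every pass consumes one attempt
    return False, trace             # attempts exhausted, nothing left to retry


succeeded, trace = simulate_attempts(3, attempts=3)
print(succeeded, trace)
```

In this model, value=3 with attempts=3 runs out of attempts after three passes, while attempts=4 leaves room for the fourth pass that sees value=0.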

Automatic retries with vis_timeout

In rpqueue 0.30.0 or later, you can give tasks (and now data queues) a visibility timeout, which (per Amazon SQS-style semantics) is how long the task has to execute correctly before being automatically re-entered into the queue:

import random
import time

@task(attempts=20, vis_timeout=5, use_dead=False)
def usually_eventually_succeed(**kwargs):
    # (4/5)**20  is ~ 0.0115, so call chain fails about 1% of the time
    if not random.randrange(5):
        return "done!"

    time.sleep(6) # fail silently
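
The arithmetic in the code comment can be checked directly. This is a small sketch, assuming each attempt fails independently with probability 4/5 (which is what random.randrange(5) implies); chain_failure_probability is an illustrative name, not part of rpqueue.

```python
def chain_failure_probability(p_fail, attempts):
    """Probability that every one of `attempts` independent tries fails."""
    return p_fail ** attempts


p20 = chain_failure_probability(4 / 5, 20)   # attempts=20, as in the task above
p5 = chain_failure_probability(4 / 5, 5)     # a smaller attempts value, for comparison
print(round(p20, 4), round(p5, 5))
```

Under that assumption, raising attempts from 5 to 20 takes the whole-chain failure rate from roughly one in three to roughly one in a hundred.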

Deadletter task queue

If you would like to know which tasks failed, failed calls can be automatically entered into a deadletter queue:

@rpqueue.task(attempts=5, vis_timeout=5, use_dead=True)
def fails_to_dead(**kwargs):
    # (4/5)**5  is 0.32768, so call chain fails about 33% of the time
    if not random.randrange(5):
        return "done!"

    time.sleep(6) # fail silently

task_deadletter = rpqueue.Data(rpqueue.DEADLETTER_QUEUE, is_tasks=True)
dead_tasks = task_deadletter.get_data(items=5)

See help(rpqueue.Data) for more.

Waiting for task execution

As of version 0.19, RPQueue offers the ability to wait on a task until it begins execution:

@task
def my_task(args):
    # do something
    pass

executing_task = my_task.execute()
if executing_task.wait(5):
    # task is either being executed, or it is done
    pass
else:
    # task has not started execution yet
    pass

Because you can wait for a task to start, you can add deadlines by inserting a call to executing_task.cancel() in the else block above.

Automatically storing results of tasks

As of version 0.19, RPQueue offers the ability to store the result returned by a task as it completes:

@task(save_results=30)
def task_with_results():
    return 5

etask = task_with_results.execute()
if etask.wait(5):
    print(etask.result) # should print 5

The save_results argument can be passed to tasks, periodic tasks, and even cron tasks (described below). The value passed will be how long the result is stored in Redis, in seconds. All results must be json-encodable.
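
To see what "json-encodable" rules out in practice, a return value can be tested before it is used as a task result. This is a sketch using only the standard-library json module; is_json_encodable is a hypothetical helper, and it assumes the standard encoder is representative of what rpqueue accepts.

```python
import json


def is_json_encodable(value):
    """Return True if json.dumps accepts the value (illustrative helper)."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


print(is_json_encodable({"total": 5, "items": [1, 2, 3]}))
print(is_json_encodable({1, 2, 3}))   # sets are not JSON types

# Tuples encode, but come back as lists after a round trip.
round_tripped = json.loads(json.dumps((1, 2)))
print(round_tripped)
```

Note that a value surviving the encoder does not guarantee it comes back as the same Python type, as the tuple case shows.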

Additional features

Crontab

Support for cron_tasks using a crontab-like syntax requires the Python crontab module (http://pypi.python.org/pypi/crontab/), allowing for:

@cron_task('0 5 tue * *')
def function2():
    # Will be executed every Tuesday at 5AM.
    pass

Data queues

Put data in queues, not tasks. I mean, should have probably been here from the start, but it’s here now.

Convenient features:
  • 1-1000 data items per read, at your discretion

  • vis_timeout

  • attempts

  • use_dead

  • refresh data if you want to keep working on it (we don’t identify the reader, so you should use an explicit lock if you want guaranteed exclusivity)

A few examples:

# 0/1 queue
dq = rpqueue.Data('best_effort')
dq.put_data([item1, item2, item3, ...])
items = dq.get_data(2) # {<uuid>: <item>, ...}

# Up to 5 deliveries, with 5 second delay before re-insertion
dq5 = rpqueue.Data('retry_processing', attempts=5, vis_timeout=5)
dq5.put_data([item1, item2, item3, ...])
items = dq5.get_data(2) # {<uuid>: <item>, ...}
items2 = dq5.get_data(2, vis_timeout=20) # override timeout on read
refreshed = set(dq5.refresh_data(items, vis_timeout=7)) # refresh our lock
items = {k:v for k,v in items.items() if k in refreshed}
dq5.done_data(items)
dq5.done_data(items2)

# Up to 1 try with a 5 second delay before insertion into deadletter queue
dqd = rpqueue.Data('retry_processing', attempts=1, vis_timeout=5, use_dead=True)
dqd.put_data([item1, item2, item3, ...])
items = dqd.get_data(2) # {<uuid>: <item>, ...}
items2 = dqd.get_data(2, vis_timeout=20) # override timeout on read
refreshed = set(dqd.refresh_data(items, vis_timeout=7)) # refresh our lock
items = {k:v for k,v in items.items() if k in refreshed}
dqd.done_data(items)
time.sleep(20)
# items2 are now "dead"
dead = rpqueue.Data(rpqueue.DEADLETTER_QUEUE)
dead_items = dead.get_data(2) # these have a different format, see docs!

A longer example closer to what would be seen in practice:

aggregate_queue = rpqueue.Data("aggregate_stats", vis_timeout=30, use_dead=False)

@rpqueue.periodic_task(60)
def aggregate():
    # If vis_timeout is not provided, will use the queue default.
    # If vis_timeout is <= 0, will act as a 0/1 queue, and later "done data"
    # calling is unnecessary.
    data = aggregate_queue.get_data(items=100, vis_timeout=5)
    # data is a dictionary: {<uuid>: <item>, <uuid>: <item>, ...}
    # do something with data
    done_with = []
    for id, value in data.items():
        # do something with value
        done_with.append(id)

    aggregate_queue.refresh_data(data) # still working!

    # You can pass any iterator that naturally iterates over the uuids you
    # want to be "done" with.
    aggregate_queue.done_data(done_with)
    # also okay:
    # aggregate_queue.done_data(data)
    # aggregate_queue.done_data(tuple(data))
    # aggregate_queue.done_data(list(data))

Sponsors

Don’t like LGPL? Sponsor the project and get almost any license you want.

This project has been partly sponsored by structd.com and hCaptcha.com, both of whom received licenses that match their needs appropriately. Historically, rpqueue has been used to help support the delivery of millions of food orders at chownow.com, billions of ad impressions for system1.com, and billions of captchas for hCaptcha.com.

Thank you to our sponsors and those who have consumed our services.

You are welcome for the good service.

Your company link here.

Rpqueue module contents

rpqueue (Redis Priority Queue)

Originally written July 5, 2011. Copyright 2011-2022 Josiah Carlson. Released under the GNU LGPL v2.1, available at: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html

Other licenses may be available upon request.

class rpqueue.Data(queue, attempts=1, vis_timeout=0, use_dead=None, is_tasks=False, is_dead=False)

An object that represents an abstract data queue with 0/1 or 1+ removal semantics. Internally works much the same way as tasks, just different keys, and no explicit associated task.

For named queues with arbitrary names, with / without default timeouts and deadletter queues:

# 0/1 with no retries is the default behavior
data_queue = rpqueue.Data("queue_name")

# 0/1 or 1+ queue, with up to 5 times visible, visibility controlled by
# the receiver, no deadletter queue
data_queue = rpqueue.Data("queue_name", attempts=5)
data_queue.get_data() # 0/1 queue
data_queue.get_data(vis_timeout=5) # 1+ queue with vis_timeout > 0

# 0/1 or 1+ queue with default visibility timeout, and automatic
# deadletter queue insertion on failure with 1+ queue operation
data_queue = rpqueue.Data("queue_name", attempts=5, vis_timeout=5, use_dead=True, is_tasks=False)
data_queue.get_data(vis_timeout=0) # 0/1 queue
data_queue.get_data(vis_timeout=5) # 1+ queue with vis_timeout > 0

If you find that you need access to the various deadletter queues (because you are using them and want to clean them out)…

For the Data deadletter Queue:

data_dlq = rpqueue.Data(rpqueue.DEADLETTER_QUEUE)

For the Tasks deadletter Queue, with this interface:

tasks_dlq = rpqueue.Data(rpqueue.DEADLETTER_QUEUE, is_tasks=True)

You can only pull data from these queues in a 0/1 fashion with this interface.

currently_available()

Returns the number of (available, invisible) items in the queue.

delete_all()

Deletes all data in the queue. If you want to delete data from the DEADLETTER_QUEUE, you should:

Data(DEADLETTER_QUEUE).delete_all()

To clear out the Tasks DEADLETTER_QUEUE:

Data(DEADLETTER_QUEUE, is_tasks=True).delete_all()

For all other queues:

Data(<name>, is_tasks=<true or false>).delete_all()

done_data(items)

Call when you are done with data that has a visibility timeout > 0.

get_data(items=1, vis_timeout=None, get_timeout=None)

Gets 1-1000 data items from the queue.

  • items - number of items to get from the queue, 1 to 1000

  • vis_timeout

    • If 0 or None, will pull items in 0/1 fashion.

    • If >0, will pull items in a 1+ fashion with the visibility timeout as provided (you must call done_data(items) to ‘finish’ with queue items, or refresh_data(items, vis_timeout) to refresh the visibility timeout on certain tasks)

    • If <0, will peek items in the queue without pulling them

  • get_timeout - how long to wait for at least 1 item

Note: if you are using the DEADLETTER_QUEUE, all item removals are 0/1
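
The three vis_timeout cases above can be summarized as a small decision function. This is a sketch of the documented behavior only, not rpqueue code: read_mode and its queue_default parameter are hypothetical, and treating None as "fall back to the queue-level default (0 unless the queue was created with a vis_timeout)" is an assumption.

```python
def read_mode(vis_timeout=None, queue_default=0):
    """Classify a get_data() call by its effective visibility timeout."""
    # Assumption: None defers to the queue-level default.
    effective = queue_default if vis_timeout is None else vis_timeout
    if effective == 0:
        return "0/1"    # removed on read, no done_data() call needed
    if effective > 0:
        return "1+"     # reappears unless done_data() is called in time
    return "peek"       # negative: look at items without pulling them


for value in (None, 0, 5, -1):
    print(value, read_mode(value))
```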

If this is a data queue, will return:

{<uuid>: <data_item>, <uuid>: <data_item>}

If this is a data deadletter queue, will return:

{<uuid>: [<uuid>, '', <your data>, <extra fields>, <insert time>], ...}

If this is a task queue or task deadletter queue, will return:

{<uuid>: [<uuid>, <task_function_name>, <args>, <kwargs>, <scheduled time>], ...}
# to access the function by <task_function_name>, see: rpqueue.REGISTRY
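
Because deadletter and task entries are positional lists, it can help to give the positions names before working with them. This is a minimal sketch on made-up sample data; the record types, their field names, and the unpack helper are illustrative and assume exactly five fields per entry, as in the layouts shown above.

```python
from collections import namedtuple

# Field names are illustrative; positions follow the layouts shown above.
DeadData = namedtuple("DeadData", "uuid name data extra insert_time")
TaskEntry = namedtuple("TaskEntry", "uuid function_name args kwargs scheduled_time")


def unpack(entries, record_type):
    """Turn {uuid: [five fields]} into {uuid: record_type(...)}."""
    return {key: record_type(*fields) for key, fields in entries.items()}


# Made-up sample shaped like a read from the data deadletter queue.
sample = {"abc123": ["abc123", "", {"order": 42}, {}, 1650000000.0]}
records = unpack(sample, DeadData)
print(records["abc123"].data)

# Made-up sample shaped like a read from a task queue.
task_sample = {"def456": ["def456", "mytasks.echo", ["hi"], {}, 1650000001.0]}
tasks = unpack(task_sample, TaskEntry)
print(tasks["def456"].function_name)
```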

Use:

dq = Data('queue_name', attempts=3)
inserted = dq.put_data([1,2,3,4])
removed = dq.get_data(2, vis_timeout=30) # get the data with a timeout
dq.refresh_data(removed, vis_timeout=45) # to update / refresh the lease
dq.done_data(removed) # when done processing

put_data(data, use_dead=None, is_one=False, chunksize=512, delay=0)

Puts data items into the queue. Data is assumed to be a list of data items, each of which will be assigned a new UUID as an identifier before being placed into queues / mappings as necessary.

  • data - list of json-encodable data items

  • use_dead - if provided and true-ish, will spew attempts exceeded data items into data DEADLETTER_QUEUE (queue has a default, can be overridden here to enable on this inserted data)

  • is_one - if you provide a list to data, but just want that to be one item, you can ensure that with is_one=True

  • chunksize - the number of items to insert per pipeline flush

  • delay - if >0, will delay the inserted items from reading for the provided number of seconds, requires vis_timeout > 0
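
The is_one and chunksize parameters are easier to picture with a small model. The sketch below is pure Python and does not reflect rpqueue internals: normalize and chunked are hypothetical helpers that only illustrate "treat this list as one item" and "insert at most chunksize items per flush".

```python
def normalize(data, is_one=False):
    """Wrap `data` in a list when it should be treated as a single item."""
    return [data] if is_one else list(data)


def chunked(items, chunksize=512):
    """Yield successive slices of at most `chunksize` items."""
    for start in range(0, len(items), chunksize):
        yield items[start:start + chunksize]


batches = list(chunked(normalize(range(1200))))
print([len(batch) for batch in batches])

single = normalize([1, 2, 3], is_one=True)
print(single)
```

In this model, 1200 items with the default chunksize split into three batches, and a list passed with is_one=True stays a single queue item.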

refresh_data(items, vis_timeout=None)

Refreshes the vis_timeout on your provided item uuids, if available.

Returns: List of item uuids refreshed with the provided vis_timeout.

class rpqueue.EnqueuedTask(name, taskid, queue, task=None)

An object that allows for simple status checks on tasks being executed right now. Returned by task.execute() or call_task().

property args

Get the arguments that were passed to this task.

cancel()

Cancel the task, if it has not been executed yet.

Returns True if the cancellation was successful. Returns False if no information about task cancellation is known. Returns -1 if the task executed, saved a result, and the result was deleted.
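
One practical consequence of these return values is that -1 is truthy in Python, so a bare `if task.cancel():` cannot distinguish a successful cancellation from the -1 case. The sketch below works on plain values rather than a live task; describe_cancel is a hypothetical helper.

```python
def describe_cancel(outcome):
    """Translate a cancel() return value into a label (illustrative helper)."""
    if outcome is True:
        return "cancelled"
    if outcome == -1:
        return "executed; saved result was deleted"
    return "unknown"


# Compare truthiness with the explicit interpretation of each value.
for outcome in (True, False, -1):
    print(repr(outcome), bool(outcome), describe_cancel(outcome))
```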

property result

Get the value returned by the task. Requires task.save_results > 0.

See rpqueue.result(taskid).

property status

Get the status of this task.

wait(timeout=30)

Will wait up to the specified timeout in seconds for the task to start execution, returning True if the task has at least started. Requires task.save_results > 0, and you need to check before the results would have expired for consistent behavior (remember: we can’t check status on data that no longer exists).

exception rpqueue.NoLock

Raised when a lock cannot be acquired

class rpqueue.SimpleLock(conn, name, duration=1)

This lock is dirt simple. You shouldn’t use it for anything unless you want it to fail fast when the lock is already held.

If Redis had a “setnxex key value ttl” that set the ‘key’ to ‘value’ if it wasn’t already set, and also set the expiration to ‘ttl’, this lock wouldn’t exist.

(Redis now has this functionality, but we need to support legacy)

refresh()

Refreshes a lock

class rpqueue.Task(queue, name, function, delay=None, never_skip=False, attempts=1, retry_delay=30, low_delay_okay=False, save_results=0, vis_timeout=0, use_dead=None)

An object that represents a task to be executed. These will replace functions when any of the @task, @periodic_task, or @cron_task decorators have been applied to a function.

execute(*args, **kwargs)

Invoke this task with the given arguments inside a task processor.

Optional arguments:

  • delay - how long to delay the execution of this task for, in seconds

  • taskid - override the taskid on this call, can be used to choose a destination key for the results (be careful!)

  • _queue - override the queue to be used in this call, which can be used to alter priorities of individual calls when coupled with queue priorities

next(now=None)

Calculates the next run time of recurring tasks.

retry(*args, **kwargs)

Invoke this task as a retry with the given arguments inside a task processor.

To retry, the task must accept _attempts as a parameter, either directly or via **kwargs.

rpqueue.call_task(name, queue, *args, **kwargs)

Calls a task with the given name, using the provided queue if the queue was otherwise not known. Returns an EnqueuedTask, which you can use to track task progress. Used via:

t = rpqueue.call_task('dotted.task_name', None or QUEUE, *args, **kw)
t.wait(1)
t.status
t.result

If you want to see a ValueError if your task is not known, pass _verify_call=True as a keyword argument.

rpqueue.clear_queue(queue, conn=None, delete=False, is_data=False)

Delete all items in a given queue, optionally deleting the queue itself.

rpqueue.cron_task(crontab, queue=b'default', never_skip=False, attempts=1, retry_delay=30, save_results=0, name=None)

Decorator to allow the automatic repeated execution of a function on a schedule with a crontab syntax. Crontab syntax is provided by the ‘crontab’ Python module (http://pypi.python.org/pypi/crontab/), which must also be installed to use this decorator.

Similar in use to the @periodic_task decorator:

@cron_task('* * * * *')
def function1():
    "will be executed every minute"

@cron_task('*/5 * * * *', queue='bar')
def function2():
    "will be executed every 5 minutes from within the 'bar' queue."

If never_skip is provided and is considered True, it will attempt to never skip a scheduled task, just like the @periodic_task decorator.

Please see the crontab package documentation or the crontab Wikipedia page for more information on the meaning of the schedule.

rpqueue.execute_tasks(queues=None, threads_per_process=1, processes=1, wait_per_thread=1, module=None)

Will execute tasks from the (optionally) provided queues until the first value in the global SHOULD_QUIT is considered false.

rpqueue.flush_tasks(complete=False, force_delayed=False, wait_on_hidden=False, timeout=None, log_every=0)

Wait for all tasks currently in the queue to start executing, optionally waiting for their completion.

Returns 3 lists: done, started, waiting. These represent the task ids that are in the done, started, and waiting states.

If the complete argument is true-ish, we will wait for output / status returned by the task’s explicit completion before we consider the task complete. If not true-ish, we’ll consider a task complete if it has started at all (so started is empty in this case, as all are considered completed).

If force_delayed is true-ish, will force all delayed and periodic tasks to execute immediately, though we do not wait for, nor return the status of automatically-executed periodic tasks in our lists (only tasks explicitly executed with `TASKX.execute(...)`). Also implies wait_on_hidden.

If wait_on_hidden is true-ish, will also wait on tasks that have already been started, if we are waiting for completion.

If timeout is >0, will wait up to that many seconds for the flush to finish. If timeout is None or <0, we will wait up to 2**32 seconds. If the timeout expires, we will see items in waiting. If the timeout does not expire, and we complete without raising an exception, there should be no tasks in the waiting state.

If log_every is >0, will log our progress at the INFO level every log_every seconds until we are finished / timed out.

rpqueue.get_task(name)

Get a task dynamically by name. The task’s module must be loaded first.

rpqueue.known_queues(conn=None)

Get a list of all known queues.

rpqueue.new_rpqueue(name, pfix=None)

Creates a new rpqueue state for running separate rpqueue task systems in the same codebase. This simplifies configuration for multiple Redis servers, and allows for using the same Redis server without using queue names (provide a ‘prefix’ for all RPqueue-related keys).

If an rpqueue with the same name already exists, and has the same prefix, return that rpqueue module.

rpqueue.periodic_task(run_every, queue=b'default', never_skip=False, attempts=1, retry_delay=30, low_delay_okay=False, save_results=0, name=None)

Decorator to allow the automatic repeated execution of a function every run_every seconds, which can be provided via int, long, float, or via a datetime.timedelta instance. Run from the context of the given queue. Used via:

@periodic_task(25)
def function1():
    "will be executed every 25 seconds from within the 'default' queue."

@periodic_task(timedelta(minutes=5), queue='bar')
def function2():
    "will be executed every 5 minutes from within the 'bar' queue."

If never_skip is provided and is considered True like:

@periodic_task(60, never_skip=True)
def function3():
    pass

… and the function was scheduled to be executed at 4:15PM and 5 seconds, but actually executed at 4:25PM and 13 seconds, then prior to execution, it will be rescheduled to execute at 4:16PM and 5 seconds, which is 60 seconds after the earlier scheduled time (it never skips a scheduled time). If you instead had the periodic task defined as:

@periodic_task(60, never_skip=False)
def function4():
    pass

… and the function was scheduled to be executed at 4:15PM and 5 seconds, but actually executed at 4:25PM and 13 seconds, then prior to execution, it will be rescheduled to execute at 4:26PM and 13 seconds, which is 60 seconds after the current time (it skips any missed scheduled time).
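
The two rescheduling rules reduce to a choice of base time: the previously scheduled time when never_skip is true, or the actual execution time otherwise. The following is a sketch of that rule as described above, not rpqueue's scheduler; next_run is a hypothetical helper and the date is arbitrary.

```python
from datetime import datetime, timedelta


def next_run(scheduled, actual, run_every, never_skip):
    """Next scheduled time for a periodic task, per the description above."""
    base = scheduled if never_skip else actual
    return base + timedelta(seconds=run_every)


scheduled = datetime(2022, 1, 1, 16, 15, 5)   # 4:15:05 PM, arbitrary date
actual = datetime(2022, 1, 1, 16, 25, 13)     # 4:25:13 PM

strict = next_run(scheduled, actual, 60, never_skip=True)
relaxed = next_run(scheduled, actual, 60, never_skip=False)
print(strict.time(), relaxed.time())
```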

rpqueue.queue_sizes(conn=None)

Return a list of all known queues, their sizes, and the number of items that have been seen in the queue.

rpqueue.result(taskid, conn=None)

Get the result of a remotely executing task from its taskid.

If a task is configured with save_results>0, any remote execution of that task will save its return value to expire after that many seconds.

These two ways of fetching the result are equivalent:

>>> remote = task.execute()
>>> # some concurrent logic
>>> result = remote.result

>>> taskid = task.execute().taskid
>>> # some concurrent logic
>>> result = rpqueue.result(taskid)

Note

If you have more than one taskid whose results you want to fetch, check out rpqueue.results() below.

rpqueue.results(taskids, conn=None)

Get the results of remotely executing tasks from one or more taskids.

If a task is configured with save_results>0, any remote execution of that task will save its return value to expire after that many seconds.

These two ways of fetching the result are equivalent:

>>> remote = [t.execute() for t in tasks]
>>> # some concurrent logic
>>> results = [r.result for r in remote]

>>> taskids = [t.execute().taskid for t in tasks]
>>> # some concurrent logic
>>> results = rpqueue.results(taskids)

rpqueue.script_load(script)

Borrowed and updated from my book, Redis in Action: https://github.com/josiahcarlson/redis-in-action/blob/master/python/ch11_listing_source.py

rpqueue.set_key_prefix(pfix)

Run before starting any tasks or task runners; will set the prefix on keys in Redis, allowing for multiple parallel rpqueue executions in the same Redis without worrying about queue names.

rpqueue.set_priority(queue, qpri, conn=None)

Set the priority of a queue. Lower values mean higher priority. Queues with priorities come before queues without priorities.
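
The ordering rule can be expressed as a sort key. This is a pure-Python sketch of the stated rule, not how rpqueue stores or applies priorities: order_queues is a hypothetical helper, and keeping the input order among queues without a priority is an assumption, since the ordering within that group is not specified here.

```python
def order_queues(queues, priorities):
    """Sort queue names: prioritized first (lowest value first), then the rest."""
    def key(name):
        has_priority = name in priorities
        # False sorts before True, so prioritized queues come first.
        return (not has_priority, priorities.get(name, 0))
    return sorted(queues, key=key)


ordered = order_queues(["low", "default", "urgent", "bulk"],
                       {"urgent": 0, "low": 10})
print(ordered)
```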

rpqueue.set_redis_connection(conn)

Sets the global pooled connection to the provided connection object. Useful for environments where additional pooling or other options are desired or required.

rpqueue.set_redis_connection_settings(host='localhost', port=6379, db=0, password=None, socket_timeout=30, unix_socket_path=None, ssl=False, ssl_ca_certs=None)

Sets the global redis connection settings for the queue. If not called before use, will connect to localhost:6379 with no password and db 0.

rpqueue.task(args=None, queue=None, attempts=1, retry_delay=30, save_results=0, vis_timeout=0, use_dead=False, **kwargs)

Decorator to allow the transparent execution of a function as a task.

Used like:

@task(queue='bar')
def function1(arg1, arg2, ...):
    "will execute from within the 'bar' queue."

@task
def function2(arg1, arg2, ...):
    "will execute from within the 'default' queue."