Documentation for version:
0.33.5
Description¶
This package offers a priority-based remote task queue, using Redis as the transport and persistence layer and JSON as the common interchange format.
Semantically, this module implements a 0/1 or 1+ queue with optional retries. That is, it attempts to execute every task once by default, more than once if retried manually, or more than once automatically via 'visibility timeouts'.
If a 'manual' retry task raises an exception, it will not automatically retry, but you can manually retry the task and specify the maximum attempts. Similarly, for tasks with visibility timeouts, if the task raises an exception or doesn't complete, it will be retried up to the provided attempt limit.
See the Retries section below.
Full documentation is available: https://josiahcarlson.github.io/rpqueue/
Getting started¶
To execute tasks, you must ensure that rpqueue knows about the tasks that can be executed, configure rpqueue to connect to your Redis server, then start the task execution daemon:
from mytasks import usertasks1, usertasks2, ...
import rpqueue
rpqueue.set_redis_connection_settings(host, port, db)
rpqueue.execute_tasks()
Alternatively, rpqueue offers a command-line interface to do the same, though you must provide the name of a module or package that imports all modules or packages defining the tasks you want to run. For example:
# tasks.py
from tasks import accounting, cleanup, ...
# any other imports or configuration necessary, put them here
# run from the command-line
python -m rpqueue.run --module=tasks --host=... --port=... --db=...
Example uses¶
Say that you have a module usertasks1 with a task to be executed called echo_to_stdout. Your module may look like the following:
from rpqueue import task

@task
def echo_to_stdout(message):
    print(message)
To call the above task, you would use:
echo_to_stdout.execute(...)
echo_to_stdout.execute(..., delay=delay_in_seconds)
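For instance, a concrete call might look like this (a sketch, assuming the usertasks1 module above and a running task daemon):

from usertasks1 import echo_to_stdout

echo_to_stdout.execute("hello world")            # execute as soon as possible
echo_to_stdout.execute("hello later", delay=30)  # execute ~30 seconds from now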
You can also schedule a task to be repeatedly executed with the periodic_task decorator:
@periodic_task(25, queue="low")
def function1():
    # Will be executed every 25 seconds from within the 'low' queue.
    pass
Retries¶
Tasks may be provided an optional attempts argument, which specifies the total number of times the task will be attempted before failing. By default, all tasks have attempts set to 1, unless otherwise specified:
@task(attempts=3)
def fail_until_zero(value, **kwargs):
    try:
        if value != 0:
            value -= 1
            raise Exception
    except:
        fail_until_zero.retry(value, **kwargs)
    else:
        print("succeeded")
If passed the value 3, "succeeded" will never be printed. Why? The first try has value=3, attempts=3, and fails. The second pass has value=2, attempts=2, and fails. The third pass has value=1, attempts=1, fails, and the retry returns without retrying. The attempts value is the total number of attempts, including the first and all retries.
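The **kwargs in the example above matter: the task runner passes the remaining attempt count into the task as the _attempts keyword argument (see Task.retry() below), and forwarding **kwargs to retry() is what lets the count decrement. A minimal sketch of the calling side:

fail_until_zero.execute(3)  # fails three times, then gives up silently
fail_until_zero.execute(0)  # prints "succeeded" on the first attempt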
Automatic retries with vis_timeout¶
As of rpqueue 0.30.0, you can give tasks (and now data queues) a visibility timeout, which (per Amazon SQS-style semantics) is how long the task has to execute correctly before being automatically re-entered into the queue:
import random
import time

@task(attempts=20, vis_timeout=5, use_dead=False)
def usually_eventually_succeed(**kwargs):
    # (4/5)**20 is ~0.0115, so the call chain fails about 1% of the time
    if not random.randrange(5):
        return "done!"
    time.sleep(6)  # fail silently
Deadletter task queue¶
If you would like to know which tasks failed, failed calls can be automatically entered into a deadletter queue:
import random
import time

@rpqueue.task(attempts=5, vis_timeout=5, use_dead=True)
def fails_to_dead(**kwargs):
    # (4/5)**5 is 0.32768, so the call chain fails about 33% of the time
    if not random.randrange(5):
        return "done!"
    time.sleep(6)  # fail silently

task_deadletter = rpqueue.Data(rpqueue.DEADLETTER_QUEUE, is_tasks=True)
dead_tasks = task_deadletter.get_data(items=5)
See help(rpqueue.Data) for more.
Waiting for task execution¶
As of version 0.19, rpqueue offers the ability to wait on a task until it begins execution:
@task
def my_task(args):
    # do something
    pass

executing_task = my_task.execute()
if executing_task.wait(5):
    # task is either being executed, or it is done
    pass
else:
    # task has not started execution yet
    pass
With the ability to wait for a task to complete, you can add deadlines by inserting a call to executing_task.cancel() in the else block above.
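A sketch of that deadline pattern, reusing my_task from above:

executing_task = my_task.execute()
if executing_task.wait(5):
    pass  # the task started (or finished) within 5 seconds
else:
    executing_task.cancel()  # deadline passed before the task started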
Automatically storing results of tasks¶
As of version 0.19, rpqueue offers the ability to store the result returned by a task as it completes:
@task(save_results=30)
def task_with_results():
    return 5

etask = task_with_results.execute()
if etask.wait(5):
    print(etask.result)  # should print 5
The save_results argument can be passed to tasks, periodic tasks, and even cron tasks (described below). The value passed is how long the result is stored in Redis, in seconds. All results must be json-encodable.
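For example, a periodic task could keep its most recent return value around for two minutes (a sketch; the task body is hypothetical):

@periodic_task(60, save_results=120)
def minute_summary():
    # the returned dict is stored in Redis for 120 seconds; it must be
    # json-encodable
    return {"processed": 42}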
Additional features¶
Crontab¶
Support for cron_task using crontab-like syntax requires the Python crontab module (http://pypi.python.org/pypi/crontab/), allowing for:
@cron_task('0 5 * * tue')
def function2():
    # Will be executed every Tuesday at 5AM.
    pass
Data queues¶
Put data in queues, not tasks. This should probably have been here from the start, but it's here now.
Convenient features:
- 1-1000 data items per read, at your discretion
- vis_timeout
- attempts
- use_dead
- refresh data if you want to keep working on it (we don't identify the reader, so you should use an explicit lock if you want guaranteed exclusivity)
A few examples:
# 0/1 queue
dq = rpqueue.Data('best_effort')
dq.put_data([item1, item2, item3, ...])
items = dq.get_data(2)  # {<uuid>: <item>, ...}

# Up to 5 deliveries, with 5 second delay before re-insertion
dq5 = rpqueue.Data('retry_processing', attempts=5, vis_timeout=5)
dq5.put_data([item1, item2, item3, ...])
items = dq5.get_data(2)  # {<uuid>: <item>, ...}
items2 = dq5.get_data(2, vis_timeout=20)  # override timeout on read
refreshed = set(dq5.refresh_data(items, vis_timeout=7))  # refresh our lock
items = {k: v for k, v in items.items() if k in refreshed}
dq5.done_data(items)
dq5.done_data(items2)

# Up to 1 try with a 5 second delay before insertion into the deadletter queue
dqd = rpqueue.Data('retry_processing', attempts=1, vis_timeout=5, use_dead=True)
dqd.put_data([item1, item2, item3, ...])
items = dqd.get_data(2)  # {<uuid>: <item>, ...}
items2 = dqd.get_data(2, vis_timeout=20)  # override timeout on read
refreshed = set(dqd.refresh_data(items, vis_timeout=7))  # refresh our lock
items = {k: v for k, v in items.items() if k in refreshed}
dqd.done_data(items)
time.sleep(20)

# items2 are now "dead"
dead = rpqueue.Data(rpqueue.DEADLETTER_QUEUE)
dead_items = dead.get_data(2)  # these have a different format, see docs!
A longer example closer to what would be seen in practice:
aggregate_queue = rpqueue.Data("aggregate_stats", vis_timeout=30, use_dead=False)

@rpqueue.periodic_task(60)
def aggregate():
    # If vis_timeout is not provided, will use the queue default.
    # If vis_timeout is <= 0, will act as a 0/1 queue, and the later
    # "done data" call is unnecessary.
    data = aggregate_queue.get_data(items=100, vis_timeout=5)
    # data is a dictionary: {<uuid>: <item>, <uuid>: <item>, ...}
    # do something with data
    done_with = []
    for id, value in data.items():
        # do something with value
        done_with.append(id)
        aggregate_queue.refresh_data(data)  # still working!
    # You can pass any iterator that naturally iterates over the uuids you
    # want to be "done" with.
    aggregate_queue.done_data(done_with)
    # also okay:
    # aggregate_queue.done_data(data)
    # aggregate_queue.done_data(tuple(data))
    # aggregate_queue.done_data(list(data))
Sponsors¶
Don’t like LGPL? Sponsor the project and get almost any license you want.
This project has been partly sponsored by structd.com and hCaptcha.com, both of whom received licenses that match their needs appropriately. Historically, rpqueue has been used to help support the delivery of millions of food orders at chownow.com, billions of ad impressions for system1.com, and billions of captchas for hCaptcha.com.
Thank you to our sponsors and those who have consumed our services.
You are welcome for the good service.
Your company link here.
Rpqueue module contents¶
rpqueue (Redis Priority Queue)
Originally written July 5, 2011. Copyright 2011-2022 Josiah Carlson. Released under the GNU LGPL v2.1, available at: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html
Other licenses may be available upon request.
- class rpqueue.Data(queue, attempts=1, vis_timeout=0, use_dead=None, is_tasks=False, is_dead=False)¶
An object that represents an abstract data queue with 0/1 or 1+ removal semantics. Internally works much the same way as tasks, just different keys, and no explicit associated task.
For named queues with arbitrary names, with / without default timeouts and deadletter queues:
# 0/1 with no retries is the default behavior
data_queue = rpqueue.Data("queue_name")

# 0/1 or 1+ queue, with up to 5 times visible, visibility controlled by
# the receiver, no deadletter queue
data_queue = rpqueue.Data("queue_name", attempts=5)
data_queue.get_data()               # 0/1 queue
data_queue.get_data(vis_timeout=5)  # 1+ queue with vis_timeout > 0

# 0/1 or 1+ queue with default visibility timeout, and automatic
# deadletter queue insertion on failure with 1+ queue operation
data_queue = rpqueue.Data("queue_name", attempts=5, vis_timeout=5,
                          use_dead=True, is_tasks=False)
data_queue.get_data(vis_timeout=0)  # 0/1 queue
data_queue.get_data(vis_timeout=5)  # 1+ queue with vis_timeout > 0
If you find that you need access to the various deadletter queues (because you are using them and want to clean them out)…
For the Data deadletter Queue:
data_dlq = rpqueue.Data(rpqueue.DEADLETTER_QUEUE)
For the Tasks deadletter Queue, with this interface:
tasks_dlq = rpqueue.Data(rpqueue.DEADLETTER_QUEUE, is_tasks=True)
You can only pull data from these queues in a 0/1 fashion with this interface.
- currently_available()¶
Returns the number of (available, invisible) items in the queue.
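A sketch of how this might be used, assuming the return value is the (available, invisible) pair described above and the dq queue from earlier:

available, invisible = dq.currently_available()
print("readable: %s, checked out: %s" % (available, invisible))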
- delete_all()¶
Deletes all data in the queue. If you want to delete data from the DEADLETTER_QUEUE, you should:
Data(DEADLETTER_QUEUE).delete_all()
To clear out the Tasks DEADLETTER_QUEUE:
Data(DEADLETTER_QUEUE, is_tasks=True).delete_all()
For all other queues:
Data(<name>, is_tasks=<true or false>).delete_all()
- done_data(items)¶
Call when you are done with data that has a visibility timeout > 0.
- get_data(items=1, vis_timeout=None, get_timeout=None)¶
Gets 1-1000 data items from the queue.
items - number of items to get from the queue, 1 to 1000
vis_timeout - if 0 or None, will pull items in a 0/1 fashion. If >0, will pull items in a 1+ fashion with the visibility timeout as provided (you must call done_data(items) to 'finish' with queue items, or refresh_data(items, vis_timeout) to refresh the visibility timeout on certain tasks). If <0, will peek at items in the queue without pulling them.
get_timeout - how long to wait for at least 1 item
Note: if you are using the DEADLETTER_QUEUE, all item removals are 0/1
If this is a data queue, will return:
{<uuid>: <data_item>, <uuid>: <data_item>}
If this is a data deadletter queue, will return:
{<uuid>: [<uuid>, '', <your data>, <extra fields>, <insert time>], ...}
If this is a task queue or task deadletter queue, will return:
{<uuid>: [<uuid>, <task_function_name>, <args>, <kwargs>, <scheduled time>], ...}
# to access the function by <task_function_name>, see: rpqueue.REGISTRY
Use:
dq = Data('queue_name', attempts=3)
inserted = dq.put_data([1, 2, 3, 4])
removed = dq.get_data(2, vis_timeout=30)  # get the data with a timeout
dq.refresh_data(removed, vis_timeout=45)  # to update / refresh the lease
dq.done_data(removed)                     # when done processing
- put_data(data, use_dead=None, is_one=False, chunksize=512, delay=0)¶
Puts data items into the queue. Data is assumed to be a list of data items, each of which will be assigned a new UUID as an identifier before being placed into queues / mappings as necessary.
data - list of json-encodable data items
use_dead - if provided and true-ish, will spew attempts-exceeded data items into the data DEADLETTER_QUEUE (the queue has a default, which can be overridden here to enable this for the inserted data)
is_one - if you provide a list to data, but just want that to be one item, you can ensure that with is_one=True
chunksize - the number of items to insert per pipeline flush
delay - if >0, will delay the inserted items from reading for the provided number of seconds; requires vis_timeout > 0
- refresh_data(items, vis_timeout=None)¶
Refreshes the vis_timeout on your provided item uuids, if available.
Returns: List of item uuids refreshed with the provided vis_timeout.
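A sketch of a consumer that keeps its lease alive while it works (the queue name and the process() helper are hypothetical):

dq = rpqueue.Data('slow_work', attempts=3, vis_timeout=10)
items = dq.get_data(items=10)
for uuid, item in items.items():
    process(item)  # hypothetical per-item work
    # extend the lease so unprocessed items stay invisible to other readers
    dq.refresh_data(items, vis_timeout=10)
dq.done_data(items)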
- class rpqueue.EnqueuedTask(name, taskid, queue, task=None)¶
An object that allows for simple status checks on tasks being executed right now. Returned by task.execute() or call_task().
- property args¶
Get the arguments that were passed to this task.
- cancel()¶
Cancel the task, if it has not been executed yet.
Returns True if the cancellation was successful. Returns False if no information about task cancellation is known. Returns -1 if the task executed, saved a result, and the result was deleted.
- property result¶
Get the value returned by the task. Requires task.save_results > 0.
See result(taskid) below.
- property status¶
Get the status of this task.
- wait(timeout=30)¶
Will wait up to the specified timeout in seconds for the task to start execution, returning True if the task has at least started. Requires task.save_results > 0, and you need to check before the results would have expired for consistent behavior (remember: we can’t check status on data that no longer exists).
- exception rpqueue.NoLock¶
Raised when a lock cannot be acquired.
- class rpqueue.SimpleLock(conn, name, duration=1)¶
This lock is dirt simple. You shouldn’t use it for anything unless you want it to fail fast when the lock is already held.
If Redis had a “setnxex key value ttl” that set the ‘key’ to ‘value’ if it wasn’t already set, and also set the expiration to ‘ttl’, this lock wouldn’t exist.
(Redis now has this functionality, but we need to support legacy)
- refresh()¶
Refreshes a lock
- class rpqueue.Task(queue, name, function, delay=None, never_skip=False, attempts=1, retry_delay=30, low_delay_okay=False, save_results=0, vis_timeout=0, use_dead=None)¶
An object that represents a task to be executed. These will replace functions when any of the @task, @periodic_task, or @cron_task decorators have been applied to a function.
- execute(*args, **kwargs)¶
Invoke this task with the given arguments inside a task processor.
Optional arguments:
delay - how long to delay the execution of this task for, in seconds
taskid - override the taskid on this call, can be used to choose a destination key for the results (be careful!)
_queue - override the queue to be used in this call, which can be used to alter priorities of individual calls when coupled with queue priorities
- next(now=None)¶
Calculates the next run time of recurring tasks.
- retry(*args, **kwargs)¶
Invoke this task as a retry with the given arguments inside a task processor.
To retry, the task must accept _attempts as a parameter, either directly or via **kwargs.
- rpqueue.call_task(name, queue, *args, **kwargs)¶
Calls a task with the given name, using the provided queue if the queue was otherwise not known. Returns an EnqueuedTask, which you can use to track task progress. Used via:

t = rpqueue.call_task('dotted.task_name', None or QUEUE, *args, **kw)
t.wait(1)
t.status
t.result
If you want to see a ValueError if your task is not known, pass _verify_call=True as a keyword argument.
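For example (the task name here is hypothetical):

try:
    t = rpqueue.call_task('tasks.maybe_missing', None, _verify_call=True)
except ValueError:
    pass  # the named task is not known to this rpqueue instance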
- rpqueue.clear_queue(queue, conn=None, delete=False, is_data=False)¶
Delete all items in a given queue, optionally deleting the queue itself.
- rpqueue.cron_task(crontab, queue=b'default', never_skip=False, attempts=1, retry_delay=30, save_results=0, name=None)¶
Decorator to allow the automatic repeated execution of a function on a schedule with crontab syntax. Crontab syntax is provided by the 'crontab' Python module (http://pypi.python.org/pypi/crontab/), which must be installed to use this decorator.
Similar in use to the @periodic_task decorator:
@cron_task('* * * * *')
def function1():
    "will be executed every minute"

@cron_task('*/5 * * * *', queue='bar')
def function2():
    "will be executed every 5 minutes from within the 'bar' queue"
If never_skip is provided and is considered True, it will attempt to never skip a scheduled task, just like the @periodic_task decorator.
Please see the crontab package documentation or the crontab Wikipedia page for more information on the meaning of the schedule.
- rpqueue.execute_tasks(queues=None, threads_per_process=1, processes=1, wait_per_thread=1, module=None)¶
Will execute tasks from the (optionally) provided queues until the first value in the global SHOULD_QUIT is considered true.
- rpqueue.flush_tasks(complete=False, force_delayed=False, wait_on_hidden=False, timeout=None, log_every=0)¶
Wait for all tasks currently in the queue to start executing, optionally waiting for their completion.
Returns 3 lists: done, started, waiting. These represent the task ids that are in the done, started, and waiting states.

If the complete argument is true-ish, we will wait for output / status returned by the task's explicit completion before we consider the task complete. If not true-ish, we'll consider a task complete if it has started at all (so started is empty in this case, as all started tasks are considered completed).

If force_delayed is true-ish, will force all delayed and periodic tasks to execute immediately, though we do not wait for, nor return the status of, automatically-executed periodic tasks in our lists (only tasks explicitly executed with TASKX.execute(...)). Also implies wait_on_hidden.

If wait_on_hidden is true-ish, will also wait on tasks that have already been started, if we are waiting for completion.

If timeout is >0, will wait up to that many seconds for the flush to finish. If timeout is None or <0, we will wait up to 2**32 seconds. If the timeout expires, we will see items in waiting. If the timeout does not expire, and we complete without raising an exception, there should be no tasks in the waiting state.

If log_every is >0, will log our progress at the INFO level every log_every seconds until we are finished / timed out.
- rpqueue.get_task(name)¶
Get a task dynamically by name. The task’s module must be loaded first.
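A sketch, assuming the default '<module>.<function>' dotted task name used by call_task() above and the echo_to_stdout example from earlier:

import usertasks1  # the task's module must be loaded first

task = rpqueue.get_task('usertasks1.echo_to_stdout')
task.execute("hello")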
- rpqueue.known_queues(conn=None)¶
Get a list of all known queues.
- rpqueue.new_rpqueue(name, pfix=None)¶
Creates a new rpqueue state for running separate rpqueue task systems in the same codebase. This simplifies configuration for multiple Redis servers, and allows for using the same Redis server without using queue names (provide a ‘prefix’ for all RPqueue-related keys).
If an rpqueue with the same name already exists, and has the same prefix, return that rpqueue module.
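A sketch of running two isolated task systems side by side (the names and prefixes are hypothetical; assumes the returned object exposes the same decorators as the top-level module, per "return that rpqueue module" above):

analytics = rpqueue.new_rpqueue('analytics', pfix='an')
billing = rpqueue.new_rpqueue('billing', pfix='bi')

@analytics.task
def count_event(name):
    pass  # runs on workers started from the 'analytics' rpqueue state

@billing.task
def charge(order_id):
    pass  # runs on workers started from the 'billing' rpqueue state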
- rpqueue.periodic_task(run_every, queue=b'default', never_skip=False, attempts=1, retry_delay=30, low_delay_okay=False, save_results=0, name=None)¶
Decorator to allow the automatic repeated execution of a function every run_every seconds, which can be provided via int, long, float, or via a datetime.timedelta instance. Run from the context of the given queue. Used via:
@periodic_task(25)
def function1():
    "will be executed every 25 seconds from within the 'default' queue"

@periodic_task(timedelta(minutes=5), queue='bar')
def function2():
    "will be executed every 5 minutes from within the 'bar' queue"
If never_skip is provided and is considered True like:
@periodic_task(60, never_skip=True)
def function3():
    pass
… and the function was scheduled to be executed at 4:15PM and 5 seconds, but actually executed at 4:25PM and 13 seconds, then prior to execution, it will be rescheduled to execute at 4:16PM and 5 seconds, which is 60 seconds after the earlier scheduled time (it never skips a scheduled time). If you instead had the periodic task defined as:
@periodic_task(60, never_skip=False)
def function4():
    pass
… and the function was scheduled to be executed at 4:15PM and 5 seconds, but actually executed at 4:25PM and 13 seconds, then prior to execution, it will be rescheduled to execute at 4:26PM and 13 seconds, which is 60 seconds after the current time (it skips any missed scheduled time).
- rpqueue.queue_sizes(conn=None)¶
Return a list of all known queues, their sizes, and the number of items that have been seen in the queue.
- rpqueue.result(taskid, conn=None)¶
Get the result of remotely executing a task from its taskid.
If a task is configured with save_results>0, any remote execution of that task will save its return value to expire after that many seconds.
These two ways of fetching the result are equivalent:
>>> remote = task.execute()
>>> # some concurrent logic
>>> result = remote.result

>>> taskid = task.execute().taskid
>>> # some concurrent logic
>>> result = rpqueue.result(taskid)
Note: If you have more than one taskid whose results you want to fetch, check out rpqueue.results() below.
- rpqueue.results(taskids, conn=None)¶
Get the results of remotely executing tasks from one or more taskids.
If a task is configured with save_results>0, any remote execution of that task will save its return value to expire after that many seconds.
These two ways of fetching the result are equivalent:
>>> remote = [t.execute() for t in tasks]
>>> # some concurrent logic
>>> results = [r.result for r in remote]

>>> taskids = [t.execute().taskid for t in tasks]
>>> # some concurrent logic
>>> results = rpqueue.results(taskids)
- rpqueue.script_load(script)¶
Borrowed and updated from my book, Redis in Action: https://github.com/josiahcarlson/redis-in-action/blob/master/python/ch11_listing_source.py
- rpqueue.set_key_prefix(pfix)¶
Run before starting any tasks or task runners; will set the prefix on keys in Redis, allowing for multiple parallel rpqueue executions in the same Redis without worrying about queue names.
- rpqueue.set_priority(queue, qpri, conn=None)¶
Set the priority of a queue. Lower values means higher priorities. Queues with priorities come before queues without priorities.
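For example, to make the 'high' queue drain before 'low' (the queue names are hypothetical):

rpqueue.set_priority('high', 0)   # lower value = higher priority
rpqueue.set_priority('low', 10)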
- rpqueue.set_redis_connection(conn)¶
Sets the global pooled connection to the provided connection object. Useful for environments where additional pooling or other options are desired or required.
- rpqueue.set_redis_connection_settings(host='localhost', port=6379, db=0, password=None, socket_timeout=30, unix_socket_path=None, ssl=False, ssl_ca_certs=None)¶
Sets the global redis connection settings for the queue. If not called before use, will connect to localhost:6379 with no password and db 0.
- rpqueue.task(args=None, queue=None, attempts=1, retry_delay=30, save_results=0, vis_timeout=0, use_dead=False, **kwargs)¶
Decorator to allow the transparent execution of a function as a task.
Used like:
@task(queue='bar')
def function1(arg1, arg2, ...):
    "will execute from within the 'bar' queue"

@task
def function2(arg1, arg2, ...):
    "will execute from within the 'default' queue"