
A job is a Python object, representing a function that is invoked asynchronously in a worker (background) process. Any Python function can be invoked asynchronously, by simply pushing a reference to the function and its arguments onto a queue. This is called enqueueing.
Enqueueing Jobs
To put jobs on queues, first declare a function:
```python
import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())
```
Noticed anything? There's nothing special about this function! Any Python function call can be put on an RQ queue.
To put this potentially expensive word count for a given URL in the background, simply do this:
```python
from rq import Queue
from redis import Redis
from somewhere import count_words_at_url
import time

# Tell RQ what Redis connection to use
redis_conn = Redis()
q = Queue(connection=redis_conn)  # no args implies the default queue

# Delay execution of count_words_at_url('http://nvie.com')
job = q.enqueue(count_words_at_url, 'http://nvie.com')
print(job.result)   # => None

# Now, wait a while, until the worker is finished
time.sleep(2)
print(job.result)   # => 889
```
If you want to put the work on a specific queue, simply specify its name:
```python
q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url, 'http://nvie.com')
```
Notice the `Queue('low')` in the example above? You can use any queue name, so you can quite flexibly distribute work to your own desire. A common naming pattern is to name your queues after priorities (e.g. `high`, `medium`, `low`).
In addition, you can add a few options to modify the behaviour of the queued job. By default, these are popped out of the kwargs that will be passed to the job function.
- `job_timeout` specifies the maximum runtime of the job before it's interrupted and marked as failed. Its default unit is seconds, and it can be an integer or a string representing an integer (e.g. `2`, `'2'`). Furthermore, it can be a string with an explicit unit, including hours, minutes and seconds (e.g. `'1h'`, `'3m'`, `'5s'`).
- `result_ttl` specifies how long (in seconds) successful jobs and their results are kept. Expired jobs will be automatically deleted. Defaults to 500 seconds.
- `ttl` specifies the maximum queued time (in seconds) of the job before it's discarded. This argument defaults to `None` (infinite TTL).
- `failure_ttl` specifies how long failed jobs are kept (defaults to 1 year).
- `depends_on` specifies another job (or list of jobs) that must complete before this job will be queued.
- `job_id` allows you to manually specify this job's `job_id`.
- `at_front` will place the job at the front of the queue, instead of the back.
- `description` adds an additional description to enqueued jobs.
- `on_success` allows you to run a function after a job completes successfully.
- `on_failure` allows you to run a function after a job fails.
- `args` and `kwargs`: use these to explicitly pass arguments and keyword arguments to the underlying job function. This is useful if your function happens to have argument names that conflict with RQ's, for example `description` or `ttl`.
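As a hedged sketch, several of these options can be combined in a single call (reusing `q` and `count_words_at_url` from the earlier examples):

```python
job = q.enqueue(
    count_words_at_url,
    'http://nvie.com',
    job_timeout='3m',     # interrupt and mark as failed after 3 minutes
    result_ttl=3600,      # keep the result for one hour
    ttl=600,              # discard if still queued after 10 minutes
    failure_ttl=86400,    # keep failure information for one day
    at_front=True,        # jump ahead of other queued jobs
)
```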
In the case of `args` and `kwargs`, if you want to pass `description` and `ttl` keyword arguments to your job and not to RQ's enqueue function, this is what you do:
```python
q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url,
          ttl=30,  # This ttl will be used by RQ
          args=('http://nvie.com',),
          kwargs={
              'description': 'Function description',  # This is passed on to count_words_at_url
              'ttl': 15  # This is passed on to count_words_at_url function
          })
```
For cases where the web process doesn't have access to the source code running in the worker (i.e. code base X invokes a delayed function from code base Y), you can pass the function as a string reference, too.
```python
q = Queue('low', connection=redis_conn)
q.enqueue('my_package.my_module.my_func', 3, 4)
```
Bulk Job Enqueueing
New in version 1.9.0.
You can also enqueue multiple jobs in bulk with `queue.enqueue_many()` and `Queue.prepare_data()`:
```python
jobs = q.enqueue_many(
    [
        Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='my_job_id'),
        Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='my_other_job_id'),
    ]
)
```
which will enqueue all the jobs in a single Redis pipeline, which you can optionally pass in yourself:
```python
with q.connection.pipeline() as pipe:
    jobs = q.enqueue_many(
        [
            Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='my_job_id'),
            Queue.prepare_data(count_words_at_url, 'http://nvie.com', job_id='my_other_job_id'),
        ],
        pipeline=pipe
    )
    pipe.execute()
```
`Queue.prepare_data` accepts all arguments that `Queue.parse_args` does EXCEPT for `depends_on`, which is not supported at this time, so dependencies will be up to you to set up.
Job dependencies
RQ allows you to chain the execution of multiple jobs. To execute a job that depends on another job, use the `depends_on` argument:
```python
q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)
```
Specifying multiple dependencies is also supported:
```python
queue = Queue('low', connection=redis)
foo_job = queue.enqueue(foo)
bar_job = queue.enqueue(bar)
baz_job = queue.enqueue(baz, depends_on=[foo_job, bar_job])
```
The ability to handle job dependencies allows you to split a big job into several smaller ones. A job that is dependent on another is enqueued only when its dependency finishes successfully.
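For example, a hedged sketch of the fan-out/fan-in pattern this enables (`urls` and `aggregate_counts` are hypothetical stand-ins; `q` and `count_words_at_url` are reused from above):

```python
# Fan out: one job per URL, each counted independently.
chunk_jobs = [q.enqueue(count_words_at_url, url) for url in urls]

# Fan in: aggregate_counts (a hypothetical function) is enqueued only
# after every chunk job has finished successfully.
q.enqueue(aggregate_counts, depends_on=chunk_jobs)
```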
Job Callbacks
New in version 1.9.0.
If you want to execute a function whenever a job completes or fails, RQ provides `on_success` and `on_failure` callbacks.
```python
queue.enqueue(say_hello, on_success=report_success, on_failure=report_failure)
```
Success Callback
Success callbacks must be a function that accepts `job`, `connection` and `result` arguments. Your function should also accept `*args` and `**kwargs` so your application doesn't break when additional parameters are added.
```python
def report_success(job, connection, result, *args, **kwargs):
    pass
```
Success callbacks are executed after job execution is complete, before dependents are enqueued. If an exception happens when your callback is executed, the job status will be set to `FAILED` and dependents won't be enqueued.
Callbacks are limited to 60 seconds of execution time. If you want to execute a long running job,consider using RQ’s job dependency feature instead.
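For example, a success callback might simply record the result; this is a minimal sketch using the standard library's logging module:

```python
import logging

logger = logging.getLogger(__name__)

def report_success(job, connection, result, *args, **kwargs):
    # Keep this fast: callbacks are limited to 60 seconds of execution time.
    logger.info("Job %s finished with result %r", job.id, result)
```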
Failure Callbacks
Failure callbacks are functions that accept `job`, `connection`, `type`, `value` and `traceback` arguments. `type`, `value` and `traceback` are the values returned by `sys.exc_info()` for the exception raised while executing your job.
```python
def report_failure(job, connection, type, value, traceback):
    pass
```
Failure callbacks are limited to 60 seconds of execution time.
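As a minimal sketch, a failure callback could log the exception details (the `type`, `value` and `traceback` arguments mirror `sys.exc_info()`):

```python
import logging

logger = logging.getLogger(__name__)

def report_failure(job, connection, type, value, traceback):
    # Keep this fast: failure callbacks are also limited to 60 seconds.
    logger.error("Job %s failed: %s: %s", job.id, type.__name__, value)
```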
CLI Enqueueing
New in version 1.10.0.
If you prefer enqueueing jobs via the command line interface, or you do not use Python, you can use this.
Usage:
```
rq enqueue [OPTIONS] FUNCTION [ARGUMENTS]
```
Options:
- `-q, --queue [value]`: The name of the queue.
- `--timeout [value]`: Specifies the maximum runtime of the job before it is interrupted and marked as failed.
- `--result-ttl [value]`: Specifies how long successful jobs and their results are kept.
- `--ttl [value]`: Specifies the maximum queued time of the job before it is discarded.
- `--failure-ttl [value]`: Specifies how long failed jobs are kept.
- `--description [value]`: Additional description of the job.
- `--depends-on [value]`: Specifies another job id that must complete before this job will be queued.
- `--job-id [value]`: The id of this job.
- `--at-front`: Will place the job at the front of the queue, instead of the end.
- `--retry-max [value]`: Maximum number of retries.
- `--retry-interval [value]`: Interval between retries in seconds.
- `--schedule-in [value]`: Delay until the function is enqueued (e.g. 10s, 5m, 2d).
- `--schedule-at [value]`: Schedule job to be enqueued at a certain time formatted in ISO 8601 without timezone (e.g. 2021-05-27T21:45:00).
- `--quiet`: Only logs errors.
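For instance, a hedged example combining a few of these flags (`my_package.my_module.my_func` is a placeholder path, and the timeout is given in plain seconds):

```
rq enqueue -q low --timeout 180 --at-front my_package.my_module.my_func
```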
Function:
There are two options:

- Execute a function: a dot-separated string of package, module and function (just like passing a string to `queue.enqueue()`).
- Execute a Python file: a dot-separated pathname of the file. Because it is technically an import, `__name__ == '__main__'` will not work.
Arguments:
|            | plain text      | json             | literal-eval     |
|------------|-----------------|------------------|------------------|
| keyword    | `[key]=[value]` | `[key]:=[value]` | `[key]%=[value]` |
| no keyword | `[value]`       | `:[value]`       | `%[value]`       |
Where `[key]` is the keyword and `[value]` is the value, which is parsed with the corresponding parsing method.
If the first character of `[value]` is `@`, the subsequent path will be read.
Examples:
- `rq enqueue path.to.func abc` -> `queue.enqueue(path.to.func, 'abc')`
- `rq enqueue path.to.func abc=def` -> `queue.enqueue(path.to.func, abc='def')`
- `rq enqueue path.to.func ':{"json": "abc"}'` -> `queue.enqueue(path.to.func, {'json': 'abc'})`
- `rq enqueue path.to.func 'key:={"json": "abc"}'` -> `queue.enqueue(path.to.func, key={'json': 'abc'})`
- `rq enqueue path.to.func '%1, 2'` -> `queue.enqueue(path.to.func, (1, 2))`
- `rq enqueue path.to.func '%None'` -> `queue.enqueue(path.to.func, None)`
- `rq enqueue path.to.func '%True'` -> `queue.enqueue(path.to.func, True)`
- `rq enqueue path.to.func 'key%=(1, 2)'` -> `queue.enqueue(path.to.func, key=(1, 2))`
- `rq enqueue path.to.func 'key%={"foo": True}'` -> `queue.enqueue(path.to.func, key={"foo": True})`
- `rq enqueue path.to.func @path/to/file` -> `queue.enqueue(path.to.func, open('path/to/file', 'r').read())`
- `rq enqueue path.to.func key=@path/to/file` -> `queue.enqueue(path.to.func, key=open('path/to/file', 'r').read())`
- `rq enqueue path.to.func :@path/to/file.json` -> `queue.enqueue(path.to.func, json.loads(open('path/to/file.json', 'r').read()))`
- `rq enqueue path.to.func key:=@path/to/file.json` -> `queue.enqueue(path.to.func, key=json.loads(open('path/to/file.json', 'r').read()))`
Warning: Do not use plain text without a keyword if you do not know what the value is. If the value starts with `@`, `:` or `%`, or includes `=`, it will be recognised as something else.
Working with Queues
Besides enqueuing jobs, Queues have a few useful methods:
```python
from rq import Queue
from redis import Redis

redis_conn = Redis()
q = Queue(connection=redis_conn)

# Getting the number of jobs in the queue
# Note: Only queued jobs are counted, not including deferred ones
print(len(q))

# Retrieving jobs
queued_job_ids = q.job_ids  # Gets a list of job IDs from the queue
queued_jobs = q.jobs        # Gets a list of enqueued job instances
job = q.fetch_job('my_id')  # Returns job having ID "my_id"

# Emptying a queue, this will delete all jobs in this queue
q.empty()

# Deleting a queue
q.delete(delete_jobs=True)  # Passing in `True` will remove all jobs in the queue
# queue is now unusable. It can be recreated by enqueueing jobs to it.
```
On the Design
With RQ, you don't have to set up any queues upfront, and you don't have to specify any channels, exchanges, routing rules, or whatnot. You can just put jobs onto any queue you want. As soon as you enqueue a job to a queue that does not exist yet, it is created on the fly.

RQ does not use an advanced broker to do the message routing for you. You may consider this an awesome advantage or a handicap, depending on the problem you're solving.

Lastly, it does not speak a portable protocol, since it depends on pickle to serialize the jobs, so it's a Python-only system.
The delayed result
When jobs get enqueued, the `queue.enqueue()` method returns a `Job` instance. This is nothing more than a proxy object that can be used to check the outcome of the actual job.
For this purpose, it has a convenience `result` accessor property, that will return `None` when the job is not yet finished, or a non-`None` value when the job has finished (assuming the job has a return value in the first place, of course).
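A small sketch of polling this property (reusing `q` and `count_words_at_url` from the earlier examples); since a failed job also reports `None`, the sketch checks the job's status too:

```python
import time

job = q.enqueue(count_words_at_url, 'http://nvie.com')
while job.result is None and job.get_status() != 'failed':
    time.sleep(0.2)  # poll until the worker finishes (or the job fails)
print(job.result)  # => the word count, or None if the job failed
```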
The `@job` decorator
If you're familiar with Celery, you might be used to its `@task` decorator. Starting from RQ >= 0.3, there exists a similar decorator:
```python
from rq.decorators import job
import time

@job('low', connection=my_redis_conn, timeout=5)
def add(x, y):
    return x + y

job = add.delay(3, 4)
time.sleep(1)
print(job.result)
```
Bypassing workers
For testing purposes, you can enqueue jobs without delegating the actual execution to a worker (available since version 0.3.1). To do this, pass the `is_async=False` argument into the Queue constructor:
```python
>>> q = Queue('low', is_async=False, connection=my_redis_conn)
>>> job = q.enqueue(fib, 8)
>>> job.result
21
```
The above code runs without an active worker and executes `fib(8)` synchronously within the same process. You may know this behaviour from Celery as `ALWAYS_EAGER`. Note, however, that you still need a working connection to a Redis instance for storing states related to job execution and completion.
The worker
To learn about workers, see the workers documentation.
Considerations for jobs
Technically, you can put any Python function call on a queue, but that does not mean it's always wise to do so. Some things to consider before putting a job on a queue:
- Make sure that the function's `__module__` is importable by the worker. In particular, this means that you cannot enqueue functions that are declared in the `__main__` module.
- Make sure that the worker and the work generator share exactly the same source code.
- Make sure that the function call does not depend on its context. In particular, global variables are evil (as always), but also any state that the function depends on (for example a "current" user or "current" web request) is not there when the worker will process it. If you want work done for the "current" user, you should resolve that user to a concrete instance and pass a reference to that user object to the job as an argument, as in the sketch below.
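For the last point, a hypothetical sketch of resolving the "current" user up front (`load_user`, `send_email` and `current_user` are assumed stand-ins for your own code):

```python
# The job function receives a concrete identifier, not ambient state.
def send_welcome_email(user_id):
    user = load_user(user_id)           # assumed helper
    send_email(user.email, "Welcome!")  # assumed helper

# In the web process: resolve the current user to a stable id first.
q.enqueue(send_welcome_email, current_user.id)
```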
Limitations
RQ workers will only run on systems that implement `fork()`. Most notably, this means it is not possible to run the workers on Windows without using the Windows Subsystem for Linux and running in a bash shell.