Tasks within Transactions using RabbitMQ
Sun 18 Dec 2011 13:55 EST
In a discussion of moving from Google AppEngine to EC2, a participant mentioned that the only “odd” GAE service he made use of was transactional task queues, and that there was no direct analogue of this service in other queueing systems.
Here’s one way of building a transactional task queue using RabbitMQ and your favourite SQL database.
- a table in the database will hold pending tasks
- messages going through RabbitMQ will inform workers of the arrival of new tasks
- the available tasks will be load-balanced between n workers
- a “cleaner” process makes sure all tasks are eventually run and that failed tasks are eventually detected
In this article, I’ll spell out the requirements for the system, and discuss the details of the database tables, RabbitMQ queues, and daemon processes required. I’ll then evaluate the solution presented in terms of the requirements, and conclude by discussing a few variations on the solution that might be worth exploring.
Requirements
This is what the GAE Task Queue API has to say about transactional task queueing:
You can enqueue a task as part of a datastore transaction, such that the task is only enqueued—and guaranteed to be enqueued—if the transaction is committed successfully. Tasks added within a transaction are considered to be a part of it and have the same level of isolation and consistency.
Our requirements, then, are:
- all successfully-enqueued tasks should eventually run
- successfully-enqueued tasks should be runnable shortly after transaction commit
- tasks where the transaction rolled back should never run
- the system should be robust against race conditions
- successfully-enqueued tasks shouldn’t be run more than once
where a “successfully-enqueued” task is one that was enqueued as part of a successfully-committed transaction.
Tables & Queues
Create a table—let’s call it tasks—in your SQL database (here imagined to be PostgreSQL):
CREATE TABLE tasks (
id TEXT PRIMARY KEY,
description TEXT UNIQUE,
start_time TIMESTAMP WITH TIME ZONE
);
Each column is important:
- the id column uniquely names the job. Use a UUID, or the MD5 or SHA-1 hash of the description, or some other suitably-unique name for the job.
- the description column describes the task to be performed. You could use JSON, a URL, a Java class or method name, or any other means of identifying what is to be done.
- the start_time column is used to make sure only a single worker starts performing the task, and to detect failed tasks.
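For concreteness, here is one way of generating such ids in Python; the function names are purely illustrative, and either scheme would do:

import hashlib
import uuid

def task_id_from_description(description):
    # Deterministic id: the MD5 hash of the task description.
    return hashlib.md5(description.encode('utf-8')).hexdigest()

def random_task_id():
    # Random id: a hex-encoded UUID.
    return uuid.uuid4().hex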
Next, create a queue in your RabbitMQ instance; let’s call it task_queue.
Queue_Declare(queue = 'task_queue', durable = True)
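In concrete terms, assuming the Python pika client (my choice for the sketches in this article; any RabbitMQ client library would do), the declaration might look like this:

import pika

# Connect to a local broker; adjust host and credentials as needed.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# durable = True, so the queue definition survives broker restarts.
channel.queue_declare(queue='task_queue', durable=True)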
The tasks
table will serve as the “master list” of work remaining to
be done. The task_queue
queue will only be used to rapidly notify
workers that a new task is waiting for them. If messages are lost from
that queue on occasion, no harm will be done: the “cleaner” process,
described below, will pick up the pieces.
Enqueueing a Task
To enqueue a task as part of a database transaction, create a row for the task as part of the transaction, and commit normally:
BEGIN;
-- ... the main part of your transaction goes here ...
INSERT INTO tasks
VALUES ('ad5b4ccf3902db006405074c721a990e', 'mytask', NULL);
COMMIT;
The new row must have its start_time column set to NULL.
At any time near the end of the transaction—either before or just after the commit—send a message to the task_queue containing the id of the new task:
Basic_Publish(exchange = '',
routing_key = 'task_queue',
body = 'ad5b4ccf3902db006405074c721a990e')
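Putting the two halves together, here is a sketch of the enqueueing side in Python, assuming psycopg2 for PostgreSQL and pika for RabbitMQ; the library choices and the connection strings are illustrative assumptions, not requirements:

import pika
import psycopg2

def enqueue_task(task_id, description):
    # 1. Insert the task row as part of the database transaction.
    db = psycopg2.connect('dbname=mydb')  # hypothetical connection string
    cur = db.cursor()
    # ... the main part of your transaction goes here ...
    cur.execute(
        'INSERT INTO tasks (id, description, start_time) VALUES (%s, %s, NULL)',
        (task_id, description))
    db.commit()

    # 2. Notify the workers. If this publish is lost or never happens,
    #    the "cleaner" will eventually resend a notification.
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    ch = conn.channel()
    ch.basic_publish(exchange='', routing_key='task_queue', body=task_id)
    conn.close()

enqueue_task('ad5b4ccf3902db006405074c721a990e', 'mytask')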
Worker Processes
Your workers should connect to the database and to RabbitMQ as usual. Each worker process should Basic_Consume from the task_queue. Whenever a message comes in containing a task id—for example, the ID of the task enqueued just above, ad5b4ccf3902db006405074c721a990e—the worker should:
1. BEGIN a transaction.
2. SELECT * FROM tasks WHERE id = 'ad5b4ccf3902db006405074c721a990e' FOR UPDATE. The FOR UPDATE is important: it prevents a race condition if two workers accidentally start looking at the same job at the same time.
3. If no row is returned, wait for a few seconds (the “retry time”) and retry the SELECT. The message travelling through RabbitMQ could have overtaken the database commit. If after a couple of retries (the “giveup time”) the row is still absent, it’s likely the sender’s database commit didn’t complete, so move on to waiting for the next message arriving from task_queue.
4. Check the start_time of the returned row. If it’s anything but NULL, some other worker has already started this job, so move on to the next message.
5. UPDATE tasks SET start_time = NOW() WHERE id = 'ad5b4ccf3902db006405074c721a990e'
6. COMMIT the transaction. At this point, this worker instance is considered to have claimed the job.
7. Perform the task described by the description column of the row.
8. DELETE FROM tasks WHERE id = 'ad5b4ccf3902db006405074c721a990e'
At step 3, where a worker may give up on a given notification and move on to the next one, it’s possible that the worker might just not have waited for long enough for the record to appear in the database. Similarly, at step 4, the worker that had claimed the task may have crashed or been terminated before it was able to DELETE the task record from the database. In both cases, the “cleaner” process will make sure the job gets done eventually.
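To make the procedure concrete, here is a sketch of a worker under the same assumptions (psycopg2 and pika); perform_task stands in for your actual job logic, and the timing constants are illustrative:

import time
import pika
import psycopg2

RETRY_TIME = 5      # seconds between SELECT retries (the "retry time")
GIVEUP_RETRIES = 3  # retries before giving up on a notification

def perform_task(description):
    print('working on', description)  # hypothetical job logic

def handle_notification(db, task_id):
    for attempt in range(GIVEUP_RETRIES):
        cur = db.cursor()
        # Steps 1-2: BEGIN (implicit in psycopg2) and SELECT ... FOR UPDATE.
        cur.execute(
            'SELECT description, start_time FROM tasks WHERE id = %s FOR UPDATE',
            (task_id,))
        row = cur.fetchone()
        if row is None:
            # Step 3: the message may have overtaken the commit; end the
            # transaction, wait, and retry.
            db.rollback()
            time.sleep(RETRY_TIME)
            continue
        description, start_time = row
        if start_time is not None:
            # Step 4: some other worker has already claimed this job.
            db.rollback()
            return
        # Steps 5-6: claim the job and commit the claim.
        cur.execute('UPDATE tasks SET start_time = NOW() WHERE id = %s',
                    (task_id,))
        db.commit()
        # Step 7: do the actual work.
        perform_task(description)
        # Step 8: remove the completed task.
        cur.execute('DELETE FROM tasks WHERE id = %s', (task_id,))
        db.commit()
        return
    # "Giveup time" reached: assume the sender's commit never happened.

def main():
    db = psycopg2.connect('dbname=mydb')  # hypothetical connection string
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    ch = conn.channel()
    ch.queue_declare(queue='task_queue', durable=True)

    def on_message(ch_, method, properties, body):
        handle_notification(db, body.decode('utf-8'))
        ch_.basic_ack(delivery_tag=method.delivery_tag)

    ch.basic_consume(queue='task_queue', on_message_callback=on_message)
    ch.start_consuming()

if __name__ == '__main__':
    main()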
The “Cleaner” Process
The “cleaner” process should connect to the database and to RabbitMQ as usual. It should periodically—say, every minute (the “cleanup interval”)—wake up and perform the following routine:
- SELECT * FROM tasks WHERE start_time IS NULL
- Each row returned indicates a possibly-overlooked task, so a repeat message to task_queue should be sent, just like the ones described above. The worker pool will pick up the message as usual and will start work on the task. The use of FOR UPDATE in each worker ensures that the job won’t be performed twice.
- SELECT * FROM tasks WHERE start_time < (NOW() - INTERVAL '1 hour')
- Each row returned indicates a possibly crashed or deadlocked task. Vary the interval as required! For some jobs, one hour is too long; for others, much too short. The “cleaner” should kick off task-specific recovery actions:
  - sometimes it’s appropriate to simply retry the job: UPDATE the start_time to NULL, and then post a message to task_queue.
  - sometimes a compensating job should be enqueued, either as a task (in which case use the normal task enqueueing process) or as a simple function call within the “cleaner”.
  - sometimes the job should be abandoned, and a “dead job” notification sent to a system operator: perhaps send an email, or send a message to an appropriate exchange using RabbitMQ.
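Here is a sketch of the cleaner under the same assumptions (psycopg2 and pika); the retry-everything recovery policy shown is just one of the options listed above, and the intervals are illustrative:

import time
import pika
import psycopg2

CLEANUP_INTERVAL = 60  # seconds (the "cleanup interval")

def cleanup_pass(db, ch):
    cur = db.cursor()

    # Possibly-overlooked tasks: never claimed, so resend their notifications.
    cur.execute('SELECT id FROM tasks WHERE start_time IS NULL')
    for (task_id,) in cur.fetchall():
        ch.basic_publish(exchange='', routing_key='task_queue', body=task_id)

    # Possibly crashed or deadlocked tasks: claimed too long ago.
    cur.execute(
        "SELECT id FROM tasks WHERE start_time < (NOW() - INTERVAL '1 hour')")
    for (task_id,) in cur.fetchall():
        # Illustrative recovery policy: reset the claim and retry the job.
        cur.execute('UPDATE tasks SET start_time = NULL WHERE id = %s',
                    (task_id,))
        ch.basic_publish(exchange='', routing_key='task_queue', body=task_id)
    db.commit()

def main():
    db = psycopg2.connect('dbname=mydb')  # hypothetical connection string
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    ch = conn.channel()
    while True:
        cleanup_pass(db, ch)
        time.sleep(CLEANUP_INTERVAL)

if __name__ == '__main__':
    main()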
Evaluation
How did we do in satisfying our requirements?
Requirement 1 is satisfied by the action of the “cleaner”: any time a
row is successfully inserted into the tasks table, the combined
action of the “cleaner” and the worker processes makes sure the job
will be run.
Requirement 2 is satisfied by the use of the task_queue
: the message
sent through RabbitMQ serves to alert a worker that an item is headed
their way in the database.
Requirement 3 is satisfied by the use of transactional inserts into
the tasks
table: if the commit is rolled back, the row will never
appear in the database, and even if a message was sent on
task_queue
, the worker will eventually stop waiting for the database
row to arrive, and the “cleaner” will never hear of the rolled-back
job to begin with.
Requirement 4 is satisfied by the use of the retry timers, which will
catch most races between the database and RabbitMQ. In cases where
severe delays mean that the workers give up on a job before its row
appears, the “cleaner” process will notice, and will resend a job
notification message to task_queue
.
Requirement 5 is satisfied by the use of the start_time
field in
each row, in conjunction with SELECT ... FOR UPDATE
to ensure that
only one worker will be able to claim a job for processing.
Variations
There are plenty of variations on this basic pattern.
- You could make the “cleaner” process only resubmit jobs that have been waiting for more than two “cleanup intervals”, to lower the (already small) probability of (harmless) double notifications. You’d only need to do this if your database was bad at performing SELECT ... FOR UPDATE queries.
- You could automatically scale your worker pool: instead of posting notifications directly to the task_queue, you’d send them to a task_exchange. They’d then be distributed not only to workers but to a special “scaler” process. Each worker would send a notification to the “scaler” when each task was completed, and the “scaler” would keep statistics on the arrival rate and the completion rate of tasks. New worker process instances would be started by the “scaler” in situations where the arrival rate was greatly exceeding the completion rate, and old instances would be torn down in the opposite situation.
- The (highly experimental) presence exchange RabbitMQ plugin could be used to detect crashed jobs more quickly.
- Instead of DELETEing tasks, an additional timestamp column finish_time could be added to the table, and the queries and updates could be adjusted to use it to simulate job deletion. That way, a permanent record is kept of all the tasks the system has performed.
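A sketch of that last variation in SQL (the exact queries will vary with your schema conventions):

ALTER TABLE tasks ADD COLUMN finish_time TIMESTAMP WITH TIME ZONE;

-- Workers mark completion instead of deleting the row:
UPDATE tasks SET finish_time = NOW()
 WHERE id = 'ad5b4ccf3902db006405074c721a990e';

-- Queries elsewhere then ignore finished tasks; for example, the
-- cleaner's scan for overlooked tasks becomes:
SELECT * FROM tasks WHERE start_time IS NULL AND finish_time IS NULL;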
And, of course, the “retry time”, “giveup time”, “cleanup interval” and crashed-task-detection-interval can all be adjusted to the needs of a given system.
Finally, in cases where you don’t need transactional task queueing, systems like Celery provide an amazing wealth of general task-execution functionality, and will almost certainly serve you better than rolling your own.