On Task Systems

Posted: 24 Mar 2013
Categories: software engineering node.js ruby taskrabbit

Click to expand

Why Task Systems?

I have been thinking a lot about task-queue systems lately. A queue system is integral to any modern application as it can preform jobs in the background like sending email, processing bulk updates, etc. We all know that Cron is bad and has no place in this modern world of ours, and we need a distributed way of handling these actions :D

At work we recently undertook the (succesfull!) migration from DelayedJob to Resque and investigated many other options. We also have a custom add-on to Resque (to be released soon) which transforms resque into a multi-app pub/sub message bus. I also recently had the ~~chore~~ opportunity to re-write actionHero's task system to include support for more modes. I have been thinking about task systems a lot.

Tasks vs Messages:

I want to make the distinction up front about what I consider a Task System vs. a Message System. To me, the main destination is the notion of a delay and deliverability. With a task system, you assume that there is a queue of actions to be eventually preformed which is likely to, at times, fill faster than can be worked. With a task system you also make the assumption that each message/action/event/task needs to be inspected, and must be delivered to a broker/worker (with the alternative being the message is broadcast to whomever is available to hear it, including a collection of 0 listeners).

tldr: A Message system delivers the message to anyone who is listening during the broadcast, while a task system enqueues the message until the recipient can read it.


The most important thing a task system needs is a persistent atomic list. This can be a database table or an array residing in-memory of some application (a list in redis is a great example). Usually, you also want to back this up to disk.

Next, the list needs the property that it is somehow possible that items can be added uniquely to it and removed uniquely. That is to say that if a task is added, one-and-only-one copy of that task will ever be present, and that it is possible to remove one-and-only-one item from that last.

For Redis based task systems (like Resque and Sidekiq), Redis is a single-threaded app, and allows a worker to pop off an element from a list. Being single threaded, this means that 2 clients who request a pop at the exact same time will be ordered and handled in turn. If there was only one item to pop, only one client would get the data. Tasks can be added by a similar push method.

For Database-backed queues, it's still possible to ensure an "atomic" pop, but it's a little trickier. Lets assume we have table with the following structure:

id | created_at | data | run_at | locked_by | locked_at

This structure is very similar to that created by DelayJobs. Adding a task to a relational database is as simple as a normal insert statement. Most relational Databases have the property that if you do not supply a primary key, one will be uniquely assigned for you, and this ID can be thought of at the place within the task list. Working the queue is a different matter. A worker simply cannot select id from tasks where run_at >= NOW() AND locked_by IS NULL; because this can easily result in a race condition because the acts of select and update (where a worker would claim a task) are not atomic. Furthermore, only some relational databases have support for SELECT FOR UPDATE or other types of operations which will allow a read and a write in the same transaction. (update by @nolliesnom on Hacker News)

To solve this universally, there is actually a multi-step process (by universally I will be assuming a mySQL set of features). First the worker needs to select an eligible task per the sql statement above. DelayedJob actually selects 5 tasks at once in case one of the jobs is claimed by another worker during this next step. The worker then writes a lockerby but not a lockedat. This indicates that the worker intends to work this job, but hasn't started yet. The worker then reads the job again to ensure that their lock is still there (it's possible for 2 workers to be on this step for the same job). If the lock is still there, then the worker can lay a locked_at, and start working the job. Here's how DJ does it for ActiveRecord (mysql, postgres, etc). This process is slower than a redis pop, but generally effective.

The interesting thing about DelayJob's approach (when compared to the Redis version) above is that the task never 'leaves' the queue even when it's being worked on. That means that if a worker crashes, the data is not lost, and can be examined. It is certainly possible for a redis worker to add back his task's information, but if there will be a short while between the pop and a subsequent write. There are backends for DelayJob which use noSQL databases (mongo), and they work almost exactly the same way, since these databases don't have a unique "pop" function.

Priority and Multiple Queues

A pattern Resque made famous was the notion of queues and priority. By default, rescue creates "high", "medium", "default", and "low" queues. This allows you to add normal tasks to the "default" queue, but if you have something important you want want to be worked ASAP, you can add it to the "high" queue and it will be worked by the next available worker. When you launch a a worker, you can also tell it which queues to work in order. You can also say "*", and work all queues in alphabetical order.

Delayed Tasks

Sometimes tasks need to be delayed. If I want to "check a service in 10 minutes" or "send a welcome email 15 minutes after a user signs up", you need not only the above notion of priority, but also of chronology.

Database-backed task systems have this easy. Add a column with a timestamp (run_at above), and just check that against the current time (.. AND where "run_at" <= NOW() ..) in the pop's where clause. Easy!

Redis-based systems have it harder. You don't want to be popping each and every task, de-serializing it's data, checking the timestamp, and then (most of the time) returning it to the queue. This is both dangerous and time consuming. resque-scheduler solves this problem by keeping n timestampe'd queues for tasks that haven't run yet (and a list of those queues in a hash). This allows for the scheduler to move a task over to the "working" queue whenever its time is ready.

Recurrent Tasks

There are a certain subset of tasks which repeat. A task like "make a database backup" or "email the marketing teal stats" repeat with a fixed frequency. Both DelayJob and Resque don't have support for this type of task natively, but conceptually if you have delayed tasks, you can always re-enqueue yourself after you finish. Resque Scheduler added support for a "schedule" in later releases that lets you define a yml list of tasks to run, and when to run them.

Duplicatable tasks

This last type of task takes us back to cron. This is the type of task I really do want to run "every day at midnight" on all my servers. This would be a task like "delete the /tmp directory's contents" or "restart apache". While seemingly simple, this is the hardest type of task to implement in a task system, as it tends to go against the general point of an "atomic, first-come, first serve" queue. As far as I know, there isn't a mechanism for this in either DJ or Resque, but I did recently add one to actionHero

My ideal task system (AKA How actionHero does it)

When building actionHero's task system, I took a look at all the types of tasks above, and tried to sort out how to build a system that could accommodate all these options. Luckily, I had some great existing systems I could ~~borrow~~ steal concepts from.

Anti-paterns I wanted to correct

ActionHero's data model

actionHero defines tasks in /tasks:

exports.sayHello = {
    name: 'sayHello',
    description: 'I say hello',
    scope: 'any',
    frequency: null,
    run: function(api, params, next){
        api.log("Hello, " + params.name);
        next(null, true);

and you can invoke the task in your code like so:

var task = new api.task({
    name: "sayHello",
    runAt: new Date().getTime() + 30000, // run 30 seconds from now
    params: {name: 'Evan'}, // any optional params to pass to the task

and then you can either task.enqueue() or tasks.run()

First, note the metadata. scope is the toggle for defining if the tasks is duplicatable or not. "any" means that any worker can work this task atomically. "all" means that the task will be worked by all servers currently running, and duplicated accordingly (I'll explain how later).

Next, you can choose to either run or enqueue the task. The difference between run and enqueue+now is that run() invokes the task in-process rather than passing it to the first available worker. This is useful if you really need to invoke a task now for a request.

runAt is an optional parameter to define when the task should be run (which defaults to now). This isn't a guarantee that the task will be run exactly at that time (as all the workers might be busy), but rathe that the task is "ready" to be run at that time. This solves the "delayed tasks" problem.

Finally the frequency option. This is used to denote if a task is recurrent, meaning if the task should re-enqueue itself when it completes.

actionHero uses redis to store its task data. To enable "all" task scopes, this means that every server (not worker) needs a 'local' queue of what to work on in addition to the global list. To enable the runAt parameter, there needs to be a 'holding pen' for tasks which aren't ready ro run yet. This leafs us to:

api.tasks.queues = {
    globalQueue: 'actionHero:tasks:global',
    delayedQueuePrefix: 'actionHero:tasks:delayed',
    localQueue: 'actionHero:tasks:' + api.id.replace(/:/g,'-'),
    data: 'actionHero:tasks:data', // actually a hash
    workerStatus: 'actionHero:tasks:workerStatus', // actually a hash
    enqueuedPeriodicTasks: 'actionHero:tasks:enqueuedPeriodicTasks'

and the worker's run the following pattern:

api.taskProcessor.prototype.process = function(callback){
    var self = this;
                self.setWorkerStatus("idle", function(){
                    if(self.running == true){
                        self.timer = setTimeout(function(){
                        }, self.cycleTimeMS); 
                    if(typeof callback == "function"){ callback(); }

Interestingly, the only step that actually "preforms" the task is self.processLocalQueue. The rest of the methods move the task's pointer from queue to another. Remember that the actual job's data is stored in the data hash. This allows us to do things like check enqueuedPeriodicTasks at boot and ensure that all the periodic tasks I know about (where frequency > 0) are in system somewhere by inspecting the data hash. Even currently processing jobs will be present.

To run a duplicatable tasks, the worker which moves the job from the globalQueue to it's own localQueue will also place a duplicate copy in every other server's queue.

For delayed tasks, the delayedQueuePrefix is used to create a number of queues of the form delayedQueuePrefix + timestamp. A queue may hold a number of jobs which can be run at that time. Rather than checking if every individual tasks is ready to be run, we can simply use the names of the delayed queues and compare them to the current time. As the delayed queues are still atomic lists, it safe for many workers to pop from them at once, and re-insert the job's pointed into the globalQueue to be worked like normal.



I did not have a goal in sharing my thoughts on task systems with you, but hopefully the comparison of how various systems work will be helpful as you choose what to use for your next project.

<< API-First.com: When does this Not Apply? Forklift: Moving Big Databases Around >>

Follow me on Twitter Follow me on Github


comments powered by Disqus