When in doubt, use async.queue()

Like many other satisfied users, I reach for the excellent async library whenever I need to handle asynchronous processing in node.js. But what works in small doses doesn’t always work for larger problems.

Specifically, a common use pattern for me is to use it to handle checking things in CouchDB. Often I’m too lazy to code up a proper bulk docs call, so I’ll just run off a bunch of queries asynchronously. This evening I was testing some such code out and it was working for test cases with 10 and 100 items, but it fell over with “double callback” errors when I loaded up 9,000+ items to the list.

The problem of course is that async really means async. When you have an array with 9,000 items in it, and you use, say, filter on it like so:

var my_array=[...]
async.filter(my_array,
        function(item,cb){
                check_true_or_false_via_calling_couchdb(item,cb)
                return null
        },
        function(results_array){
                done(null,results_array)
                return null
        })

then filter fires off as many requests as it can to CouchDB all at once, which in this case is 9,000+. This breaks things: CouchDB shuts down, my SSH tunnels block, and so on.
The plumbing has gone “higgledly piggedly”, like that old Bloom County punchline.

So instead, use async’s queue:

var filtered_tasks = []
var q = async.queue(function(task,callback){
            filter_grids(task,function(doit){
                if(doit){
                    // keep these
                    filtered_tasks.push(task)
                }// drop those
                return callback()
            })
        },100) // concurrency: at most 100 tasks in flight at once
// assign a callback for when the queue drains
q.drain = function() {
    //console.log('all items have been processed');
    cb_alltasks(null,filtered_tasks)
}
var tasks = _.map(grid_records
                 ,function(v,k){
                      var task = {'options':_.clone(config)}
                      task.cell_id = k
                      task.year = year
                      _.extend(task,v)
                      return task
                  })
q.push(tasks)
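
For the curious, here is a rough sketch in plain JavaScript of the idea behind that concurrency argument. It is not how the async library is actually implemented; `filterWithLimit` and `fakeCouchCheck` are names I made up for illustration, and the CouchDB call is simulated with a timer. The point is only that no more than `limit` checks are ever outstanding, however long the list is.

```javascript
// Hypothetical helper (not part of the async library): filter `items`
// while keeping at most `limit` checks in flight at any one time.
function filterWithLimit(items, limit, check, done) {
    var keep = [];      // keep[i] is true if items[i] passed the check
    var next = 0;       // index of the next item to start
    var inFlight = 0;   // checks started but not yet called back
    var finished = 0;   // checks that have called back

    if (items.length === 0) { return done(null, []); }

    function launch() {
        // start new checks until the cap is reached or the list runs out
        while (inFlight < limit && next < items.length) {
            (function (i) {
                inFlight++;
                check(items[i], function (passed) {
                    inFlight--;
                    finished++;
                    keep[i] = Boolean(passed);
                    if (finished === items.length) {
                        // preserve the original ordering of the kept items
                        return done(null, items.filter(function (item, j) {
                            return keep[j];
                        }));
                    }
                    launch(); // a slot just freed up, so start another
                });
            })(next++);
        }
    }
    launch();
}

// Simulated stand-in for the CouchDB check: records the peak number of
// simultaneous "requests" and keeps the even numbers.
var active = 0;
var peak = 0;
function fakeCouchCheck(item, cb) {
    active++;
    peak = Math.max(peak, active);
    setTimeout(function () {
        active--;
        cb(item % 2 === 0);
    }, 1);
}

filterWithLimit([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 3, fakeCouchCheck,
    function (err, kept) {
        // peak never exceeds the limit of 3 passed above
        console.log('kept:', kept, 'peak in flight:', peak);
    });
```

With plain async.filter the equivalent of `peak` would be the full length of the array, which is exactly what knocked CouchDB over.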

I chose the concurrency by playing with it: 10 is too slow (it took 25 seconds), while 100 and 1000 both take 9 seconds.
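
If you would rather not do that by hand, something like the following could automate the comparison. This is only a sketch: `timeConcurrencies` is a hypothetical helper of my own, and it assumes you supply a `runOnce(limit, cb)` function that runs the whole workload at the given concurrency and calls `cb` when the queue drains. The runs happen one after another so they don't compete with each other for CouchDB.

```javascript
// Hypothetical harness: time the same workload at several concurrency
// settings, one run at a time, then hand the timings to `report`.
function timeConcurrencies(limits, runOnce, report) {
    var timings = [];
    function step(i) {
        if (i === limits.length) { return report(timings); }
        var start = Date.now();
        runOnce(limits[i], function () {
            timings.push({ concurrency: limits[i], ms: Date.now() - start });
            step(i + 1); // only start the next run once this one is done
        });
    }
    step(0);
}

// Stand-in workload for demonstration only: pretends that a higher
// concurrency finishes sooner. Replace with a real run, for example:
//
//     function (limit, cb) {
//         var q = async.queue(worker, limit) // worker as in the code above
//         q.drain = cb
//         q.push(tasks)
//     }
function fakeRun(limit, cb) {
    setTimeout(cb, Math.ceil(200 / limit));
}

timeConcurrencies([10, 100, 1000], fakeRun, function (timings) {
    timings.forEach(function (t) {
        console.log('concurrency ' + t.concurrency + ': ' + t.ms + ' ms');
    });
});
```

The numbers from `fakeRun` mean nothing, of course; the useful part is the shape of the loop.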
