How I fire off multiple R jobs from node.js

Node.js has become my hammer of choice for most systems programming type jobs. In an earlier post I talked about how to use CouchDB to store the progress and state of jobs that need doing. Here I will demonstrate how I trigger those jobs and update CouchDB using a fairly simple node.js program.

Two key features of node that make this program possible are spawn and the ability to read and manipulate environment variables.

var spawn = require('child_process').spawn
var env = process.env

Node.js is fully capable of using child processes. One can choose from exec, execFile, spawn, and fork. For my usage, the spawn function does exactly what I want: it creates a child process that reports back when it exits.

The other useful tool is the ability to access the current running environment using the process.env variable. This allows my program to take note of any environment variables that are already set, and to fill in any missing variables that my child process might need.
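As a rough sketch of that idea (not the code from my program), the helper below copies the current environment, fills in any defaults that are missing, and then applies per-job overrides. The function name buildChildEnv and the COUCHDB_HOST variable are assumptions made up for illustration.

```javascript
// Build an environment for a child process without touching process.env.
// `defaults` only fills in variables that are missing; `overrides` always win.
function buildChildEnv(base, defaults, overrides) {
  var env = {}
  var key
  for (key in base) { env[key] = base[key] }            // keep what is already set
  for (key in defaults) {
    if (env[key] === undefined) { env[key] = defaults[key] }  // fill gaps only
  }
  for (key in overrides) { env[key] = String(overrides[key]) } // env values are strings
  return env
}

// Hypothetical usage: COUCHDB_HOST and the values are invented for this example.
var childEnv = buildChildEnv(process.env,
                             { COUCHDB_HOST: '127.0.0.1' },
                             { YEAR: 2008, AIRBASIN: 'SJV' })
console.log(childEnv.YEAR, childEnv.AIRBASIN)
```

Building a fresh object leaves the parent's own environment alone, which matters once several children with different settings are in play.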

Concurrency via queue

Using spawn, one can fire off as many jobs as desired. If you have a machine with four cores, then calling spawn four times will efficiently use your processing power. Unfortunately it isn’t usually that simple. Instead, what typically happens is that you have a lot of separable data crunching tasks that need to be run, and you want to have four data processing jobs running at all times until the work is all done. To accomplish this, the spawn function will need to be called four times (to fill up the processors) and then called again to start a new job whenever one of the existing jobs finishes.

Conceptually, what you want to do is to create a routine that will process four things at a time, and be able to hold remaining jobs in a queue (hint hint) to load them up when another job finishes. One could write one’s own processing system with callbacks and lists, but why bother? Instead you should use the excellent, well supported, widely used, and exhaustively tested async library with a queue function that does the job perfectly.

var async = require('async')
var queue = async.queue

The queue function takes two arguments: queue(worker, concurrency). The worker is a function that will do whatever job needs doing. The concurrency is the number of parallel jobs that will be run at a time. In node’s usual asynchronous workflow, these parallel jobs aren’t so much "concurrent" as non-blocking. So if you have a worker to go look up files in the file system and you set the concurrency to be four, then at most four jobs at a time will be looking for files and waiting for the file system to respond with the results. In normal usage, however, only a single thread will process the concurrent jobs, so if they all have something to do at the same time, you will still only use one CPU.

In our example, we are going to actually spawn a number of separate processes equal to the concurrency argument. The function we pass to the worker argument will do the spawning. This worker function needs to accept two arguments: worker(task, callback). The task is the next item in the queue for the worker to process, and the callback is a function created by the async library that your worker function needs to call when it is done. As per the usual node.js idiom, the callback should be passed null as the first argument if everything goes well, or something non-null if there is a problem.
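To make the worker(task, callback) contract and the concurrency limit concrete, here is a bare-bones stand-in for the bookkeeping a queue does. It is emphatically not the async library's code, just a sketch of the idea; makeQueue and the fake timer-based worker are names invented here, and it assumes the worker always finishes asynchronously.

```javascript
// A toy queue: run at most `concurrency` workers at once, hold the rest.
function makeQueue(worker, concurrency) {
  var waiting = []   // tasks not yet started
  var running = 0    // tasks currently in flight
  var q = { drain: null, push: push }

  function push(task) {
    waiting.push(task)
    fill()
  }
  function fill() {
    while (running < concurrency && waiting.length > 0) {
      running++
      worker(waiting.shift(), finished)
    }
  }
  // This is the callback handed to the worker: null means success.
  function finished(err) {
    if (err) { console.log('task failed: ' + err) }
    running--
    if (waiting.length > 0) { fill() }
    else if (running === 0 && q.drain) { q.drain() }
  }
  return q
}

// Usage: a fake worker that just waits, to show that only two run at once.
var active = 0
var peak = 0
var completed = []
var q = makeQueue(function (task, done) {
  active++
  peak = Math.max(peak, active)
  setTimeout(function () {
    active--
    completed.push(task)
    done(null)
  }, 10)
}, 2)
q.drain = function () {
  console.log('all done, peak concurrency was ' + peak)
}
var tasks = ['a', 'b', 'c', 'd', 'e']
tasks.forEach(function (task) { q.push(task) })
```

The real library handles many more cases (errors, pausing, synchronous workers), which is exactly why I would rather use it than maintain something like this.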

Calling spawn

For this example, I will be setting up R jobs to process California air districts. R can be called in an interactive session, or in a batch mode using the Rscript command. From reading the spawn documentation, the first argument is the command to run (in this case, Rscript), the second argument is an array of command line options, and the third argument is an options argument for spawn itself. Of the options argument, I am most interested in the ability to set the environment variables for the spawned process. So in a nutshell, the spawn process looks like this:

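var opts = { cwd: './Rwork',
             env: process.env
           }
// can modify child env with needed key-value pairs
opts.env['YEAR'] = 2007        // example values; any year and basin will do
opts.env['AIRBASIN'] = 'GBV'
var RCall = ['--no-restore','--no-save','my_R_script.R']
var R  = spawn('Rscript', RCall, opts)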

In the above code, the first instruction sets up the options object with a default working directory and the current working environment. Next I override two variables in the environment, the year and the basin. (I use the "associative array" way of accessing objects so that my eyes see environment variables being set.) Next I set up the array for the second argument to spawn with the command line options for the Rscript program. Finally I call spawn and save the result in the variable R.
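If R is not installed, the same call shape can be tried out with node itself playing the part of Rscript. This is only a sketch under that assumption: runChild and the inline childScript are invented for the example, and I leave out the cwd option because ./Rwork may not exist on your machine.

```javascript
var spawn = require('child_process').spawn

// Stand-in for my_R_script.R: print the two variables and exit with code 3.
var childScript = 'console.log(process.env.YEAR + " " + process.env.AIRBASIN)\n' +
                  'process.exitCode = 3'

function runChild(year, basin, done) {
  var env = {}
  for (var key in process.env) { env[key] = process.env[key] } // copy, leave parent alone
  env['YEAR'] = String(year)
  env['AIRBASIN'] = basin

  // command, array of command line arguments, options for spawn itself
  var child = spawn(process.execPath, ['-e', childScript], { env: env })
  var output = ''
  var reported = false
  function report(err, code) {
    if (reported) { return }   // 'error' and 'close' can both fire
    reported = true
    done(err, code, output.trim())
  }
  child.stdout.on('data', function (chunk) { output += chunk })
  child.on('error', function (err) { report(err, null) })    // e.g. command not found
  child.on('close', function (code) { report(null, code) })  // stdout has been fully read
}

runChild(2008, 'SJV', function (err, code, output) {
  if (err) { return console.log('spawn failed: ' + err.message) }
  console.log('child saw "' + output + '" and exited with code ' + code)
})
```

The child only sees YEAR and AIRBASIN because they were placed in the env option; nothing is passed on the command line.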

A basic worker function that does the above and is suitable for passing to the queue function is shown below. The primary difference is that it handles the exit of the R job, and calls the callback. In this example, I check for a dummy return code just to show that you can return whatever you like from the R script, so that error conditions or unusual stopping points can be flagged with your own exit codes.

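function setup_R_job(opts,done){
    // RCall is the array of Rscript command line options defined earlier
    var R  = spawn('Rscript', RCall, opts)
    R.on('exit',function(code){
        console.log('got exit code: '+code)
        if(code==1){ // 1 stands in for whatever code your R script uses
            // do something special
        }
        done() // pass a non-null value here instead to flag a problem
        return null
    })
    return null
}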
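One way to turn custom exit codes into the null-or-error argument that the queue callback expects is a small lookup, sketched below. The helper name exitToError, the code 10, and its meaning are all invented for illustration; on the R side, something like quit(status = 10) would be what produces such a code.

```javascript
// Map how a child process ended onto the single argument a callback wants:
// null when all is well, an Error otherwise.
// The meaning attached to code 10 is made up for this sketch.
var EXIT_MEANINGS = { 10: 'no data for this year and basin' }

function exitToError(code, signal) {
  if (code === 0) { return null }
  // node reports a null code when the child was terminated by a signal
  if (code === null) { return new Error('R killed by signal ' + signal) }
  var why = EXIT_MEANINGS[code] || 'unexpected failure'
  return new Error('R exited with code ' + code + ': ' + why)
}

// Inside a worker this could be used as:
//   R.on('exit', function (code, signal) { done(exitToError(code, signal)) })
console.log(exitToError(0, null))
console.log(exitToError(10, null).message)
```

Keeping the table in one place means the node side and the R side only have to agree on a handful of numbers.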

Putting it all together

To use async.queue, you first pass it the worker function shown above along with the desired parallelism, and it will return to you a queue that you can use. You then push things onto the queue, and then sit around and wait for them to finish. In the code snippet below I create one options object that is standard, and then use underscore’s clone function to copy it and modify what I need for each job. I also used underscore’s each function out of habit, but really any looping construct would work fine.

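var _ = require('underscore')
var queue = async.queue
var jobs = 4 // typically a command line option, because it is unique
             // to the machine
var basin_queue=queue(setup_R_job, jobs)
// baseline options for every job
var opts = { cwd: './Rwork',    // where I've stashed my R scripts
             env: process.env
           }

var years = [2007,2008,2009,2010,2011,2012,2013]
var airbasins = [ 'GBV', 'MD', 'NEP', 'SC', 'SF', 'LC', 'MC',
                  'LT',  'NCC',  'NC',  'SCC', 'SD',  'SJV',
                  'SS',  'SV' ]

_.each(years,function(year){
    _.each(airbasins,function(basin){
        var o = _.clone(opts)
        // underscore's clone is shallow, so copy env separately;
        // otherwise every job would share (and overwrite) one env object
        o.env = _.clone(opts.env)
        o.env['YEAR'] = year
        o.env['AIRBASIN'] = basin
        basin_queue.push(o)
    })
})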

In other words, I create a unique set of options for each call, in which the only real difference is that I’ve set the environment variables to be the year and the airbasin that I want my R job to process. There are other ways to pass data in to R, including the usual command line tools, but I prefer using environment variables because that’s the way I’ve always done it.
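One thing to watch when cloning options: a shallow copy shares the nested env object, so a later assignment would overwrite an earlier one before the job ever starts. The sketch below sidesteps the question by copying env by hand, with no helper library; buildJobs is a name made up for this example, and the short year and basin lists are just to keep the output small.

```javascript
// One options object per (year, basin) pair, each with its own copy of env.
function buildJobs(years, basins, baseOpts) {
  var jobList = []
  years.forEach(function (year) {
    basins.forEach(function (basin) {
      var env = {}
      for (var key in baseOpts.env) { env[key] = baseOpts.env[key] }
      env['YEAR'] = String(year)
      env['AIRBASIN'] = basin
      jobList.push({ cwd: baseOpts.cwd, env: env })
    })
  })
  return jobList
}

var jobList = buildJobs([2007, 2008], ['GBV', 'MD', 'NEP'],
                        { cwd: './Rwork', env: process.env })
console.log(jobList.length)   // 2 years x 3 basins = 6
console.log(jobList[0].env.YEAR, jobList[0].env.AIRBASIN)
console.log(jobList[5].env.YEAR, jobList[5].env.AIRBASIN)
```

Each element of the returned list can be pushed onto the queue as is.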

Managing jobs across computers

Of course, what I’ve written here is only half the story. Using async.queue and spawn will squeeze the most effort out of your available processing power, but it will still take a long time for one machine to process all those years and all those airbasins (at least for what I need to do). By using CouchDB as described in an earlier post, I can customize the work that needs to be done on each machine in a few ways. First, I can hit my status database from within node.js to find out which years and which basins need to be processed at all before I load them up in the queue. Another option is to delay that check until the current queue has spare capacity. I also hit CouchDB from within my R code to make sure that each job within the year and basin (there are many) still needs doing. In short, I use CouchDB to store the state of the overall progress of my work, and I have written the node.js and R code to check that state before starting anything, so that the chance of duplicating effort between machines is minimized.
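The first of those checks, filtering before loading the queue, might look roughly like the sketch below. Everything about the status store here is an assumption: a plain object stands in for the CouchDB database, and the "year_basin" document ids and the state values are invented, since the real layout is described in the earlier post rather than here.

```javascript
// Stand-in for the CouchDB status database: a plain object keyed by a made-up
// "year_basin" document id. A real version would query CouchDB over HTTP.
var statusDocs = {
  '2007_GBV': { state: 'done' },
  '2007_MD':  { state: 'running' }
}

function docId(job) {
  return job.year + '_' + job.basin
}

// Keep only the jobs that no machine has claimed or finished yet.
function pendingJobs(candidates, docs) {
  return candidates.filter(function (job) {
    return docs[docId(job)] === undefined
  })
}

var candidates = [
  { year: 2007, basin: 'GBV' },
  { year: 2007, basin: 'MD' },
  { year: 2007, basin: 'NEP' }
]
var todo = pendingJobs(candidates, statusDocs)
console.log(todo)   // only the 2007 NEP job is left to queue
```

Only the jobs that survive the filter would then be turned into options objects and pushed onto the queue.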

Because CouchDB can easily replicate between machines, I can also stand some network latency and even network isolation if I am clever. For example, some evenings I want to stream a movie over my home DSL line, and I don’t want to have CouchDB hogging bandwidth with its replications. So I can just unplug the ethernet to my server and run it by itself. As long as I make sure that machine is processing a unique set of basins and years, I can relax and let it run, then plug the network in when the movie is over and let CouchDB sync up the results in the background.

5 thoughts on “How I fire off multiple R jobs from node.js”

  1. I like your approach
    I was thinking about creating a small web api using Node.js and the express framework to expose R functions I need.
    I’m not an R expert, and I’ve not yet figured out how to feed the data sets coming from the HTTP request into the Rscript file. You suggest environment variables, but I’m not sure how to get them inside the Rscript.

  2. @MG, if you mean you want to accept arbitrary data from a web client, then all you have to do is point R to the temporary file that contains the uploaded data set. That isn’t an R issue, it is a node.js/web server issue. The uploaded files are stashed in a temp file somewhere. If you put the name of that file in an environment variable, then you can read it into R. Alternately, you could pass it into R using the command line.

  3. My advice is to walk, then run.

    First get rid of the whole web interface part. Just try to call R from node.js with the JSON data you want. There are millions of ways to do it, but first you need a better understanding of what you’re doing.

    Then get rid of the R part, and figure out how to get some input from a web UI.

    Write tests for both halves that pass.

    Then put them together.

    Other than that basic advice, I can’t help you.
