Node.js has become my hammer of choice for most systems-programming-type jobs. In an earlier post I talked about how to use CouchDB to store the progress and state of jobs that need doing. Here I will demonstrate how I trigger those jobs and update CouchDB using a fairly simple Node.js program.
Two key features of Node.js that make this program possible are spawn and the ability to read and manipulate environment variables.
var spawn = require('child_process').spawn
var env = process.env
Node.js is fully capable of using child processes. One can choose from exec, execFile, spawn, and fork. For my usage, the spawn function does exactly what I want: it creates a child process that reports back when it exits.
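A quick sketch of that exit reporting, using echo as a stand-in for a real job:

var spawn = require('child_process').spawn

var child = spawn('echo', ['hello world'])
child.on('exit', function(code){
    // fires once the child process has finished
    console.log('child exited with code ' + code)
})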
The other useful tool is the ability to access the current running environment using the process.env variable. This allows my program to take note of any environment variables that are already set, and to fill in any missing variables that my child process might need.
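For instance, a minimal sketch of filling in a missing variable before handing the environment to a child process (R_LIBS_USER and the path are just placeholders for whatever your own child process needs):

var env = process.env

// only supply a default if the variable is not already set
if (env.R_LIBS_USER === undefined) {
    env.R_LIBS_USER = '/path/to/R/library' // placeholder value
}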
Concurrency via queue
Using spawn, one can fire off as many jobs as desired. Suppose you have a machine with four cores: calling spawn four times will use your processing power efficiently. Unfortunately it isn’t usually that simple. Instead, what typically happens is that you have a lot of separable data crunching tasks that need to be run, and you want to have four data processing jobs running at all times until the work is all done. To accomplish this, the spawn function will need to be called four times (to fill up the processors), and then a new job will need to be spawned whenever one of the existing jobs finishes.
Conceptually, what you want to do is create a routine that will process four things at a time, and be able to hold remaining jobs in a queue (hint hint) to load them up when another job finishes. One could write one’s own processing system with callbacks and lists, but why bother? Instead you should use the excellent, well supported, widely used, and exhaustively tested async library, which has a queue function that does the job perfectly.
var async = require('async')
var queue = async.queue
The queue function takes two arguments: queue(worker, concurrency). The worker is a function that will do whatever job needs doing. The concurrency is the number of parallel jobs that will be run at a time. In node’s usual asynchronous workflow, these parallel jobs aren’t so much "concurrent" as non-blocking. So if you have a worker that looks up files in the file system and you set the concurrency to four, then at most four jobs at a time will be looking for files and waiting for the file system to respond with the results. In normal usage, however, only a single thread will process the concurrent jobs, so if they all have something to do at the same time, you will still only use one CPU.
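As a minimal sketch of that file-lookup scenario (the file names are made up), the worker waits on fs.stat and the queue keeps at most four lookups in flight:

var fs = require('fs')
var async = require('async')

// worker: look up one file and report back when the file system answers
function statFile(filename, callback){
    fs.stat(filename, function(err, stats){
        if(err){ return callback(err) }
        console.log(filename + ' is ' + stats.size + ' bytes')
        callback(null)
    })
}

// at most four lookups outstanding at any time
var statQueue = async.queue(statFile, 4)
statQueue.push(['a.txt', 'b.txt', 'c.txt', 'd.txt', 'e.txt'])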
In our example, we are going to actually spawn a number of separate processes equal to the concurrency argument. The function we pass as the worker argument will do the spawning. This worker function needs to accept two arguments: worker(task, callback). The task is the next item in the queue for the worker to process, and the callback is a function created by the async library that your worker function needs to call when it is done. As per the usual node.js idiom, the callback should be passed null as the first argument if everything goes well, or something non-null if there is a problem.
Calling spawn
For this example, I will be setting up R jobs to process California air districts. R can be called in an interactive session, or in batch mode using the Rscript command. From reading the spawn documentation, the first argument is the command to run (in this case, Rscript), the second argument is an array of command line options, and the third argument is an options object for spawn itself. Of the options object, I am most interested in the ability to set the environment variables for the spawned process. So in a nutshell, the spawn call looks like this:
var opts = {
    cwd: './Rwork',
    env: process.env
}
// can modify child env with needed key-value pairs
opts.env['YEAR'] = 2013
opts.env['BASIN'] = 'SC'
var RCall = ['--no-restore', '--no-save', 'my_R_script.R']
var R = spawn('Rscript', RCall, opts)
In the above code, the first instruction sets up the options object with a default working directory and the current working environment. Next I override two variables in the environment, the year and the basin. (I use the "associative array" style of indexing objects so that my eyes see environment variables being set.) Next I set up the array for the second argument to spawn with the command line options for the Rscript program. Finally I call spawn and save the result in the variable R.
A basic worker function that does the above, and that is suitable for passing to the queue function, is shown below. The primary difference is that it handles the exit of the R job and calls the callback. In this example, I have a dummy return-code check just to show that you can return whatever you like from the R script, so that error conditions or unusual stopping points can be flagged with your own exit messages.
function setup_R_job(opts, done){
    var R = spawn('Rscript', RCall, opts)
    R.on('exit', function(code){
        console.log('got exit code: ' + code)
        if(code == 1){
            // do something special
            done()
        }else{
            done()
        }
        return null
    })
    return null
}
Putting it all together
To use async.queue, you first pass it the worker function shown above along with the desired parallelism, and it will return a queue that you can use. You then push things onto the queue, and then sit around and wait for them to finish. In the code snippet below I create one standard options object, and then use underscore’s clone function to copy it and modify what I need for each job. I also use underscore’s each function because it is habit, but really any looping construct would work fine.
var _ = require('underscore')
var queue = async.queue
var jobs = 4 // typically a command line option, because it is unique
             // to the machine
var basin_queue = queue(setup_R_job, jobs)
// baseline options for every job
var opts = {
    cwd: './Rwork', // where I've stashed my R scripts
    env: process.env
}
var years = [2007, 2008, 2009, 2010, 2011, 2012, 2013]
var airbasins = ['GBV', 'MD', 'NEP', 'SC', 'SF',
                 'LC', 'MC', 'LT', 'NCC', 'NC',
                 'SCC', 'SD', 'SJV', 'SS', 'SV']
_.each(years, function(year){
    _.each(airbasins, function(basin){
        var o = _.clone(opts, true) // copy the baseline options for this job
        o.env['YEAR'] = year
        o.env['AIRBASIN'] = basin
        basin_queue.push(o)
    })
})
In other words, I create a unique set of options for each call, in which the only real difference is that I’ve set the environment variables to be the year and the airbasin that I want my R job to process. There are other ways to pass data in to R, including the usual command line tools, but I prefer using environment variables because that’s the way I’ve always done it.
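For comparison, the command-line route would simply append the values to the argument array; this is a sketch, with the R side then reading them via commandArgs():

// same spawn and script as above, but year and basin passed as trailing arguments
var RCallArgs = ['--no-restore', '--no-save', 'my_R_script.R', '2013', 'SC']
var Rjob = spawn('Rscript', RCallArgs, { cwd: './Rwork', env: process.env })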
Managing jobs across computers
Of course, what I’ve written here is only half the story. Using async.queue and spawn will squeeze the most effort out of your available processing power, but it will still take a long time for one machine to process all those years and all those airbasins (at least for what I need to do). By using CouchDB as described in an earlier post, I can customize the work needed to be done on each machine in a few ways. First, I can hit my status database from within node.js to find out which years and which basins need to be processed at all before I load them up in the queue. Another option is to delay that check until the current queue has spare capacity. I also hit CouchDB from within my R code to make sure that each job within the year and basin (there are many) still needs doing. In short, I use CouchDB to store the state of the overall progress of my work, and I have written the node.js and R code to minimize the risk of overlap, so that the chance of duplicating effort between machines is minimized.
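As a rough sketch of that first check, building on the basin_queue and opts from the previous section (the database name, design document, and view here are hypothetical, as is the assumption that the view keys are [year, basin] pairs):

var http = require('http')

// ask a (hypothetical) CouchDB status view which combinations are still pending
http.get('http://localhost:5984/state_of_work/_design/status/_view/pending', function(res){
    var body = ''
    res.on('data', function(chunk){ body += chunk })
    res.on('end', function(){
        JSON.parse(body).rows.forEach(function(row){
            var o = _.clone(opts, true) // copy the baseline options for this job
            o.env['YEAR'] = row.key[0]
            o.env['AIRBASIN'] = row.key[1]
            basin_queue.push(o)
        })
    })
})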
Because CouchDB can easily replicate between machines, I can also stand some network latency and even network isolation if I am clever. For example, some evenings I want to stream a movie over my home DSL line, and I don’t want to have CouchDB hogging bandwidth with its replications. So I can just unplug the ethernet to my server and run it by itself. As long as I make sure that machine is processing a unique set of basins and years, I can relax and let it run, then plug the network in when the movie is over and let CouchDB sync up the results in the background.
I like your approach
I was thinking about creating a small web api using Node.js and the express framework to expose R functions I need.
I’m not an R expert and I’ve not yet figured out how to feed the data sets eventually coming from the HTTP request into the Rscript file. You suggest environment variables, but I’m not sure how to get them inside the Rscript.
@MG, if you mean you want to accept arbitrary data from a web client, then all you have to do is point R to the temporary file that contains the uploaded data set. That isn’t an R issue, it is a node.js/web server issue. The uploaded files are stashed in a temp file somewhere. If you put the name of that file in an environment variable, then you can read it into R. Alternately, you could pass it into R using the command line.
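In Node terms, that might look something like this sketch (DATA_FILE, the temp file path, and process_upload.R are all placeholders; use whatever path your web framework reports for the upload, and read it in R with Sys.getenv('DATA_FILE')):

var spawn = require('child_process').spawn

// suppose the web framework has stashed the upload here
var uploadedPath = '/tmp/upload-1234.json' // placeholder path

var childOpts = { env: process.env }
childOpts.env['DATA_FILE'] = uploadedPath

var Rchild = spawn('Rscript', ['process_upload.R'], childOpts)
Rchild.on('exit', function(code){
    console.log('R finished with code ' + code)
})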
@jmarca thanks for the suggestions.
I’ve opened a Stack Overflow question to better show you:
http://stackoverflow.com/questions/27184535/r-nodejs-integration
My advice is to walk, then run.
First get rid of the whole web interface part. Just try to call R from node.js with the JSON data you want. There are millions of ways to do it, but first you need a better understanding of what you’re doing.
Then get rid of the R part, and figure out how to get some input from a web UI.
Write tests for both halves that pass.
Then put them together.
Other than that basic advice, I can’t help you.