Using CouchDB to store state: My hack to manage multi-machine data processing

This article describes how I use CouchDB to manage multiple computing jobs. I make no claims that this is the best way to do things. Rather I want to show how using CouchDB in a small way gradually led to a solution that I could not have come up with using a traditional relational database.

The fundamental problem is that I don’t know what I am doing when it comes to managing a cluster of available computers. As a researcher I often run into big problems that require lots of data crunching. I have access to about six computers at any given time: two older, low-powered servers, two better servers, and two workstations, one at work and one at home. If one computer can’t handle a task, it usually means I have to spread the pain around on as many idle CPUs as I can. Of course I’ve heard of cloud computing solutions from Amazon, Joyent, and others, but quite frankly I’ve never had the time or the budget to try out these services for myself.

At the same time, although I can install and manage Gentoo on my machines, I’m not really a sysadmin, and I really can’t wire up a proper distributed heterogeneous computing environment using cool technologies like ØMQ. What I’ve always done is human-in-the-loop parallel processing. My problems have some natural parallelism; for example, the data might be split across the 58 counties of California. This means that I can manually run one job per county on each available CPU core.

This human-in-the-loop distributed computing model has its limits, however. Sometimes it is difficult to get every available machine to have the same computational environment. Other times it just gets to be a pain to manually check on all the jobs and keep track of which are done and which still need doing. And when a job crashes halfway through, my manual method sucks pretty hard, as it usually means restarting that job from the beginning.

Using CouchDB to keep track of state

About two years ago I came up with a small improvement to my computing solution using CouchDB. Instead of using a piece of paper to tick off completed jobs, I just wrote that "completed" information to CouchDB, and used a view to queue up what jobs needed doing. So for a job that involved looking at counties, each document might look like this:

{
  "_id" : "06001",
  "properties":{
       "name": "Alameda",
       "type": "county"
       },
  "status_2007": {
       "rawdata": "1",
       "imputed": 1,
       "geo_job": "2013-08-20",
       "data_job":"in progress"
       },
  "status_2008": {
       "rawdata": "1",
       "imputed": 1,
       "geo_job": "2013-08-26"
       },
  "status_2009": {
       "rawdata": "1",
       "imputed": 1
       }
}

To use this database, I can either pick a county by its FIPS code (which is how I’ve defined the document ids) and inspect whether or not the status for "geo_job" is done for the year of data in question, or I can make a view that lists off all of the documents that are not "done". A view that would give me the FIPS code and year for each county that still needed the "geo_job" processing step to be run would be something like:

function(doc) {
  // Match keys like "status_2007", "status_2008", ...
  var yearmatch = new RegExp("^status_(\\d{4})$");
  for (var k in doc) {
    if (k === '_id' || k === '_rev') continue;
    if (yearmatch.test(k)
        && doc.properties.type === "county"
        && (doc[k]['geo_job'] === undefined
            || !doc[k]['geo_job']
            || doc[k]['geo_job'] === 'todo')) {
      // Emit [year, county name]; each row's id already carries the FIPS code.
      var year = yearmatch.exec(k)[1];
      emit([year, doc.properties.name], null);
    }
  }
}

I can also add a simple "_count" reduce to this view to get the number of counties remaining for each year.
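For reference, a design document holding the map function plus the count reduce might look roughly like this (the "_design/tracking" and "geo_todo" names are just placeholders I am using for the examples in this post):

{
  "_id": "_design/tracking",
  "views": {
    "geo_todo": {
      "map": "function(doc) { /* the map function shown above */ }",
      "reduce": "_count"
    }
  }
}

Querying the view with ?reduce=false returns the individual county/year rows, while ?group_level=1 rolls the counts up by year.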

Every query to this view will get a list of counties that still need processing. To use this in practice, each processing job queries the view and gets a county and year to process, then immediately tries to write an update to the status document in which the "geo_job" state for the year in question is changed to "in progress" or something, which will remove it from this "todo" view.

Because of how CouchDB works, if two processes pick the same county and year and both try to change the status document to read "in progress" at the same time, only one write will win. The winning process will move on to cranking out the job for the chosen year and county, and the losing process or processes will hit the view and get another county and year to process.
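Put together, a worker process might run something like the following sketch. This is Node-style JavaScript using the built-in fetch of recent Node releases; the database URL, credentials, and the "geo_todo" view name are the placeholders from the examples above, not my exact production code.

// One worker's check-out loop: ask the "todo" view for a candidate,
// try to claim it, and fall through to the next candidate on a conflict.
const DB = 'http://127.0.0.1:5984/data%2ftracking';
const AUTH = { Authorization: 'Basic ' + Buffer.from('user:pass').toString('base64') };

async function checkOutJob() {
  while (true) {
    // reduce=false so we get the individual rows, not the _count totals.
    const view = await fetch(
      DB + '/_design/tracking/_view/geo_todo?limit=1&reduce=false',
      { headers: AUTH });
    const { rows } = await view.json();
    if (rows.length === 0) return null;            // nothing left to do

    // The row id is the FIPS code; fetch the doc to get its current _rev.
    const doc = await (await fetch(DB + '/' + rows[0].id, { headers: AUTH })).json();
    const year = rows[0].key[0];                   // view key is [year, name]
    doc['status_' + year].geo_job = 'in progress';

    // Attempt the claim. A 409 means another process already changed the
    // document, so loop around and ask the view for another candidate.
    const put = await fetch(DB + '/' + doc._id, {
      method: 'PUT',
      headers: Object.assign({ 'Content-Type': 'application/json' }, AUTH),
      body: JSON.stringify(doc),
    });
    if (put.ok) return { fips: doc._id, year: year };
  }
}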

Local reads and remote writes

The next step in my use of CouchDB was to experiment with replication. Rather than using a single central server as my CouchDB server, I set up CouchDB on all of my machines and replicated the status database to all of them. The key change to CouchDB that made this possible was the creation of the _replicator database around version 1.1.1. With this addition, it became possible to set up continuous replications between databases that would restart whenever CouchDB was restarted. I chose one server, call it A, as the "main" CouchDB server, and had every other server do pull replication from that server. Because I was afraid of race conditions, I had each process still do the "check out" against the central server A, but the checks against the "todo" view were done locally. My thought was that even if a machine’s replication got behind the central machine, at least I wouldn’t duplicate effort by starting a job multiple times.

On machine B, the document in the _replicator database would look like:

{
   "_id": "A_via_tunnel_tracking_db_pull",
   "_rev": "9-88b826902d274208c57469ceec5ccd34",
   "source": "http://user:pass@127.0.0.1:5985/data%2ftracking",
   "target": "data/tracking",
   "create_target": false,
   "continuous": true,
   "user_ctx": {
       "name": "james",
       "roles": [
           "_admin"
       ]
   }
   ...
}

In this case, I have set up an SSH tunnel from machine B to machine A that maps port 5984 on machine A to port 5985 on machine B, so from machine B the URL for machine A is http://127.0.0.1:5985. I also included the user and password of an admin account so that any changes to the design documents (for example, updating the view that lists the "todo" jobs) would get propagated correctly.
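The tunnel itself is plain SSH port forwarding; on machine B, something along these lines (the user and host names here are placeholders) keeps local port 5985 pointed at CouchDB’s port 5984 on machine A:

ssh -fN -L 5985:127.0.0.1:5984 user@machine-a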

Local reads and local writes

After setting up this system, I started watching how long it took for writes to A to propagate back to machines B, C, and D. For example, if a job on machine D finished working on 2008 data for Alameda County, it would write that to the status document in CouchDB on machine A. The replications would then echo this change to B, C, and D, and all databases would be in sync again. What I didn’t know was how long this echo would take. In practice, given what I was doing and how fast my network connections are, the changes propagated almost immediately. So the next step was to read and write locally.

In this way of keeping track of state, each machine only considered its own copy of the CouchDB status database. It would read its local view to see what to process next, and would write locally to check out a county and year for processing. In this setup it is almost certain that I will create conflicts: sooner or later two machines will check out the same county and year at the same time, and both will succeed. The "harm" in this case is that the same chunk of data gets worked on twice.

Note that this race condition only exists between machines. If I have 8 jobs on machine A, then they all read and write to the same database, and there will not be any duplication of effort. The only problem comes in if a task on machine A and a task on some other machine (looking at a different copy of the database) try to start the same county/year at the same time. The way to minimize this built-in race condition is to minimize the likelihood that they start at "the same time". By watching the replications between machines, it was apparent to me that even between my house and my office, a replication would complete within a second or two. So as long as the original jobs on a machine started a few seconds apart, it was unlikely that they would ever sync up exactly again, given the differences in the size of each data set.
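As an aside, when two machines do write the same document, CouchDB keeps the losing revision around as a conflict rather than silently dropping it. I did not need this in my original setup, but a small view like the following sketch will list any status documents that ended up conflicted:

function(doc) {
  // Documents that picked up conflicting revisions during replication
  // carry a _conflicts array listing the losing revision ids.
  if (doc._conflicts) {
    emit(doc._id, doc._conflicts);
  }
}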

The final setup

With CouchDB 1.4.x, I am now using this approach, but with two-way push replication rather than push-pull. I put machine A in the middle, and make sure every machine replicates to and from A. On every machine that is not machine A, I set up an SSH tunnel to machine A that maps machine A’s port 5984 to port 5985 on localhost. If A goes down, then I have a problem, but the problem is bigger than replicating a status database around different machines.

A pair of push replications might look like this. First, machine A (the central hub machine) has either a direct connection or an SSH tunnel to each other machine. Suppose machine B is on the local network at 192.168.0.4; then the replication doc to push from A to B would look like:

{
   "_id": "tracking_db_push_B",
   "_rev": "3-daf5f79737e170e1675807ea610bfb78",
   "source": "data/tracking",
   "target": "http://user:pass@192.168.0.4:5984/data%2ftracking",
   "create_target": true,
   "continuous": true,
   "user_ctx": {
       "name": "james",
       "roles": [
           "_admin"
       ]
   }
   ...
}

Similarly, the replication doc to machine C that is hiding behind an SSH tunnel on port 5986 would look like this:

{
   "_id": "tracking_db_push_C",
   "_rev": "3-90ae5b9bcb9f49f18d9bf99d9fe263e8",
   "source": "data/tracking",
   "target": "http://user:pass@127.0.0.1:5986/data%2ftracking",
   "create_target": true,
   "continuous": true,
   "user_ctx": {
       "name": "james",
       "roles": [
           "_admin"
       ]
   }
   ...
}

At the other end, machine B would have a push replication document that pushes to machine A, on 192.168.0.1, like so:

{
   "_id": "tracking_db_push_A",
   "_rev": "3-5d31ec8c5d9bd9c72558de6b99224891",
   "source": "data/tracking",
   "target": "http://user:pass@192.168.0.1:5984/data%2ftracking",
   "create_target": true,
   "continuous": true,
   "user_ctx": {
       "name": "james",
       "roles": [
           "_admin"
       ]
   }
   ...
}

The remote machine C that needs to communicate over the SSH tunnel would have a push replication document that looks like the one above, but this time using port 5985:

{
   "_id": "tracking_db_push_A",
   "_rev": "6-eb0c3adcde7aa3d7be6d454cb8ac1f13",
   "source": "data/tracking",
   "target": "http://user:pass@127.0.0.1:5985/data%2ftracking",
   "create_target": true,
   "continuous": true,
   "user_ctx": {
       "name": "james",
       "roles": [
           "_admin"
       ]
   }
   ...
}

To make things easier, I like to map the central machine’s CouchDB to port 5985 on every machine that communicates with it over an SSH tunnel.

Conclusion

CouchDB is one of many document-oriented databases that have become popular in the past few years. I am using it to good effect in a number of ways. This post showed how ingrained it has become in my basic data processing workflow. While some day I will figure out how to use multi-machine file systems and cloud services, for the moment I am getting by just fine using CouchDB to help manage work across multiple machines.

While my example above uses the idea of processing one county at a time, in practice I use a much finer-grained notion of state. If a county has 2,000 detector stations in it and each detector must be processed, then the status database keeps track of each detector. I’ve also begun storing other useful details in the status database. For example, when first processing a county, say when completing the "geo_job", I might generate a GeoJSON shape for the county. Rather than stash that in a file system or some other database, I can stick it right there in the status database. Similarly, a data processing job might produce summary charts for a year’s worth of data. Rather than storing them in a temp directory, I have had good success writing them as attachments to the detector’s document in the status database.
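As a sketch of that attachment trick (again Node-style JavaScript with built-in fetch; the chart file name, document id, and credentials are placeholders), writing a chart into the status database is just a PUT against the document with its current revision:

// Attach a summary chart to a detector's status document.
const fs = require('fs');

async function attachChart(db, docId, pngPath) {
  const auth = { Authorization: 'Basic ' + Buffer.from('user:pass').toString('base64') };

  // Fetch the document first so we have the current _rev to attach against.
  const doc = await (await fetch(db + '/' + docId, { headers: auth })).json();

  // PUT the file to /db/docid/attachmentname?rev=... with its content type.
  const res = await fetch(db + '/' + docId + '/summary_2008.png?rev=' + doc._rev, {
    method: 'PUT',
    headers: Object.assign({ 'Content-Type': 'image/png' }, auth),
    body: fs.readFileSync(pngPath),
  });
  return res.ok;
}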

I’ve recently begun experimenting with using CouchDB as both a place to store data and a place to keep track of job state. In this use case, the replication issues are a little more difficult, as I’m copying gigabytes of data between machines. On the plus side, I only have to move the data once between machines, rather than every time I run a job and need a chunk of data. On the other hand, some of my replication jobs are stuck at low completion percentages (23%, 45%, etc.) because the data processing jobs are writing new data about as fast as it can be copied between machines over the replication connections. When and if I sort out more of the issues there, I’ll write up my experiences.

For the moment, however, I can whole-heartedly recommend using CouchDB as a way to keep status databases synced between machines, as long as the databases stay below a gigabyte or so. If a status database creeps up past 5GB, the replication required to add a new machine into the mix becomes more significant, and it might be better to split off a new database.
