A real-world use of PL/Perl

Last week I wrote a node.js program to parse and copy a CSV file into PostgreSQL. The data included several columns of detector data, and then a catch-all column called XML that was supposed to contain the raw read from the detector. The XML column was a big old ASCII escaped blob of text, and I just ignored it and stuffed it into its own table.

Unfortunately, as is always the case with these things, the XML column wasn’t XML at all. Instead, it contained what looked like a Perl object dumped using Data::Dumper. I couldn’t easily rewrite my node.js program to break up that Perl object, and I certainly didn’t want to rewrite my well-tested node.js program in Perl.

Enter PL/Perl.

I’ve never really had a need for PL/Perl. The PostgreSQL documentation page promotes the ability to use Perl’s string-munging facilities. But here I had an even simpler use case. I just want to call out to Perl, eval() the object, then stash the results.

The reason I’m writing this post is that I’ve never quite gotten the hang of how to use stored procedures in PostgreSQL. This is sort of a “note to my future self” in case I forget containing some of the things I figured out.

First, the initial program I wrote looks like this:

CREATE OR REPLACE FUNCTION perl_xml_segment_decoder (TEXT) RETURNS bt_xml_segment AS $$
    use strict;
    my $unescape = sub {
        my $escaped = shift;
        $escaped =~ s/%u([0-9a-f]{4})/chr(hex($1))/eig;
        $escaped =~ s/%([0-9a-f]{2})/chr(hex($1))/eig;
        return $escaped;
    }; # borrowed from  URI::Escape::JavaScript 

    my $chars = $unescape->( $_[0] );
    my $VAR1;
    eval($chars);

    # clean up some entries we are not using
    my $segment = $VAR1->{'segment'};
    $segment->{'ts'} = $segment->{'Timestamp'};
    my %bar = map { lc $_ => $segment->{$_} } qw{
      SegmentID
      FromLocationID
      ToLocationID
      Route
      GroupBy
      ProjectID
      ts
      NumTrips
      Speed
      Distance
      EstimatedTimeTaken
      TravelTime
    };
    return \%bar;
$$ LANGUAGE plperl;

This takes in one of the “XML” strings, and returns a column type bt_xml_segment that is defined by:

CREATE TABLE bt_xml_segment (
  segmentid      integer primary key,
  fromlocationid integer REFERENCES bt_xml_location (locationid),
  tolocationid   integer REFERENCES bt_xml_location (locationid),
  route          varchar(128),
  groupby        integer,
  projectid      integer REFERENCES bt_xml_project (projectid),
  ts    timestamp with time zone not null,
  numtrips       integer,
  speed          numeric,
  distance           numeric,
  estimatedtimetaken numeric,
  traveltime         numeric
);

One thing I’ve never gotten the hang of is how to call functions. Following the docs, I can call this function as follows:

select * from  perl_xml_segment_decoder('%24VAR1%20%3D%20%7B%0A%20%20%27location%27%20%3D%3E%20%7B%0A%20%20%20%20%27Active%27%20%3D%3E%201%2C%0A%20%20%20%20%27LastCheckin%27%20%3D ... %20%20%27TravelTime%27%20%3D%3E%20%27356.285714285714%27%0A%20%20%7D%0A%7D%3B%0A');

and I would get back a lovely tabular output like this:

 segmentid | fromlocationid | tolocationid | route | groupby | projectid |           ts           |  numtrips |      speed       | distance | estimatedtimetaken |    traveltime    
-----------+----------------+--------------+-------+---------+-----------+------------------------+----------+------------------+----------+--------------------+------------------
      4558 |           3481 |         3472 | SR-39 |      15 |       672 | 2014-07-15 17:30:00-07 |       14 | 8.04274565301844 |      0.8 |                 86 | 356.285714285714
(1 row)

But the semantics of that call are strange to me. What the query says is to treat the function like it is a table. This is reasonable, but what I want to do is call the function on each row of another table, like so:

select perl_xml_segment_decoder(xml.data) from perlhash as xml;

But that returns an array output:

                                      perl_xml_segment_decoder                                      
----------------------------------------------------------------------------------------------------
 (4558,3481,3472,SR-39,15,672,"2014-07-15 17:30:00-07",14,8.04274565301844,0.8,86,356.285714285714)
(1 row)

This is more difficult to use in an INSERT clause. While I could contort that, and make it work, I decided to instead just keep the function as a function, and include the query to the XML data table within the function. Again, the excellent PostgreSQL docs are quite helpful, and explain how to query a table from Perl and then iterate over each returned row. My new function looks like this:

CREATE OR REPLACE FUNCTION perl_xml_segment_obs_decoder () RETURNS setof bt_xml_observation AS $$
    use strict;
    my $unescape = sub {
        my $escaped = shift;
        $escaped =~ s/%u([0-9a-f]{4})/chr(hex($1))/eig;
        $escaped =~ s/%([0-9a-f]{2})/chr(hex($1))/eig;
        return $escaped;
    }; # borrowed from  URI::Escape::JavaScript 

    my $sth = spi_query("SELECT * FROM perlhash");
    while ( defined( my $row = spi_fetchrow($sth) ) ) {
        my $chars = $unescape->( $row->{data} );
        my $VAR1;
        eval($chars);

        # clean up some entries we are not using
        my $segment = $VAR1->{'segment'};
        $segment->{'ts'} = $segment->{'Timestamp'};
        my %bar = map { lc $_ => $segment->{$_} } qw{
          SegmentID
          ts
          NumTrips
          Speed
          Distance
          EstimatedTimeTaken
          TravelTime
        };
        $bar{data_ts}         = $row->{ts};
        $bar{radar_lane_id}   = $row->{radar_lane_id};
        $bar{station_lane_id} = $row->{station_lane_id};
        return_next \%bar;
    }
    return undef;
$$ LANGUAGE plperl;

Because I'm actually following along my git commits, and because I was refactoring things and tuning my relational database tables as I developed, this function returns a different table type from before:

CREATE TABLE bt_xml_observation(
  segmentid      integer not null references bt_xml_segment(segmentid),
  ts    timestamp with time zone not null,
  data_ts timestamp with time zone not null,
  radar_lane_id integer,
  station_lane_id integer,
  numtrips       integer,
  speed          numeric,
  distance           numeric,
  estimatedtimetaken numeric,
  traveltime         numeric,
  primary key(segmentid,ts,data_ts,radar_lane_id,station_lane_id),
  foreign key (data_ts,radar_lane_id,station_lane_id) references smartsig.bluetooth_data(ts,radar_lane_id,station_lane_id)
);

I use this function within an insert statement, as follows:

insert into bt_xml_observation  (select  * from perl_xml_segment_obs_decoder()) ;

In some cases (when populating the segments and location tables, for example), the output of the function includes duplicates. Rather than handle them in the Perl code using a hash or something, I decided to keep the PL/Perl simple and use SQL to remove duplicates. My query for loading up the segments table (the 8 unique segments about which the data was collected) is:

insert into smartsig.bt_xml_segment  (select distinct * from smartsig.perl_xml_segment_decoder()) ;

Finally, I expanded my node.js code to make use of these functions. Each data file (representing an hour of data) was 18MB. My code loads up one file, saves the XML/Perl hash data into a “TEMP” table, and then uses that table to populate the observations. The insert statements use WITH clauses to query the functions, as well as to join those call with the existing data so as to avoid the error of inserting duplicates. Finally, my code is careful to populate the tables in order so that the various foreign key constraints are satisfied. (Note that I like to build my SQL statements as an array that I then “join” together. I do that in whatever language I’m programming in because it makes it easy to slot in dynamic variables, print diagnostic output, etc)

    this.perl_parser=function(client,callback){
        // essentially, I have to do these in order:

        var insert_statements = []
        insert_statements.push([
            "with"
            ,"a as ("
            ,"  select distinct * from perl_xml_project_decoder_from_location()"
            ,"),"
            ,"b as ("
            ,"  select a.*"
            ,"  from a"
            ,"  left outer join bt_xml_project z USING (projectid)"
            ,"  where z.projectid is null"
            ,")"
            ,"insert into bt_xml_project (projectid,title) (select projectid,title from b)"
        ].join(' '))

        insert_statements.push(
            ["with a as ("
             ,"select aa.*,count(*) as cnt from perl_xml_location_decoder_from_location() aa"
             ,"left outer join bt_xml_location z USING(locationid)"
             ,"where z.locationid is null"
             ,"group by aa.locationid,aa.locationname,aa.latitude,aa.longitude,aa.projectid"
             ,"),"
             ,"b as ("
             ,"select locationid,locationname,latitude,longitude,projectid,"
             ,"rank() OVER (PARTITION BY locationid ORDER BY cnt DESC) AS pos"
             ,"from a"
             ,")"
             ,"insert into bt_xml_location (locationid,locationname,latitude,longitude,projectid)"
             ,"(select locationid,locationname,latitude,longitude,projectid"
             ,"from b"
             ,"where pos=1)"].join(' ')
            )
        insert_statements.push([
            "with a as (select distinct aa.* from perl_xml_segment_decoder() aa"
            ,"left outer join bt_xml_segment z USING(segmentid)"
            ,"where z.segmentid is null)"
            ,"insert into bt_xml_segment (segmentid,fromlocationid,tolocationid,route,groupby,projectid)"
            ,"(select segmentid,fromlocationid,tolocationid,route,groupby,projectid from a)"
        ].join(' '))
        insert_statements.push(
            'insert into bt_xml_observation  (select  * from perl_xml_segment_obs_decoder())'
        )


        var q = queue(1);  // using queue (https://github.com/mbostock/queue)
                           // with parallelism of 1 to make sure each task 
                           // executes in order

        insert_statements.forEach(function(statement) {
            q.defer(function(cb){
                client.query(statement
                             ,function (err, result) {
                                 //console.log(statement)
                                 return cb(err)
                             })
            })
            return null
        })
        q.awaitAll(function(error, results) {
            //console.log("all done with insert statements")
            return callback()
        })

    }

And there you have it: a node.js program that runs SQL queries that use Perl code embedded in PL/Perl functions.

The gory details can be found in my github repo for this.

quick tests are great when documentation is thin

I have 14,000 odd items that I want to copy from PostgreSQL into CouchDB. While bulkdocs is great, 14,000 is too much. So I want to group the big array into a lot of smaller arrays.

I thought there was a simple function in [lodash](http://lodash.com) that I could use, and remembered having used [groupBy](http://lodash.com/docs#groupBy) in the past.

But the docs are slightly wrong. They imply that the callback function gets passed one argument, the array element, but the usual idiom for these sorts of functions is that they are passed two or three arguments: the array element, the index of the element, and the entire array.

Sure enough that is what is done:

var _ = require('lodash')
var groups = _.groupBy([4.2, 6.1, 6.4], function(num,idx,third) {
                 console.log(num,idx,third)
                 return idx % 2
             });

console.log(groups)

Running this (node test.js) produces

4.2 0 [ 4.2, 6.1, 6.4 ]
6.1 1 [ 4.2, 6.1, 6.4 ]
6.4 2 [ 4.2, 6.1, 6.4 ]
{ '0': [ 4.2, 6.4 ], '1': [ 6.1 ] }

So I can group by massive array into smaller arrays by munging the index.

When in doubt, use async.queue()

As with many other satisfied users, my goto library for handling asynchronous processing in node.js is the excellent async library. But what works in small doses doesn’t always work for larger problems.

Specifically, a common use pattern for me is to use it to handle checking things in CouchDB. Often I’m too lazy to code up a proper bulk docs call, so I’ll just run off a bunch of queries asynchronously. This evening I was testing some such code out and it was working for test cases with 10 and 100 items, but it fell over with “double callback” errors when I loaded up 9,000+ items to the list.

The problem of course is that async really means async. When you have an array with 9,000 items in it, and you use, say, filter on it like so:

var my_array=[...]
async.filter(my_array,
        function(item,cb){
                check_true_or_false_via_calling_couchdb(item,cb)
                return null
        },
        function(results_array){
                done(null,results_array)
                return null
        })

then what is happening is that filter is firing off as many hits as it can to CouchDB, which in this case is 9000+. This breaks things, with CouchDB shutting down, my SSH tunnels blocking things, etc etc.
The plumbing has gone “higgledly piggedly”, like that old Bloom County punchline.

So instead, use async’s queue:

var filtered_tasks = []
var q = async.queue(function(task,callback){
            filter_grids(task,function(doit){
                if(doit){
                    // keep these
                    filtered_tasks.push(task)
                }// drop those
                return callback()
            })
        },100)
// assign a callback for when the queue drains
q.drain = function() {
    //console.log('all items have been processed');
    cb_alltasks(null,filtered_tasks)
}
var tasks = _.map(grid_records
                 ,function(v,k){
                      var task = {'options':_.clone(config)}
                      task.cell_id = k
                      task.year = year
                      _.extend(task,v)
                      return task
                  })
q.push(tasks)

I chose the concurrency by playing with it. I 10 is too slow (took 25 seconds), 100 takes 9 seconds, and 1000 takes 9 seconds.

From simple examples to complicated real world cases

I have a really irritating use-case for a CouchDB view. I have several hundred million documents representing hourly data for 4km grid cells in California, and I need to group them by areas. For example, grid cell i=100, j=223 is in Mendocino County, and in the “NORTH COAST” air basin. Of course I have the geometry of the grid cells and the geometry of the counties, air basins, and so on, in PostgreSQL/PostGIS, and I usually just shoot off a query to get the relationship and I’m done. This is CouchDB, however, and views cannot rely on external information lest they become idemimpotent (I made that up). Everything that the view needs must be in the view from the start.

Fair enough, I set up the SQL queries and generated my 9,800+ row JavaScript hash lookup table that maps grid cell to various areas of interest. Now I want to mix that into the view without pulling my hair out.

There is a really simple example in the CouchDB wiki. I’ve reproduced it below:

 {
   _id:"_design/test",
   language: "javascript",
   whatever : {
     stringzone : "exports.string = 'plankton';",
     commonjs : {
       whynot : "exports.test = require('../stringzone')",
       upper : "exports.testing = require('./whynot').test.string.toUpperCase()"
     }
   },
   shows: {
     simple: "function() {return 'ok'};",
     requirey : "function() { var lib = require('whatever/commonjs/upper'); return lib.testing; };"
   },
   views: {
     lib: { 
       foo: "exports.bar = 42;" 
     },
     test: { 
       map: "function(doc) { emit(doc._id, require('views/lib/foo').bar); }"
     }
   }
  }

So where the above example says foo: "exports.bar = 42;", I want to add in my massive hashtable. Obviously cutting and pasting so many lines is not the way to go. Instead, I’m using a couchapp tool.

The concept of a couchapp used to get more press that it currently seems to, but the basic idea is to use code to load up your design doc with attachments and views. In my case, I couldn’t care less about the attachments and the notion of a webapp stored and served by CouchDB. I just want to programmatically construct the view document, and push it to CouchDB. I chose to use node.couchapp.js. I could also have "rolled my own", and in fact I probably will this afternoon. I am playing around with grunt, so I used grunt_couchapp (after patching it a bit to use cookie based authentication).

The basic structure of my directory is the following


config.json
package.json
Gruntfile.js
app.js
lib
├── cellmembership.json
└── dump_membership.js
node_modules
├── ...
└── ...

The config.json file contains my database details, including my username and password. package.json contains the npm dependencies, mostly containing what was pulled in by the grunt_couchapp tool, and the node_modules directory holds all the node modules. I do not have an _attachments directory, so I make sure my design doc has no attachments!

Before getting to app.js, in which the design document is defined, I will first talk about what goes into it. The lookup table is stored as a JSON object in lib/cellmembership.json. The contents looks like:

{ "100_223":{"airbasin":"NORTH COAST","bas":"NC","county":"MENDOCINO","fips":"23","airdistrict":"MENDOCINO COUNTY AQMD","dis":"MEN"},
 "100_224":{"airbasin":"NORTH COAST","bas":"NC","county":"MENDOCINO","fips":"23","airdistrict":"MENDOCINO COUNTY AQMD","dis":"MEN"},
   ... 9,890 more lines like this ...
 "304_48":{"airbasin":"SALTON SEA","bas":"SS","county":"IMPERIAL","fips":"13","airdistrict":"IMPERIAL COUNTY APCD","dis":"IMP"},
 "98_247":{"airbasin":"NORTH COAST","bas":"NC","county":"HUMBOLDT","fips":"12","airdistrict":"NORTH COAST UNIFIED AQMD","dis":"NCU"}
}

The view code that uses this file is saved to lib/dump_membership.js, and looks like:

module.exports = function(doc){
    var lookup = require('views/lib/cellmembership').lookup
    emit(lookup[doc.cell_id].county, doc.value)
}

These two pieces are put together in app.js, that looks like this:

var couchapp = require('couchapp')
var cellmembership = require('./lib/cellmembership.json')
var mapfun = require('./lib/dump_membership')

var ddoc = {
    _id: '_design/calvad',
    rewrites: [{
      from: '',
      to: 'index.html',
      method: 'GET',
      query: {}
    },{
      from: '/*',
      to: '/*'
    }],
    views: {
        "lib":{
            "cellmembership":"exports.lookup="+JSON.stringify(cellmembership)
        },
        "test":{
            "map":mapfun
        }
    },
    lists: {},
    shows: {}
};


module.exports = ddoc;

So instead of "exports.bar=42;", I put in "exports.lookup="+JSON.stringify(...). The key insight that the simple example didn’t really convey is that you want your entire "library" module to be a string. So in this case that means saving my JSON lookup document as a string using JSON.stringify. I probably could have just loaded it directly using fs.readfile(), but I like this way, because it soothes my worries about malformed JSON. If the JSON is screwed up, the app.js won’t run, and the failure happens right away, not in the midst of cranking through hundreds of millions of documents.

The other bit that I didn’t get from the example was how to include an external function in the design document. What I did was pretty simple, and it worked. I just did "map":mapfun. This is exactly the opposite of what needed to be done with the views:lib:cellmembership.. construct. There the exports.lookup= statement needs to be a string inside of the JavaScript, whereas the assignment of the map function needs to be actual JavaScript code, not the string representation of that code.

This is exactly the kind of inconsistency that drives me nuts and that nobody ever thinks to document, because only crazies like me run into those edge cases.

How I fire off multiple R jobs from node.js

Node.js has become my hammer of choice for most systems programming type jobs. In an earlier post I talked about how to use CouchDB to store the progress and state of jobs that need doing. Here I will demonstrate how I trigger those jobs and update CouchDB using a fairly simple node.js program.

Two key features of node that makes this program possible are spawn and being able to read and manipulate the environment variables.

var spawn = require('child_process').spawn
var env = process.env

Node.js is fully capable of using child processes. One can choose from exec, execFile, spawn, and fork. For my usage, the spawn function does exactly what I want—it creates a child process that reports back when it exits.

The other useful tool is the ability to access the current running environment using the process.env variable. This allows my program to take note of any environment variables that are already set, and to fill in any missing variables that my child process might need.

Concurrency via queue

Using spawn one can fire off as many jobs as desired. Suppose you have a machine with four cores, then calling spawn four times will efficiently use your processing power. Unfortunately it isn’t usually that simple. Instead, what typically happens is that you have a lot of separable data crunching tasks that need to be run, and you want to have four data processing jobs running at all times until the work is all done. To accomplish this, the spawn function will need to be called four times (to fill up the processors) and then will need to spawn a new job whenever one of the existing jobs finishes.

Continue reading

Using CouchDB to store state: My hack to manage multi-machine data processing

This article describes how I use CouchDB to manage multiple computing jobs. I make no claims that this is the best way to do things. Rather I want to show how using CouchDB in a small way gradually led to a solution that I could not have come up with using a traditional relational database.

The fundamental problem is that I don’t know what I am doing when it comes to managing a cluster of available computers. As a researcher I often run into big problems that require lots of data crunching. I have access to about 6 computers at any given time, two older, low-powered servers, two better servers, and two workstations, one at work and one at home. If one computer can’t handle a task, it usually means I have to spread the pain around on as many idle CPUs as I can. Of course I’ve heard of cloud computing solutions from Amazon, Joyent, and others, but quite frankly I’ve never had the time and the budget to try out these services for myself.

At the same time, although I can install and manage Gentoo on my machines, I’m not really a sysadmin, and I really can’t wire up a proper distributed heterogeneous computing environment using cool technologies like Ømq. What I’ve always done is human-in-the-loop parallel processing. My problems have some natural parallelism—for example, the data might be split across the 58 counties of California. This means that I can manually run one job per county on each available CPU core.

This human-in-the-loop distributed computer model has its limits however. Sometimes it is difficult to get every available machine to have the same computational environment. Other times it just gets to be a pain to have to manually check on all the jobs and keep track of which are done and which still need doing. And when a job crashes halfway through, then my manual method sucks pretty hard, as it usually means restarting that job from the beginning.

Continue reading

CouchDB and Erlang

Typical left-field introduction

As far as I understand it, the ability to run Erlang views natively is likely to be removed in the future because it does not offer any sandboxing of the content, and so the view can execute arbitrary commands on the server. So Erlang views are likely to go away.

Problem: big docs crash JSON.parse()

That said, I have a use case for Erlang views. Continue reading

Using superagent to authenticate a user-agent in node.js, plus a bonus bug!

Summary

This post describes how I use the superagent library to test access to restricted resources on my web server. This is something that I found to take a bit more effort than I expected, so I thought I’d write this up for the greater good.

Context

I am running a website in which some resources are open to the internet, while others require authentication versus our CAS server.

I have been logging into the CAS server using request. But in the interest of trying out different libraries and all that, I decided to rewrite my
method using superagent.

I need to log into the CAS server from node.js because I am writing tests that verify that the protected resources are hidden to non-authenticated users, and available to authenticated ones. Continue reading

CAS validate

My first program pushed up to npm turned out to be a javascript CAS (www.jasig.org/cas) library I wrote for our portal at http://www.ctmlabs.net. The main think holding me up pushing anything to npm was the lack of tests. While I never run tests on packages downloaded from npm (one area where CPAN is definitely better than npm…all tests are run by CPAN on install), I felt that I couldn’t claim that a package was “eligible” for adding to npm until I could prove to myself that it worked like I thought it did.

The tests turned out to be a lot harder to write than I expected. I used Mocha, and excellent test framework, and should, a handy assertion library. But the hard part was getting a session with CAS server to work. Continue reading

More progress figuring out asynchronous programming in node.js

I had an old recursive directory creation program I wrote a while back for a node.js server I’m running, but it never seemed to work right.

Last week I went looking for something on github, and found a gist that seemed to be what I wanted, but it wasn’t. It didn’t understand absolute paths, and split on the ‘/’ which caused it to try to create the directory ”, which failed.

So I retooled my program, and did the work of figuring out why it worked when invoked with one directory, but failed when invoked on a list of directories.

It all gets back to the fact that JavaScript is passing references around, so you have to be careful to protect variables that are used to call functions.

First, here is my final version
Continue reading