A real-world use of PL/Perl

Last week I wrote a node.js program to parse and copy a CSV file into PostgreSQL. The data included several columns of detector data, and then a catch-all column called XML that was supposed to contain the raw read from the detector. The XML column was a big old ASCII escaped blob of text, and I just ignored it and stuffed it into its own table.

Unfortunately, as is always the case with these things, the XML column wasn’t XML at all. Instead, it contained what looked like a Perl object dumped using Data::Dumper. I couldn’t easily rewrite my node.js program to break up that Perl object, and I certainly didn’t want to rewrite my well-tested node.js program in Perl.

Enter PL/Perl.

I’ve never really had a need for PL/Perl. The PostgreSQL documentation page promotes the ability to use Perl’s string-munging facilities, but here I had an even simpler use case: I just wanted to call out to Perl, eval() the object, and stash the results.

The reason I’m writing this post is that I’ve never quite gotten the hang of how to use stored procedures in PostgreSQL. This is sort of a “note to my future self” containing some of the things I figured out, in case I forget.

First, the initial program I wrote looks like this:

CREATE OR REPLACE FUNCTION perl_xml_segment_decoder (TEXT) RETURNS bt_xml_segment AS $$
    use strict;
    my $unescape = sub {
        my $escaped = shift;
        $escaped =~ s/%u([0-9a-f]{4})/chr(hex($1))/eig;
        $escaped =~ s/%([0-9a-f]{2})/chr(hex($1))/eig;
        return $escaped;
    }; # borrowed from  URI::Escape::JavaScript 

    my $chars = $unescape->( $_[0] );
    my $VAR1;
    eval($chars);  # the unescaped text is Perl code that assigns to $VAR1

    # clean up some entries we are not using
    my $segment = $VAR1->{'segment'};
    $segment->{'ts'} = $segment->{'Timestamp'};
    my %bar = map { lc $_ => $segment->{$_} } qw{
      SegmentID
      FromLocationID
      ToLocationID
      Route
      GroupBy
      ProjectID
      ts
      NumTrips
      Speed
      Distance
      EstimatedTimeTaken
      TravelTime
    };
    return \%bar;
$$ LANGUAGE plperl;

This takes in one of the “XML” strings and returns a value of the composite type bt_xml_segment, which PostgreSQL creates automatically for the table defined by:

CREATE TABLE bt_xml_segment (
  segmentid      integer primary key,
  fromlocationid integer REFERENCES bt_xml_location (locationid),
  tolocationid   integer REFERENCES bt_xml_location (locationid),
  route          varchar(128),
  groupby        integer,
  projectid      integer REFERENCES bt_xml_project (projectid),
  ts    timestamp with time zone not null,
  numtrips       integer,
  speed          numeric,
  distance           numeric,
  estimatedtimetaken numeric,
  traveltime         numeric
);

One thing I’ve never gotten the hang of is how to call functions. Following the docs, I can call this function as follows:

select * from  perl_xml_segment_decoder('%24VAR1%20%3D%20%7B%0A%20%20%27location%27%20%3D%3E%20%7B%0A%20%20%20%20%27Active%27%20%3D%3E%201%2C%0A%20%20%20%20%27LastCheckin%27%20%3D ... %20%20%27TravelTime%27%20%3D%3E%20%27356.285714285714%27%0A%20%20%7D%0A%7D%3B%0A');

and I would get back a lovely tabular output like this:

 segmentid | fromlocationid | tolocationid | route | groupby | projectid |           ts           | numtrips |      speed       | distance | estimatedtimetaken |    traveltime    
-----------+----------------+--------------+-------+---------+-----------+------------------------+----------+------------------+----------+--------------------+------------------
      4558 |           3481 |         3472 | SR-39 |      15 |       672 | 2014-07-15 17:30:00-07 |       14 | 8.04274565301844 |      0.8 |                 86 | 356.285714285714
(1 row)

But the semantics of that call are strange to me. What the query says is to treat the function like it is a table. This is reasonable, but what I want to do is call the function on each row of another table, like so:

select perl_xml_segment_decoder(xml.data) from perlhash as xml;

But that returns each result as a single composite value:

                                      perl_xml_segment_decoder                                      
----------------------------------------------------------------------------------------------------
 (4558,3481,3472,SR-39,15,672,"2014-07-15 17:30:00-07",14,8.04274565301844,0.8,86,356.285714285714)
(1 row)

This is more difficult to use in an INSERT statement. While I could expand the composite (for example, with (perl_xml_segment_decoder(xml.data)).*), I decided instead to keep the function as a function and move the query against the XML data table inside it. Again, the excellent PostgreSQL docs are quite helpful, and explain how to query a table from PL/Perl and then iterate over each returned row. My new function looks like this:

CREATE OR REPLACE FUNCTION perl_xml_segment_obs_decoder () RETURNS setof bt_xml_observation AS $$
    use strict;
    my $unescape = sub {
        my $escaped = shift;
        $escaped =~ s/%u([0-9a-f]{4})/chr(hex($1))/eig;
        $escaped =~ s/%([0-9a-f]{2})/chr(hex($1))/eig;
        return $escaped;
    }; # borrowed from  URI::Escape::JavaScript 

    my $sth = spi_query("SELECT * FROM perlhash");
    while ( defined( my $row = spi_fetchrow($sth) ) ) {
        my $chars = $unescape->( $row->{data} );
        my $VAR1;
        eval($chars);  # again, the unescaped text is Perl code that assigns to $VAR1

        # clean up some entries we are not using
        my $segment = $VAR1->{'segment'};
        $segment->{'ts'} = $segment->{'Timestamp'};
        my %bar = map { lc $_ => $segment->{$_} } qw{
          SegmentID
          ts
          NumTrips
          Speed
          Distance
          EstimatedTimeTaken
          TravelTime
        };
        $bar{data_ts}         = $row->{ts};
        $bar{radar_lane_id}   = $row->{radar_lane_id};
        $bar{station_lane_id} = $row->{station_lane_id};
        return_next \%bar;
    }
    return undef;
$$ LANGUAGE plperl;

Because I’m following along with my git commits here, and because I was refactoring and tuning my relational database tables as I developed, this function returns a different table type than before:

CREATE TABLE bt_xml_observation(
  segmentid      integer not null references bt_xml_segment(segmentid),
  ts    timestamp with time zone not null,
  data_ts timestamp with time zone not null,
  radar_lane_id integer,
  station_lane_id integer,
  numtrips       integer,
  speed          numeric,
  distance           numeric,
  estimatedtimetaken numeric,
  traveltime         numeric,
  primary key(segmentid,ts,data_ts,radar_lane_id,station_lane_id),
  foreign key (data_ts,radar_lane_id,station_lane_id) references smartsig.bluetooth_data(ts,radar_lane_id,station_lane_id)
);

I use this function within an insert statement, as follows:

insert into bt_xml_observation (select * from perl_xml_segment_obs_decoder());

In some cases (when populating the segments and location tables, for example), the output of the function includes duplicates. Rather than handle them in the Perl code using a hash or something, I decided to keep the PL/Perl simple and use SQL to remove duplicates. My query for loading up the segments table (the 8 unique segments about which the data was collected) is:

insert into smartsig.bt_xml_segment (select distinct * from smartsig.perl_xml_segment_decoder());

Finally, I expanded my node.js code to make use of these functions. Each data file (representing an hour of data) was 18MB. My code loads up one file, saves the XML/Perl hash data into a “TEMP” table, and then uses that table to populate the observations. The insert statements use WITH clauses to query the functions, as well as to join those calls with the existing data so as to avoid inserting duplicates. Finally, my code is careful to populate the tables in an order that satisfies the various foreign key constraints. (Note that I like to build my SQL statements as an array that I then “join” together. I do that in whatever language I’m programming in because it makes it easy to slot in dynamic variables, print diagnostic output, etc.)

    this.perl_parser=function(client,callback){
        // essentially, I have to do these in order:

        var insert_statements = []
        insert_statements.push([
            "with"
            ,"a as ("
            ,"  select distinct * from perl_xml_project_decoder_from_location()"
            ,"),"
            ,"b as ("
            ,"  select a.*"
            ,"  from a"
            ,"  left outer join bt_xml_project z USING (projectid)"
            ,"  where z.projectid is null"
            ,")"
            ,"insert into bt_xml_project (projectid,title) (select projectid,title from b)"
        ].join(' '))

        insert_statements.push(
            ["with a as ("
             ,"select aa.*,count(*) as cnt from perl_xml_location_decoder_from_location() aa"
             ,"left outer join bt_xml_location z USING(locationid)"
             ,"where z.locationid is null"
             ,"group by aa.locationid,aa.locationname,aa.latitude,aa.longitude,aa.projectid"
             ,"),"
             ,"b as ("
             ,"select locationid,locationname,latitude,longitude,projectid,"
             ,"rank() OVER (PARTITION BY locationid ORDER BY cnt DESC) AS pos"
             ,"from a"
             ,")"
             ,"insert into bt_xml_location (locationid,locationname,latitude,longitude,projectid)"
             ,"(select locationid,locationname,latitude,longitude,projectid"
             ,"from b"
             ,"where pos=1)"].join(' ')
            )
        insert_statements.push([
            "with a as (select distinct aa.* from perl_xml_segment_decoder() aa"
            ,"left outer join bt_xml_segment z USING(segmentid)"
            ,"where z.segmentid is null)"
            ,"insert into bt_xml_segment (segmentid,fromlocationid,tolocationid,route,groupby,projectid)"
            ,"(select segmentid,fromlocationid,tolocationid,route,groupby,projectid from a)"
        ].join(' '))
        insert_statements.push(
            'insert into bt_xml_observation  (select  * from perl_xml_segment_obs_decoder())'
        )


        var q = queue(1);  // using queue (https://github.com/mbostock/queue)
                           // with parallelism of 1 to make sure each task 
                           // executes in order

        insert_statements.forEach(function(statement) {
            q.defer(function(cb){
                client.query(statement
                             ,function (err, result) {
                                 //console.log(statement)
                                 return cb(err)
                             })
            })
            return null
        })
        q.awaitAll(function(error, results) {
            //console.log("all done with insert statements")
            return callback()
        })

    }

And there you have it: a node.js program that runs SQL queries that use Perl code embedded in PL/Perl functions.

The gory details can be found in my github repo for this.

When in doubt, use async.queue()

As with many other satisfied users, my go-to library for handling asynchronous processing in node.js is the excellent async library. But what works in small doses doesn’t always work for larger problems.

Specifically, a common use pattern for me is to use it to handle checking things in CouchDB. Often I’m too lazy to code up a proper bulk docs call, so I’ll just run off a bunch of queries asynchronously. This evening I was testing some such code out and it was working for test cases with 10 and 100 items, but it fell over with “double callback” errors when I loaded up 9,000+ items to the list.

The problem of course is that async really means async. When you have an array with 9,000 items in it, and you use, say, filter on it like so:

var my_array=[...]
async.filter(my_array,
        function(item,cb){
                check_true_or_false_via_calling_couchdb(item,cb)
                return null
        },
        function(results_array){
                done(null,results_array)
                return null
        })

then what is happening is that filter is firing off as many hits as it can to CouchDB, which in this case is 9000+. This breaks things, with CouchDB shutting down, my SSH tunnels blocking things, etc etc.
The plumbing has gone “higgledy-piggledy”, like that old Bloom County punchline.

So instead, use async’s queue:

var filtered_tasks = []
var q = async.queue(function(task,callback){
            filter_grids(task,function(doit){
                if(doit){
                    // keep these
                    filtered_tasks.push(task)
                }// drop those
                return callback()
            })
        },100)
// assign a callback for when the queue drains
q.drain = function() {
    //console.log('all items have been processed');
    cb_alltasks(null,filtered_tasks)
}
var tasks = _.map(grid_records
                 ,function(v,k){
                      var task = {'options':_.clone(config)}
                      task.cell_id = k
                      task.year = year
                      _.extend(task,v)
                      return task
                  })
q.push(tasks)

I chose the concurrency by playing with it: 10 was too slow (it took 25 seconds), while 100 and 1,000 both took 9 seconds, so 100 it is.

How I fire off multiple R jobs from node.js

Node.js has become my hammer of choice for most systems programming type jobs. In an earlier post I talked about how to use CouchDB to store the progress and state of jobs that need doing. Here I will demonstrate how I trigger those jobs and update CouchDB using a fairly simple node.js program.

Two key features of node that make this program possible are spawn and the ability to read and manipulate environment variables.

var spawn = require('child_process').spawn
var env = process.env

Node.js is fully capable of using child processes. One can choose from exec, execFile, spawn, and fork. For my usage, the spawn function does exactly what I want—it creates a child process that reports back when it exits.

The other useful tool is the ability to access the current running environment using the process.env variable. This allows my program to take note of any environment variables that are already set, and to fill in any missing variables that my child process might need.
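
As a minimal sketch of those two pieces working together (the R script name and the RPORT variable are made up for illustration, not part of my actual setup):

var spawn = require('child_process').spawn
var env = process.env

// fill in anything the child needs that the current environment lacks
// (RPORT is a hypothetical variable the R script reads)
if (env.RPORT === undefined) env.RPORT = '5984'

// spawn one R job, passing along the (possibly patched) environment
var job = spawn('Rscript', ['crunch_one_cell.R'], { env: env })

job.stdout.on('data', function (data) {
    console.log('R says: ' + data)
})
job.on('exit', function (code) {
    // this is where the CouchDB job-state document would get updated
    console.log('R job finished with exit code ' + code)
})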

Concurrency via queue

Using spawn one can fire off as many jobs as desired. Suppose you have a machine with four cores; then calling spawn four times will use your processing power efficiently. Unfortunately it isn’t usually that simple. Instead, what typically happens is that you have a lot of separable data crunching tasks that need to be run, and you want four data processing jobs running at all times until the work is all done. To accomplish this, spawn needs to be called four times (to fill up the processors), and then a new job needs to be spawned whenever one of the existing jobs finishes.
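
One way to do that is a small queue with a concurrency of four. Here is a rough sketch using the async library’s queue (the task list and R script name are placeholders, and error handling is minimal):

var spawn = require('child_process').spawn
var async = require('async')

// hypothetical task list: one entry per chunk of data to crunch
var tasks = ['cell_0001', 'cell_0002', 'cell_0003', 'cell_0004', 'cell_0005']

// the worker spawns one R job per task and calls done() when that job exits
var q = async.queue(function (task, done) {
    var job = spawn('Rscript', ['crunch_one_cell.R', task], { env: process.env })
    job.on('exit', function (code) {
        return done(code === 0 ? null : new Error('job failed: ' + task))
    })
}, 4) // at most four R processes at any one time

q.drain = function () {
    console.log('all R jobs are done')
}

q.push(tasks)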


CouchDB and Erlang

Typical left-field introduction

As far as I understand it, the ability to run Erlang views natively is likely to be removed in the future, because Erlang views are not sandboxed and so a view can execute arbitrary commands on the server.

Problem: big docs crash JSON.parse()

That said, I have a use case for Erlang views.

Using superagent to authenticate a user-agent in node.js, plus a bonus bug!

Summary

This post describes how I use the superagent library to test access to restricted resources on my web server. This is something that I found to take a bit more effort than I expected, so I thought I’d write this up for the greater good.

Context

I am running a website in which some resources are open to the internet, while others require authentication against our CAS server.

I have been logging into the CAS server using request. But in the interest of trying out different libraries and all that, I decided to rewrite my method using superagent.

I need to log into the CAS server from node.js because I am writing tests that verify that the protected resources are hidden from non-authenticated users, and available to authenticated ones.
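
The shape of the test ends up something like the sketch below. The URLs and form fields are placeholders (a real CAS login also involves hidden form fields and redirects, which I’m glossing over here); the important part is that superagent.agent() keeps its cookies between requests, so the session established by the login is reused when requesting the protected resource.

var superagent = require('superagent')

// an agent persists cookies across requests
var agent = superagent.agent()

function login_via_cas(username, password, done) {
    agent
        .post('https://cas.example.org/cas/login')   // placeholder URL
        .type('form')
        .send({ username: username, password: password })
        .end(function (err, res) {
            return done(err, res)
        })
}

// once logged in, the same agent should be able to reach a protected resource
login_via_cas('testuser', 'sekrit', function (err, res) {
    agent
        .get('http://localhost:3000/protected/report')  // placeholder URL
        .end(function (err2, res2) {
            console.log('protected resource status: ' + res2.status)
        })
})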

Preventing server timeout in node.js

Update 2013-10-08. This is an old post but continues to get page views, so clearly it is still a problem. The feature is now documented (see link below) and this post is still correct.

Original post, 2011-03-30

This is something I spent an hour or so trying to track down today, so I thought I’d write it up in the hopes that someone else is spared the trouble.

First of all, I have both web client and server written in node.js. My server is designed such that it first checks for cached versions of URLs, and if the file doesn’t exist, then it hits the database and creates the file. This second step can take a long time, and so I wanted to write a utility script that I could trigger manually to update the cache of files whenever the database changes.

So I wrote the script using javascript and node, but was getting a strange error in which the client would die if the server took longer than two minutes to complete the request. No amount of abusing the code on the client would change this, even though the node.js source code seemed to indicate that no timeout was ever being set on the client socket, and most questions on the internet were about how to limit the timeout, not set it to forever.

Turns out the suspicious setTimeout( 2 * 60 * 1000 ) in http.js at line 986 was indeed the culprit. I originally ignored that line, as it was only setting the timeout for the server-side socket. But then, after editing that line in the code and recompiling (grasping at straws), re-running the client using the recompiled node and still getting exactly 2 minutes for the socket to die, it suddenly hit me that my server was timing out, not my client!

So with a single undocumented call inside of the handler in question, I had no more troubles:

res.writeHead(200, { 'Content-Type': 'application/json' });
res.connection.setTimeout(0); // this could take a while

Note the second line above. While the 0.4.4 API docs don’t state this fact, the http response object exposes the socket as the connection object (I found this on the mailing list in this thread http://groups.google.com/group/nodejs/browse_thread/thread/376d600fb87f498a). So res.connection gives a hook to the Socket’s setTimeout function, and setting that to zero drops the default 2 minute limit.

November 2012 update: I’m still doing this in node.js 0.6.8+ and 0.8.x; setTimeout is still part of net, and the http server still uses a 2 minute timeout by default. And GitHub is still awesome.

August 2014 update. Yes, still there: 2 minute timeout. Really this isn’t a bug that needs fixing, because who wants a server to go away for two minutes in these days of attention-deficit-disorder web surfing. But I wish it were called out in the docs. Apparently this behavior will change soon: https://github.com/joyent/node/issues/4704

And with the release of 0.10.x, it is now documented. See server.setTimeout and response.setTimeout.
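
For my own future reference, a minimal sketch of the now-documented 0.10.x approach (disabling the timeout for the whole server; res.setTimeout(0) inside a handler does the same thing for a single response):

var http = require('http')

var server = http.createServer(function (req, res) {
    // a slow, cache-rebuilding handler would live here
    res.writeHead(200, { 'Content-Type': 'application/json' })
    res.end('{"ok":true}')
})

// 0 disables the default two-minute socket timeout for all connections
server.setTimeout(0)

server.listen(3000)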

When I modify my own code to use 0.10.x, I will put up a new post.