Tiptoe across the stream

I used streams in node.js last night. I can’t say I “get” them or that I am fluent in all use cases, but I can at least contribute one super basic use case that I couldn’t find explained anywhere.

In my application, I have a library that converts GeoJSON files into CSV files, dumping the geometry data and preserving just the properties. By design, each GeoJSON document has, buried in its ‘properties’ hash, a ‘header’ object that I pull out and use as column headers, and a ‘data’ object that is an array of rows matching those headers.
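To make that concrete, here is roughly the shape of document I am talking about. The field names other than ‘header’, ‘data’, and ‘document’ (which shows up in the code below), and all of the values, are made up for illustration:

{ "type": "FeatureCollection",
  "features": [
    { "type": "Feature",
      "geometry": { "type": "Point", "coordinates": [0, 0] },
      "properties": {
        "document": "some_file_name",
        "header": ["speed", "volume"],
        "data": [ [55, 1200],
                  [60, 1350] ]
      }
    }
  ]
}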

When I wrote the code, I wanted a streaming JSON parser, but I didn’t like the ones I tested, so instead I just parse the whole doc, find the header, find the data, and pass the result to the csv library.
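In rough outline (this is a sketch, not the real code), the non-streaming version did something like this:

// parse everything up front, accumulate one big array of rows,
// and only then hand that array to the csv library
fs.readFile(file, 'utf8', function(err, text){
    if(err) return next(err)
    var fcoll = JSON.parse(text)
    var rows = []
    fcoll.features.forEach(function(feature){
        // pull the rows out of each feature's properties.data
        feature.properties.data.forEach(function(record){
            rows.push(record)
        })
    })
    // rows (plus a header row) then became the source of the csv() call
})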

Last night `npm install` on my laptop upgraded the csv library (http://www.adaltas.com/projects/node-csv) to 0.2.x, which brought with it an increased emphasis on streams along with some minor API changes. I was also working on merging multiple files, which meant I really wanted to stream the results as I completed each file.

But my problem was that I had no concept of how to write data to the csv stream and get it to work. The examples and test cases all had things like

csv()
.from.path(__dirname+'/columns.in', {
    columns: true
})
.to.stream(process.stdout, {
    columns: ['id', 'name'],
    end: false
})

But I wanted to write rows over and over again. I tried a minimal, hacky fix by just piping one file at a time to the res stream, but somehow my code kept closing the stream. After re-reading the same passages several times, I finally figured out that what I wanted was the “.write()” method of the stream interface: call “csv.write(row)” whenever a row is available, and let the csv object handle the piping to the res stream. So I rewrote things a bit, and this is what I ended up with.

First, I create the csv writer. Because this is an express web app, I want the output to go to the response object passed into the handler. I did not set the ‘source’ of the stream, because I am going to write to it line by line instead.

return function csv_formatter_service(req,res,next){
    // a csv stream with no source; rows get written to it one at a time below
    var csv_writer = csv()
    // pipe its output straight into the express response
    csv_writer.pipe(res)
...
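For reference, the csv, async, fs, and _ that show up in these snippets come from the usual requires at the top of the module, something like this (a sketch, since the real module header is not shown here):

var csv   = require('csv')        // the node-csv library, version 0.2.x
var async = require('async')      // for forEachSeries below
var fs    = require('fs')         // for readFile
var _     = require('underscore') // for _.map in buildCSV (lodash would work too)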

Then I do some uninteresting stuff in callbacks to get filenames that I want to concatenate, and then I do the following to actually do the CSV conversion:

function(err,files){
    if(err) return next(err)
    res.writeHead(200, { 'Content-Type': 'text/csv' });
    // create a common builder so that I only dump one header row
    // and can number each record consecutively
    var builder = buildCSV({'columns':column_names}
                          ,function(err,rows){
                               // simple callback for builder, used for debugging mostly
                               if(err){return err;}
                               return null;
                           })
    // handy dandy async call makes sure that each file is processed consecutively
    // so that their output is not jumbled together on the stream
    async.forEachSeries(files
                       ,function(file,cb){
                            // eventually replace the read file with a streaming JSON parser
                            fs.readFile(file,'utf8', function(err,data){
                                // pass the data and csv_writer to the function 
                                // that parses the JSON
                                err = builder(err,data,csv_writer)
                                // err is the output of the callback defined above (null or error)
                                cb(err);
                            })
                        }
                       ,function(err){
                            // simple placeholder callback for forEachSeries.  end the response
                            // (stringify the error, if any, so res.end gets a string rather than an Error object)
                            res.end(err ? String(err) : undefined)
                            return null
                        })
    // emacs complains if I don't return null, and anyway it is good practice
    return null
}

Now for the JSON parser code. Whereas before I would parse the JSON doc, get an array of lines, and then create the csv() call with that array as the source, now I skip a step and just write each line to the csv writer object passed in.

function buildCSV(options,cb){
    var header_row
    var column_names = options.columns ? options.columns : {}
    var total_data_idx = -1

    return function(err,doc,writestream){
        var fcoll = JSON.parse(doc)
        var features = fcoll.features;
        var rows = 0;
        var datacols=[];
        var features_length = features.length;
        // (the code that fills in datacols is elided here) then build the header row:
        if(header_row === undefined){
            header_row = _.map(['i','document'].concat(datacols)
                              ,function(key){
                                   return column_names[key] || key
                               })

            // Woo Hoo, just *write* data to the stream, and off it goes to the client!
            writestream.write(header_row)
            
        }
        // now process the data rows
        for (var i = 0; i < features_length; i++){
            var props = features[i].properties;
            // check for a usable data array before reading its length, so empty features do not throw
            if(props.data !== undefined && Array.isArray(props.data) && props.data.length){
                var datalen = props.data.length;
                for (var dataidx = 0; dataidx < datalen; dataidx++){
                    total_data_idx++
                    var entry = [total_data_idx,props.document]
                    entry = entry.concat(
                        datacols.map(function(key,idx){
                            if(props.data[dataidx] === undefined
                             || props.data[dataidx][idx] === undefined){
                                return '';
                            }else{
                                return props.data[dataidx][idx] ;
                            }
                        })
                    )

                    // Again, no fuss no muss no temporary arrays.  
                    // Just write to the stream and send the data on its way
                    writestream.write(entry)
                    rows++  // no, I'm not sure why I am tracking rows of data separately but whatever
                            // probably debugging cruft.
                }
            }else{
                // no data
                total_data_idx++  // even in the no data case, I send a row of commas, 
                                  // so increment the counter

                // record of empties, I choose you!
                writestream.write(
                    [total_data_idx, props.document].concat(
                        datacols.map(function(key){
                            // return an empty string for each column,
                            // so the row comes out as one comma per column
                            return '';
                        })
                    )
                );
 
            }
        }
        return cb(null,rows)
    };
}

And that is it, my minor revelation. With streams, you can just write to them, and the data goes on its way. In this case, the csv library has been set up to handle arrays and strings and such, so I can just write arrays. Specifically, the docs state:

write(data, [preserve])

Implementation of the Writable Stream API with a larger signature. Data may be a string, a buffer, an array or an object.

If data is a string or a buffer, it could span multiple lines. If data is an object or an array, it must represent a single line. Preserve is for line which are not considered as CSV data.
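So, as far as I can tell, both of these are legal; the values here are made up, and the second line is only there to show the preserve flag:

// an array is treated as a single CSV row; the library handles the joining and quoting
csv_writer.write([0, 'some_file_name', 55, 1200])
// with preserve set, the string is passed along rather than treated as CSV data
csv_writer.write('# not CSV data, written through untouched', true)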
