streamを使ってnode.jsで行処理する

node.jsのstreamはディスクやネットワークのIO処理を行うためのオブジェクトだが、それ自身ではIO処理を行わず別のstreamの後処理、前処理を行うフィルタとしても使うことができる。npmを探すと書き出す前にデータの暗号化や圧縮を行うstreamライブラリなどが見つかるだろう。

というわけで今度はstream。以前はストリームのラッパークラスを作って行ごとに処理を行うコードを書いたが、あんまりnodeっぽくないので、よりnodeらしくstreamとpipeで行処理を実装してみた。以下ソース。

var stream = require('stream');
var util = require('util');

function LineStream() {
    this.writable = true;
    this.readable = true;
    this.ended = false;
    this.encoding = 'utf8';
    this.buf = '';
    this.searched = false;
};

util.inherits(LineStream, stream.Stream);

/* readable stream method */

LineStream.prototype.setEncoding = function(encoding) {
    this.encoding = encoding;
};

LineStream.prototype.pause = function() {
    this.paused = true;
};

LineStream.prototype.resume = function() {
    this.paused = false;
    this.searchLine();
};

LineStream.prototype.destroy = function() {
    // do nothing
};


LineStream.prototype.close = LineStream.prototype.destroy;

/* writable stream method */

LineStream.prototype.write = function(data) {
    if (this.ended) {
        return false;
    }
    
    if (Buffer.isBuffer(data)) {
        this.buf += data.toString(this.encoding); 
    } else {
        this.buf += data;
    }

    if (!this.searched) {
        this.searchLine();
    }
    return true;
};


LineStream.prototype.searchLine = function() {
    if (this.paused) {
        return;
    }
    this.searched = true;

    var pos = this.buf.indexOf("\n");
    if (pos >= 0) {    
        var line = this.buf.substring(0, pos+1);
        this.buf = this.buf.substring(pos+1);
        this.emit('data', line);
        var self = this;
        process.nextTick(function(){    
            self.searchLine();
        });
    } else {
        if (this.ended) {
            if (this.buf) {
                this.emit('data', this.buf);
            }
            this.emit('end');
            this.ended = false;
        }
        this.searched = false;
    }
};

LineStream.prototype.end = function(data) {
    if (data) {
        this.write(data);
    }
    this.ended = true;
};
var ls = new LineStream();
var count = 0;
ls.on("data", function(data){ 
    process.stdout.write(++count + ": " + data);
});

var rs = require('fs').createReadStream("hoge.txt");
rs.setEncoding('utf8');

rs.pipe(ls);


フィルタとして使う場合は書き込み、読み込みの両方を行うので、ReadableかつWritableなStreamとして実装する。最低でもデータを受け取って処理するwriteメソッド、行に分割して書き出すemitイベントが必要だ。あとは辻褄が合うように残りのStreamのメソッドを実装して、イベントを発生させればよさそう。pipeメソッドはstreamから継承される。

とりあえず動いたが正直これで正しいのか分からん。node 0.10ではStreamの仕様が変わって、独自のStreamが実装しやすくなるとのこと。