node.jsのstreamはディスクやネットワークのIO処理を行うためのオブジェクトだが、それ自身ではIO処理を行わず別のstreamの後処理、前処理を行うフィルタとしても使うことができる。npmを探すと書き出す前にデータの暗号化や圧縮を行うstreamライブラリなどが見つかるだろう。
というわけで今度はstream。以前はストリームのラッパークラスを作って行ごとに処理を行うコードを書いたが、あんまりnodeっぽくないので、よりnodeらしくstreamとpipeで行処理を実装してみた。以下ソース。
var stream = require('stream'); var util = require('util'); function LineStream() { this.writable = true; this.readable = true; this.ended = false; this.encoding = 'utf8'; this.buf = ''; this.searched = false; }; util.inherits(LineStream, stream.Stream); /* readable stream method */ LineStream.prototype.setEncoding = function(encoding) { this.encoding = encoding; }; LineStream.prototype.pause = function() { this.paused = true; }; LineStream.prototype.resume = function() { this.paused = false; this.searchLine(); }; LineStream.prototype.destroy = function() { // do nothing }; LineStream.prototype.close = LineStream.prototype.destroy; /* writable stream method */ LineStream.prototype.write = function(data) { if (this.ended) { return false; } if (Buffer.isBuffer(data)) { this.buf += data.toString(this.encoding); } else { this.buf += data; } if (!this.searched) { this.searchLine(); } return true; }; LineStream.prototype.searchLine = function() { if (this.paused) { return; } this.searched = true; var pos = this.buf.indexOf("\n"); if (pos >= 0) { var line = this.buf.substring(0, pos+1); this.buf = this.buf.substring(pos+1); this.emit('data', line); var self = this; process.nextTick(function(){ self.searchLine(); }); } else { if (this.ended) { if (this.buf) { this.emit('data', this.buf); } this.emit('end'); this.ended = false; } this.searched = false; } }; LineStream.prototype.end = function(data) { if (data) { this.write(data); } this.ended = true; };
var ls = new LineStream(); var count = 0; ls.on("data", function(data){ process.stdout.write(++count + ": " + data); }); var rs = require('fs').createReadStream("hoge.txt"); rs.setEncoding('utf8'); rs.pipe(ls);
フィルタとして使う場合は書き込み、読み込みの両方を行うので、ReadableかつWritableなStreamとして実装する。最低でもデータを受け取って処理するwriteメソッド、行に分割して書き出すemitイベントが必要だ。あとは辻褄が合うように残りのStreamのメソッドを実装して、イベントを発生させればよさそう。pipeメソッドはstreamから継承される。
とりあえず動いたが正直これで正しいのか分からん。node 0.10ではStreamの仕様が変わって、独自のStreamが実装しやすくなるとのこと。