streamに汎用フィルタ処理

今度は任意のfunctionをフィルタ関数として使用するstreamを書いてみた。先日書いたテキストの行処理を行うストリームと組み合わせると便利に使えそう。

var stream = require('stream');
var util = require('util');

function StreamFilter(filter){
    this.writable = true;
    this.readable = true;
    this.filter = filter;   
}

util.inherits(StreamFilter, stream.Stream);

StreamFilter.prototype.write = function(data) {
	var filteredData = this.filter(data);
	// フィルタ結果がundefinedでなければdataイベントを発火する
	if (filteredData !== undefined) {
	    this.emit("data", filteredData);
	}
};
StreamFilter.prototype.end = function(data) {
    if (data) {
        this.write(data);
    }
};
var LineStream = require('./line').LineStream;
var ls = new LineStream();

var fs = require('fs')
var rs = fs.createReadStream("error.log");
rs.setEncoding('utf8');

rs
.pipe(ls)
.pipe(new StreamFilter(function(data){
    // ERRORという文字列を含む行のみ抜き出す 
    if (data.indexOf("ERROR") > -1) return data;
}))
.pipe(process.stdout);

streamでgrepできちゃいました。