Node.js中stream(流模块)的说明
为什么应该使用流 var http = require('http'); var server = http.createServer(function (req,res) {
}); 其次,上面的代码可能会造成很不好的用户体验,因为用户在接收到任何的内容之前首先需要等待程序将文件内容完全读入到内存中。 所幸的是,(req,res)参数都是流对象,这意味着我们可以使用一种更好的方法来实现上面的需求: var http = require('http'); var server = http.createServer(function (req,res) {
}); 除此之外,使用.pipe()方法还有别的好处,比如说它可以自动控制后端压力,以便在客户端连接缓慢的时候node可以将尽可能少的缓存放到内存中。 想要将数据进行压缩?我们可以使用相应的流模块完成这项工作! var http = require('http'); var server = http.createServer(function (req,res) {
}); 一旦你学会使用流api,你可以将这些流模块像搭乐高积木或者像连接水管一样拼凑起来,从此以后你可能再也不会去使用那些没有流API的模块获取和推送数据了。 流模块基础 pipe 无论哪一种流,都会使用.pipe()方法来实现输入和输出。 .pipe()函数很简单,它仅仅是接受一个源头src并将数据输出到一个可写的流dst中: src.pipe(dst) a.pipe(b).pipe(c).pipe(d) a.pipe(b); a | b | c | d readable流 Readable流可以产出数据,你可以将这些数据传送到一个writable,transform或者duplex流中,只需要调用pipe()方法: readableStream.pipe(dst) 现在我们就来创建一个readable流! var Readable = require('stream').Readable; var rs = new Readable; rs.pipe(process.stdout); $ node read0.js 需要注意的一点是我们在将数据输出到process.stdout之前已经将内容推送进readable流rs中,但是所有的数据依然是可写的。 这是因为在你使用.push()将数据推进一个readable流中时,一直要到另一个东西来消耗数据之前,数据都会存在一个缓存中。 然而,在更多的情况下,我们想要的是当需要数据时数据才会产生,以此来避免大量的缓存数据。 我们可以通过定义一个._read函数来实现按需推送数据: var Readable = require('stream').Readable; var c = 97;
}; rs.pipe(process.stdout); $ node read1.js _read函数也可以获取一个size参数来指明消耗者想要读取多少比特的数据,但是这个参数是可选的。 需要注意到的是你可以使用util.inherit()来继承一个Readable流。 为了说明只有在数据消耗者出现时,_read函数才会被调用,我们可以将上面的代码简单的修改一下: var Readable = require('stream').Readable; var c = 97 - 1; rs._read = function () {
}; rs.pipe(process.stdout); process.on('exit',function () {
}); $ node read2.js | head -c5 另外,process.stdout.on('error',fn)处理器也很重要,因为当head不再关心我们的程序输出时,操作系统将会向我们的进程发送一个SIGPIPE信号,此时process.stdout将会捕获到一个EPIPE错误。 上面这些复杂的部分在和操作系统相关的交互中是必要的,但是如果你直接和node中的流交互的话,则可有可无。 如果你创建了一个readable流,并且想要将任何的值推送到其中的话,确保你在创建流的时候指定了objectMode参数,Readable({ objectMode: true })。 消耗一个readable流 大部分时候,将一个readable流直接pipe到另一种类型的流或者使用through或者concat-stream创建的流中,是一件很容易的事情。但是有时我们也会需要直接来消耗一个readable流。 process.stdin.on('readable',function () {
}); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 当流结束时,.read()将返回null,因为此时已经没有更多的字节可以供我们获取了。 你也可以告诉.read()方法来返回n个字节的数据。虽然所有核心对象中的流都支持这种方式,但是对于对象流来说这种方法并不可用。 下面是一个例子,在这里我们制定每次读取3个字节的数据: process.stdin.on('readable',function () {
}); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js process.stdin.on('readable',function () {
}); $ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 使用unshift()方法能够放置我们进行不必要的缓存拷贝。在下面的代码中我们将创建一个分割新行的可读解析器: var offset = 0; process.stdin.on('readable',function () {
}); $ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js writable流 一个writable流指的是只能流进不能流出的流: src.pipe(writableStream) 只需要定义一个._write(chunk,enc,next)函数,你就可以将一个readable流的数据释放到其中: var Writable = require('stream').Writable;
}; process.stdin.pipe(ws); $ (echo beep; sleep 1; echo boop) | node write0.js 第二个参数enc代表编码的字符串,但是只有在opts.decodeString为false的时候你才可以写一个字符串。 第三个参数,next(err)是一个回调函数,使用这个回调函数你可以告诉数据消耗者可以写更多的数据。你可以有选择性的传递一个错误对象error,这时会在流实体上触发一个emit事件。 在从一个readable流向一个writable流传数据的过程中,数据会自动被转换为Buffer对象,除非你在创建writable流的时候制定了decodeStrings参数为false,Writable({decodeStrings: false})。 如果你需要传递对象,需要指定objectMode参数为true,Writable({ objectMode: true })。 向一个writable流中写东西 如果你需要向一个writable流中写东西,只需要调用.write(data)即可。 process.stdout.write('beep boopn'); var fs = require('fs'); ws.write('beep '); setTimeout(function () {
},1000); $ node writing1.js 如果你想要等待缓存情况,可以监听drain事件。 transform流 你可以将transform流想象成一个流的中间部分,它可以读也可写,但是并不保存数据,它只负责处理流经它的数据。 duplex流 Duplex流是一个可读也可写的流,就好像一个电话,可以接收也可以发送语音。一个rpc交换是一个duplex流的最好的例子。如果你看到过下面这样的代码: a.pipe(b).pipe(a) classic流 Classic流是一个古老的接口,最早出现在node 0.4中。虽然现在不怎么用,但是我们最好还是来了解一下它的工作原理。 无论何时,只要一个流对象注册了一个data监听器,它就会自动的切换到classic模式,并且根据旧API的方式运行。 classic readable流 Classic readable流只是一个事件发射器,当有数据消耗者出现时发射emit事件,当输出数据完毕时发射end事件。 我们可以同构检查stream.readable来检查一个classic流对象是否可读。 下面是一个简单的readable流对象的例子,程序的运行结果将会输出A到J: var Stream = require('stream'); var c = 64;
},100); stream.pipe(process.stdout); $ node classic0.js process.stdin.on('data',function (buf) {
});
}); $ (echo beep; sleep 1; echo boop) | node classic1.js 如果你自己创建流对象,永远不要绑定data和end监听器。如果你需要和旧版本的流兼容,最好使用第三方库来实现.pipe()方法。 例如,你可以使用through模块来避免显式的使用data和end监听器: var through = require('through'); function write (buf) {
}
} $ (echo beep; sleep 1; echo boop) | node through.js var concat = require('concat-stream');
})); $ echo '{"beep":"boop"}' | node concat.js classic writable流 Classic writable流非常简单。其中只定义了.write(buf),.end(buf),以及.desctory()方法。其中.end(buf)的参数buf是可选参数,但是一般来说node程序员还是喜欢使用.end(buf)这种写法。 本文参考 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |