Stream
前端精进(后端方向)-Node.js 全解
第一个Stream例子
const fs = require('fs' )
const stream = fs. createWriteStream(' ./big file.txt' )
for(let i=0; i<10000; i++){
stream. write(~这是第${i} 行内容,我们需要很多很多内容,
要不停地写文件啊啊啊啊啊啊回车\n )
stream.end() //别忘了关掉stream
console.log( ' done' )
分析
打开流,多次往里面塞内容,关闭流
看起来就是可以多次写嘛,没什么大不了的
最终我们得到一个128兆左右的文件
Stream-流
释义 stream 是水流,但默认没有水
stream.write可以让水流中有水(数据)
每次写的小数据叫做chunk(块)
产生数据的一段叫做source(源头)
得到数据的一段叫做sink(水池)
第二个例子
//请先引入fs和http
const server = http. createServer( )
server.on( ' request',( request, response)=>{
fs.readFile(' ./big_ file. txt', (error ,
data)=>{
if(error) throw error
response.end( data )
console.log( ' done ' )
})
})
server.listen(8888)
console.log("8888")
分析
用任务管理器看看Node.js 内存占用,大概130Mb
第三个例子
用Stream 改写第二个例子 // 请先引入 fs 和 http
const server = http.createServer()
server.on('request', (request, response)=>{
const stream = fs.createReadStream('/big_file.txt')
stream.pipe(response)
stream.on('end',()=> console.log('done'))
})
server.listen(8888)
分析
查看Node.js内存占用,基本不会高于30Mb
可以用curl来做这段测试,速度会快很多
文件stream和response stream通过管道相连
管道
释义
两个流可以用一个管道相连
stream1的末尾连接上stream2的开端
只要 stream1有数据,就会流到 stream2
常用代码
stream1.pipe(stream2)
链式操作 a.pipe(b).pipe(c)
//等价于
a.pipe(b)
b.pipe(c)
管道续
管道可以通过事件实现
// stream1 一有数据就基给stream2
stream1.on('data', (chunk)=>{
stream2.Write( chunk)
})
/ stream1 停了,就停掉stream2
stream1.on('end',()=>{
stream2.end( )
})
Stream对象的原型链
s=fs.createReadStream(path)
//4.js
const fs = require('fs')
const s = fs.createReadStream('./big_file.txt')
console.log(s)
//这里打印出来的东西不能继续看下面一层是什么,需要使用
//node --inspect-brk 4.js
那么它的对象层级为
自身属性(由fs.ReadStream 构造)
原型:stream.Readable.prototype
二级原型: stream.Stream.prototype
三级原型: events.EventEmitter.prototype
四级原型: Object.prototype
Stream 对象都继承了EventEmitter
支持的事件和方法
drain面试常考:某一次写完了
finish: 整个都写完了
cork/uncork: 强制把所有写入的数据都缓冲到内存中。
Stream分类 名称 特点 Readable 可读
Writable 可写
Duplex 可读可写(双向) 读和写不会有交叉
Transform 可读可写(变化) 自己写自己读,中间会有个转化器
Readable Stream
静止态 paused 和流动态 flowing
默认处于 paused 态
添加 data事件监听,它就变为 flowing态
删掉 data事件监听,它就变为 paused 态
pause()可以将它变为 paused
resume() 可以将它变为 flowing
Writable Stream
drain流干了事件
表示可以加点水了
我们调用 stream.write(chunk) 的时候,可能会得到false
false的意思是你写太快了,数据积压了
这个时候我们就不能再write了,要监听drain
等drain事件触发了,我们才能继续write
finish事件 调用 stream.end()之后,而且
缓冲区数据都已经传给底层系统之后,
触发finish事件
之前讲的内容, 都是在使用Stream, 怎么创建自己的流,给别人用?
创建一个Writable Stream
const { Writable } = require( "'stream') ;
const outStream = new Writable( {
write( chunk, encoding, callback) {
console. log( chunk. toString() );
callback( );
});
process. stdin. pipe( outStream) ;
//保存文件为writable.js 然后用node运行
//不管你输入什么,都会得到相同的结果
创建一个 Readable Stream
const { Readable } = require("stream");
const inStream = new Readable();
inStream.push("ABCDEFGHIJKLM") ;
inStream .push("NOPQRSTUVWXYZ");
inStream.push(null); // No more data
inStream.pipe(process.stdout); // 保存文件为 readable.js 然后用 node 运行
// 我们先把所有数据都 push 进去了,然后 pipe
续
const { Readable } = require("stream"); }
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
7)
})
inStream.currentCharCode = 65 // A
inStream.pipe(process.stdout)
// 保存文件为 readable2.js然后用 node 运行
// 这次的数据是按需供给的,对方调用read 我们才会给一次数据
Duplex Stream
const { Duplex } = require("stream");
const inoutStream =new Duplex({ write(chunk, encoding, callback) { console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++)); if (this.currentCharCode > 90) {
this.push(null);
});
inoutStream.currentCharCode = 65:
process.stdin.pipe(inoutStream).pipe(process.stdout);
// 源代码来源见参考链接
Transform Stream
const { Transform }= require("stream");
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr)
.pipe(process.stdout);
// 源代码来源见参考链接内置的
内置的Transform Stream
const fs =require("fs");
const zlib = require("zlib");//做gzip压缩
const file = process.argv[2];//读第二个参数,也就是用户传的一个路径
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + ".gz"));
续
const fs = require("fs");
const zlib = require(" zlib");
const file = process.argv[2];
fs.createReadStream(file)
.pipe( zlib. createGzip( ))
.on("data", () => process.stdout.write("."))
.pipe(fs.createWriteStream(file + ".zz"))
.on("finish", () => console.log( "Done") ) ;
续2
//略
const file = process.argv[2];
const {Transform} = require("stream");
const reportProgress = new Transform({
transform(chunk, encoding, callback) {
process.stdout.write(".");
this.push(chunk);
callback(null, chunk);
}
});
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + ".zz"))
.on("finish", () => console.log("Done"));
续3
压缩和加密
const CRYPTO require("crypto");
fs.createReadStream(file)
.pipe(crypto.createCipher("aes192", "123456")) //createCipher已经弃用
.pipe(zlib.createGzip())
.pipe(reportProgress)
.pipe(fs.createWriteStream(file + ".zz"))
.on("finish", () => console.log("Done"));
Stream用途非常广
在Node.js里随处可见 Node.js 中的 Steam
数据流中的积压问题
背压BackPressure
什么是背压?
往一个流里面写数据写的太多了,就会堵住,堵住了之后该如何把积压的流畅起来?(会用到cork)
Node.js Stream文档
文档里面有你想要知道的所有细节
面试题