Stream
"streams are arrays over time"
Node.js中的流(Stream)是一种抽象接口,用于表示数据传输的一种模型。
流是 Node.js 中处理流数据的抽象接口。 node:stream 模块提供了用于实现流接口的 API。 Node.js 提供了许多流对象。例如,对 HTTP 服务器的请求和 process.stdout 都是流实例。 流可以是可读的、可写的或两者兼而有之。所有流都是 EventEmitter 的实例。
Node.js 中有四种基本的流类型:
Writable- 可写流,可以向其写入数据,如fs.createWriteStream。Readable- 可读流,可以从中读取数据,如fs.createReadStream。Duplex- 可读又可写的流,如net.Socket。Transform- 在读写过程中可以修改或转换数据的Duplex流,如zlib.createGzip。 流被广泛应用在文件、网络、压缩等I/O操作中,是Node.js实现高性能I/O操作的重要抽象。可以通过pipe方法将流连接起来,构建流水线,实现流式的数据处理。
pipe
继承于 Stream 的对象都有一个 pipe 方法,pipe 方法接受一个可写流参数,使用流 pipe 方法,可读流会自动切换至流动模式。
举个例子,由于 process.stdin 是可读流, process.stdout 是可写流,所以可以用如下形式调用:
// index.js
process.stdin.pipe(process.stdout)作用就是会把输入流作为输出流输出,效果如下:
$ node index.js
hello # 手动输入 hello
hello
world # 手动输入 world
world原理
管道原理可以看作 data 和 end 两个事情的封装:
- process.stdin.pipe(process.stdout)
+ process.stdin.on('data', chunk => {
+ process.stdout.write(chunk)
+ })
+ process.stdin.on('end', () => {
+ process.stdout.end()
+ })chunk 的含义
当我们看到如下类似的代码,通过监听 data 事件来获得 chunk
stream.on('data', (chunk) => {
process.stdout.write(chunk)
})
stream.on('end', () => {
console.log('End')
})初次接触上面关于流的代码,可能会有如下疑问:
chunk是什么?- 为什么可以从“流”中得到“块”数据?
data事件什么时候被触发?end事件什么时候被触发?
我们来逐一解答,上面的 chunk 其实就是流中的数据块,如 this、is、some:
所以 chunk 就是流中的连续数据块,当一个“子流”和其它“子流”是独立的,他就是一个 chunk 数据块。
此时就会触发 data 事件,我们就能接收到 chunk
我们还有一个疑问是 end 事件的触发的时机呢?如何确定后面是否还有数据呢?
在 JS 中,我们可以推入 null 代表流的结尾,我们写一个自定义可读流
import { Readable } from 'node:stream'
class MyCustomReadableStream extends Readable {
constructor() {
super()
this.data = ['data1', 'data2', 'data3']
}
_read() {
const chunk = this.data.shift()
if (chunk) {
this.push(chunk)
}
else {
// 数据已经读取完,调用 push(null) 来触发 'end' 事件
this.push(null)
}
}
}
const customStream = new MyCustomReadableStream()
customStream.on('data', (chunk) => {
console.log('Received chunk of data:', chunk.toString())
})
customStream.on('end', () => {
console.log('Custom stream reading is complete.')
})$ tsx test.ts
Received chunk of data: data1
Received chunk of data: data2
Received chunk of data: data3
Custom stream reading is complete.流的消费
同步和异步
我们先来看如下场景
场景一:两个 listener 同步监听 data 事件
const customStream = new MyCustomReadableStream()
customStream.on('data', (chunk) => {
console.log('Comsumer 1 get:', chunk.toString())
})
customStream.on('data', (chunk) => {
console.log('Comsumer 2 get:', chunk.toString())
})
/*
Comsumer 1 get: data1
Comsumer 2 get: data1
Comsumer 1 get: data2
Comsumer 2 get: data2
Comsumer 1 get: data3
Comsumer 2 get: data3
*/场景二:一个异步 listener,一个同步 listener
const customStream = new MyCustomReadableStream()
customStream.on('data', (chunk) => {
console.log('Comsumer 1 get:', chunk.toString())
})
setTimeout(() => {
customStream.on('data', (chunk) => {
console.log('Comsumer 2 get:', chunk.toString()) // => 消费不到数据,(只能消费到1000ms之后的数据)
})
}, 1000)
/*
Comsumer 1 get: data1
Comsumer 1 get: data2
Comsumer 1 get: data3
*/场景三:一个异步 listener
const customStream = new MyCustomReadableStream()
setTimeout(() => {
customStream.on('data', (chunk) => {
console.log('Comsumer 2 get:', chunk.toString())
})
}, 1000)
/*
Comsumer 2 get: data1
Comsumer 2 get: data2
Comsumer 2 get: data3
*/上面的不同表现说明了:
- 一旦有监听者订阅了数据事件,emit 被触发
事件发射器将寻找订阅的事件监听器。然后它将使用给定的参数执行这些回调,它就会立即调用其 Callback (场景一)
- 查找 Callback 和消费 Callback 是同步进行的,不会等待所有的消费者,但一旦 Callback 生效,后面的数据不会被错过,例如场景二
- 直到找到 Callback 时,资源才会被消费,场景三
暂停和恢复
参考如下场景
const { Readable } = require('node:stream')
class MyStream extends Readable {
#count = 0
_read(size) {
this.push(':-)')
if (this.#count++ === 5)
this.push(null)
}
}
const stream = new MyStream()
stream.on('data', (chunk) => {
console.log(chunk.toString())
})
/**
1
1
:-)
1
:-)
1
:-)
1
:-)
1
:-)
:-)
*/const { Readable } = require('node:stream')
class MyStream extends Readable {
#count = 0
_read(size) {
this.push(':-)')
if (this.#count++ === 5)
this.push(null)
}
}
const stream = new MyStream()
stream.on('data', (chunk) => {
console.log(chunk.toString())
stream.pause() // Pause receiving data to simulate processing delay
setTimeout(() => {
stream.resume() // Resume after 1000ms
}, 1000)
})
/**
1
1
:-)
1
1
1
1
:-)
:-)
:-)
:-)
:-)
*/在上面的 Case1 中,通过往 stream 添加 data 时间监听器,触发 stream 的流动,_read 方法开始执行,与此同时,数据被监听器消费到
但是在 Case2 中,我们使用 stream.pause() 和 stream.resume() 方法手动对流的流动状态进行切换,可以看到,这并不影响数据的消费,尽管 _read 方法在流的暂停阶段也一直在执行,但是最终的数据也没有丢失。一旦 stream 恢复流动,监听器依然能得到数据
参考 NodeJs 的 backpressure 管理:https://blog.platformatic.dev/a-guide-to-reading-and-writing-nodejs-streams#heading-managing-backpressure-with-pause-and-resume
Object Mode
使用 Nodejs 创建的流都专门针对 string 、Buffer(orUnit8Array),但是流的实现可以用 Javascript 中的其他值(null 除外),这种流就称为以对象模式运行
创建流时,使用 objectMode 选项将流实例切换到对象模式。尝试将现有流切换到对象模式是不安全的
可读流的两种读取模式
可读流以两种模式运行: 流动和停止。(这个与上面提到的对象模式无关)
- 在流动模式下,数据会从系统底层读取,并尽快通过
EventEmitter提供给应用程序 - 在停止模式下,必须显式调用
Stream.read()才能从流中读取到数据
所有流都以停止模式开始,但可以通过如下方式切换到流动模式:
- 流添加了
data的事件监听器(正是我们上面的触发方式) - 调用了
Stream.resume()方法 - 调用
Stream.pipe()方法将数据发送到Writable
流动模式也可退回到停止模式:
- 流的
data的事件监听器消费完了所有的事件(接收到了 null)
TIP
流的流动模式将会维持事件循环的运行,从而导致进程不退出,直到变为停止模式