Skip to content
0

Stream

"streams are arrays over time"

Node.js中的流(Stream)是一种抽象接口,用于表示数据传输的一种模型。

流是 Node.js 中处理流数据的抽象接口。 node:stream 模块提供了用于实现流接口的 API。 Node.js 提供了许多流对象。例如,对 HTTP 服务器的请求和 process.stdout 都是流实例。 流可以是可读的、可写的或两者兼而有之。所有流都是 EventEmitter 的实例。

Node.js 中有四种基本的流类型:

  • Writable - 可写流,可以向其写入数据,如 fs.createWriteStream
  • Readable - 可读流,可以从中读取数据,如 fs.createReadStream
  • Duplex - 可读又可写的流,如 net.Socket
  • Transform - 在读写过程中可以修改或转换数据的 Duplex 流,如 zlib.createGzip。 流被广泛应用在文件、网络、压缩等I/O操作中,是Node.js实现高性能I/O操作的重要抽象。可以通过pipe方法将流连接起来,构建流水线,实现流式的数据处理。

pipe

继承于 Stream 的对象都有一个 pipe 方法,pipe 方法接受一个可写流参数,使用流 pipe 方法,可读流会自动切换至流动模式。

举个例子,由于 process.stdin 是可读流, process.stdout 是可写流,所以可以用如下形式调用:

// index.js
process.stdin.pipe(process.stdout)

作用就是会把输入流作为输出流输出,效果如下:

$ node index.js
hello # 手动输入 hello
hello
world # 手动输入 world
world

原理

管道原理可以看作 dataend 两个事情的封装:

- process.stdin.pipe(process.stdout)
+ process.stdin.on('data', chunk => {
+     process.stdout.write(chunk)
+ })
+ process.stdin.on('end', () => {
+     process.stdout.end()
+ })

chunk 的含义

当我们看到如下类似的代码,通过监听 data 事件来获得 chunk

stream.on('data', (chunk) => {
  process.stdout.write(chunk)
})

stream.on('end', () => {
  console.log('End')
})

初次接触上面关于流的代码,可能会有如下疑问:

  • chunk 是什么?
  • 为什么可以从“流”中得到“块”数据?
  • data 事件什么时候被触发?
  • end 事件什么时候被触发?

我们来逐一解答,上面的 chunk 其实就是流中的数据块,如 thisissome

undefined
undefined

所以 chunk 就是流中的连续数据块,当一个“子流”和其它“子流”是独立的,他就是一个 chunk 数据块。

此时就会触发 data 事件,我们就能接收到 chunk

我们还有一个疑问是 end 事件的触发的时机呢?如何确定后面是否还有数据呢?

在 JS 中,我们可以推入 null 代表流的结尾,我们写一个自定义可读流

import { Readable } from 'node:stream'

class MyCustomReadableStream extends Readable {
  constructor() {
    super()
    this.data = ['data1', 'data2', 'data3']
  }

  _read() {
    const chunk = this.data.shift()
    if (chunk) {
      this.push(chunk)
    }
    else {
      // 数据已经读取完,调用 push(null) 来触发 'end' 事件
      this.push(null)
    }
  }
}

const customStream = new MyCustomReadableStream()

customStream.on('data', (chunk) => {
  console.log('Received chunk of data:', chunk.toString())
})

customStream.on('end', () => {
  console.log('Custom stream reading is complete.')
})
$ tsx test.ts
Received chunk of data: data1
Received chunk of data: data2
Received chunk of data: data3
Custom stream reading is complete.

流的消费

同步和异步

我们先来看如下场景

场景一:两个 listener 同步监听 data 事件

const customStream = new MyCustomReadableStream()

customStream.on('data', (chunk) => {
  console.log('Comsumer 1 get:', chunk.toString())
})
customStream.on('data', (chunk) => {
  console.log('Comsumer 2 get:', chunk.toString())
})

/*
Comsumer 1 get: data1
Comsumer 2 get: data1
Comsumer 1 get: data2
Comsumer 2 get: data2
Comsumer 1 get: data3
Comsumer 2 get: data3
*/

场景二:一个异步 listener,一个同步 listener

const customStream = new MyCustomReadableStream()

customStream.on('data', (chunk) => {
  console.log('Comsumer 1 get:', chunk.toString())
})
setTimeout(() => {
  customStream.on('data', (chunk) => {
    console.log('Comsumer 2 get:', chunk.toString()) // => 消费不到数据,(只能消费到1000ms之后的数据)
  })
}, 1000)

/*
Comsumer 1 get: data1
Comsumer 1 get: data2
Comsumer 1 get: data3
*/

场景三:一个异步 listener

const customStream = new MyCustomReadableStream()

setTimeout(() => {
  customStream.on('data', (chunk) => {
    console.log('Comsumer 2 get:', chunk.toString())
  })
}, 1000)

/*
Comsumer 2 get: data1
Comsumer 2 get: data2
Comsumer 2 get: data3
*/

上面的不同表现说明了:

  1. 一旦有监听者订阅了数据事件,emit 被触发

事件发射器将寻找订阅的事件监听器。然后它将使用给定的参数执行这些回调,它就会立即调用其 Callback (场景一)

  • 查找 Callback 和消费 Callback 是同步进行的,不会等待所有的消费者,但一旦 Callback 生效,后面的数据不会被错过,例如场景二
  • 直到找到 Callback 时,资源才会被消费,场景三

暂停和恢复

参考如下场景

const { Readable } = require('node:stream')

class MyStream extends Readable {
  #count = 0
  _read(size) {
    this.push(':-)')
    if (this.#count++ === 5)
      this.push(null)
  }
}

const stream = new MyStream()

stream.on('data', (chunk) => {
  console.log(chunk.toString())
})
/**
1
1
:-)
1
:-)
1
:-)
1
:-)
1
:-)
:-)
 */
const { Readable } = require('node:stream')

class MyStream extends Readable {
  #count = 0
  _read(size) {
    this.push(':-)')
    if (this.#count++ === 5)
      this.push(null)
  }
}

const stream = new MyStream()

stream.on('data', (chunk) => {
  console.log(chunk.toString())
  stream.pause() // Pause receiving data to simulate processing delay
  setTimeout(() => {
    stream.resume() // Resume after 1000ms
  }, 1000)
})
/**
1
1
:-)
1
1
1
1
:-)
:-)
:-)
:-)
:-)
 */

在上面的 Case1 中,通过往 stream 添加 data 时间监听器,触发 stream 的流动,_read 方法开始执行,与此同时,数据被监听器消费到

但是在 Case2 中,我们使用 stream.pause()stream.resume() 方法手动对流的流动状态进行切换,可以看到,这并不影响数据的消费,尽管 _read 方法在流的暂停阶段也一直在执行,但是最终的数据也没有丢失。一旦 stream 恢复流动,监听器依然能得到数据

参考 NodeJs 的 backpressure 管理:https://blog.platformatic.dev/a-guide-to-reading-and-writing-nodejs-streams#heading-managing-backpressure-with-pause-and-resume

Object Mode

使用 Nodejs 创建的流都专门针对 string 、Buffer(orUnit8Array),但是流的实现可以用 Javascript 中的其他值(null 除外),这种流就称为以对象模式运行

创建流时,使用 objectMode 选项将流实例切换到对象模式。尝试将现有流切换到对象模式是不安全的

可读流的两种读取模式

可读流以两种模式运行: 流动和停止。(这个与上面提到的对象模式无关)

  • 在流动模式下,数据会从系统底层读取,并尽快通过 EventEmitter 提供给应用程序
  • 在停止模式下,必须显式调用 Stream.read() 才能从流中读取到数据

所有流都以停止模式开始,但可以通过如下方式切换到流动模式:

  • 流添加了 data 的事件监听器(正是我们上面的触发方式)
  • 调用了 Stream.resume() 方法
  • 调用 Stream.pipe() 方法将数据发送到 Writable

流动模式也可退回到停止模式:

  • 流的 data 的事件监听器消费完了所有的事件(接收到了 null)

TIP

流的流动模式将会维持事件循环的运行,从而导致进程不退出,直到变为停止模式

Released under the MIT License.