Skip to content
0

node:worker_threads

worker 模块,充分利用多线程能力

通信

使用方式和进程通信类似,通过监听 message 事件接收消息和 postMessage 发送数据

import { Worker } from 'node:worker_threads'

const worker = new Worker('worker.js')

worker.on('message', message => {
  console.log(`Recived message from worker: ${message}`)
})

worker.postMessage('Hello from main thread')
import { parentPort } from 'node:worker_threads'

parentPort.on('message', message => {
  console.log(`Recived message from worker: ${message}`)
  parentPort.postMessage('Hello from worker thread')
})

实现异步线程池

预期的使用方式

const worker = new Worker(async ({ n }) => {
    return new Promise((r) => {
      setTimeout(() => {
        r(n + 1)
      }, Math.floor(Math.random() * 100))
    })
  })

  const results = await Promise.all([
    worker.run({ n: 1 }),
    worker.run({ n: 2 }),
    worker.run({ n: 3 }),
    worker.run({ n: 4 }),
    worker.run({ n: 5 }),
    worker.run({ n: 6 }),
    worker.run({ n: 7 }),
    worker.run({ n: 8 }),
    worker.run({ n: 9 })
  ])

  worker.stop()
  expect(results).toMatchObject([2, 3, 4, 5, 6, 7, 8, 9, 10])

具体实现,代码来自 yyx990803yyx990803/okie

import os from 'os'
import { Worker as _Worker } from 'worker_threads'

interface NodeWorker extends _Worker {
  currentResolve: ((value: any) => void) | null
  currentReject: ((err: Error) => void) | null
}

export interface Options {
  max?: number
}

export class Worker<Args extends any[], Ret = any> {
  private code: string
  private max: number
  private pool: NodeWorker[]
  private idlePool: NodeWorker[]
  private queue: [(worker: NodeWorker) => void, (err: Error) => void][]

  constructor(
    fn: (...args: Args) => Promise<Ret> | Ret,
    options: Options = {}
  ) {
    this.code = genWorkerCode(fn)
    this.max = options.max || Math.max(1, os.cpus().length - 1)
    this.pool = []
    this.idlePool = []
    this.queue = []
  }

  async run(...args: Args): Promise<Ret> {
    const worker = await this._getAvailableWorker()
    return new Promise<Ret>((resolve, reject) => {
      worker.currentResolve = resolve
      worker.currentReject = reject
      worker.postMessage(args)
    })
  }

  stop() {
    this.pool.forEach((w) => w.unref())
    this.queue.forEach(([_, reject]) =>
      reject(
        new Error('Main worker pool stopped before a worker was available.')
      )
    )
    this.pool = []
    this.idlePool = []
    this.queue = []
  }

  private async _getAvailableWorker(): Promise<NodeWorker> {
    // has idle one?
    if (this.idlePool.length) {
      return this.idlePool.shift()!
    }

    // can spawn more?
    if (this.pool.length < this.max) {
      const worker = new _Worker(this.code, { eval: true }) as NodeWorker

      worker.on('message', (res) => {
        worker.currentResolve && worker.currentResolve(res)
        worker.currentResolve = null
        this._assignDoneWorker(worker)
      })

      worker.on('error', (err) => {
        worker.currentReject && worker.currentReject(err)
        worker.currentReject = null
      })

      worker.on('exit', (code) => {
        const i = this.pool.indexOf(worker)
        if (i > -1) this.pool.splice(i, 1)
        if (code !== 0 && worker.currentReject) {
          worker.currentReject(
            new Error(`Worker stopped with non-0 exit code ${code}`)
          )
          worker.currentReject = null
        }
      })

      this.pool.push(worker)
      return worker
    }

    // no one is available, we have to wait
    let resolve: (worker: NodeWorker) => void
    let reject: (err: Error) => any
    const onWorkerAvailablePromise = new Promise<NodeWorker>((r, rj) => {
      resolve = r
      reject = rj
    })
    this.queue.push([resolve!, reject!])
    return onWorkerAvailablePromise
  }

  private _assignDoneWorker(worker: NodeWorker) {
    // someone's waiting already?
    if (this.queue.length) {
      const [resolve] = this.queue.shift()!
      resolve(worker)
      return
    }
    // take a rest.
    this.idlePool.push(worker)
  }
}

function genWorkerCode(fn: Function) {
  return `
const doWork = ${fn.toString()}

const { parentPort } = require('worker_threads')

parentPort.on('message', async (args) => {
  const res = await doWork(...args)
  parentPort.postMessage(res)
})
  `
}

Released under the MIT License.