node:worker_threads
worker 模块,充分利用多线程能力
通信
使用方式和进程通信类似,通过监听 message 事件接收消息和 postMessage 发送数据
import { Worker } from 'node:worker_threads'
const worker = new Worker('worker.js')
worker.on('message', message => {
console.log(`Recived message from worker: ${message}`)
})
worker.postMessage('Hello from main thread')import { parentPort } from 'node:worker_threads'
parentPort.on('message', message => {
console.log(`Recived message from worker: ${message}`)
parentPort.postMessage('Hello from worker thread')
})实现异步线程池
预期的使用方式
const worker = new Worker(async ({ n }) => {
return new Promise((r) => {
setTimeout(() => {
r(n + 1)
}, Math.floor(Math.random() * 100))
})
})
const results = await Promise.all([
worker.run({ n: 1 }),
worker.run({ n: 2 }),
worker.run({ n: 3 }),
worker.run({ n: 4 }),
worker.run({ n: 5 }),
worker.run({ n: 6 }),
worker.run({ n: 7 }),
worker.run({ n: 8 }),
worker.run({ n: 9 })
])
worker.stop()
expect(results).toMatchObject([2, 3, 4, 5, 6, 7, 8, 9, 10])具体实现,代码来自 yyx990803 的 yyx990803/okie
import os from 'os'
import { Worker as _Worker } from 'worker_threads'
interface NodeWorker extends _Worker {
currentResolve: ((value: any) => void) | null
currentReject: ((err: Error) => void) | null
}
export interface Options {
max?: number
}
export class Worker<Args extends any[], Ret = any> {
private code: string
private max: number
private pool: NodeWorker[]
private idlePool: NodeWorker[]
private queue: [(worker: NodeWorker) => void, (err: Error) => void][]
constructor(
fn: (...args: Args) => Promise<Ret> | Ret,
options: Options = {}
) {
this.code = genWorkerCode(fn)
this.max = options.max || Math.max(1, os.cpus().length - 1)
this.pool = []
this.idlePool = []
this.queue = []
}
async run(...args: Args): Promise<Ret> {
const worker = await this._getAvailableWorker()
return new Promise<Ret>((resolve, reject) => {
worker.currentResolve = resolve
worker.currentReject = reject
worker.postMessage(args)
})
}
stop() {
this.pool.forEach((w) => w.unref())
this.queue.forEach(([_, reject]) =>
reject(
new Error('Main worker pool stopped before a worker was available.')
)
)
this.pool = []
this.idlePool = []
this.queue = []
}
private async _getAvailableWorker(): Promise<NodeWorker> {
// has idle one?
if (this.idlePool.length) {
return this.idlePool.shift()!
}
// can spawn more?
if (this.pool.length < this.max) {
const worker = new _Worker(this.code, { eval: true }) as NodeWorker
worker.on('message', (res) => {
worker.currentResolve && worker.currentResolve(res)
worker.currentResolve = null
this._assignDoneWorker(worker)
})
worker.on('error', (err) => {
worker.currentReject && worker.currentReject(err)
worker.currentReject = null
})
worker.on('exit', (code) => {
const i = this.pool.indexOf(worker)
if (i > -1) this.pool.splice(i, 1)
if (code !== 0 && worker.currentReject) {
worker.currentReject(
new Error(`Worker stopped with non-0 exit code ${code}`)
)
worker.currentReject = null
}
})
this.pool.push(worker)
return worker
}
// no one is available, we have to wait
let resolve: (worker: NodeWorker) => void
let reject: (err: Error) => any
const onWorkerAvailablePromise = new Promise<NodeWorker>((r, rj) => {
resolve = r
reject = rj
})
this.queue.push([resolve!, reject!])
return onWorkerAvailablePromise
}
private _assignDoneWorker(worker: NodeWorker) {
// someone's waiting already?
if (this.queue.length) {
const [resolve] = this.queue.shift()!
resolve(worker)
return
}
// take a rest.
this.idlePool.push(worker)
}
}
function genWorkerCode(fn: Function) {
return `
const doWork = ${fn.toString()}
const { parentPort } = require('worker_threads')
parentPort.on('message', async (args) => {
const res = await doWork(...args)
parentPort.postMessage(res)
})
`
}