Skip to content

异步上下文跟踪

[稳定版: 2 - 稳定版]

稳定版: 2 稳定性: 2 - 稳定版

源代码: lib/async_hooks.js

简介

这些类用于关联状态并在回调和 Promise 链中传播状态。它们允许在 Web 请求或任何其他异步持续时间的整个生命周期中存储数据。它类似于其他语言中的线程局部存储。

AsyncLocalStorageAsyncResource 类是 node:async_hooks 模块的一部分:

js
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks'
js
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks')

类: AsyncLocalStorage

[历史]

版本变更
v16.4.0AsyncLocalStorage 现在是稳定版。之前,它是实验性的。
v13.10.0, v12.17.0新增于: v13.10.0, v12.17.0

此类创建在异步操作中保持一致的存储。

虽然你可以在 node:async_hooks 模块之上创建你自己的实现,但 AsyncLocalStorage 应该是首选,因为它是一个高性能且内存安全的实现,其中包含一些不容易实现的重要优化。

以下示例使用 AsyncLocalStorage 来构建一个简单的日志记录器,该记录器为传入的 HTTP 请求分配 ID 并将其包含在每个请求中记录的消息中。

js
import http from 'node:http'
import { AsyncLocalStorage } from 'node:async_hooks'

const asyncLocalStorage = new AsyncLocalStorage()

function logWithId(msg) {
  const id = asyncLocalStorage.getStore()
  console.log(`${id !== undefined ? id : '-'}:`, msg)
}

let idSeq = 0
http
  .createServer((req, res) => {
    asyncLocalStorage.run(idSeq++, () => {
      logWithId('start')
      // Imagine any chain of async operations here
      setImmediate(() => {
        logWithId('finish')
        res.end()
      })
    })
  })
  .listen(8080)

http.get('http://localhost:8080')
http.get('http://localhost:8080')
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish
js
const http = require('node:http')
const { AsyncLocalStorage } = require('node:async_hooks')

const asyncLocalStorage = new AsyncLocalStorage()

function logWithId(msg) {
  const id = asyncLocalStorage.getStore()
  console.log(`${id !== undefined ? id : '-'}:`, msg)
}

let idSeq = 0
http
  .createServer((req, res) => {
    asyncLocalStorage.run(idSeq++, () => {
      logWithId('start')
      // Imagine any chain of async operations here
      setImmediate(() => {
        logWithId('finish')
        res.end()
      })
    })
  })
  .listen(8080)

http.get('http://localhost:8080')
http.get('http://localhost:8080')
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish

每个 AsyncLocalStorage 实例都维护一个独立的存储上下文。多个实例可以安全地同时存在,而不会互相干扰数据。

new AsyncLocalStorage()

[历史]

版本变更
v19.7.0, v18.16.0移除了实验性 onPropagate 选项。
v19.2.0, v18.13.0添加了 onPropagate 选项。
v13.10.0, v12.17.0新增于:v13.10.0, v12.17.0

创建一个新的 AsyncLocalStorage 实例。存储仅在 run() 调用内或 enterWith() 调用后提供。

静态方法: AsyncLocalStorage.bind(fn)

新增于: v19.8.0, v18.16.0

[稳定性: 1 - 实验性]

稳定性: 1 稳定性: 1 - 实验性

  • fn <函数> 要绑定到当前执行上下文的函数。
  • 返回值: <函数> 一个在捕获的执行上下文中调用 fn 的新函数。

将给定的函数绑定到当前执行上下文。

静态方法: AsyncLocalStorage.snapshot()

新增于: v19.8.0, v18.16.0

[稳定性: 1 - 实验性]

稳定性: 1 稳定性: 1 - 实验性

  • 返回值: <Function> 一个新的函数,其签名为 (fn: (...args) : R, ...args) : R

捕获当前执行上下文并返回一个函数,该函数接受一个函数作为参数。每当调用返回的函数时,它都会在捕获的上下文中调用传递给它的函数。

js
const asyncLocalStorage = new AsyncLocalStorage()
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot())
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()))
console.log(result) // 返回 123

AsyncLocalStorage.snapshot() 可以替换 AsyncResource 用于简单的异步上下文跟踪目的,例如:

js
class Foo {
  #runInAsyncScope = AsyncLocalStorage.snapshot()

  get() {
    return this.#runInAsyncScope(() => asyncLocalStorage.getStore())
  }
}

const foo = asyncLocalStorage.run(123, () => new Foo())
console.log(asyncLocalStorage.run(321, () => foo.get())) // 返回 123

asyncLocalStorage.disable()

新增于:v13.10.0, v12.17.0

[稳定性:1 - 实验性]

稳定性:1 稳定性:1 - 实验性

禁用 AsyncLocalStorage 实例。所有后续对 asyncLocalStorage.getStore() 的调用都将返回 undefined,直到再次调用 asyncLocalStorage.run()asyncLocalStorage.enterWith()

调用 asyncLocalStorage.disable() 时,将退出与该实例关联的所有当前上下文。

asyncLocalStorage 可以被垃圾回收之前,需要调用 asyncLocalStorage.disable()。这并不适用于 asyncLocalStorage 提供的存储,因为这些对象将与相应的异步资源一起被垃圾回收。

asyncLocalStorage 在当前进程中不再使用时,使用此方法。

asyncLocalStorage.getStore()

新增于:v13.10.0, v12.17.0

返回当前存储。如果在通过调用 asyncLocalStorage.run()asyncLocalStorage.enterWith() 初始化的异步上下文之外调用,则返回 undefined

asyncLocalStorage.enterWith(store)

新增于: v13.11.0, v12.17.0

[稳定性: 1 - 实验性]

稳定性: 1 稳定性: 1 - 实验性

在当前同步执行的剩余部分切换到上下文,然后通过任何后续的异步调用持久化存储。

示例:

js
const store = { id: 1 }
// 使用给定的 store 对象替换之前的 store
asyncLocalStorage.enterWith(store)
asyncLocalStorage.getStore() // 返回 store 对象
someAsyncOperation(() => {
  asyncLocalStorage.getStore() // 返回相同的对象
})

此转换将持续整个同步执行。这意味着,例如,如果在事件处理程序中进入上下文,则后续的事件处理程序也将在此上下文中运行,除非使用 AsyncResource 特定绑定到另一个上下文。这就是为什么除非有充分理由使用后者方法,否则应优先使用 run() 的原因。

js
const store = { id: 1 }

emitter.on('my-event', () => {
  asyncLocalStorage.enterWith(store)
})
emitter.on('my-event', () => {
  asyncLocalStorage.getStore() // 返回相同的对象
})

asyncLocalStorage.getStore() // 返回 undefined
emitter.emit('my-event')
asyncLocalStorage.getStore() // 返回相同的对象

asyncLocalStorage.run(store, callback[, ...args])

新增于:v13.10.0, v12.17.0

在上下文内同步运行一个函数并返回其返回值。存储在回调函数外部是不可访问的。存储可被回调函数内创建的任何异步操作访问。

可选参数 args 将传递给回调函数。

如果回调函数抛出错误,run() 也会抛出该错误。堆栈跟踪不受此调用的影响,并且上下文将退出。

示例:

js
const store = { id: 2 }
try {
  asyncLocalStorage.run(store, () => {
    asyncLocalStorage.getStore() // 返回 store 对象
    setTimeout(() => {
      asyncLocalStorage.getStore() // 返回 store 对象
    }, 200)
    throw new Error()
  })
} catch (e) {
  asyncLocalStorage.getStore() // 返回 undefined
  // 错误将在此处被捕获
}

asyncLocalStorage.exit(callback[, ...args])

新增于:v13.10.0, v12.17.0

[稳定性:1 - 实验性]

稳定性:1 稳定性:1 - 实验性

在上下文之外同步运行一个函数并返回其返回值。在回调函数或回调函数中创建的异步操作中无法访问存储。在回调函数中执行的任何 getStore() 调用都将始终返回 undefined

可选参数 args 将传递给回调函数。

如果回调函数抛出错误,则 exit() 也会抛出该错误。此调用不会影响堆栈跟踪,并且上下文将重新进入。

示例:

js
// 在对 run 的调用中
try {
  asyncLocalStorage.getStore() // 返回存储对象或值
  asyncLocalStorage.exit(() => {
    asyncLocalStorage.getStore() // 返回 undefined
    throw new Error()
  })
} catch (e) {
  asyncLocalStorage.getStore() // 返回相同的对象或值
  // 此处将捕获错误
}

使用 async/await

如果在一个异步函数中,只有一个 await 调用在一个上下文中运行,则应使用以下模式:

js
async function fn() {
  await asyncLocalStorage.run(new Map(), () => {
    asyncLocalStorage.getStore().set('key', value)
    return foo() // foo 的返回值将被等待
  })
}

在这个例子中,存储区只在回调函数和 foo 调用的函数中可用。在 run 外部,调用 getStore 将返回 undefined

故障排除:上下文丢失

在大多数情况下,AsyncLocalStorage 可以正常工作。在少数情况下,当前存储区会在某个异步操作中丢失。

如果您的代码是基于回调的,只需使用 util.promisify() 将其转换为 Promise,它就可以开始与原生 Promise 一起工作。

如果您需要使用基于回调的 API 或您的代码假定自定义 thenable 实现,请使用 AsyncResource 类将异步操作与正确的执行上下文关联。通过在您怀疑导致丢失的调用之后记录 asyncLocalStorage.getStore() 的内容来查找导致上下文丢失的函数调用。当代码记录 undefined 时,最后调用的回调可能是导致上下文丢失的原因。

类: AsyncResource

[历史]

版本变更
v16.4.0AsyncResource 现在已稳定。之前,它是实验性的。

AsyncResource 类旨在被嵌入程序的异步资源扩展。使用它,用户可以轻松触发他们自己资源的生命周期事件。

init 钩子将在实例化 AsyncResource 时触发。

以下是 AsyncResource API 的概述。

js
import { AsyncResource, executionAsyncId } from 'node:async_hooks'

// AsyncResource() 旨在被扩展。实例化一个新的 AsyncResource() 也会触发 init。如果省略 triggerAsyncId,则使用 async_hook.executionAsyncId()。
const asyncResource = new AsyncResource(type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false })

// 在资源的执行上下文中运行函数。这将
// * 建立资源的上下文
// * 触发 AsyncHooks before 回调
// * 使用提供的参数调用提供的函数 `fn`
// * 触发 AsyncHooks after 回调
// * 恢复原始执行上下文
asyncResource.runInAsyncScope(fn, thisArg, ...args)

// 调用 AsyncHooks destroy 回调。
asyncResource.emitDestroy()

// 返回分配给 AsyncResource 实例的唯一 ID。
asyncResource.asyncId()

// 返回 AsyncResource 实例的触发器 ID。
asyncResource.triggerAsyncId()
js
const { AsyncResource, executionAsyncId } = require('node:async_hooks')

// AsyncResource() 旨在被扩展。实例化一个新的 AsyncResource() 也会触发 init。如果省略 triggerAsyncId,则使用 async_hook.executionAsyncId()。
const asyncResource = new AsyncResource(type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false })

// 在资源的执行上下文中运行函数。这将
// * 建立资源的上下文
// * 触发 AsyncHooks before 回调
// * 使用提供的参数调用提供的函数 `fn`
// * 触发 AsyncHooks after 回调
// * 恢复原始执行上下文
asyncResource.runInAsyncScope(fn, thisArg, ...args)

// 调用 AsyncHooks destroy 回调。
asyncResource.emitDestroy()

// 返回分配给 AsyncResource 实例的唯一 ID。
asyncResource.asyncId()

// 返回 AsyncResource 实例的触发器 ID。
asyncResource.triggerAsyncId()

new AsyncResource(type[, options])

  • type <字符串> 异步事件的类型。
  • options <对象>
    • triggerAsyncId <数字> 创建此异步事件的执行上下文的 ID。默认值: executionAsyncId()
    • requireManualDestroy <布尔值> 如果设置为 true,则在对象被垃圾回收时禁用 emitDestroy。通常不需要设置此值(即使手动调用 emitDestroy),除非检索资源的 asyncId 并使用它调用敏感 API 的 emitDestroy。如果设置为 false,则只有当至少有一个活动的 destroy 钩子时,垃圾回收上的 emitDestroy 调用才会发生。默认值: false

示例用法:

js
class DBQuery extends AsyncResource {
  constructor(db) {
    super('DBQuery')
    this.db = db
  }

  getInfo(query, callback) {
    this.db.get(query, (err, data) => {
      this.runInAsyncScope(callback, null, err, data)
    })
  }

  close() {
    this.db = null
    this.emitDestroy()
  }
}

静态方法: AsyncResource.bind(fn[, type[, thisArg]])

[历史]

版本变更
v20.0.0绑定函数中添加的 asyncResource 属性已弃用,将在未来版本中移除。
v17.8.0, v16.15.0thisArg 未定义时,默认值已更改为使用调用者的 this
v16.0.0添加了可选的 thisArg
v14.8.0, v12.19.0新增于:v14.8.0, v12.19.0
  • fn <函数> 要绑定到当前执行上下文的函数。
  • type <字符串> 与底层 AsyncResource 关联的可选名称。
  • thisArg <任意>

将给定函数绑定到当前执行上下文。

asyncResource.bind(fn[, thisArg])

[历史]

版本变更
v20.0.0绑定函数中添加的 asyncResource 属性已弃用,将在未来版本中移除。
v17.8.0, v16.15.0thisArg 未定义时,默认值已更改为使用调用者的 this
v16.0.0添加了可选的 thisArg
v14.8.0, v12.19.0新增于:v14.8.0, v12.19.0
  • fn <函数> 要绑定到当前 AsyncResource 作用域的函数。
  • thisArg <任意>

将给定函数绑定到此 AsyncResource 的作用域中执行。

asyncResource.runInAsyncScope(fn[, thisArg, ...args])

新增于: v9.6.0

  • fn <Function> 在此异步资源的执行上下文中调用的函数。
  • thisArg <any> 用于函数调用的接收者。
  • ...args <any> 传递给函数的可选参数。

在异步资源的执行上下文中使用提供的参数调用提供的函数。这将建立上下文,触发 AsyncHooks before 回调,调用函数,触发 AsyncHooks after 回调,然后恢复原始执行上下文。

asyncResource.emitDestroy()

调用所有 destroy hook。这只能调用一次。如果调用多次,则会抛出错误。这必须手动调用。如果资源被 GC 收集,则永远不会调用 destroy hook。

asyncResource.asyncId()

  • 返回值: <number> 分配给资源的唯一 asyncId

asyncResource.triggerAsyncId()

  • 返回值: <number> 传递给 AsyncResource 构造函数的相同 triggerAsyncId

使用 AsyncResourceWorker 线程池

以下示例展示了如何使用 AsyncResource 类为 Worker 池提供正确的异步跟踪。其他资源池,例如数据库连接池,可以遵循类似的模型。

假设任务是将两个数字相加,使用名为 task_processor.js 的文件,其内容如下:

js
import { parentPort } from 'node:worker_threads'
parentPort.on('message', task => {
  parentPort.postMessage(task.a + task.b)
})
js
const { parentPort } = require('node:worker_threads')
parentPort.on('message', task => {
  parentPort.postMessage(task.a + task.b)
})

围绕它构建的 Worker 池可以使用以下结构:

js
import { AsyncResource } from 'node:async_hooks'
import { EventEmitter } from 'node:events'
import { Worker } from 'node:worker_threads'

const kTaskInfo = Symbol('kTaskInfo')
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent')

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo')
    this.callback = callback
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result)
    this.emitDestroy() // `TaskInfo`s are used only once.
  }
}

export default class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super()
    this.numThreads = numThreads
    this.workers = []
    this.freeWorkers = []
    this.tasks = []

    for (let i = 0; i < numThreads; i++) this.addNewWorker()

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift()
        this.runTask(task, callback)
      }
    })
  }

  addNewWorker() {
    const worker = new Worker(new URL('task_processor.js', import.meta.url))
    worker.on('message', result => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result)
      worker[kTaskInfo] = null
      this.freeWorkers.push(worker)
      this.emit(kWorkerFreedEvent)
    })
    worker.on('error', err => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null)
      else this.emit('error', err)
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1)
      this.addNewWorker()
    })
    this.workers.push(worker)
    this.freeWorkers.push(worker)
    this.emit(kWorkerFreedEvent)
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback })
      return
    }

    const worker = this.freeWorkers.pop()
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback)
    worker.postMessage(task)
  }

  close() {
    for (const worker of this.workers) worker.terminate()
  }
}
js
const { AsyncResource } = require('node:async_hooks')
const { EventEmitter } = require('node:events')
const path = require('node:path')
const { Worker } = require('node:worker_threads')

const kTaskInfo = Symbol('kTaskInfo')
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent')

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo')
    this.callback = callback
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result)
    this.emitDestroy() // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super()
    this.numThreads = numThreads
    this.workers = []
    this.freeWorkers = []
    this.tasks = []

    for (let i = 0; i < numThreads; i++) this.addNewWorker()

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift()
        this.runTask(task, callback)
      }
    })
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'))
    worker.on('message', result => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result)
      worker[kTaskInfo] = null
      this.freeWorkers.push(worker)
      this.emit(kWorkerFreedEvent)
    })
    worker.on('error', err => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null)
      else this.emit('error', err)
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1)
      this.addNewWorker()
    })
    this.workers.push(worker)
    this.freeWorkers.push(worker)
    this.emit(kWorkerFreedEvent)
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback })
      return
    }

    const worker = this.freeWorkers.pop()
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback)
    worker.postMessage(task)
  }

  close() {
    for (const worker of this.workers) worker.terminate()
  }
}

module.exports = WorkerPool

如果没有 WorkerPoolTaskInfo 对象添加的显式跟踪,则回调似乎与各个 Worker 对象相关联。但是,Worker 的创建与任务的创建无关,并且不提供有关何时调度任务的信息。

此池可以使用如下方式:

js
import WorkerPool from './worker_pool.js'
import os from 'node:os'

const pool = new WorkerPool(os.availableParallelism())

let finished = 0
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result)
    if (++finished === 10) pool.close()
  })
}
js
const WorkerPool = require('./worker_pool.js')
const os = require('node:os')

const pool = new WorkerPool(os.availableParallelism())

let finished = 0
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result)
    if (++finished === 10) pool.close()
  })
}

AsyncResourceEventEmitter 集成

EventEmitter 触发的事件监听器可能在与调用 eventEmitter.on() 时活动的执行上下文不同的上下文中运行。

以下示例显示了如何使用 AsyncResource 类将事件监听器与正确的执行上下文正确关联。相同的方法可以应用于 Stream 或类似的事件驱动类。

js
import { createServer } from 'node:http'
import { AsyncResource, executionAsyncId } from 'node:async_hooks'

const server = createServer((req, res) => {
  req.on(
    'close',
    AsyncResource.bind(() => {
      // 执行上下文绑定到当前外部作用域。
    })
  )
  req.on('close', () => {
    // 执行上下文绑定到导致发出 'close' 的作用域。
  })
  res.end()
}).listen(3000)
js
const { createServer } = require('node:http')
const { AsyncResource, executionAsyncId } = require('node:async_hooks')

const server = createServer((req, res) => {
  req.on(
    'close',
    AsyncResource.bind(() => {
      // 执行上下文绑定到当前外部作用域。
    })
  )
  req.on('close', () => {
    // 执行上下文绑定到导致发出 'close' 的作用域。
  })
  res.end()
}).listen(3000)