Skip to content

Asynchrone Kontextverfolgung

[Stabil: 2 - Stabil]

Stabil: 2 Stabilität: 2 - Stabil

Quellcode: lib/async_hooks.js

Einleitung

Diese Klassen werden verwendet, um Zustände zuzuordnen und diese über Callbacks und Promise-Ketten hinweg zu propagieren. Sie ermöglichen die Speicherung von Daten während der gesamten Lebensdauer einer Webanforderung oder einer anderen asynchronen Dauer. Es ähnelt dem threadlokalen Speicher in anderen Programmiersprachen.

Die Klassen AsyncLocalStorage und AsyncResource sind Teil des Moduls node:async_hooks:

js
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks'
js
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks')

Klasse: AsyncLocalStorage

[Historie]

VersionÄnderungen
v16.4.0AsyncLocalStorage ist jetzt stabil. Zuvor war es experimentell.
v13.10.0, v12.17.0Hinzugefügt in: v13.10.0, v12.17.0

Diese Klasse erstellt Speicher, die über asynchrone Operationen hinweg kohärent bleiben.

Obwohl Sie Ihre eigene Implementierung auf Basis des Moduls node:async_hooks erstellen können, sollte AsyncLocalStorage bevorzugt werden, da es sich um eine performante und speichersichere Implementierung handelt, die erhebliche Optimierungen beinhaltet, die nicht offensichtlich zu implementieren sind.

Das folgende Beispiel verwendet AsyncLocalStorage, um einen einfachen Logger zu erstellen, der eingehenden HTTP-Anfragen IDs zuweist und diese in Nachrichten einschließt, die innerhalb jeder Anfrage protokolliert werden.

js
import http from 'node:http'
import { AsyncLocalStorage } from 'node:async_hooks'

const asyncLocalStorage = new AsyncLocalStorage()

function logWithId(msg) {
  const id = asyncLocalStorage.getStore()
  console.log(`${id !== undefined ? id : '-'}:`, msg)
}

let idSeq = 0
http
  .createServer((req, res) => {
    asyncLocalStorage.run(idSeq++, () => {
      logWithId('start')
      // Stellen Sie sich hier eine beliebige Kette asynchroner Operationen vor
      setImmediate(() => {
        logWithId('finish')
        res.end()
      })
    })
  })
  .listen(8080)

http.get('http://localhost:8080')
http.get('http://localhost:8080')
// Gibt aus:
//   0: start
//   1: start
//   0: finish
//   1: finish
js
const http = require('node:http')
const { AsyncLocalStorage } = require('node:async_hooks')

const asyncLocalStorage = new AsyncLocalStorage()

function logWithId(msg) {
  const id = asyncLocalStorage.getStore()
  console.log(`${id !== undefined ? id : '-'}:`, msg)
}

let idSeq = 0
http
  .createServer((req, res) => {
    asyncLocalStorage.run(idSeq++, () => {
      logWithId('start')
      // Stellen Sie sich hier eine beliebige Kette asynchroner Operationen vor
      setImmediate(() => {
        logWithId('finish')
        res.end()
      })
    })
  })
  .listen(8080)

http.get('http://localhost:8080')
http.get('http://localhost:8080')
// Gibt aus:
//   0: start
//   1: start
//   0: finish
//   1: finish

Jede Instanz von AsyncLocalStorage verwaltet einen unabhängigen Speicherkontext. Mehrere Instanzen können gleichzeitig sicher existieren, ohne dass das Risiko besteht, dass sie die Daten der anderen beeinträchtigen.

new AsyncLocalStorage()

[Verlauf]

VersionÄnderungen
v19.7.0, v18.16.0Option onPropagate entfernt.
v19.2.0, v18.13.0Option onPropagate hinzugefügt.
v13.10.0, v12.17.0Hinzugefügt in: v13.10.0, v12.17.0

Erzeugt eine neue Instanz von AsyncLocalStorage. Der Speicher wird nur innerhalb eines run()-Aufrufs oder nach einem enterWith()-Aufruf bereitgestellt.

Statische Methode: AsyncLocalStorage.bind(fn)

Hinzugefügt in: v19.8.0, v18.16.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Funktion> Die an den aktuellen Ausführungskontext zu bindende Funktion.
  • Rückgabewert: <Funktion> Eine neue Funktion, die fn innerhalb des erfassten Ausführungskontexts aufruft.

Bindet die gegebene Funktion an den aktuellen Ausführungskontext.

Statische Methode: AsyncLocalStorage.snapshot()

Hinzugefügt in: v19.8.0, v18.16.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • Rückgabewert: <Funktion> Eine neue Funktion mit der Signatur (fn: (...args) : R, ...args) : R.

Erfasst den aktuellen Ausführungskontext und gibt eine Funktion zurück, die eine Funktion als Argument akzeptiert. Wann immer die zurückgegebene Funktion aufgerufen wird, ruft sie die ihr übergebene Funktion innerhalb des erfassten Kontexts auf.

js
const asyncLocalStorage = new AsyncLocalStorage()
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot())
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()))
console.log(result) // gibt 123 zurück

AsyncLocalStorage.snapshot() kann die Verwendung von AsyncResource für einfache Zwecke der asynchronen Kontextverfolgung ersetzen, zum Beispiel:

js
class Foo {
  #runInAsyncScope = AsyncLocalStorage.snapshot()

  get() {
    return this.#runInAsyncScope(() => asyncLocalStorage.getStore())
  }
}

const foo = asyncLocalStorage.run(123, () => new Foo())
console.log(asyncLocalStorage.run(321, () => foo.get())) // gibt 123 zurück

asyncLocalStorage.disable()

Hinzugefügt in: v13.10.0, v12.17.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Deaktiviert die Instanz von AsyncLocalStorage. Alle nachfolgenden Aufrufe von asyncLocalStorage.getStore() geben undefined zurück, bis asyncLocalStorage.run() oder asyncLocalStorage.enterWith() erneut aufgerufen wird.

Beim Aufruf von asyncLocalStorage.disable() werden alle aktuellen mit der Instanz verknüpften Kontexte beendet.

Der Aufruf von asyncLocalStorage.disable() ist erforderlich, bevor der asyncLocalStorage garbage collected werden kann. Dies gilt nicht für Stores, die von asyncLocalStorage bereitgestellt werden, da diese Objekte zusammen mit den entsprechenden asynchronen Ressourcen garbage collected werden.

Verwenden Sie diese Methode, wenn asyncLocalStorage im aktuellen Prozess nicht mehr verwendet wird.

asyncLocalStorage.getStore()

Hinzugefügt in: v13.10.0, v12.17.0

Gibt den aktuellen Store zurück. Wenn außerhalb eines asynchronen Kontexts aufgerufen wird, der durch Aufrufen von asyncLocalStorage.run() oder asyncLocalStorage.enterWith() initialisiert wurde, gibt er undefined zurück.

asyncLocalStorage.enterWith(store)

Hinzugefügt in: v13.11.0, v12.17.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Wechselt für den Rest der aktuellen synchronen Ausführung in den Kontext und speichert dann den Store über alle folgenden asynchronen Aufrufe hinweg.

Beispiel:

js
const store = { id: 1 }
// Ersetzt den vorherigen Store durch das gegebene Store-Objekt
asyncLocalStorage.enterWith(store)
asyncLocalStorage.getStore() // Gibt das Store-Objekt zurück
someAsyncOperation(() => {
  asyncLocalStorage.getStore() // Gibt dasselbe Objekt zurück
})

Dieser Übergang wird für die gesamte synchrone Ausführung fortgesetzt. Das bedeutet, dass, wenn der Kontext beispielsweise innerhalb eines Ereignis-Handlers eingegeben wird, nachfolgende Ereignis-Handler auch innerhalb dieses Kontexts ausgeführt werden, es sei denn, sie sind speziell mit einer AsyncResource an einen anderen Kontext gebunden. Aus diesem Grund sollte run() enterWith() vorgezogen werden, es sei denn, es gibt triftige Gründe, die letztere Methode zu verwenden.

js
const store = { id: 1 }

emitter.on('my-event', () => {
  asyncLocalStorage.enterWith(store)
})
emitter.on('my-event', () => {
  asyncLocalStorage.getStore() // Gibt dasselbe Objekt zurück
})

asyncLocalStorage.getStore() // Gibt undefined zurück
emitter.emit('my-event')
asyncLocalStorage.getStore() // Gibt dasselbe Objekt zurück

asyncLocalStorage.run(store, callback[, ...args])

Hinzugefügt in: v13.10.0, v12.17.0

Führt eine Funktion synchron innerhalb eines Kontexts aus und gibt deren Rückgabewert zurück. Der Store ist außerhalb der Callback-Funktion nicht zugänglich. Der Store ist für alle innerhalb des Callbacks erstellten asynchronen Operationen zugänglich.

Die optionalen args werden an die Callback-Funktion übergeben.

Wenn die Callback-Funktion einen Fehler auslöst, wird der Fehler auch von run() ausgelöst. Der Stacktrace wird durch diesen Aufruf nicht beeinflusst und der Kontext wird verlassen.

Beispiel:

js
const store = { id: 2 }
try {
  asyncLocalStorage.run(store, () => {
    asyncLocalStorage.getStore() // Gibt das Store-Objekt zurück
    setTimeout(() => {
      asyncLocalStorage.getStore() // Gibt das Store-Objekt zurück
    }, 200)
    throw new Error()
  })
} catch (e) {
  asyncLocalStorage.getStore() // Gibt undefined zurück
  // Der Fehler wird hier abgefangen
}

asyncLocalStorage.exit(callback[, ...args])

Hinzugefügt in: v13.10.0, v12.17.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Führt eine Funktion synchron außerhalb eines Kontexts aus und gibt deren Rückgabewert zurück. Der Store ist innerhalb der Callback-Funktion oder der innerhalb des Callbacks erstellten asynchronen Operationen nicht zugänglich. Jeder getStore()-Aufruf innerhalb der Callback-Funktion gibt immer undefined zurück.

Die optionalen args werden an die Callback-Funktion übergeben.

Wenn die Callback-Funktion einen Fehler auslöst, wird der Fehler auch von exit() ausgelöst. Der Stacktrace wird durch diesen Aufruf nicht beeinflusst und der Kontext wird wieder betreten.

Beispiel:

js
// Innerhalb eines Aufrufs von run
try {
  asyncLocalStorage.getStore() // Gibt das Store-Objekt oder den Wert zurück
  asyncLocalStorage.exit(() => {
    asyncLocalStorage.getStore() // Gibt undefined zurück
    throw new Error()
  })
} catch (e) {
  asyncLocalStorage.getStore() // Gibt dasselbe Objekt oder denselben Wert zurück
  // Der Fehler wird hier abgefangen
}

Verwendung mit async/await

Wenn innerhalb einer asynchronen Funktion nur ein await-Aufruf innerhalb eines Kontextes ausgeführt werden soll, sollte folgendes Muster verwendet werden:

js
async function fn() {
  await asyncLocalStorage.run(new Map(), () => {
    asyncLocalStorage.getStore().set('key', value)
    return foo() // Der Rückgabewert von foo wird gewartet
  })
}

In diesem Beispiel ist der Store nur in der Callback-Funktion und den von foo aufgerufenen Funktionen verfügbar. Außerhalb von run liefert der Aufruf von getStore undefined.

Fehlerbehebung: Kontextverlust

In den meisten Fällen funktioniert AsyncLocalStorage problemlos. In seltenen Fällen geht der aktuelle Store in einer der asynchronen Operationen verloren.

Wenn Ihr Code callback-basiert ist, reicht es aus, ihn mit util.promisify() zu promisifizieren, damit er mit nativen Promises funktioniert.

Wenn Sie eine callback-basierte API verwenden müssen oder Ihr Code eine benutzerdefinierte Thenable-Implementierung voraussetzt, verwenden Sie die Klasse AsyncResource, um die asynchrone Operation dem richtigen Ausführungskontext zuzuordnen. Finden Sie den Funktionsaufruf, der für den Kontextverlust verantwortlich ist, indem Sie den Inhalt von asyncLocalStorage.getStore() nach den Aufrufen protokollieren, von denen Sie vermuten, dass sie für den Verlust verantwortlich sind. Wenn der Code undefined protokolliert, ist der zuletzt aufgerufene Callback wahrscheinlich für den Kontextverlust verantwortlich.

Klasse: AsyncResource

[Versionsgeschichte]

VersionÄnderungen
v16.4.0AsyncResource ist jetzt stabil. Zuvor war es experimentell.

Die Klasse AsyncResource ist so konzipiert, dass sie von den asynchronen Ressourcen des Embedders erweitert werden kann. Damit können Benutzer die Lebenszyklusereignisse ihrer eigenen Ressourcen einfach auslösen.

Der init-Hook wird ausgelöst, wenn eine AsyncResource instanziiert wird.

Im Folgenden finden Sie eine Übersicht über die AsyncResource-API.

js
import { AsyncResource, executionAsyncId } from 'node:async_hooks'

// AsyncResource() soll erweitert werden. Das Instanziieren einer
// neuen AsyncResource() löst auch init aus. Wenn triggerAsyncId weggelassen wird, dann
// wird async_hook.executionAsyncId() verwendet.
const asyncResource = new AsyncResource(type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false })

// Führt eine Funktion im Ausführungskontext der Ressource aus. Dies wird
// * den Kontext der Ressource etablieren
// * die AsyncHooks-Callbacks vor dem Aufruf auslösen
// * die bereitgestellte Funktion `fn` mit den angegebenen Argumenten aufrufen
// * die AsyncHooks-Callbacks nach dem Aufruf auslösen
// * den ursprünglichen Ausführungskontext wiederherstellen
asyncResource.runInAsyncScope(fn, thisArg, ...args)

// Ruft AsyncHooks-destroy-Callbacks auf.
asyncResource.emitDestroy()

// Gibt die eindeutige ID zurück, die der AsyncResource-Instanz zugewiesen wurde.
asyncResource.asyncId()

// Gibt die Trigger-ID für die AsyncResource-Instanz zurück.
asyncResource.triggerAsyncId()
js
const { AsyncResource, executionAsyncId } = require('node:async_hooks')

// AsyncResource() soll erweitert werden. Das Instanziieren einer
// neuen AsyncResource() löst auch init aus. Wenn triggerAsyncId weggelassen wird, dann
// wird async_hook.executionAsyncId() verwendet.
const asyncResource = new AsyncResource(type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false })

// Führt eine Funktion im Ausführungskontext der Ressource aus. Dies wird
// * den Kontext der Ressource etablieren
// * die AsyncHooks-Callbacks vor dem Aufruf auslösen
// * die bereitgestellte Funktion `fn` mit den angegebenen Argumenten aufrufen
// * die AsyncHooks-Callbacks nach dem Aufruf auslösen
// * den ursprünglichen Ausführungskontext wiederherstellen
asyncResource.runInAsyncScope(fn, thisArg, ...args)

// Ruft AsyncHooks-destroy-Callbacks auf.
asyncResource.emitDestroy()

// Gibt die eindeutige ID zurück, die der AsyncResource-Instanz zugewiesen wurde.
asyncResource.asyncId()

// Gibt die Trigger-ID für die AsyncResource-Instanz zurück.
asyncResource.triggerAsyncId()

new AsyncResource(type[, options])

  • type <string> Der Typ des asynchronen Ereignisses.
  • options <Object>
    • triggerAsyncId <number> Die ID des Ausführungskontexts, der dieses asynchrone Ereignis erstellt hat. Standardwert: executionAsyncId().
    • requireManualDestroy <boolean> Wenn auf true gesetzt, deaktiviert emitDestroy, wenn das Objekt garbage collected wird. Dies muss normalerweise nicht gesetzt werden (auch wenn emitDestroy manuell aufgerufen wird), es sei denn, die asyncId der Ressource wird abgerufen und die emitDestroy der sensiblen API wird damit aufgerufen. Wenn auf false gesetzt, findet der emitDestroy-Aufruf beim Garbage Collecting nur statt, wenn mindestens ein aktiver destroy-Hook vorhanden ist. Standardwert: false.

Beispielhafte Verwendung:

js
class DBQuery extends AsyncResource {
  constructor(db) {
    super('DBQuery')
    this.db = db
  }

  getInfo(query, callback) {
    this.db.get(query, (err, data) => {
      this.runInAsyncScope(callback, null, err, data)
    })
  }

  close() {
    this.db = null
    this.emitDestroy()
  }
}

Statische Methode: AsyncResource.bind(fn[, type[, thisArg]])

[Historie]

VersionÄnderungen
v20.0.0Die Eigenschaft asyncResource, die der gebundenen Funktion hinzugefügt wurde, ist veraltet und wird in einer zukünftigen Version entfernt.
v17.8.0, v16.15.0Der Standardwert, wenn thisArg undefiniert ist, wurde geändert, um this vom Aufrufer zu verwenden.
v16.0.0Optionales thisArg hinzugefügt.
v14.8.0, v12.19.0Hinzugefügt in: v14.8.0, v12.19.0
  • fn <Function> Die Funktion, die an den aktuellen Ausführungskontext gebunden werden soll.
  • type <string> Ein optionaler Name, der der zugrunde liegenden AsyncResource zugeordnet werden soll.
  • thisArg <any>

Bindet die gegebene Funktion an den aktuellen Ausführungskontext.

asyncResource.bind(fn[, thisArg])

[Historie]

VersionÄnderungen
v20.0.0Die asyncResource-Eigenschaft, die der gebundenen Funktion hinzugefügt wurde, ist veraltet und wird in einer zukünftigen Version entfernt.
v17.8.0, v16.15.0Der Standardwert, wenn thisArg undefiniert ist, wurde geändert, um this vom Aufrufer zu verwenden.
v16.0.0Optionales thisArg hinzugefügt.
v14.8.0, v12.19.0Hinzugefügt in: v14.8.0, v12.19.0
  • fn <Function> Die Funktion, die an den aktuellen AsyncResource gebunden werden soll.
  • thisArg <any>

Bindet die gegebene Funktion an den Scope dieser AsyncResource zur Ausführung.

asyncResource.runInAsyncScope(fn[, thisArg, ...args])

Hinzugefügt in: v9.6.0

  • fn <Function> Die Funktion, die im Ausführungskontext dieser asynchronen Ressource aufgerufen werden soll.
  • thisArg <any> Der Empfänger, der für den Funktionsaufruf verwendet werden soll.
  • ...args <any> Optionale Argumente, die an die Funktion übergeben werden sollen.

Ruft die bereitgestellte Funktion mit den bereitgestellten Argumenten im Ausführungskontext der asynchronen Ressource auf. Dies wird den Kontext erstellen, die AsyncHooks vor den Callbacks auslösen, die Funktion aufrufen, die AsyncHooks nach den Callbacks auslösen und dann den ursprünglichen Ausführungskontext wiederherstellen.

asyncResource.emitDestroy()

Ruft alle destroy-Hooks auf. Dies sollte nur einmal aufgerufen werden. Ein Fehler wird ausgegeben, wenn es mehr als einmal aufgerufen wird. Dies muss manuell aufgerufen werden. Wenn die Ressource vom Garbage Collector gesammelt wird, werden die destroy-Hooks niemals aufgerufen.

asyncResource.asyncId()

  • Rückgabewert: <number> Die eindeutige asyncId, die der Ressource zugewiesen ist.

asyncResource.triggerAsyncId()

  • Rückgabewert: <number> Die gleiche triggerAsyncId, die an den AsyncResource-Konstruktor übergeben wird.

Verwendung von AsyncResource für einen Worker-Thread-Pool

Das folgende Beispiel zeigt, wie man die Klasse AsyncResource verwendet, um eine korrekte asynchrone Nachverfolgung für einen Worker-Pool bereitzustellen. Andere Ressourcenpools, wie z. B. Datenbankverbindungspools, können einem ähnlichen Modell folgen.

Angenommen, die Aufgabe besteht darin, zwei Zahlen zu addieren, wobei eine Datei namens task_processor.js mit folgendem Inhalt verwendet wird:

js
import { parentPort } from 'node:worker_threads'
parentPort.on('message', task => {
  parentPort.postMessage(task.a + task.b)
})
js
const { parentPort } = require('node:worker_threads')
parentPort.on('message', task => {
  parentPort.postMessage(task.a + task.b)
})

Ein Worker-Pool darum herum könnte die folgende Struktur verwenden:

js
import { AsyncResource } from 'node:async_hooks'
import { EventEmitter } from 'node:events'
import { Worker } from 'node:worker_threads'

const kTaskInfo = Symbol('kTaskInfo')
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent')

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo')
    this.callback = callback
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result)
    this.emitDestroy() // `TaskInfo`s are used only once.
  }
}

export default class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super()
    this.numThreads = numThreads
    this.workers = []
    this.freeWorkers = []
    this.tasks = []

    for (let i = 0; i < numThreads; i++) this.addNewWorker()

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift()
        this.runTask(task, callback)
      }
    })
  }

  addNewWorker() {
    const worker = new Worker(new URL('task_processor.js', import.meta.url))
    worker.on('message', result => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result)
      worker[kTaskInfo] = null
      this.freeWorkers.push(worker)
      this.emit(kWorkerFreedEvent)
    })
    worker.on('error', err => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null)
      else this.emit('error', err)
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1)
      this.addNewWorker()
    })
    this.workers.push(worker)
    this.freeWorkers.push(worker)
    this.emit(kWorkerFreedEvent)
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback })
      return
    }

    const worker = this.freeWorkers.pop()
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback)
    worker.postMessage(task)
  }

  close() {
    for (const worker of this.workers) worker.terminate()
  }
}
js
const { AsyncResource } = require('node:async_hooks')
const { EventEmitter } = require('node:events')
const path = require('node:path')
const { Worker } = require('node:worker_threads')

const kTaskInfo = Symbol('kTaskInfo')
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent')

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo')
    this.callback = callback
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result)
    this.emitDestroy() // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super()
    this.numThreads = numThreads
    this.workers = []
    this.freeWorkers = []
    this.tasks = []

    for (let i = 0; i < numThreads; i++) this.addNewWorker()

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift()
        this.runTask(task, callback)
      }
    })
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'))
    worker.on('message', result => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result)
      worker[kTaskInfo] = null
      this.freeWorkers.push(worker)
      this.emit(kWorkerFreedEvent)
    })
    worker.on('error', err => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null)
      else this.emit('error', err)
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1)
      this.addNewWorker()
    })
    this.workers.push(worker)
    this.freeWorkers.push(worker)
    this.emit(kWorkerFreedEvent)
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback })
      return
    }

    const worker = this.freeWorkers.pop()
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback)
    worker.postMessage(task)
  }

  close() {
    for (const worker of this.workers) worker.terminate()
  }
}

module.exports = WorkerPool

Ohne die explizite Verfolgung, die durch die WorkerPoolTaskInfo-Objekte hinzugefügt wird, würde es so aussehen, als wären die Callbacks den einzelnen Worker-Objekten zugeordnet. Die Erstellung der Worker ist jedoch nicht mit der Erstellung der Aufgaben verbunden und liefert keine Informationen darüber, wann Aufgaben geplant wurden.

Dieser Pool könnte wie folgt verwendet werden:

js
import WorkerPool from './worker_pool.js'
import os from 'node:os'

const pool = new WorkerPool(os.availableParallelism())

let finished = 0
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result)
    if (++finished === 10) pool.close()
  })
}
js
const WorkerPool = require('./worker_pool.js')
const os = require('node:os')

const pool = new WorkerPool(os.availableParallelism())

let finished = 0
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result)
    if (++finished === 10) pool.close()
  })
}

Integration von AsyncResource mit EventEmitter

Ereignislistener, die von einem EventEmitter ausgelöst werden, werden möglicherweise in einem anderen Ausführungskontext ausgeführt als dem, der aktiv war, als eventEmitter.on() aufgerufen wurde.

Das folgende Beispiel zeigt, wie die Klasse AsyncResource verwendet wird, um einen Ereignislistener korrekt dem richtigen Ausführungskontext zuzuordnen. Der gleiche Ansatz kann auf einen Stream oder eine ähnliche ereignisgesteuerte Klasse angewendet werden.

js
import { createServer } from 'node:http'
import { AsyncResource, executionAsyncId } from 'node:async_hooks'

const server = createServer((req, res) => {
  req.on(
    'close',
    AsyncResource.bind(() => {
      // Ausführungskontext ist an den aktuellen äußeren Bereich gebunden.
    })
  )
  req.on('close', () => {
    // Ausführungskontext ist an den Bereich gebunden, der das Auslösen von 'close' verursacht hat.
  })
  res.end()
}).listen(3000)
js
const { createServer } = require('node:http')
const { AsyncResource, executionAsyncId } = require('node:async_hooks')

const server = createServer((req, res) => {
  req.on(
    'close',
    AsyncResource.bind(() => {
      // Ausführungskontext ist an den aktuellen äußeren Bereich gebunden.
    })
  )
  req.on('close', () => {
    // Ausführungskontext ist an den Bereich gebunden, der das Auslösen von 'close' verursacht hat.
  })
  res.end()
}).listen(3000)