Asynchrone Kontextverfolgung
[Stabil: 2 - Stabil]
Stabil: 2 Stabilität: 2 - Stabil
Quellcode: lib/async_hooks.js
Einleitung
Diese Klassen werden verwendet, um Zustände zuzuordnen und diese über Callbacks und Promise-Ketten hinweg zu propagieren. Sie ermöglichen die Speicherung von Daten während der gesamten Lebensdauer einer Webanforderung oder einer anderen asynchronen Dauer. Es ähnelt dem threadlokalen Speicher in anderen Programmiersprachen.
Die Klassen AsyncLocalStorage
und AsyncResource
sind Teil des Moduls node:async_hooks
:
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks'
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks')
Klasse: AsyncLocalStorage
[Historie]
Version | Änderungen |
---|---|
v16.4.0 | AsyncLocalStorage ist jetzt stabil. Zuvor war es experimentell. |
v13.10.0, v12.17.0 | Hinzugefügt in: v13.10.0, v12.17.0 |
Diese Klasse erstellt Speicher, die über asynchrone Operationen hinweg kohärent bleiben.
Obwohl Sie Ihre eigene Implementierung auf Basis des Moduls node:async_hooks
erstellen können, sollte AsyncLocalStorage
bevorzugt werden, da es sich um eine performante und speichersichere Implementierung handelt, die erhebliche Optimierungen beinhaltet, die nicht offensichtlich zu implementieren sind.
Das folgende Beispiel verwendet AsyncLocalStorage
, um einen einfachen Logger zu erstellen, der eingehenden HTTP-Anfragen IDs zuweist und diese in Nachrichten einschließt, die innerhalb jeder Anfrage protokolliert werden.
import http from 'node:http'
import { AsyncLocalStorage } from 'node:async_hooks'
const asyncLocalStorage = new AsyncLocalStorage()
function logWithId(msg) {
const id = asyncLocalStorage.getStore()
console.log(`${id !== undefined ? id : '-'}:`, msg)
}
let idSeq = 0
http
.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start')
// Stellen Sie sich hier eine beliebige Kette asynchroner Operationen vor
setImmediate(() => {
logWithId('finish')
res.end()
})
})
})
.listen(8080)
http.get('http://localhost:8080')
http.get('http://localhost:8080')
// Gibt aus:
// 0: start
// 1: start
// 0: finish
// 1: finish
const http = require('node:http')
const { AsyncLocalStorage } = require('node:async_hooks')
const asyncLocalStorage = new AsyncLocalStorage()
function logWithId(msg) {
const id = asyncLocalStorage.getStore()
console.log(`${id !== undefined ? id : '-'}:`, msg)
}
let idSeq = 0
http
.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start')
// Stellen Sie sich hier eine beliebige Kette asynchroner Operationen vor
setImmediate(() => {
logWithId('finish')
res.end()
})
})
})
.listen(8080)
http.get('http://localhost:8080')
http.get('http://localhost:8080')
// Gibt aus:
// 0: start
// 1: start
// 0: finish
// 1: finish
Jede Instanz von AsyncLocalStorage
verwaltet einen unabhängigen Speicherkontext. Mehrere Instanzen können gleichzeitig sicher existieren, ohne dass das Risiko besteht, dass sie die Daten der anderen beeinträchtigen.
new AsyncLocalStorage()
[Verlauf]
Version | Änderungen |
---|---|
v19.7.0, v18.16.0 | Option onPropagate entfernt. |
v19.2.0, v18.13.0 | Option onPropagate hinzugefügt. |
v13.10.0, v12.17.0 | Hinzugefügt in: v13.10.0, v12.17.0 |
Erzeugt eine neue Instanz von AsyncLocalStorage
. Der Speicher wird nur innerhalb eines run()
-Aufrufs oder nach einem enterWith()
-Aufruf bereitgestellt.
Statische Methode: AsyncLocalStorage.bind(fn)
Hinzugefügt in: v19.8.0, v18.16.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Funktion> Die an den aktuellen Ausführungskontext zu bindende Funktion.- Rückgabewert: <Funktion> Eine neue Funktion, die
fn
innerhalb des erfassten Ausführungskontexts aufruft.
Bindet die gegebene Funktion an den aktuellen Ausführungskontext.
Statische Methode: AsyncLocalStorage.snapshot()
Hinzugefügt in: v19.8.0, v18.16.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
- Rückgabewert: <Funktion> Eine neue Funktion mit der Signatur
(fn: (...args) : R, ...args) : R
.
Erfasst den aktuellen Ausführungskontext und gibt eine Funktion zurück, die eine Funktion als Argument akzeptiert. Wann immer die zurückgegebene Funktion aufgerufen wird, ruft sie die ihr übergebene Funktion innerhalb des erfassten Kontexts auf.
const asyncLocalStorage = new AsyncLocalStorage()
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot())
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()))
console.log(result) // gibt 123 zurück
AsyncLocalStorage.snapshot() kann die Verwendung von AsyncResource für einfache Zwecke der asynchronen Kontextverfolgung ersetzen, zum Beispiel:
class Foo {
#runInAsyncScope = AsyncLocalStorage.snapshot()
get() {
return this.#runInAsyncScope(() => asyncLocalStorage.getStore())
}
}
const foo = asyncLocalStorage.run(123, () => new Foo())
console.log(asyncLocalStorage.run(321, () => foo.get())) // gibt 123 zurück
asyncLocalStorage.disable()
Hinzugefügt in: v13.10.0, v12.17.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
Deaktiviert die Instanz von AsyncLocalStorage
. Alle nachfolgenden Aufrufe von asyncLocalStorage.getStore()
geben undefined
zurück, bis asyncLocalStorage.run()
oder asyncLocalStorage.enterWith()
erneut aufgerufen wird.
Beim Aufruf von asyncLocalStorage.disable()
werden alle aktuellen mit der Instanz verknüpften Kontexte beendet.
Der Aufruf von asyncLocalStorage.disable()
ist erforderlich, bevor der asyncLocalStorage
garbage collected werden kann. Dies gilt nicht für Stores, die von asyncLocalStorage
bereitgestellt werden, da diese Objekte zusammen mit den entsprechenden asynchronen Ressourcen garbage collected werden.
Verwenden Sie diese Methode, wenn asyncLocalStorage
im aktuellen Prozess nicht mehr verwendet wird.
asyncLocalStorage.getStore()
Hinzugefügt in: v13.10.0, v12.17.0
- Rückgabewert: <any>
Gibt den aktuellen Store zurück. Wenn außerhalb eines asynchronen Kontexts aufgerufen wird, der durch Aufrufen von asyncLocalStorage.run()
oder asyncLocalStorage.enterWith()
initialisiert wurde, gibt er undefined
zurück.
asyncLocalStorage.enterWith(store)
Hinzugefügt in: v13.11.0, v12.17.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
store
<any>
Wechselt für den Rest der aktuellen synchronen Ausführung in den Kontext und speichert dann den Store über alle folgenden asynchronen Aufrufe hinweg.
Beispiel:
const store = { id: 1 }
// Ersetzt den vorherigen Store durch das gegebene Store-Objekt
asyncLocalStorage.enterWith(store)
asyncLocalStorage.getStore() // Gibt das Store-Objekt zurück
someAsyncOperation(() => {
asyncLocalStorage.getStore() // Gibt dasselbe Objekt zurück
})
Dieser Übergang wird für die gesamte synchrone Ausführung fortgesetzt. Das bedeutet, dass, wenn der Kontext beispielsweise innerhalb eines Ereignis-Handlers eingegeben wird, nachfolgende Ereignis-Handler auch innerhalb dieses Kontexts ausgeführt werden, es sei denn, sie sind speziell mit einer AsyncResource
an einen anderen Kontext gebunden. Aus diesem Grund sollte run()
enterWith()
vorgezogen werden, es sei denn, es gibt triftige Gründe, die letztere Methode zu verwenden.
const store = { id: 1 }
emitter.on('my-event', () => {
asyncLocalStorage.enterWith(store)
})
emitter.on('my-event', () => {
asyncLocalStorage.getStore() // Gibt dasselbe Objekt zurück
})
asyncLocalStorage.getStore() // Gibt undefined zurück
emitter.emit('my-event')
asyncLocalStorage.getStore() // Gibt dasselbe Objekt zurück
asyncLocalStorage.run(store, callback[, ...args])
Hinzugefügt in: v13.10.0, v12.17.0
store
<any>callback
<Function>...args
<any>
Führt eine Funktion synchron innerhalb eines Kontexts aus und gibt deren Rückgabewert zurück. Der Store ist außerhalb der Callback-Funktion nicht zugänglich. Der Store ist für alle innerhalb des Callbacks erstellten asynchronen Operationen zugänglich.
Die optionalen args
werden an die Callback-Funktion übergeben.
Wenn die Callback-Funktion einen Fehler auslöst, wird der Fehler auch von run()
ausgelöst. Der Stacktrace wird durch diesen Aufruf nicht beeinflusst und der Kontext wird verlassen.
Beispiel:
const store = { id: 2 }
try {
asyncLocalStorage.run(store, () => {
asyncLocalStorage.getStore() // Gibt das Store-Objekt zurück
setTimeout(() => {
asyncLocalStorage.getStore() // Gibt das Store-Objekt zurück
}, 200)
throw new Error()
})
} catch (e) {
asyncLocalStorage.getStore() // Gibt undefined zurück
// Der Fehler wird hier abgefangen
}
asyncLocalStorage.exit(callback[, ...args])
Hinzugefügt in: v13.10.0, v12.17.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
callback
<Function>...args
<any>
Führt eine Funktion synchron außerhalb eines Kontexts aus und gibt deren Rückgabewert zurück. Der Store ist innerhalb der Callback-Funktion oder der innerhalb des Callbacks erstellten asynchronen Operationen nicht zugänglich. Jeder getStore()
-Aufruf innerhalb der Callback-Funktion gibt immer undefined
zurück.
Die optionalen args
werden an die Callback-Funktion übergeben.
Wenn die Callback-Funktion einen Fehler auslöst, wird der Fehler auch von exit()
ausgelöst. Der Stacktrace wird durch diesen Aufruf nicht beeinflusst und der Kontext wird wieder betreten.
Beispiel:
// Innerhalb eines Aufrufs von run
try {
asyncLocalStorage.getStore() // Gibt das Store-Objekt oder den Wert zurück
asyncLocalStorage.exit(() => {
asyncLocalStorage.getStore() // Gibt undefined zurück
throw new Error()
})
} catch (e) {
asyncLocalStorage.getStore() // Gibt dasselbe Objekt oder denselben Wert zurück
// Der Fehler wird hier abgefangen
}
Verwendung mit async/await
Wenn innerhalb einer asynchronen Funktion nur ein await
-Aufruf innerhalb eines Kontextes ausgeführt werden soll, sollte folgendes Muster verwendet werden:
async function fn() {
await asyncLocalStorage.run(new Map(), () => {
asyncLocalStorage.getStore().set('key', value)
return foo() // Der Rückgabewert von foo wird gewartet
})
}
In diesem Beispiel ist der Store nur in der Callback-Funktion und den von foo
aufgerufenen Funktionen verfügbar. Außerhalb von run
liefert der Aufruf von getStore
undefined
.
Fehlerbehebung: Kontextverlust
In den meisten Fällen funktioniert AsyncLocalStorage
problemlos. In seltenen Fällen geht der aktuelle Store in einer der asynchronen Operationen verloren.
Wenn Ihr Code callback-basiert ist, reicht es aus, ihn mit util.promisify()
zu promisifizieren, damit er mit nativen Promises funktioniert.
Wenn Sie eine callback-basierte API verwenden müssen oder Ihr Code eine benutzerdefinierte Thenable-Implementierung voraussetzt, verwenden Sie die Klasse AsyncResource
, um die asynchrone Operation dem richtigen Ausführungskontext zuzuordnen. Finden Sie den Funktionsaufruf, der für den Kontextverlust verantwortlich ist, indem Sie den Inhalt von asyncLocalStorage.getStore()
nach den Aufrufen protokollieren, von denen Sie vermuten, dass sie für den Verlust verantwortlich sind. Wenn der Code undefined
protokolliert, ist der zuletzt aufgerufene Callback wahrscheinlich für den Kontextverlust verantwortlich.
Klasse: AsyncResource
[Versionsgeschichte]
Version | Änderungen |
---|---|
v16.4.0 | AsyncResource ist jetzt stabil. Zuvor war es experimentell. |
Die Klasse AsyncResource
ist so konzipiert, dass sie von den asynchronen Ressourcen des Embedders erweitert werden kann. Damit können Benutzer die Lebenszyklusereignisse ihrer eigenen Ressourcen einfach auslösen.
Der init
-Hook wird ausgelöst, wenn eine AsyncResource
instanziiert wird.
Im Folgenden finden Sie eine Übersicht über die AsyncResource
-API.
import { AsyncResource, executionAsyncId } from 'node:async_hooks'
// AsyncResource() soll erweitert werden. Das Instanziieren einer
// neuen AsyncResource() löst auch init aus. Wenn triggerAsyncId weggelassen wird, dann
// wird async_hook.executionAsyncId() verwendet.
const asyncResource = new AsyncResource(type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false })
// Führt eine Funktion im Ausführungskontext der Ressource aus. Dies wird
// * den Kontext der Ressource etablieren
// * die AsyncHooks-Callbacks vor dem Aufruf auslösen
// * die bereitgestellte Funktion `fn` mit den angegebenen Argumenten aufrufen
// * die AsyncHooks-Callbacks nach dem Aufruf auslösen
// * den ursprünglichen Ausführungskontext wiederherstellen
asyncResource.runInAsyncScope(fn, thisArg, ...args)
// Ruft AsyncHooks-destroy-Callbacks auf.
asyncResource.emitDestroy()
// Gibt die eindeutige ID zurück, die der AsyncResource-Instanz zugewiesen wurde.
asyncResource.asyncId()
// Gibt die Trigger-ID für die AsyncResource-Instanz zurück.
asyncResource.triggerAsyncId()
const { AsyncResource, executionAsyncId } = require('node:async_hooks')
// AsyncResource() soll erweitert werden. Das Instanziieren einer
// neuen AsyncResource() löst auch init aus. Wenn triggerAsyncId weggelassen wird, dann
// wird async_hook.executionAsyncId() verwendet.
const asyncResource = new AsyncResource(type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false })
// Führt eine Funktion im Ausführungskontext der Ressource aus. Dies wird
// * den Kontext der Ressource etablieren
// * die AsyncHooks-Callbacks vor dem Aufruf auslösen
// * die bereitgestellte Funktion `fn` mit den angegebenen Argumenten aufrufen
// * die AsyncHooks-Callbacks nach dem Aufruf auslösen
// * den ursprünglichen Ausführungskontext wiederherstellen
asyncResource.runInAsyncScope(fn, thisArg, ...args)
// Ruft AsyncHooks-destroy-Callbacks auf.
asyncResource.emitDestroy()
// Gibt die eindeutige ID zurück, die der AsyncResource-Instanz zugewiesen wurde.
asyncResource.asyncId()
// Gibt die Trigger-ID für die AsyncResource-Instanz zurück.
asyncResource.triggerAsyncId()
new AsyncResource(type[, options])
type
<string> Der Typ des asynchronen Ereignisses.options
<Object>triggerAsyncId
<number> Die ID des Ausführungskontexts, der dieses asynchrone Ereignis erstellt hat. Standardwert:executionAsyncId()
.requireManualDestroy
<boolean> Wenn auftrue
gesetzt, deaktiviertemitDestroy
, wenn das Objekt garbage collected wird. Dies muss normalerweise nicht gesetzt werden (auch wennemitDestroy
manuell aufgerufen wird), es sei denn, dieasyncId
der Ressource wird abgerufen und dieemitDestroy
der sensiblen API wird damit aufgerufen. Wenn auffalse
gesetzt, findet deremitDestroy
-Aufruf beim Garbage Collecting nur statt, wenn mindestens ein aktiverdestroy
-Hook vorhanden ist. Standardwert:false
.
Beispielhafte Verwendung:
class DBQuery extends AsyncResource {
constructor(db) {
super('DBQuery')
this.db = db
}
getInfo(query, callback) {
this.db.get(query, (err, data) => {
this.runInAsyncScope(callback, null, err, data)
})
}
close() {
this.db = null
this.emitDestroy()
}
}
Statische Methode: AsyncResource.bind(fn[, type[, thisArg]])
[Historie]
Version | Änderungen |
---|---|
v20.0.0 | Die Eigenschaft asyncResource , die der gebundenen Funktion hinzugefügt wurde, ist veraltet und wird in einer zukünftigen Version entfernt. |
v17.8.0, v16.15.0 | Der Standardwert, wenn thisArg undefiniert ist, wurde geändert, um this vom Aufrufer zu verwenden. |
v16.0.0 | Optionales thisArg hinzugefügt. |
v14.8.0, v12.19.0 | Hinzugefügt in: v14.8.0, v12.19.0 |
fn
<Function> Die Funktion, die an den aktuellen Ausführungskontext gebunden werden soll.type
<string> Ein optionaler Name, der der zugrunde liegendenAsyncResource
zugeordnet werden soll.thisArg
<any>
Bindet die gegebene Funktion an den aktuellen Ausführungskontext.
asyncResource.bind(fn[, thisArg])
[Historie]
Version | Änderungen |
---|---|
v20.0.0 | Die asyncResource -Eigenschaft, die der gebundenen Funktion hinzugefügt wurde, ist veraltet und wird in einer zukünftigen Version entfernt. |
v17.8.0, v16.15.0 | Der Standardwert, wenn thisArg undefiniert ist, wurde geändert, um this vom Aufrufer zu verwenden. |
v16.0.0 | Optionales thisArg hinzugefügt. |
v14.8.0, v12.19.0 | Hinzugefügt in: v14.8.0, v12.19.0 |
fn
<Function> Die Funktion, die an den aktuellenAsyncResource
gebunden werden soll.thisArg
<any>
Bindet die gegebene Funktion an den Scope dieser AsyncResource
zur Ausführung.
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
Hinzugefügt in: v9.6.0
fn
<Function> Die Funktion, die im Ausführungskontext dieser asynchronen Ressource aufgerufen werden soll.thisArg
<any> Der Empfänger, der für den Funktionsaufruf verwendet werden soll....args
<any> Optionale Argumente, die an die Funktion übergeben werden sollen.
Ruft die bereitgestellte Funktion mit den bereitgestellten Argumenten im Ausführungskontext der asynchronen Ressource auf. Dies wird den Kontext erstellen, die AsyncHooks vor den Callbacks auslösen, die Funktion aufrufen, die AsyncHooks nach den Callbacks auslösen und dann den ursprünglichen Ausführungskontext wiederherstellen.
asyncResource.emitDestroy()
- Rückgabewert: <AsyncResource> Ein Verweis auf
asyncResource
.
Ruft alle destroy
-Hooks auf. Dies sollte nur einmal aufgerufen werden. Ein Fehler wird ausgegeben, wenn es mehr als einmal aufgerufen wird. Dies muss manuell aufgerufen werden. Wenn die Ressource vom Garbage Collector gesammelt wird, werden die destroy
-Hooks niemals aufgerufen.
asyncResource.asyncId()
- Rückgabewert: <number> Die eindeutige
asyncId
, die der Ressource zugewiesen ist.
asyncResource.triggerAsyncId()
- Rückgabewert: <number> Die gleiche
triggerAsyncId
, die an denAsyncResource
-Konstruktor übergeben wird.
Verwendung von AsyncResource
für einen Worker
-Thread-Pool
Das folgende Beispiel zeigt, wie man die Klasse AsyncResource
verwendet, um eine korrekte asynchrone Nachverfolgung für einen Worker
-Pool bereitzustellen. Andere Ressourcenpools, wie z. B. Datenbankverbindungspools, können einem ähnlichen Modell folgen.
Angenommen, die Aufgabe besteht darin, zwei Zahlen zu addieren, wobei eine Datei namens task_processor.js
mit folgendem Inhalt verwendet wird:
import { parentPort } from 'node:worker_threads'
parentPort.on('message', task => {
parentPort.postMessage(task.a + task.b)
})
const { parentPort } = require('node:worker_threads')
parentPort.on('message', task => {
parentPort.postMessage(task.a + task.b)
})
Ein Worker-Pool darum herum könnte die folgende Struktur verwenden:
import { AsyncResource } from 'node:async_hooks'
import { EventEmitter } from 'node:events'
import { Worker } from 'node:worker_threads'
const kTaskInfo = Symbol('kTaskInfo')
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent')
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo')
this.callback = callback
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result)
this.emitDestroy() // `TaskInfo`s are used only once.
}
}
export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super()
this.numThreads = numThreads
this.workers = []
this.freeWorkers = []
this.tasks = []
for (let i = 0; i < numThreads; i++) this.addNewWorker()
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift()
this.runTask(task, callback)
}
})
}
addNewWorker() {
const worker = new Worker(new URL('task_processor.js', import.meta.url))
worker.on('message', result => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result)
worker[kTaskInfo] = null
this.freeWorkers.push(worker)
this.emit(kWorkerFreedEvent)
})
worker.on('error', err => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null)
else this.emit('error', err)
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1)
this.addNewWorker()
})
this.workers.push(worker)
this.freeWorkers.push(worker)
this.emit(kWorkerFreedEvent)
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback })
return
}
const worker = this.freeWorkers.pop()
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback)
worker.postMessage(task)
}
close() {
for (const worker of this.workers) worker.terminate()
}
}
const { AsyncResource } = require('node:async_hooks')
const { EventEmitter } = require('node:events')
const path = require('node:path')
const { Worker } = require('node:worker_threads')
const kTaskInfo = Symbol('kTaskInfo')
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent')
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo')
this.callback = callback
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result)
this.emitDestroy() // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super()
this.numThreads = numThreads
this.workers = []
this.freeWorkers = []
this.tasks = []
for (let i = 0; i < numThreads; i++) this.addNewWorker()
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift()
this.runTask(task, callback)
}
})
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'))
worker.on('message', result => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result)
worker[kTaskInfo] = null
this.freeWorkers.push(worker)
this.emit(kWorkerFreedEvent)
})
worker.on('error', err => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo]) worker[kTaskInfo].done(err, null)
else this.emit('error', err)
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1)
this.addNewWorker()
})
this.workers.push(worker)
this.freeWorkers.push(worker)
this.emit(kWorkerFreedEvent)
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback })
return
}
const worker = this.freeWorkers.pop()
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback)
worker.postMessage(task)
}
close() {
for (const worker of this.workers) worker.terminate()
}
}
module.exports = WorkerPool
Ohne die explizite Verfolgung, die durch die WorkerPoolTaskInfo
-Objekte hinzugefügt wird, würde es so aussehen, als wären die Callbacks den einzelnen Worker
-Objekten zugeordnet. Die Erstellung der Worker
ist jedoch nicht mit der Erstellung der Aufgaben verbunden und liefert keine Informationen darüber, wann Aufgaben geplant wurden.
Dieser Pool könnte wie folgt verwendet werden:
import WorkerPool from './worker_pool.js'
import os from 'node:os'
const pool = new WorkerPool(os.availableParallelism())
let finished = 0
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result)
if (++finished === 10) pool.close()
})
}
const WorkerPool = require('./worker_pool.js')
const os = require('node:os')
const pool = new WorkerPool(os.availableParallelism())
let finished = 0
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result)
if (++finished === 10) pool.close()
})
}
Integration von AsyncResource
mit EventEmitter
Ereignislistener, die von einem EventEmitter
ausgelöst werden, werden möglicherweise in einem anderen Ausführungskontext ausgeführt als dem, der aktiv war, als eventEmitter.on()
aufgerufen wurde.
Das folgende Beispiel zeigt, wie die Klasse AsyncResource
verwendet wird, um einen Ereignislistener korrekt dem richtigen Ausführungskontext zuzuordnen. Der gleiche Ansatz kann auf einen Stream
oder eine ähnliche ereignisgesteuerte Klasse angewendet werden.
import { createServer } from 'node:http'
import { AsyncResource, executionAsyncId } from 'node:async_hooks'
const server = createServer((req, res) => {
req.on(
'close',
AsyncResource.bind(() => {
// Ausführungskontext ist an den aktuellen äußeren Bereich gebunden.
})
)
req.on('close', () => {
// Ausführungskontext ist an den Bereich gebunden, der das Auslösen von 'close' verursacht hat.
})
res.end()
}).listen(3000)
const { createServer } = require('node:http')
const { AsyncResource, executionAsyncId } = require('node:async_hooks')
const server = createServer((req, res) => {
req.on(
'close',
AsyncResource.bind(() => {
// Ausführungskontext ist an den aktuellen äußeren Bereich gebunden.
})
)
req.on('close', () => {
// Ausführungskontext ist an den Bereich gebunden, der das Auslösen von 'close' verursacht hat.
})
res.end()
}).listen(3000)