تتبع السياق غير المتزامن
[مستقر: 2 - مستقر]
مستقر: 2 الاستقرار: 2 - مستقر
شفرة المصدر: lib/async_hooks.js
مقدمة
تُستخدم هذه الفئات لربط الحالة ونشرها عبر معاودة الاتصال وسلاسل الوعد. تسمح بتخزين البيانات طوال مدة طلب الويب أو أي مدة غير متزامنة أخرى. وهو مشابه للتخزين المحلي للخيط في اللغات الأخرى.
تعد الفئتان AsyncLocalStorage
و AsyncResource
جزءًا من الوحدة النمطية node:async_hooks
:
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');
الفئة: AsyncLocalStorage
[السجل]
الإصدار | التغييرات |
---|---|
الإصدار v16.4.0 | أصبح AsyncLocalStorage الآن مستقرًا. سابقًا، كان تجريبيًا. |
الإصداران v13.10.0 و v12.17.0 | تمت الإضافة في: v13.10.0 و v12.17.0 |
تنشئ هذه الفئة مخازن تظل متماسكة من خلال العمليات غير المتزامنة.
في حين أنه يمكنك إنشاء التنفيذ الخاص بك فوق الوحدة النمطية node:async_hooks
، يجب تفضيل AsyncLocalStorage
لأنه تنفيذ آمن للذاكرة وذو أداء عالٍ ويتضمن تحسينات كبيرة ليس من الواضح تنفيذها.
يستخدم المثال التالي AsyncLocalStorage
لإنشاء مُسجل بسيط يقوم بتعيين معرفات لطلبات HTTP الواردة ويتضمنها في الرسائل المسجلة داخل كل طلب.
import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// تخيل أي سلسلة من العمليات غير المتزامنة هنا
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// طباعة:
// 0: start
// 1: start
// 0: finish
// 1: finish
const http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// تخيل أي سلسلة من العمليات غير المتزامنة هنا
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// طباعة:
// 0: start
// 1: start
// 0: finish
// 1: finish
تحتفظ كل نسخة من AsyncLocalStorage
بسياق تخزين مستقل. يمكن أن توجد نسخ متعددة بأمان في وقت واحد دون التعرض لخطر التداخل مع بيانات بعضها البعض.
new AsyncLocalStorage()
[التاريخ]
الإصدار | التغييرات |
---|---|
v19.7.0, v18.16.0 | تمت إزالة الخيار التجريبي onPropagate. |
v19.2.0, v18.13.0 | إضافة الخيار onPropagate. |
v13.10.0, v12.17.0 | تمت الإضافة في: v13.10.0, v12.17.0 |
يقوم بإنشاء نسخة جديدة من AsyncLocalStorage
. يتم توفير المخزن فقط داخل استدعاء run()
أو بعد استدعاء enterWith()
.
طريقة ثابتة: AsyncLocalStorage.bind(fn)
تمت الإضافة في: v19.8.0, v18.16.0
[مستقر: 1 - تجريبي]
مستقر: 1 الاستقرار: 1 - تجريبي
fn
<Function> الدالة المراد ربطها بسياق التنفيذ الحالي.- الإرجاع: <Function> دالة جديدة تستدعي
fn
داخل سياق التنفيذ الذي تم التقاطه.
يقوم بربط الدالة المعطاة بسياق التنفيذ الحالي.
طريقة ثابتة: AsyncLocalStorage.snapshot()
تمت الإضافة في: v19.8.0, v18.16.0
[مستقر: 1 - تجريبي]
مستقر: 1 الاستقرار: 1 - تجريبي
- الإرجاع: <Function> دالة جديدة تحمل التوقيع
(fn: (...args) : R, ...args) : R
.
يلتقط سياق التنفيذ الحالي ويعيد دالة تقبل دالة كوسيطة. كلما تم استدعاء الدالة المُعادة، فإنها تستدعي الدالة التي تم تمريرها إليها داخل السياق الذي تم التقاطه.
const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result); // returns 123
يمكن لـ AsyncLocalStorage.snapshot() أن يحل محل استخدام AsyncResource لأغراض تتبع سياق غير متزامن بسيط، على سبيل المثال:
class Foo {
#runInAsyncScope = AsyncLocalStorage.snapshot();
get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}
const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123
asyncLocalStorage.disable()
أضيف في: الإصدار 13.10.0، الإصدار 12.17.0
[مستقر: 1 - تجريبي]
مستقر: 1 الاستقرار: 1 - تجريبي
يعطِّل نسخة AsyncLocalStorage
. ستعيد جميع الاستدعاءات اللاحقة لـ asyncLocalStorage.getStore()
القيمة undefined
حتى يتم استدعاء asyncLocalStorage.run()
أو asyncLocalStorage.enterWith()
مرة أخرى.
عند استدعاء asyncLocalStorage.disable()
، سيتم إنهاء جميع السياقات الحالية المرتبطة بالنسخة.
يُعد استدعاء asyncLocalStorage.disable()
مطلوبًا قبل أن يتم جمع asyncLocalStorage
كقمامة. لا ينطبق هذا على المخازن التي يوفرها asyncLocalStorage
، حيث يتم جمع هذه الكائنات كقمامة جنبًا إلى جنب مع موارد async المقابلة.
استخدم هذه الطريقة عندما لا يكون asyncLocalStorage
قيد الاستخدام بعد الآن في العملية الحالية.
asyncLocalStorage.getStore()
أضيف في: الإصدار 13.10.0، الإصدار 12.17.0
- يُرجع: <any>
إرجاع المتجر الحالي. إذا تم استدعاؤه خارج سياق غير متزامن تم تهيئته عن طريق استدعاء asyncLocalStorage.run()
أو asyncLocalStorage.enterWith()
، فإنه يُرجع undefined
.
asyncLocalStorage.enterWith(store)
أضيف في: الإصدار 13.11.0، الإصدار 12.17.0
[مستقر: 1 - تجريبي]
مستقر: 1 الاستقرار: 1 - تجريبي
store
<any>
الانتقال إلى السياق للفترة المتبقية من التنفيذ المتزامن الحالي ثم الاحتفاظ بالمتجر من خلال أي استدعاءات غير متزامنة لاحقة.
مثال:
const store = { id: 1 };
// يستبدل المتجر السابق بكائن المتجر المحدد
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // يُرجع كائن المتجر
someAsyncOperation(() => {
asyncLocalStorage.getStore(); // يُرجع نفس الكائن
});
سيستمر هذا الانتقال لكامل التنفيذ المتزامن. هذا يعني أنه على سبيل المثال، إذا تم إدخال السياق داخل معالج أحداث، فسيتم أيضًا تشغيل معالجات الأحداث اللاحقة داخل هذا السياق ما لم يتم ربطها تحديدًا بسياق آخر باستخدام AsyncResource
. لهذا السبب يجب تفضيل run()
على enterWith()
ما لم تكن هناك أسباب قوية لاستخدام الطريقة الأخيرة.
const store = { id: 1 };
emitter.on('my-event', () => {
asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
asyncLocalStorage.getStore(); // يُرجع نفس الكائن
});
asyncLocalStorage.getStore(); // يُرجع undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // يُرجع نفس الكائن
asyncLocalStorage.run(store, callback[, ...args])
أُضيف في: الإصدار v13.10.0, v12.17.0
store
<any>callback
<Function>...args
<any>
يشغّل دالة بشكل متزامن داخل سياق ويعيد قيمة الإرجاع الخاصة بها. لا يمكن الوصول إلى المخزن خارج دالة الاستدعاء. يمكن الوصول إلى المخزن لأي عمليات غير متزامنة تم إنشاؤها داخل دالة الاستدعاء.
يتم تمرير args
الاختيارية إلى دالة الاستدعاء.
إذا طرحت دالة الاستدعاء خطأً، فسيتم طرح الخطأ بواسطة run()
أيضًا. لا تتأثر آثار التتبع بهذه المكالمة ويتم الخروج من السياق.
مثال:
const store = { id: 2 };
try {
asyncLocalStorage.run(store, () => {
asyncLocalStorage.getStore(); // يعيد كائن المخزن
setTimeout(() => {
asyncLocalStorage.getStore(); // يعيد كائن المخزن
}, 200);
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // يعيد undefined
// سيتم التقاط الخطأ هنا
}
asyncLocalStorage.exit(callback[, ...args])
أُضيف في: الإصدار v13.10.0, v12.17.0
[مستقر: 1 - تجريبي]
مستقر: 1 الاستقرار: 1 - تجريبي
callback
<Function>...args
<any>
يشغّل دالة بشكل متزامن خارج سياق ويعيد قيمة الإرجاع الخاصة بها. لا يمكن الوصول إلى المخزن داخل دالة الاستدعاء أو العمليات غير المتزامنة التي تم إنشاؤها داخل دالة الاستدعاء. أي استدعاء getStore()
يتم إجراؤه داخل دالة الاستدعاء سيعيد دائمًا undefined
.
يتم تمرير args
الاختيارية إلى دالة الاستدعاء.
إذا طرحت دالة الاستدعاء خطأً، فسيتم طرح الخطأ بواسطة exit()
أيضًا. لا تتأثر آثار التتبع بهذه المكالمة ويتم إعادة إدخال السياق.
مثال:
// ضمن استدعاء لـ run
try {
asyncLocalStorage.getStore(); // يعيد كائن أو قيمة المخزن
asyncLocalStorage.exit(() => {
asyncLocalStorage.getStore(); // يعيد undefined
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // يعيد نفس الكائن أو القيمة
// سيتم التقاط الخطأ هنا
}
الاستخدام مع async/await
إذا كان سيتم تشغيل استدعاء await
واحد فقط في سياق دالة غير متزامنة، فيجب استخدام النمط التالي:
async function fn() {
await asyncLocalStorage.run(new Map(), () => {
asyncLocalStorage.getStore().set('key', value);
return foo(); // سيتم انتظار القيمة المرجعة لـ foo
});
}
في هذا المثال، يكون المتجر متاحًا فقط في دالة رد الاتصال والدوال التي يتم استدعاؤها بواسطة foo
. خارج run
، سيؤدي استدعاء getStore
إلى إرجاع undefined
.
استكشاف الأخطاء وإصلاحها: فقدان السياق
في معظم الحالات، يعمل AsyncLocalStorage
دون مشاكل. في حالات نادرة، يتم فقدان المتجر الحالي في إحدى العمليات غير المتزامنة.
إذا كان التعليمات البرمجية الخاصة بك تعتمد على ردود الاتصال، فيكفي تحويلها إلى وعد باستخدام util.promisify()
حتى تبدأ في العمل مع الوعود الأصلية.
إذا كنت بحاجة إلى استخدام واجهة برمجة تطبيقات تعتمد على ردود الاتصال أو كان التعليمات البرمجية الخاصة بك تفترض تطبيقًا مخصصًا قابلاً للتنفيذ، فاستخدم الفئة AsyncResource
لربط العملية غير المتزامنة بسياق التنفيذ الصحيح. ابحث عن استدعاء الدالة المسؤول عن فقدان السياق عن طريق تسجيل محتوى asyncLocalStorage.getStore()
بعد الاستدعاءات التي تشك في أنها مسؤولة عن الفقدان. عندما يسجل التعليمات البرمجية undefined
، فمن المحتمل أن يكون رد الاتصال الأخير الذي تم استدعاؤه مسؤولاً عن فقدان السياق.
الفئة: AsyncResource
[السجل]
الإصدار | التغييرات |
---|---|
v16.4.0 | AsyncResource مستقرة الآن. كانت تجريبية سابقًا. |
تم تصميم الفئة AsyncResource
ليتم توسيعها بواسطة موارد المضمن غير المتزامنة. باستخدام هذا، يمكن للمستخدمين بسهولة تشغيل أحداث دورة حياة مواردهم الخاصة.
سيتم تشغيل الخطاف init
عند إنشاء مثيل لـ AsyncResource
.
فيما يلي نظرة عامة على واجهة برمجة تطبيقات AsyncResource
.
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
// AsyncResource() مخصصة للتوسيع. إنشاء مثيل
// جديد لـ AsyncResource() يؤدي أيضًا إلى تشغيل init. إذا تم حذف triggerAsyncId، فسيتم
// استخدام async_hook.executionAsyncId().
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// قم بتشغيل دالة في سياق تنفيذ المورد. هذا سوف
// * إنشاء سياق المورد
// * تشغيل ردود الاتصال AsyncHooks before
// * استدعاء الدالة المتوفرة `fn` مع الوسائط المقدمة
// * تشغيل ردود الاتصال AsyncHooks after
// * استعادة سياق التنفيذ الأصلي
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// استدعاء ردود الاتصال AsyncHooks destroy.
asyncResource.emitDestroy();
// إرجاع المعرف الفريد المعين لمثيل AsyncResource.
asyncResource.asyncId();
// إرجاع معرف المشغل لمثيل AsyncResource.
asyncResource.triggerAsyncId();
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
// AsyncResource() مخصصة للتوسيع. إنشاء مثيل
// جديد لـ AsyncResource() يؤدي أيضًا إلى تشغيل init. إذا تم حذف triggerAsyncId، فسيتم
// استخدام async_hook.executionAsyncId().
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// قم بتشغيل دالة في سياق تنفيذ المورد. هذا سوف
// * إنشاء سياق المورد
// * تشغيل ردود الاتصال AsyncHooks before
// * استدعاء الدالة المتوفرة `fn` مع الوسائط المقدمة
// * تشغيل ردود الاتصال AsyncHooks after
// * استعادة سياق التنفيذ الأصلي
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// استدعاء ردود الاتصال AsyncHooks destroy.
asyncResource.emitDestroy();
// إرجاع المعرف الفريد المعين لمثيل AsyncResource.
asyncResource.asyncId();
// إرجاع معرف المشغل لمثيل AsyncResource.
asyncResource.triggerAsyncId();
new AsyncResource(type[, options])
type
<string> نوع الحدث غير المتزامن.options
<Object>triggerAsyncId
<number> معرّف سياق التنفيذ الذي أنشأ هذا الحدث غير المتزامن. افتراضي:executionAsyncId()
.requireManualDestroy
<boolean> إذا تم تعيينه علىtrue
، فسيتم تعطيلemitDestroy
عند جمع الكائن المهمل. لا يلزم عادةً تعيين هذا (حتى إذا تم استدعاءemitDestroy
يدويًا)، إلا إذا تم استردادasyncId
للمورد وتم استدعاء واجهة برمجة التطبيقات الحساسةemitDestroy
به. عند التعيين علىfalse
، سيتم استدعاءemitDestroy
عند جمع البيانات المهملة فقط إذا كان هناك خطافdestroy
نشط واحد على الأقل. افتراضي:false
.
مثال على الاستخدام:
class DBQuery extends AsyncResource {
constructor(db) {
super('DBQuery');
this.db = db;
}
getInfo(query, callback) {
this.db.get(query, (err, data) => {
this.runInAsyncScope(callback, null, err, data);
});
}
close() {
this.db = null;
this.emitDestroy();
}
}
طريقة ثابتة: AsyncResource.bind(fn[, type[, thisArg]])
[السجل]
الإصدار | التغييرات |
---|---|
v20.0.0 | تم إهمال خاصية asyncResource المضافة إلى الدالة المربوطة وستتم إزالتها في إصدار مستقبلي. |
v17.8.0, v16.15.0 | تم تغيير القيمة الافتراضية عندما تكون thisArg غير معرّفة لاستخدام this من المتصل. |
v16.0.0 | تمت إضافة thisArg اختيارية. |
v14.8.0, v12.19.0 | تمت إضافتها في: v14.8.0, v12.19.0 |
fn
<Function> الدالة المراد ربطها بسياق التنفيذ الحالي.type
<string> اسم اختياري لربطه بـAsyncResource
الأساسي.thisArg
<any>
يربط الدالة المحددة بسياق التنفيذ الحالي.
asyncResource.bind(fn[, thisArg])
[سجل التغييرات]
الإصدار | التغييرات |
---|---|
v20.0.0 | تم إهمال خاصية asyncResource المضافة إلى الدالة المرتبطة وسيتم إزالتها في إصدار مستقبلي. |
v17.8.0, v16.15.0 | تم تغيير الإعداد الافتراضي عندما يكون thisArg غير معرف لاستخدام this من المتصل. |
v16.0.0 | تمت إضافة thisArg اختياري. |
v14.8.0, v12.19.0 | تمت إضافتها في: v14.8.0, v12.19.0 |
يربط الدالة المحددة ليتم تنفيذها في نطاق AsyncResource
هذا.
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
تمت إضافتها في: v9.6.0
fn
<الدالة> الدالة المراد استدعاؤها في سياق التنفيذ لمورد async هذا.thisArg
<any> المُستقبِل المراد استخدامه لاستدعاء الدالة....args
<any> وسيطات اختيارية لتمريرها إلى الدالة.
يستدعي الدالة المقدمة مع الوسيطات المقدمة في سياق التنفيذ لمورد async. سيؤدي هذا إلى إنشاء السياق، وتشغيل معاودة الاتصال AsyncHooks before، واستدعاء الدالة، وتشغيل معاودة الاتصال AsyncHooks after، ثم استعادة سياق التنفيذ الأصلي.
asyncResource.emitDestroy()
- الإرجاع: <AsyncResource> مرجع إلى
asyncResource
.
يستدعي جميع خطافات destroy
. يجب استدعاء هذا مرة واحدة فقط. سيتم طرح خطأ إذا تم استدعاؤه أكثر من مرة. يجب استدعاء هذا يدويًا. إذا تُرك المورد ليتم جمعه بواسطة GC، فلن يتم استدعاء خطافات destroy
أبدًا.
asyncResource.asyncId()
- المردود: <number>
asyncId
الفريد المُعيَّن للمورد.
asyncResource.triggerAsyncId()
- المردود: <number> نفس
triggerAsyncId
الذي تم تمريره إلى الدالة البانيةAsyncResource
.
استخدام AsyncResource
لمجموعة عمليات Worker
يوضح المثال التالي كيفية استخدام الفئة AsyncResource
لتوفير تتبع غير متزامن لمجموعة Worker
بشكل صحيح. يمكن لمجموعات الموارد الأخرى، مثل مجموعات اتصال قاعدة البيانات، اتباع نموذج مماثل.
بافتراض أن المهمة هي إضافة رقمين، باستخدام ملف باسم task_processor.js
بالمحتوى التالي:
import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
يمكن لمجموعة عمليات Worker حولها استخدام البنية التالية:
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import { Worker } from 'node:worker_threads';
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(new URL('task_processor.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
module.exports = WorkerPool;
بدون التتبع الصريح المضاف بواسطة كائنات WorkerPoolTaskInfo
، سيظهر أن ردود النداء مرتبطة بكائنات Worker
الفردية. ومع ذلك، فإن إنشاء Worker
s لا يرتبط بإنشاء المهام ولا يوفر معلومات حول وقت جدولة المهام.
يمكن استخدام هذه المجموعة على النحو التالي:
import WorkerPool from './worker_pool.js';
import os from 'node:os';
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
const WorkerPool = require('./worker_pool.js');
const os = require('node:os');
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
دمج AsyncResource
مع EventEmitter
قد يتم تشغيل مستمعي الأحداث الذين تم تشغيلهم بواسطة EventEmitter
في سياق تنفيذ مختلف عن السياق النشط عند استدعاء eventEmitter.on()
.
يوضح المثال التالي كيفية استخدام الفئة AsyncResource
لربط مستمع الأحداث بشكل صحيح بسياق التنفيذ الصحيح. يمكن تطبيق نفس النهج على Stream
أو فئة مماثلة تعتمد على الأحداث.
import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);
const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);