الضغط العكسي في التدفقات
هناك مشكلة عامة تحدث أثناء معالجة البيانات تسمى الضغط العكسي وتصف تراكم البيانات خلف مخزن مؤقت أثناء نقل البيانات. عندما يكون الطرف المتلقي للنقل لديه عمليات معقدة، أو يكون أبطأ لأي سبب من الأسباب، هناك ميل لتراكم البيانات من المصدر الوارد، مثل الانسداد.
لحل هذه المشكلة، يجب أن يكون هناك نظام تفويض جاهز لضمان تدفق سلس للبيانات من مصدر إلى آخر. لقد حلت مجتمعات مختلفة هذه المشكلة بشكل فريد لبرامجها، وأنابيب يونكس ومآخذ TCP أمثلة جيدة على ذلك، وغالبًا ما يشار إليها باسم التحكم في التدفق. في Node.js، كانت التدفقات هي الحل المعتمد.
الغرض من هذا الدليل هو توضيح ما هو الضغط العكسي، وكيف تعالج التدفقات هذه المشكلة بالضبط في رمز مصدر Node.js. سيقدم الجزء الثاني من الدليل أفضل الممارسات المقترحة لضمان أن يكون رمز تطبيقك آمنًا ومحسّنًا عند تنفيذ التدفقات.
نفترض معرفة بسيطة بالتعريف العام لـ backpressure
و Buffer
و EventEmitters
في Node.js، بالإضافة إلى بعض الخبرة في Stream
. إذا لم تقرأ هذه الوثائق، فليس من السيء إلقاء نظرة على وثائق API أولاً، حيث سيساعد ذلك على توسيع فهمك أثناء قراءة هذا الدليل.
مشكلة معالجة البيانات
في نظام الكمبيوتر، يتم نقل البيانات من عملية إلى أخرى عبر الأنابيب والمآخذ والإشارات. في Node.js، نجد آلية مشابهة تسمى Stream
. التدفقات رائعة! إنها تفعل الكثير من أجل Node.js وتستخدم تقريبًا كل جزء من قاعدة البيانات الداخلية هذه الوحدة النمطية. بصفتك مطورًا، فإنك مدعو أكثر من ذلك لاستخدامها أيضًا!
const readline = require('node:readline')
const rl = readline.createInterface({
output: process.stdout,
input: process.stdin,
})
rl.question('Why should you use streams? ', answer => {
console.log(`Maybe it's ${answer}, maybe it's because they are awesome!`)
})
rl.close()
يمكن توضيح مثال جيد على سبب كون آلية الضغط العكسي التي تم تنفيذها من خلال التدفقات تحسينًا رائعًا من خلال مقارنة أدوات النظام الداخلية من تنفيذ Stream في Node.js.
في أحد السيناريوهات، سنأخذ ملفًا كبيرًا (حوالي -9 غيغابايت) ونضغطه باستخدام أداة zip(1)
المألوفة.
zip The.Matrix.1080p.mkv
بينما سيستغرق هذا بضع دقائق لإكماله، في قشرة أخرى قد نقوم بتشغيل برنامج نصي يأخذ وحدة Node.js zlib
، التي تلف حول أداة ضغط أخرى، gzip(1)
.
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')
const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')
inp.pipe(gzip).pipe(out)
لاختبار النتائج، حاول فتح كل ملف مضغوط. سيُعلمك الملف المضغوط بواسطة أداة zip(1)
أن الملف تالف، بينما سيتم فك ضغط الضغط الذي تم إنجازه بواسطة Stream بدون خطأ.
ملاحظة
في هذا المثال، نستخدم .pipe()
للحصول على مصدر البيانات من أحد الطرفين إلى الطرف الآخر. ومع ذلك، لاحظ أنه لا توجد معالجات أخطاء مناسبة مرفقة. إذا فشلت قطعة من البيانات في الاستقبال بشكل صحيح، فلن يتم تدمير مصدر Readable أو تدفق gzip
. pump
هي أداة مساعدة ستقوم بتدمير جميع التدفقات في خط الأنابيب بشكل صحيح إذا فشل أحدها أو أغلق، وهي ضرورية في هذه الحالة!
pump
ضرورية فقط لنظام Node.js 8.x أو أقدم، أما بالنسبة لإصدار Node.js 10.x أو أحدث، يتم تقديم pipeline
ليحل محل pump
. هذه هي طريقة وحدة نمطية للربط بين التدفقات التي تقوم بتحويل الأخطاء وتنظيفها بشكل صحيح وتوفير استدعاء للوظيفة عند اكتمال خط الأنابيب.
فيما يلي مثال على استخدام pipeline:
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('Pipeline failed', err)
} else {
console.log('Pipeline succeeded')
}
}
)
يمكنك أيضًا استخدام وحدة stream/promises
لاستخدام pipeline مع async / await
:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
)
console.log('Pipeline succeeded')
} catch (err) {
console.error('Pipeline failed', err)
}
}
بيانات كثيرة جداً، بسرعة كبيرة جداً
هناك حالات قد يوفر فيها تدفق Readable
البيانات إلى Writable
بسرعة كبيرة جداً - أكثر بكثير مما يمكن للمستهلك التعامل معه!
عندما يحدث ذلك، سيبدأ المستهلك في وضع جميع أجزاء البيانات في قائمة انتظار للاستهلاك لاحقاً. ستصبح قائمة انتظار الكتابة أطول وأطول، وبسبب هذا يجب الاحتفاظ ببيانات أكثر في الذاكرة حتى اكتمال العملية بأكملها.
الكتابة على القرص أبطأ بكثير من القراءة من القرص، وبالتالي، عندما نحاول ضغط ملف وكتابته على القرص الصلب الخاص بنا، سيحدث ضغط عكسي لأن قرص الكتابة لن يتمكن من مواكبة سرعة القراءة.
// سرّياً، يقول التدفق: "انتظر، انتظر! هذا كثير جداً!"
// ستبدأ البيانات بالتراكم على جانب القراءة من مخزن البيانات حيث
// تحاول الكتابة مواكبة تدفق البيانات الواردة.
inp.pipe(gzip).pipe(outputFile)
لهذا السبب، تعتبر آلية الضغط العكسي مهمة. إذا لم تكن هناك آلية ضغط عكسي، فستستهلك العملية ذاكرة النظام، مما يبطئ العمليات الأخرى بشكل فعال، ويحتكر جزءًا كبيرًا من نظامك حتى الانتهاء.
ينتج عن هذا بعض الأشياء:
- إبطاء جميع العمليات الحالية الأخرى
- جامع قمامة مرهق للغاية
- استنفاد الذاكرة
في الأمثلة التالية، سنزيل قيمة الإرجاع لوظيفة .write()
ونغيرها إلى true
، مما يُعطل فعلياً دعم الضغط العكسي في نواة Node.js. في أي إشارة إلى ثنائي 'modified'
، نتحدث عن تشغيل الثنائي node بدون سطر return ret;
، وبدلاً من ذلك مع return true;
المُستبدل.
سحب زائد على جامع القمامة
دعونا نلقي نظرة على مقياس مرجعي سريع. باستخدام نفس المثال من الأعلى، قمنا بتشغيل بعض التجارب الزمنية للحصول على متوسط زمني لكلا الثنائيين.
تجربة رقم | ثنائي `node` (مللي ثانية) | ثنائي `node` المُعدّل (مللي ثانية)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
الوقت المتوسط: | 55299 | 55975
يستغرق كلاهما حوالي دقيقة للتشغيل، لذلك لا يوجد فرق كبير على الإطلاق، ولكن دعونا نلقي نظرة أقرب للتأكد من صحة شكوكنا. نستخدم أداة Linux dtrace
لتقييم ما يحدث مع جامع قمامة V8.
يشير وقت جامع القمامة (GC) المُقاس إلى فترات دورة كاملة لمسح واحد يقوم به جامع القمامة:
الوقت التقريبي (مللي ثانية) | جامع القمامة (مللي ثانية) | جامع القمامة المُعدّل (مللي ثانية)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
بينما تبدأ العمليتان بنفس الشكل ويبدوان أنهما يعملان جامع القمامة بنفس المعدل، يصبح من الواضح أنه بعد بضع ثوانٍ مع وجود نظام ضغط عكسي يعمل بشكل صحيح، فإنه يوزع حمل جامع القمامة عبر فترات ثابتة تتراوح من 4-8 مللي ثانية حتى نهاية نقل البيانات.
ومع ذلك، عندما لا يكون نظام الضغط العكسي موجوداً، يبدأ جامع قمامة V8 بالتأخر. يقوم الثنائي العادي باستدعاء جامع القمامة حوالي 75 مرة في دقيقة واحدة، بينما يقوم الثنائي المُعدّل باستدعائه 36 مرة فقط.
هذا هو الدين البطيء والمتراكم تدريجياً من استخدام الذاكرة المتزايد. مع نقل البيانات، وبدون وجود نظام ضغط عكسي، يتم استخدام المزيد من الذاكرة لكل عملية نقل جزء.
كلما زادت الذاكرة المُخصصة، زاد ما يتعين على جامع القمامة التعامل معه في عملية مسح واحدة. كلما كبر المسح، زاد ما يحتاج جامع القمامة إلى تحديده لما يمكن تحريره، وسيستهلك البحث عن المؤشرات المنفصلة في مساحة ذاكرة أكبر المزيد من قوة الحوسبة.
نضوب الذاكرة
لتحديد استهلاك الذاكرة لكل ملف ثنائي، قمنا بتسجيل كل عملية باستخدام الأمر /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
بشكل فردي.
هذه هي النتيجة على الملف الثنائي العادي:
Respecting the return value of .write()
=============================================
real 58.88
user 56.79
sys 8.79
87810048 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
19427 page reclaims
3134 page faults
0 swaps
5 block input operations
194 block output operations
0 messages sent
0 messages received
1 signals received
12 voluntary context switches
666037 involuntary context switches
تبين أن الحد الأقصى لحجم البايت الذي تشغله الذاكرة الظاهرية هو تقريباً 87.81 ميجابايت.
والآن بتغيير قيمة الإرجاع لوظيفة .write()
، نحصل على:
Without respecting the return value of .write():
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
373617 page reclaims
3139 page faults
0 swaps
18 block input operations
199 block output operations
0 messages sent
0 messages received
1 signals received
25 voluntary context switches
629566 involuntary context switches
تبين أن الحد الأقصى لحجم البايت الذي تشغله الذاكرة الظاهرية هو تقريباً 1.52 جيجابايت.
بدون تدفقات لِتفويض ضغط الذاكرة، هناك فرق بمقدار عشرة أضعاف في مساحة الذاكرة المُخصصة - هامش فرق ضخم بين نفس العملية!
تُظهر هذه التجربة مدى كفاءة وآلية آلية ضغط الذاكرة في Node.js لنظام الحوسبة الخاص بك. الآن، دعونا نفصل كيفية عملها!
كيف تحل ضغط الظهر هذه المشكلات؟
توجد وظائف مختلفة لنقل البيانات من عملية إلى أخرى. في Node.js، توجد وظيفة مدمجة داخلية تسمى .pipe ()
. هناك حزم أخرى يمكنك استخدامها أيضًا! في النهاية، على مستوى هذه العملية الأساسي، لدينا مكونان منفصلان: مصدر البيانات والمستهلك.
عندما يتم استدعاء .pipe ()
من المصدر، فإنه يُشير إلى المستهلك بوجود بيانات قابلة للنقل. تساعد دالة الأنابيب في إعداد عمليات إغلاق ضغط الظهر المناسبة لمحفزات الأحداث.
في Node.js، يكون المصدر عبارة عن تدفق Readable
والمستهلك هو تدفق Writable
(يمكن تبادل كليهما مع تدفق Duplex أو Transform، ولكن هذا خارج نطاق هذا الدليل).
يمكن تحديد اللحظة التي يتم فيها تشغيل ضغط الظهر بدقة إلى القيمة المُرجعة لوظيفة .write ()
الخاصة بـ Writable
. تُحدد هذه القيمة المُرجعة بواسطة بعض الشروط، بالطبع.
في أي سيناريو تتجاوز فيه ذاكرة التخزين المؤقت للبيانات highwaterMark
أو تكون قائمة الانتظار للكتابة مشغولة حاليًا، فإن .write ()
ستُرجع false
.
عندما يتم إرجاع قيمة false
، يبدأ نظام ضغط الظهر في العمل. سيعيق تدفق Readable
الوارد من إرسال أي بيانات وينتظر حتى يصبح المستهلك جاهزًا مرة أخرى. بمجرد تفريغ ذاكرة التخزين المؤقت للبيانات، سيتم إصدار حدث 'drain'
واستئناف تدفق البيانات الواردة.
بمجرد الانتهاء من قائمة الانتظار، سيسمح ضغط الظهر بإرسال البيانات مرة أخرى. ستحرر المساحة في الذاكرة التي كانت تُستخدم نفسها وتُعد للوحدة التالية من البيانات.
يسمح هذا بفعالية باستخدام كمية ثابتة من الذاكرة في أي وقت مُحدد لوظيفة .pipe ()
. لن يكون هناك أي تسرب للذاكرة، ولن يكون هناك تخزين مؤقت لانهائي، ولن يتعين على جامع القمامة التعامل إلا مع منطقة واحدة في الذاكرة!
إذن، إذا كان ضغط الظهر مهمًا جدًا، فلماذا لم تسمع به (على الأرجح)؟ حسنًا، الإجابة بسيطة: Node.js يقوم بكل هذا تلقائيًا نيابة عنك.
هذا رائع! ولكن ليس رائعًا أيضًا عندما نحاول فهم كيفية تنفيذ تياراتنا المخصصة.
ملاحظة
في معظم الأجهزة، يوجد حجم بايت يُحدد متى تكون ذاكرة التخزين المؤقت ممتلئة (وهو ما يختلف بين الأجهزة المختلفة). يسمح لك Node.js بتعيين highWaterMark
المخصص الخاص بك، ولكن بشكل شائع، يتم تعيين الإعداد الافتراضي إلى 16 كيلوبايت (16384، أو 16 لتدفقات objectMode). في الحالات التي قد ترغب فيها في رفع هذه القيمة، افعل ذلك، ولكن افعل ذلك بحذر!
دورة حياة .pipe()
لفهم أفضل لضغط الظهر، إليك مخطط انسيابي لدورة حياة دفق Readable
يتم تمريره piped إلى دفق Writable
:
+===================+
x--> وظائف التمرير +--> src.pipe(dest) |
x تم إعدادها أثناء |===================|
x طريقة .pipe. | استدعاءات الأحداث |
+===============+ x |-------------------|
| بياناتك | x توجد خارج | .on('close', cb) |
+=======+=======+ x تدفق البيانات، ولكن | .on('data', cb) |
| x ربط الأحداث المهمة | .on('drain', cb) |
| x واستدعاءاتها | .on('unpipe', cb) |
+---------v---------+ x المقابلة. | .on('error', cb) |
| دفق قابل للقراءة +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> دفق قابل للكتابة +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | هل هذه القطعة كبيرة جدًا؟ |
^ | | انبعاث .end(); | هل قائمة الانتظار مشغولة؟ |
| | +-> else +-------+----------------+---+
| ^ | انبعاث .write(); | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< لا | | نعم |
^ | +------+ +---v---+
| | |
| ^ انبعاث .pause(); +=================+ |
| ^---------------^-----------------------+ إرجاع false; <-----+---+
| +=================+ |
| |
^ عندما تكون قائمة الانتظار فارغة +============+ |
^------------^-----------------------< تخزين مؤقت | |
| |============| |
+> انبعاث .drain(); | ^الذاكرة التخزين المؤقتة^ | |
+> انبعاث .resume(); +------------+ |
| ^الذاكرة التخزين المؤقتة^ | |
+------------+ إضافة جزء إلى قائمة الانتظار |
| <---^---------------------<
+============+
ملاحظة
إذا كنت تقوم بإعداد خط أنابيب لسلسلة بعض الدفقات لمعالجة بياناتك، فمن المحتمل أنك ستقوم بتنفيذ دفق تحويل.
في هذه الحالة، ستدخل مخرجاتك من دفق Readable
في Transform
وسيتم تمريرها إلى Writable
.
Readable.pipe(Transformable).pipe(Writable)
سيتم تطبيق ضغط الظهر تلقائيًا، ولكن لاحظ أنه يمكن معالجة كل من highwaterMark
الواردة والصادرة لدفق Transform
وسوف يؤثر ذلك على نظام ضغط الظهر.
إرشادات الضغط العكسي
منذ إصدار Node.js v0.10، وفرت فئة Stream القدرة على تعديل سلوك .read()
أو .write()
باستخدام الإصدار ذي الشرطة السفلية من هاتين الوظيفتين (._read()
و ._write()
).
توجد إرشادات موثقة لتنفيذ تيارات قابلة للقراءة وتنفيذ تيارات قابلة للكتابة. سنفترض أنك قرأت هذه الإرشادات، وسيتناول القسم التالي المزيد من التفاصيل.
القواعد الواجب التقيد بها عند تنفيذ تيارات مخصصة
القاعدة الذهبية للتيارات هي دائمًا احترام الضغط العكسي. ما يُشكل أفضل الممارسات هو الممارسة غير المتناقضة. طالما كنت حريصًا على تجنب السلوكيات التي تتعارض مع دعم الضغط العكسي الداخلي، فيمكنك التأكد من أنك تتبع الممارسات الجيدة.
بشكل عام،
- لا تقم أبدًا بـ
.push()
إذا لم يُطلب منك ذلك. - لا تتصل أبدًا بـ
.write()
بعد أن تُرجع قيمة خاطئة، ولكن انتظر حدث "drain" بدلاً من ذلك. - تتغير تيارات بين إصدارات Node.js المختلفة، والمكتبة التي تستخدمها. كن حذرًا واختبر الأشياء.
ملاحظة
فيما يتعلق بالنقطة 3، تُعد حزمة مفيدة بشكل لا يُصدق لبناء تيارات المتصفح هي readable-stream
. كتب Rodd Vagg منشور مدونة رائع يصف فائدة هذه المكتبة. باختصار، توفر نوعًا من التدهور السلس الآلي للتيارات القابلة للقراءة، وتدعم الإصدارات القديمة من المتصفحات و Node.js.
قواعد خاصة بالتيارات القابلة للقراءة
حتى الآن، لقد ألقينا نظرة على كيفية تأثير .write()
على الضغط العكسي، وقد ركزنا كثيرًا على التيار القابل للكتابة. نظرًا لوظائف Node.js، تتدفق البيانات تقنيًا في اتجاه مجرى النهر من القابل للقراءة إلى القابل للكتابة. ومع ذلك، كما يمكننا أن نلاحظ في أي نقل للبيانات أو المواد أو الطاقة، فإن المصدر مهم بنفس القدر مثل الوجهة، والتيار القابل للقراءة ضروري لكيفية التعامل مع الضغط العكسي.
تعتمد هاتان العمليتان على بعضهما البعض للتواصل بشكل فعال، إذا تجاهل القابل للقراءة عندما يطلب منه التيار القابل للكتابة التوقف عن إرسال البيانات، فقد يكون ذلك مُشكلاً بنفس القدر عندما تكون قيمة الإرجاع لـ .write()
غير صحيحة.
لذلك، بالإضافة إلى احترام قيمة الإرجاع لـ .write()
، يجب علينا أيضًا احترام قيمة الإرجاع لـ .push()
المستخدمة في طريقة ._read()
. إذا أعادت .push()
قيمة خاطئة، فسيتوقف التيار عن القراءة من المصدر. خلاف ذلك، سيستمر دون توقف.
فيما يلي مثال على الممارسة السيئة باستخدام .push()
:
// هذا مُشكِل لأنه يتجاهل تمامًا قيمة الإرجاع من push
// والتي قد تكون إشارة للضغط العكسي من تيار الوجهة!
class MyReadable extends Readable {
_read(size) {
let chunk
while (null == (chunk = getNextChunk())) {
this.push(chunk)
}
}
}
بالإضافة إلى ذلك، من خارج التيار المخصص، هناك مُزالق لتجاهل الضغط العكسي. في هذا المثال المضاد للممارسة الجيدة، تُجبر شفرة التطبيق البيانات على المرور عبرها كلما كانت متاحة (كما يشير حدث 'data'
):
// هذا يتجاهل آليات الضغط العكسي التي وضعتها Node.js،
// ويدفع البيانات دون قيد أو شرط، بغض النظر عما إذا كان
// تيار الوجهة جاهزًا لذلك أم لا.
readable.on('data', data => writable.write(data))
فيما يلي مثال على استخدام .push()
مع تيار قابل للقراءة.
const { Readable } = require('node:stream')
// إنشاء تيار قابل للقراءة مخصص
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// دفع بعض البيانات إلى التيار
this.push({ message: 'Hello, world!' })
this.push(null) // وضع علامة على نهاية التيار
},
})
// استهلاك التيار
myReadableStream.on('data', chunk => {
console.log(chunk)
})
// الإخراج:
// { message: 'Hello, world!' }
قواعد خاصة بالتيارات القابلة للكتابة
تذكر أنَّ .write()
قد تُرجع قيمة صحيحة أو خاطئة بناءً على بعض الشروط. لحسن الحظ، عند إنشاء تيار قابل للكتابة خاص بنا، ستتعامل آلة حالة التيار مع عمليات الاستدعاء العائدة لدينا وتحدد متى تتعامل مع ضغط الظهر (backpressure) وتحسين تدفق البيانات لنا. ومع ذلك، عندما نرغب في استخدام تيار قابل للكتابة مباشرة، يجب علينا احترام قيمة الإرجاع .write()
والانتباه عن كثب إلى هذه الشروط:
- إذا كانت قائمة الانتظار للكتابة مشغولة، فإنَّ
.write()
ستُرجع قيمة خاطئة. - إذا كانت قطعة البيانات كبيرة جدًا، فإنَّ
.write()
ستُرجع قيمة خاطئة (يُشار إلى الحد بواسطة المتغير،highWaterMark
).
في هذا المثال، نقوم بإنشاء تيار قابل للقراءة مخصص يدفع كائنًا واحدًا إلى التيار باستخدام .push()
. يتم استدعاء الأسلوب ._read()
عندما يكون التيار جاهزًا لاستهلاك البيانات، وفي هذه الحالة، نقوم على الفور بدفع بعض البيانات إلى التيار ووضع علامة على نهاية التيار عن طريق دفع null
.
const stream = require('stream')
class MyReadable extends stream.Readable {
constructor() {
super()
}
_read() {
const data = { message: 'Hello, world!' }
this.push(data)
this.push(null)
}
}
const readableStream = new MyReadable()
readableStream.pipe(process.stdout)
ثم نستهلك التيار من خلال الاستماع إلى حدث "data" وتسجيل كل قطعة بيانات يتم دفعها إلى التيار. في هذه الحالة، نقوم فقط بدفع قطعة بيانات واحدة إلى التيار، لذلك لا نرى سوى رسالة سجل واحدة.
قواعد خاصة بالتيارات القابلة للكتابة
تذكر أنَّ .write()
قد تُرجع قيمة صحيحة أو خاطئة بناءً على بعض الشروط. لحسن الحظ، عند إنشاء تيار قابل للكتابة خاص بنا، ستتعامل آلة حالة التيار مع عمليات الاستدعاء العائدة لدينا وتحدد متى تتعامل مع ضغط الظهر (backpressure) وتحسين تدفق البيانات لنا.
ومع ذلك، عندما نرغب في استخدام تيار قابل للكتابة مباشرة، يجب علينا احترام قيمة الإرجاع .write()
والانتباه عن كثب إلى هذه الشروط:
- إذا كانت قائمة الانتظار للكتابة مشغولة، فإنَّ
.write()
ستُرجع قيمة خاطئة. - إذا كانت قطعة البيانات كبيرة جدًا، فإنَّ
.write()
ستُرجع قيمة خاطئة (يُشار إلى الحد بواسطة المتغير،highWaterMark
).
class MyWritable extends Writable {
// هذا التيار القابل للكتابة غير صالح بسبب الطبيعة غير المتزامنة لاستدعاءات JavaScript العائدة.
// بدون بيان إرجاع لكل استدعاء عائد قبل الأخير،
// هناك فرصة كبيرة لاستدعاء العديد من الاستدعاءات العائدة.
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback()
else if (chunk.toString().indexOf('b') >= 0) callback()
callback()
}
}
هناك أيضًا بعض الأشياء التي يجب الانتباه إليها عند تنفيذ ._writev()
. ترتبط الدالة بـ .cork()
، ولكن هناك خطأ شائع عند الكتابة:
// استخدام .uncork() مرتين هنا يُحدث مُكالمتين على طبقة C++، مما يجعل تقنية cork/uncork عديمة الفائدة.
ws.cork()
ws.write('hello ')
ws.write('world ')
ws.uncork()
ws.cork()
ws.write('from ')
ws.write('Matteo')
ws.uncork()
// الطريقة الصحيحة لكتابة هذا هي استخدام process.nextTick()، الذي يُطلق في دورة الحدث التالية.
ws.cork()
ws.write('hello ')
ws.write('world ')
process.nextTick(doUncork, ws)
ws.cork()
ws.write('from ')
ws.write('Matteo')
process.nextTick(doUncork, ws)
// كدالة عامة.
function doUncork(stream) {
stream.uncork()
}
يمكن استدعاء .cork()
عدة مرات حسب رغبتنا، كل ما نحتاجه هو الحرص على استدعاء .uncork()
نفس العدد من المرات لجعله يتدفق مرة أخرى.
الخلاصة
تُعدّ الدفقات (Streams) وحدةً مُستخدمةً بكثرة في Node.js. وهي مهمةٌ للهيكل الداخلي، وللمطورين، لتوسيع نطاق التوصيل عبر نظام بيئة وحدات Node.js.
آمل أن تتمكن الآن من استكشاف الأخطاء وإصلاحها وكتابة التعليمات البرمجية الخاصة بك بأمان لتيارات Writable
و Readable
مع مراعاة ضغط الظهر، ومشاركة معرفتك مع الزملاء والأصدقاء.
تأكد من قراءة المزيد حول Stream
لوظائف واجهة برمجة التطبيقات الأخرى للمساعدة في تحسين وإطلاق قدرات البث الخاصة بك عند إنشاء تطبيق باستخدام Node.js.