الضغط الخلفي في التدفقات
هناك مشكلة عامة تحدث أثناء معالجة البيانات تسمى الضغط الخلفي وتصف تراكم البيانات خلف المخزن المؤقت أثناء نقل البيانات. عندما يكون للطرف المتلقي للنقل عمليات معقدة، أو يكون أبطأ لأي سبب كان، هناك ميل لتراكم البيانات من المصدر الوارد، مثل الانسداد.
لحل هذه المشكلة، يجب أن يكون هناك نظام تفويض معمول به لضمان التدفق السلس للبيانات من مصدر إلى آخر. قامت مجتمعات مختلفة بحل هذه المشكلة بشكل فريد لبرامجها، وتعتبر أنابيب Unix ومآخذ توصيل TCP أمثلة جيدة على ذلك، وغالبًا ما يشار إليها باسم التحكم في التدفق. في Node.js، كانت التدفقات هي الحل المعتمد.
الغرض من هذا الدليل هو تقديم مزيد من التفاصيل حول ماهية الضغط الخلفي، وكيف تعالج التدفقات ذلك تحديدًا في التعليمات البرمجية المصدر لـ Node.js. سيقدم الجزء الثاني من الدليل أفضل الممارسات المقترحة لضمان أن تكون التعليمات البرمجية لتطبيقك آمنة ومحسّنة عند تنفيذ التدفقات.
نفترض الإلمام قليلاً بالتعريف العام لـ backpressure
و Buffer
و EventEmitters
في Node.js، بالإضافة إلى بعض الخبرة في Stream
. إذا لم تكن قد قرأت هذه المستندات، فليس من السيئ إلقاء نظرة على وثائق واجهة برمجة التطبيقات أولاً، لأنها ستساعد في توسيع فهمك أثناء قراءة هذا الدليل.
مشكلة معالجة البيانات
في نظام الكمبيوتر، يتم نقل البيانات من عملية إلى أخرى عبر الأنابيب والمآخذ والإشارات. في Node.js، نجد آلية مماثلة تسمى Stream
. التدفقات رائعة! إنها تفعل الكثير لـ Node.js ويستخدم كل جزء تقريبًا من قاعدة التعليمات البرمجية الداخلية هذه الوحدة. بصفتك مطورًا، فأنت أكثر من مشجع على استخدامها أيضًا!
const readline = require('node:readline');
const rl = readline.createInterface({
output: process.stdout,
input: process.stdin,
});
rl.question('لماذا يجب عليك استخدام التدفقات؟ ', answer => {
console.log(`ربما يكون ${answer}، ربما يكون ذلك لأنها رائعة!`);
});
rl.close();
هناك مثال جيد على سبب كون آلية الضغط الخلفي المنفذة من خلال التدفقات تحسينًا رائعًا يمكن إثباته من خلال مقارنة أدوات النظام الداخلية من تطبيق Stream الخاص بـ Node.js.
في أحد السيناريوهات، سنأخذ ملفًا كبيرًا (حوالي -9 جيجابايت) ونضغطه باستخدام أداة zip(1)
المألوفة.
zip The.Matrix.1080p.mkv
في حين أن ذلك سيستغرق بضع دقائق حتى يكتمل، في غلاف آخر قد نقوم بتشغيل برنامج نصي يأخذ وحدة zlib
الخاصة بـ Node.js، والتي تلتف حول أداة ضغط أخرى، gzip(1)
.
const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');
const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');
inp.pipe(gzip).pipe(out);
لاختبار النتائج، حاول فتح كل ملف مضغوط. سيبلغك الملف المضغوط بواسطة أداة zip(1)
بأن الملف تالف، في حين أن الضغط الذي تم الانتهاء منه بواسطة Stream سيقوم بفك الضغط دون خطأ.
ملاحظة
في هذا المثال، نستخدم .pipe()
للحصول على مصدر البيانات من طرف إلى آخر. ومع ذلك، لاحظ أنه لا توجد معالجات أخطاء مناسبة مرفقة. إذا فشل استقبال جزء من البيانات بشكل صحيح، فلن يتم تدمير مصدر Readable أو دفق gzip
. pump
هي أداة مساعدة تعمل على تدمير جميع التدفقات في خط الأنابيب بشكل صحيح إذا فشل أحدها أو تم إغلاقه، وهو أمر لا بد منه في هذه الحالة!
pump
ضروري فقط للإصدار Node.js 8.x أو الإصدارات الأقدم، أما بالنسبة للإصدار Node.js 10.x أو الإصدارات الأحدث، فقد تم تقديم pipeline
ليحل محل pump
. هذه هي طريقة الوحدة النمطية للأنابيب بين التدفقات التي ترسل الأخطاء وتنظيفها بشكل صحيح وتقديم رد اتصال عند اكتمال خط الأنابيب.
إليك مثال لاستخدام خط الأنابيب:
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// استخدم واجهة برمجة تطبيقات خط الأنابيب لربط سلسلة من التدفقات بسهولة
// معًا والحصول على إشعار عند اكتمال خط الأنابيب بالكامل.
// خط أنابيب لضغط ملف فيديو ضخم محتمل بكفاءة:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('فشل خط الأنابيب', err);
} else {
console.log('نجح خط الأنابيب');
}
}
);
يمكنك أيضًا استخدام وحدة stream/promises
لاستخدام خط الأنابيب مع async / await
:
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
);
console.log('نجح خط الأنابيب');
} catch (err) {
console.error('فشل خط الأنابيب', err);
}
}
الكثير من البيانات، بسرعة كبيرة
هناك حالات قد تقدم فيها دفق قابل للقراءة
البيانات إلى قابل للكتابة
بسرعة كبيرة جدًا - أكثر بكثير مما يمكن للمستهلك التعامل معه!
عندما يحدث ذلك، سيبدأ المستهلك في وضع جميع أجزاء البيانات في قائمة انتظار للاستهلاك لاحقًا. سيصبح صف كتابة أطول وأطول، ولهذا السبب يجب الاحتفاظ بالمزيد من البيانات في الذاكرة حتى تكتمل العملية بأكملها.
الكتابة على القرص أبطأ بكثير من القراءة من القرص، وبالتالي، عندما نحاول ضغط ملف وكتابته على القرص الصلب، سيحدث ضغط خلفي لأن قرص الكتابة لن يكون قادرًا على مواكبة السرعة من القراءة.
// سرًا، يقول الدفق: "مهلاً، مهلاً! انتظر، هذا كثير جدًا!"
// ستبدأ البيانات في التراكم على جانب القراءة من مخزن البيانات المؤقت مثلما
// تحاول الكتابة مواكبة تدفق البيانات الواردة.
inp.pipe(gzip).pipe(outputFile);
هذا هو السبب في أهمية وجود آلية للضغط الخلفي. إذا لم يكن هناك نظام للضغط الخلفي، فإن العملية ستستهلك ذاكرة النظام الخاص بك، مما يبطئ بشكل فعال العمليات الأخرى، ويحتكر جزءًا كبيرًا من نظامك حتى الانتهاء.
ينتج عن هذا عدة أشياء:
- إبطاء جميع العمليات الحالية الأخرى
- جامع قمامة مرهق للغاية
- استنفاد الذاكرة
في الأمثلة التالية، سنخرج قيمة الإرجاع لوظيفة .write()
ونغيرها إلى true
، مما يعطل بشكل فعال دعم الضغط الخلفي في Node.js core. في أي إشارة إلى 'modified'
binary، فإننا نتحدث عن تشغيل الثنائي للعقدة بدون سطر return ret;
، وبدلاً من ذلك مع استبدال return true;
.
سحب زائد على جمع القمامة
دعونا نلقي نظرة على معيار سريع. باستخدام نفس المثال من الأعلى، أجرينا عددًا قليلًا من التجارب الزمنية للحصول على متوسط الوقت لكلا الثنائيين.
trial (#) | `node` binary (ms) | modified `node` binary (ms)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
average time: | 55299 | 55975
يستغرق كلاهما حوالي دقيقة للتشغيل، لذلك لا يوجد فرق كبير على الإطلاق، ولكن دعونا نلقي نظرة فاحصة للتأكد مما إذا كانت شكوكنا صحيحة. نستخدم أداة Linux dtrace
لتقييم ما يحدث مع جامع القمامة V8.
يشير الوقت المقاس GC (جامع القمامة) إلى فترات الدورة الكاملة لمسح واحد يتم إجراؤه بواسطة جامع القمامة:
approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
بينما تبدأ العمليتان بنفس الطريقة ويبدو أنهما تعملان على GC بنفس المعدل، يصبح من الواضح أنه بعد بضع ثوانٍ مع وجود نظام ضغط خلفي يعمل بشكل صحيح، فإنه ينشر حمل GC عبر فترات متسقة من 4-8 مللي ثانية حتى نهاية نقل البيانات.
ومع ذلك، عندما لا يكون هناك نظام ضغط خلفي في مكانه، يبدأ جمع القمامة V8 في الانسحاب. يستدعي الثنائي العادي عمليات إطلاق GC ما يقرب من 75 مرة في الدقيقة، في حين أن الثنائي المعدل يطلق 36 مرة فقط.
هذا هو الدين البطيء والتدريجي المتراكم من زيادة استخدام الذاكرة. أثناء نقل البيانات، بدون وجود نظام ضغط خلفي في مكانه، يتم استخدام المزيد من الذاكرة لكل نقل جزء.
كلما تم تخصيص المزيد من الذاكرة، زادت مسؤولية GC في مسح واحد. كلما كان المسح أكبر، زادت حاجة GC إلى تحديد ما يمكن تحريره، وسوف يستهلك فحص المؤشرات المنفصلة في مساحة ذاكرة أكبر المزيد من قوة الحوسبة.
استنفاد الذاكرة
لتحديد استهلاك الذاكرة لكل ملف تنفيذي، قمنا بتوقيت كل عملية باستخدام /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
بشكل فردي.
هذا هو الناتج على الملف التنفيذي العادي:
احترام القيمة المرجعة لـ .write()
=============================================
real 58.88
user 56.79
sys 8.79
87810048 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
19427 page reclaims
3134 page faults
0 swaps
5 block input operations
194 block output operations
0 messages sent
0 messages received
1 signals received
12 voluntary context switches
666037 involuntary context switches
اتضح أن الحد الأقصى لحجم البايت الذي تشغله الذاكرة الظاهرية يبلغ حوالي 87.81 ميجابايت.
والآن تغيير القيمة المرجعية للدالة .write()
، نحصل على:
عدم احترام القيمة المرجعية لـ .write():
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
373617 page reclaims
3139 page faults
0 swaps
18 block input operations
199 block output operations
0 messages sent
0 messages received
1 signals received
25 voluntary context switches
629566 involuntary context switches
اتضح أن الحد الأقصى لحجم البايت الذي تشغله الذاكرة الظاهرية يبلغ حوالي 1.52 جيجابايت.
بدون وجود تدفقات لتفويض الضغط الخلفي، يوجد ترتيب من حيث الحجم أكبر من مساحة الذاكرة التي يتم تخصيصها - هامش اختلاف كبير بين نفس العملية!
توضح هذه التجربة مدى تحسين وفعالية آلية الضغط الخلفي في Node.js من حيث التكلفة لنظام الحوسبة الخاص بك. الآن، دعنا نحلل كيفية عملها!
كيف تحل الضغط الخلفي هذه المشاكل؟
هناك وظائف مختلفة لنقل البيانات من عملية إلى أخرى. في Node.js، توجد وظيفة مدمجة داخلية تسمى .pipe()
. توجد حزم أخرى يمكنك استخدامها أيضًا! ومع ذلك، في المستوى الأساسي لهذه العملية، لدينا مكونان منفصلان: مصدر البيانات والمستهلك.
عندما يتم استدعاء .pipe()
من المصدر، فإنه يشير إلى المستهلك بوجود بيانات سيتم نقلها. تساعد وظيفة الأنابيب في إعداد إغلاقات الضغط الخلفي المناسبة لمشغلات الأحداث.
في Node.js، المصدر هو دفق Readable
والمستهلك هو دفق Writable
(يمكن استبدال كليهما بدفق Duplex أو Transform، ولكن هذا خارج نطاق هذا الدليل).
يمكن تضييق اللحظة التي يتم فيها تشغيل الضغط الخلفي بالضبط إلى القيمة المرجعة لوظيفة .write()
الخاصة بـ Writable
. يتم تحديد هذه القيمة المرجعة من خلال بعض الشروط، بالطبع.
في أي سيناريو تجاوز فيه مخزن البيانات المؤقت highwaterMark
أو كان صف الكتابة مشغولاً حاليًا، ستعيد .write()
قيمة false
.
عند إرجاع قيمة false
، يبدأ نظام الضغط الخلفي. سيؤدي ذلك إلى إيقاف دفق Readable
الوارد مؤقتًا من إرسال أي بيانات والانتظار حتى يصبح المستهلك جاهزًا مرة أخرى. بمجرد إفراغ مخزن البيانات المؤقت، سيتم إصدار حدث 'drain'
واستئناف تدفق البيانات الواردة.
بمجرد الانتهاء من قائمة الانتظار، سيسمح الضغط الخلفي بإرسال البيانات مرة أخرى. ستتحرر المساحة الموجودة في الذاكرة والتي كانت قيد الاستخدام وتستعد للدُفعة التالية من البيانات.
يسمح هذا بشكل فعال باستخدام كمية ثابتة من الذاكرة في أي وقت لوظيفة .pipe()
. لن يكون هناك تسرب للذاكرة، ولا تخزين مؤقت لانهائي، وسيتعين على جامع البيانات المهملة التعامل مع منطقة واحدة فقط في الذاكرة!
إذن، إذا كان الضغط الخلفي مهمًا جدًا، فلماذا (على الأرجح) لم تسمع به؟ حسنًا، الإجابة بسيطة: Node.js يفعل كل هذا تلقائيًا لك.
هذا رائع جدا! ولكنه ليس رائعًا أيضًا عندما نحاول فهم كيفية تنفيذ تدفقاتنا المخصصة.
NOTE
في معظم الأجهزة، يوجد حجم بالبايت يحدد متى يكون المخزن المؤقت ممتلئًا (والذي سيختلف عبر الأجهزة المختلفة). يسمح لك Node.js بتعيين highWaterMark
المخصص الخاص بك، ولكن عادةً ما يتم تعيين الإعداد الافتراضي على 16 كيلوبايت (16384 أو 16 لتدفقات objectMode). في الحالات التي قد ترغب فيها في رفع هذه القيمة، تفضل بذلك، ولكن افعل ذلك بحذر!
دورة حياة .pipe()
لتحقيق فهم أفضل للضغط الخلفي، إليك مخطط انسيابي لدورة حياة دفق Readable
يتم توجيهه إلى دفق Writable
:
+===================+
x--> وظائف التوجيه +--> src.pipe(dest) |
x يتم إعدادها أثناء |===================|
x طريقة .pipe. | استدعاءات الأحداث |
+===============+ x |-------------------|
| بياناتك | x إنها موجودة خارج | .on('close', cb) |
+=======+=======+ x تدفق البيانات، ولكن | .on('data', cb) |
| x بشكل مهم يتم إرفاق | .on('drain', cb) |
| x الأحداث، واستدعاءاتها | .on('unpipe', cb) |
+---------v---------+ x ذات الصلة. | .on('error', cb) |
| دفق Readable +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> دفق Writable +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| ^ | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | هل هذه القطعة كبيرة جدا؟ |
^ | | emit .end(); | هل قائمة الانتظار مشغولة؟ |
| | +-> else +-------+----------------+---+
| ^ | emit .write(); | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< لا | | نعم |
^ | +------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---------------^-----------------------+ إرجاع false; <-----+---+
| +=================+ |
| |
^ عندما تكون قائمة الانتظار فارغة +============+ |
^------------^-----------------------< التخزين المؤقت | |
| |============| |
+> emit .drain(); | ^تخزين مؤقت^ | |
+> emit .resume(); +------------+ |
| ^تخزين مؤقت^ | |
+------------+ إضافة قطعة إلى قائمة الانتظار |
| <---^---------------------<
+============+
ملاحظة
إذا كنت تقوم بإعداد مسار لسلسلة عدد قليل من التدفقات لمعالجة بياناتك، فمن المحتمل أن تقوم بتنفيذ دفق التحويل.
في هذه الحالة، سيدخل الإخراج من دفق Readable
الخاص بك في Transform
وسيتم توجيهه إلى Writable
.
Readable.pipe(Transformable).pipe(Writable);
سيتم تطبيق الضغط الخلفي تلقائيًا، ولكن لاحظ أن كلاً من highwaterMark
الواردة والصادرة لدفق Transform
قد يتم التلاعب بها وستؤثر على نظام الضغط الخلفي.
إرشادات الضغط الخلفي
منذ Node.js v0.10، قدمت فئة Stream القدرة على تعديل سلوك .read()
أو .write()
باستخدام الإصدار الذي يبدأ بعلامة الشرطة السفلية لهذه الوظائف (._read()
و ._write()
على التوالي).
هناك إرشادات موثقة لتنفيذ تدفقات قابلة للقراءة وتنفيذ تدفقات قابلة للكتابة. سنفترض أنك قرأت هذه الإرشادات، وسيتعمق القسم التالي قليلاً.
القواعد التي يجب الالتزام بها عند تنفيذ تدفقات مخصصة
القاعدة الذهبية للتدفقات هي احترام الضغط الخلفي دائمًا. ما يشكل أفضل الممارسات هو الممارسة غير المتناقضة. طالما أنك حريص على تجنب السلوكيات التي تتعارض مع دعم الضغط الخلفي الداخلي، يمكنك التأكد من أنك تتبع ممارسة جيدة.
بشكل عام،
- لا تستخدم
.push()
أبدًا إذا لم يُطلب منك ذلك. - لا تستدعي
.write()
أبدًا بعد أن ترجع خطأ ولكن انتظر 'drain' بدلاً من ذلك. - تتغير التدفقات بين إصدارات Node.js المختلفة والمكتبة التي تستخدمها. كن حذرًا واختبر الأشياء.
NOTE
فيما يتعلق بالنقطة 3، فإن الحزمة المفيدة بشكل لا يصدق لبناء تدفقات المتصفح هي readable-stream
. كتب Rodd Vagg تدوينة رائعة تصف فائدة هذه المكتبة. باختصار، فهي توفر نوعًا من التدهور التدريجي التلقائي للتدفقات القابلة للقراءة، وتدعم الإصدارات القديمة من المتصفحات و Node.js.
القواعد الخاصة بالتدفقات القابلة للقراءة
حتى الآن، ألقينا نظرة على كيفية تأثير .write()
على الضغط الخلفي وركزنا كثيرًا على التدفق القابل للكتابة. نظرًا لوظائف Node.js، فإن البيانات تتدفق تقنيًا في اتجاه مجرى النهر من القابل للقراءة إلى القابل للكتابة. ومع ذلك، كما يمكننا أن نلاحظ في أي نقل للبيانات أو المادة أو الطاقة، فإن المصدر لا يقل أهمية عن الوجهة، والتدفق القابل للقراءة ضروري لكيفية التعامل مع الضغط الخلفي.
تعتمد هاتان العمليتان على بعضهما البعض للتواصل بفعالية، وإذا تجاهل القابل للقراءة متى يطلب منه التدفق القابل للكتابة التوقف عن إرسال البيانات، فقد يكون ذلك إشكاليًا تمامًا كما هو الحال عندما تكون القيمة المرجعة لـ .write()
غير صحيحة.
لذا، بالإضافة إلى احترام القيمة المرجعة لـ .write()
، يجب علينا أيضًا احترام القيمة المرجعية لـ .push()
المستخدمة في طريقة ._read()
. إذا أرجعت .push()
قيمة خاطئة، فسيتوقف التدفق عن القراءة من المصدر. وإلا، فسيستمر دون توقف.
فيما يلي مثال على الممارسة السيئة باستخدام .push()
:
// هذه مشكلة لأنها تتجاهل تمامًا القيمة المرجعية من push
// والتي قد تكون إشارة للضغط الخلفي من تدفق الوجهة!
class MyReadable extends Readable {
_read(size) {
let chunk;
while (null == (chunk = getNextChunk())) {
this.push(chunk);
}
}
}
بالإضافة إلى ذلك، من خارج التدفق المخصص، هناك مخاطر من تجاهل الضغط الخلفي. في هذا المثال المضاد للممارسة الجيدة، يجبر كود التطبيق البيانات على المرور كلما كانت متاحة (يشار إليها بواسطة حدث 'data'
) :
// هذا يتجاهل آليات الضغط الخلفي التي وضعتها Node.js،
// ويدفع البيانات بشكل غير مشروط، بغض النظر عما إذا كان
// تدفق الوجهة جاهزًا لها أم لا.
readable.on('data', data => writable.write(data));
فيما يلي مثال على استخدام .push()
مع تدفق قابل للقراءة.
const { Readable } = require('node:stream');
// قم بإنشاء تدفق قابل للقراءة مخصص
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// ادفع بعض البيانات إلى التدفق
this.push({ message: 'Hello, world!' });
this.push(null); // ضع علامة على نهاية التدفق
},
});
// استهلك التدفق
myReadableStream.on('data', chunk => {
console.log(chunk);
});
// الإخراج:
// { message: 'Hello, world!' }
قواعد خاصة بتدفقات الكتابة (Writable Streams)
تذكر أن .write()
قد تعيد true أو false اعتمادًا على بعض الشروط. لحسن حظنا، عند بناء تدفق كتابة خاص بنا، ستتعامل آلة حالة التدفق مع ردود الاتصال الخاصة بنا وتحدد متى يتم التعامل مع الضغط الخلفي وتحسين تدفق البيانات لنا. ومع ذلك، عندما نريد استخدام تدفق كتابة مباشرة، يجب علينا احترام القيمة المرجعة .write()
وإيلاء اهتمام وثيق لهذه الشروط:
- إذا كان طابور الكتابة مشغولاً، فسوف تعيد
.write()
قيمة false. - إذا كانت كتلة البيانات كبيرة جدًا، فسوف تعيد
.write()
قيمة false (يشار إلى الحد بالمتغير highWaterMark).
في هذا المثال، نقوم بإنشاء تدفق قراءة مخصص يدفع كائنًا واحدًا إلى التدفق باستخدام .push()
. يتم استدعاء الطريقة ._read()
عندما يكون التدفق جاهزًا لاستهلاك البيانات، وفي هذه الحالة، نقوم على الفور بدفع بعض البيانات إلى التدفق ونحدد نهاية التدفق عن طريق دفع null
.
const stream = require('stream');
class MyReadable extends stream.Readable {
constructor() {
super();
}
_read() {
const data = { message: 'Hello, world!' };
this.push(data);
this.push(null);
}
}
const readableStream = new MyReadable();
readableStream.pipe(process.stdout);
ثم نستهلك التدفق من خلال الاستماع إلى حدث 'data' وتسجيل كل كتلة بيانات يتم دفعها إلى التدفق. في هذه الحالة، نقوم فقط بدفع كتلة بيانات واحدة إلى التدفق، لذلك نرى رسالة سجل واحدة فقط.
قواعد خاصة بتدفقات الكتابة (Writable Streams)
تذكر أن .write()
قد تعيد true أو false اعتمادًا على بعض الشروط. لحسن حظنا، عند بناء تدفق كتابة خاص بنا، ستتعامل آلة حالة التدفق مع ردود الاتصال الخاصة بنا وتحدد متى يتم التعامل مع الضغط الخلفي وتحسين تدفق البيانات لنا.
ومع ذلك، عندما نريد استخدام تدفق كتابة مباشرة، يجب علينا احترام القيمة المرجعة .write()
وإيلاء اهتمام وثيق لهذه الشروط:
- إذا كان طابور الكتابة مشغولاً، فسوف تعيد
.write()
قيمة false. - إذا كانت كتلة البيانات كبيرة جدًا، فسوف تعيد
.write()
قيمة false (يشار إلى الحد بالمتغير highWaterMark).
class MyWritable extends Writable {
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback();
else if (chunk.toString().indexOf('b') >= 0) callback();
callback();
}
}
هناك أيضًا بعض الأشياء التي يجب الانتباه إليها عند تنفيذ ._writev()
. تقترن الدالة بـ .cork()
، ولكن هناك خطأ شائع عند الكتابة:
// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();
ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();
// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);
ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);
// As a global function.
function doUncork(stream) {
stream.uncork();
}
يمكن استدعاء .cork()
بالعدد الذي نريده من المرات، ولكن يجب أن نكون حريصين على استدعاء .uncork()
بنفس عدد المرات لجعله يتدفق مرة أخرى.
خاتمة
تُعد التدفقات وحدة مستخدمة بشكل متكرر في Node.js. وهي مهمة للهيكل الداخلي، وبالنسبة للمطورين، للتوسع والاتصال عبر النظام البيئي لوحدات Node.js النمطية.
نأمل أن تكون الآن قادرًا على استكشاف الأخطاء وإصلاحها وكتابة تعليمات برمجية آمنة لتدفقات Writable
و Readable
الخاصة بك مع مراعاة الضغط الخلفي، ومشاركة معرفتك مع الزملاء والأصدقاء.
تأكد من قراءة المزيد حول Stream
لوظائف واجهة برمجة التطبيقات الأخرى للمساعدة في تحسين وإطلاق قدرات البث الخاصة بك عند إنشاء تطبيق باستخدام Node.js.