تدفق
[مستقر: 2 - مستقر]
مستقر: 2 استقرار: 2 - مستقر
رمز المصدر: lib/stream.js
التدفق هو واجهة مجردة للعمل مع البيانات المتدفقة في Node.js. توفر وحدة node:stream
واجهة برمجة تطبيقات لتنفيذ واجهة التدفق.
هناك العديد من كائنات التدفق التي توفرها Node.js. على سبيل المثال، الطلب إلى خادم HTTP و process.stdout
كلاهما مثيلات لتدفق.
يمكن أن تكون التدفقات قابلة للقراءة أو الكتابة أو كليهما. جميع التدفقات هي مثيلات لـ EventEmitter
.
للوصول إلى وحدة node:stream
:
const stream = require('node:stream')
وحدة node:stream
مفيدة لإنشاء أنواع جديدة من مثيلات التدفق. عادةً لا يكون من الضروري استخدام وحدة node:stream
لاستهلاك التدفقات.
تنظيم هذه الوثيقة
تحتوي هذه الوثيقة على قسمين رئيسيين وقسم ثالث للملاحظات. يشرح القسم الأول كيفية استخدام التدفقات الموجودة داخل تطبيق. يشرح القسم الثاني كيفية إنشاء أنواع جديدة من التدفقات.
أنواع التدفقات
هناك أربعة أنواع أساسية للتدفقات داخل Node.js:
Writable
: تدفقات يمكن كتابة البيانات إليها (على سبيل المثال،fs.createWriteStream()
).Readable
: تدفقات يمكن قراءة البيانات منها (على سبيل المثال،fs.createReadStream()
).Duplex
: تدفقات قابلة للقراءة والكتابة (على سبيل المثال،net.Socket
).Transform
: تدفقاتDuplex
يمكنها تعديل أو تحويل البيانات أثناء كتابتها وقراءتها (على سبيل المثال،zlib.createDeflate()
).
بالإضافة إلى ذلك، تتضمن هذه الوحدة دوال المساعدة stream.duplexPair()
، stream.pipeline()
، stream.finished()
stream.Readable.from()
، و stream.addAbortSignal()
.
واجهة برمجة التطبيقات للمتابعة والوعود
مضاف في: v15.0.0
توفر واجهة برمجة التطبيقات stream/promises
مجموعة بديلة من دوال المرافق غير المتزامنة للتيارات التي تُرجع كائنات Promise
بدلاً من استخدام عمليات النداء العكسي. يمكن الوصول إلى واجهة برمجة التطبيقات عبر require('node:stream/promises')
أو require('node:stream').promises
.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[السجل]
الإصدار | التغييرات |
---|---|
v18.0.0، v17.2.0، v16.14.0 | إضافة خيار end ، والذي يمكن تعيينه إلى false لمنع إغلاق تيار الوجهة تلقائيًا عند انتهاء المصدر. |
v15.0.0 | مضاف في: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- الإرجاع: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- الإرجاع: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- الإرجاع: <Promise> | <AsyncIterable>
options
<Object> خيارات خط الأنابيبsignal
<AbortSignal>end
<boolean> إنهاء تيار الوجهة عندما ينتهي تيار المصدر. يتم دائمًا إنهاء تيارات التحويل، حتى إذا كانت هذه القيمةfalse
. الافتراضي:true
.
الإرجاع: <Promise> يتم الوفاء به عندما يكتمل خط الأنابيب.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
لاستخدام AbortSignal
، قم بتمريره داخل كائن الخيارات، كحجة أخيرة. عندما يتم إلغاء الإشارة، سيتم استدعاء destroy
على خط الأنابيب الأساسي، مع AbortError
.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
تدعم واجهة برمجة التطبيقات pipeline
أيضًا المُولِّدات غير المتزامنة:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // التعامل مع السلاسل بدلاً من `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // التعامل مع السلاسل بدلاً من `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
تذكر التعامل مع وسيطة signal
المُمرَّرة إلى المُولِّد غير المتزامن. خاصة في حالة كون المُولِّد غير المتزامن هو المصدر لخط الأنابيب (أي الحجة الأولى) أو لن يتم إكمال خط الأنابيب أبدًا.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
توفر واجهة برمجة التطبيقات pipeline
نسخة callback:
stream.finished(stream[, options])
[History]
الإصدار | التغييرات |
---|---|
v19.5.0, v18.14.0 | تمت إضافة دعم لـ ReadableStream و WritableStream . |
v19.1.0, v18.13.0 | تمت إضافة خيار cleanup . |
v15.0.0 | تمت الإضافة في: v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> تدفق/تدفق ويب للقراءة و/أو الكتابة.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> إذا كانتtrue
، فإنه يزيل المستمعين المسجلين بواسطة هذه الدالة قبل تحقيق الوعد. الافتراضي:false
.
الإرجاع: <Promise> يتم الوفاء به عندما لا يكون التدفق قابلاً للقراءة أو الكتابة بعد الآن.
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream is done reading.')
}
run().catch(console.error)
rs.resume() // Drain the stream.
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream is done reading.')
}
run().catch(console.error)
rs.resume() // Drain the stream.
يوفر واجهة برمجة التطبيقات finished
أيضًا إصدارًا قائمًا على المُستدعي.
يترك stream.finished()
مستمعي أحداث متدلية (خاصةً 'error'
, 'end'
, 'finish'
و 'close'
) بعد حل أو رفض الوعد المُرجَع. والسبب في ذلك هو أن أحداث 'error'
غير المتوقعة (بسبب تنفيذ التدفق غير الصحيح) لا تتسبب في تعطل غير متوقع. إذا كان هذا سلوكًا غير مرغوب فيه، فيجب تعيين options.cleanup
على true
:
await finished(rs, { cleanup: true })
وضع الكائن
جميع التدفقات التي تم إنشاؤها بواسطة واجهات برمجة التطبيقات Node.js تعمل حصريًا على سلاسل، <Buffer>، <TypedArray> وكائنات <DataView>:
Strings
وBuffers
هما الأنواع الأكثر شيوعًا المستخدمة مع التدفقات.TypedArray
وDataView
يسمحان لك بالتعامل مع البيانات الثنائية بأنواع مثلInt32Array
أوUint8Array
. عندما تقوم بكتابة TypedArray أو DataView إلى تدفق، تقوم Node.js بمعالجة البايت الخام.
ومع ذلك، من الممكن أن تعمل تنفيذات التدفق مع أنواع أخرى من قيم JavaScript (باستثناء null
، والذي يخدم غرضًا خاصًا داخل التدفقات). تعتبر هذه التدفقات تعمل في "وضع الكائن".
يتم تبديل مثيلات التدفق إلى وضع الكائن باستخدام خيار objectMode
عند إنشاء التدفق. إن محاولة تبديل تدفق موجود إلى وضع الكائن ليست آمنة.
التخزين المؤقت
تخزن كل من تدفقات Writable
و Readable
البيانات في مخزن مؤقت داخلي.
تعتمد كمية البيانات التي يمكن تخزينها مؤقتًا على خيار highWaterMark
الممرر إلى مُنشئ التدفق. بالنسبة للتدفقات العادية، يُحدد خيار highWaterMark
العدد الإجمالي للبايتات. بالنسبة للتدفقات التي تعمل في وضع الكائن، يُحدد highWaterMark
العدد الإجمالي للكائنات. بالنسبة للتدفقات التي تعمل على (ولكن لا تقوم بفك تشفير) السلاسل، يُحدد highWaterMark
العدد الإجمالي لوحدات ترميز UTF-16.
يتم تخزين البيانات مؤقتًا في تدفقات Readable
عندما يقوم التنفيذ باستدعاء stream.push(chunk)
. إذا لم يستدعِ مُستهلك التدفق stream.read()
، فستبقى البيانات في قائمة الانتظار الداخلية حتى يتم استهلاكها.
بمجرد أن يصل الحجم الإجمالي لمخزن مؤقت القراءة الداخلي إلى الحد المحدد بواسطة highWaterMark
، سيتوقف التدفق مؤقتًا عن قراءة البيانات من المورد الأساسي حتى يمكن استهلاك البيانات المخزنة مؤقتًا حاليًا (أي أن التدفق سيتوقف عن استدعاء الطريقة الداخلية readable._read()
المستخدمة لملء مخزن مؤقت القراءة).
يتم تخزين البيانات مؤقتًا في تدفقات Writable
عندما يتم استدعاء طريقة writable.write(chunk)
بشكل متكرر. طالما أن الحجم الإجمالي لمخزن مؤقت الكتابة الداخلي أقل من الحد المحدد بواسطة highWaterMark
، ستعيد استدعاءات writable.write()
قيمة true
. بمجرد أن يصل حجم المخزن المؤقت الداخلي إلى أو يتجاوز highWaterMark
، سيتم إرجاع false
.
الهدف الرئيسي لواجهة برمجة التطبيقات stream
، وخاصة طريقة stream.pipe()
، هو الحد من تخزين البيانات مؤقتًا إلى مستويات مقبولة بحيث لا تغمر مصادر ووجهات السرعات المختلفة الذاكرة المتاحة.
خيار highWaterMark
هو حد، وليس قيدًا: فهو يملي كمية البيانات التي يخزنها التدفق مؤقتًا قبل أن يتوقف عن طلب المزيد من البيانات. إنه لا يفرض قيدًا صارمًا على الذاكرة بشكل عام. قد تختار تنفيذات التدفق المحددة فرض قيود أكثر صرامة، ولكن هذا اختياري.
بما أن تدفقات Duplex
و Transform
هي كل من Readable
و Writable
، فإن كل منهما يحتفظ بـ اثنين من المخازن المؤقتة الداخلية المنفصلة المستخدمة للقراءة والكتابة، مما يسمح لكل جانب بالعمل بشكل مستقل عن الآخر مع الحفاظ على تدفق مناسب وفعال للبيانات. على سبيل المثال، تعتبر مثيلات net.Socket
تدفقات Duplex
حيث يسمح الجانب Readable
باستهلاك البيانات الواردة من المقبس، ويسمح الجانب Writable
بكتابة البيانات إلى المقبس. نظرًا لإمكانية كتابة البيانات إلى المقبس بمعدل أسرع أو أبطأ من استقبال البيانات، يجب أن يعمل كل جانب (ويخزن مؤقتًا) بشكل مستقل عن الآخر.
آليات التخزين المؤقت الداخلي هي تفاصيل تنفيذ داخلية وقد يتم تغييرها في أي وقت. ومع ذلك، بالنسبة لبعض التنفيذات المتقدمة، يمكن استرداد المخازن المؤقتة الداخلية باستخدام writable.writableBuffer
أو readable.readableBuffer
. يُنصح بتجنب استخدام هذه الخصائص غير الموثقة.
واجهة برمجة التطبيقات لمستهلكي التدفقات
تستخدم معظم تطبيقات Node.js، بغض النظر عن مدى بساطتها، التدفقات بطريقة ما. فيما يلي مثال على استخدام التدفقات في تطبيق Node.js الذي ينفذ خادم HTTP:
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` هو http.IncomingMessage، وهو تدفق قابل للقراءة.
// `res` هو http.ServerResponse، وهو تدفق قابل للكتابة.
let body = ''
// احصل على البيانات كسلاسل utf8.
// إذا لم يتم تعيين ترميز، فسيتم استقبال كائنات Buffer.
req.setEncoding('utf8')
// تُصدر التدفقات القابلة للقراءة أحداث 'data' بمجرد إضافة مستمع.
req.on('data', chunk => {
body += chunk
})
// يشير حدث 'end' إلى استلام الجسم بالكامل.
req.on('end', () => {
try {
const data = JSON.parse(body)
// اكتب شيئًا مثيرًا للاهتمام للمستخدم:
res.write(typeof data)
res.end()
} catch (er) {
// أوه أوه! بيانات JSON خاطئة!
res.statusCode = 400
return res.end(`error: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
تُعرض التدفقات القابلة للكتابة Writable
(مثل res
في المثال) طرقًا مثل write()
و end()
التي تُستخدم لكتابة البيانات على التدفق.
تستخدم التدفقات القابلة للقراءة Readable
واجهة برمجة التطبيقات EventEmitter
لإعلام رمز التطبيق عندما تتوفر بيانات للقراءة من التدفق. يمكن قراءة تلك البيانات المتاحة من التدفق بعدة طرق.
تستخدم كل من التدفقات القابلة للكتابة Writable
والقابلة للقراءة Readable
واجهة برمجة التطبيقات EventEmitter
بطرق مختلفة للتواصل مع الحالة الحالية للتدفق.
التدفقات Duplex
و Transform
هما Writable
و Readable
.
لا يُطلب من التطبيقات التي تكتب بيانات إلى تدفق أو تستهلك بيانات منه تنفيذ واجهات التدفق مباشرةً، ولن يكون لديها عمومًا سبب للاتصال بـ require('node:stream')
.
يجب على المطورين الذين يرغبون في تنفيذ أنواع جديدة من التدفقات الرجوع إلى قسم واجهة برمجة التطبيقات لمنفذي التدفق.
تدفقات قابلة للكتابة
تُعد التدفقات القابلة للكتابة تجريدًا لـ وجهة يتم كتابة البيانات إليها.
أمثلة على تدفقات Writable
:
- طلبات HTTP، على العميل
- استجابات HTTP، على الخادم
- تدفقات كتابة fs
- تدفقات zlib
- تدفقات تشفير
- مقابس TCP
- مدخل عملية فرعية
process.stdout
،process.stderr
بعض هذه الأمثلة هي في الواقع تدفقات Duplex
التي تُنفذ واجهة Writable
.
جميع تدفقات Writable
تُنفذ الواجهة المُعرّفة بواسطة فئة stream.Writable
.
في حين أن حالات مُحددة من تدفقات Writable
قد تختلف بطرق مُتعددة، إلا أن جميع التدفقات Writable
تتبع نفس نمط الاستخدام الأساسي كما هو موضح في المثال أدناه:
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')
فئة: stream.Writable
مضاف في: v0.9.4
حدث: 'close'
[تاريخ]
الإصدار | التغييرات |
---|---|
v10.0.0 | إضافة خيار emitClose لتحديد ما إذا كان يتم إصدار 'close' عند التدمير. |
v0.9.4 | مضاف في: v0.9.4 |
يتم إصدار حدث 'close'
عندما يتم إغلاق التدفق وأي من موارده الأساسية (وصف الملف، على سبيل المثال). يشير الحدث إلى أنه لن يتم إصدار المزيد من الأحداث، ولن يتم إجراء المزيد من الحسابات.
سوف تقوم تدفقات Writable
دائمًا بإصدار حدث 'close'
إذا تم إنشاؤها باستخدام خيار emitClose
.
حدث: 'drain'
مضاف في: v0.9.4
إذا قامت مكالمة إلى stream.write(chunk)
بإرجاع false
، فسيتم إصدار حدث 'drain'
عندما يكون من المناسب استئناف كتابة البيانات إلى التدفق.
// كتابة البيانات إلى تدفق الكتابة المُقدم مليون مرة.
// انتبه إلى ضغط الظهر.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// آخر مرة!
writer.write(data, encoding, callback)
} else {
// تحقق مما إذا كان ينبغي لنا المتابعة، أو الانتظار.
// لا تُمرّر المُنعطف، لأننا لم ننتهي بعد.
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// كان لا بد من التوقف مبكرًا!
// اكتب المزيد بمجرد تصريفه.
writer.once('drain', write)
}
}
}
الحدث: 'error'
مضاف في: v0.9.4
يُصدر حدث 'error'
إذا حدث خطأ أثناء كتابة البيانات أو تمريرها. يتم تمرير مُعامل Error
واحد إلى دالة المُستمع عند النداء.
يتم إغلاق التدفق عندما يتم إصدار حدث 'error'
ما لم يتم تعيين خيار autoDestroy
على false
عند إنشاء التدفق.
بعد 'error'
، لا يجب إصدار أي أحداث أخرى غير 'close'
(بما في ذلك أحداث 'error'
).
الحدث: 'finish'
مضاف في: v0.9.4
يُصدر حدث 'finish'
بعد استدعاء طريقة stream.end()
، وبعد تطهير جميع البيانات إلى النظام الأساسي.
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('All writes are now complete.')
})
writer.end('This is the end\n')
الحدث: 'pipe'
مضاف في: v0.9.4
src
<stream.Readable> تدفق المصدر الذي يقوم بالتمرير إلى هذا الكتابة
يُصدر حدث 'pipe'
عند استدعاء طريقة stream.pipe()
على تدفق قابل للقراءة، مضيفًا هذا الكتابة إلى مجموعة وجهاته.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('Something is piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
الحدث: 'unpipe'
مضاف في: v0.9.4
src
<stream.Readable> تدفق المصدر الذي قام بإلغاء التمرير لهذا الكتابة
يُصدر حدث 'unpipe'
عند استدعاء طريقة stream.unpipe()
على تدفق Readable
، وإزالة هذا Writable
من مجموعة وجهاته.
كما يتم إصدار هذا في حالة إصدار هذا التدفق Writable
خطأً عندما يقوم تدفق Readable
بالتمرير إليه.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('Something has stopped piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
مُضاف في: v0.11.2
تُجبر طريقة writable.cork()
جميع البيانات المكتوبة على التخزين المؤقت في الذاكرة. سيتم تفريغ البيانات المخزنة مؤقتًا عند استدعاء طريقتي stream.uncork()
أو stream.end()
.
الهدف الأساسي من writable.cork()
هو تسهيل الوضع الذي تُكتب فيه العديد من القطع الصغيرة إلى الدفق في تعاقب سريع. بدلاً من توجيهها على الفور إلى الوجهة الأساسية، يقوم writable.cork()
بتخزين جميع القطع مؤقتًا حتى يتم استدعاء writable.uncork()
, والذي سيرسلها جميعًا إلى writable._writev()
, إذا وجد. هذا يمنع حالة حجب رأس الخط حيث يتم تخزين البيانات مؤقتًا أثناء انتظار معالجة أول قطعة صغيرة. ومع ذلك، قد يكون لاستخدام writable.cork()
دون تنفيذ writable._writev()
تأثير سلبي على الإنتاجية.
انظر أيضًا: writable.uncork()
، writable._writev()
.
writable.destroy([error])
[السجل]
الإصدار | التغييرات |
---|---|
v14.0.0 | تعمل كعملية لا شيء على دفق تم تدميره بالفعل. |
v8.0.0 | مُضاف في: v8.0.0 |
تدمير الدفق. يُصدر اختيارياً حدث 'error'
، ويُصدر حدث 'close'
(ما لم يتم تعيين emitClose
على false
). بعد هذه المكالمة، يكون دفق الكتابة قد انتهى، وستؤدي المكالمات اللاحقة إلى write()
أو end()
إلى خطأ ERR_STREAM_DESTROYED
. هذه طريقة مدمرة وفورية لتدمير دفق. قد لا تكون المكالمات السابقة إلى write()
قد تم تصريفها، وقد تُشغل خطأ ERR_STREAM_DESTROYED
. استخدم end()
بدلاً من destroy
إذا كان ينبغي شطف البيانات قبل الإغلاق، أو انتظر حدث 'drain'
قبل تدمير الدفق.
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
بمجرد استدعاء destroy()
، ستكون أي مكالمات أخرى عملية لا شيء، ولن يتم إصدار أي أخطاء أخرى باستثناء الأخطاء من _destroy()
كـ 'error'
.
يجب على المُنفذين عدم تجاوز هذه الطريقة، ولكن بدلاً من ذلك تنفيذ writable._destroy()
.
writable.closed
مضاف في: v18.0.0
تكون true
بعد إرسال 'close'
.
writable.destroyed
مضاف في: v8.0.0
تكون true
بعد استدعاء writable.destroy()
.
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[History]
الإصدار | التغييرات |
---|---|
v22.0.0, v20.13.0 | يمكن الآن أن تكون وسيطة chunk مثيلًا لـTypedArray أو DataView . |
v15.0.0 | يتم استدعاء callback قبل 'finish' أو عند حدوث خطأ. |
v14.0.0 | يتم استدعاء callback إذا تم إرسال 'finish' أو 'error' . |
v10.0.0 | تُعيد هذه الطريقة الآن مرجعًا إلى writable . |
v8.0.0 | يمكن الآن أن تكون وسيطة chunk مثيلًا لـUint8Array . |
v0.9.4 | مضاف في: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> بيانات اختيارية للكتابة. بالنسبة للتيارات التي لا تعمل في وضع الكائن، يجب أن يكونchunk
<string> أو <Buffer> أو <TypedArray> أو <DataView>. بالنسبة لتيارات وضع الكائن، قد تكونchunk
أي قيمة JavaScript بخلافnull
.encoding
<string> الترميز إذا كانchunk
سلسلة.callback
<Function> دالة الاستدعاء الراجعة عندما تنتهي البث.- تُرجع: <this>
إن استدعاء طريقة writable.end()
يشير إلى أنه لن يتم كتابة المزيد من البيانات إلى Writable
. تسمح الوسيطات الاختيارية chunk
و encoding
بكتابة جزء إضافي أخير من البيانات قبل إغلاق التيار.
سيؤدي استدعاء طريقة stream.write()
بعد استدعاء stream.end()
إلى إثارة خطأ.
// كتابة 'hello, ' ثم إنهاء بـ 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// الكتابة الآن غير مسموح بها!
writable.setDefaultEncoding(encoding)
[السجل]
الإصدار | التغييرات |
---|---|
v6.1.0 | تُعيد هذه الطريقة الآن مرجعًا إلى writable . |
v0.11.15 | تمت الإضافة في: v0.11.15 |
تُعيّن طريقة writable.setDefaultEncoding()
ترميز الإعداد الافتراضي encoding
لدفق Writable
.
writable.uncork()
تمت الإضافة في: v0.11.2
تقوم طريقة writable.uncork()
بتفريغ جميع البيانات المُخزَّنة مؤقتًا منذ استدعاء stream.cork()
.
عند استخدام writable.cork()
و writable.uncork()
لإدارة تخزين كتابات الدفق مؤقتًا، تأجيل استدعاءات writable.uncork()
باستخدام process.nextTick()
. يسمح ذلك بتجميع جميع استدعاءات writable.write()
التي تحدث داخل مرحلة حلقة أحداث Node.js معينة.
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())
إذا تم استدعاء طريقة writable.cork()
عدة مرات على دفق، فيجب استدعاء نفس عدد استدعاءات writable.uncork()
لتفريغ البيانات المُخزَّنة مؤقتًا.
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
stream.uncork()
// لن يتم تفريغ البيانات حتى يتم استدعاء uncork() للمرة الثانية.
stream.uncork()
})
انظر أيضًا: writable.cork()
.
writable.writable
تمت الإضافة في: v11.4.0
تكون true
إذا كان من الآمن استدعاء writable.write()
، مما يعني أن الدفق لم يتم تدميره أو حدوث خطأ فيه أو إنهاؤه.
writable.writableAborted
تمت الإضافة في: v18.0.0، v16.17.0
يُرجع ما إذا كان الدفق قد تم تدميره أو حدوث خطأ فيه قبل إصدار 'finish'
.
writable.writableEnded
مضاف في: v12.9.0
تكون قيمتها true
بعد استدعاء writable.end()
. لا تشير هذه الخاصية إلى ما إذا تم تفريغ البيانات أم لا، استخدم writable.writableFinished
بدلاً من ذلك.
writable.writableCorked
مضاف في: v13.2.0، v12.16.0
عدد المرات التي يجب فيها استدعاء writable.uncork()
من أجل إلغاء سدّ التدفق بالكامل.
writable.errored
مضاف في: v18.0.0
يُرجع خطأ إذا تم تدمير التدفق بخطأ.
writable.writableFinished
مضاف في: v12.6.0
تُعيّن إلى true
مباشرة قبل إصدار حدث 'finish'
.
writable.writableHighWaterMark
مضاف في: v9.3.0
تُرجع قيمة highWaterMark
التي تم تمريرها عند إنشاء هذا Writable
.
writable.writableLength
مضاف في: v9.4.0
تحتوي هذه الخاصية على عدد البايتات (أو الكائنات) في قائمة الانتظار الجاهزة للكتابة. توفر القيمة بيانات استبطانية بخصوص حالة highWaterMark
.
writable.writableNeedDrain
مضاف في: v15.2.0، v14.17.0
تكون قيمتها true
إذا امتلأ المخزن المؤقت للتدفق وسوف يُصدر التدفق 'drain'
.
writable.writableObjectMode
مضاف في: v12.3.0
معرّف لخاصية objectMode
لبثّ Writable
معيّن.
writable[Symbol.asyncDispose]()
مضاف في: v22.4.0, v20.16.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
يطلق writable.destroy()
مع AbortError
ويعيد وعدًا يتم إنجازه عند انتهاء البث.
writable.write(chunk[, encoding][, callback])
[السجل]
الإصدار | التغييرات |
---|---|
v22.0.0, v20.13.0 | يمكن الآن أن تكون وسيطة chunk مثيلًا لـ TypedArray أو DataView . |
v8.0.0 | يمكن الآن أن تكون وسيطة chunk مثيلًا لـ Uint8Array . |
v6.0.0 | سيُعتبر تمرير null كوسيط chunk غير صالح دائمًا الآن، حتى في وضع الكائن. |
v0.9.4 | مضاف في: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> بيانات اختيارية للكتابة. بالنسبة للبثّات التي لا تعمل في وضع الكائن، يجب أن يكونchunk
<string>، أو <Buffer>، أو <TypedArray>، أو <DataView>. بالنسبة لبثّات وضع الكائن، قد يكونchunk
أي قيمة جافا سكريبت بخلافnull
.encoding
<string> | <null> الترميز، إذا كانchunk
سلسلة. افتراضيًا:'utf8'
callback
<Function> دالة مُراجعة عند شطف جزء البيانات هذا.- القيمة المُعادة: <boolean>
false
إذا أراد البثّ أن تنتظر الكود المُستدعي حدث'drain'
قبل المتابعة في كتابة بيانات إضافية؛ وإلاtrue
.
تكتب طريقة writable.write()
بعض البيانات إلى البث، وتُطلق دالة المُراجعة المُعطاة بمجرد معالجة البيانات بالكامل. إذا حدث خطأ، فسيتم استدعاء دالة المُراجعة مع الخطأ كوسيطها الأول. يتم استدعاء دالة المُراجعة بشكل غير متزامن وقبل إصدار 'error'
.
القيمة المُعادة هي true
إذا كانت ذاكرة التخزين المؤقت الداخلية أقل من highWaterMark
المُهيّأ عند إنشاء البث بعد قبول chunk
. إذا تم إرجاع false
، فيجب إيقاف المزيد من المحاولات لكتابة البيانات في البث حتى يتم إصدار حدث 'drain'
.
بينما البثّ لا يُفرغ، ستُخزّن مُكالمات write()
chunk
، وستُعيد false
. بمجرد تصريف جميع أجزاء البيانات المخزنة مؤقتًا حاليًا (مقبولة للتسليم بواسطة نظام التشغيل)، سيتم إصدار حدث 'drain'
. بمجرد أن تُعيد write()
قيمة false
، لا تكتب المزيد من الأجزاء حتى يتم إصدار حدث 'drain'
. بينما يُسمح باستدعاء write()
على بثّ لا يُفرغ، سيُخزّن Node.js جميع الأجزاء المكتوبة حتى يحدث الحد الأقصى لاستخدام الذاكرة، عندها سيتم الإلغاء بشكل غير مشروط. حتى قبل أن يُلغي، سيؤدي استخدام الذاكرة العالي إلى ضعف أداء جامع القمامة وارتفاع RSS (الذي لا يتم إطلاقه عادةً مرة أخرى إلى النظام، حتى بعد عدم الحاجة إلى الذاكرة بعد الآن). نظرًا لأن مقابس TCP قد لا تُفرغ أبدًا إذا لم يقرأ الطرف البعيد البيانات، فقد يؤدي كتابة مقبس لا يُفرغ إلى ثغرة أمنية قابلة للاستغلال عن بُعد.
كتابة البيانات بينما البثّ لا يُفرغ أمرٌ مُشكِل بشكل خاص لـ Transform
، لأن بثّات Transform
مُوقفة افتراضيًا حتى يتم توصيلها أو إضافة مُعالِج حدث 'data'
أو 'readable'
.
إذا كان يمكن إنشاء البيانات المراد كتابتها أو جلبها عند الطلب، يُنصح بتغليف المنطق في Readable
واستخدام stream.pipe()
. ومع ذلك، إذا كان يُفضّل استدعاء write()
، فمن الممكن احترام ضغط الظهر وتجنب مشكلات الذاكرة باستخدام حدث 'drain'
:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// انتظر حتى يتم استدعاء cb قبل القيام بأي كتابة أخرى.
write('hello', () => {
console.log('كتابة مكتملة، قم بإجراء المزيد من الكتابة الآن.')
})
سيُهمل بثّ Writable
في وضع الكائن وسيط encoding
دائمًا.
تدفّقات قابلة للقراءة
تُعدّ التدفّقات القابلة للقراءة تجرّداً لمصدر يتم استهلاك البيانات منه.
أمثلة على تدفّقات Readable
:
- استجابات HTTP، على العميل
- طلبات HTTP، على الخادم
- تدفّقات قراءة fs
- تدفّقات zlib
- تدفّقات crypto
- مقابس TCP
- مخرجات ومدخلات خطأ عملية فرعية
process.stdin
جميع التدفّقات Readable
تُنفّذ الواجهة المُعرّفة بواسطة فئة stream.Readable
.
وضعا القراءة
تعمل تدفّقات Readable
بشكل فعّال في أحد وضعين: وضع التدفّق ووضع الإيقاف المؤقّت. هذان الوضعان منفصلان عن وضع الكائن. يمكن أن يكون تدفّق Readable
في وضع الكائن أو ليس فيه، بغضّ النظر عمّا إذا كان في وضع التدفّق أو وضع الإيقاف المؤقّت.
- في وضع التدفّق، يتم قراءة البيانات من النظام الأساسي تلقائيًا وتوفيرها للتطبيق بأسرع ما يمكن باستخدام الأحداث عبر واجهة
EventEmitter
. - في وضع الإيقاف المؤقّت، يجب استدعاء طريقة
stream.read()
صراحةً لقراءة أجزاء البيانات من التدفّق.
تبدأ جميع التدفّقات Readable
في وضع الإيقاف المؤقّت، ولكن يمكن تبديلها إلى وضع التدفّق بإحدى الطرق التالية:
- إضافة مُعالِج حدث
'data'
. - استدعاء طريقة
stream.resume()
. - استدعاء طريقة
stream.pipe()
لإرسال البيانات إلىWritable
.
يمكن أن يعود Readable
إلى وضع الإيقاف المؤقّت باستخدام إحدى الطرق التالية:
- إذا لم تكن هناك وجهات أنابيب، عن طريق استدعاء طريقة
stream.pause()
. - إذا كانت هناك وجهات أنابيب، عن طريق إزالة جميع وجهات الأنابيب. يمكن إزالة العديد من وجهات الأنابيب عن طريق استدعاء طريقة
stream.unpipe()
.
المفهوم المهم الذي يجب تذكّره هو أن Readable
لن يُولّد بيانات حتى يتم توفير آلية لاستهلاك تلك البيانات أو تجاهلها. إذا تم تعطيل آلية الاستهلاك أو إزالتها، فإن Readable
سيحاول إيقاف توليد البيانات.
لأسباب التوافق مع الإصدارات السابقة، فإن إزالة مُعالِجات أحداث 'data'
لن يؤدّي إلى إيقاف التدفّق تلقائيًا. أيضًا، إذا كانت هناك وجهات أنابيب، فإن استدعاء stream.pause()
لن يضمن أن يبقى التدفّق موقوفًا بمجرد أن تستنزف تلك الوجهات وتطلب المزيد من البيانات.
إذا تم تبديل Readable
إلى وضع التدفّق ولم تكن هناك مستهلكات متاحة لمعالجة البيانات، فستضيع تلك البيانات. يمكن أن يحدث هذا، على سبيل المثال، عند استدعاء طريقة readable.resume()
بدون مستمع مُرفق بحدث 'data'
، أو عند إزالة مُعالِج حدث 'data'
من التدفّق.
يؤدّي إضافة مُعالِج حدث 'readable'
تلقائيًا إلى إيقاف تدفّق التدفّق، ويجب استهلاك البيانات عبر readable.read()
. إذا تمت إزالة مُعالِج حدث 'readable'
، فسوف يبدأ التدفّق مرة أخرى إذا كان هناك مُعالِج حدث 'data'
.
ثلاث حالات
يُعد "الوضعان" لتشغيل دفق Readable
تجريدًا مبسطًا لإدارة الحالة الداخلية الأكثر تعقيدًا التي تحدث داخل تنفيذ دفق Readable
.
على وجه التحديد، في أي نقطة زمنية معينة، يكون كل دفق Readable
في إحدى الحالات الثلاث الممكنة:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
عندما يكون readable.readableFlowing
مساويًا لـ null
، لا يتم توفير آلية لاستهلاك بيانات الدفق. لذلك، لن يقوم الدفق بإنشاء بيانات. أثناء وجوده في هذه الحالة، سيؤدي إرفاق مستمع لحدث 'data'
، أو استدعاء طريقة readable.pipe()
، أو استدعاء طريقة readable.resume()
إلى تبديل readable.readableFlowing
إلى true
، مما يتسبب في بدء دفق Readable
في إصدار أحداث بشكل فعال عند إنشاء البيانات.
سيؤدي استدعاء readable.pause()
، أو readable.unpipe()
، أو تلقي ضغط عكسي إلى تعيين readable.readableFlowing
على أنه false
، مما يؤدي مؤقتًا إلى إيقاف تدفق الأحداث ولكن ليس إيقاف إنشاء البيانات. أثناء وجوده في هذه الحالة، لن يؤدي إرفاق مستمع لحدث 'data'
إلى تبديل readable.readableFlowing
إلى true
.
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing is now false.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing is still false.
pass.write('ok') // Will not emit 'data'.
pass.resume() // Must be called to make stream emit 'data'.
// readableFlowing is now true.
بينما يكون readable.readableFlowing
مساويًا لـ false
، قد تتراكم البيانات داخل المخزن المؤقت الداخلي للدفق.
اختر أسلوب واجهة برمجة التطبيقات واحدًا
تطورت واجهة برمجة تطبيقات دفق Readable
عبر إصدارات متعددة من Node.js وتوفر طرقًا متعددة لاستهلاك بيانات الدفق. بشكل عام، يجب على المطورين اختيار طريقة واحدة لاستهلاك البيانات ويجب عدم استخدام طرق متعددة أبدًا لاستهلاك البيانات من دفق واحد. على وجه التحديد، قد يؤدي استخدام مجموعة من on('data')
، و on('readable')
، و pipe()
، أو المتكررات غير المتزامنة إلى سلوك غير بديهي.
الصف: stream.Readable
مضاف في: v0.9.4
الحدث: 'close'
[السجل]
الإصدار | التغييرات |
---|---|
v10.0.0 | إضافة خيار emitClose لتحديد ما إذا كان يتم إصدار 'close' عند التدمير. |
v0.9.4 | مضاف في: v0.9.4 |
يتم إصدار حدث 'close'
عندما يتم إغلاق الدفق وأي من موارده الأساسية (مثل مُوصِف الملف). يشير الحدث إلى أنه لن يتم إصدار المزيد من الأحداث، ولن يتم إجراء المزيد من الحسابات.
سوف يُصدر تيار Readable
دائمًا حدث 'close'
إذا تم إنشاؤه باستخدام خيار emitClose
.
الحدث: 'data'
مضاف في: v0.9.4
chunk
<Buffer> | <string> | <any> جزء البيانات. بالنسبة للتيارات التي لا تعمل في وضع الكائن، سيكون الجزء إما سلسلة أوBuffer
. بالنسبة للتيارات الموجودة في وضع الكائن، يمكن أن يكون الجزء أي قيمة JavaScript بخلافnull
.
يتم إصدار حدث 'data'
كلما تخلى التيار عن ملكية جزء من البيانات إلى مستهلك. قد يحدث هذا كلما تم تبديل التيار في وضع التدفق عن طريق استدعاء readable.pipe()
، readable.resume()
، أو عن طريق إرفاق دالة استدعاء مُستمع إلى حدث 'data'
. سيتم أيضًا إصدار حدث 'data'
كلما تم استدعاء طريقة readable.read()
وكانت هناك كتلة بيانات متاحة لإعادتها.
إن إرفاق مُستمع لحدث 'data'
بتيار لم يتم إيقافه مؤقتًا بشكل صريح سيُبدّل التيار إلى وضع التدفق. سيتم تمرير البيانات بمجرد توفرها.
سيتم تمرير دالة الاستدعاء المُستمع جزء البيانات كسلسلة إذا تم تحديد ترميز افتراضي للتيار باستخدام طريقة readable.setEncoding()
؛ وإلا سيتم تمرير البيانات كـ Buffer
.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`استلمت ${chunk.length} بايت من البيانات.`)
})
الحدث: 'end'
أضيف في: v0.9.4
يُصدر حدث 'end'
عندما لا يكون هناك المزيد من البيانات لاستهلاكها من الدفق.
لن يتم إصدار حدث 'end'
إلا إذا تم استهلاك البيانات بالكامل. يمكن تحقيق ذلك عن طريق تبديل الدفق إلى وضع التدفق، أو عن طريق استدعاء stream.read()
بشكل متكرر حتى يتم استهلاك جميع البيانات.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
})
readable.on('end', () => {
console.log('There will be no more data.')
})
الحدث: 'error'
أضيف في: v0.9.4
قد يُصدر تنفيذ Readable
حدث 'error'
في أي وقت. عادةً، قد يحدث هذا إذا كان الدفق الأساسي غير قادر على إنشاء بيانات بسبب فشل داخلي أساسي، أو عندما يحاول تنفيذ دفق دفع جزء بيانات غير صالح.
سيتم تمرير كائن Error
واحد إلى وظيفة الاستدعاء المُستمع.
الحدث: 'pause'
أضيف في: v0.9.4
يُصدر حدث 'pause'
عندما يتم استدعاء stream.pause()
و readableFlowing
ليس false
.
الحدث: 'readable'
[السجل]
الإصدار | التغييرات |
---|---|
v10.0.0 | يُصدر 'readable' دائمًا في التكرار التالي بعد استدعاء .push() |
v10.0.0 | يتطلب استخدام 'readable' استدعاء .read() . |
v0.9.4 | أضيف في: v0.9.4 |
يُصدر حدث 'readable'
عندما تتوفر بيانات للقراءة من الدفق، حتى علامة المياه العالية المُهيأة (state.highWaterMark
). في الواقع، يشير إلى أن الدفق يحتوي على معلومات جديدة داخل المخزن المؤقت. إذا كانت البيانات متاحة داخل هذا المخزن المؤقت، فيمكن استدعاء stream.read()
لاسترداد تلك البيانات. بالإضافة إلى ذلك، قد يتم إصدار حدث 'readable'
أيضًا عندما يتم الوصول إلى نهاية الدفق.
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// هناك بعض البيانات للقراءة الآن.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
إذا تم الوصول إلى نهاية الدفق، فإن استدعاء stream.read()
سيرجع null
ويُشغل حدث 'end'
. هذا صحيح أيضًا إذا لم تكن هناك بيانات على الإطلاق للقراءة. على سبيل المثال، في المثال التالي، foo.txt
هو ملف فارغ:
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
مخرجات تشغيل هذا البرنامج النصي هي:
$ node test.js
readable: null
end
في بعض الحالات، سيؤدي إرفاق مُستمع لحدث 'readable'
إلى قراءة بعض البيانات في مخزن مؤقت داخلي.
بشكل عام، آليات readable.pipe()
وحدث 'data'
أسهل في الفهم من حدث 'readable'
. ومع ذلك، قد يؤدي التعامل مع 'readable'
إلى زيادة الإنتاجية.
إذا تم استخدام كل من 'readable'
و 'data'
في نفس الوقت، فإن 'readable'
يأخذ الأولوية في التحكم في التدفق، أي أن 'data'
سيتم إصداره فقط عند استدعاء stream.read()
. ستصبح خاصية readableFlowing
false
. إذا كانت هناك مُستمعين 'data'
عند إزالة 'readable'
، فسوف يبدأ الدفق بالتدفق، أي سيتم إصدار أحداث 'data'
بدون استدعاء .resume()
.
الحدث: 'resume'
أضيف في: v0.9.4
يتم إصدار حدث 'resume'
عند استدعاء stream.resume()
و readableFlowing
ليس true
.
readable.destroy([error])
[السجل]
الإصدار | التغييرات |
---|---|
v14.0.0 | يعمل كعملية لا شيء على تيار تم تدميره بالفعل. |
v8.0.0 | أضيف في: v8.0.0 |
تدمير التيار. و بشكل اختياري، إصدار حدث 'error'
، وإصدار حدث 'close'
(ما لم يتم تعيين emitClose
إلى false
). بعد هذه المكالمة، سيُطلق تيار القراءة أي موارد داخلية، وستُتجاهل المكالمات اللاحقة إلى push()
.
بمجرد استدعاء destroy()
، ستكون أي مكالمات أخرى عملية لا شيء، ولن يتم إصدار أي أخطاء أخرى باستثناء الأخطاء من _destroy()
كـ 'error'
.
يجب على المُنفذين عدم تجاوز هذه الطريقة، ولكن بدلاً من ذلك، تنفيذ readable._destroy()
.
readable.closed
أضيف في: v18.0.0
يكون true
بعد إصدار 'close'
.
readable.destroyed
أضيف في: v8.0.0
يكون true
بعد استدعاء readable.destroy()
.
readable.isPaused()
أضيف في: v0.11.14
- القيمة المُرجعة: <boolean>
ترجع طريقة readable.isPaused()
الحالة التشغيلية الحالية لـ Readable
. ويستخدم هذا بشكل أساسي بواسطة الآلية التي تقوم بتأسيس طريقة readable.pipe()
. في معظم الحالات النموذجية، لن يكون هناك سبب لاستخدام هذه الطريقة مباشرة.
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
مضاف في: v0.9.4
- مُخرجات: <this>
ستؤدي طريقة readable.pause()
إلى إيقاف تدفق البيانات في وضع التدفق، مما يؤدي إلى إيقاف إصدار أحداث 'data'
، والانتقال من وضع التدفق. ستبقى أي بيانات متاحة في المخزن المؤقت الداخلي.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Received ${chunk.length} bytes of data.`)
readable.pause()
console.log('There will be no additional data for 1 second.')
setTimeout(() => {
console.log('Now data will start flowing again.')
readable.resume()
}, 1000)
})
لا تؤثر طريقة readable.pause()
إذا كان هناك مُستمع للأحداث 'readable'
.
readable.pipe(destination[, options])
مضاف في: v0.9.4
destination
<stream.Writable> الوجهة لكتابة البياناتoptions
<Object> خيارات الأنابيبend
<boolean> إنهاء الكاتب عند انتهاء القارئ. الافتراضي:true
.
مُخرجات: <stream.Writable> الـوجهة، مما يسمح بسلسلة من الأنابيب إذا كانت تدفقًا
Duplex
أو تدفقًاTransform
تقوم طريقة readable.pipe()
بتوصيل تدفق Writable
بـ readable
، مما يجعله يتحول تلقائيًا إلى وضع التدفق ويدفع جميع بياناته إلى Writable
المُرفق. سيتم إدارة تدفق البيانات تلقائيًا بحيث لا يُثقل تدفق Writable
الوجهة بتدفق Readable
أسرع.
يوضح المثال التالي جميع البيانات من readable
في ملف باسم file.txt
:
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// جميع البيانات من readable تذهب إلى 'file.txt'.
readable.pipe(writable)
من الممكن إرفاق تيارات Writable
متعددة بتيار Readable
واحد.
ترجع طريقة readable.pipe()
مرجعًا إلى تيار الوجهة مما يجعل من الممكن إعداد سلاسل من التيارات المُوصلة:
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
افتراضيًا، يتم استدعاء stream.end()
على تيار Writable
الوجهة عندما يصدر تيار Readable
المصدر 'end'
، بحيث لا يكون الوجهة قابلاً للكتابة بعد الآن. لإلغاء هذا السلوك الافتراضي، يمكن تمرير خيار end
كـ false
، مما يتسبب في بقاء تيار الوجهة مفتوحًا:
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Goodbye\n')
})
هناك تحذير مهم وهو أنه إذا أصدر تيار Readable
خطأً أثناء المعالجة، فلن يتم إغلاق وجهة Writable
تلقائيًا. إذا حدث خطأ، فستكون هناك حاجة إلى إغلاق كل تيار يدويًا لمنع تسرب الذاكرة.
لا يتم إغلاق تيارات Writable
process.stderr
و process.stdout
أبدًا حتى ينتهي إخراج عملية Node.js، بغض النظر عن الخيارات المحددة.
readable.read([size])
مضاف في: v0.9.4
size
<number> وسيطة اختيارية لتحديد مقدار البيانات التي سيتم قراءتها.- القيمة المُرجعة: <string> | <Buffer> | <null> | <any>
تقوم طريقة readable.read()
بقراءة البيانات من المخزن المؤقت الداخلي وإرجاعها. إذا لم تكن هناك بيانات متاحة للقراءة، يتم إرجاع null
. بشكل افتراضي، يتم إرجاع البيانات ككائن Buffer
ما لم يتم تحديد ترميز باستخدام طريقة readable.setEncoding()
أو يعمل التدفق في وضع الكائن.
تحدد الوسيطة الاختيارية size
عددًا محددًا من البايتات التي سيتم قراءتها. إذا لم تكن بايتات size
متاحة للقراءة، فسيتم إرجاع null
إلا إذا انتهى التدفق، وفي هذه الحالة سيتم إرجاع جميع البيانات المتبقية في المخزن المؤقت الداخلي.
إذا لم يتم تحديد الوسيطة size
، فسيتم إرجاع جميع البيانات الموجودة في المخزن المؤقت الداخلي.
يجب أن تكون الوسيطة size
أقل من أو تساوي 1 جيجابايت.
يجب فقط استدعاء طريقة readable.read()
على تيارات Readable
تعمل في وضع الإيقاف المؤقت. في الوضع المتدفق، يتم استدعاء readable.read()
تلقائيًا حتى يتم تفريغ المخزن المؤقت الداخلي بالكامل.
const readable = getReadableStreamSomehow()
// قد يتم تشغيل 'readable' عدة مرات مع تخزين البيانات مؤقتًا
readable.on('readable', () => {
let chunk
console.log('التيار قابل للقراءة (تم استلام بيانات جديدة في المخزن المؤقت)')
// استخدم حلقة للتأكد من أننا نقرأ جميع البيانات المتاحة حاليًا
while (null !== (chunk = readable.read())) {
console.log(`تم قراءة ${chunk.length} بايت من البيانات...`)
}
})
// سيتم تشغيل 'end' مرة واحدة عندما لا توجد بيانات أخرى متاحة
readable.on('end', () => {
console.log('وصل إلى نهاية التيار.')
})
كل استدعاء لـ readable.read()
يُرجع جزءًا من البيانات أو null
، مما يشير إلى عدم وجود المزيد من البيانات للقراءة في تلك اللحظة. لا يتم دمج هذه الأجزاء تلقائيًا. نظرًا لأن استدعاء read()
واحدًا لا يُرجع جميع البيانات، فقد يكون من الضروري استخدام حلقة while
لقراءة الأجزاء باستمرار حتى يتم استرداد جميع البيانات. عند قراءة ملف كبير، قد تُرجع .read()
قيمة null
مؤقتًا، مما يشير إلى أنها استهلكت جميع المحتويات المخزنة مؤقتًا ولكن قد يكون هناك المزيد من البيانات التي لم يتم تخزينها مؤقتًا بعد. في مثل هذه الحالات، يتم إصدار حدث 'readable'
جديد بمجرد وجود المزيد من البيانات في المخزن المؤقت، ويشير حدث 'end'
إلى نهاية نقل البيانات.
لذلك، لقراءة محتويات الملف بالكامل من readable
، من الضروري جمع الأجزاء عبر أحداث 'readable'
متعددة:
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
سيرجع تيار Readable
في وضع الكائن دائمًا عنصرًا واحدًا من استدعاء لـ readable.read(size)
، بغض النظر عن قيمة الوسيطة size
.
إذا قامت طريقة readable.read()
بإرجاع جزء من البيانات، فسيتم أيضًا إصدار حدث 'data'
.
سيؤدي استدعاء stream.read([size])
بعد إصدار حدث 'end'
إلى إرجاع null
. لن يتم إثارة أي خطأ وقت التشغيل.
readable.readable
مضاف في: v11.4.0
تكون true
إذا كان من الآمن استدعاء readable.read()
، مما يعني أن الدفق لم يتم تدميره أو إصدار 'error'
أو 'end'
.
readable.readableAborted
مضاف في: v16.8.0
يُرجع ما إذا كان الدفق قد تم تدميره أو حدوث خطأ فيه قبل إصدار 'end'
.
readable.readableDidRead
مضاف في: v16.7.0، v14.18.0
يُرجع ما إذا كان 'data'
قد تم إصداره.
readable.readableEncoding
مضاف في: v12.7.0
طريقة الحصول على الخاصية encoding
لدفق Readable
معين. يمكن تعيين الخاصية encoding
باستخدام طريقة readable.setEncoding()
.
readable.readableEnded
مضاف في: v12.9.0
يصبح true
عندما يتم إصدار حدث 'end'
.
readable.errored
مضاف في: v18.0.0
يُرجع الخطأ إذا تم تدمير الدفق بخطأ.
readable.readableFlowing
مضاف في: v9.4.0
تعكس هذه الخاصية الحالة الحالية لدفق Readable
كما هو موضح في قسم الحالات الثلاث.
readable.readableHighWaterMark
مضاف في: v9.3.0
يرجع قيمة highWaterMark
التي تم تمريرها عند إنشاء هذا Readable
.
readable.readableLength
مضاف في: v9.4.0
تحتوي هذه الخاصية على عدد البايتات (أو الكائنات) في قائمة الانتظار جاهزة للقراءة. توفر القيمة بيانات استبطان فيما يتعلق بحالة highWaterMark
.
readable.readableObjectMode
مضاف في: v12.3.0
طريقة الحصول على خاصية objectMode
لتيار Readable
معين.
readable.resume()
[السجل]
الإصدار | التغييرات |
---|---|
v10.0.0 | لا يؤثر resume() إذا كان هناك حدث 'readable' يستمع. |
v0.9.4 | مضاف في: v0.9.4 |
- الإرجاع: <this>
تؤدي طريقة readable.resume()
إلى استئناف تيار Readable
الذي تم إيقافه صراحةً في إصدار أحداث 'data'
، مما يحول التيار إلى وضع التدفق.
يمكن استخدام طريقة readable.resume()
لاستهلاك البيانات بالكامل من تيار دون معالجة أي من هذه البيانات بالفعل:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('وصل إلى النهاية، ولكن لم يقرأ أي شيء.')
})
لا تؤثر طريقة readable.resume()
إذا كان هناك مستمع حدث 'readable'
.
readable.setEncoding(encoding)
مضاف في: v0.9.4
تعيّن طريقة readable.setEncoding()
ترميز الأحرف لبيانات التي يتم قراءتها من تيار Readable
.
بشكل افتراضي، لا يتم تعيين أي ترميز وسيتم إرجاع بيانات التيار ككائنات Buffer
. يؤدي تعيين ترميز إلى إرجاع بيانات التيار كسلاسل نصية من الترميز المحدد بدلاً من كائنات Buffer
. على سبيل المثال، سيؤدي استدعاء readable.setEncoding('utf8')
إلى تفسير بيانات الإخراج كبيانات UTF-8، ومرورها كسلاسل نصية. سيؤدي استدعاء readable.setEncoding('hex')
إلى ترميز البيانات بتنسيق سلسلة سداسية عشرية.
سيتعامل تيار Readable
بشكل صحيح مع الأحرف متعددة البايت التي يتم تسليمها من خلال التيار والتي ستكون مشفرة بشكل غير صحيح بخلاف ذلك إذا تم سحبها ببساطة من التيار ككائنات Buffer
.
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('حصلت على %d حرف من بيانات السلسلة النصية:', chunk.length)
})
readable.unpipe([destination])
مضاف في: v0.9.4
destination
<stream.Writable> تدفق محدد اختياري لإلغاء الأنبوب- الإرجاع: <this>
طريقة readable.unpipe()
تفصل تدفق Writable
تم إرفاقه مسبقًا باستخدام طريقة stream.pipe()
.
إذا لم يتم تحديد destination
، فسيتم فصل جميع الأنابيب.
إذا تم تحديد destination
، ولكن لم يتم إعداد أي أنبوب له، فإن الطريقة لا تفعل شيئًا.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// جميع البيانات من readable تذهب إلى 'file.txt'،
// ولكن فقط للثانية الأولى.
readable.pipe(writable)
setTimeout(() => {
console.log('إيقاف الكتابة في file.txt.')
readable.unpipe(writable)
console.log('إغلاق تدفق الملف يدويًا.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[السجل]
الإصدار | التغييرات |
---|---|
v22.0.0, v20.13.0 | يمكن أن تكون وسيطة chunk الآن مثيلًا لـ TypedArray أو DataView . |
v8.0.0 | يمكن أن تكون وسيطة chunk الآن مثيلًا لـ Uint8Array . |
v0.9.11 | مضاف في: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> جزء من البيانات لوضعه في بداية قائمة الانتظار للقراءة. بالنسبة للتيارات التي لا تعمل في وضع الكائن، يجب أن يكونchunk
<string>، <Buffer>، <TypedArray>، <DataView> أوnull
. بالنسبة لتيارات وضع الكائن، قد يكونchunk
أي قيمة جافا سكريبت.encoding
<string> ترميز أجزاء السلسلة. يجب أن يكون ترميزًا صالحًا لـBuffer
، مثل'utf8'
أو'ascii'
.
إن تمرير chunk
كـ null
يشير إلى نهاية التدفق (EOF) ويتصرف بنفس طريقة readable.push(null)
، وبعد ذلك لا يمكن كتابة المزيد من البيانات. يتم وضع إشارة EOF في نهاية المخزن المؤقت وستظل جميع البيانات المخزنة مؤقتًا يتم تفريغها.
تضع طريقة readable.unshift()
جزءًا من البيانات مرة أخرى في المخزن المؤقت الداخلي. هذا مفيد في بعض المواقف حيث يتم استهلاك التدفق بواسطة التعليمات البرمجية التي تحتاج إلى "إلغاء استهلاك" بعض كمية البيانات التي سحبتها بشكل متفائل من المصدر، بحيث يمكن تمرير البيانات إلى طرف آخر.
لا يمكن استدعاء طريقة stream.unshift(chunk)
بعد إصدار حدث 'end'
وإلا سيتم طرح خطأ وقت التشغيل.
يجب على المطورين الذين يستخدمون stream.unshift()
غالبًا أن يفكروا في التبديل إلى استخدام تدفق Transform
بدلاً من ذلك. راجع قسم واجهة برمجة التطبيقات للمطورين الذين ينفذون التيارات لمزيد من المعلومات.
// سحب رأس محدد بواسطة \n\n.
// استخدام unshift() إذا حصلنا على الكثير.
// استدعاء callback مع (error, header, stream).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// تم العثور على حد الرأس.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// إزالة مُستمع 'readable' قبل unshifting.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// الآن يمكن قراءة جسم الرسالة من التدفق.
callback(null, header, stream)
return
}
// لا يزال قراءة الرأس.
header += str
}
}
}
على عكس stream.push(chunk)
، فإن stream.unshift(chunk)
لن ينهي عملية القراءة عن طريق إعادة تعيين حالة القراءة الداخلية للتدفق. هذا قد يتسبب في نتائج غير متوقعة إذا تم استدعاء readable.unshift()
أثناء القراءة (أي من داخل تنفيذ stream._read()
على تدفق مخصص). سيعيد استدعاء readable.unshift()
مع stream.push('')
الفوري إعادة تعيين حالة القراءة بشكل مناسب، ومع ذلك من الأفضل تجنب استدعاء readable.unshift()
أثناء عملية إجراء القراءة.
readable.wrap(stream)
تم الإضافة في: v0.9.4
قبل Node.js 0.10، لم تُنفذ الدفقات واجهة برمجة تطبيقات وحدة node:stream
بالكامل كما هو محدد حاليًا. (راجع التوافق لمزيد من المعلومات.)
عند استخدام مكتبة Node.js أقدم تُصدر أحداث 'data'
ولديها طريقة stream.pause()
استشارية فقط، يمكن استخدام طريقة readable.wrap()
لإنشاء دفق Readable
يستخدم الدفق القديم كمصدر بيانات له.
نادراً ما يكون من الضروري استخدام readable.wrap()
، ولكن تم توفير هذه الطريقة لتسهيل التفاعل مع تطبيقات ومكتبات Node.js الأقدم.
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // إلخ.
})
readable[Symbol.asyncIterator]()
[السجل]
الإصدار | التغييرات |
---|---|
v11.14.0 | لم يعد دعم Symbol.asyncIterator تجريبيًا. |
v10.0.0 | تم الإضافة في: v10.0.0 |
- الإرجاع: <AsyncIterator> لاستهلاك الدفق بالكامل.
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
إذا انتهت الحلقة باستخدام break
أو return
أو throw
، فسيتم تدمير الدفق. بعبارة أخرى، فإن التكرار فوق دفق سيستهلك الدفق بالكامل. سيتم قراءة الدفق على شكل أجزاء بحجم يساوي خيار highWaterMark
. في مثال التعليمات البرمجية أعلاه، ستكون البيانات في جزء واحد إذا كان الملف يحتوي على بيانات أقل من 64 كيلوبايت لأن خيار highWaterMark
غير مُقدم إلى fs.createReadStream()
.
readable[Symbol.asyncDispose]()
أضيف في: v20.4.0، v18.18.0
يطلق readable.destroy()
باستخدام AbortError
ويعيد وعدًا يتم تحقيقه عند انتهاء التدفق.
readable.compose(stream[, options])
أضيف في: v19.1.0، v18.13.0
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> يسمح بتدمير التدفق إذا تم إلغاء الإشارة.
الإرجاع: <Duplex> تدفق مركب مع التدفق
stream
.
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // يطبع ['this', 'is', 'compose', 'as', 'operator']
انظر إلى stream.compose
لمزيد من المعلومات.
readable.iterator([options])
أضيف في: v16.3.0
options
<Object>destroyOnReturn
<boolean> عند تعيينه علىfalse
، فإن استدعاءreturn
على مُكرر غير متزامن، أو الخروج من تكرارfor await...of
باستخدامbreak
،return
، أوthrow
لن يدمر التدفق. افتراضيًا:true
.
الإرجاع: <AsyncIterator> لاستهلاك التدفق.
يمنح المُكرر الذي تم إنشاؤه بواسطة هذه الطريقة المستخدمين خيار إلغاء تدمير التدفق إذا تم الخروج من حلقة for await...of
بواسطة return
، break
، أو throw
، أو إذا كان يجب على المُكرر تدمير التدفق إذا أصدر التدفق خطأً أثناء التكرار.
const { Readable } = require('node:stream')
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // سيطبع 2 ثم 3
}
console.log(readable.destroyed) // true، تم استهلاك التدفق بالكامل
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]))
await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}
showBoth()
readable.map(fn[, options])
[History]
الإصدار | التغييرات |
---|---|
v20.7.0, v18.19.0 | تمت إضافة highWaterMark في الخيارات. |
v17.4.0, v16.14.0 | تمت الإضافة في: v17.4.0, v16.14.0 |
[Stable: 1 - Experimental]
Stable: 1 Stability: 1 - تجريبي
fn
<Function> | <AsyncFunction> دالة لإنشاء خريطة لكل جزء في الدفق.data
<any> جزء من البيانات من الدفق.options
<Object>signal
<AbortSignal> يتم إيقافه إذا تم تدمير الدفق مما يسمح بإلغاء استدعاءfn
مبكرًا.
options
<Object>concurrency
<number> الحد الأقصى لاستدعاءاتfn
المتزامنة التي سيتم استدعاؤها على الدفق في وقت واحد. الافتراضي:1
.highWaterMark
<number> عدد العناصر التي سيتم تخزينها مؤقتًا أثناء انتظار استهلاك المستخدم للعناصر المُنشأة خريطة لها. الافتراضي:concurrency * 2 - 1
.signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <Readable> دفق تم إنشاء خريطة له باستخدام الدالة
fn
.
تسمح هذه الطريقة بإنشاء خريطة للدفق. سيتم استدعاء دالة fn
لكل جزء في الدفق. إذا قامت دالة fn
بإرجاع وعود - فسيتم انتظار هذا الوعد قبل تمريره إلى دفق النتيجة.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// مع مُنشئ خريطة متزامن.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
console.log(chunk) // 2, 4, 6, 8
}
// مع مُنشئ خريطة غير متزامن، مع إجراء ما يصل إلى 2 استعلام في وقت واحد.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
domain => resolver.resolve4(domain),
{ concurrency: 2 }
)
for await (const result of dnsResults) {
console.log(result) // يقوم بتسجيل نتيجة DNS لـ resolver.resolve4.
}
readable.filter(fn[, options])
[السجل]
الإصدار | التغييرات |
---|---|
v20.7.0, v18.19.0 | تمت إضافة highWaterMark في الخيارات. |
v17.4.0, v16.14.0 | تمت الإضافة في: v17.4.0, v16.14.0 |
fn
<دالة> | <دالة غير متزامنة> دالة لتصفية أجزاء من الدفق.data
<أي> جزء من البيانات من الدفق.options
<كائن>signal
<إشارة إلغاء> يتم إلغاؤها إذا تم تدمير الدفق مما يسمح بإلغاء دعوةfn
مبكرًا.
options
<كائن>concurrency
<رقم> الحد الأقصى للاستدعاء المتزامن لـfn
للاتصال بالدفق مرة واحدة. افتراضيًا:1
.highWaterMark
<رقم> عدد العناصر التي سيتم تخزينها مؤقتًا أثناء انتظار استهلاك المستخدم للعناصر المفلترة. افتراضيًا:concurrency * 2 - 1
.signal
<إشارة إلغاء> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <قابل للقراءة> دفق تم تصفيته باستخدام المُتَنبِئ
fn
.
تسمح هذه الطريقة بتصفية الدفق. بالنسبة لكل جزء في الدفق، سيتم استدعاء دالة fn
وإذا أعادت قيمة صحيحة، فسيتم تمرير الجزء إلى دفق النتيجة. إذا أعادت دالة fn
وعدًا - فسيتم انتظار هذا الوعد.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// مع مُتَنبِئ مُزامِن.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// مع مُتَنبِئ غير مُزامِن، مع إجراء ما يصل إلى 2 استعلام في وقت واحد.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address.ttl > 60
},
{ concurrency: 2 }
)
for await (const result of dnsResults) {
// يسجل النطاقات التي تحتوي على أكثر من 60 ثانية على سجل dns المُحلل.
console.log(result)
}
readable.forEach(fn[, options])
تم الإضافة في: v17.5.0، v16.15.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
fn
<Function> | <AsyncFunction> دالة تُستدعى على كل جزء من أجزاء الدفق.data
<any> جزء من البيانات من الدفق.options
<Object>signal
<AbortSignal> يُلغى إذا تم تدمير الدفق مما يسمح بإلغاء دعوةfn
مبكرًا.
options
<Object>concurrency
<number> الحد الأقصى للاستدعاء المتزامن لـfn
الذي يجب استدعاءه على الدفق في وقت واحد. افتراضي:1
.signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <Promise> وعدٌ لمتى ينتهي الدفق.
تسمح هذه الطريقة بتكرار دفق. بالنسبة لكل جزء في الدفق، سيتم استدعاء دالة fn
. إذا قامت دالة fn
بإرجاع وعد، فسيتم انتظار هذا الوعد.
تختلف هذه الطريقة عن حلقات for await...of
في أنها يمكن أن تعالج الأجزاء بشكل متزامن اختيارياً. بالإضافة إلى ذلك، لا يمكن إيقاف تكرار forEach
إلا عن طريق تمرير خيار signal
وإلغاء AbortController
ذات الصلة بينما يمكن إيقاف for await...of
باستخدام break
أو return
. في كلتا الحالتين، سيتم تدمير الدفق.
تختلف هذه الطريقة عن الاستماع إلى حدث 'data'
في أنها تستخدم حدث readable
في الآلية الأساسية ويمكن أن تحد من عدد مكالمات fn
المتزامنة.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// مع مُبْدِئ متزامن.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// مع مُبْدِئ غير متزامن، مع إجراء ما يصل إلى 2 استعلام في وقت واحد.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
await dnsResults.forEach(result => {
// يسجل النتيجة، مشابه لـ `for await (const result of dnsResults)`
console.log(result)
})
console.log('done') // انتهى الدفق
readable.toArray([options])
مُضاف في: v17.5.0، v16.15.0
options
<Object>signal
<AbortSignal> يسمح بإلغاء عملية toArray إذا تم إلغاء الإشارة.
الإرجاع: <Promise> وعد يحتوي على مصفوفة تحتوي على محتويات الدفق.
تتيح هذه الطريقة الحصول بسهولة على محتويات الدفق.
نظرًا لأن هذه الطريقة تقرأ الدفق بأكمله في الذاكرة، فإنها تلغي فوائد الدفق. وهي مخصصة للتوافق والراحة، وليس كطريقة أساسية لاستهلاك الدفق.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]
// إجراء استعلامات dns بشكل متزامن باستخدام .map وجمع
// النتائج في مصفوفة باستخدام toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
.map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
.toArray()
readable.some(fn[, options])
مُضاف في: v17.5.0، v16.15.0
fn
<Function> | <AsyncFunction> دالة يتم استدعاؤها على كل جزء من أجزاء الدفق.data
<any> جزء من البيانات من الدفق.options
<Object>signal
<AbortSignal> يتم إجهاضه إذا تم تدمير الدفق مما يسمح بإجهاض استدعاءfn
مبكرًا.
options
<Object>concurrency
<number> الحد الأقصى للاستدعاء المتزامن لـfn
للاتصال بالدفق مرة واحدة. افتراضي:1
.signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <Promise> وعد يُقيّم إلى
true
إذا قامتfn
بإرجاع قيمة صحيحة على الأقل لإحدى الأجزاء.
تشبه هذه الطريقة Array.prototype.some
وتقوم باستدعاء fn
على كل جزء في الدفق حتى تصبح قيمة الإرجاع المنتظرة true
(أو أي قيمة صحيحة). بمجرد أن تصبح قيمة الإرجاع المنتظرة لاستدعاء fn
على جزء ما قيمة صحيحة، يتم تدمير الدفق ويتم تحقيق الوعد بـ true
. إذا لم تُرجع أي من استدعاءات fn
على الأجزاء قيمة صحيحة، فسيتم تحقيق الوعد بـ false
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// مع مُنبئ متزامن.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false
// مع مُنبئ غير متزامن، مع إجراء ما يصل إلى فحصين للملف في وقت واحد.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(anyBigFile) // `true` إذا كان أي ملف في القائمة أكبر من 1 ميجابايت
console.log('done') // انتهى الدفق
readable.find(fn[, options])
مضاف في: v17.5.0، v16.17.0
fn
<Function> | <AsyncFunction> دالة تُستدعى على كل جزء من أجزاء الدفق.data
<any> جزء من البيانات من الدفق.options
<Object>signal
<AbortSignal> يتم إلغاؤه إذا تم تدمير الدفق مما يسمح بإلغاء استدعاءfn
مبكرًا.
options
<Object>concurrency
<number> الحد الأقصى لاستدعاءاتfn
المتزامنة التي يجب استدعاؤها على الدفق في وقت واحد. افتراضيًا:1
.signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
قيمة الإرجاع: <Promise> وعد يُقيّم أول جزء تكون قيمة
fn
الخاصة به صحيحة، أوundefined
إذا لم يتم العثور على أي عنصر.
هذه الطريقة مشابهة لـ Array.prototype.find
وتستدعي fn
على كل جزء في الدفق للعثور على جزء تكون قيمته صحيحة لـ fn
. بمجرد أن تكون قيمة الإرجاع المنتظرة لاستدعاء fn
صحيحة، يتم تدمير الدفق ويتم تحقيق الوعد بقيمة fn
التي أعادت قيمة صحيحة. إذا أعادت جميع استدعاءات fn
على الأجزاء قيمة خاطئة، يتم تحقيق الوعد بـ undefined
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// مع مُسند مُزامن.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined
// مع مُسند غير مُزامن، مع إجراء أقصى عدد 2 من عمليات فحص الملفات في وقت واحد.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(foundBigFile) // اسم الملف الكبير، إذا كان هناك أي ملف في القائمة أكبر من 1 ميغابايت
console.log('done') // انتهى الدفق
readable.every(fn[, options])
تم الإضافة في: v17.5.0، v16.15.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
fn
<Function> | <AsyncFunction> دالة تُستدعى على كل جزء من أجزاء الدفق.data
<any> جزء من البيانات من الدفق.options
<Object>signal
<AbortSignal> يتم إيقافه إذا تم تدمير الدفق مما يسمح بإلغاء استدعاءfn
مبكرًا.
options
<Object>concurrency
<number> الحد الأقصى لاستدعاءاتfn
المتزامنة التي سيتم استدعاؤها على الدفق في وقت واحد. الافتراضي:1
.signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <Promise> وعد يُقيّم إلى
true
إذا قامتfn
بإرجاع قيمة صحيحة لكل أجزاء الدفق.
تشبه هذه الطريقة Array.prototype.every
وتستدعي fn
على كل جزء في الدفق للتحقق مما إذا كانت جميع قيم الإرجاع المنتظرة قيمًا صحيحة لـ fn
. بمجرد أن تكون قيمة الإرجاع المنتظرة لاستدعاء fn
على جزء ما قيمة خاطئة، يتم تدمير الدفق ويتم تحقيق الوعد بقيمة false
. إذا قامت جميع استدعاءات fn
على الأجزاء بإرجاع قيمة صحيحة، فسيتم تحقيق الوعد بقيمة true
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// مع مُتَبَدِّد متزامن.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true
// مع مُتَبَدِّد غير متزامن، مع إجراء أقصى عدد 2 من عمليات التحقق من الملفات في وقت واحد.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
// `true` إذا كانت جميع الملفات في القائمة أكبر من 1 ميجابايت
console.log(allBigFiles)
console.log('done') // انتهى الدفق
readable.flatMap(fn[, options])
مضاف في: v17.5.0، v16.15.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> دالة لإنشاء خريطة لكل جزء في الدفق.data
<any> جزء من البيانات من الدفق.options
<Object>signal
<AbortSignal> يتم إيقافه إذا تم تدمير الدفق مما يسمح بإلغاء دعوةfn
مبكرًا.
options
<Object>concurrency
<number> الحد الأقصى للدعوات المتزامنة لـfn
للاتصال بالدفق في وقت واحد. افتراضي:1
.signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <Readable> دفق تم إنشاء خريطة مسطحة له باستخدام الدالة
fn
.
ترجع هذه الطريقة دفقًا جديدًا من خلال تطبيق مُدعَى الاستدعاء المُعطى على كل جزء من الدفق، ثم يتم تسطيح النتيجة.
من الممكن إرجاع دفق أو عنصر قابل للتكرار أو عنصر قابل للتكرار غير متزامن آخر من fn
وسيتم دمج تيارات النتائج (تسطيحها) في الدفق المُرجع.
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'
// مع مُنشئ خريطة متزامن.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// مع مُنشئ خريطة غير متزامن، دمج محتويات 4 ملفات
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
createReadStream(fileName)
)
for await (const result of concatResult) {
// هذا سيحتوي على المحتويات (جميع الأجزاء) لجميع الملفات الأربعة
console.log(result)
}
readable.drop(limit[, options])
مضاف في: v17.5.0، v16.15.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
limit
<number> عدد الأجزاء التي سيتم حذفها من القابل للقراءة.options
<Object>signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <Readable> دفق تم حذف
limit
من أجزائه.
ترجع هذه الطريقة دفقًا جديدًا مع حذف أول limit
جزء.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])
مضاف في: v17.5.0، v16.15.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
limit
<number> عدد الأجزاء التي سيتم أخذها من القابل للقراءة.options
<Object>signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <Readable> دفق تم أخذ
limit
من أجزائه.
ترجع هذه الطريقة دفقًا جديدًا مع أول limit
جزء.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])
مضاف في: v17.5.0، v16.15.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
fn
<Function> | <AsyncFunction> دالة اختزال يتم استدعاؤها على كل جزء في الدفق.previous
<any> القيمة التي تم الحصول عليها من آخر استدعاء لـfn
أو قيمةinitial
إذا تم تحديدها أو أول جزء من الدفق بخلاف ذلك.data
<any> جزء من البيانات من الدفق.options
<Object>signal
<AbortSignal> يتم إلغاؤه إذا تم تدمير الدفق مما يسمح بإلغاء استدعاءfn
مبكرًا.
initial
<any> القيمة الأولية التي سيتم استخدامها في الاختزال.options
<Object>signal
<AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
الإرجاع: <Promise> وعد للقيمة النهائية للاختزال.
تستدعي هذه الطريقة fn
على كل جزء من الدفق بالترتيب، وتمرر له النتيجة من الحساب على العنصر السابق. ترجع وعدًا للقيمة النهائية للاختزال.
إذا لم يتم تقديم قيمة initial
، فسيتم استخدام أول جزء من الدفق كقيمة أولية. إذا كان الدفق فارغًا، يتم رفض الوعد مع TypeError
مع خاصية رمز ERR_INVALID_ARGS
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file))
return totalSize + size
}, 0)
console.log(folderSize)
تكرر دالة الاختزال عنصر الدفق عنصرًا تلو الآخر، مما يعني أنه لا توجد معلمة concurrency
أو توازي. لإجراء اختزال متزامن، يمكنك استخراج الدالة غير المتزامنة إلى طريقة readable.map
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir)
.map(file => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0)
console.log(folderSize)
تدفقات دوبلكس والتحويل
الصنف: stream.Duplex
[السجل]
الإصدار | التغييرات |
---|---|
v6.8.0 | الآن، تقوم مثيلات Duplex بإرجاع true عند التحقق من instanceof stream.Writable . |
v0.9.4 | تمت الإضافة في: v0.9.4 |
تدفقات دوبلكس هي تدفقات تُنفذ واجهتي Readable
و Writable
.
أمثلة على تدفقات Duplex
:
duplex.allowHalfOpen
تمت الإضافة في: v0.9.4
إذا كانت false
، فسوف ينهي التدفق تلقائيًا الجانب القابل للكتابة عندما ينتهي الجانب القابل للقراءة. يتم تعيينها في البداية بواسطة خيار مُنشئ allowHalfOpen
، والذي يكون افتراضياً true
.
يمكن تغيير ذلك يدويًا لتغيير سلوك نصف الفتحة لمثيل تدفق Duplex
موجود، ولكن يجب تغييره قبل إصدار حدث 'end'
.
الصنف: stream.Transform
تمت الإضافة في: v0.9.4
تدفقات التحويل هي تدفقات Duplex
حيث تكون المخرجات مرتبطة بطريقة ما بالمدخلات. مثل جميع تدفقات Duplex
، تُنفذ تدفقات Transform
كل من واجهتي Readable
و Writable
.
أمثلة على تدفقات Transform
:
transform.destroy([error])
[السجل]
الإصدار | التغييرات |
---|---|
v14.0.0 | تعمل كعملية لا شيء على تدفق تم تدميره بالفعل. |
v8.0.0 | تمت الإضافة في: v8.0.0 |
تدمير التدفق، واختياريًا إصدار حدث 'error'
. بعد هذه المكالمة، سيُطلق تدفق التحويل أي موارد داخلية. يجب على المُنفذين عدم تجاوز هذه الطريقة، ولكن بدلاً من ذلك تنفيذ readable._destroy()
. يقوم التنفيذ الافتراضي لـ _destroy()
لـ Transform
أيضًا بإصدار 'close'
ما لم يتم تعيين emitClose
على false.
بمجرد استدعاء destroy()
، ستكون أي مكالمات إضافية عملية لا شيء ولن يتم إصدار أية أخطاء أخرى باستثناء تلك من _destroy()
كـ 'error'
.
stream.duplexPair([options])
مضاف في: v22.6.0، v20.17.0
options
<Object> قيمة لإرسالها إلى كل من مُنشئيDuplex
، لتعيين خيارات مثل التخزين المؤقت.- قيمة مُعادة: <Array> من مثيلين من
Duplex
.
دالة الأداة المساعدة duplexPair
تُعيد مصفوفة تحتوي على عنصرين، كل منهما عبارة عن دفق Duplex
متصل بالجانب الآخر:
const [sideA, sideB] = duplexPair()
أي شيء يُكتب في دفق واحد يصبح قابلاً للقراءة في الآخر. فهي توفر سلوكًا مشابهًا لاتصال الشبكة، حيث تصبح البيانات التي يكتبها العميل قابلة للقراءة من قبل الخادم، والعكس صحيح.
تدفقات Duplex متناظرة؛ يمكن استخدام أحدهما أو الآخر دون أي فرق في السلوك.
stream.finished(stream[, options], callback)
[السجل]
الإصدار | التغييرات |
---|---|
v19.5.0 | تمت إضافة دعم لـ ReadableStream و WritableStream . |
v15.11.0 | تمت إضافة خيار signal . |
v14.0.0 | ستنتظر الدالة finished(stream, cb) حدث 'close' قبل استدعاء دالة الاستدعاء. تحاول التنفيذ اكتشاف التدفقات القديمة وتطبيق هذا السلوك فقط على التدفقات التي يُتوقع منها إصدار 'close' . |
v14.0.0 | سيؤدي إصدار 'close' قبل 'end' في دفق Readable إلى حدوث خطأ ERR_STREAM_PREMATURE_CLOSE . |
v14.0.0 | سيتم استدعاء دالة الاستدعاء على التدفقات التي انتهت بالفعل قبل الاتصال بـ finished(stream, cb) . |
v10.0.0 | تمت الإضافة في: v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> دفق/دفق ويب قابل للقراءة و/أو للكتابة.options
<Object>error
<boolean> إذا تم تعيينه علىfalse
، فلن يُعامل الاتصال بـemit('error', err)
على أنه مكتمل. الافتراضي:true
.readable
<boolean> عند تعيينه علىfalse
، سيتم استدعاء دالة الاستدعاء عند انتهاء الدفق حتى لو كان الدفق لا يزال قابلاً للقراءة. الافتراضي:true
.writable
<boolean> عند تعيينه علىfalse
، سيتم استدعاء دالة الاستدعاء عند انتهاء الدفق حتى لو كان الدفق لا يزال قابلاً للكتابة. الافتراضي:true
.signal
<AbortSignal> يسمح بإلغاء انتظار انتهاء الدفق. لن يتم إلغاء الدفق الأساسي إذا تم إلغاء الإشارة. سيتم استدعاء دالة الاستدعاء معAbortError
. سيتم أيضًا إزالة جميع المستمعين المسجلين المضافين بواسطة هذه الدالة.
callback
<Function> دالة استدعاء تأخذ وسيطة خطأ اختيارية.قيمة مُعادة: <Function> دالة تنظيف تقوم بإزالة جميع المستمعين المسجلين.
دالة للحصول على إشعار عندما لا يكون الدفق قابلًا للقراءة أو الكتابة أو واجه خطأ أو حدث إغلاق مبكر.
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('Stream failed.', err)
} else {
console.log('Stream is done reading.')
}
})
rs.resume() // تصريف الدفق.
مفيدة بشكل خاص في سيناريوهات معالجة الأخطاء حيث يتم تدمير دفق مبكرًا (مثل طلب HTTP تم إلغاؤه)، ولن يُصدر 'end'
أو 'finish'
.
يوفر واجهة برمجة التطبيقات finished
نسخة واعدة.
تترك stream.finished()
مستمعي أحداث متدلية (خاصةً 'error'
, 'end'
, 'finish'
و 'close'
) بعد استدعاء callback
. والسبب في ذلك هو أن أحداث 'error'
غير المتوقعة (بسبب تنفيذ تدفق غير صحيح) لا تتسبب في تعطل غير متوقع. إذا كان هذا سلوكًا غير مرغوب فيه، فيجب استدعاء دالة التنظيف المُعادة في دالة الاستدعاء:
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[History]
الإصدار | التغييرات |
---|---|
v19.7.0, v18.16.0 | تمت إضافة دعم لـ webstreams. |
v18.0.0 | يؤدي تمرير مُنادٍ غير صالح إلى وسيطة callback الآن إلى طرح ERR_INVALID_ARG_TYPE بدلاً من ERR_INVALID_CALLBACK . |
v14.0.0 | ستنتظر pipeline(..., cb) حدث 'close' قبل استدعاء المُنادى. تحاول عملية التنفيذ اكتشاف التدفقات القديمة وتطبيق هذا السلوك فقط على التدفقات التي يُتوقع أن تُصدر 'close' . |
v13.10.0 | إضافة دعم للمولدات غير المتزامنة. |
v10.0.0 | تمت الإضافة في: v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- الإرجاع: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- الإرجاع: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- الإرجاع: <AsyncIterable> | <Promise>
callback
<Function> يتم استدعاءه عندما يكتمل خط الأنابيب بالكامل.err
<Error>val
القيمة المُحللة لـPromise
التي يُرجعهاdestination
.
الإرجاع: <Stream>
طريقة وحدة نمطية للربط بين التدفقات والمولدات التي تُحول الأخطاء وتُنظف بشكل صحيح وتوفر مُنادٍ عند اكتمال خط الأنابيب.
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// استخدام واجهة برمجة التطبيقات pipeline لتمرير سلسلة من التدفقات بسهولة
// معًا والحصول على إشعار عند اكتمال خط الأنابيب بالكامل.
// خط أنابيب لضغط ملف tar ضخم محتمل بكفاءة:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('Pipeline failed.', err)
} else {
console.log('Pipeline succeeded.')
}
})
يوفر واجهة برمجة التطبيقات pipeline
نسخة واعدة.
ستقوم stream.pipeline()
باستدعاء stream.destroy(err)
على جميع التدفقات باستثناء:
- التدفقات
Readable
التي أصدرت'end'
أو'close'
. - التدفقات
Writable
التي أصدرت'finish'
أو'close'
.
تترك stream.pipeline()
مستمعي أحداث مُتدلية على التدفقات بعد استدعاء callback
. في حالة إعادة استخدام التدفقات بعد الفشل، قد يؤدي هذا إلى تسرب مستمعي الأحداث وأخطاء مُبتلعة. إذا كان آخر تدفق قابلاً للقراءة، فسيتم إزالة مستمعي الأحداث المُتدلية بحيث يمكن استخدام آخر تدفق لاحقًا.
تقوم stream.pipeline()
بإغلاق جميع التدفقات عند طرح خطأ. قد يؤدي استخدام IncomingRequest
مع pipeline
إلى سلوك غير متوقع بمجرد تدمير المقبس دون إرسال الاستجابة المتوقعة. انظر المثال أدناه:
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // No such file
// لا يمكن إرسال هذه الرسالة بمجرد تدمير `pipeline` للمقبس بالفعل
return res.end('error!!!')
}
})
})
stream.compose(...streams)
[السجل]
الإصدار | التغييرات |
---|---|
v21.1.0، v20.10.0 | تمت إضافة دعم لفئة الدفق. |
v19.8.0، v18.16.0 | تمت إضافة دعم لـ webstreams. |
v16.9.0 | تمت الإضافة في: v16.9.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- المُرجَع: <stream.Duplex>
يقوم بدمج تيارين أو أكثر في تيار Duplex
يقوم بالكتابة إلى الدفق الأول والقراءة من الدفق الأخير. يتم توصيل كل دفق مُقدم إلى الدفق التالي، باستخدام stream.pipeline
. إذا حدث خطأ في أي من الدفقات، فسيتم تدمير جميعها، بما في ذلك دفق Duplex
الخارجي.
بما أن stream.compose
يُرجع تيارًا جديدًا يمكن (ويجب) توصيله بدوره في تيارات أخرى، فإنه يُمكّن من التركيب. على النقيض من ذلك، عند تمرير التيارات إلى stream.pipeline
، يكون التيار الأول عادةً تيارًا قابلاً للقراءة والأخير تيارًا قابلًا للكتابة، مما يُشكل دائرة مغلقة.
إذا تم تمرير Function
، فيجب أن تكون طريقة مصنع تأخذ source
Iterable
.
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // يطبع 'HELLOWORLD'
يمكن استخدام stream.compose
لتحويل الكائنات المتكررة غير المتزامنة، والمنشئات، والوظائف إلى تيارات.
AsyncIterable
يتم تحويلها إلىDuplex
قابل للقراءة. لا يمكن إنتاجnull
.AsyncGeneratorFunction
يتم تحويلها إلىDuplex
تحويل قابل للقراءة/الكتابة. يجب أن تأخذAsyncIterable
المصدر كمعامل أول. لا يمكن إنتاجnull
.AsyncFunction
يتم تحويلها إلىDuplex
قابل للكتابة. يجب أن تُرجع إماnull
أوundefined
.
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// تحويل AsyncIterable إلى Duplex قابل للقراءة.
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// تحويل AsyncGenerator إلى Duplex تحويل.
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// تحويل AsyncFunction إلى Duplex قابل للكتابة.
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // يطبع 'HELLOWORLD'
انظر إلى readable.compose(stream)
لـ stream.compose
كعامل.
stream.Readable.from(iterable[, options])
مضاف في: v12.3.0، v10.17.0
iterable
<Iterable> كائن ينفذ بروتوكولSymbol.asyncIterator
أوSymbol.iterator
. يصدر حدث 'error' إذا تم تمرير قيمة خالية.options
<Object> الخيارات المُقدمة إلىnew stream.Readable([options])
. بشكل افتراضي، سيقومReadable.from()
بتعيينoptions.objectMode
إلىtrue
، ما لم يتم إلغاء هذا الاختيار صراحةً عن طريق تعيينoptions.objectMode
إلىfalse
.- مُخرجات: <stream.Readable>
طريقة مساعدة لإنشاء تيارات قابلة للقراءة من المُكررات.
const { Readable } = require('node:stream')
async function* generate() {
yield 'hello'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
لن يتم تكرار السلاسل أو المخازن المؤقتة عند استدعاء Readable.from(string)
أو Readable.from(buffer)
لمطابقة دلالات التيارات الأخرى لأسباب تتعلق بالأداء.
إذا تم تمرير كائن Iterable
يحتوي على وعود كوسيطة، فقد يؤدي ذلك إلى رفض غير معالج.
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // رفض غير معالج
])
stream.Readable.fromWeb(readableStream[, options])
مضاف في: v17.0.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
مُخرجات: <stream.Readable>
stream.Readable.isDisturbed(stream)
تم الإضافة في: v16.8.0
stream
<stream.Readable> | <ReadableStream>- الإرجاع:
boolean
يُرجع ما إذا تم قراءة الدفق أو إلغاؤه.
stream.isErrored(stream)
تم الإضافة في: v17.3.0، v16.14.0
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- الإرجاع: <boolean>
يُرجع ما إذا كان الدفق قد واجه خطأً.
stream.isReadable(stream)
تم الإضافة في: v17.4.0، v16.14.0
stream
<Readable> | <Duplex> | <ReadableStream>- الإرجاع: <boolean>
يُرجع ما إذا كان الدفق قابلاً للقراءة.
stream.Readable.toWeb(streamReadable[, options])
تم الإضافة في: v17.0.0
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> الحد الأقصى لحجم قائمة الانتظار الداخلية (منReadableStream
المُنشأ) قبل تطبيق ضغط الظهر في القراءة منstream.Readable
المُعطى. إذا لم يتم توفير أي قيمة، فسيتم أخذها منstream.Readable
المُعطى.size
<Function> دالة تحدد حجم الكتلة المعطاة من البيانات. إذا لم يتم توفير أي قيمة، فسيكون الحجم1
لجميع الكتل.chunk
<any>- الإرجاع: <number>
الإرجاع: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
تم الإضافة في: v17.0.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
الإرجاع: <stream.Writable>
stream.Writable.toWeb(streamWritable)
تم الإضافة في: v17.0.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
streamWritable
<stream.Writable>- الإرجاع: <WritableStream>
stream.Duplex.from(src)
[السجل]
الإصدار | التغييرات |
---|---|
v19.5.0, v18.17.0 | يمكن الآن أن تكون وسيطة src ReadableStream أو WritableStream . |
v16.8.0 | تم الإضافة في: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
طريقة مساعدة لإنشاء تدفقات ثنائية الاتجاه.
Stream
يحول التدفق القابل للكتابة إلىDuplex
قابل للكتابة والتدفق القابل للقراءة إلىDuplex
.Blob
يحول إلىDuplex
قابل للقراءة.string
يحول إلىDuplex
قابل للقراءة.ArrayBuffer
يحول إلىDuplex
قابل للقراءة.AsyncIterable
يحول إلىDuplex
قابل للقراءة. لا يمكن توليدnull
.AsyncGeneratorFunction
يحول إلىDuplex
تحويل قابل للقراءة/الكتابة. يجب أن يأخذAsyncIterable
مصدرًا كمعامل أول. لا يمكن توليدnull
.AsyncFunction
يحول إلىDuplex
قابل للكتابة. يجب أن يُرجع إماnull
أوundefined
Object ({ writable, readable })
يحولreadable
وwritable
إلىStream
ثم يجمعهما فيDuplex
حيث سيكتبDuplex
إلىwritable
ويقرأ منreadable
.Promise
يحول إلىDuplex
قابل للقراءة. يتم تجاهل القيمةnull
.ReadableStream
يحول إلىDuplex
قابل للقراءة.WritableStream
يحول إلىDuplex
قابل للكتابة.- الإرجاع: <stream.Duplex>
إذا تم تمرير كائن Iterable
يحتوي على وعود كوسيط، فقد يؤدي ذلك إلى رفض غير معالج.
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // رفض غير معالج
])
stream.Duplex.fromWeb(pair[, options])
مضاف في: v17.0.0
[مستقر: 1 - تجريبي]
مستقر: 1 استقرار: 1 - تجريبي
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>مُخرجات: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
أضيف في: v17.0.0
streamDuplex
<stream.Duplex>- قيمة الإرجاع: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[السجل]
الإصدار | التغييرات |
---|---|
v19.7.0, v18.16.0 | تمت إضافة دعم لـ ReadableStream و WritableStream . |
v15.4.0 | أضيف في: v15.4.0 |
signal
<AbortSignal> إشارة تمثل إلغاءً ممكنًاstream
<Stream> | <ReadableStream> | <WritableStream> تدفق لإرفاق إشارة به.
يربط إشارة AbortSignal بتدفق قابل للقراءة أو الكتابة. هذا يسمح للرمز بالتحكم في تدمير التدفق باستخدام AbortController
.
إن استدعاء abort
على AbortController
المقابل لإشارة AbortSignal
الممررة سيتصرف بنفس طريقة استدعاء .destroy(new AbortError())
على التدفق، و controller.error(new AbortError())
لتدفقات الويب.
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// في وقت لاحق، إلغاء العملية وإغلاق التدفق
controller.abort()
أو باستخدام AbortSignal
مع تدفق قابل للقراءة كتكرار غير متزامن:
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // ضبط مهلة زمنية
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// تم إلغاء العملية
} else {
throw e
}
}
})()
أو باستخدام AbortSignal
مع ReadableStream:
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// تم إلغاء العملية
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
مضاف في: v19.9.0، v18.17.0
يرجع علامة المياه العالية الافتراضية المستخدمة بواسطة التدفقات. القيمة الافتراضية هي 65536
(64 كيلوبايت)، أو 16
لـ objectMode
.
stream.setDefaultHighWaterMark(objectMode, value)
مضاف في: v19.9.0، v18.17.0
يُعيّن علامة المياه العالية الافتراضية المستخدمة بواسطة التدفقات.
واجهة برمجة التطبيقات لمنفذي التدفقات
تم تصميم واجهة برمجة التطبيقات node:stream
لجعل من السهل تنفيذ التدفقات باستخدام نموذج الميراث النموذجي في جافا سكريبت.
أولاً، سيعلن مطور التدفق عن فئة جافا سكريبت جديدة تمتد من إحدى فئات التدفقات الأساسية الأربع (stream.Writable
، stream.Readable
، stream.Duplex
، أو stream.Transform
)، مع التأكد من استدعاء مُنشئ فئة الوالد المناسب:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
عند توسيع التدفقات، ضع في اعتبارك الخيارات التي يمكن للمستخدم تقديمها ويجب عليه تقديمها قبل توجيهها إلى مُنشئ القاعدة. على سبيل المثال، إذا كانت التنفيذات تقوم بافتراضات فيما يتعلق بخيارات autoDestroy
و emitClose
، فلا تسمح للمستخدم بتجاوز هذه الخيارات. كن واضحًا بشأن الخيارات التي يتم توجيهها بدلاً من توجيه جميع الخيارات ضمنيًا.
يجب على فئة التدفق الجديدة بعد ذلك تنفيذ طريقة أو أكثر محددة، اعتمادًا على نوع التدفق الذي يتم إنشاؤه، كما هو مفصل في الجدول أدناه:
حالة الاستخدام | الفئة | الطريقة (الطرق) التي يجب تنفيذها |
---|---|---|
القراءة فقط | Readable | _read() |
الكتابة فقط | Writable | _write() ، _writev() ، _final() |
القراءة والكتابة | Duplex | _read() ، _write() ، _writev() ، _final() |
التعامل مع البيانات المكتوبة، ثم قراءة النتيجة | Transform | _transform() ، _flush() ، _final() |
يجب ألا يقوم رمز التنفيذ الخاص بالتدفق أبدًا باستدعاء الطرق "العامة" للتدفق المخصصة للاستخدام من قبل المستهلكين (كما هو موضح في قسم واجهة برمجة التطبيقات لمستهلكي التدفقات). قد يؤدي القيام بذلك إلى آثار جانبية سلبية في رمز التطبيق الذي يستهلك التدفق.
تجنب تجاوز الطرق العامة مثل write()
، end()
، cork()
، uncork()
، read()
و destroy()
، أو إصدار أحداث داخلية مثل 'error'
، 'data'
، 'end'
، 'finish'
و 'close'
من خلال .emit()
. قد يؤدي القيام بذلك إلى كسر ثوابت التدفق الحالية والمستقبلية مما يؤدي إلى مشاكل في السلوك و/أو التوافق مع التدفقات الأخرى، وأدوات التدفق، وتوقعات المستخدم.
بناء مبسّط
أضيف في: v1.2.0
في العديد من الحالات البسيطة، من الممكن إنشاء تيار دون الاعتماد على الميراث. يمكن تحقيق ذلك من خلال إنشاء مثيلات مباشرة من كائنات stream.Writable
، وstream.Readable
، وstream.Duplex
، أو stream.Transform
، ومرور أساليب مناسبة كخيارات مُنشئ.
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// تهيئة الحالة وتحميل الموارد...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// تحرير الموارد...
},
})
تنفيذ تيار قابل للكتابة
يتم توسيع فئة stream.Writable
لتنفيذ تيار Writable
.
يجب على تيارات Writable
المخصصة استدعاء مُنشئ new stream.Writable([options])
وتنفيذ طريقة writable._write()
و/أو writable._writev()
.
new stream.Writable([options])
[History]
الإصدار | التغييرات |
---|---|
v22.0.0 | زيادة قيمة highWaterMark الافتراضية. |
v15.5.0 | دعم تمرير AbortSignal . |
v14.0.0 | تغيير قيمة الخيار autoDestroy الافتراضية إلى true . |
v11.2.0، v10.16.0 | إضافة خيار autoDestroy لإلغاء destroy() للتيار تلقائيًا عند إصداره 'finish' أو حدوث أخطاء. |
v10.0.0 | إضافة خيار emitClose لتحديد ما إذا كان يتم إصدار 'close' عند الإلغاء. |
options
<Object>highWaterMark
<number> مستوى المخزن المؤقت عندما يبدأstream.write()
في إرجاعfalse
. الافتراضي:65536
(64 كيلوبايت)، أو16
لتياراتobjectMode
.decodeStrings
<boolean> ما إذا كان سيتم ترميز سلاسلstring
الممررة إلىstream.write()
إلىBuffer
s (بالتشفير المحدد في استدعاءstream.write()
) قبل تمريرها إلىstream._write()
. لا يتم تحويل أنواع البيانات الأخرى (أي لا يتم فك تشفيرBuffer
s إلىstring
s). سيؤدي التعيين إلىfalse
إلى منع تحويل سلاسلstring
. الافتراضي:true
.defaultEncoding
<string> الترميز الافتراضي المستخدم عندما لا يتم تحديد ترميز كوسيطة لـstream.write()
. الافتراضي:'utf8'
.objectMode
<boolean> ما إذا كانتstream.write(anyObj)
عملية صالحة. عندما يتم تعيينه، يصبح من الممكن كتابة قيم JavaScript بخلاف السلسلة، <Buffer>، <TypedArray> أو <DataView> إذا كان مدعومًا من قبل تنفيذ التيار. الافتراضي:false
.emitClose
<boolean> ما إذا كان يجب على التيار إصدار'close'
بعد تدميره. الافتراضي:true
.write
<Function> التنفيذ لطريقةstream._write()
.writev
<Function> التنفيذ لطريقةstream._writev()
.destroy
<Function> التنفيذ لطريقةstream._destroy()
.final
<Function> التنفيذ لطريقةstream._final()
.construct
<Function> التنفيذ لطريقةstream._construct()
.autoDestroy
<boolean> ما إذا كان يجب على هذا التيار استدعاء.destroy()
على نفسه تلقائيًا بعد الانتهاء. الافتراضي:true
.signal
<AbortSignal> إشارة تمثل إلغاءً ممكنًا.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// يستدعي مُنشئ stream.Writable().
super(options)
// ...
}
}
أو، عند استخدام مُنشئات على طراز ما قبل ES6:
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
أو، باستخدام نهج المُنشئ المُبسّط:
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
سيؤدي استدعاء abort
على AbortController
المُقابِل لـ AbortSignal
المُمرر إلى نفس النتيجة التي يحدثها استدعاء .destroy(new AbortError())
على تيار الكتابة.
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// في وقت لاحق، إلغاء العملية وإغلاق التيار
controller.abort()
writable._construct(callback)
مضاف في: v15.0.0
callback
<Function> استدعِ هذه الدالة (اختياريًا مع وسيطة خطأ) عندما ينتهي تدفق البيانات من عملية البدء.
يجب ألا يتم استدعاء طريقة _construct()
مباشرةً. قد يتم تنفيذها بواسطة فئات فرعية، وإذا كان الأمر كذلك، فسيتم استدعاءها بواسطة طرق فئة Writable
الداخلية فقط.
ستتم استدعاء هذه الدالة الاختيارية في دورة معالجة بعد أن تعيد مُنشئ التدفق، مما يؤخر أي استدعاءات لـ _write()
, _final()
, و _destroy()
حتى يتم استدعاء callback
. هذا مفيد لتهيئة الحالة أو تهيئة الموارد بشكل غير متزامن قبل استخدام التدفق.
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[السجل]
الإصدار | التغييرات |
---|---|
v12.11.0 | _write() اختياري عند توفير _writev(). |
chunk
<Buffer> | <string> | <any> الـBuffer
المراد كتابته، مُحوّل من سلسلةstring
المُمرّرة إلىstream.write()
. إذا كان خيارdecodeStrings
للتدفق مُعيّنًا علىfalse
أو كان التدفق يعمل في وضع الكائن، فلن يتم تحويل الـchunk
وسيكون أي شيء مُمرّر إلىstream.write()
.encoding
<string> إذا كانت الـchunk
سلسلة، فإنencoding
هو ترميز الأحرف لتلك السلسلة. إذا كانت الـchunk
عبارة عنBuffer
، أو إذا كان التدفق يعمل في وضع الكائن، فيمكن تجاهلencoding
.callback
<Function> استدعِ هذه الدالة (اختياريًا مع وسيطة خطأ) عند اكتمال المعالجة للجزء المُعطى.
يجب أن توفر جميع تنفيذات تدفق Writable
طريقة writable._write()
و/أو writable._writev()
لإرسال البيانات إلى المورد الأساسي.
توفر تدفقات Transform
تنفيذها الخاص لـ writable._write()
.
يجب ألا يتم استدعاء هذه الدالة بواسطة رمز التطبيق مباشرةً. يجب تنفيذها بواسطة فئات فرعية، ويتم استدعاءها بواسطة طرق فئة Writable
الداخلية فقط.
يجب استدعاء دالة callback
بشكل متزامن داخل writable._write()
أو بشكل غير متزامن (أي دورة معالجة مختلفة) للإشارة إما إلى أن الكتابة اكتملت بنجاح أو فشلت بحدوث خطأ. يجب أن تكون الوسيطة الأولى المُمرّرة إلى callback
هي كائن Error
إذا فشل الاستدعاء أو null
إذا نجحت الكتابة.
ستتسبب جميع الاستدعاءات إلى writable.write()
التي تحدث بين وقت استدعاء writable._write()
واستدعاء callback
في تخزين البيانات المكتوبة مؤقتًا. عندما يتم استدعاء callback
، قد يُصدر التدفق حدث 'drain'
. إذا كان تنفيذ التدفق قادرًا على معالجة أجزاء متعددة من البيانات في وقت واحد، فيجب تنفيذ طريقة writable._writev()
.
إذا تم تعيين خاصية decodeStrings
صراحةً إلى false
في خيارات المُنشئ، فستظل chunk
نفس الكائن المُمرّر إلى .write()
، وقد تكون سلسلة بدلاً من Buffer
. هذا لدعم التنفيذات التي لديها معالجة مُحسّنة لبعض ترميزات بيانات السلاسل. في تلك الحالة، ستشير وسيطة encoding
إلى ترميز الأحرف للسلسلة. خلاف ذلك، يمكن تجاهل وسيطة encoding
بأمان.
تمت إضافة بادئة _
إلى طريقة writable._write()
لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاءها مباشرةً بواسطة برامج المستخدم.
writable._writev(chunks, callback)
chunks
<Object[]> البيانات المراد كتابتها. القيمة عبارة عن مصفوفة من <Object> يمثل كل منها جزءًا منفصلًا من البيانات المراد كتابتها. خصائص هذه الكائنات هي:callback
<Function> دالة استدعاء (اختياريًا مع وسيطة خطأ) يتم استدعاؤها عند اكتمال المعالجة للأجزاء الموردة.
يجب ألا يتم استدعاء هذه الدالة مباشرة بواسطة رمز التطبيق. يجب تنفيذه بواسطة فئات فرعية، ويجب استدعاؤه بواسطة طرق فئة Writable
الداخلية فقط.
يمكن تنفيذ طريقة writable._writev()
بالإضافة إلى أو بدلاً من writable._write()
في تنفيذ التيارات القادرة على معالجة أجزاء متعددة من البيانات في وقت واحد. إذا تم تنفيذه، وإذا كانت هناك بيانات مؤقتة من كتابات سابقة، فسيتم استدعاء _writev()
بدلاً من _write()
.
تتم إضافة بادئة writable._writev()
بعلامة تحتية لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاؤها مباشرةً بواسطة برامج المستخدم.
writable._destroy(err, callback)
تمت الإضافة في: v8.0.0
err
<Error> خطأ محتمل.callback
<Function> دالة استدعاء تأخذ وسيطة خطأ اختيارية.
يتم استدعاء طريقة _destroy()
بواسطة writable.destroy()
. يمكن تجاوزها بواسطة فئات فرعية ولكن يجب ألا يتم استدعاؤها مباشرة.
writable._final(callback)
مضاف في: v8.0.0
callback
<Function> استدعاء هذه الدالة (اختياريًا مع وسيطة خطأ) عند الانتهاء من كتابة أي بيانات متبقية.
يجب عدم استدعاء طريقة _final()
مباشرة. يمكن تنفيذها بواسطة فئات فرعية، وإذا كان الأمر كذلك، فسيتم استدعاءها بواسطة طرق الفئة Writable
الداخلية فقط.
ستتم استدعاء هذه الدالة الاختيارية قبل إغلاق التدفق، مما يؤخر حدث 'finish'
حتى يتم استدعاء callback
. هذا مفيد لإغلاق الموارد أو كتابة البيانات المخزنة مؤقتًا قبل انتهاء التدفق.
أخطاء أثناء الكتابة
يجب نشر الأخطاء التي تحدث أثناء معالجة طرق writable._write()
، writable._writev()
و writable._final()
عن طريق استدعاء الدالة المُرتجعة ومرور الخطأ كوسيطة أولى. يؤدي إلقاء خطأ Error
من داخل هذه الطرق أو إصدار حدث 'error'
يدويًا إلى سلوك غير محدد.
إذا كان تدفق Readable
يمر عبر أنبوب إلى تدفق Writable
عندما يصدر Writable
خطأ، فسيتم إلغاء أنبوب تدفق Readable
.
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
مثال على تدفق مكتوب
يوضح ما يلي تنفيذًا بسيطًا (وبلا معنى إلى حد ما) لتدفق Writable
مخصص. في حين أن مثيل تدفق Writable
المحدد هذا ليس له أي فائدة خاصة، إلا أن المثال يوضح كل العناصر المطلوبة لمثيل تدفق Writable
مخصص:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
فك تشفير المخازن المؤقتة في دفق قابل للكتابة
يُعد فك تشفير المخازن المؤقتة مهمة شائعة، على سبيل المثال، عند استخدام المُحولات التي يكون مُدخلها سلسلة نصية. هذه ليست عملية تافهة عند استخدام ترميز أحرف متعدد البايت، مثل UTF-8. يوضح المثال التالي كيفية فك تشفير السلاسل متعددة البايت باستخدام StringDecoder
و Writable
.
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
تنفيذ دفق قابل للقراءة
يتم توسيع فئة stream.Readable
لتنفيذ دفق Readable
.
يجب على تيارات Readable
المخصصة استدعاء مُنشئ new stream.Readable([options])
وتنفيذ طريقة readable._read()
.
new stream.Readable([options])
[History]
الإصدار | التغييرات |
---|---|
v22.0.0 | زيادة قيمة highWaterMark الافتراضية. |
v15.5.0 | دعم تمرير AbortSignal . |
v14.0.0 | تغيير قيمة autoDestroy الافتراضية إلى true . |
v11.2.0, v10.16.0 | إضافة خيار autoDestroy لإلغاء destroy() للدفق تلقائيًا عند إصدار 'end' أو حدوث أخطاء. |
options
<Object>highWaterMark
<number> الحد الأقصى لـعدد البايتات التي سيتم تخزينها في المخزن المؤقت الداخلي قبل التوقف عن القراءة من المورد الأساسي. الافتراضي:65536
(64 كيلوبايت)، أو16
لتياراتobjectMode
.encoding
<string> إذا تم تحديده، فسيتم فك تشفير المخازن المؤقتة إلى سلاسل نصية باستخدام الترميز المحدد. الافتراضي:null
.objectMode
<boolean> ما إذا كان يجب أن يتصرف هذا الدفق كدفق من الكائنات. بمعنى أنstream.read(n)
يُرجع قيمة واحدة بدلاً منBuffer
بحجمn
. الافتراضي:false
.emitClose
<boolean> ما إذا كان يجب على الدفق إصدار'close'
بعد تدميره. الافتراضي:true
.read
<Function> التنفيذ لطريقةstream._read()
.destroy
<Function> التنفيذ لطريقةstream._destroy()
.construct
<Function> التنفيذ لطريقةstream._construct()
.autoDestroy
<boolean> ما إذا كان يجب على هذا الدفق استدعاء.destroy()
تلقائيًا بنفسه بعد الانتهاء. الافتراضي:true
.signal
<AbortSignal> إشارة تمثل إلغاءً محتملًا.
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// يستدعي مُنشئ stream.Readable(options).
super(options)
// ...
}
}
أو، عند استخدام مُنشئات على طراز ما قبل ES6:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
أو، باستخدام نهج المُنشئ المُبسط:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
سيؤدي استدعاء abort
على AbortController
المقابل لـ AbortSignal
المُمرر إلى نفس طريقة استدعاء .destroy(new AbortError())
على القابل للقراءة المُنشأ.
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// لاحقًا، ألغِ العملية وأغلق الدفق
controller.abort()
readable._construct(callback)
مضاف في: v15.0.0
callback
<Function> اتصل بهذه الدالة (اختياريًا مع وسيطة خطأ) عندما ينتهي تدفق الإعداد.
يجب ألا يتم استدعاء طريقة _construct()
مباشرة. يمكن تنفيذه بواسطة فئات فرعية، وإذا كان الأمر كذلك، فسيتم استدعاؤه بواسطة طرق فئة Readable
الداخلية فقط.
ستتم جدولة هذه الدالة الاختيارية في التكرار التالي بواسطة مُنشئ التدفق، مما يؤخر أي استدعاءات _read()
و _destroy()
حتى يتم استدعاء callback
. هذا مفيد لتهيئة الحالة أو تهيئة الموارد بشكل غير متزامن قبل استخدام التدفق.
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
مضاف في: v0.9.4
size
<number> عدد البايتات التي سيتم قراءتها بشكل غير متزامن
يجب ألا يتم استدعاء هذه الدالة مباشرة بواسطة رمز التطبيق. يجب تنفيذه بواسطة فئات فرعية، ويتم استدعاؤه بواسطة طرق فئة Readable
الداخلية فقط.
يجب أن توفر جميع تنفيذات تدفق Readable
تنفيذًا لطريقة readable._read()
لجلب البيانات من المورد الأساسي.
عندما يتم استدعاء readable._read()
، إذا كانت البيانات متاحة من المورد، فيجب أن يبدأ التنفيذ في دفع هذه البيانات إلى قائمة الانتظار للقراءة باستخدام طريقة this.push(dataChunk)
. سيتم استدعاء _read()
مرة أخرى بعد كل استدعاء لـ this.push(dataChunk)
بمجرد أن يكون التدفق جاهزًا لقبول المزيد من البيانات. قد يستمر _read()
في القراءة من المورد ودفع البيانات حتى تقوم readable.push()
بإرجاع false
. فقط عندما يتم استدعاء _read()
مرة أخرى بعد توقفها، يجب أن تستأنف دفع بيانات إضافية إلى قائمة الانتظار.
بمجرد استدعاء طريقة readable._read()
، لن يتم استدعاؤها مرة أخرى حتى يتم دفع المزيد من البيانات من خلال طريقة readable.push()
. لن تتسبب البيانات الفارغة مثل المخازن المؤقتة والصفائف الفارغة في استدعاء readable._read()
.
وسيطه size
استشارية. بالنسبة للتنفيذات التي تكون فيها "القراءة" عملية واحدة تُرجع البيانات، يمكن استخدام وسيطة size
لتحديد مقدار البيانات التي سيتم جلبها. قد تتجاهل التنفيذات الأخرى هذه الوسيطة، وتقدم البيانات ببساطة عندما تصبح متاحة. ليست هناك حاجة "لانتظار" حتى تتوفر بايتات size
قبل استدعاء stream.push(chunk)
.
تمت إضافة بادئة _
إلى طريقة readable._read()
لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاؤها مباشرةً بواسطة برامج المستخدم.
readable._destroy(err, callback)
مضاف في: v8.0.0
err
<Error> خطأ محتمل.callback
<Function> دالة استدعاء راجعة تأخذ وسيطة خطأ اختيارية.
تُستدعى طريقة _destroy()
بواسطة readable.destroy()
. يمكن للفئات الفرعية تجاوزها، ولكن يجب عدم استدعائها مباشرة.
readable.push(chunk[, encoding])
[السجل]
الإصدار | التغييرات |
---|---|
v22.0.0, v20.13.0 | يمكن الآن أن تكون وسيطة chunk مثيلًا من TypedArray أو DataView . |
v8.0.0 | يمكن الآن أن تكون وسيطة chunk مثيلًا من Uint8Array . |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> جزء من البيانات المراد دفعه إلى قائمة الانتظار للقراءة. بالنسبة للتيارات التي لا تعمل في وضع الكائن، يجب أن يكونchunk
عبارة عن <string>، أو <Buffer>، أو <TypedArray> أو <DataView>. بالنسبة لتيارات وضع الكائن، يمكن أن يكونchunk
أي قيمة جافا سكريبت.encoding
<string> ترميز أجزاء السلسلة. يجب أن يكون ترميزًا صالحًا لـBuffer
، مثل'utf8'
أو'ascii'
.- القيمة المُرجعه: <boolean>
true
إذا كان من الممكن الاستمرار في دفع أجزاء إضافية من البيانات؛false
خلاف ذلك.
عندما يكون chunk
عبارة عن <Buffer>، أو <TypedArray>، أو <DataView> أو <string>، سيتم إضافة جزء البيانات إلى قائمة الانتظار الداخلية للمستخدمين لتستهلكها. يشير تمرير chunk
كـ null
إلى نهاية الدفق (EOF)، وبعد ذلك لا يمكن كتابة المزيد من البيانات.
عندما يعمل Readable
في وضع الإيقاف المؤقت، يمكن قراءة البيانات المضافة باستخدام readable.push()
من خلال استدعاء طريقة readable.read()
عند إصدار حدث 'readable'
.
عندما يعمل Readable
في وضع التدفق، سيتم تسليم البيانات المضافة باستخدام readable.push()
من خلال إصدار حدث 'data'
.
صُممت طريقة readable.push()
لتكون مرنة قدر الإمكان. على سبيل المثال، عند لف مصدر منخفض المستوى يوفر نوعًا ما من آلية الإيقاف المؤقت/الاستئناف، ودالة استدعاء بيانات، يمكن لف المصدر منخفض المستوى بواسطة مثيل Readable
مخصص:
// `_source` هو كائن به طرق readStop() و readStart()،
// وعضو `ondata` الذي يتم استدعاؤه عندما يكون لديه بيانات، و
// وعضو `onend` الذي يتم استدعاؤه عندما تنتهي البيانات.
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// في كل مرة توجد بيانات، قم بدفعها إلى المخزن المؤقت الداخلي.
this._source.ondata = chunk => {
// إذا قامت push() بإرجاع false، فتوقف عن القراءة من المصدر.
if (!this.push(chunk)) this._source.readStop()
}
// عندما ينتهي المصدر، قم بدفع جزء `null` الذي يشير إلى نهاية الملف.
this._source.onend = () => {
this.push(null)
}
}
// سيتم استدعاء _read() عندما يريد الدفق سحب المزيد من البيانات.
// يتم تجاهل وسيطة الحجم الاستشاري في هذه الحالة.
_read(size) {
this._source.readStart()
}
}
تُستخدم طريقة readable.push()
لدفع المحتوى إلى المخزن المؤقت الداخلي. يمكن تشغيلها بواسطة طريقة readable._read()
.
بالنسبة للتيارات التي لا تعمل في وضع الكائن، إذا كانت معلمة chunk
لـ readable.push()
هي undefined
، فسيتم التعامل معها كسلسلة أو مخزن مؤقت فارغ. راجع readable.push('')
لمزيد من المعلومات.
أخطاء أثناء القراءة
يجب نشر الأخطاء التي تحدث أثناء معالجة readable._read()
من خلال طريقة readable.destroy(err)
. إن طرح خطأ Error
من داخل readable._read()
أو إصدار حدث 'error'
يدويًا يؤدي إلى سلوك غير محدد.
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// القيام ببعض العمل.
}
},
})
مثال على دفق العد
فيما يلي مثال أساسي على دفق Readable
يصدر الأرقام من 1 إلى 1,000,000 بترتيب تصاعدي، ثم ينتهي.
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
تنفيذ دفق ثنائي الاتجاه
دفق Duplex
هو دفق ينفذ كلاً من Readable
و Writable
، مثل اتصال مقبس TCP.
نظرًا لأن JavaScript لا يدعم الميراث المتعدد، يتم توسيع فئة stream.Duplex
لتنفيذ دفق Duplex
(على عكس توسيع فئتي stream.Readable
و stream.Writable
).
ترث فئة stream.Duplex
نموذجيًا من stream.Readable
و طفيليًا من stream.Writable
، لكن instanceof
ستعمل بشكل صحيح لكلا الفئتين الأساسيتين نظرًا لإلغاء تجاوز Symbol.hasInstance
على stream.Writable
.
يجب أن تستدعي دفقات Duplex
المخصصة مُنشئ new stream.Duplex([options])
وتنفذ كلاً من طريقتي readable._read()
و writable._write()
.
new stream.Duplex(options)
[السجل]
الإصدار | التغييرات |
---|---|
v8.4.0 | تم دعم خيارات readableHighWaterMark و writableHighWaterMark الآن. |
options
<Object> تُمرَّر إلى كل من مُنشئيWritable
وReadable
. كما أن لديها الحقول التالية:allowHalfOpen
<boolean> إذا تم تعيينه علىfalse
، فسوف يُنهي التدفق تلقائيًا الجانب القابل للكتابة عندما ينتهي الجانب القابل للقراءة. الافتراضي:true
.readable
<boolean> يُعيّن ما إذا كان يجب أن يكونDuplex
قابلًا للقراءة. الافتراضي:true
.writable
<boolean> يُعيّن ما إذا كان يجب أن يكونDuplex
قابلًا للكتابة. الافتراضي:true
.readableObjectMode
<boolean> يُعيّنobjectMode
للجانب القابل للقراءة من التدفق. ليس له أي تأثير إذا كانobjectMode
يساويtrue
. الافتراضي:false
.writableObjectMode
<boolean> يُعيّنobjectMode
للجانب القابل للكتابة من التدفق. ليس له أي تأثير إذا كانobjectMode
يساويtrue
. الافتراضي:false
.readableHighWaterMark
<number> يُعيّنhighWaterMark
للجانب القابل للقراءة من التدفق. ليس له أي تأثير إذا تم توفيرhighWaterMark
.writableHighWaterMark
<number> يُعيّنhighWaterMark
للجانب القابل للكتابة من التدفق. ليس له أي تأثير إذا تم توفيرhighWaterMark
.
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
أو، عند استخدام مُنشئات نمط ما قبل ES6:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
أو، باستخدام نهج المُنشئ المُبسّط:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
عند استخدام pipeline:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // قبول إدخال نصي بدلاً من العُروض
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// تأكد من أنّه JSON صحيح.
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('failed', err)
} else {
console.log('completed')
}
}
)
مثال تيار ثنائي الاتجاه
يوضح ما يلي مثالاً بسيطًا لتيار Duplex
يلف كائن مصدر منخفض المستوى افتراضي يمكن كتابة البيانات إليه، وقراءة البيانات منه، وإن كان ذلك باستخدام واجهة برمجة تطبيقات غير متوافقة مع تيارات Node.js. يوضح ما يلي مثالًا بسيطًا لتيار Duplex
يخزن البيانات المكتوبة الواردة عبر واجهة Writable
التي تُقرأ مرة أخرى عبر واجهة Readable
.
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// المصدر الأساسي يتعامل فقط مع السلاسل النصية.
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
الجانب الأكثر أهمية في تيار Duplex
هو أن جانبي Readable
و Writable
يعملان بشكل مستقل عن بعضهما البعض على الرغم من وجودهما معًا ضمن مثيل كائن واحد.
تيارات ثنائية الاتجاه في وضع الكائن
بالنسبة لتيارات Duplex
، يمكن تعيين objectMode
حصريًا لإما جانب Readable
أو Writable
باستخدام خيارات readableObjectMode
و writableObjectMode
على التوالي.
في المثال التالي، على سبيل المثال، يتم إنشاء تيار Transform
جديد (وهو نوع من تيار Duplex
) يحتوي على جانب Writable
في وضع الكائن يقبل الأرقام الجافا سكريبت التي يتم تحويلها إلى سلاسل سداسية عشرية على جانب Readable
.
const { Transform } = require('node:stream')
// جميع تيارات Transform هي أيضًا تيارات Duplex.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// تحويل الشريحة إلى رقم إذا لزم الأمر.
chunk |= 0
// تحويل الشريحة إلى شيء آخر.
const data = chunk.toString(16)
// دفع البيانات إلى قائمة الانتظار القابلة للقراءة.
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// يطبع: 01
myTransform.write(10)
// يطبع: 0a
myTransform.write(100)
// يطبع: 64
تنفيذ دفق التحويل
دفق Transform
هو دفق Duplex
حيث يتم حساب الإخراج بطريقة ما من الإدخال. تتضمن الأمثلة دفقات zlib أو دفقات crypto التي تقوم بضغط البيانات أو تشفيرها أو فك تشفيرها.
لا يوجد شرط أن يكون حجم الإخراج هو نفس حجم الإدخال، أو نفس عدد الأجزاء، أو الوصول في نفس الوقت. على سبيل المثال، لن يكون لدفق Hash
سوى جزء واحد فقط من الإخراج يتم توفيره عند انتهاء الإدخال. سيُنتج دفق zlib
إخراجًا أصغر بكثير أو أكبر بكثير من إدخاله.
يتم توسيع فئة stream.Transform
لتنفيذ دفق Transform
.
ترث فئة stream.Transform
نموذجيًا من stream.Duplex
وتنفذ إصداراتها الخاصة من طرق writable._write()
و readable._read()
. يجب على تنفيذات Transform
المخصصة تنفيذ طريقة transform._transform()
و يجوز لها أيضًا تنفيذ طريقة transform._flush()
.
يجب توخي الحذر عند استخدام دفقات Transform
نظرًا لأن البيانات المكتوبة في الدفق يمكن أن تسبب توقف جانب Writable
من الدفق إذا لم يتم استهلاك الإخراج على جانب Readable
.
new stream.Transform([options])
options
<Object> تم تمريره إلى كل من مُنشئيWritable
وReadable
. يحتوي أيضًا على الحقول التالية:transform
<Function> التنفيذ لطريقةstream._transform()
.flush
<Function> التنفيذ لطريقةstream._flush()
.
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
أو، عند استخدام مُنشئات على طراز ما قبل ES6:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
أو، باستخدام نهج المُنشئ المبسط:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
الحدث: 'end'
حدث 'end'
من فئة stream.Readable
. يُصدر حدث 'end'
بعد إخراج جميع البيانات، وهو ما يحدث بعد استدعاء دالة المُعاودة في transform._flush()
. في حالة حدوث خطأ، يجب عدم إصدار 'end'
.
الحدث: 'finish'
حدث 'finish'
من فئة stream.Writable
. يُصدر حدث 'finish'
بعد استدعاء stream.end()
ومعالجة جميع الأجزاء بواسطة stream._transform()
. في حالة حدوث خطأ، يجب عدم إصدار 'finish'
.
transform._flush(callback)
callback
<Function> دالة مُعاودة (اختياريًا مع وسيطة خطأ وبيانات) ليتم استدعاؤها عندما يتم إفراغ البيانات المتبقية.
يجب ألا يتم استدعاء هذه الدالة مباشرةً بواسطة رمز التطبيق. يجب تنفيذها بواسطة فئات فرعية، ويجب استدعاؤها بواسطة طرق فئة Readable
الداخلية فقط.
في بعض الحالات، قد تحتاج عملية التحويل إلى إصدار جزء إضافي من البيانات في نهاية التدفق. على سبيل المثال، سيتخزن تدفق ضغط zlib
مقدارًا من الحالة الداخلية المستخدمة لضغط الإخراج بشكل مثالي. ومع ذلك، عند انتهاء التدفق، يجب إفراغ تلك البيانات الإضافية حتى تكون البيانات المضغوطة كاملة.
قد تُنفذ عمليات تنفيذ Transform
المُخصصة طريقة transform._flush()
. سيتم استدعاء هذه الطريقة عندما لا توجد بيانات مكتوبة أخرى يتم استهلاكها، ولكن قبل إصدار حدث 'end'
الذي يشير إلى نهاية تدفق Readable
.
ضمن تنفيذ transform._flush()
, قد يتم استدعاء طريقة transform.push()
صفر أو أكثر من المرات، حسب الاقتضاء. يجب استدعاء دالة callback
عندما تكتمل عملية الإفراغ.
تمت إضافة بادئة سطر سفلي لـ transform._flush()
لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاؤها مباشرةً بواسطة برامج المستخدم.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> الـBuffer
المراد تحويله، مُحوَّل من سلسلة النصوص المُمرَّرة إلىstream.write()
. إذا كان خيارdecodeStrings
الخاص بالدفق مُعيَّنًا علىfalse
أو كان الدفق يعمل في وضع الكائن، فلن يتم تحويل الجزء ولن يكون سوى ما تم تمريره إلىstream.write()
.encoding
<string> إذا كان الجزء عبارة عن سلسلة نصية، فهذا هو نوع الترميز. إذا كان الجزء عبارة عن مُخزن مؤقت، فهذه هي القيمة الخاصة'buffer'
. تجاهلها في هذه الحالة.callback
<Function> دالة مُنعكس (اختياريًا مع وسيطة خطأ وبيانات) ليتم استدعاؤها بعد معالجة الجزء المُقدَّم.
يجب ألا يتم استدعاء هذه الدالة مباشرةً بواسطة رمز التطبيق. يجب تنفيذها بواسطة فئات فرعية، واستدعائها بواسطة طرق فئة Readable
الداخلية فقط.
يجب أن توفر جميع تنفيذات دفق Transform
طريقة _transform()
لقبول الإدخال وإنتاج المخرجات. تتعامل تنفيذ transform._transform()
مع البايتات التي يتم كتابتها، وتحسب المخرجات، ثم تمرر هذه المخرجات إلى الجزء القابل للقراءة باستخدام طريقة transform.push()
.
يمكن استدعاء طريقة transform.push()
صفر أو أكثر من مرة لإنشاء مخرجات من جزء إدخال واحد، اعتمادًا على مقدار ما يجب إخراجه نتيجةً للجزء.
من الممكن ألا يتم إنشاء أي مخرجات من أي جزء معين من بيانات الإدخال.
يجب استدعاء دالة callback
فقط عند استهلاك الجزء الحالي بالكامل. يجب أن تكون الوسيطة الأولى المُمرَّرة إلى callback
عبارة عن كائن Error
إذا حدث خطأ أثناء معالجة الإدخال أو null
بخلاف ذلك. إذا تم تمرير وسيطة ثانية إلى callback
، فسيتم توجيهها إلى طريقة transform.push()
، ولكن فقط إذا كانت الوسيطة الأولى خاطئة. بعبارة أخرى، ما يلي متكافئ:
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
تتم إضافة بادئة transform._transform()
بعلامة سفلية لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاؤها مباشرةً بواسطة برامج المستخدم.
لا يتم أبدًا استدعاء transform._transform()
بالتوازي؛ تُنفذ الدفقات آلية قائمة انتظار، ولكي تتلقى الجزء التالي، يجب استدعاء callback
، إما بشكل متزامن أو غير متزامن.
الصنف: stream.PassThrough
يُعدُّ صنف stream.PassThrough
تنفيذاً بسيطاً لتيار Transform
الذي يُمرِّر ببساطة البايتات الداخلة إلى المخرجات. ويهدف هذا الصنف أساساً إلى الأمثلة والاختبارات، ولكن هناك بعض حالات الاستخدام حيث يكون stream.PassThrough
مفيداً كحجر بناء لأنواع جديدة من التيارات.
ملاحظات إضافية
توافق التيارات مع المُولِّدات المتزامنة و المُكرِّرات المتزامنة
مع دعم المُولِّدات المتزامنة و المُكرِّرات في جافاسكريبت، أصبحت المُولِّدات المتزامنة بفعالية بنية لغوية أساسية للتيارات في هذه المرحلة.
فيما يلي بعض حالات التشغيل البيني الشائعة لاستخدام تيارات Node.js مع المُولِّدات المتزامنة و المُكرِّرات المتزامنة.
استهلاك التيارات القابلة للقراءة باستخدام المُكرِّرات المتزامنة
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
تسجِّل المُكرِّرات المتزامنة مُعالِج أخطاء دائمًا على التيار لمنع أي أخطاء غير مُعالجة بعد الإلغاء.
إنشاء تيارات قابلة للقراءة باستخدام المُولِّدات المتزامنة
يمكن إنشاء تيار Node.js قابل للقراءة من مُولِّد متزامن باستخدام طريقة الأداة المساعدة Readable.from()
:
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
الإرسال الأنبوبي إلى التيارات القابلة للكتابة من المُكرِّرات المتزامنة
عند الكتابة في تيار قابل للكتابة من مُكرِّر متزامن، تأكد من المعالجة الصحيحة للضغط العكسي والأخطاء. يُلخِّص stream.pipeline()
معالجة الضغط العكسي والأخطاء ذات الصلة بالضغط العكسي:
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// نمط الاستدعاء
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'value returned')
}
}).on('close', () => {
ac.abort()
})
// نمط الوعد
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'value returned')
})
.catch(err => {
console.error(err)
ac.abort()
})
التوافق مع إصدارات Node.js الأقدم
قبل Node.js 0.10، كانت واجهة تدفق Readable
أبسط، ولكنها كانت أيضًا أقل قوة وأقل فائدة.
- بدلاً من انتظار المكالمات إلى طريقة
stream.read()
، ستبدأ أحداث'data'
في الإنبعاث على الفور. كانت التطبيقات التي تحتاج إلى القيام ببعض العمل لتحديد كيفية التعامل مع البيانات مطلوبة لتخزين البيانات المقروءة في المخازن المؤقتة حتى لا تضيع البيانات. - كانت طريقة
stream.pause()
استشارية، وليست مضمونة. وهذا يعني أنه كان لا يزال من الضروري أن تكون مستعدًا لتلقي أحداث'data'
حتى عندما يكون التدفق في حالة إيقاف مؤقت.
في Node.js 0.10، تم إضافة فئة Readable
. من أجل التوافق مع برامج Node.js الأقدم، تتحول تيارات Readable
إلى "وضع التدفق" عند إضافة مُعالِج أحداث 'data'
، أو عند استدعاء طريقة stream.resume()
. والنتيجة هي أنه، حتى عندما لا تستخدم الطريقة الجديدة stream.read()
وحدث 'readable'
، لم يعد من الضروري القلق بشأن فقدان أجزاء 'data'
.
بينما ستستمر معظم التطبيقات في العمل بشكل طبيعي، إلا أن هذا يقدم حالة حدية في الظروف التالية:
- لم يتم إضافة أي مُستمع حدث
'data'
. - لم يتم استدعاء طريقة
stream.resume()
أبدًا. - لم يتم توصيل التدفق بأي وجهة قابلة للكتابة.
على سبيل المثال، ضع في اعتبارك التعليمات البرمجية التالية:
// تحذير! معطوبة!
net
.createServer(socket => {
// نقوم بإضافة مُستمع 'end'، لكننا لا نستهلك البيانات أبدًا.
socket.on('end', () => {
// لن يصل أبدًا إلى هنا.
socket.end('The message was received but was not processed.\n')
})
})
.listen(1337)
قبل Node.js 0.10، كانت بيانات الرسالة الواردة تُرمى ببساطة. ومع ذلك، في Node.js 0.10 وما بعده، يبقى المقبس معلقًا إلى الأبد.
الحل في هذه الحالة هو استدعاء طريقة stream.resume()
لبدء تدفق البيانات:
// حل بديل.
net
.createServer(socket => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n')
})
// بدء تدفق البيانات، مع تجاهلها.
socket.resume()
})
.listen(1337)
بالإضافة إلى تيارات Readable
الجديدة التي تنتقل إلى وضع التدفق، يمكن لف تيارات الطراز السابق لـ 0.10 في فئة Readable
باستخدام طريقة readable.wrap()
.
readable.read(0)
هناك بعض الحالات التي يكون من الضروري فيها تشغيل تحديث لآليات تدفق القراءة الأساسية، دون استهلاك أي بيانات بالفعل. في مثل هذه الحالات، من الممكن استدعاء readable.read(0)
، والذي سيُعيد دائمًا null
.
إذا كان المخزن المؤقت الداخلي للقراءة أقل من highWaterMark
، ولم يكن التدفق يقرأ حاليًا، فإن استدعاء stream.read(0)
سيُشغّل استدعاءً منخفض المستوى لـ stream._read()
.
في حين أن معظم التطبيقات لن تحتاج أبدًا إلى القيام بذلك، إلا أن هناك حالات داخل Node.js يتم فيها ذلك، خاصةً في الأجزاء الداخلية من فئة تدفق Readable
.
readable.push('')
لا يُوصى باستخدام readable.push('')
.
إن دفع سلسلة بايت صفري <string>، <Buffer>، <TypedArray> أو <DataView> إلى تدفق ليس في وضع الكائن له تأثير جانبي مثير للاهتمام. لأنه هو استدعاء لـ readable.push()
، فإن الاستدعاء سينهي عملية القراءة. ومع ذلك، نظرًا لأن الوسيطة هي سلسلة فارغة، فلن يتم إضافة أي بيانات إلى المخزن المؤقت للقراءة، لذلك لا يوجد شيء للمستخدم لاستهلاكه.
عدم تطابق highWaterMark
بعد استدعاء readable.setEncoding()
سيؤدي استخدام readable.setEncoding()
إلى تغيير سلوك كيفية عمل highWaterMark
في الوضع غير الكائن.
عادةً، يتم قياس حجم المخزن المؤقت الحالي مقابل highWaterMark
بوحدات البايت. ومع ذلك، بعد استدعاء setEncoding()
، ستبدأ دالة المقارنة في قياس حجم المخزن المؤقت بوحدات الحروف.
ليست هذه مشكلة في الحالات الشائعة مع latin1
أو ascii
. لكن يُنصح بالانتباه إلى هذا السلوك عند العمل مع السلاسل التي قد تحتوي على أحرف متعددة البايت.