Skip to content

تدفق

[مستقر: 2 - مستقر]

مستقر: 2 استقرار: 2 - مستقر

رمز المصدر: lib/stream.js

التدفق هو واجهة مجردة للعمل مع البيانات المتدفقة في Node.js. توفر وحدة node:stream واجهة برمجة تطبيقات لتنفيذ واجهة التدفق.

هناك العديد من كائنات التدفق التي توفرها Node.js. على سبيل المثال، الطلب إلى خادم HTTP و process.stdout كلاهما مثيلات لتدفق.

يمكن أن تكون التدفقات قابلة للقراءة أو الكتابة أو كليهما. جميع التدفقات هي مثيلات لـ EventEmitter.

للوصول إلى وحدة node:stream:

js
const stream = require('node:stream')

وحدة node:stream مفيدة لإنشاء أنواع جديدة من مثيلات التدفق. عادةً لا يكون من الضروري استخدام وحدة node:stream لاستهلاك التدفقات.

تنظيم هذه الوثيقة

تحتوي هذه الوثيقة على قسمين رئيسيين وقسم ثالث للملاحظات. يشرح القسم الأول كيفية استخدام التدفقات الموجودة داخل تطبيق. يشرح القسم الثاني كيفية إنشاء أنواع جديدة من التدفقات.

أنواع التدفقات

هناك أربعة أنواع أساسية للتدفقات داخل Node.js:

  • Writable: تدفقات يمكن كتابة البيانات إليها (على سبيل المثال، fs.createWriteStream()).
  • Readable: تدفقات يمكن قراءة البيانات منها (على سبيل المثال، fs.createReadStream()).
  • Duplex: تدفقات قابلة للقراءة والكتابة (على سبيل المثال، net.Socket).
  • Transform: تدفقات Duplex يمكنها تعديل أو تحويل البيانات أثناء كتابتها وقراءتها (على سبيل المثال، zlib.createDeflate()).

بالإضافة إلى ذلك، تتضمن هذه الوحدة دوال المساعدة stream.duplexPair()، stream.pipeline()، stream.finished() stream.Readable.from()، و stream.addAbortSignal().

واجهة برمجة التطبيقات للمتابعة والوعود

مضاف في: v15.0.0

توفر واجهة برمجة التطبيقات stream/promises مجموعة بديلة من دوال المرافق غير المتزامنة للتيارات التي تُرجع كائنات Promise بدلاً من استخدام عمليات النداء العكسي. يمكن الوصول إلى واجهة برمجة التطبيقات عبر require('node:stream/promises') أو require('node:stream').promises.

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[السجل]

الإصدارالتغييرات
v18.0.0، v17.2.0، v16.14.0إضافة خيار end، والذي يمكن تعيينه إلى false لمنع إغلاق تيار الوجهة تلقائيًا عند انتهاء المصدر.
v15.0.0مضاف في: v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')

لاستخدام AbortSignal، قم بتمريره داخل كائن الخيارات، كحجة أخيرة. عندما يتم إلغاء الإشارة، سيتم استدعاء destroy على خط الأنابيب الأساسي، مع AbortError.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

تدعم واجهة برمجة التطبيقات pipeline أيضًا المُولِّدات غير المتزامنة:

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // التعامل مع السلاسل بدلاً من `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // التعامل مع السلاسل بدلاً من `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')

تذكر التعامل مع وسيطة signal المُمرَّرة إلى المُولِّد غير المتزامن. خاصة في حالة كون المُولِّد غير المتزامن هو المصدر لخط الأنابيب (أي الحجة الأولى) أو لن يتم إكمال خط الأنابيب أبدًا.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')

توفر واجهة برمجة التطبيقات pipeline نسخة callback:

stream.finished(stream[, options])

[History]

الإصدارالتغييرات
v19.5.0, v18.14.0تمت إضافة دعم لـ ReadableStream و WritableStream.
v19.1.0, v18.13.0تمت إضافة خيار cleanup.
v15.0.0تمت الإضافة في: v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream is done reading.')
}

run().catch(console.error)
rs.resume() // Drain the stream.
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream is done reading.')
}

run().catch(console.error)
rs.resume() // Drain the stream.

يوفر واجهة برمجة التطبيقات finished أيضًا إصدارًا قائمًا على المُستدعي.

يترك stream.finished() مستمعي أحداث متدلية (خاصةً 'error', 'end', 'finish' و 'close') بعد حل أو رفض الوعد المُرجَع. والسبب في ذلك هو أن أحداث 'error' غير المتوقعة (بسبب تنفيذ التدفق غير الصحيح) لا تتسبب في تعطل غير متوقع. إذا كان هذا سلوكًا غير مرغوب فيه، فيجب تعيين options.cleanup على true:

js
await finished(rs, { cleanup: true })

وضع الكائن

جميع التدفقات التي تم إنشاؤها بواسطة واجهات برمجة التطبيقات Node.js تعمل حصريًا على سلاسل، <Buffer>، <TypedArray> وكائنات <DataView>:

  • Strings و Buffers هما الأنواع الأكثر شيوعًا المستخدمة مع التدفقات.
  • TypedArray و DataView يسمحان لك بالتعامل مع البيانات الثنائية بأنواع مثل Int32Array أو Uint8Array. عندما تقوم بكتابة TypedArray أو DataView إلى تدفق، تقوم Node.js بمعالجة البايت الخام.

ومع ذلك، من الممكن أن تعمل تنفيذات التدفق مع أنواع أخرى من قيم JavaScript (باستثناء null، والذي يخدم غرضًا خاصًا داخل التدفقات). تعتبر هذه التدفقات تعمل في "وضع الكائن".

يتم تبديل مثيلات التدفق إلى وضع الكائن باستخدام خيار objectMode عند إنشاء التدفق. إن محاولة تبديل تدفق موجود إلى وضع الكائن ليست آمنة.

التخزين المؤقت

تخزن كل من تدفقات Writable و Readable البيانات في مخزن مؤقت داخلي.

تعتمد كمية البيانات التي يمكن تخزينها مؤقتًا على خيار highWaterMark الممرر إلى مُنشئ التدفق. بالنسبة للتدفقات العادية، يُحدد خيار highWaterMark العدد الإجمالي للبايتات. بالنسبة للتدفقات التي تعمل في وضع الكائن، يُحدد highWaterMark العدد الإجمالي للكائنات. بالنسبة للتدفقات التي تعمل على (ولكن لا تقوم بفك تشفير) السلاسل، يُحدد highWaterMark العدد الإجمالي لوحدات ترميز UTF-16.

يتم تخزين البيانات مؤقتًا في تدفقات Readable عندما يقوم التنفيذ باستدعاء stream.push(chunk). إذا لم يستدعِ مُستهلك التدفق stream.read()، فستبقى البيانات في قائمة الانتظار الداخلية حتى يتم استهلاكها.

بمجرد أن يصل الحجم الإجمالي لمخزن مؤقت القراءة الداخلي إلى الحد المحدد بواسطة highWaterMark، سيتوقف التدفق مؤقتًا عن قراءة البيانات من المورد الأساسي حتى يمكن استهلاك البيانات المخزنة مؤقتًا حاليًا (أي أن التدفق سيتوقف عن استدعاء الطريقة الداخلية readable._read() المستخدمة لملء مخزن مؤقت القراءة).

يتم تخزين البيانات مؤقتًا في تدفقات Writable عندما يتم استدعاء طريقة writable.write(chunk) بشكل متكرر. طالما أن الحجم الإجمالي لمخزن مؤقت الكتابة الداخلي أقل من الحد المحدد بواسطة highWaterMark، ستعيد استدعاءات writable.write() قيمة true. بمجرد أن يصل حجم المخزن المؤقت الداخلي إلى أو يتجاوز highWaterMark، سيتم إرجاع false.

الهدف الرئيسي لواجهة برمجة التطبيقات stream، وخاصة طريقة stream.pipe()، هو الحد من تخزين البيانات مؤقتًا إلى مستويات مقبولة بحيث لا تغمر مصادر ووجهات السرعات المختلفة الذاكرة المتاحة.

خيار highWaterMark هو حد، وليس قيدًا: فهو يملي كمية البيانات التي يخزنها التدفق مؤقتًا قبل أن يتوقف عن طلب المزيد من البيانات. إنه لا يفرض قيدًا صارمًا على الذاكرة بشكل عام. قد تختار تنفيذات التدفق المحددة فرض قيود أكثر صرامة، ولكن هذا اختياري.

بما أن تدفقات Duplex و Transform هي كل من Readable و Writable، فإن كل منهما يحتفظ بـ اثنين من المخازن المؤقتة الداخلية المنفصلة المستخدمة للقراءة والكتابة، مما يسمح لكل جانب بالعمل بشكل مستقل عن الآخر مع الحفاظ على تدفق مناسب وفعال للبيانات. على سبيل المثال، تعتبر مثيلات net.Socket تدفقات Duplex حيث يسمح الجانب Readable باستهلاك البيانات الواردة من المقبس، ويسمح الجانب Writable بكتابة البيانات إلى المقبس. نظرًا لإمكانية كتابة البيانات إلى المقبس بمعدل أسرع أو أبطأ من استقبال البيانات، يجب أن يعمل كل جانب (ويخزن مؤقتًا) بشكل مستقل عن الآخر.

آليات التخزين المؤقت الداخلي هي تفاصيل تنفيذ داخلية وقد يتم تغييرها في أي وقت. ومع ذلك، بالنسبة لبعض التنفيذات المتقدمة، يمكن استرداد المخازن المؤقتة الداخلية باستخدام writable.writableBuffer أو readable.readableBuffer. يُنصح بتجنب استخدام هذه الخصائص غير الموثقة.

واجهة برمجة التطبيقات لمستهلكي التدفقات

تستخدم معظم تطبيقات Node.js، بغض النظر عن مدى بساطتها، التدفقات بطريقة ما. فيما يلي مثال على استخدام التدفقات في تطبيق Node.js الذي ينفذ خادم HTTP:

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` هو http.IncomingMessage، وهو تدفق قابل للقراءة.
  // `res` هو http.ServerResponse، وهو تدفق قابل للكتابة.

  let body = ''
  // احصل على البيانات كسلاسل utf8.
  // إذا لم يتم تعيين ترميز، فسيتم استقبال كائنات Buffer.
  req.setEncoding('utf8')

  // تُصدر التدفقات القابلة للقراءة أحداث 'data' بمجرد إضافة مستمع.
  req.on('data', chunk => {
    body += chunk
  })

  // يشير حدث 'end' إلى استلام الجسم بالكامل.
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // اكتب شيئًا مثيرًا للاهتمام للمستخدم:
      res.write(typeof data)
      res.end()
    } catch (er) {
      // أوه أوه! بيانات JSON خاطئة!
      res.statusCode = 400
      return res.end(`error: ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON

تُعرض التدفقات القابلة للكتابة Writable (مثل res في المثال) طرقًا مثل write() و end() التي تُستخدم لكتابة البيانات على التدفق.

تستخدم التدفقات القابلة للقراءة Readable واجهة برمجة التطبيقات EventEmitter لإعلام رمز التطبيق عندما تتوفر بيانات للقراءة من التدفق. يمكن قراءة تلك البيانات المتاحة من التدفق بعدة طرق.

تستخدم كل من التدفقات القابلة للكتابة Writable والقابلة للقراءة Readable واجهة برمجة التطبيقات EventEmitter بطرق مختلفة للتواصل مع الحالة الحالية للتدفق.

التدفقات Duplex و Transform هما Writable و Readable.

لا يُطلب من التطبيقات التي تكتب بيانات إلى تدفق أو تستهلك بيانات منه تنفيذ واجهات التدفق مباشرةً، ولن يكون لديها عمومًا سبب للاتصال بـ require('node:stream').

يجب على المطورين الذين يرغبون في تنفيذ أنواع جديدة من التدفقات الرجوع إلى قسم واجهة برمجة التطبيقات لمنفذي التدفق.

تدفقات قابلة للكتابة

تُعد التدفقات القابلة للكتابة تجريدًا لـ وجهة يتم كتابة البيانات إليها.

أمثلة على تدفقات Writable:

بعض هذه الأمثلة هي في الواقع تدفقات Duplex التي تُنفذ واجهة Writable.

جميع تدفقات Writable تُنفذ الواجهة المُعرّفة بواسطة فئة stream.Writable.

في حين أن حالات مُحددة من تدفقات Writable قد تختلف بطرق مُتعددة، إلا أن جميع التدفقات Writable تتبع نفس نمط الاستخدام الأساسي كما هو موضح في المثال أدناه:

js
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')

فئة: stream.Writable

مضاف في: v0.9.4

حدث: 'close'

[تاريخ]

الإصدارالتغييرات
v10.0.0إضافة خيار emitClose لتحديد ما إذا كان يتم إصدار 'close' عند التدمير.
v0.9.4مضاف في: v0.9.4

يتم إصدار حدث 'close' عندما يتم إغلاق التدفق وأي من موارده الأساسية (وصف الملف، على سبيل المثال). يشير الحدث إلى أنه لن يتم إصدار المزيد من الأحداث، ولن يتم إجراء المزيد من الحسابات.

سوف تقوم تدفقات Writable دائمًا بإصدار حدث 'close' إذا تم إنشاؤها باستخدام خيار emitClose.

حدث: 'drain'

مضاف في: v0.9.4

إذا قامت مكالمة إلى stream.write(chunk) بإرجاع false، فسيتم إصدار حدث 'drain' عندما يكون من المناسب استئناف كتابة البيانات إلى التدفق.

js
// كتابة البيانات إلى تدفق الكتابة المُقدم مليون مرة.
// انتبه إلى ضغط الظهر.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // آخر مرة!
        writer.write(data, encoding, callback)
      } else {
        // تحقق مما إذا كان ينبغي لنا المتابعة، أو الانتظار.
        // لا تُمرّر المُنعطف، لأننا لم ننتهي بعد.
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // كان لا بد من التوقف مبكرًا!
      // اكتب المزيد بمجرد تصريفه.
      writer.once('drain', write)
    }
  }
}
الحدث: 'error'

مضاف في: v0.9.4

يُصدر حدث 'error' إذا حدث خطأ أثناء كتابة البيانات أو تمريرها. يتم تمرير مُعامل Error واحد إلى دالة المُستمع عند النداء.

يتم إغلاق التدفق عندما يتم إصدار حدث 'error' ما لم يتم تعيين خيار autoDestroy على false عند إنشاء التدفق.

بعد 'error'، لا يجب إصدار أي أحداث أخرى غير 'close' (بما في ذلك أحداث 'error' ).

الحدث: 'finish'

مضاف في: v0.9.4

يُصدر حدث 'finish' بعد استدعاء طريقة stream.end() ، وبعد تطهير جميع البيانات إلى النظام الأساسي.

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('All writes are now complete.')
})
writer.end('This is the end\n')
الحدث: 'pipe'

مضاف في: v0.9.4

  • src <stream.Readable> تدفق المصدر الذي يقوم بالتمرير إلى هذا الكتابة

يُصدر حدث 'pipe' عند استدعاء طريقة stream.pipe() على تدفق قابل للقراءة، مضيفًا هذا الكتابة إلى مجموعة وجهاته.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('Something is piping into the writer.')
  assert.equal(src, reader)
})
reader.pipe(writer)
الحدث: 'unpipe'

مضاف في: v0.9.4

يُصدر حدث 'unpipe' عند استدعاء طريقة stream.unpipe() على تدفق Readable ، وإزالة هذا Writable من مجموعة وجهاته.

كما يتم إصدار هذا في حالة إصدار هذا التدفق Writable خطأً عندما يقوم تدفق Readable بالتمرير إليه.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('Something has stopped piping into the writer.')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

مُضاف في: v0.11.2

تُجبر طريقة writable.cork() جميع البيانات المكتوبة على التخزين المؤقت في الذاكرة. سيتم تفريغ البيانات المخزنة مؤقتًا عند استدعاء طريقتي stream.uncork() أو stream.end().

الهدف الأساسي من writable.cork() هو تسهيل الوضع الذي تُكتب فيه العديد من القطع الصغيرة إلى الدفق في تعاقب سريع. بدلاً من توجيهها على الفور إلى الوجهة الأساسية، يقوم writable.cork() بتخزين جميع القطع مؤقتًا حتى يتم استدعاء writable.uncork(), والذي سيرسلها جميعًا إلى writable._writev(), إذا وجد. هذا يمنع حالة حجب رأس الخط حيث يتم تخزين البيانات مؤقتًا أثناء انتظار معالجة أول قطعة صغيرة. ومع ذلك، قد يكون لاستخدام writable.cork() دون تنفيذ writable._writev() تأثير سلبي على الإنتاجية.

انظر أيضًا: writable.uncork()، writable._writev().

writable.destroy([error])

[السجل]

الإصدارالتغييرات
v14.0.0تعمل كعملية لا شيء على دفق تم تدميره بالفعل.
v8.0.0مُضاف في: v8.0.0
  • error <Error> اختياري، خطأ لإصداره مع حدث 'error'.
  • الإرجاع: <this>

تدمير الدفق. يُصدر اختيارياً حدث 'error'، ويُصدر حدث 'close' (ما لم يتم تعيين emitClose على false). بعد هذه المكالمة، يكون دفق الكتابة قد انتهى، وستؤدي المكالمات اللاحقة إلى write() أو end() إلى خطأ ERR_STREAM_DESTROYED. هذه طريقة مدمرة وفورية لتدمير دفق. قد لا تكون المكالمات السابقة إلى write() قد تم تصريفها، وقد تُشغل خطأ ERR_STREAM_DESTROYED. استخدم end() بدلاً من destroy إذا كان ينبغي شطف البيانات قبل الإغلاق، أو انتظر حدث 'drain' قبل تدمير الدفق.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

بمجرد استدعاء destroy()، ستكون أي مكالمات أخرى عملية لا شيء، ولن يتم إصدار أي أخطاء أخرى باستثناء الأخطاء من _destroy() كـ 'error'.

يجب على المُنفذين عدم تجاوز هذه الطريقة، ولكن بدلاً من ذلك تنفيذ writable._destroy().

writable.closed

مضاف في: v18.0.0

تكون true بعد إرسال 'close' .

writable.destroyed

مضاف في: v8.0.0

تكون true بعد استدعاء writable.destroy().

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[History]

الإصدارالتغييرات
v22.0.0, v20.13.0يمكن الآن أن تكون وسيطة chunk مثيلًا لـTypedArray أو DataView.
v15.0.0يتم استدعاء callback قبل 'finish' أو عند حدوث خطأ.
v14.0.0يتم استدعاء callback إذا تم إرسال 'finish' أو 'error'.
v10.0.0تُعيد هذه الطريقة الآن مرجعًا إلى writable.
v8.0.0يمكن الآن أن تكون وسيطة chunk مثيلًا لـUint8Array.
v0.9.4مضاف في: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> بيانات اختيارية للكتابة. بالنسبة للتيارات التي لا تعمل في وضع الكائن، يجب أن يكون chunk <string> أو <Buffer> أو <TypedArray> أو <DataView>. بالنسبة لتيارات وضع الكائن، قد تكون chunk أي قيمة JavaScript بخلاف null.
  • encoding <string> الترميز إذا كان chunk سلسلة.
  • callback <Function> دالة الاستدعاء الراجعة عندما تنتهي البث.
  • تُرجع: <this>

إن استدعاء طريقة writable.end() يشير إلى أنه لن يتم كتابة المزيد من البيانات إلى Writable. تسمح الوسيطات الاختيارية chunk و encoding بكتابة جزء إضافي أخير من البيانات قبل إغلاق التيار.

سيؤدي استدعاء طريقة stream.write() بعد استدعاء stream.end() إلى إثارة خطأ.

js
// كتابة 'hello, ' ثم إنهاء بـ 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// الكتابة الآن غير مسموح بها!
writable.setDefaultEncoding(encoding)

[السجل]

الإصدارالتغييرات
v6.1.0تُعيد هذه الطريقة الآن مرجعًا إلى writable.
v0.11.15تمت الإضافة في: v0.11.15
  • encoding <سلسلة> ترميز الإعداد الافتراضي الجديد
  • القيمة المُرَجَّعة: <this>

تُعيّن طريقة writable.setDefaultEncoding() ترميز الإعداد الافتراضي encoding لدفق Writable.

writable.uncork()

تمت الإضافة في: v0.11.2

تقوم طريقة writable.uncork() بتفريغ جميع البيانات المُخزَّنة مؤقتًا منذ استدعاء stream.cork().

عند استخدام writable.cork() و writable.uncork() لإدارة تخزين كتابات الدفق مؤقتًا، تأجيل استدعاءات writable.uncork() باستخدام process.nextTick(). يسمح ذلك بتجميع جميع استدعاءات writable.write() التي تحدث داخل مرحلة حلقة أحداث Node.js معينة.

js
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())

إذا تم استدعاء طريقة writable.cork() عدة مرات على دفق، فيجب استدعاء نفس عدد استدعاءات writable.uncork() لتفريغ البيانات المُخزَّنة مؤقتًا.

js
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
  stream.uncork()
  // لن يتم تفريغ البيانات حتى يتم استدعاء uncork() للمرة الثانية.
  stream.uncork()
})

انظر أيضًا: writable.cork().

writable.writable

تمت الإضافة في: v11.4.0

تكون true إذا كان من الآمن استدعاء writable.write()، مما يعني أن الدفق لم يتم تدميره أو حدوث خطأ فيه أو إنهاؤه.

writable.writableAborted

تمت الإضافة في: v18.0.0، v16.17.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

يُرجع ما إذا كان الدفق قد تم تدميره أو حدوث خطأ فيه قبل إصدار 'finish'.

writable.writableEnded

مضاف في: v12.9.0

تكون قيمتها true بعد استدعاء writable.end(). لا تشير هذه الخاصية إلى ما إذا تم تفريغ البيانات أم لا، استخدم writable.writableFinished بدلاً من ذلك.

writable.writableCorked

مضاف في: v13.2.0، v12.16.0

عدد المرات التي يجب فيها استدعاء writable.uncork() من أجل إلغاء سدّ التدفق بالكامل.

writable.errored

مضاف في: v18.0.0

يُرجع خطأ إذا تم تدمير التدفق بخطأ.

writable.writableFinished

مضاف في: v12.6.0

تُعيّن إلى true مباشرة قبل إصدار حدث 'finish'.

writable.writableHighWaterMark

مضاف في: v9.3.0

تُرجع قيمة highWaterMark التي تم تمريرها عند إنشاء هذا Writable.

writable.writableLength

مضاف في: v9.4.0

تحتوي هذه الخاصية على عدد البايتات (أو الكائنات) في قائمة الانتظار الجاهزة للكتابة. توفر القيمة بيانات استبطانية بخصوص حالة highWaterMark.

writable.writableNeedDrain

مضاف في: v15.2.0، v14.17.0

تكون قيمتها true إذا امتلأ المخزن المؤقت للتدفق وسوف يُصدر التدفق 'drain'.

writable.writableObjectMode

مضاف في: v12.3.0

معرّف لخاصية objectMode لبثّ Writable معيّن.

writable[Symbol.asyncDispose]()

مضاف في: v22.4.0, v20.16.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

يطلق writable.destroy() مع AbortError ويعيد وعدًا يتم إنجازه عند انتهاء البث.

writable.write(chunk[, encoding][, callback])

[السجل]

الإصدارالتغييرات
v22.0.0, v20.13.0يمكن الآن أن تكون وسيطة chunk مثيلًا لـ TypedArray أو DataView.
v8.0.0يمكن الآن أن تكون وسيطة chunk مثيلًا لـ Uint8Array.
v6.0.0سيُعتبر تمرير null كوسيط chunk غير صالح دائمًا الآن، حتى في وضع الكائن.
v0.9.4مضاف في: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> بيانات اختيارية للكتابة. بالنسبة للبثّات التي لا تعمل في وضع الكائن، يجب أن يكون chunk <string>، أو <Buffer>، أو <TypedArray>، أو <DataView>. بالنسبة لبثّات وضع الكائن، قد يكون chunk أي قيمة جافا سكريبت بخلاف null.
  • encoding <string> | <null> الترميز، إذا كان chunk سلسلة. افتراضيًا: 'utf8'
  • callback <Function> دالة مُراجعة عند شطف جزء البيانات هذا.
  • القيمة المُعادة: <boolean> false إذا أراد البثّ أن تنتظر الكود المُستدعي حدث 'drain' قبل المتابعة في كتابة بيانات إضافية؛ وإلا true.

تكتب طريقة writable.write() بعض البيانات إلى البث، وتُطلق دالة المُراجعة المُعطاة بمجرد معالجة البيانات بالكامل. إذا حدث خطأ، فسيتم استدعاء دالة المُراجعة مع الخطأ كوسيطها الأول. يتم استدعاء دالة المُراجعة بشكل غير متزامن وقبل إصدار 'error'.

القيمة المُعادة هي true إذا كانت ذاكرة التخزين المؤقت الداخلية أقل من highWaterMark المُهيّأ عند إنشاء البث بعد قبول chunk. إذا تم إرجاع false، فيجب إيقاف المزيد من المحاولات لكتابة البيانات في البث حتى يتم إصدار حدث 'drain'.

بينما البثّ لا يُفرغ، ستُخزّن مُكالمات write() chunk، وستُعيد false. بمجرد تصريف جميع أجزاء البيانات المخزنة مؤقتًا حاليًا (مقبولة للتسليم بواسطة نظام التشغيل)، سيتم إصدار حدث 'drain'. بمجرد أن تُعيد write() قيمة false، لا تكتب المزيد من الأجزاء حتى يتم إصدار حدث 'drain'. بينما يُسمح باستدعاء write() على بثّ لا يُفرغ، سيُخزّن Node.js جميع الأجزاء المكتوبة حتى يحدث الحد الأقصى لاستخدام الذاكرة، عندها سيتم الإلغاء بشكل غير مشروط. حتى قبل أن يُلغي، سيؤدي استخدام الذاكرة العالي إلى ضعف أداء جامع القمامة وارتفاع RSS (الذي لا يتم إطلاقه عادةً مرة أخرى إلى النظام، حتى بعد عدم الحاجة إلى الذاكرة بعد الآن). نظرًا لأن مقابس TCP قد لا تُفرغ أبدًا إذا لم يقرأ الطرف البعيد البيانات، فقد يؤدي كتابة مقبس لا يُفرغ إلى ثغرة أمنية قابلة للاستغلال عن بُعد.

كتابة البيانات بينما البثّ لا يُفرغ أمرٌ مُشكِل بشكل خاص لـ Transform، لأن بثّات Transform مُوقفة افتراضيًا حتى يتم توصيلها أو إضافة مُعالِج حدث 'data' أو 'readable'.

إذا كان يمكن إنشاء البيانات المراد كتابتها أو جلبها عند الطلب، يُنصح بتغليف المنطق في Readable واستخدام stream.pipe(). ومع ذلك، إذا كان يُفضّل استدعاء write()، فمن الممكن احترام ضغط الظهر وتجنب مشكلات الذاكرة باستخدام حدث 'drain':

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// انتظر حتى يتم استدعاء cb قبل القيام بأي كتابة أخرى.
write('hello', () => {
  console.log('كتابة مكتملة، قم بإجراء المزيد من الكتابة الآن.')
})

سيُهمل بثّ Writable في وضع الكائن وسيط encoding دائمًا.

تدفّقات قابلة للقراءة

تُعدّ التدفّقات القابلة للقراءة تجرّداً لمصدر يتم استهلاك البيانات منه.

أمثلة على تدفّقات Readable:

جميع التدفّقات Readable تُنفّذ الواجهة المُعرّفة بواسطة فئة stream.Readable.

وضعا القراءة

تعمل تدفّقات Readable بشكل فعّال في أحد وضعين: وضع التدفّق ووضع الإيقاف المؤقّت. هذان الوضعان منفصلان عن وضع الكائن. يمكن أن يكون تدفّق Readable في وضع الكائن أو ليس فيه، بغضّ النظر عمّا إذا كان في وضع التدفّق أو وضع الإيقاف المؤقّت.

  • في وضع التدفّق، يتم قراءة البيانات من النظام الأساسي تلقائيًا وتوفيرها للتطبيق بأسرع ما يمكن باستخدام الأحداث عبر واجهة EventEmitter.
  • في وضع الإيقاف المؤقّت، يجب استدعاء طريقة stream.read() صراحةً لقراءة أجزاء البيانات من التدفّق.

تبدأ جميع التدفّقات Readable في وضع الإيقاف المؤقّت، ولكن يمكن تبديلها إلى وضع التدفّق بإحدى الطرق التالية:

يمكن أن يعود Readable إلى وضع الإيقاف المؤقّت باستخدام إحدى الطرق التالية:

  • إذا لم تكن هناك وجهات أنابيب، عن طريق استدعاء طريقة stream.pause().
  • إذا كانت هناك وجهات أنابيب، عن طريق إزالة جميع وجهات الأنابيب. يمكن إزالة العديد من وجهات الأنابيب عن طريق استدعاء طريقة stream.unpipe().

المفهوم المهم الذي يجب تذكّره هو أن Readable لن يُولّد بيانات حتى يتم توفير آلية لاستهلاك تلك البيانات أو تجاهلها. إذا تم تعطيل آلية الاستهلاك أو إزالتها، فإن Readable سيحاول إيقاف توليد البيانات.

لأسباب التوافق مع الإصدارات السابقة، فإن إزالة مُعالِجات أحداث 'data' لن يؤدّي إلى إيقاف التدفّق تلقائيًا. أيضًا، إذا كانت هناك وجهات أنابيب، فإن استدعاء stream.pause() لن يضمن أن يبقى التدفّق موقوفًا بمجرد أن تستنزف تلك الوجهات وتطلب المزيد من البيانات.

إذا تم تبديل Readable إلى وضع التدفّق ولم تكن هناك مستهلكات متاحة لمعالجة البيانات، فستضيع تلك البيانات. يمكن أن يحدث هذا، على سبيل المثال، عند استدعاء طريقة readable.resume() بدون مستمع مُرفق بحدث 'data'، أو عند إزالة مُعالِج حدث 'data' من التدفّق.

يؤدّي إضافة مُعالِج حدث 'readable' تلقائيًا إلى إيقاف تدفّق التدفّق، ويجب استهلاك البيانات عبر readable.read(). إذا تمت إزالة مُعالِج حدث 'readable'، فسوف يبدأ التدفّق مرة أخرى إذا كان هناك مُعالِج حدث 'data'.

ثلاث حالات

يُعد "الوضعان" لتشغيل دفق Readable تجريدًا مبسطًا لإدارة الحالة الداخلية الأكثر تعقيدًا التي تحدث داخل تنفيذ دفق Readable.

على وجه التحديد، في أي نقطة زمنية معينة، يكون كل دفق Readable في إحدى الحالات الثلاث الممكنة:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

عندما يكون readable.readableFlowing مساويًا لـ null، لا يتم توفير آلية لاستهلاك بيانات الدفق. لذلك، لن يقوم الدفق بإنشاء بيانات. أثناء وجوده في هذه الحالة، سيؤدي إرفاق مستمع لحدث 'data'، أو استدعاء طريقة readable.pipe()، أو استدعاء طريقة readable.resume() إلى تبديل readable.readableFlowing إلى true، مما يتسبب في بدء دفق Readable في إصدار أحداث بشكل فعال عند إنشاء البيانات.

سيؤدي استدعاء readable.pause()، أو readable.unpipe()، أو تلقي ضغط عكسي إلى تعيين readable.readableFlowing على أنه false، مما يؤدي مؤقتًا إلى إيقاف تدفق الأحداث ولكن ليس إيقاف إنشاء البيانات. أثناء وجوده في هذه الحالة، لن يؤدي إرفاق مستمع لحدث 'data' إلى تبديل readable.readableFlowing إلى true.

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing is now false.

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing is still false.
pass.write('ok') // Will not emit 'data'.
pass.resume() // Must be called to make stream emit 'data'.
// readableFlowing is now true.

بينما يكون readable.readableFlowing مساويًا لـ false، قد تتراكم البيانات داخل المخزن المؤقت الداخلي للدفق.

اختر أسلوب واجهة برمجة التطبيقات واحدًا

تطورت واجهة برمجة تطبيقات دفق Readable عبر إصدارات متعددة من Node.js وتوفر طرقًا متعددة لاستهلاك بيانات الدفق. بشكل عام، يجب على المطورين اختيار طريقة واحدة لاستهلاك البيانات ويجب عدم استخدام طرق متعددة أبدًا لاستهلاك البيانات من دفق واحد. على وجه التحديد، قد يؤدي استخدام مجموعة من on('data')، و on('readable')، و pipe()، أو المتكررات غير المتزامنة إلى سلوك غير بديهي.

الصف: stream.Readable

مضاف في: v0.9.4

الحدث: 'close'

[السجل]

الإصدارالتغييرات
v10.0.0إضافة خيار emitClose لتحديد ما إذا كان يتم إصدار 'close' عند التدمير.
v0.9.4مضاف في: v0.9.4

يتم إصدار حدث 'close' عندما يتم إغلاق الدفق وأي من موارده الأساسية (مثل مُوصِف الملف). يشير الحدث إلى أنه لن يتم إصدار المزيد من الأحداث، ولن يتم إجراء المزيد من الحسابات.

سوف يُصدر تيار Readable دائمًا حدث 'close' إذا تم إنشاؤه باستخدام خيار emitClose.

الحدث: 'data'

مضاف في: v0.9.4

  • chunk <Buffer> | <string> | <any> جزء البيانات. بالنسبة للتيارات التي لا تعمل في وضع الكائن، سيكون الجزء إما سلسلة أو Buffer. بالنسبة للتيارات الموجودة في وضع الكائن، يمكن أن يكون الجزء أي قيمة JavaScript بخلاف null.

يتم إصدار حدث 'data' كلما تخلى التيار عن ملكية جزء من البيانات إلى مستهلك. قد يحدث هذا كلما تم تبديل التيار في وضع التدفق عن طريق استدعاء readable.pipe()، readable.resume()، أو عن طريق إرفاق دالة استدعاء مُستمع إلى حدث 'data' . سيتم أيضًا إصدار حدث 'data' كلما تم استدعاء طريقة readable.read() وكانت هناك كتلة بيانات متاحة لإعادتها.

إن إرفاق مُستمع لحدث 'data' بتيار لم يتم إيقافه مؤقتًا بشكل صريح سيُبدّل التيار إلى وضع التدفق. سيتم تمرير البيانات بمجرد توفرها.

سيتم تمرير دالة الاستدعاء المُستمع جزء البيانات كسلسلة إذا تم تحديد ترميز افتراضي للتيار باستخدام طريقة readable.setEncoding()؛ وإلا سيتم تمرير البيانات كـ Buffer.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`استلمت ${chunk.length} بايت من البيانات.`)
})
الحدث: 'end'

أضيف في: v0.9.4

يُصدر حدث 'end' عندما لا يكون هناك المزيد من البيانات لاستهلاكها من الدفق.

لن يتم إصدار حدث 'end' إلا إذا تم استهلاك البيانات بالكامل. يمكن تحقيق ذلك عن طريق تبديل الدفق إلى وضع التدفق، أو عن طريق استدعاء stream.read() بشكل متكرر حتى يتم استهلاك جميع البيانات.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes of data.`)
})
readable.on('end', () => {
  console.log('There will be no more data.')
})
الحدث: 'error'

أضيف في: v0.9.4

قد يُصدر تنفيذ Readable حدث 'error' في أي وقت. عادةً، قد يحدث هذا إذا كان الدفق الأساسي غير قادر على إنشاء بيانات بسبب فشل داخلي أساسي، أو عندما يحاول تنفيذ دفق دفع جزء بيانات غير صالح.

سيتم تمرير كائن Error واحد إلى وظيفة الاستدعاء المُستمع.

الحدث: 'pause'

أضيف في: v0.9.4

يُصدر حدث 'pause' عندما يتم استدعاء stream.pause() و readableFlowing ليس false.

الحدث: 'readable'

[السجل]

الإصدارالتغييرات
v10.0.0يُصدر 'readable' دائمًا في التكرار التالي بعد استدعاء .push()
v10.0.0يتطلب استخدام 'readable' استدعاء .read().
v0.9.4أضيف في: v0.9.4

يُصدر حدث 'readable' عندما تتوفر بيانات للقراءة من الدفق، حتى علامة المياه العالية المُهيأة (state.highWaterMark). في الواقع، يشير إلى أن الدفق يحتوي على معلومات جديدة داخل المخزن المؤقت. إذا كانت البيانات متاحة داخل هذا المخزن المؤقت، فيمكن استدعاء stream.read() لاسترداد تلك البيانات. بالإضافة إلى ذلك، قد يتم إصدار حدث 'readable' أيضًا عندما يتم الوصول إلى نهاية الدفق.

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // هناك بعض البيانات للقراءة الآن.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

إذا تم الوصول إلى نهاية الدفق، فإن استدعاء stream.read() سيرجع null ويُشغل حدث 'end' . هذا صحيح أيضًا إذا لم تكن هناك بيانات على الإطلاق للقراءة. على سبيل المثال، في المثال التالي، foo.txt هو ملف فارغ:

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

مخرجات تشغيل هذا البرنامج النصي هي:

bash
$ node test.js
readable: null
end

في بعض الحالات، سيؤدي إرفاق مُستمع لحدث 'readable' إلى قراءة بعض البيانات في مخزن مؤقت داخلي.

بشكل عام، آليات readable.pipe() وحدث 'data' أسهل في الفهم من حدث 'readable' . ومع ذلك، قد يؤدي التعامل مع 'readable' إلى زيادة الإنتاجية.

إذا تم استخدام كل من 'readable' و 'data' في نفس الوقت، فإن 'readable' يأخذ الأولوية في التحكم في التدفق، أي أن 'data' سيتم إصداره فقط عند استدعاء stream.read(). ستصبح خاصية readableFlowing false. إذا كانت هناك مُستمعين 'data' عند إزالة 'readable'، فسوف يبدأ الدفق بالتدفق، أي سيتم إصدار أحداث 'data' بدون استدعاء .resume().

الحدث: 'resume'

أضيف في: v0.9.4

يتم إصدار حدث 'resume' عند استدعاء stream.resume() و readableFlowing ليس true.

readable.destroy([error])

[السجل]

الإصدارالتغييرات
v14.0.0يعمل كعملية لا شيء على تيار تم تدميره بالفعل.
v8.0.0أضيف في: v8.0.0
  • error <Error> خطأ سيتم تمريره كحمولة في حدث 'error'
  • القيمة المُرجعة: <this>

تدمير التيار. و بشكل اختياري، إصدار حدث 'error'، وإصدار حدث 'close' (ما لم يتم تعيين emitClose إلى false). بعد هذه المكالمة، سيُطلق تيار القراءة أي موارد داخلية، وستُتجاهل المكالمات اللاحقة إلى push().

بمجرد استدعاء destroy()، ستكون أي مكالمات أخرى عملية لا شيء، ولن يتم إصدار أي أخطاء أخرى باستثناء الأخطاء من _destroy() كـ 'error'.

يجب على المُنفذين عدم تجاوز هذه الطريقة، ولكن بدلاً من ذلك، تنفيذ readable._destroy().

readable.closed

أضيف في: v18.0.0

يكون true بعد إصدار 'close'.

readable.destroyed

أضيف في: v8.0.0

يكون true بعد استدعاء readable.destroy().

readable.isPaused()

أضيف في: v0.11.14

ترجع طريقة readable.isPaused() الحالة التشغيلية الحالية لـ Readable. ويستخدم هذا بشكل أساسي بواسطة الآلية التي تقوم بتأسيس طريقة readable.pipe(). في معظم الحالات النموذجية، لن يكون هناك سبب لاستخدام هذه الطريقة مباشرة.

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

مضاف في: v0.9.4

ستؤدي طريقة readable.pause() إلى إيقاف تدفق البيانات في وضع التدفق، مما يؤدي إلى إيقاف إصدار أحداث 'data'، والانتقال من وضع التدفق. ستبقى أي بيانات متاحة في المخزن المؤقت الداخلي.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes of data.`)
  readable.pause()
  console.log('There will be no additional data for 1 second.')
  setTimeout(() => {
    console.log('Now data will start flowing again.')
    readable.resume()
  }, 1000)
})

لا تؤثر طريقة readable.pause() إذا كان هناك مُستمع للأحداث 'readable'.

readable.pipe(destination[, options])

مضاف في: v0.9.4

  • destination <stream.Writable> الوجهة لكتابة البيانات

  • options <Object> خيارات الأنابيب

    • end <boolean> إنهاء الكاتب عند انتهاء القارئ. الافتراضي: true.
  • مُخرجات: <stream.Writable> الـوجهة، مما يسمح بسلسلة من الأنابيب إذا كانت تدفقًا Duplex أو تدفقًا Transform

تقوم طريقة readable.pipe() بتوصيل تدفق Writable بـ readable، مما يجعله يتحول تلقائيًا إلى وضع التدفق ويدفع جميع بياناته إلى Writable المُرفق. سيتم إدارة تدفق البيانات تلقائيًا بحيث لا يُثقل تدفق Writable الوجهة بتدفق Readable أسرع.

يوضح المثال التالي جميع البيانات من readable في ملف باسم file.txt:

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// جميع البيانات من readable تذهب إلى 'file.txt'.
readable.pipe(writable)

من الممكن إرفاق تيارات Writable متعددة بتيار Readable واحد.

ترجع طريقة readable.pipe() مرجعًا إلى تيار الوجهة مما يجعل من الممكن إعداد سلاسل من التيارات المُوصلة:

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

افتراضيًا، يتم استدعاء stream.end() على تيار Writable الوجهة عندما يصدر تيار Readable المصدر 'end'، بحيث لا يكون الوجهة قابلاً للكتابة بعد الآن. لإلغاء هذا السلوك الافتراضي، يمكن تمرير خيار end كـ false، مما يتسبب في بقاء تيار الوجهة مفتوحًا:

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Goodbye\n')
})

هناك تحذير مهم وهو أنه إذا أصدر تيار Readable خطأً أثناء المعالجة، فلن يتم إغلاق وجهة Writable تلقائيًا. إذا حدث خطأ، فستكون هناك حاجة إلى إغلاق كل تيار يدويًا لمنع تسرب الذاكرة.

لا يتم إغلاق تيارات Writable process.stderr و process.stdout أبدًا حتى ينتهي إخراج عملية Node.js، بغض النظر عن الخيارات المحددة.

readable.read([size])

مضاف في: v0.9.4

  • size <number> وسيطة اختيارية لتحديد مقدار البيانات التي سيتم قراءتها.
  • القيمة المُرجعة: <string> | <Buffer> | <null> | <any>

تقوم طريقة readable.read() بقراءة البيانات من المخزن المؤقت الداخلي وإرجاعها. إذا لم تكن هناك بيانات متاحة للقراءة، يتم إرجاع null. بشكل افتراضي، يتم إرجاع البيانات ككائن Buffer ما لم يتم تحديد ترميز باستخدام طريقة readable.setEncoding() أو يعمل التدفق في وضع الكائن.

تحدد الوسيطة الاختيارية size عددًا محددًا من البايتات التي سيتم قراءتها. إذا لم تكن بايتات size متاحة للقراءة، فسيتم إرجاع null إلا إذا انتهى التدفق، وفي هذه الحالة سيتم إرجاع جميع البيانات المتبقية في المخزن المؤقت الداخلي.

إذا لم يتم تحديد الوسيطة size، فسيتم إرجاع جميع البيانات الموجودة في المخزن المؤقت الداخلي.

يجب أن تكون الوسيطة size أقل من أو تساوي 1 جيجابايت.

يجب فقط استدعاء طريقة readable.read() على تيارات Readable تعمل في وضع الإيقاف المؤقت. في الوضع المتدفق، يتم استدعاء readable.read() تلقائيًا حتى يتم تفريغ المخزن المؤقت الداخلي بالكامل.

js
const readable = getReadableStreamSomehow()

// قد يتم تشغيل 'readable' عدة مرات مع تخزين البيانات مؤقتًا
readable.on('readable', () => {
  let chunk
  console.log('التيار قابل للقراءة (تم استلام بيانات جديدة في المخزن المؤقت)')
  // استخدم حلقة للتأكد من أننا نقرأ جميع البيانات المتاحة حاليًا
  while (null !== (chunk = readable.read())) {
    console.log(`تم قراءة ${chunk.length} بايت من البيانات...`)
  }
})

// سيتم تشغيل 'end' مرة واحدة عندما لا توجد بيانات أخرى متاحة
readable.on('end', () => {
  console.log('وصل إلى نهاية التيار.')
})

كل استدعاء لـ readable.read() يُرجع جزءًا من البيانات أو null، مما يشير إلى عدم وجود المزيد من البيانات للقراءة في تلك اللحظة. لا يتم دمج هذه الأجزاء تلقائيًا. نظرًا لأن استدعاء read() واحدًا لا يُرجع جميع البيانات، فقد يكون من الضروري استخدام حلقة while لقراءة الأجزاء باستمرار حتى يتم استرداد جميع البيانات. عند قراءة ملف كبير، قد تُرجع .read() قيمة null مؤقتًا، مما يشير إلى أنها استهلكت جميع المحتويات المخزنة مؤقتًا ولكن قد يكون هناك المزيد من البيانات التي لم يتم تخزينها مؤقتًا بعد. في مثل هذه الحالات، يتم إصدار حدث 'readable' جديد بمجرد وجود المزيد من البيانات في المخزن المؤقت، ويشير حدث 'end' إلى نهاية نقل البيانات.

لذلك، لقراءة محتويات الملف بالكامل من readable، من الضروري جمع الأجزاء عبر أحداث 'readable' متعددة:

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

سيرجع تيار Readable في وضع الكائن دائمًا عنصرًا واحدًا من استدعاء لـ readable.read(size)، بغض النظر عن قيمة الوسيطة size.

إذا قامت طريقة readable.read() بإرجاع جزء من البيانات، فسيتم أيضًا إصدار حدث 'data'.

سيؤدي استدعاء stream.read([size]) بعد إصدار حدث 'end' إلى إرجاع null. لن يتم إثارة أي خطأ وقت التشغيل.

readable.readable

مضاف في: v11.4.0

تكون true إذا كان من الآمن استدعاء readable.read()، مما يعني أن الدفق لم يتم تدميره أو إصدار 'error' أو 'end'.

readable.readableAborted

مضاف في: v16.8.0

[مستقر: 1 - تجريبي]

مستقر: 1 ثبات: 1 - تجريبي

يُرجع ما إذا كان الدفق قد تم تدميره أو حدوث خطأ فيه قبل إصدار 'end'.

readable.readableDidRead

مضاف في: v16.7.0، v14.18.0

[مستقر: 1 - تجريبي]

مستقر: 1 ثبات: 1 - تجريبي

يُرجع ما إذا كان 'data' قد تم إصداره.

readable.readableEncoding

مضاف في: v12.7.0

طريقة الحصول على الخاصية encoding لدفق Readable معين. يمكن تعيين الخاصية encoding باستخدام طريقة readable.setEncoding().

readable.readableEnded

مضاف في: v12.9.0

يصبح true عندما يتم إصدار حدث 'end'.

readable.errored

مضاف في: v18.0.0

يُرجع الخطأ إذا تم تدمير الدفق بخطأ.

readable.readableFlowing

مضاف في: v9.4.0

تعكس هذه الخاصية الحالة الحالية لدفق Readable كما هو موضح في قسم الحالات الثلاث.

readable.readableHighWaterMark

مضاف في: v9.3.0

يرجع قيمة highWaterMark التي تم تمريرها عند إنشاء هذا Readable.

readable.readableLength

مضاف في: v9.4.0

تحتوي هذه الخاصية على عدد البايتات (أو الكائنات) في قائمة الانتظار جاهزة للقراءة. توفر القيمة بيانات استبطان فيما يتعلق بحالة highWaterMark.

readable.readableObjectMode

مضاف في: v12.3.0

طريقة الحصول على خاصية objectMode لتيار Readable معين.

readable.resume()

[السجل]

الإصدارالتغييرات
v10.0.0لا يؤثر resume() إذا كان هناك حدث 'readable' يستمع.
v0.9.4مضاف في: v0.9.4

تؤدي طريقة readable.resume() إلى استئناف تيار Readable الذي تم إيقافه صراحةً في إصدار أحداث 'data'، مما يحول التيار إلى وضع التدفق.

يمكن استخدام طريقة readable.resume() لاستهلاك البيانات بالكامل من تيار دون معالجة أي من هذه البيانات بالفعل:

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('وصل إلى النهاية، ولكن لم يقرأ أي شيء.')
  })

لا تؤثر طريقة readable.resume() إذا كان هناك مستمع حدث 'readable'.

readable.setEncoding(encoding)

مضاف في: v0.9.4

  • encoding <string> ترميز الاستخدام.
  • الإرجاع: <this>

تعيّن طريقة readable.setEncoding() ترميز الأحرف لبيانات التي يتم قراءتها من تيار Readable.

بشكل افتراضي، لا يتم تعيين أي ترميز وسيتم إرجاع بيانات التيار ككائنات Buffer. يؤدي تعيين ترميز إلى إرجاع بيانات التيار كسلاسل نصية من الترميز المحدد بدلاً من كائنات Buffer. على سبيل المثال، سيؤدي استدعاء readable.setEncoding('utf8') إلى تفسير بيانات الإخراج كبيانات UTF-8، ومرورها كسلاسل نصية. سيؤدي استدعاء readable.setEncoding('hex') إلى ترميز البيانات بتنسيق سلسلة سداسية عشرية.

سيتعامل تيار Readable بشكل صحيح مع الأحرف متعددة البايت التي يتم تسليمها من خلال التيار والتي ستكون مشفرة بشكل غير صحيح بخلاف ذلك إذا تم سحبها ببساطة من التيار ككائنات Buffer.

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('حصلت على %d حرف من بيانات السلسلة النصية:', chunk.length)
})
readable.unpipe([destination])

مضاف في: v0.9.4

طريقة readable.unpipe() تفصل تدفق Writable تم إرفاقه مسبقًا باستخدام طريقة stream.pipe().

إذا لم يتم تحديد destination، فسيتم فصل جميع الأنابيب.

إذا تم تحديد destination، ولكن لم يتم إعداد أي أنبوب له، فإن الطريقة لا تفعل شيئًا.

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// جميع البيانات من readable تذهب إلى 'file.txt'،
// ولكن فقط للثانية الأولى.
readable.pipe(writable)
setTimeout(() => {
  console.log('إيقاف الكتابة في file.txt.')
  readable.unpipe(writable)
  console.log('إغلاق تدفق الملف يدويًا.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[السجل]

الإصدارالتغييرات
v22.0.0, v20.13.0يمكن أن تكون وسيطة chunk الآن مثيلًا لـ TypedArray أو DataView.
v8.0.0يمكن أن تكون وسيطة chunk الآن مثيلًا لـ Uint8Array.
v0.9.11مضاف في: v0.9.11
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> جزء من البيانات لوضعه في بداية قائمة الانتظار للقراءة. بالنسبة للتيارات التي لا تعمل في وضع الكائن، يجب أن يكون chunk <string>، <Buffer>، <TypedArray>، <DataView> أو null. بالنسبة لتيارات وضع الكائن، قد يكون chunk أي قيمة جافا سكريبت.
  • encoding <string> ترميز أجزاء السلسلة. يجب أن يكون ترميزًا صالحًا لـ Buffer، مثل 'utf8' أو 'ascii'.

إن تمرير chunk كـ null يشير إلى نهاية التدفق (EOF) ويتصرف بنفس طريقة readable.push(null)، وبعد ذلك لا يمكن كتابة المزيد من البيانات. يتم وضع إشارة EOF في نهاية المخزن المؤقت وستظل جميع البيانات المخزنة مؤقتًا يتم تفريغها.

تضع طريقة readable.unshift() جزءًا من البيانات مرة أخرى في المخزن المؤقت الداخلي. هذا مفيد في بعض المواقف حيث يتم استهلاك التدفق بواسطة التعليمات البرمجية التي تحتاج إلى "إلغاء استهلاك" بعض كمية البيانات التي سحبتها بشكل متفائل من المصدر، بحيث يمكن تمرير البيانات إلى طرف آخر.

لا يمكن استدعاء طريقة stream.unshift(chunk) بعد إصدار حدث 'end' وإلا سيتم طرح خطأ وقت التشغيل.

يجب على المطورين الذين يستخدمون stream.unshift() غالبًا أن يفكروا في التبديل إلى استخدام تدفق Transform بدلاً من ذلك. راجع قسم واجهة برمجة التطبيقات للمطورين الذين ينفذون التيارات لمزيد من المعلومات.

js
// سحب رأس محدد بواسطة \n\n.
// استخدام unshift() إذا حصلنا على الكثير.
// استدعاء callback مع (error, header, stream).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // تم العثور على حد الرأس.
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // إزالة مُستمع 'readable' قبل unshifting.
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // الآن يمكن قراءة جسم الرسالة من التدفق.
        callback(null, header, stream)
        return
      }
      // لا يزال قراءة الرأس.
      header += str
    }
  }
}

على عكس stream.push(chunk)، فإن stream.unshift(chunk) لن ينهي عملية القراءة عن طريق إعادة تعيين حالة القراءة الداخلية للتدفق. هذا قد يتسبب في نتائج غير متوقعة إذا تم استدعاء readable.unshift() أثناء القراءة (أي من داخل تنفيذ stream._read() على تدفق مخصص). سيعيد استدعاء readable.unshift() مع stream.push('') الفوري إعادة تعيين حالة القراءة بشكل مناسب، ومع ذلك من الأفضل تجنب استدعاء readable.unshift() أثناء عملية إجراء القراءة.

readable.wrap(stream)

تم الإضافة في: v0.9.4

  • stream <Stream> دفق قابل للقراءة "من الطراز القديم"
  • الإرجاع: <this>

قبل Node.js 0.10، لم تُنفذ الدفقات واجهة برمجة تطبيقات وحدة node:stream بالكامل كما هو محدد حاليًا. (راجع التوافق لمزيد من المعلومات.)

عند استخدام مكتبة Node.js أقدم تُصدر أحداث 'data' ولديها طريقة stream.pause() استشارية فقط، يمكن استخدام طريقة readable.wrap() لإنشاء دفق Readable يستخدم الدفق القديم كمصدر بيانات له.

نادراً ما يكون من الضروري استخدام readable.wrap()، ولكن تم توفير هذه الطريقة لتسهيل التفاعل مع تطبيقات ومكتبات Node.js الأقدم.

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // إلخ.
})
readable[Symbol.asyncIterator]()

[السجل]

الإصدارالتغييرات
v11.14.0لم يعد دعم Symbol.asyncIterator تجريبيًا.
v10.0.0تم الإضافة في: v10.0.0
  • الإرجاع: <AsyncIterator> لاستهلاك الدفق بالكامل.
js
const fs = require('node:fs')

async function print(readable) {
  readable.setEncoding('utf8')
  let data = ''
  for await (const chunk of readable) {
    data += chunk
  }
  console.log(data)
}

print(fs.createReadStream('file')).catch(console.error)

إذا انتهت الحلقة باستخدام break أو return أو throw، فسيتم تدمير الدفق. بعبارة أخرى، فإن التكرار فوق دفق سيستهلك الدفق بالكامل. سيتم قراءة الدفق على شكل أجزاء بحجم يساوي خيار highWaterMark. في مثال التعليمات البرمجية أعلاه، ستكون البيانات في جزء واحد إذا كان الملف يحتوي على بيانات أقل من 64 كيلوبايت لأن خيار highWaterMark غير مُقدم إلى fs.createReadStream().

readable[Symbol.asyncDispose]()

أضيف في: v20.4.0، v18.18.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

يطلق readable.destroy() باستخدام AbortError ويعيد وعدًا يتم تحقيقه عند انتهاء التدفق.

readable.compose(stream[, options])

أضيف في: v19.1.0، v18.13.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

js
import { Readable } from 'node:stream'

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ')

    for (const word of words) {
      yield word
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()

console.log(words) // يطبع ['this', 'is', 'compose', 'as', 'operator']

انظر إلى stream.compose لمزيد من المعلومات.

readable.iterator([options])

أضيف في: v16.3.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

  • options <Object>

    • destroyOnReturn <boolean> عند تعيينه على false، فإن استدعاء return على مُكرر غير متزامن، أو الخروج من تكرار for await...of باستخدام break، return، أو throw لن يدمر التدفق. افتراضيًا: true.
  • الإرجاع: <AsyncIterator> لاستهلاك التدفق.

يمنح المُكرر الذي تم إنشاؤه بواسطة هذه الطريقة المستخدمين خيار إلغاء تدمير التدفق إذا تم الخروج من حلقة for await...of بواسطة return، break، أو throw، أو إذا كان يجب على المُكرر تدمير التدفق إذا أصدر التدفق خطأً أثناء التكرار.

js
const { Readable } = require('node:stream')

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // سيطبع 2 ثم 3
  }

  console.log(readable.destroyed) // true، تم استهلاك التدفق بالكامل
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]))
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}

showBoth()
readable.map(fn[, options])

[History]

الإصدارالتغييرات
v20.7.0, v18.19.0تمت إضافة highWaterMark في الخيارات.
v17.4.0, v16.14.0تمت الإضافة في: v17.4.0, v16.14.0

[Stable: 1 - Experimental]

Stable: 1 Stability: 1 - تجريبي

  • fn <Function> | <AsyncFunction> دالة لإنشاء خريطة لكل جزء في الدفق.

    • data <any> جزء من البيانات من الدفق.
    • options <Object>
    • signal <AbortSignal> يتم إيقافه إذا تم تدمير الدفق مما يسمح بإلغاء استدعاء fn مبكرًا.
  • options <Object>

    • concurrency <number> الحد الأقصى لاستدعاءات fn المتزامنة التي سيتم استدعاؤها على الدفق في وقت واحد. الافتراضي: 1.
    • highWaterMark <number> عدد العناصر التي سيتم تخزينها مؤقتًا أثناء انتظار استهلاك المستخدم للعناصر المُنشأة خريطة لها. الافتراضي: concurrency * 2 - 1.
    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <Readable> دفق تم إنشاء خريطة له باستخدام الدالة fn.

تسمح هذه الطريقة بإنشاء خريطة للدفق. سيتم استدعاء دالة fn لكل جزء في الدفق. إذا قامت دالة fn بإرجاع وعود - فسيتم انتظار هذا الوعد قبل تمريره إلى دفق النتيجة.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// مع مُنشئ خريطة متزامن.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
  console.log(chunk) // 2, 4, 6, 8
}
// مع مُنشئ خريطة غير متزامن، مع إجراء ما يصل إلى 2 استعلام في وقت واحد.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  domain => resolver.resolve4(domain),
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  console.log(result) // يقوم بتسجيل نتيجة DNS لـ resolver.resolve4.
}
readable.filter(fn[, options])

[السجل]

الإصدارالتغييرات
v20.7.0, v18.19.0تمت إضافة highWaterMark في الخيارات.
v17.4.0, v16.14.0تمت الإضافة في: v17.4.0, v16.14.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

  • fn <دالة> | <دالة غير متزامنة> دالة لتصفية أجزاء من الدفق.

    • data <أي> جزء من البيانات من الدفق.
    • options <كائن>
    • signal <إشارة إلغاء> يتم إلغاؤها إذا تم تدمير الدفق مما يسمح بإلغاء دعوة fn مبكرًا.
  • options <كائن>

    • concurrency <رقم> الحد الأقصى للاستدعاء المتزامن لـ fn للاتصال بالدفق مرة واحدة. افتراضيًا: 1.
    • highWaterMark <رقم> عدد العناصر التي سيتم تخزينها مؤقتًا أثناء انتظار استهلاك المستخدم للعناصر المفلترة. افتراضيًا: concurrency * 2 - 1.
    • signal <إشارة إلغاء> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <قابل للقراءة> دفق تم تصفيته باستخدام المُتَنبِئ fn.

تسمح هذه الطريقة بتصفية الدفق. بالنسبة لكل جزء في الدفق، سيتم استدعاء دالة fn وإذا أعادت قيمة صحيحة، فسيتم تمرير الجزء إلى دفق النتيجة. إذا أعادت دالة fn وعدًا - فسيتم انتظار هذا الوعد.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// مع مُتَنبِئ مُزامِن.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// مع مُتَنبِئ غير مُزامِن، مع إجراء ما يصل إلى 2 استعلام في وقت واحد.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address.ttl > 60
  },
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  // يسجل النطاقات التي تحتوي على أكثر من 60 ثانية على سجل dns المُحلل.
  console.log(result)
}
readable.forEach(fn[, options])

تم الإضافة في: v17.5.0، v16.15.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

  • fn <Function> | <AsyncFunction> دالة تُستدعى على كل جزء من أجزاء الدفق.

    • data <any> جزء من البيانات من الدفق.
    • options <Object>
    • signal <AbortSignal> يُلغى إذا تم تدمير الدفق مما يسمح بإلغاء دعوة fn مبكرًا.
  • options <Object>

    • concurrency <number> الحد الأقصى للاستدعاء المتزامن لـ fn الذي يجب استدعاءه على الدفق في وقت واحد. افتراضي: 1.
    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <Promise> وعدٌ لمتى ينتهي الدفق.

تسمح هذه الطريقة بتكرار دفق. بالنسبة لكل جزء في الدفق، سيتم استدعاء دالة fn. إذا قامت دالة fn بإرجاع وعد، فسيتم انتظار هذا الوعد.

تختلف هذه الطريقة عن حلقات for await...of في أنها يمكن أن تعالج الأجزاء بشكل متزامن اختيارياً. بالإضافة إلى ذلك، لا يمكن إيقاف تكرار forEach إلا عن طريق تمرير خيار signal وإلغاء AbortController ذات الصلة بينما يمكن إيقاف for await...of باستخدام break أو return. في كلتا الحالتين، سيتم تدمير الدفق.

تختلف هذه الطريقة عن الاستماع إلى حدث 'data' في أنها تستخدم حدث readable في الآلية الأساسية ويمكن أن تحد من عدد مكالمات fn المتزامنة.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// مع مُبْدِئ متزامن.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// مع مُبْدِئ غير متزامن، مع إجراء ما يصل إلى 2 استعلام في وقت واحد.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address
  },
  { concurrency: 2 }
)
await dnsResults.forEach(result => {
  // يسجل النتيجة، مشابه لـ `for await (const result of dnsResults)`
  console.log(result)
})
console.log('done') // انتهى الدفق
readable.toArray([options])

مُضاف في: v17.5.0، v16.15.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

  • options <Object>

    • signal <AbortSignal> يسمح بإلغاء عملية toArray إذا تم إلغاء الإشارة.
  • الإرجاع: <Promise> وعد يحتوي على مصفوفة تحتوي على محتويات الدفق.

تتيح هذه الطريقة الحصول بسهولة على محتويات الدفق.

نظرًا لأن هذه الطريقة تقرأ الدفق بأكمله في الذاكرة، فإنها تلغي فوائد الدفق. وهي مخصصة للتوافق والراحة، وليس كطريقة أساسية لاستهلاك الدفق.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]

// إجراء استعلامات dns بشكل متزامن باستخدام .map وجمع
// النتائج في مصفوفة باستخدام toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
  .map(
    async domain => {
      const { address } = await resolver.resolve4(domain, { ttl: true })
      return address
    },
    { concurrency: 2 }
  )
  .toArray()
readable.some(fn[, options])

مُضاف في: v17.5.0، v16.15.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

  • fn <Function> | <AsyncFunction> دالة يتم استدعاؤها على كل جزء من أجزاء الدفق.

    • data <any> جزء من البيانات من الدفق.
    • options <Object>
    • signal <AbortSignal> يتم إجهاضه إذا تم تدمير الدفق مما يسمح بإجهاض استدعاء fn مبكرًا.
  • options <Object>

    • concurrency <number> الحد الأقصى للاستدعاء المتزامن لـ fn للاتصال بالدفق مرة واحدة. افتراضي: 1.
    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <Promise> وعد يُقيّم إلى true إذا قامت fn بإرجاع قيمة صحيحة على الأقل لإحدى الأجزاء.

تشبه هذه الطريقة Array.prototype.some وتقوم باستدعاء fn على كل جزء في الدفق حتى تصبح قيمة الإرجاع المنتظرة true (أو أي قيمة صحيحة). بمجرد أن تصبح قيمة الإرجاع المنتظرة لاستدعاء fn على جزء ما قيمة صحيحة، يتم تدمير الدفق ويتم تحقيق الوعد بـ true. إذا لم تُرجع أي من استدعاءات fn على الأجزاء قيمة صحيحة، فسيتم تحقيق الوعد بـ false.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// مع مُنبئ متزامن.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false

// مع مُنبئ غير متزامن، مع إجراء ما يصل إلى فحصين للملف في وقت واحد.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(anyBigFile) // `true` إذا كان أي ملف في القائمة أكبر من 1 ميجابايت
console.log('done') // انتهى الدفق
readable.find(fn[, options])

مضاف في: v17.5.0، v16.17.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

  • fn <Function> | <AsyncFunction> دالة تُستدعى على كل جزء من أجزاء الدفق.

    • data <any> جزء من البيانات من الدفق.
    • options <Object>
    • signal <AbortSignal> يتم إلغاؤه إذا تم تدمير الدفق مما يسمح بإلغاء استدعاء fn مبكرًا.
  • options <Object>

    • concurrency <number> الحد الأقصى لاستدعاءات fn المتزامنة التي يجب استدعاؤها على الدفق في وقت واحد. افتراضيًا: 1.
    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • قيمة الإرجاع: <Promise> وعد يُقيّم أول جزء تكون قيمة fn الخاصة به صحيحة، أو undefined إذا لم يتم العثور على أي عنصر.

هذه الطريقة مشابهة لـ Array.prototype.find وتستدعي fn على كل جزء في الدفق للعثور على جزء تكون قيمته صحيحة لـ fn. بمجرد أن تكون قيمة الإرجاع المنتظرة لاستدعاء fn صحيحة، يتم تدمير الدفق ويتم تحقيق الوعد بقيمة fn التي أعادت قيمة صحيحة. إذا أعادت جميع استدعاءات fn على الأجزاء قيمة خاطئة، يتم تحقيق الوعد بـ undefined.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// مع مُسند مُزامن.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined

// مع مُسند غير مُزامن، مع إجراء أقصى عدد 2 من عمليات فحص الملفات في وقت واحد.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(foundBigFile) // اسم الملف الكبير، إذا كان هناك أي ملف في القائمة أكبر من 1 ميغابايت
console.log('done') // انتهى الدفق
readable.every(fn[, options])

تم الإضافة في: v17.5.0، v16.15.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

  • fn <Function> | <AsyncFunction> دالة تُستدعى على كل جزء من أجزاء الدفق.

    • data <any> جزء من البيانات من الدفق.
    • options <Object>
    • signal <AbortSignal> يتم إيقافه إذا تم تدمير الدفق مما يسمح بإلغاء استدعاء fn مبكرًا.
  • options <Object>

    • concurrency <number> الحد الأقصى لاستدعاءات fn المتزامنة التي سيتم استدعاؤها على الدفق في وقت واحد. الافتراضي: 1.
    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <Promise> وعد يُقيّم إلى true إذا قامت fn بإرجاع قيمة صحيحة لكل أجزاء الدفق.

تشبه هذه الطريقة Array.prototype.every وتستدعي fn على كل جزء في الدفق للتحقق مما إذا كانت جميع قيم الإرجاع المنتظرة قيمًا صحيحة لـ fn. بمجرد أن تكون قيمة الإرجاع المنتظرة لاستدعاء fn على جزء ما قيمة خاطئة، يتم تدمير الدفق ويتم تحقيق الوعد بقيمة false. إذا قامت جميع استدعاءات fn على الأجزاء بإرجاع قيمة صحيحة، فسيتم تحقيق الوعد بقيمة true.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// مع مُتَبَدِّد متزامن.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true

// مع مُتَبَدِّد غير متزامن، مع إجراء أقصى عدد 2 من عمليات التحقق من الملفات في وقت واحد.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
// `true` إذا كانت جميع الملفات في القائمة أكبر من 1 ميجابايت
console.log(allBigFiles)
console.log('done') // انتهى الدفق
readable.flatMap(fn[, options])

مضاف في: v17.5.0، v16.15.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

  • fn <Function> | <AsyncGeneratorFunction> | <AsyncFunction> دالة لإنشاء خريطة لكل جزء في الدفق.

    • data <any> جزء من البيانات من الدفق.
    • options <Object>
    • signal <AbortSignal> يتم إيقافه إذا تم تدمير الدفق مما يسمح بإلغاء دعوة fn مبكرًا.
  • options <Object>

    • concurrency <number> الحد الأقصى للدعوات المتزامنة لـ fn للاتصال بالدفق في وقت واحد. افتراضي: 1.
    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <Readable> دفق تم إنشاء خريطة مسطحة له باستخدام الدالة fn.

ترجع هذه الطريقة دفقًا جديدًا من خلال تطبيق مُدعَى الاستدعاء المُعطى على كل جزء من الدفق، ثم يتم تسطيح النتيجة.

من الممكن إرجاع دفق أو عنصر قابل للتكرار أو عنصر قابل للتكرار غير متزامن آخر من fn وسيتم دمج تيارات النتائج (تسطيحها) في الدفق المُرجع.

js
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'

// مع مُنشئ خريطة متزامن.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
  console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// مع مُنشئ خريطة غير متزامن، دمج محتويات 4 ملفات
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
  createReadStream(fileName)
)
for await (const result of concatResult) {
  // هذا سيحتوي على المحتويات (جميع الأجزاء) لجميع الملفات الأربعة
  console.log(result)
}
readable.drop(limit[, options])

مضاف في: v17.5.0، v16.15.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

  • limit <number> عدد الأجزاء التي سيتم حذفها من القابل للقراءة.

  • options <Object>

    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <Readable> دفق تم حذف limit من أجزائه.

ترجع هذه الطريقة دفقًا جديدًا مع حذف أول limit جزء.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])

مضاف في: v17.5.0، v16.15.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

  • limit <number> عدد الأجزاء التي سيتم أخذها من القابل للقراءة.

  • options <Object>

    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <Readable> دفق تم أخذ limit من أجزائه.

ترجع هذه الطريقة دفقًا جديدًا مع أول limit جزء.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])

مضاف في: v17.5.0، v16.15.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

  • fn <Function> | <AsyncFunction> دالة اختزال يتم استدعاؤها على كل جزء في الدفق.

    • previous <any> القيمة التي تم الحصول عليها من آخر استدعاء لـ fn أو قيمة initial إذا تم تحديدها أو أول جزء من الدفق بخلاف ذلك.
    • data <any> جزء من البيانات من الدفق.
    • options <Object>
    • signal <AbortSignal> يتم إلغاؤه إذا تم تدمير الدفق مما يسمح بإلغاء استدعاء fn مبكرًا.
  • initial <any> القيمة الأولية التي سيتم استخدامها في الاختزال.

  • options <Object>

    • signal <AbortSignal> يسمح بتدمير الدفق إذا تم إلغاء الإشارة.
  • الإرجاع: <Promise> وعد للقيمة النهائية للاختزال.

تستدعي هذه الطريقة fn على كل جزء من الدفق بالترتيب، وتمرر له النتيجة من الحساب على العنصر السابق. ترجع وعدًا للقيمة النهائية للاختزال.

إذا لم يتم تقديم قيمة initial، فسيتم استخدام أول جزء من الدفق كقيمة أولية. إذا كان الدفق فارغًا، يتم رفض الوعد مع TypeError مع خاصية رمز ERR_INVALID_ARGS.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
  const { size } = await stat(join(directoryPath, file))
  return totalSize + size
}, 0)

console.log(folderSize)

تكرر دالة الاختزال عنصر الدفق عنصرًا تلو الآخر، مما يعني أنه لا توجد معلمة concurrency أو توازي. لإجراء اختزال متزامن، يمكنك استخراج الدالة غير المتزامنة إلى طريقة readable.map.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir)
  .map(file => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0)

console.log(folderSize)

تدفقات دوبلكس والتحويل

الصنف: stream.Duplex

[السجل]

الإصدارالتغييرات
v6.8.0الآن، تقوم مثيلات Duplex بإرجاع true عند التحقق من instanceof stream.Writable.
v0.9.4تمت الإضافة في: v0.9.4

تدفقات دوبلكس هي تدفقات تُنفذ واجهتي Readable و Writable.

أمثلة على تدفقات Duplex:

duplex.allowHalfOpen

تمت الإضافة في: v0.9.4

إذا كانت false، فسوف ينهي التدفق تلقائيًا الجانب القابل للكتابة عندما ينتهي الجانب القابل للقراءة. يتم تعيينها في البداية بواسطة خيار مُنشئ allowHalfOpen، والذي يكون افتراضياً true.

يمكن تغيير ذلك يدويًا لتغيير سلوك نصف الفتحة لمثيل تدفق Duplex موجود، ولكن يجب تغييره قبل إصدار حدث 'end'.

الصنف: stream.Transform

تمت الإضافة في: v0.9.4

تدفقات التحويل هي تدفقات Duplex حيث تكون المخرجات مرتبطة بطريقة ما بالمدخلات. مثل جميع تدفقات Duplex، تُنفذ تدفقات Transform كل من واجهتي Readable و Writable.

أمثلة على تدفقات Transform:

transform.destroy([error])

[السجل]

الإصدارالتغييرات
v14.0.0تعمل كعملية لا شيء على تدفق تم تدميره بالفعل.
v8.0.0تمت الإضافة في: v8.0.0

تدمير التدفق، واختياريًا إصدار حدث 'error'. بعد هذه المكالمة، سيُطلق تدفق التحويل أي موارد داخلية. يجب على المُنفذين عدم تجاوز هذه الطريقة، ولكن بدلاً من ذلك تنفيذ readable._destroy(). يقوم التنفيذ الافتراضي لـ _destroy() لـ Transform أيضًا بإصدار 'close' ما لم يتم تعيين emitClose على false.

بمجرد استدعاء destroy()، ستكون أي مكالمات إضافية عملية لا شيء ولن يتم إصدار أية أخطاء أخرى باستثناء تلك من _destroy() كـ 'error'.

stream.duplexPair([options])

مضاف في: v22.6.0، v20.17.0

  • options <Object> قيمة لإرسالها إلى كل من مُنشئي Duplex، لتعيين خيارات مثل التخزين المؤقت.
  • قيمة مُعادة: <Array> من مثيلين من Duplex.

دالة الأداة المساعدة duplexPair تُعيد مصفوفة تحتوي على عنصرين، كل منهما عبارة عن دفق Duplex متصل بالجانب الآخر:

js
const [sideA, sideB] = duplexPair()

أي شيء يُكتب في دفق واحد يصبح قابلاً للقراءة في الآخر. فهي توفر سلوكًا مشابهًا لاتصال الشبكة، حيث تصبح البيانات التي يكتبها العميل قابلة للقراءة من قبل الخادم، والعكس صحيح.

تدفقات Duplex متناظرة؛ يمكن استخدام أحدهما أو الآخر دون أي فرق في السلوك.

stream.finished(stream[, options], callback)

[السجل]

الإصدارالتغييرات
v19.5.0تمت إضافة دعم لـ ReadableStream و WritableStream.
v15.11.0تمت إضافة خيار signal.
v14.0.0ستنتظر الدالة finished(stream, cb) حدث 'close' قبل استدعاء دالة الاستدعاء. تحاول التنفيذ اكتشاف التدفقات القديمة وتطبيق هذا السلوك فقط على التدفقات التي يُتوقع منها إصدار 'close'.
v14.0.0سيؤدي إصدار 'close' قبل 'end' في دفق Readable إلى حدوث خطأ ERR_STREAM_PREMATURE_CLOSE.
v14.0.0سيتم استدعاء دالة الاستدعاء على التدفقات التي انتهت بالفعل قبل الاتصال بـ finished(stream, cb).
v10.0.0تمت الإضافة في: v10.0.0
  • stream <Stream> | <ReadableStream> | <WritableStream> دفق/دفق ويب قابل للقراءة و/أو للكتابة.

  • options <Object>

    • error <boolean> إذا تم تعيينه على false، فلن يُعامل الاتصال بـ emit('error', err) على أنه مكتمل. الافتراضي: true.
    • readable <boolean> عند تعيينه على false، سيتم استدعاء دالة الاستدعاء عند انتهاء الدفق حتى لو كان الدفق لا يزال قابلاً للقراءة. الافتراضي: true.
    • writable <boolean> عند تعيينه على false، سيتم استدعاء دالة الاستدعاء عند انتهاء الدفق حتى لو كان الدفق لا يزال قابلاً للكتابة. الافتراضي: true.
    • signal <AbortSignal> يسمح بإلغاء انتظار انتهاء الدفق. لن يتم إلغاء الدفق الأساسي إذا تم إلغاء الإشارة. سيتم استدعاء دالة الاستدعاء مع AbortError. سيتم أيضًا إزالة جميع المستمعين المسجلين المضافين بواسطة هذه الدالة.
  • callback <Function> دالة استدعاء تأخذ وسيطة خطأ اختيارية.

  • قيمة مُعادة: <Function> دالة تنظيف تقوم بإزالة جميع المستمعين المسجلين.

دالة للحصول على إشعار عندما لا يكون الدفق قابلًا للقراءة أو الكتابة أو واجه خطأ أو حدث إغلاق مبكر.

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('Stream failed.', err)
  } else {
    console.log('Stream is done reading.')
  }
})

rs.resume() // تصريف الدفق.

مفيدة بشكل خاص في سيناريوهات معالجة الأخطاء حيث يتم تدمير دفق مبكرًا (مثل طلب HTTP تم إلغاؤه)، ولن يُصدر 'end' أو 'finish'.

يوفر واجهة برمجة التطبيقات finished نسخة واعدة.

تترك stream.finished() مستمعي أحداث متدلية (خاصةً 'error', 'end', 'finish' و 'close') بعد استدعاء callback. والسبب في ذلك هو أن أحداث 'error' غير المتوقعة (بسبب تنفيذ تدفق غير صحيح) لا تتسبب في تعطل غير متوقع. إذا كان هذا سلوكًا غير مرغوب فيه، فيجب استدعاء دالة التنظيف المُعادة في دالة الاستدعاء:

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[History]

الإصدارالتغييرات
v19.7.0, v18.16.0تمت إضافة دعم لـ webstreams.
v18.0.0يؤدي تمرير مُنادٍ غير صالح إلى وسيطة callback الآن إلى طرح ERR_INVALID_ARG_TYPE بدلاً من ERR_INVALID_CALLBACK.
v14.0.0ستنتظر pipeline(..., cb) حدث 'close' قبل استدعاء المُنادى. تحاول عملية التنفيذ اكتشاف التدفقات القديمة وتطبيق هذا السلوك فقط على التدفقات التي يُتوقع أن تُصدر 'close'.
v13.10.0إضافة دعم للمولدات غير المتزامنة.
v10.0.0تمت الإضافة في: v10.0.0

طريقة وحدة نمطية للربط بين التدفقات والمولدات التي تُحول الأخطاء وتُنظف بشكل صحيح وتوفر مُنادٍ عند اكتمال خط الأنابيب.

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// استخدام واجهة برمجة التطبيقات pipeline لتمرير سلسلة من التدفقات بسهولة
// معًا والحصول على إشعار عند اكتمال خط الأنابيب بالكامل.

// خط أنابيب لضغط ملف tar ضخم محتمل بكفاءة:

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('Pipeline failed.', err)
  } else {
    console.log('Pipeline succeeded.')
  }
})

يوفر واجهة برمجة التطبيقات pipeline نسخة واعدة.

ستقوم stream.pipeline() باستدعاء stream.destroy(err) على جميع التدفقات باستثناء:

  • التدفقات Readable التي أصدرت 'end' أو 'close'.
  • التدفقات Writable التي أصدرت 'finish' أو 'close'.

تترك stream.pipeline() مستمعي أحداث مُتدلية على التدفقات بعد استدعاء callback. في حالة إعادة استخدام التدفقات بعد الفشل، قد يؤدي هذا إلى تسرب مستمعي الأحداث وأخطاء مُبتلعة. إذا كان آخر تدفق قابلاً للقراءة، فسيتم إزالة مستمعي الأحداث المُتدلية بحيث يمكن استخدام آخر تدفق لاحقًا.

تقوم stream.pipeline() بإغلاق جميع التدفقات عند طرح خطأ. قد يؤدي استخدام IncomingRequest مع pipeline إلى سلوك غير متوقع بمجرد تدمير المقبس دون إرسال الاستجابة المتوقعة. انظر المثال أدناه:

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // No such file
      // لا يمكن إرسال هذه الرسالة بمجرد تدمير `pipeline` للمقبس بالفعل
      return res.end('error!!!')
    }
  })
})

stream.compose(...streams)

[السجل]

الإصدارالتغييرات
v21.1.0، v20.10.0تمت إضافة دعم لفئة الدفق.
v19.8.0، v18.16.0تمت إضافة دعم لـ webstreams.
v16.9.0تمت الإضافة في: v16.9.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - stream.compose تجريبية.

يقوم بدمج تيارين أو أكثر في تيار Duplex يقوم بالكتابة إلى الدفق الأول والقراءة من الدفق الأخير. يتم توصيل كل دفق مُقدم إلى الدفق التالي، باستخدام stream.pipeline. إذا حدث خطأ في أي من الدفقات، فسيتم تدمير جميعها، بما في ذلك دفق Duplex الخارجي.

بما أن stream.compose يُرجع تيارًا جديدًا يمكن (ويجب) توصيله بدوره في تيارات أخرى، فإنه يُمكّن من التركيب. على النقيض من ذلك، عند تمرير التيارات إلى stream.pipeline، يكون التيار الأول عادةً تيارًا قابلاً للقراءة والأخير تيارًا قابلًا للكتابة، مما يُشكل دائرة مغلقة.

إذا تم تمرير Function، فيجب أن تكون طريقة مصنع تأخذ source Iterable.

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // يطبع 'HELLOWORLD'

يمكن استخدام stream.compose لتحويل الكائنات المتكررة غير المتزامنة، والمنشئات، والوظائف إلى تيارات.

  • AsyncIterable يتم تحويلها إلى Duplex قابل للقراءة. لا يمكن إنتاج null.
  • AsyncGeneratorFunction يتم تحويلها إلى Duplex تحويل قابل للقراءة/الكتابة. يجب أن تأخذ AsyncIterable المصدر كمعامل أول. لا يمكن إنتاج null.
  • AsyncFunction يتم تحويلها إلى Duplex قابل للكتابة. يجب أن تُرجع إما null أو undefined.
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// تحويل AsyncIterable إلى Duplex قابل للقراءة.
const s1 = compose(
  (async function* () {
    yield 'Hello'
    yield 'World'
  })()
)

// تحويل AsyncGenerator إلى Duplex تحويل.
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// تحويل AsyncFunction إلى Duplex قابل للكتابة.
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // يطبع 'HELLOWORLD'

انظر إلى readable.compose(stream) لـ stream.compose كعامل.

stream.Readable.from(iterable[, options])

مضاف في: v12.3.0، v10.17.0

  • iterable <Iterable> كائن ينفذ بروتوكول Symbol.asyncIterator أو Symbol.iterator. يصدر حدث 'error' إذا تم تمرير قيمة خالية.
  • options <Object> الخيارات المُقدمة إلى new stream.Readable([options]). بشكل افتراضي، سيقوم Readable.from() بتعيين options.objectMode إلى true، ما لم يتم إلغاء هذا الاختيار صراحةً عن طريق تعيين options.objectMode إلى false.
  • مُخرجات: <stream.Readable>

طريقة مساعدة لإنشاء تيارات قابلة للقراءة من المُكررات.

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

لن يتم تكرار السلاسل أو المخازن المؤقتة عند استدعاء Readable.from(string) أو Readable.from(buffer) لمطابقة دلالات التيارات الأخرى لأسباب تتعلق بالأداء.

إذا تم تمرير كائن Iterable يحتوي على وعود كوسيطة، فقد يؤدي ذلك إلى رفض غير معالج.

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // رفض غير معالج
])

stream.Readable.fromWeb(readableStream[, options])

مضاف في: v17.0.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

stream.Readable.isDisturbed(stream)

تم الإضافة في: v16.8.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

يُرجع ما إذا تم قراءة الدفق أو إلغاؤه.

stream.isErrored(stream)

تم الإضافة في: v17.3.0، v16.14.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

يُرجع ما إذا كان الدفق قد واجه خطأً.

stream.isReadable(stream)

تم الإضافة في: v17.4.0، v16.14.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

يُرجع ما إذا كان الدفق قابلاً للقراءة.

stream.Readable.toWeb(streamReadable[, options])

تم الإضافة في: v17.0.0

[مستقر: 1 - تجريبي]

مستقر: 1 الثبات: 1 - تجريبي

  • streamReadable <stream.Readable>

  • options <Object>

    • strategy <Object>
    • highWaterMark <number> الحد الأقصى لحجم قائمة الانتظار الداخلية (من ReadableStream المُنشأ) قبل تطبيق ضغط الظهر في القراءة من stream.Readable المُعطى. إذا لم يتم توفير أي قيمة، فسيتم أخذها من stream.Readable المُعطى.
    • size <Function> دالة تحدد حجم الكتلة المعطاة من البيانات. إذا لم يتم توفير أي قيمة، فسيكون الحجم 1 لجميع الكتل.
    • chunk <any>
    • الإرجاع: <number>
  • الإرجاع: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

تم الإضافة في: v17.0.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

stream.Writable.toWeb(streamWritable)

تم الإضافة في: v17.0.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

stream.Duplex.from(src)

[السجل]

الإصدارالتغييرات
v19.5.0, v18.17.0يمكن الآن أن تكون وسيطة src ReadableStream أو WritableStream.
v16.8.0تم الإضافة في: v16.8.0

طريقة مساعدة لإنشاء تدفقات ثنائية الاتجاه.

  • Stream يحول التدفق القابل للكتابة إلى Duplex قابل للكتابة والتدفق القابل للقراءة إلى Duplex.
  • Blob يحول إلى Duplex قابل للقراءة.
  • string يحول إلى Duplex قابل للقراءة.
  • ArrayBuffer يحول إلى Duplex قابل للقراءة.
  • AsyncIterable يحول إلى Duplex قابل للقراءة. لا يمكن توليد null.
  • AsyncGeneratorFunction يحول إلى Duplex تحويل قابل للقراءة/الكتابة. يجب أن يأخذ AsyncIterable مصدرًا كمعامل أول. لا يمكن توليد null.
  • AsyncFunction يحول إلى Duplex قابل للكتابة. يجب أن يُرجع إما null أو undefined
  • Object ({ writable, readable }) يحول readable و writable إلى Stream ثم يجمعهما في Duplex حيث سيكتب Duplex إلى writable ويقرأ من readable.
  • Promise يحول إلى Duplex قابل للقراءة. يتم تجاهل القيمة null.
  • ReadableStream يحول إلى Duplex قابل للقراءة.
  • WritableStream يحول إلى Duplex قابل للكتابة.
  • الإرجاع: <stream.Duplex>

إذا تم تمرير كائن Iterable يحتوي على وعود كوسيط، فقد يؤدي ذلك إلى رفض غير معالج.

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // رفض غير معالج
])

stream.Duplex.fromWeb(pair[, options])

مضاف في: v17.0.0

[مستقر: 1 - تجريبي]

مستقر: 1 استقرار: 1 - تجريبي

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

أضيف في: v17.0.0

[مستقر: 1 - تجريبي]

مستقر: 1 ثبات: 1 - تجريبي

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[السجل]

الإصدارالتغييرات
v19.7.0, v18.16.0تمت إضافة دعم لـ ReadableStream و WritableStream.
v15.4.0أضيف في: v15.4.0

يربط إشارة AbortSignal بتدفق قابل للقراءة أو الكتابة. هذا يسمح للرمز بالتحكم في تدمير التدفق باستخدام AbortController.

إن استدعاء abort على AbortController المقابل لإشارة AbortSignal الممررة سيتصرف بنفس طريقة استدعاء .destroy(new AbortError()) على التدفق، و controller.error(new AbortError()) لتدفقات الويب.

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// في وقت لاحق، إلغاء العملية وإغلاق التدفق
controller.abort()

أو باستخدام AbortSignal مع تدفق قابل للقراءة كتكرار غير متزامن:

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // ضبط مهلة زمنية
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // تم إلغاء العملية
    } else {
      throw e
    }
  }
})()

أو باستخدام AbortSignal مع ReadableStream:

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // تم إلغاء العملية
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hello
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

مضاف في: v19.9.0، v18.17.0

يرجع علامة المياه العالية الافتراضية المستخدمة بواسطة التدفقات. القيمة الافتراضية هي 65536 (64 كيلوبايت)، أو 16 لـ objectMode.

stream.setDefaultHighWaterMark(objectMode, value)

مضاف في: v19.9.0، v18.17.0

يُعيّن علامة المياه العالية الافتراضية المستخدمة بواسطة التدفقات.

واجهة برمجة التطبيقات لمنفذي التدفقات

تم تصميم واجهة برمجة التطبيقات node:stream لجعل من السهل تنفيذ التدفقات باستخدام نموذج الميراث النموذجي في جافا سكريبت.

أولاً، سيعلن مطور التدفق عن فئة جافا سكريبت جديدة تمتد من إحدى فئات التدفقات الأساسية الأربع (stream.Writable، stream.Readable، stream.Duplex، أو stream.Transform)، مع التأكد من استدعاء مُنشئ فئة الوالد المناسب:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

عند توسيع التدفقات، ضع في اعتبارك الخيارات التي يمكن للمستخدم تقديمها ويجب عليه تقديمها قبل توجيهها إلى مُنشئ القاعدة. على سبيل المثال، إذا كانت التنفيذات تقوم بافتراضات فيما يتعلق بخيارات autoDestroy و emitClose، فلا تسمح للمستخدم بتجاوز هذه الخيارات. كن واضحًا بشأن الخيارات التي يتم توجيهها بدلاً من توجيه جميع الخيارات ضمنيًا.

يجب على فئة التدفق الجديدة بعد ذلك تنفيذ طريقة أو أكثر محددة، اعتمادًا على نوع التدفق الذي يتم إنشاؤه، كما هو مفصل في الجدول أدناه:

حالة الاستخدامالفئةالطريقة (الطرق) التي يجب تنفيذها
القراءة فقطReadable_read()
الكتابة فقطWritable_write() ، _writev() ، _final()
القراءة والكتابةDuplex_read() ، _write() ، _writev() ، _final()
التعامل مع البيانات المكتوبة، ثم قراءة النتيجةTransform_transform() ، _flush() ، _final()

يجب ألا يقوم رمز التنفيذ الخاص بالتدفق أبدًا باستدعاء الطرق "العامة" للتدفق المخصصة للاستخدام من قبل المستهلكين (كما هو موضح في قسم واجهة برمجة التطبيقات لمستهلكي التدفقات). قد يؤدي القيام بذلك إلى آثار جانبية سلبية في رمز التطبيق الذي يستهلك التدفق.

تجنب تجاوز الطرق العامة مثل write()، end()، cork()، uncork()، read() و destroy()، أو إصدار أحداث داخلية مثل 'error'، 'data'، 'end'، 'finish' و 'close' من خلال .emit(). قد يؤدي القيام بذلك إلى كسر ثوابت التدفق الحالية والمستقبلية مما يؤدي إلى مشاكل في السلوك و/أو التوافق مع التدفقات الأخرى، وأدوات التدفق، وتوقعات المستخدم.

بناء مبسّط

أضيف في: v1.2.0

في العديد من الحالات البسيطة، من الممكن إنشاء تيار دون الاعتماد على الميراث. يمكن تحقيق ذلك من خلال إنشاء مثيلات مباشرة من كائنات stream.Writable، وstream.Readable، وstream.Duplex، أو stream.Transform، ومرور أساليب مناسبة كخيارات مُنشئ.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  construct(callback) {
    // تهيئة الحالة وتحميل الموارد...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // تحرير الموارد...
  },
})

تنفيذ تيار قابل للكتابة

يتم توسيع فئة stream.Writable لتنفيذ تيار Writable.

يجب على تيارات Writable المخصصة استدعاء مُنشئ new stream.Writable([options]) وتنفيذ طريقة writable._write() و/أو writable._writev().

new stream.Writable([options])

[History]

الإصدارالتغييرات
v22.0.0زيادة قيمة highWaterMark الافتراضية.
v15.5.0دعم تمرير AbortSignal.
v14.0.0تغيير قيمة الخيار autoDestroy الافتراضية إلى true.
v11.2.0، v10.16.0إضافة خيار autoDestroy لإلغاء destroy() للتيار تلقائيًا عند إصداره 'finish' أو حدوث أخطاء.
v10.0.0إضافة خيار emitClose لتحديد ما إذا كان يتم إصدار 'close' عند الإلغاء.
  • options <Object>
    • highWaterMark <number> مستوى المخزن المؤقت عندما يبدأ stream.write() في إرجاع false. الافتراضي: 65536 (64 كيلوبايت)، أو 16 لتيارات objectMode.
    • decodeStrings <boolean> ما إذا كان سيتم ترميز سلاسل string الممررة إلى stream.write() إلى Buffers (بالتشفير المحدد في استدعاء stream.write()) قبل تمريرها إلى stream._write(). لا يتم تحويل أنواع البيانات الأخرى (أي لا يتم فك تشفير Buffers إلى strings). سيؤدي التعيين إلى false إلى منع تحويل سلاسل string. الافتراضي: true.
    • defaultEncoding <string> الترميز الافتراضي المستخدم عندما لا يتم تحديد ترميز كوسيطة لـ stream.write(). الافتراضي: 'utf8'.
    • objectMode <boolean> ما إذا كانت stream.write(anyObj) عملية صالحة. عندما يتم تعيينه، يصبح من الممكن كتابة قيم JavaScript بخلاف السلسلة، <Buffer>، <TypedArray> أو <DataView> إذا كان مدعومًا من قبل تنفيذ التيار. الافتراضي: false.
    • emitClose <boolean> ما إذا كان يجب على التيار إصدار 'close' بعد تدميره. الافتراضي: true.
    • write <Function> التنفيذ لطريقة stream._write().
    • writev <Function> التنفيذ لطريقة stream._writev().
    • destroy <Function> التنفيذ لطريقة stream._destroy().
    • final <Function> التنفيذ لطريقة stream._final().
    • construct <Function> التنفيذ لطريقة stream._construct().
    • autoDestroy <boolean> ما إذا كان يجب على هذا التيار استدعاء .destroy() على نفسه تلقائيًا بعد الانتهاء. الافتراضي: true.
    • signal <AbortSignal> إشارة تمثل إلغاءً ممكنًا.
js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor(options) {
    // يستدعي مُنشئ stream.Writable().
    super(options)
    // ...
  }
}

أو، عند استخدام مُنشئات على طراز ما قبل ES6:

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options)
  Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

أو، باستخدام نهج المُنشئ المُبسّط:

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

سيؤدي استدعاء abort على AbortController المُقابِل لـ AbortSignal المُمرر إلى نفس النتيجة التي يحدثها استدعاء .destroy(new AbortError()) على تيار الكتابة.

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// في وقت لاحق، إلغاء العملية وإغلاق التيار
controller.abort()

writable._construct(callback)

مضاف في: v15.0.0

  • callback <Function> استدعِ هذه الدالة (اختياريًا مع وسيطة خطأ) عندما ينتهي تدفق البيانات من عملية البدء.

يجب ألا يتم استدعاء طريقة _construct() مباشرةً. قد يتم تنفيذها بواسطة فئات فرعية، وإذا كان الأمر كذلك، فسيتم استدعاءها بواسطة طرق فئة Writable الداخلية فقط.

ستتم استدعاء هذه الدالة الاختيارية في دورة معالجة بعد أن تعيد مُنشئ التدفق، مما يؤخر أي استدعاءات لـ _write(), _final(), و _destroy() حتى يتم استدعاء callback. هذا مفيد لتهيئة الحالة أو تهيئة الموارد بشكل غير متزامن قبل استخدام التدفق.

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[السجل]

الإصدارالتغييرات
v12.11.0_write() اختياري عند توفير _writev().
  • chunk <Buffer> | <string> | <any> الـBuffer المراد كتابته، مُحوّل من سلسلة string المُمرّرة إلى stream.write(). إذا كان خيار decodeStrings للتدفق مُعيّنًا على false أو كان التدفق يعمل في وضع الكائن، فلن يتم تحويل الـ chunk وسيكون أي شيء مُمرّر إلى stream.write().
  • encoding <string> إذا كانت الـ chunk سلسلة، فإن encoding هو ترميز الأحرف لتلك السلسلة. إذا كانت الـ chunk عبارة عن Buffer، أو إذا كان التدفق يعمل في وضع الكائن، فيمكن تجاهل encoding.
  • callback <Function> استدعِ هذه الدالة (اختياريًا مع وسيطة خطأ) عند اكتمال المعالجة للجزء المُعطى.

يجب أن توفر جميع تنفيذات تدفق Writable طريقة writable._write() و/أو writable._writev() لإرسال البيانات إلى المورد الأساسي.

توفر تدفقات Transform تنفيذها الخاص لـ writable._write().

يجب ألا يتم استدعاء هذه الدالة بواسطة رمز التطبيق مباشرةً. يجب تنفيذها بواسطة فئات فرعية، ويتم استدعاءها بواسطة طرق فئة Writable الداخلية فقط.

يجب استدعاء دالة callback بشكل متزامن داخل writable._write() أو بشكل غير متزامن (أي دورة معالجة مختلفة) للإشارة إما إلى أن الكتابة اكتملت بنجاح أو فشلت بحدوث خطأ. يجب أن تكون الوسيطة الأولى المُمرّرة إلى callback هي كائن Error إذا فشل الاستدعاء أو null إذا نجحت الكتابة.

ستتسبب جميع الاستدعاءات إلى writable.write() التي تحدث بين وقت استدعاء writable._write() واستدعاء callback في تخزين البيانات المكتوبة مؤقتًا. عندما يتم استدعاء callback، قد يُصدر التدفق حدث 'drain'. إذا كان تنفيذ التدفق قادرًا على معالجة أجزاء متعددة من البيانات في وقت واحد، فيجب تنفيذ طريقة writable._writev().

إذا تم تعيين خاصية decodeStrings صراحةً إلى false في خيارات المُنشئ، فستظل chunk نفس الكائن المُمرّر إلى .write()، وقد تكون سلسلة بدلاً من Buffer. هذا لدعم التنفيذات التي لديها معالجة مُحسّنة لبعض ترميزات بيانات السلاسل. في تلك الحالة، ستشير وسيطة encoding إلى ترميز الأحرف للسلسلة. خلاف ذلك، يمكن تجاهل وسيطة encoding بأمان.

تمت إضافة بادئة _ إلى طريقة writable._write() لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاءها مباشرةً بواسطة برامج المستخدم.

writable._writev(chunks, callback)

  • chunks <Object[]> البيانات المراد كتابتها. القيمة عبارة عن مصفوفة من <Object> يمثل كل منها جزءًا منفصلًا من البيانات المراد كتابتها. خصائص هذه الكائنات هي:

    • chunk <Buffer> | <string> مثيل بايت أو سلسلة تحتوي على البيانات المراد كتابتها. سيكون chunk سلسلة إذا تم إنشاء Writable مع خيار decodeStrings مضبوطًا على false وتم تمرير سلسلة إلى write().
    • encoding <string> ترميز أحرف chunk. إذا كان chunk عبارة عن Buffer، فسيكون encoding هو 'buffer'.
  • callback <Function> دالة استدعاء (اختياريًا مع وسيطة خطأ) يتم استدعاؤها عند اكتمال المعالجة للأجزاء الموردة.

يجب ألا يتم استدعاء هذه الدالة مباشرة بواسطة رمز التطبيق. يجب تنفيذه بواسطة فئات فرعية، ويجب استدعاؤه بواسطة طرق فئة Writable الداخلية فقط.

يمكن تنفيذ طريقة writable._writev() بالإضافة إلى أو بدلاً من writable._write() في تنفيذ التيارات القادرة على معالجة أجزاء متعددة من البيانات في وقت واحد. إذا تم تنفيذه، وإذا كانت هناك بيانات مؤقتة من كتابات سابقة، فسيتم استدعاء _writev() بدلاً من _write().

تتم إضافة بادئة writable._writev() بعلامة تحتية لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاؤها مباشرةً بواسطة برامج المستخدم.

writable._destroy(err, callback)

تمت الإضافة في: v8.0.0

  • err <Error> خطأ محتمل.
  • callback <Function> دالة استدعاء تأخذ وسيطة خطأ اختيارية.

يتم استدعاء طريقة _destroy() بواسطة writable.destroy(). يمكن تجاوزها بواسطة فئات فرعية ولكن يجب ألا يتم استدعاؤها مباشرة.

writable._final(callback)

مضاف في: v8.0.0

  • callback <Function> استدعاء هذه الدالة (اختياريًا مع وسيطة خطأ) عند الانتهاء من كتابة أي بيانات متبقية.

يجب عدم استدعاء طريقة _final() مباشرة. يمكن تنفيذها بواسطة فئات فرعية، وإذا كان الأمر كذلك، فسيتم استدعاءها بواسطة طرق الفئة Writable الداخلية فقط.

ستتم استدعاء هذه الدالة الاختيارية قبل إغلاق التدفق، مما يؤخر حدث 'finish' حتى يتم استدعاء callback. هذا مفيد لإغلاق الموارد أو كتابة البيانات المخزنة مؤقتًا قبل انتهاء التدفق.

أخطاء أثناء الكتابة

يجب نشر الأخطاء التي تحدث أثناء معالجة طرق writable._write()، writable._writev() و writable._final() عن طريق استدعاء الدالة المُرتجعة ومرور الخطأ كوسيطة أولى. يؤدي إلقاء خطأ Error من داخل هذه الطرق أو إصدار حدث 'error' يدويًا إلى سلوك غير محدد.

إذا كان تدفق Readable يمر عبر أنبوب إلى تدفق Writable عندما يصدر Writable خطأ، فسيتم إلغاء أنبوب تدفق Readable.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  },
})

مثال على تدفق مكتوب

يوضح ما يلي تنفيذًا بسيطًا (وبلا معنى إلى حد ما) لتدفق Writable مخصص. في حين أن مثيل تدفق Writable المحدد هذا ليس له أي فائدة خاصة، إلا أن المثال يوضح كل العناصر المطلوبة لمثيل تدفق Writable مخصص:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  }
}

فك تشفير المخازن المؤقتة في دفق قابل للكتابة

يُعد فك تشفير المخازن المؤقتة مهمة شائعة، على سبيل المثال، عند استخدام المُحولات التي يكون مُدخلها سلسلة نصية. هذه ليست عملية تافهة عند استخدام ترميز أحرف متعدد البايت، مثل UTF-8. يوضح المثال التالي كيفية فك تشفير السلاسل متعددة البايت باستخدام StringDecoder و Writable.

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

تنفيذ دفق قابل للقراءة

يتم توسيع فئة stream.Readable لتنفيذ دفق Readable.

يجب على تيارات Readable المخصصة استدعاء مُنشئ new stream.Readable([options]) وتنفيذ طريقة readable._read().

new stream.Readable([options])

[History]

الإصدارالتغييرات
v22.0.0زيادة قيمة highWaterMark الافتراضية.
v15.5.0دعم تمرير AbortSignal.
v14.0.0تغيير قيمة autoDestroy الافتراضية إلى true.
v11.2.0, v10.16.0إضافة خيار autoDestroy لإلغاء destroy() للدفق تلقائيًا عند إصدار 'end' أو حدوث أخطاء.
  • options <Object>
    • highWaterMark <number> الحد الأقصى لـعدد البايتات التي سيتم تخزينها في المخزن المؤقت الداخلي قبل التوقف عن القراءة من المورد الأساسي. الافتراضي: 65536 (64 كيلوبايت)، أو 16 لتيارات objectMode.
    • encoding <string> إذا تم تحديده، فسيتم فك تشفير المخازن المؤقتة إلى سلاسل نصية باستخدام الترميز المحدد. الافتراضي: null.
    • objectMode <boolean> ما إذا كان يجب أن يتصرف هذا الدفق كدفق من الكائنات. بمعنى أن stream.read(n) يُرجع قيمة واحدة بدلاً من Buffer بحجم n. الافتراضي: false.
    • emitClose <boolean> ما إذا كان يجب على الدفق إصدار 'close' بعد تدميره. الافتراضي: true.
    • read <Function> التنفيذ لطريقة stream._read().
    • destroy <Function> التنفيذ لطريقة stream._destroy().
    • construct <Function> التنفيذ لطريقة stream._construct().
    • autoDestroy <boolean> ما إذا كان يجب على هذا الدفق استدعاء .destroy() تلقائيًا بنفسه بعد الانتهاء. الافتراضي: true.
    • signal <AbortSignal> إشارة تمثل إلغاءً محتملًا.
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // يستدعي مُنشئ stream.Readable(options).
    super(options)
    // ...
  }
}

أو، عند استخدام مُنشئات على طراز ما قبل ES6:

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

أو، باستخدام نهج المُنشئ المُبسط:

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

سيؤدي استدعاء abort على AbortController المقابل لـ AbortSignal المُمرر إلى نفس طريقة استدعاء .destroy(new AbortError()) على القابل للقراءة المُنشأ.

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// لاحقًا، ألغِ العملية وأغلق الدفق
controller.abort()

readable._construct(callback)

مضاف في: v15.0.0

  • callback <Function> اتصل بهذه الدالة (اختياريًا مع وسيطة خطأ) عندما ينتهي تدفق الإعداد.

يجب ألا يتم استدعاء طريقة _construct() مباشرة. يمكن تنفيذه بواسطة فئات فرعية، وإذا كان الأمر كذلك، فسيتم استدعاؤه بواسطة طرق فئة Readable الداخلية فقط.

ستتم جدولة هذه الدالة الاختيارية في التكرار التالي بواسطة مُنشئ التدفق، مما يؤخر أي استدعاءات _read() و _destroy() حتى يتم استدعاء callback. هذا مفيد لتهيئة الحالة أو تهيئة الموارد بشكل غير متزامن قبل استخدام التدفق.

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

مضاف في: v0.9.4

  • size <number> عدد البايتات التي سيتم قراءتها بشكل غير متزامن

يجب ألا يتم استدعاء هذه الدالة مباشرة بواسطة رمز التطبيق. يجب تنفيذه بواسطة فئات فرعية، ويتم استدعاؤه بواسطة طرق فئة Readable الداخلية فقط.

يجب أن توفر جميع تنفيذات تدفق Readable تنفيذًا لطريقة readable._read() لجلب البيانات من المورد الأساسي.

عندما يتم استدعاء readable._read()، إذا كانت البيانات متاحة من المورد، فيجب أن يبدأ التنفيذ في دفع هذه البيانات إلى قائمة الانتظار للقراءة باستخدام طريقة this.push(dataChunk). سيتم استدعاء _read() مرة أخرى بعد كل استدعاء لـ this.push(dataChunk) بمجرد أن يكون التدفق جاهزًا لقبول المزيد من البيانات. قد يستمر _read() في القراءة من المورد ودفع البيانات حتى تقوم readable.push() بإرجاع false. فقط عندما يتم استدعاء _read() مرة أخرى بعد توقفها، يجب أن تستأنف دفع بيانات إضافية إلى قائمة الانتظار.

بمجرد استدعاء طريقة readable._read()، لن يتم استدعاؤها مرة أخرى حتى يتم دفع المزيد من البيانات من خلال طريقة readable.push(). لن تتسبب البيانات الفارغة مثل المخازن المؤقتة والصفائف الفارغة في استدعاء readable._read().

وسيطه size استشارية. بالنسبة للتنفيذات التي تكون فيها "القراءة" عملية واحدة تُرجع البيانات، يمكن استخدام وسيطة size لتحديد مقدار البيانات التي سيتم جلبها. قد تتجاهل التنفيذات الأخرى هذه الوسيطة، وتقدم البيانات ببساطة عندما تصبح متاحة. ليست هناك حاجة "لانتظار" حتى تتوفر بايتات size قبل استدعاء stream.push(chunk).

تمت إضافة بادئة _ إلى طريقة readable._read() لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاؤها مباشرةً بواسطة برامج المستخدم.

readable._destroy(err, callback)

مضاف في: v8.0.0

  • err <Error> خطأ محتمل.
  • callback <Function> دالة استدعاء راجعة تأخذ وسيطة خطأ اختيارية.

تُستدعى طريقة _destroy() بواسطة readable.destroy(). يمكن للفئات الفرعية تجاوزها، ولكن يجب عدم استدعائها مباشرة.

readable.push(chunk[, encoding])

[السجل]

الإصدارالتغييرات
v22.0.0, v20.13.0يمكن الآن أن تكون وسيطة chunk مثيلًا من TypedArray أو DataView.
v8.0.0يمكن الآن أن تكون وسيطة chunk مثيلًا من Uint8Array.
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> جزء من البيانات المراد دفعه إلى قائمة الانتظار للقراءة. بالنسبة للتيارات التي لا تعمل في وضع الكائن، يجب أن يكون chunk عبارة عن <string>، أو <Buffer>، أو <TypedArray> أو <DataView>. بالنسبة لتيارات وضع الكائن، يمكن أن يكون chunk أي قيمة جافا سكريبت.
  • encoding <string> ترميز أجزاء السلسلة. يجب أن يكون ترميزًا صالحًا لـ Buffer، مثل 'utf8' أو 'ascii'.
  • القيمة المُرجعه: <boolean> true إذا كان من الممكن الاستمرار في دفع أجزاء إضافية من البيانات؛ false خلاف ذلك.

عندما يكون chunk عبارة عن <Buffer>، أو <TypedArray>، أو <DataView> أو <string>، سيتم إضافة جزء البيانات إلى قائمة الانتظار الداخلية للمستخدمين لتستهلكها. يشير تمرير chunk كـ null إلى نهاية الدفق (EOF)، وبعد ذلك لا يمكن كتابة المزيد من البيانات.

عندما يعمل Readable في وضع الإيقاف المؤقت، يمكن قراءة البيانات المضافة باستخدام readable.push() من خلال استدعاء طريقة readable.read() عند إصدار حدث 'readable'.

عندما يعمل Readable في وضع التدفق، سيتم تسليم البيانات المضافة باستخدام readable.push() من خلال إصدار حدث 'data'.

صُممت طريقة readable.push() لتكون مرنة قدر الإمكان. على سبيل المثال، عند لف مصدر منخفض المستوى يوفر نوعًا ما من آلية الإيقاف المؤقت/الاستئناف، ودالة استدعاء بيانات، يمكن لف المصدر منخفض المستوى بواسطة مثيل Readable مخصص:

js
// `_source` هو كائن به طرق readStop() و readStart()،
// وعضو `ondata` الذي يتم استدعاؤه عندما يكون لديه بيانات، و
// وعضو `onend` الذي يتم استدعاؤه عندما تنتهي البيانات.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // في كل مرة توجد بيانات، قم بدفعها إلى المخزن المؤقت الداخلي.
    this._source.ondata = chunk => {
      // إذا قامت push() بإرجاع false، فتوقف عن القراءة من المصدر.
      if (!this.push(chunk)) this._source.readStop()
    }

    // عندما ينتهي المصدر، قم بدفع جزء `null` الذي يشير إلى نهاية الملف.
    this._source.onend = () => {
      this.push(null)
    }
  }
  // سيتم استدعاء _read() عندما يريد الدفق سحب المزيد من البيانات.
  // يتم تجاهل وسيطة الحجم الاستشاري في هذه الحالة.
  _read(size) {
    this._source.readStart()
  }
}

تُستخدم طريقة readable.push() لدفع المحتوى إلى المخزن المؤقت الداخلي. يمكن تشغيلها بواسطة طريقة readable._read().

بالنسبة للتيارات التي لا تعمل في وضع الكائن، إذا كانت معلمة chunk لـ readable.push() هي undefined، فسيتم التعامل معها كسلسلة أو مخزن مؤقت فارغ. راجع readable.push('') لمزيد من المعلومات.

أخطاء أثناء القراءة

يجب نشر الأخطاء التي تحدث أثناء معالجة readable._read() من خلال طريقة readable.destroy(err) . إن طرح خطأ Error من داخل readable._read() أو إصدار حدث 'error' يدويًا يؤدي إلى سلوك غير محدد.

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // القيام ببعض العمل.
    }
  },
})

مثال على دفق العد

فيما يلي مثال أساسي على دفق Readable يصدر الأرقام من 1 إلى 1,000,000 بترتيب تصاعدي، ثم ينتهي.

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

تنفيذ دفق ثنائي الاتجاه

دفق Duplex هو دفق ينفذ كلاً من Readable و Writable ، مثل اتصال مقبس TCP.

نظرًا لأن JavaScript لا يدعم الميراث المتعدد، يتم توسيع فئة stream.Duplex لتنفيذ دفق Duplex (على عكس توسيع فئتي stream.Readable و stream.Writable).

ترث فئة stream.Duplex نموذجيًا من stream.Readable و طفيليًا من stream.Writable، لكن instanceof ستعمل بشكل صحيح لكلا الفئتين الأساسيتين نظرًا لإلغاء تجاوز Symbol.hasInstance على stream.Writable.

يجب أن تستدعي دفقات Duplex المخصصة مُنشئ new stream.Duplex([options]) وتنفذ كلاً من طريقتي readable._read() و writable._write().

new stream.Duplex(options)

[السجل]

الإصدارالتغييرات
v8.4.0تم دعم خيارات readableHighWaterMark و writableHighWaterMark الآن.
  • options <Object> تُمرَّر إلى كل من مُنشئي Writable و Readable. كما أن لديها الحقول التالية:
    • allowHalfOpen <boolean> إذا تم تعيينه على false، فسوف يُنهي التدفق تلقائيًا الجانب القابل للكتابة عندما ينتهي الجانب القابل للقراءة. الافتراضي: true.
    • readable <boolean> يُعيّن ما إذا كان يجب أن يكون Duplex قابلًا للقراءة. الافتراضي: true.
    • writable <boolean> يُعيّن ما إذا كان يجب أن يكون Duplex قابلًا للكتابة. الافتراضي: true.
    • readableObjectMode <boolean> يُعيّن objectMode للجانب القابل للقراءة من التدفق. ليس له أي تأثير إذا كان objectMode يساوي true. الافتراضي: false.
    • writableObjectMode <boolean> يُعيّن objectMode للجانب القابل للكتابة من التدفق. ليس له أي تأثير إذا كان objectMode يساوي true. الافتراضي: false.
    • readableHighWaterMark <number> يُعيّن highWaterMark للجانب القابل للقراءة من التدفق. ليس له أي تأثير إذا تم توفير highWaterMark.
    • writableHighWaterMark <number> يُعيّن highWaterMark للجانب القابل للكتابة من التدفق. ليس له أي تأثير إذا تم توفير highWaterMark.
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

أو، عند استخدام مُنشئات نمط ما قبل ES6:

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

أو، باستخدام نهج المُنشئ المُبسّط:

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

عند استخدام pipeline:

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // قبول إدخال نصي بدلاً من العُروض
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // تأكد من أنّه JSON صحيح.
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('failed', err)
    } else {
      console.log('completed')
    }
  }
)

مثال تيار ثنائي الاتجاه

يوضح ما يلي مثالاً بسيطًا لتيار Duplex يلف كائن مصدر منخفض المستوى افتراضي يمكن كتابة البيانات إليه، وقراءة البيانات منه، وإن كان ذلك باستخدام واجهة برمجة تطبيقات غير متوافقة مع تيارات Node.js. يوضح ما يلي مثالًا بسيطًا لتيار Duplex يخزن البيانات المكتوبة الواردة عبر واجهة Writable التي تُقرأ مرة أخرى عبر واجهة Readable.

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // المصدر الأساسي يتعامل فقط مع السلاسل النصية.
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

الجانب الأكثر أهمية في تيار Duplex هو أن جانبي Readable و Writable يعملان بشكل مستقل عن بعضهما البعض على الرغم من وجودهما معًا ضمن مثيل كائن واحد.

تيارات ثنائية الاتجاه في وضع الكائن

بالنسبة لتيارات Duplex، يمكن تعيين objectMode حصريًا لإما جانب Readable أو Writable باستخدام خيارات readableObjectMode و writableObjectMode على التوالي.

في المثال التالي، على سبيل المثال، يتم إنشاء تيار Transform جديد (وهو نوع من تيار Duplex) يحتوي على جانب Writable في وضع الكائن يقبل الأرقام الجافا سكريبت التي يتم تحويلها إلى سلاسل سداسية عشرية على جانب Readable.

js
const { Transform } = require('node:stream')

// جميع تيارات Transform هي أيضًا تيارات Duplex.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // تحويل الشريحة إلى رقم إذا لزم الأمر.
    chunk |= 0

    // تحويل الشريحة إلى شيء آخر.
    const data = chunk.toString(16)

    // دفع البيانات إلى قائمة الانتظار القابلة للقراءة.
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// يطبع: 01
myTransform.write(10)
// يطبع: 0a
myTransform.write(100)
// يطبع: 64

تنفيذ دفق التحويل

دفق Transform هو دفق Duplex حيث يتم حساب الإخراج بطريقة ما من الإدخال. تتضمن الأمثلة دفقات zlib أو دفقات crypto التي تقوم بضغط البيانات أو تشفيرها أو فك تشفيرها.

لا يوجد شرط أن يكون حجم الإخراج هو نفس حجم الإدخال، أو نفس عدد الأجزاء، أو الوصول في نفس الوقت. على سبيل المثال، لن يكون لدفق Hash سوى جزء واحد فقط من الإخراج يتم توفيره عند انتهاء الإدخال. سيُنتج دفق zlib إخراجًا أصغر بكثير أو أكبر بكثير من إدخاله.

يتم توسيع فئة stream.Transform لتنفيذ دفق Transform.

ترث فئة stream.Transform نموذجيًا من stream.Duplex وتنفذ إصداراتها الخاصة من طرق writable._write() و readable._read(). يجب على تنفيذات Transform المخصصة تنفيذ طريقة transform._transform() و يجوز لها أيضًا تنفيذ طريقة transform._flush().

يجب توخي الحذر عند استخدام دفقات Transform نظرًا لأن البيانات المكتوبة في الدفق يمكن أن تسبب توقف جانب Writable من الدفق إذا لم يتم استهلاك الإخراج على جانب Readable.

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

أو، عند استخدام مُنشئات على طراز ما قبل ES6:

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

أو، باستخدام نهج المُنشئ المبسط:

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

الحدث: 'end'

حدث 'end' من فئة stream.Readable. يُصدر حدث 'end' بعد إخراج جميع البيانات، وهو ما يحدث بعد استدعاء دالة المُعاودة في transform._flush(). في حالة حدوث خطأ، يجب عدم إصدار 'end'.

الحدث: 'finish'

حدث 'finish' من فئة stream.Writable. يُصدر حدث 'finish' بعد استدعاء stream.end() ومعالجة جميع الأجزاء بواسطة stream._transform(). في حالة حدوث خطأ، يجب عدم إصدار 'finish'.

transform._flush(callback)

  • callback <Function> دالة مُعاودة (اختياريًا مع وسيطة خطأ وبيانات) ليتم استدعاؤها عندما يتم إفراغ البيانات المتبقية.

يجب ألا يتم استدعاء هذه الدالة مباشرةً بواسطة رمز التطبيق. يجب تنفيذها بواسطة فئات فرعية، ويجب استدعاؤها بواسطة طرق فئة Readable الداخلية فقط.

في بعض الحالات، قد تحتاج عملية التحويل إلى إصدار جزء إضافي من البيانات في نهاية التدفق. على سبيل المثال، سيتخزن تدفق ضغط zlib مقدارًا من الحالة الداخلية المستخدمة لضغط الإخراج بشكل مثالي. ومع ذلك، عند انتهاء التدفق، يجب إفراغ تلك البيانات الإضافية حتى تكون البيانات المضغوطة كاملة.

قد تُنفذ عمليات تنفيذ Transform المُخصصة طريقة transform._flush(). سيتم استدعاء هذه الطريقة عندما لا توجد بيانات مكتوبة أخرى يتم استهلاكها، ولكن قبل إصدار حدث 'end' الذي يشير إلى نهاية تدفق Readable.

ضمن تنفيذ transform._flush(), قد يتم استدعاء طريقة transform.push() صفر أو أكثر من المرات، حسب الاقتضاء. يجب استدعاء دالة callback عندما تكتمل عملية الإفراغ.

تمت إضافة بادئة سطر سفلي لـ transform._flush() لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاؤها مباشرةً بواسطة برامج المستخدم.

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> الـ Buffer المراد تحويله، مُحوَّل من سلسلة النصوص المُمرَّرة إلى stream.write(). إذا كان خيار decodeStrings الخاص بالدفق مُعيَّنًا على false أو كان الدفق يعمل في وضع الكائن، فلن يتم تحويل الجزء ولن يكون سوى ما تم تمريره إلى stream.write().
  • encoding <string> إذا كان الجزء عبارة عن سلسلة نصية، فهذا هو نوع الترميز. إذا كان الجزء عبارة عن مُخزن مؤقت، فهذه هي القيمة الخاصة 'buffer'. تجاهلها في هذه الحالة.
  • callback <Function> دالة مُنعكس (اختياريًا مع وسيطة خطأ وبيانات) ليتم استدعاؤها بعد معالجة الجزء المُقدَّم.

يجب ألا يتم استدعاء هذه الدالة مباشرةً بواسطة رمز التطبيق. يجب تنفيذها بواسطة فئات فرعية، واستدعائها بواسطة طرق فئة Readable الداخلية فقط.

يجب أن توفر جميع تنفيذات دفق Transform طريقة _transform() لقبول الإدخال وإنتاج المخرجات. تتعامل تنفيذ transform._transform() مع البايتات التي يتم كتابتها، وتحسب المخرجات، ثم تمرر هذه المخرجات إلى الجزء القابل للقراءة باستخدام طريقة transform.push().

يمكن استدعاء طريقة transform.push() صفر أو أكثر من مرة لإنشاء مخرجات من جزء إدخال واحد، اعتمادًا على مقدار ما يجب إخراجه نتيجةً للجزء.

من الممكن ألا يتم إنشاء أي مخرجات من أي جزء معين من بيانات الإدخال.

يجب استدعاء دالة callback فقط عند استهلاك الجزء الحالي بالكامل. يجب أن تكون الوسيطة الأولى المُمرَّرة إلى callback عبارة عن كائن Error إذا حدث خطأ أثناء معالجة الإدخال أو null بخلاف ذلك. إذا تم تمرير وسيطة ثانية إلى callback، فسيتم توجيهها إلى طريقة transform.push()، ولكن فقط إذا كانت الوسيطة الأولى خاطئة. بعبارة أخرى، ما يلي متكافئ:

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

تتم إضافة بادئة transform._transform() بعلامة سفلية لأنها داخلية للفئة التي تُعرّفها، ويجب ألا يتم استدعاؤها مباشرةً بواسطة برامج المستخدم.

لا يتم أبدًا استدعاء transform._transform() بالتوازي؛ تُنفذ الدفقات آلية قائمة انتظار، ولكي تتلقى الجزء التالي، يجب استدعاء callback، إما بشكل متزامن أو غير متزامن.

الصنف: stream.PassThrough

يُعدُّ صنف stream.PassThrough تنفيذاً بسيطاً لتيار Transform الذي يُمرِّر ببساطة البايتات الداخلة إلى المخرجات. ويهدف هذا الصنف أساساً إلى الأمثلة والاختبارات، ولكن هناك بعض حالات الاستخدام حيث يكون stream.PassThrough مفيداً كحجر بناء لأنواع جديدة من التيارات.

ملاحظات إضافية

توافق التيارات مع المُولِّدات المتزامنة و المُكرِّرات المتزامنة

مع دعم المُولِّدات المتزامنة و المُكرِّرات في جافاسكريبت، أصبحت المُولِّدات المتزامنة بفعالية بنية لغوية أساسية للتيارات في هذه المرحلة.

فيما يلي بعض حالات التشغيل البيني الشائعة لاستخدام تيارات Node.js مع المُولِّدات المتزامنة و المُكرِّرات المتزامنة.

استهلاك التيارات القابلة للقراءة باستخدام المُكرِّرات المتزامنة

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

تسجِّل المُكرِّرات المتزامنة مُعالِج أخطاء دائمًا على التيار لمنع أي أخطاء غير مُعالجة بعد الإلغاء.

إنشاء تيارات قابلة للقراءة باستخدام المُولِّدات المتزامنة

يمكن إنشاء تيار Node.js قابل للقراءة من مُولِّد متزامن باستخدام طريقة الأداة المساعدة Readable.from() :

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

الإرسال الأنبوبي إلى التيارات القابلة للكتابة من المُكرِّرات المتزامنة

عند الكتابة في تيار قابل للكتابة من مُكرِّر متزامن، تأكد من المعالجة الصحيحة للضغط العكسي والأخطاء. يُلخِّص stream.pipeline() معالجة الضغط العكسي والأخطاء ذات الصلة بالضغط العكسي:

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// نمط الاستدعاء
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'value returned')
  }
}).on('close', () => {
  ac.abort()
})

// نمط الوعد
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'value returned')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

التوافق مع إصدارات Node.js الأقدم

قبل Node.js 0.10، كانت واجهة تدفق Readable أبسط، ولكنها كانت أيضًا أقل قوة وأقل فائدة.

  • بدلاً من انتظار المكالمات إلى طريقة stream.read()، ستبدأ أحداث 'data' في الإنبعاث على الفور. كانت التطبيقات التي تحتاج إلى القيام ببعض العمل لتحديد كيفية التعامل مع البيانات مطلوبة لتخزين البيانات المقروءة في المخازن المؤقتة حتى لا تضيع البيانات.
  • كانت طريقة stream.pause() استشارية، وليست مضمونة. وهذا يعني أنه كان لا يزال من الضروري أن تكون مستعدًا لتلقي أحداث 'data' حتى عندما يكون التدفق في حالة إيقاف مؤقت.

في Node.js 0.10، تم إضافة فئة Readable. من أجل التوافق مع برامج Node.js الأقدم، تتحول تيارات Readable إلى "وضع التدفق" عند إضافة مُعالِج أحداث 'data'، أو عند استدعاء طريقة stream.resume(). والنتيجة هي أنه، حتى عندما لا تستخدم الطريقة الجديدة stream.read() وحدث 'readable'، لم يعد من الضروري القلق بشأن فقدان أجزاء 'data'.

بينما ستستمر معظم التطبيقات في العمل بشكل طبيعي، إلا أن هذا يقدم حالة حدية في الظروف التالية:

  • لم يتم إضافة أي مُستمع حدث 'data'.
  • لم يتم استدعاء طريقة stream.resume() أبدًا.
  • لم يتم توصيل التدفق بأي وجهة قابلة للكتابة.

على سبيل المثال، ضع في اعتبارك التعليمات البرمجية التالية:

js
// تحذير! معطوبة!
net
  .createServer(socket => {
    // نقوم بإضافة مُستمع 'end'، لكننا لا نستهلك البيانات أبدًا.
    socket.on('end', () => {
      // لن يصل أبدًا إلى هنا.
      socket.end('The message was received but was not processed.\n')
    })
  })
  .listen(1337)

قبل Node.js 0.10، كانت بيانات الرسالة الواردة تُرمى ببساطة. ومع ذلك، في Node.js 0.10 وما بعده، يبقى المقبس معلقًا إلى الأبد.

الحل في هذه الحالة هو استدعاء طريقة stream.resume() لبدء تدفق البيانات:

js
// حل بديل.
net
  .createServer(socket => {
    socket.on('end', () => {
      socket.end('The message was received but was not processed.\n')
    })

    // بدء تدفق البيانات، مع تجاهلها.
    socket.resume()
  })
  .listen(1337)

بالإضافة إلى تيارات Readable الجديدة التي تنتقل إلى وضع التدفق، يمكن لف تيارات الطراز السابق لـ 0.10 في فئة Readable باستخدام طريقة readable.wrap().

readable.read(0)

هناك بعض الحالات التي يكون من الضروري فيها تشغيل تحديث لآليات تدفق القراءة الأساسية، دون استهلاك أي بيانات بالفعل. في مثل هذه الحالات، من الممكن استدعاء readable.read(0)، والذي سيُعيد دائمًا null.

إذا كان المخزن المؤقت الداخلي للقراءة أقل من highWaterMark، ولم يكن التدفق يقرأ حاليًا، فإن استدعاء stream.read(0) سيُشغّل استدعاءً منخفض المستوى لـ stream._read().

في حين أن معظم التطبيقات لن تحتاج أبدًا إلى القيام بذلك، إلا أن هناك حالات داخل Node.js يتم فيها ذلك، خاصةً في الأجزاء الداخلية من فئة تدفق Readable.

readable.push('')

لا يُوصى باستخدام readable.push('').

إن دفع سلسلة بايت صفري <string>، <Buffer>، <TypedArray> أو <DataView> إلى تدفق ليس في وضع الكائن له تأثير جانبي مثير للاهتمام. لأنه هو استدعاء لـ readable.push()، فإن الاستدعاء سينهي عملية القراءة. ومع ذلك، نظرًا لأن الوسيطة هي سلسلة فارغة، فلن يتم إضافة أي بيانات إلى المخزن المؤقت للقراءة، لذلك لا يوجد شيء للمستخدم لاستهلاكه.

عدم تطابق highWaterMark بعد استدعاء readable.setEncoding()

سيؤدي استخدام readable.setEncoding() إلى تغيير سلوك كيفية عمل highWaterMark في الوضع غير الكائن.

عادةً، يتم قياس حجم المخزن المؤقت الحالي مقابل highWaterMark بوحدات البايت. ومع ذلك، بعد استدعاء setEncoding()، ستبدأ دالة المقارنة في قياس حجم المخزن المؤقت بوحدات الحروف.

ليست هذه مشكلة في الحالات الشائعة مع latin1 أو ascii. لكن يُنصح بالانتباه إلى هذا السلوك عند العمل مع السلاسل التي قد تحتوي على أحرف متعددة البايت.