Skip to content

Поток

[Стабильно: 2 - Стабильно]

Стабильно: 2 Стабильность: 2 - Стабильно

Исходный код: lib/stream.js

Поток — это абстрактный интерфейс для работы со потоковыми данными в Node.js. Модуль node:stream предоставляет API для реализации интерфейса потока.

Node.js предоставляет множество объектов потоков. Например, запрос к HTTP-серверу и process.stdout являются экземплярами потоков.

Потоки могут быть читаемыми, записываемыми или и теми, и другими. Все потоки являются экземплярами EventEmitter.

Для доступа к модулю node:stream:

js
const stream = require('node:stream')

Модуль node:stream полезен для создания новых типов экземпляров потоков. Обычно нет необходимости использовать модуль node:stream для потребления потоков.

Структура этого документа

Этот документ содержит два основных раздела и третий раздел для заметок. В первом разделе объясняется, как использовать существующие потоки в приложении. Во втором разделе объясняется, как создавать новые типы потоков.

Типы потоков

В Node.js есть четыре основных типа потоков:

  • Writable: потоки, в которые можно записывать данные (например, fs.createWriteStream()).
  • Readable: потоки, из которых можно читать данные (например, fs.createReadStream()).
  • Duplex: потоки, которые являются одновременно Readable и Writable (например, net.Socket).
  • Transform: потоки Duplex, которые могут изменять или преобразовывать данные по мере их записи и чтения (например, zlib.createDeflate()).

Кроме того, этот модуль включает в себя служебные функции stream.duplexPair(), stream.pipeline(), stream.finished(), stream.Readable.from() и stream.addAbortSignal().

API потоков Promise

Добавлено в: v15.0.0

API stream/promises предоставляет альтернативный набор асинхронных служебных функций для потоков, которые возвращают объекты Promise, а не используют обратные вызовы. API доступен через require('node:stream/promises') или require('node:stream').promises.

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[История]

ВерсияИзменения
v18.0.0, v17.2.0, v16.14.0Добавлен параметр end, который можно установить в false, чтобы предотвратить автоматическое закрытие выходного потока по завершении исходного.
v15.0.0Добавлено в: v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Конвейер завершен.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Конвейер завершен.')

Для использования AbortSignal передайте его в объект options в качестве последнего аргумента. Когда сигнал прерывается, destroy будет вызван для базового конвейера с ошибкой AbortError.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

API pipeline также поддерживает асинхронные генераторы:

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // Работа со строками, а не с `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Конвейер завершен.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // Работа со строками, а не с `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Конвейер завершен.')

Не забудьте обработать аргумент signal, переданный в асинхронный генератор. Особенно в случае, когда асинхронный генератор является источником для конвейера (т.е. первый аргумент), иначе конвейер никогда не завершится.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Конвейер завершен.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Конвейер завершен.')

API pipeline предоставляет версию с обратным вызовом:

stream.finished(stream[, options])

[История]

ВерсияИзменения
v19.5.0, v18.14.0Добавлена поддержка ReadableStream и WritableStream.
v19.1.0, v18.13.0Добавлен параметр cleanup.
v15.0.0Добавлено в: v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Поток завершил чтение.')
}

run().catch(console.error)
rs.resume() // Очистить поток.
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Поток завершил чтение.')
}

run().catch(console.error)
rs.resume() // Очистить поток.

API finished также предоставляет версию с обратным вызовом.

stream.finished() оставляет висячие обработчики событий (в частности, 'error', 'end', 'finish' и 'close') после того, как возвращаемое обещание выполнено или отклонено. Причина этого в том, что непредвиденные события 'error' (из-за неправильной реализации потока) не вызывают неожиданных сбоев. Если это нежелательное поведение, то options.cleanup следует установить в true:

js
await finished(rs, { cleanup: true })

Режим объектов

Все потоки, создаваемые API Node.js, работают исключительно со строками, объектами <Buffer>, <TypedArray> и <DataView>:

  • Strings и Buffers являются наиболее распространёнными типами, используемыми с потоками.
  • TypedArray и DataView позволяют обрабатывать двоичные данные с типами, такими как Int32Array или Uint8Array. При записи TypedArray или DataView в поток Node.js обрабатывает необработанные байты.

Однако реализации потоков могут работать с другими типами значений JavaScript (за исключением null, который имеет специальное назначение в потоках). Такие потоки считаются работающими в «режиме объектов».

Экземпляры потоков переключаются в режим объектов с помощью опции objectMode при создании потока. Попытка переключить существующий поток в режим объектов небезопасна.

Буферизация

И потоки Writable, и потоки Readable будут хранить данные во внутреннем буфере.

Объём потенциально буферизуемых данных зависит от опции highWaterMark, передаваемой в конструктор потока. Для обычных потоков опция highWaterMark указывает общее количество байтов. Для потоков, работающих в режиме объектов, highWaterMark указывает общее количество объектов. Для потоков, работающих со строками (но не декодирующих их), highWaterMark указывает общее количество кодовых единиц UTF-16.

Данные буферизуются в потоках Readable, когда реализация вызывает stream.push(chunk). Если потребитель потока не вызывает stream.read(), данные будут находиться во внутренней очереди до тех пор, пока они не будут использованы.

Как только общий размер внутреннего буфера чтения достигнет порога, заданного highWaterMark, поток временно прекратит чтение данных из базового ресурса, пока не будут использованы текущие буферизованные данные (то есть поток прекратит вызывать внутренний метод readable._read(), который используется для заполнения буфера чтения).

Данные буферизуются в потоках Writable, когда метод writable.write(chunk) вызывается многократно. Пока общий размер внутреннего буфера записи меньше порога, установленного highWaterMark, вызовы writable.write() будут возвращать true. Как только размер внутреннего буфера достигнет или превысит highWaterMark, будет возвращено false.

Ключевая цель API stream, в частности метода stream.pipe(), — ограничить буферизацию данных до приемлемого уровня, чтобы источники и назначение с различной скоростью не перегружали доступную память.

Опция highWaterMark является порогом, а не ограничением: она определяет объём данных, которые поток буферизует, прежде чем перестанет запрашивать больше данных. Она не устанавливает строгое ограничение памяти в целом. Конкретные реализации потоков могут выбирать более строгие ограничения, но это необязательно.

Поскольку потоки Duplex и Transform являются одновременно Readable и Writable, каждый из них поддерживает два отдельных внутренних буфера, используемых для чтения и записи, что позволяет каждой стороне работать независимо от другой, поддерживая при этом соответствующий и эффективный поток данных. Например, экземпляры net.Socket являются потоками Duplex, чья сторона Readable позволяет потреблять данные, полученные от сокета, а чья сторона Writable позволяет записывать данные в сокет. Поскольку данные могут записываться в сокет с большей или меньшей скоростью, чем данные принимаются, каждая сторона должна работать (и буферизовать) независимо от другой.

Механизм внутреннего буферирования является внутренней деталью реализации и может быть изменён в любое время. Однако для некоторых расширенных реализаций внутренние буферы могут быть получены с помощью writable.writableBuffer или readable.readableBuffer. Использование этих не документированных свойств не рекомендуется.

API для потребителей потоков

Почти все приложения Node.js, независимо от их сложности, используют потоки. Ниже приведён пример использования потоков в приложении Node.js, реализующем HTTP-сервер:

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` — это http.IncomingMessage, читаемый поток.
  // `res` — это http.ServerResponse, записывающий поток.

  let body = ''
  // Получение данных как строки utf8.
  // Если кодировка не задана, будут получены объекты Buffer.
  req.setEncoding('utf8')

  // Читаемые потоки генерируют события 'data' после добавления слушателя.
  req.on('data', chunk => {
    body += chunk
  })

  // Событие 'end' указывает на то, что всё тело было получено.
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // Отправить пользователю что-нибудь интересное:
      res.write(typeof data)
      res.end()
    } catch (er) {
      // Ой! Плохой JSON!
      res.statusCode = 400
      return res.end(`error: ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON

Потоки Writable (например, res в примере) предоставляют методы, такие как write() и end(), которые используются для записи данных в поток.

Потоки Readable используют API EventEmitter для уведомления кода приложения о доступности данных для чтения из потока. Доступные данные могут быть считаны из потока несколькими способами.

И Writable, и Readable потоки используют API EventEmitter различными способами для передачи текущего состояния потока.

Потоки Duplex и Transform являются одновременно Writable и Readable.

Приложения, которые записывают данные в поток или считывают данные из потока, не обязаны напрямую реализовывать интерфейсы потока и, как правило, не будут иметь причин вызывать require('node:stream').

Разработчики, желающие реализовать новые типы потоков, должны обратиться к разделу API для разработчиков потоков.

Записывающие потоки

Записывающие потоки — это абстракция для назначения, в которое записываются данные.

Примеры потоков Writable:

Некоторые из этих примеров на самом деле являются потоками Duplex, которые реализуют интерфейс Writable.

Все потоки Writable реализуют интерфейс, определённый классом stream.Writable.

Хотя конкретные экземпляры потоков Writable могут различаться различными способами, все потоки Writable следуют одной и той же фундаментальной схеме использования, как показано в примере ниже:

js
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')

Класс: stream.Writable

Добавлено в: v0.9.4

Событие: 'close'

[История]

ВерсияИзменения
v10.0.0Добавлен параметр emitClose для указания, должно ли событие 'close' генерироваться при уничтожении.
v0.9.4Добавлено в: v0.9.4

Событие 'close' генерируется, когда поток и любые его базовые ресурсы (например, дескриптор файла) закрыты. Событие указывает на то, что больше не будет генерироваться событий и не будет выполняться дальнейших вычислений.

Поток Writable всегда будет генерировать событие 'close', если он создан с параметром emitClose.

Событие: 'drain'

Добавлено в: v0.9.4

Если вызов stream.write(chunk) возвращает false, событие 'drain' будет генерироваться, когда будет целесообразно возобновить запись данных в поток.

js
// Запись данных в предоставленный записывающий поток миллион раз.
// Будьте внимательны к противодавлению.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // Последний раз!
        writer.write(data, encoding, callback)
      } else {
        // Посмотрим, должны ли мы продолжать или подождать.
        // Не передавайте обратный вызов, потому что мы еще не закончили.
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // Пришлось остановиться раньше!
      // Запишем еще немного, когда произойдет сброс.
      writer.once('drain', write)
    }
  }
}
Событие: 'error'

Добавлено в: v0.9.4

Событие 'error' генерируется, если произошла ошибка во время записи или передачи данных. Обратный вызов прослушивателя получает один аргумент Error при вызове.

Поток закрывается, когда генерируется событие 'error', если только параметр autoDestroy не был установлен в false при создании потока.

После 'error' не должно генерироваться больше никаких событий, кроме 'close' (включая события 'error').

Событие: 'finish'

Добавлено в: v0.9.4

Событие 'finish' генерируется после вызова метода stream.end(), и все данные были выгружены в базовую систему.

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('All writes are now complete.')
})
writer.end('This is the end\n')
Событие: 'pipe'

Добавлено в: v0.9.4

  • src <stream.Readable> исходный поток, который передает данные в этот записывающий поток

Событие 'pipe' генерируется, когда вызывается метод stream.pipe() для читаемого потока, добавляя этот записывающий поток к его набору получателей.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('Something is piping into the writer.')
  assert.equal(src, reader)
})
reader.pipe(writer)
Событие: 'unpipe'

Добавлено в: v0.9.4

Событие 'unpipe' генерируется, когда вызывается метод stream.unpipe() для потока Readable, удаляя этот Writable из его набора получателей.

Это также генерируется в случае, если этот поток Writable генерирует ошибку, когда поток Readable передает данные в него.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('Something has stopped piping into the writer.')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

Добавлено в: v0.11.2

Метод writable.cork() принудительно буферизует все записанные данные в памяти. Забуференные данные будут сброшены при вызове методов stream.uncork() или stream.end().

Основное назначение writable.cork() — обработка ситуации, когда в поток быстро и последовательно записывается несколько небольших фрагментов. Вместо немедленной передачи их в конечный пункт назначения, writable.cork() буферизует все фрагменты до вызова writable.uncork(), который передаст их все в writable._writev(), если он присутствует. Это предотвращает блокировку из-за первого фрагмента, когда данные буферизуются в ожидании обработки первого небольшого фрагмента. Однако использование writable.cork() без реализации writable._writev() может негативно повлиять на пропускную способность.

См. также: writable.uncork(), writable._writev().

writable.destroy([error])

[История]

ВерсияИзменения
v14.0.0Работает как операция без действия в потоке, который уже был уничтожен.
v8.0.0Добавлено в: v8.0.0
  • error <Error> Необязательно, ошибка для генерации события 'error'.
  • Возвращает: <this>

Уничтожает поток. При необходимости генерирует событие 'error' и событие 'close' (если emitClose не установлено в false). После этого вызова запись в записываемый поток завершается, и последующие вызовы write() или end() приведут к ошибке ERR_STREAM_DESTROYED. Это деструктивный и немедленный способ уничтожения потока. Предыдущие вызовы write() могли не быть обработаны и могут вызвать ошибку ERR_STREAM_DESTROYED. Используйте end() вместо destroy, если данные должны быть сброшены перед закрытием, или дождитесь события 'drain' перед уничтожением потока.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

После вызова destroy() любые дальнейшие вызовы будут операциями без действия, и никаких дальнейших ошибок, кроме ошибок из _destroy(), не будут генерироваться как 'error'.

Разработчикам не следует переопределять этот метод, а вместо этого следует реализовать writable._destroy().

writable.closed

Добавлено в: v18.0.0

Имеет значение true после того, как событие 'close' было отправлено.

writable.destroyed

Добавлено в: v8.0.0

Имеет значение true после вызова writable.destroy().

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[История]

ВерсияИзменения
v22.0.0, v20.13.0Аргумент chunk теперь может быть экземпляром TypedArray или DataView.
v15.0.0Обратный вызов callback вызывается до 'finish' или при ошибке.
v14.0.0Обратный вызов callback вызывается, если отправлено 'finish' или 'error'.
v10.0.0Этот метод теперь возвращает ссылку на writable.
v8.0.0Аргумент chunk теперь может быть экземпляром Uint8Array.
v0.9.4Добавлено в: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> Необязательные данные для записи. Для потоков, не работающих в объектном режиме, chunk должен быть <string>, <Buffer>, <TypedArray> или <DataView>. Для потоков в объектном режиме chunk может быть любым значением JavaScript, кроме null.
  • encoding <string> Кодировка, если chunk является строкой
  • callback <Function> Обратный вызов при завершении потока.
  • Возвращает: <this>

Вызов метода writable.end() сигнализирует о том, что больше никаких данных не будет записано в Writable. Необязательные аргументы chunk и encoding позволяют записать один последний дополнительный фрагмент данных непосредственно перед закрытием потока.

Вызов метода stream.write() после вызова stream.end() вызовет ошибку.

js
// Записываем 'hello, ' и затем завершаем запись 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// Запись больше не допускается!
writable.setDefaultEncoding(encoding)

[История]

ВерсияИзменения
v6.1.0Этот метод теперь возвращает ссылку на writable.
v0.11.15Добавлено в: v0.11.15
  • encoding <строка> Новая кодировка по умолчанию
  • Возвращает: <this>

Метод writable.setDefaultEncoding() устанавливает кодировку по умолчанию encoding для потока Writable.

writable.uncork()

Добавлено в: v0.11.2

Метод writable.uncork() сбрасывает все данные, буферизованные с момента вызова stream.cork().

При использовании writable.cork() и writable.uncork() для управления буферизацией записи в поток, откладывайте вызовы writable.uncork() с помощью process.nextTick(). Это позволяет группировать все вызовы writable.write(), которые происходят в течение данной фазы цикла событий Node.js.

js
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())

Если метод writable.cork() вызывается несколько раз для одного потока, для сброса буферизованных данных необходимо вызвать writable.uncork() столько же раз.

js
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
  stream.uncork()
  // Данные не будут сброшены, пока uncork() не будет вызван второй раз.
  stream.uncork()
})

См. также: writable.cork().

writable.writable

Добавлено в: v11.4.0

Равно true, если безопасно вызывать writable.write(), то есть поток не был уничтожен, не произошла ошибка и он не завершен.

writable.writableAborted

Добавлено в: v18.0.0, v16.17.0

[Стабильно: 1 - Экспериментально]

Стабильно: 1 Стабильность: 1 - Экспериментально

Возвращает, был ли поток уничтожен или произошла ошибка до выдачи события 'finish'.

writable.writableEnded

Добавлено в: v12.9.0

Принимает значение true после вызова writable.end(). Это свойство не указывает, были ли данные очищены; для этого используйте writable.writableFinished.

writable.writableCorked

Добавлено в: v13.2.0, v12.16.0

Количество вызовов writable.uncork(), необходимых для полного снятия блокировки потока.

writable.errored

Добавлено в: v18.0.0

Возвращает ошибку, если поток был уничтожен с ошибкой.

writable.writableFinished

Добавлено в: v12.6.0

Устанавливается в true непосредственно перед событием 'finish'.

writable.writableHighWaterMark

Добавлено в: v9.3.0

Возвращает значение highWaterMark, переданное при создании этого Writable.

writable.writableLength

Добавлено в: v9.4.0

Это свойство содержит количество байтов (или объектов) в очереди, готовых к записи. Значение предоставляет информацию о состоянии highWaterMark.

writable.writableNeedDrain

Добавлено в: v15.2.0, v14.17.0

Принимает значение true, если буфер потока заполнен и поток будет генерировать событие 'drain'.

writable.writableObjectMode

Добавлено в: v12.3.0

Геттер для свойства objectMode заданного потока Writable.

writable[Symbol.asyncDispose]()

Добавлено в: v22.4.0, v20.16.0

[Стабильность: 1 - Экспериментально]

Стабильность: 1 Стабильность: 1 - Экспериментально

Вызывает writable.destroy() с AbortError и возвращает промис, который выполняется после завершения потока.

writable.write(chunk[, encoding][, callback])

[История]

ВерсияИзменения
v22.0.0, v20.13.0Аргумент chunk теперь может быть экземпляром TypedArray или DataView.
v8.0.0Аргумент chunk теперь может быть экземпляром Uint8Array.
v6.0.0Передача null в качестве параметра chunk теперь всегда будет считаться недействительной, даже в объективном режиме.
v0.9.4Добавлено в: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> Необязательные данные для записи. Для потоков, не работающих в объективном режиме, chunk должен быть <string>, <Buffer>, <TypedArray> или <DataView>. Для потоков в объективном режиме chunk может быть любым значением JavaScript, кроме null.
  • encoding <string> | <null> Кодировка, если chunk является строкой. По умолчанию: 'utf8'
  • callback <Function> Обратный вызов после того, как этот фрагмент данных будет очищен.
  • Возвращает: <boolean> false, если поток желает, чтобы вызывающий код ожидал события 'drain', прежде чем продолжать запись дополнительных данных; в противном случае true.

Метод writable.write() записывает некоторые данные в поток и вызывает предоставленный callback после того, как данные будут полностью обработаны. Если возникает ошибка, callback будет вызван с ошибкой в качестве первого аргумента. callback вызывается асинхронно и до того, как будет выброшено событие 'error'.

Возвращаемое значение равно true, если внутренний буфер меньше highWaterMark, настроенного при создании потока после принятия chunk. Если возвращается false, дальнейшие попытки записи данных в поток должны быть остановлены до тех пор, пока не будет вызвано событие 'drain'.

Пока поток не осушается, вызовы write() будут буферизовать chunk и возвращать false. После того, как все текущие буферизованные фрагменты будут осушены (приняты для доставки операционной системой), будет вызвано событие 'drain'. После того, как write() вернет false, не записывайте больше фрагментов, пока не будет вызвано событие 'drain'. Хотя вызов write() в потоке, который не осушается, разрешен, Node.js будет буферизовать все записанные фрагменты до тех пор, пока не будет достигнуто максимальное использование памяти, после чего он безусловно прервется. Даже до того, как он прервется, высокое использование памяти приведет к низкой производительности сборщика мусора и высокому RSS (который обычно не возвращается в систему, даже после того, как память больше не требуется). Поскольку TCP-сокеты могут никогда не осушаться, если удаленный узел не читает данные, запись в сокет, который не осушается, может привести к уязвимости, которую можно использовать удаленно.

Запись данных, пока поток не осушается, особенно проблематична для Transform, потому что потоки Transform приостанавливаются по умолчанию, пока они не будут переданы по конвейеру или не будет добавлен обработчик событий 'data' или 'readable'.

Если данные для записи могут быть сгенерированы или получены по запросу, рекомендуется инкапсулировать логику в Readable и использовать stream.pipe(). Однако, если вы предпочитаете вызывать write(), можно учитывать противодавление и избегать проблем с памятью, используя событие 'drain':

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Ожидание вызова cb перед выполнением любой другой записи.
write('hello', () => {
  console.log('Запись завершена, выполните больше записей сейчас.')
})

Поток Writable в объективном режиме всегда будет игнорировать аргумент encoding.

Читаемые потоки

Читаемые потоки — это абстракция для источника, из которого потребляются данные.

Примеры потоков Readable:

Все потоки Readable реализуют интерфейс, определенный классом stream.Readable.

Два режима чтения

Потоки Readable фактически работают в одном из двух режимов: потоковом и приостановленном. Эти режимы отделены от режима объекта. Поток Readable может находиться в режиме объекта или нет, независимо от того, находится ли он в потоковом режиме или в приостановленном режиме.

  • В потоковом режиме данные автоматически считываются из базовой системы и предоставляются приложению как можно быстрее с помощью событий через интерфейс EventEmitter.
  • В приостановленном режиме метод stream.read() должен вызываться явно для чтения фрагментов данных из потока.

Все потоки Readable начинаются в приостановленном режиме, но могут быть переключены в потоковый режим одним из следующих способов:

Readable может переключиться обратно в приостановленный режим одним из следующих способов:

  • Если нет целевых мест для передачи данных, вызвав метод stream.pause().
  • Если есть целевые места для передачи данных, удалив все целевые места для передачи данных. Несколько целевых мест для передачи данных могут быть удалены вызовом метода stream.unpipe().

Важно помнить, что Readable не будет генерировать данные, пока не будет предоставлен механизм для их потребления или игнорирования. Если механизм потребления отключен или удален, Readable попытается прекратить генерацию данных.

В целях обратной совместимости удаление обработчиков событий 'data' не будет автоматически приостанавливать поток. Кроме того, если есть целевые места для передачи данных, то вызов stream.pause() не гарантирует, что поток останется приостановленным после того, как эти места опустеют и запросят больше данных.

Если Readable переключается в потоковый режим, и нет потребителей, доступных для обработки данных, эти данные будут потеряны. Это может произойти, например, когда вызывается метод readable.resume() без обработчика, прикрепленного к событию 'data', или когда обработчик события 'data' удален из потока.

Добавление обработчика событий 'readable' автоматически останавливает поток данных, и данные должны потребляться через readable.read(). Если обработчик событий 'readable' удален, то поток снова начнет работать, если есть обработчик событий 'data'.

Три состояния

Два режима работы потока Readable — это упрощенное представление более сложного внутреннего управления состоянием, происходящего внутри реализации потока Readable.

В частности, в любой момент времени каждый Readable находится в одном из трех возможных состояний:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

Когда readable.readableFlowing равно null, механизм потребления данных потока не предоставляется. Поэтому поток не будет генерировать данные. В этом состоянии подключение обработчика события 'data', вызов метода readable.pipe() или вызов метода readable.resume() переключат readable.readableFlowing на true, заставив Readable начать активно генерировать события по мере создания данных.

Вызов readable.pause(), readable.unpipe() или получение обратного давления приведут к тому, что readable.readableFlowing будет установлен как false, временно приостановив поток событий, но не приостановив генерацию данных. В этом состоянии подключение обработчика события 'data' не переключит readable.readableFlowing на true.

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing теперь false.

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing по-прежнему false.
pass.write('ok') // Не будет генерировать 'data'.
pass.resume() // Необходимо вызвать для того, чтобы поток генерировал 'data'.
// readableFlowing теперь true.

Пока readable.readableFlowing равно false, данные могут накапливаться во внутреннем буфере потока.

Выберите один стиль API

API потока Readable развивался в нескольких версиях Node.js и предоставляет несколько способов потребления данных потока. В целом, разработчики должны выбирать один из способов потребления данных и никогда не должны использовать несколько способов для потребления данных из одного потока. В частности, использование комбинации on('data'), on('readable'), pipe() или асинхронных итераторов может привести к неинтуитивному поведению.

Класс: stream.Readable

Добавлено в: v0.9.4

Событие: 'close'

[История]

ВерсияИзменения
v10.0.0Добавлен параметр emitClose для указания, должно ли событие 'close' генерироваться при уничтожении.
v0.9.4Добавлено в: v0.9.4

Событие 'close' генерируется, когда поток и любые его базовые ресурсы (например, дескриптор файла) закрыты. Событие указывает, что больше событий генерироваться не будет, и дальнейшие вычисления выполняться не будут.

Поток Readable всегда будет генерировать событие 'close', если он создан с параметром emitClose.

Событие: 'data'

Добавлено в: v0.9.4

  • chunk <Buffer> | <string> | <any> Фрагмент данных. Для потоков, не работающих в объектном режиме, фрагмент будет строкой или Buffer. Для потоков, работающих в объектном режиме, фрагмент может быть любым значением JavaScript, кроме null.

Событие 'data' генерируется всякий раз, когда поток отказывается от владения фрагментом данных в пользу потребителя. Это может произойти, когда поток переключается в режим потока вызовом readable.pipe(), readable.resume() или путем добавления обработчика обратного вызова к событию 'data'. Событие 'data' также будет генерироваться всякий раз, когда вызывается метод readable.read(), и доступен фрагмент данных для возврата.

Добавление обработчика события 'data' к потоку, который не был явно приостановлен, переведет поток в режим потока. Данные будут передаваться, как только они станут доступны.

Обработчик обратного вызова получит фрагмент данных в виде строки, если для потока указана кодировка по умолчанию с помощью метода readable.setEncoding(); в противном случае данные будут переданы как Buffer.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Получено ${chunk.length} байт данных.`)
})
Событие: 'end'

Добавлено в: v0.9.4

Событие 'end' генерируется, когда больше нет данных для потребления из потока.

Событие 'end' не будет генерироваться, если данные не будут полностью потреблены. Этого можно добиться, переведя поток в режим потока или вызвав stream.read() многократно, пока все данные не будут потреблены.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Получено ${chunk.length} байт данных.`)
})
readable.on('end', () => {
  console.log('Больше данных не будет.')
})
Событие: 'error'

Добавлено в: v0.9.4

Событие 'error' может быть генерировано реализацией Readable в любое время. Как правило, это может произойти, если базовый поток не может генерировать данные из-за внутренней ошибки или когда реализация потока пытается передать недействительный фрагмент данных.

Функция обратного вызова обработчика получит один объект Error.

Событие: 'pause'

Добавлено в: v0.9.4

Событие 'pause' генерируется, когда вызывается stream.pause(), а readableFlowing не равно false.

Событие: 'readable'

[История]

ВерсияИзменения
v10.0.0Событие 'readable' всегда генерируется на следующем тике после вызова .push().
v10.0.0Использование 'readable' требует вызова .read().
v0.9.4Добавлено в: v0.9.4

Событие 'readable' генерируется, когда доступны данные для чтения из потока, вплоть до установленной верхней границы уровня воды (state.highWaterMark). Фактически, это указывает на то, что поток содержит новую информацию в буфере. Если данные доступны в этом буфере, можно вызвать stream.read() для извлечения этих данных. Кроме того, событие 'readable' также может быть генерировано, когда достигнут конец потока.

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // Сейчас есть данные для чтения.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

Если достигнут конец потока, вызов stream.read() вернет null и вызовет событие 'end'. Это также верно, если данных для чтения никогда не было. Например, в следующем примере foo.txt — это пустой файл:

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

Вывод выполнения этого скрипта:

bash
$ node test.js
readable: null
end

В некоторых случаях подключение обработчика события 'readable' приведет к чтению некоторого количества данных во внутренний буфер.

В целом, механизмы readable.pipe() и события 'data' проще для понимания, чем событие 'readable'. Однако обработка 'readable' может привести к увеличению пропускной способности.

Если одновременно используются 'readable' и 'data', 'readable' имеет приоритет в управлении потоком, т.е. 'data' будет генерироваться только при вызове stream.read(). Свойство readableFlowing станет false. Если есть обработчики 'data', когда 'readable' удален, поток начнет работать, т.е. события 'data' будут генерироваться без вызова .resume().

Событие: 'resume'

Добавлено в: v0.9.4

Событие 'resume' генерируется, когда вызывается stream.resume(), а readableFlowing не равно true.

readable.destroy([error])

[История]

ВерсияИзменения
v14.0.0Работает как no-op для потока, который уже был уничтожен.
v8.0.0Добавлено в: v8.0.0
  • error <Error> Ошибка, которая будет передана в качестве полезной нагрузки в событии 'error'
  • Возвращает: <this>

Уничтожает поток. По желанию генерирует событие 'error' и событие 'close' (если emitClose не установлено в false). После этого вызова читаемый поток освободит все внутренние ресурсы, и последующие вызовы push() будут игнорироваться.

После вызова destroy() любые дальнейшие вызовы будут no-op, и не будет генерироваться никаких дальнейших ошибок, за исключением ошибок из _destroy(), которые могут быть выведены как 'error'.

Разработчики не должны переопределять этот метод, а вместо этого должны реализовать readable._destroy().

readable.closed

Добавлено в: v18.0.0

Равно true после того, как было сгенерировано событие 'close'.

readable.destroyed

Добавлено в: v8.0.0

Равно true после вызова readable.destroy().

readable.isPaused()

Добавлено в: v0.11.14

Метод readable.isPaused() возвращает текущее состояние работы Readable. Он используется в основном механизмом, лежащим в основе метода readable.pipe(). В большинстве типичных случаев нет необходимости использовать этот метод напрямую.

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

Добавлен в: v0.9.4

Метод readable.pause() заставляет поток в режиме потока прекратить отправку событий 'data', выключая режим потока. Любые доступные данные останутся во внутреннем буфере.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Получено ${chunk.length} байт данных.`)
  readable.pause()
  console.log('В течение 1 секунды дополнительных данных не будет.')
  setTimeout(() => {
    console.log('Теперь данные начнут поступать снова.')
    readable.resume()
  }, 1000)
})

Метод readable.pause() не оказывает никакого эффекта, если есть обработчик событий 'readable'.

readable.pipe(destination[, options])

Добавлен в: v0.9.4

  • destination <stream.Writable> Цель для записи данных

  • options <Object> Параметры конвейера

    • end <boolean> Завершить запись, когда завершается чтение. По умолчанию: true.
  • Возвращает: <stream.Writable> destination, что позволяет создавать цепочки конвейеров, если это поток Duplex или Transform

Метод readable.pipe() присоединяет поток Writable к readable, заставляя его автоматически переключаться в режим потока и передавать все свои данные в присоединенный поток Writable. Поток данных будет автоматически управляться, чтобы поток назначения Writable не был перегружен более быстрым потоком Readable.

В следующем примере все данные из readable записываются в файл с именем file.txt:

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Все данные из readable записываются в 'file.txt'.
readable.pipe(writable)

Можно присоединять несколько потоков Writable к одному потоку Readable.

Метод readable.pipe() возвращает ссылку на поток destination, что позволяет создавать цепочки связанных потоков:

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

По умолчанию, stream.end() вызывается в потоке назначения Writable, когда исходный поток Readable отправляет событие 'end', так что поток назначения больше не будет доступен для записи. Чтобы отключить это поведение по умолчанию, можно передать параметр end со значением false, что приведет к тому, что поток назначения останется открытым:

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Goodbye\n')
})

Важно отметить, что если поток Readable отправляет ошибку во время обработки, поток назначения Writable не закрывается автоматически. Если возникает ошибка, необходимо вручную закрыть каждый поток, чтобы предотвратить утечки памяти.

Потоки Writable process.stderr и process.stdout никогда не закрываются до завершения процесса Node.js, независимо от указанных параметров.

readable.read([size])

Добавлено в: v0.9.4

  • size <number> Необязательный аргумент, указывающий, сколько данных нужно прочитать.
  • Возвращает: <string> | <Buffer> | <null> | <any>

Метод readable.read() считывает данные из внутреннего буфера и возвращает их. Если доступных для чтения данных нет, возвращается null. По умолчанию данные возвращаются в виде объекта Buffer, если только кодировка не была указана с помощью метода readable.setEncoding() или поток не работает в объектном режиме.

Необязательный аргумент size указывает конкретное количество байтов для чтения. Если size байтов недоступно для чтения, возвращается null, за исключением случая завершения потока, в котором случае будут возвращены все оставшиеся данные во внутреннем буфере.

Если аргумент size не указан, возвращаются все данные, содержащиеся во внутреннем буфере.

Аргумент size должен быть меньше или равен 1 ГиБ.

Метод readable.read() должен вызываться только для потоков Readable, работающих в приостановленном режиме. В режиме потока readable.read() вызывается автоматически, пока внутренний буфер полностью не опустошен.

js
const readable = getReadableStreamSomehow()

// 'readable' может срабатывать несколько раз по мере буферизации данных
readable.on('readable', () => {
  let chunk
  console.log('Поток доступен для чтения (новые данные получены в буфер)')
  // Используем цикл, чтобы убедиться, что мы прочитали все доступные данные
  while (null !== (chunk = readable.read())) {
    console.log(`Прочитано ${chunk.length} байт данных...`)
  }
})

// 'end' будет вызван один раз, когда больше не будет доступных данных
readable.on('end', () => {
  console.log('Достигнут конец потока.')
})

Каждый вызов readable.read() возвращает фрагмент данных или null, что означает, что в данный момент больше нет данных для чтения. Эти фрагменты не объединяются автоматически. Поскольку один вызов read() не возвращает все данные, для непрерывного чтения фрагментов до получения всех данных может потребоваться использование цикла while. При чтении большого файла .read() может временно возвращать null, указывая на то, что он использовал все буферизованное содержимое, но могут быть еще данные, которые еще не буферизованы. В таких случаях новое событие 'readable' генерируется, как только в буфере появляются новые данные, а событие 'end' сигнализирует о завершении передачи данных.

Поэтому для чтения всего содержимого файла из readable необходимо собирать фрагменты из нескольких событий 'readable':

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

Поток Readable в объектном режиме всегда возвращает один элемент из вызова readable.read(size), независимо от значения аргумента size.

Если метод readable.read() возвращает фрагмент данных, событие 'data' также будет генерироваться.

Вызов stream.read([size]) после того, как событие 'end' было сгенерировано, вернет null. Ошибка времени выполнения не будет вызвана.

readable.readable

Добавлен в: v11.4.0

Имеет значение true, если безопасен вызов readable.read(), что означает, что поток не был уничтожен и не выдал события 'error' или 'end'.

readable.readableAborted

Добавлен в: v16.8.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

Возвращает, был ли поток уничтожен или произошла ошибка до выдачи события 'end'.

readable.readableDidRead

Добавлен в: v16.7.0, v14.18.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

Возвращает, было ли выдано событие 'data'.

readable.readableEncoding

Добавлен в: v12.7.0

Геттер для свойства encoding данного потока Readable. Свойство encoding может быть установлено с помощью метода readable.setEncoding().

readable.readableEnded

Добавлен в: v12.9.0

Принимает значение true, когда выдаётся событие 'end'.

readable.errored

Добавлен в: v18.0.0

Возвращает ошибку, если поток был уничтожен с ошибкой.

readable.readableFlowing

Добавлен в: v9.4.0

Это свойство отражает текущее состояние потока Readable, как описано в разделе Три состояния.

readable.readableHighWaterMark

Добавлен в: v9.3.0

Возвращает значение highWaterMark, переданное при создании этого Readable.

readable.readableLength

Добавлен в: v9.4.0

Это свойство содержит количество байтов (или объектов) в очереди, готовых к чтению. Значение предоставляет данные для интроспекции относительно состояния highWaterMark.

readable.readableObjectMode

Добавлен в: v12.3.0

Геттер для свойства objectMode данного потока Readable.

readable.resume()

[История]

ВерсияИзменения
v10.0.0resume() не оказывает никакого эффекта, если есть слушатель события 'readable'.
v0.9.4Добавлено в: v0.9.4

Метод readable.resume() заставляет явно приостановленный поток Readable возобновить излучение событий 'data', переключая поток в режим потока.

Метод readable.resume() может использоваться для полного потребления данных из потока без фактической обработки каких-либо из этих данных:

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Достигнут конец, но ничего не прочитано.')
  })

Метод readable.resume() не оказывает никакого эффекта, если есть слушатель события 'readable'.

readable.setEncoding(encoding)

Добавлен в: v0.9.4

  • encoding <строка> Кодировка для использования.
  • Возвращает: <this>

Метод readable.setEncoding() устанавливает кодировку символов для данных, считываемых из потока Readable.

По умолчанию кодировка не назначена, и данные потока будут возвращены в виде объектов Buffer. Установка кодировки приводит к тому, что данные потока возвращаются в виде строк указанной кодировки, а не в виде объектов Buffer. Например, вызов readable.setEncoding('utf8') приведет к тому, что выходные данные будут интерпретироваться как данные UTF-8 и передаваться как строки. Вызов readable.setEncoding('hex') приведет к кодированию данных в шестнадцатеричном строковом формате.

Поток Readable будет правильно обрабатывать многобайтовые символы, передаваемые через поток, которые в противном случае были бы неправильно декодированы, если бы просто извлекались из потока как объекты Buffer.

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('Получено %d символов строковых данных:', chunk.length)
})
readable.unpipe([destination])

Добавлено в: v0.9.4

  • destination <stream.Writable> Необязательный конкретный поток для отключения
  • Возвращает: <this>

Метод readable.unpipe() отсоединяет поток Writable, ранее подключенный с помощью метода stream.pipe().

Если destination не указан, то отсоединяются все каналы.

Если destination указан, но для него не установлен канал, метод ничего не делает.

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Все данные из readable поступают в 'file.txt',
// но только в течение первой секунды.
readable.pipe(writable)
setTimeout(() => {
  console.log('Прекращение записи в file.txt.')
  readable.unpipe(writable)
  console.log('Ручное закрытие потока файла.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[История]

ВерсияИзменения
v22.0.0, v20.13.0Аргумент chunk теперь может быть экземпляром TypedArray или DataView.
v8.0.0Аргумент chunk теперь может быть экземпляром Uint8Array.
v0.9.11Добавлено в: v0.9.11
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Фрагмент данных для добавления в начало очереди чтения. Для потоков, не работающих в объективном режиме, chunk должен быть <string>, <Buffer>, <TypedArray>, <DataView> или null. Для потоков в объективном режиме chunk может быть любым значением JavaScript.
  • encoding <string> Кодировка строковых фрагментов. Должна быть допустимой кодировкой Buffer, например, 'utf8' или 'ascii'.

Передача chunk как null сигнализирует о конце потока (EOF) и ведет себя так же, как readable.push(null), после чего больше данных записать нельзя. Сигнал EOF помещается в конец буфера, и все буферизованные данные все равно будут отправлены.

Метод readable.unshift() добавляет фрагмент данных в начало внутреннего буфера. Это полезно в некоторых ситуациях, когда поток потребляется кодом, которому нужно «отменить потребление» некоторого количества данных, которые он оптимистично извлек из источника, чтобы эти данные можно было передать другой стороне.

Метод stream.unshift(chunk) нельзя вызывать после того, как событие 'end' было отправлено, иначе будет выброшено исключение времени выполнения.

Разработчикам, использующим stream.unshift(), часто следует рассмотреть возможность перехода на использование потока Transform вместо этого. Дополнительные сведения см. в разделе API для разработчиков потоков.

js
// Извлечь заголовок, ограниченный \n\n.
// Использовать unshift(), если получим слишком много.
// Вызвать обратный вызов с (ошибка, заголовок, поток).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // Найдена граница заголовка.
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // Удалить прослушиватель 'readable' перед unshifting.
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // Теперь тело сообщения можно прочитать из потока.
        callback(null, header, stream)
        return
      }
      // Продолжается чтение заголовка.
      header += str
    }
  }
}

В отличие от stream.push(chunk), stream.unshift(chunk) не завершит процесс чтения, сбросив внутреннее состояние чтения потока. Это может привести к неожиданным результатам, если readable.unshift() вызывается во время чтения (т. е. из реализации stream._read() в пользовательском потоке). Вызов readable.unshift() с последующим немедленным вызовом stream.push('') правильно сбросит состояние чтения, однако лучше просто избегать вызова readable.unshift(), находясь в процессе чтения.

readable.wrap(stream)

Добавлено в: v0.9.4

  • stream <Stream> Поток чтения "старого стиля"
  • Возвращает: <this>

До Node.js 0.10 потоки не реализовывали весь API модуля node:stream в том виде, в котором он определен сейчас. (См. Совместимость для получения дополнительной информации.)

При использовании старой библиотеки Node.js, которая генерирует события 'data' и имеет метод stream.pause(), который является только рекомендательным, метод readable.wrap() может использоваться для создания потока Readable, который использует старый поток в качестве источника данных.

В редких случаях потребуется использовать readable.wrap(), но этот метод был предоставлен для удобства взаимодействия со старыми приложениями и библиотеками Node.js.

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // и т.д.
})
readable[Symbol.asyncIterator]()

[История]

ВерсияИзменения
v11.14.0Поддержка Symbol.asyncIterator больше не является экспериментальной.
v10.0.0Добавлено в: v10.0.0
  • Возвращает: <AsyncIterator> для полного потребления потока.
js
const fs = require('node:fs')

async function print(readable) {
  readable.setEncoding('utf8')
  let data = ''
  for await (const chunk of readable) {
    data += chunk
  }
  console.log(data)
}

print(fs.createReadStream('file')).catch(console.error)

Если цикл завершается с помощью break, return или throw, поток будет уничтожен. Иными словами, итерация по потоку полностью его потребляет. Поток будет считываться фрагментами размером, равным параметру highWaterMark. В приведенном выше примере кода данные будут находиться в одном фрагменте, если файл содержит менее 64 КБ данных, поскольку параметр highWaterMark не указан для fs.createReadStream().

readable[Symbol.asyncDispose]()

Добавлено в: v20.4.0, v18.18.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

Вызывает readable.destroy() с AbortError и возвращает промис, который выполняется, когда поток завершен.

readable.compose(stream[, options])

Добавлено в: v19.1.0, v18.13.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

js
import { Readable } from 'node:stream'

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ')

    for (const word of words) {
      yield word
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()

console.log(words) // выведет ['this', 'is', 'compose', 'as', 'operator']

См. stream.compose для получения дополнительной информации.

readable.iterator([options])

Добавлено в: v16.3.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

  • options <Объект>

    • destroyOnReturn <булево> Если установлено значение false, вызов return в асинхронном итераторе или выход из итерации for await...of с помощью break, return или throw не уничтожит поток. По умолчанию: true.
  • Возвращает: <Асинхронный итератор> для потребления потока.

Итератор, созданный этим методом, предоставляет пользователям возможность отменить уничтожение потока, если цикл for await...of завершен с помощью return, break или throw, или если итератор должен уничтожить поток, если поток выдал ошибку во время итерации.

js
const { Readable } = require('node:stream')

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // Выведет 2, а затем 3
  }

  console.log(readable.destroyed) // True, поток был полностью потреблён
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]))
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}

showBoth()
readable.map(fn[, options])

[История]

ВерсияИзменения
v20.7.0, v18.19.0добавлен highWaterMark в options.
v17.4.0, v16.14.0Добавлено в: v17.4.0, v16.14.0

[Стабильность: 1 - Экспериментально]

Стабильность: 1 Стабильность: 1 - Экспериментально

  • fn <Function> | <AsyncFunction> функция для отображения каждого фрагмента в потоке.

    • data <any> фрагмент данных из потока.
    • options <Object>
    • signal <AbortSignal> прерывается, если поток уничтожается, что позволяет прервать вызов fn раньше времени.
  • options <Object>

    • concurrency <number> максимальное количество одновременных вызовов fn в потоке. По умолчанию: 1.
    • highWaterMark <number> сколько элементов буферизовать в ожидании потребления пользователем отображаемых элементов. По умолчанию: concurrency * 2 - 1.
    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерван.
  • Возвращает: <Readable> поток, отображенный с помощью функции fn.

Этот метод позволяет отображать поток. Функция fn будет вызываться для каждого фрагмента в потоке. Если функция fn возвращает promise — этот promise будет awaited перед передачей в результирующий поток.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// С синхронным отображателем.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
  console.log(chunk) // 2, 4, 6, 8
}
// С асинхронным отображателем, выполняя не более 2 запросов одновременно.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  domain => resolver.resolve4(domain),
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  console.log(result) // Выводит результат DNS resolver.resolve4.
}
readable.filter(fn[, options])

[История]

ВерсияИзменения
v20.7.0, v18.19.0Добавлен highWaterMark в options.
v17.4.0, v16.14.0Добавлено в: v17.4.0, v16.14.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

  • fn <Функция> | <AsyncFunction> функция для фильтрации фрагментов из потока.

    • data <любой> фрагмент данных из потока.
    • options <Объект>
    • signal <AbortSignal> прерывается, если поток уничтожен, что позволяет прервать вызов fn раньше времени.
  • options <Объект>

    • concurrency <число> максимальное количество одновременных вызовов fn для потока. По умолчанию: 1.
    • highWaterMark <число> сколько элементов буферизовать в ожидании потребления отфильтрованных элементов пользователем. По умолчанию: concurrency * 2 - 1.
    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерван.
  • Возвращает: <Readable> поток, отфильтрованный с помощью предиката fn.

Этот метод позволяет фильтровать поток. Для каждого фрагмента в потоке будет вызвана функция fn, и если она возвращает истинное значение, фрагмент будет передан в результирующий поток. Если функция fn возвращает promise, этот promise будет awaited.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// С синхронным предикатом.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// С асинхронным предикатом, выполняя максимум 2 запроса одновременно.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address.ttl > 60
  },
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  // Выводит домены с временем жизни записи DNS более 60 секунд.
  console.log(result)
}
readable.forEach(fn[, options])

Добавлено в: v17.5.0, v16.15.0

[Стабильно: 1 - Экспериментально]

Стабильно: 1 Стабильность: 1 - Экспериментально

  • fn <Function> | <AsyncFunction> функция, вызываемая для каждого фрагмента потока.

    • data <any> фрагмент данных из потока.
    • options <Object>
    • signal <AbortSignal> прерывается, если поток уничтожен, что позволяет прервать вызов fn раньше времени.
  • options <Object>

    • concurrency <number> максимальное количество одновременных вызовов fn для потока. По умолчанию: 1.
    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерван.
  • Возвращает: <Promise> промис, который завершается, когда поток завершен.

Этот метод позволяет итерировать поток. Для каждого фрагмента в потоке будет вызвана функция fn. Если функция fn возвращает промис, этот промис будет awaited.

Этот метод отличается от циклов for await...of тем, что он может обрабатывать фрагменты одновременно. Кроме того, итерация forEach может быть остановлена только путем передачи параметра signal и прерывания связанного AbortController, тогда как for await...of может быть остановлена с помощью break или return. В любом случае поток будет уничтожен.

Этот метод отличается от прослушивания события 'data' тем, что он использует событие readable в базовом механизме и может ограничивать количество одновременных вызовов fn.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// С синхронным предикатом.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// С асинхронным предикатом, выполняя максимум 2 запроса одновременно.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address
  },
  { concurrency: 2 }
)
await dnsResults.forEach(result => {
  // Выводит результат, аналогично `for await (const result of dnsResults)`
  console.log(result)
})
console.log('done') // Поток завершен
readable.toArray([options])

Добавлено в: v17.5.0, v16.15.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

  • options <Object>

    • signal <AbortSignal> позволяет отменить операцию toArray, если сигнал прерывается.
  • Возвращает: <Promise> promise, содержащий массив с содержимым потока.

Этот метод позволяет легко получить содержимое потока.

Поскольку этот метод считывает весь поток в память, он сводит на нет преимущества потоков. Он предназначен для обеспечения совместимости и удобства, а не в качестве основного способа использования потоков.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]

// Выполняем запросы DNS одновременно с помощью .map и собираем
// результаты в массив с помощью toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
  .map(
    async domain => {
      const { address } = await resolver.resolve4(domain, { ttl: true })
      return address
    },
    { concurrency: 2 }
  )
  .toArray()
readable.some(fn[, options])

Добавлено в: v17.5.0, v16.15.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

  • fn <Function> | <AsyncFunction> функция, вызываемая для каждого фрагмента потока.

    • data <any> фрагмент данных из потока.
    • options <Object>
    • signal <AbortSignal> прерывается, если поток уничтожается, что позволяет прервать вызов fn раньше времени.
  • options <Object>

    • concurrency <number> максимальное количество одновременных вызовов fn для потока одновременно. По умолчанию: 1.
    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерывается.
  • Возвращает: <Promise> promise, вычисляющийся в true, если fn возвратил истинное значение хотя бы для одного из фрагментов.

Этот метод похож на Array.prototype.some и вызывает fn для каждого фрагмента в потоке, пока ожидаемое возвращаемое значение не станет true (или любым истинным значением). Как только ожидаемое возвращаемое значение вызова fn для фрагмента является истинным, поток уничтожается, и promise выполняется с true. Если ни один из вызовов fn для фрагментов не возвращает истинное значение, promise выполняется с false.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// С синхронным предикатом.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false

// С асинхронным предикатом, выполняя не более 2 проверок файлов одновременно.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(anyBigFile) // `true`, если любой файл в списке больше 1 МБ
console.log('done') // Поток завершен
readable.find(fn[, options])

Добавлено в: v17.5.0, v16.17.0

[Стабильно: 1 - Экспериментально]

Стабильно: 1 Стабильность: 1 - Экспериментально

  • fn <Function> | <AsyncFunction> функция, вызываемая для каждого фрагмента потока.

    • data <any> фрагмент данных из потока.
    • options <Object>
    • signal <AbortSignal> прерывается, если поток уничтожается, что позволяет преждевременно прервать вызов fn.
  • options <Object>

    • concurrency <number> максимальное количество одновременных вызовов fn в потоке. По умолчанию: 1.
    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерван.
  • Возвращает: <Promise> promise, вычисляемый в первый фрагмент, для которого fn вычислил истинное значение, или undefined, если элемент не найден.

Этот метод аналогичен Array.prototype.find и вызывает fn для каждого фрагмента в потоке, чтобы найти фрагмент с истинным значением для fn. Как только ожидаемое возвращаемое значение вызова fn истинно, поток уничтожается, и promise выполняется со значением, для которого fn вернул истинное значение. Если все вызовы fn для фрагментов возвращают ложное значение, promise выполняется с undefined.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// С синхронным предикатом.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined

// С асинхронным предикатом, выполняя не более 2 проверок файлов одновременно.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(foundBigFile) // Имя файла большого файла, если какой-либо файл в списке больше 1 МБ
console.log('done') // Поток завершен
readable.every(fn[, options])

Добавлено в: v17.5.0, v16.15.0

[Стабильно: 1 - Экспериментально]

Стабильно: 1 Стабильность: 1 - Экспериментально

  • fn <Function> | <AsyncFunction> функция, вызываемая для каждого фрагмента потока.

    • data <any> фрагмент данных из потока.
    • options <Object>
    • signal <AbortSignal> прерывается, если поток уничтожен, что позволяет прервать вызов fn раньше времени.
  • options <Object>

    • concurrency <number> максимальное количество одновременных вызовов fn в потоке. По умолчанию: 1.
    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерывается.
  • Возвращает: <Promise> promise, вычисляющийся в true, если fn вернул истинное значение для всех фрагментов.

Этот метод похож на Array.prototype.every и вызывает fn для каждого фрагмента в потоке, чтобы проверить, являются ли все ожидаемые возвращаемые значения истинными значениями для fn. Как только возвращаемое значение вызова fn для фрагмента является ложным, поток уничтожается, и promise выполняется с false. Если все вызовы fn для фрагментов возвращают истинное значение, promise выполняется с true.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// С синхронным предикатом.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true

// С асинхронным предикатом, выполняя не более 2 проверок файлов одновременно.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
// `true`, если все файлы в списке больше 1 MiB
console.log(allBigFiles)
console.log('done') // Поток завершен
readable.flatMap(fn[, options])

Добавлено в: v17.5.0, v16.15.0

[Стабильно: 1 - Экспериментально]

Стабильно: 1 Стабильность: 1 - Экспериментально

  • fn <Function> | <AsyncGeneratorFunction> | <AsyncFunction> функция для отображения каждого фрагмента в потоке.

    • data <any> фрагмент данных из потока.
    • options <Object>
    • signal <AbortSignal> прерывается, если поток уничтожается, что позволяет прервать вызов fn раньше времени.
  • options <Object>

    • concurrency <number> максимальное количество одновременных вызовов fn в потоке. По умолчанию: 1.
    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерывается.
  • Возвращает: <Readable> поток, преобразованный с помощью функции fn.

Этот метод возвращает новый поток, применяя заданный обратный вызов к каждому фрагменту потока, а затем выравнивая результат.

Можно возвращать поток или другую итерируемую или асинхронную итерируемую сущность из fn, и результирующие потоки будут объединены (выровнены) в возвращаемый поток.

js
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'

// С синхронным отображателем.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
  console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// С асинхронным отображателем, объединяем содержимое 4 файлов
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
  createReadStream(fileName)
)
for await (const result of concatResult) {
  // Это будет содержать содержимое (все фрагменты) всех 4 файлов
  console.log(result)
}
readable.drop(limit[, options])

Добавлено в: v17.5.0, v16.15.0

[Стабильность: 1 - Экспериментально]

Стабильность: 1 Стабильность: 1 - Экспериментально

  • limit <number> количество фрагментов, которые нужно пропустить в потоке чтения.

  • options <Object>

    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерван.
  • Возвращает: <Readable> поток с пропущенными limit фрагментами.

Этот метод возвращает новый поток с пропущенными первыми limit фрагментами.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])

Добавлено в: v17.5.0, v16.15.0

[Стабильность: 1 - Экспериментально]

Стабильность: 1 Стабильность: 1 - Экспериментально

  • limit <number> количество фрагментов, которые нужно взять из потока чтения.

  • options <Object>

    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерван.
  • Возвращает: <Readable> поток с взятыми limit фрагментами.

Этот метод возвращает новый поток с первыми limit фрагментами.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])

Добавлено в: v17.5.0, v16.15.0

[Стабильность: 1 - Экспериментально]

Стабильность: 1 Стабильность: 1 - Экспериментально

  • fn <Function> | <AsyncFunction> функция-редуктор, вызываемая для каждого фрагмента в потоке.

    • previous <any> значение, полученное из последнего вызова fn или значение initial, если оно указано, или первый фрагмент потока в противном случае.
    • data <any> фрагмент данных из потока.
    • options <Object>
    • signal <AbortSignal> прерывается, если поток уничтожен, что позволяет прервать вызов fn раньше времени.
  • initial <any> начальное значение для использования в редукции.

  • options <Object>

    • signal <AbortSignal> позволяет уничтожить поток, если сигнал прерван.
  • Возвращает: <Promise> промис для конечного значения редукции.

Этот метод вызывает fn для каждого фрагмента потока по порядку, передавая ему результат вычисления предыдущего элемента. Он возвращает промис для конечного значения редукции.

Если значение initial не указано, в качестве начального значения используется первый фрагмент потока. Если поток пуст, промис отклоняется с TypeError со свойством кода ERR_INVALID_ARGS.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
  const { size } = await stat(join(directoryPath, file))
  return totalSize + size
}, 0)

console.log(folderSize)

Функция-редуктор итерирует элементы потока по одному, что означает, что нет параметра concurrency или параллелизма. Для выполнения reduce одновременно можно извлечь асинхронную функцию в метод readable.map.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir)
  .map(file => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0)

console.log(folderSize)

Дуплексные и преобразующие потоки

Класс: stream.Duplex

[История]

ВерсияИзменения
v6.8.0Экземпляры Duplex теперь возвращают true при проверке instanceof stream.Writable.
v0.9.4Добавлено в: v0.9.4

Дуплексные потоки — это потоки, реализующие как интерфейс Readable, так и Writable.

Примеры Duplex потоков:

duplex.allowHalfOpen

Добавлен в: v0.9.4

Если false, то поток автоматически завершит запись, когда завершится чтение. Изначально устанавливается с помощью параметра конструктора allowHalfOpen, который по умолчанию имеет значение true.

Это можно изменить вручную, чтобы изменить поведение полуоткрытого соединения существующего экземпляра потока Duplex, но это должно быть изменено до того, как будет вызвано событие 'end'.

Класс: stream.Transform

Добавлен в: v0.9.4

Преобразующие потоки — это Duplex потоки, где вывод каким-либо образом связан с вводом. Как и все Duplex потоки, Transform потоки реализуют как интерфейс Readable, так и Writable.

Примеры Transform потоков:

transform.destroy([error])

[История]

ВерсияИзменения
v14.0.0Работает как no-op в потоке, который уже был уничтожен.
v8.0.0Добавлено в: v8.0.0

Уничтожает поток и, при необходимости, генерирует событие 'error'. После этого вызова преобразующий поток освободит все внутренние ресурсы. Разработчики не должны переопределять этот метод, а вместо этого реализовать readable._destroy(). Реализация по умолчанию _destroy() для Transform также генерирует 'close', если emitClose не установлен в false.

После вызова destroy() любые дальнейшие вызовы будут являться no-op, и никакие другие ошибки, кроме ошибок из _destroy(), не будут генерироваться как 'error'.

stream.duplexPair([options])

Добавлено в: v22.6.0, v20.17.0

  • options <Object> Значение, передаваемое обоим конструкторам Duplex для установки параметров, таких как буферизация.
  • Возвращает: <Array> из двух экземпляров Duplex.

Вспомогательная функция duplexPair возвращает массив из двух элементов, каждый из которых представляет собой поток Duplex, подключенный к другой стороне:

js
const [sideA, sideB] = duplexPair()

Все, что записывается в один поток, становится доступным для чтения в другом. Он обеспечивает поведение, аналогичное сетевому соединению, где данные, записанные клиентом, становятся доступными для чтения сервером и наоборот.

Потоки Duplex симметричны; один или другой может использоваться без каких-либо различий в поведении.

stream.finished(stream[, options], callback)

[История]

ВерсияИзменения
v19.5.0Добавлена поддержка ReadableStream и WritableStream.
v15.11.0Добавлен параметр signal.
v14.0.0finished(stream, cb) будет ожидать события 'close' перед вызовом обратного вызова. Реализация пытается обнаружить устаревшие потоки и применять это поведение только к потокам, которые, как ожидается, будут генерировать событие 'close'.
v14.0.0Генерация события 'close' до 'end' в потоке Readable вызовет ошибку ERR_STREAM_PREMATURE_CLOSE.
v14.0.0Обратный вызов будет вызван для потоков, которые уже завершены до вызова finished(stream, cb).
v10.0.0Добавлено в: v10.0.0
  • stream <Stream> | <ReadableStream> | <WritableStream> Читаемый и/или записываемый поток/веб-поток.

  • options <Object>

    • error <boolean> Если установлено в false, то вызов emit('error', err) не рассматривается как завершенный. По умолчанию: true.
    • readable <boolean> Если установлено в false, обратный вызов будет вызван при завершении потока, даже если поток может быть еще доступен для чтения. По умолчанию: true.
    • writable <boolean> Если установлено в false, обратный вызов будет вызван при завершении потока, даже если поток может быть еще доступен для записи. По умолчанию: true.
    • signal <AbortSignal> позволяет прервать ожидание завершения потока. Базовый поток не будет прерван, если сигнал будет прерван. Обратный вызов будет вызван с AbortError. Все зарегистрированные слушатели, добавленные этой функцией, также будут удалены.
  • callback <Function> Функция обратного вызова, которая принимает необязательный аргумент ошибки.

  • Возвращает: <Function> Функция очистки, которая удаляет всех зарегистрированных слушателей.

Функция для получения уведомления о том, что поток больше не доступен для чтения, записи или произошла ошибка или событие преждевременного закрытия.

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('Поток завершился с ошибкой.', err)
  } else {
    console.log('Поток завершил чтение.')
  }
})

rs.resume() // Осушаем поток.

Особенно полезно в сценариях обработки ошибок, когда поток уничтожается преждевременно (например, прерванный HTTP-запрос) и не генерирует события 'end' или 'finish'.

API finished предоставляет версию с promise.

stream.finished() оставляет висячие обработчики событий (в частности, 'error', 'end', 'finish' и 'close') после вызова callback. Причина этого заключается в том, что непредвиденные события 'error' (из-за неправильных реализаций потоков) не вызывают неожиданных сбоев. Если это нежелательное поведение, то необходимо вызвать возвращаемую функцию очистки в обратном вызове:

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[История]

ВерсияИзменения
v19.7.0, v18.16.0Добавлена поддержка webstreams.
v18.0.0Передача недопустимого обратного вызова в аргумент callback теперь вызывает ERR_INVALID_ARG_TYPE вместо ERR_INVALID_CALLBACK.
v14.0.0pipeline(..., cb) будет ждать события 'close', прежде чем вызывать обратный вызов. Реализация пытается обнаружить устаревшие потоки и применять это поведение только к потокам, которые, как ожидается, будут генерировать событие 'close'.
v13.10.0Добавлена поддержка асинхронных генераторов.
v10.0.0Добавлено в: v10.0.0

Метод модуля для передачи данных между потоками и генераторами, перенаправляющий ошибки, обеспечивающий правильную очистку и предоставляющий обратный вызов по завершении конвейера.

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// Используйте API pipeline для простого соединения последовательности потоков
// и получения уведомления о полном завершении конвейера.

// Конвейер для эффективной gzip-сжатия потенциально огромного tar-файла:

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('Конвейер завершился с ошибкой.', err)
  } else {
    console.log('Конвейер завершен успешно.')
  }
})

API pipeline предоставляет версию с promise.

stream.pipeline() вызовет stream.destroy(err) для всех потоков, за исключением:

  • Потоков Readable, которые выпустили событие 'end' или 'close'.
  • Потоков Writable, которые выпустили событие 'finish' или 'close'.

stream.pipeline() оставляет висячие обработчики событий в потоках после вызова обратного вызова. В случае повторного использования потоков после сбоя это может привести к утечкам обработчиков событий и пропущенным ошибкам. Если последний поток является читаемым, висячие обработчики событий будут удалены, чтобы последний поток можно было использовать позже.

stream.pipeline() закрывает все потоки, когда возникает ошибка. Использование IncomingRequest с pipeline может привести к неожиданному поведению, поскольку оно уничтожит сокет, не отправив ожидаемый ответ. См. пример ниже:

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // Нет такого файла
      // Это сообщение не может быть отправлено, так как `pipeline` уже уничтожил сокет
      return res.end('error!!!')
    }
  })
})

stream.compose(...streams)

[История]

ВерсияИзменения
v21.1.0, v20.10.0Добавлена поддержка класса stream.
v19.8.0, v18.16.0Добавлена поддержка webstreams.
v16.9.0Добавлено в: v16.9.0

[Стабильность: 1 - Экспериментально]

Стабильность: 1 Стабильность: 1 - stream.compose является экспериментальным.

Объединяет два или более потока в поток Duplex, который записывает в первый поток и считывает из последнего. Каждый предоставленный поток передаётся в следующий с использованием stream.pipeline. Если в любом из потоков возникает ошибка, то все они уничтожаются, включая внешний поток Duplex.

Поскольку stream.compose возвращает новый поток, который, в свою очередь, может (и должен) быть передан в другие потоки, он позволяет использовать композицию. В отличие от этого, при передаче потоков в stream.pipeline, обычно первый поток является читаемым потоком, а последний — записываемым, образуя замкнутую цепь.

Если передана функция Function, она должна быть методом-фабрикой, принимающим source Iterable.

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // выведет 'HELLOWORLD'

stream.compose может использоваться для преобразования асинхронных итерируемых объектов, генераторов и функций в потоки.

  • AsyncIterable преобразуется в читаемый Duplex. Не может возвращать null.
  • AsyncGeneratorFunction преобразуется в читаемый/записываемый transform Duplex. Должен принимать исходный AsyncIterable в качестве первого параметра. Не может возвращать null.
  • AsyncFunction преобразуется в записываемый Duplex. Должен возвращать либо null, либо undefined.
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// Преобразование AsyncIterable в читаемый Duplex.
const s1 = compose(
  (async function* () {
    yield 'Hello'
    yield 'World'
  })()
)

// Преобразование AsyncGenerator в transform Duplex.
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// Преобразование AsyncFunction в записываемый Duplex.
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // выведет 'HELLOWORLD'

См. readable.compose(stream) для stream.compose как оператора.

stream.Readable.from(iterable[, options])

Добавлено в: v12.3.0, v10.17.0

  • iterable <Iterable> Объект, реализующий протокол итерируемого объекта Symbol.asyncIterator или Symbol.iterator. Генерирует событие 'error', если передано значение null.
  • options <Object> Параметры, предоставляемые new stream.Readable([options]). По умолчанию Readable.from() установит options.objectMode в true, если это явно не отменено установкой options.objectMode в false.
  • Возвращает: <stream.Readable>

Вспомогательный метод для создания читаемых потоков из итераторов.

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

Вызов Readable.from(string) или Readable.from(buffer) не будет итерировать строки или буферы для соответствия семантике других потоков по причинам производительности.

Если в качестве аргумента передаётся объект Iterable, содержащий промисы, это может привести к необработанному отклонению.

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Необработанное отклонение
])

stream.Readable.fromWeb(readableStream[, options])

Добавлено в: v17.0.0

[Стабильность: 1 - Экспериментально]

Стабильность: 1 Стабильность: 1 - Экспериментально

stream.Readable.isDisturbed(stream)

Добавлен в: v16.8.0

[Стабильность: 1 — Экспериментальный]

Стабильность: 1 Стабильность: 1 — Экспериментальный

Возвращает, был ли поток прочитан или отменен.

stream.isErrored(stream)

Добавлен в: v17.3.0, v16.14.0

[Стабильность: 1 — Экспериментальный]

Стабильность: 1 Стабильность: 1 — Экспериментальный

Возвращает, произошла ли ошибка в потоке.

stream.isReadable(stream)

Добавлен в: v17.4.0, v16.14.0

[Стабильность: 1 — Экспериментальный]

Стабильность: 1 Стабильность: 1 — Экспериментальный

Возвращает, является ли поток читаемым.

stream.Readable.toWeb(streamReadable[, options])

Добавлен в: v17.0.0

[Стабильность: 1 — Экспериментальный]

Стабильность: 1 Стабильность: 1 — Экспериментальный

  • streamReadable <stream.Readable>

  • options <Object>

    • strategy <Object>
    • highWaterMark <number> Максимальный размер внутренней очереди (созданного ReadableStream) перед применением противодавления при чтении из заданного stream.Readable. Если значение не указано, оно будет взято из заданного stream.Readable.
    • size <Function> Функция, определяющая размер заданного фрагмента данных. Если значение не указано, размер будет равен 1 для всех фрагментов.
    • chunk <any>
    • Возвращает: <number>
  • Возвращает: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

Добавлено в: v17.0.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

stream.Writable.toWeb(streamWritable)

Добавлено в: v17.0.0

[Стабильность: 1 - Экспериментальный]

Стабильность: 1 Стабильность: 1 - Экспериментальный

stream.Duplex.from(src)

[История]

ВерсияИзменения
v19.5.0, v18.17.0Аргумент src теперь может быть ReadableStream или WritableStream.
v16.8.0Добавлено в: v16.8.0

Вспомогательный метод для создания дуплексных потоков.

  • Stream преобразует выходной поток в выходной Duplex и входной поток в Duplex.
  • Blob преобразуется во входной Duplex.
  • string преобразуется во входной Duplex.
  • ArrayBuffer преобразуется во входной Duplex.
  • AsyncIterable преобразуется во входной Duplex. Не может возвращать null.
  • AsyncGeneratorFunction преобразуется во входной/выходной transform Duplex. Должен принимать исходный AsyncIterable в качестве первого параметра. Не может возвращать null.
  • AsyncFunction преобразуется в выходной Duplex. Должен возвращать либо null, либо undefined.
  • Object ({ writable, readable }) преобразует readable и writable в Stream, а затем объединяет их в Duplex, где Duplex будет записывать в writable и читать из readable.
  • Promise преобразуется во входной Duplex. Значение null игнорируется.
  • ReadableStream преобразуется во входной Duplex.
  • WritableStream преобразуется в выходной Duplex.
  • Возвращает: <stream.Duplex>

Если в качестве аргумента передаётся объект Iterable, содержащий промисы, это может привести к необработанному отклонению.

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Необработанное отклонение
])

stream.Duplex.fromWeb(pair[, options])

Добавлено в: v17.0.0

[Стабильность: 1 - Экспериментально]

Стабильность: 1 Стабильность: 1 - Экспериментально

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

Добавлено в: v17.0.0

[Стабильно: 1 - Экспериментально]

Стабильно: 1 Стабильность: 1 - Экспериментально

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[История]

ВерсияИзменения
v19.7.0, v18.16.0Добавлена поддержка ReadableStream и WritableStream.
v15.4.0Добавлено в: v15.4.0

Прикрепляет AbortSignal к читаемому или записываемому потоку. Это позволяет коду управлять уничтожением потока с помощью AbortController.

Вызов abort для AbortController, соответствующего переданному AbortSignal, будет работать так же, как вызов .destroy(new AbortError()) для потока, и controller.error(new AbortError()) для веб-потоков.

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Позже прервите операцию, закрыв поток
controller.abort()

Или используя AbortSignal с читаемым потоком как асинхронную итерируемую сущность:

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // установить таймаут
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // Операция была отменена
    } else {
      throw e
    }
  }
})()

Или используя AbortSignal с ReadableStream:

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // Операция была отменена
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hello
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

Добавлен в: v19.9.0, v18.17.0

Возвращает значение default highWaterMark, используемое потоками. По умолчанию равно 65536 (64 KiB) или 16 для objectMode.

stream.setDefaultHighWaterMark(objectMode, value)

Добавлен в: v19.9.0, v18.17.0

Устанавливает значение default highWaterMark, используемое потоками.

API для разработчиков потоков

API модуля node:stream разработан таким образом, чтобы упростить реализацию потоков с использованием модели прототипного наследования JavaScript.

Сначала разработчик потока объявляет новый класс JavaScript, который расширяет один из четырех основных классов потоков (stream.Writable, stream.Readable, stream.Duplex или stream.Transform), обязательно вызывая соответствующий конструктор родительского класса:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

При расширении потоков учитывайте, какие параметры пользователь может и должен предоставлять, прежде чем передавать их базовому конструктору. Например, если реализация делает предположения относительно параметров autoDestroy и emitClose, не позволяйте пользователю переопределять их. Явно указывайте, какие параметры передаются, вместо неявной передачи всех параметров.

Затем новый класс потока должен реализовать один или несколько конкретных методов в зависимости от типа создаваемого потока, как подробно описано в таблице ниже:

Случай использованияКлассМетод(ы) для реализации
Только чтениеReadable_read()
Только записьWritable_write() , _writev() , _final()
Чтение и записьDuplex_read() , _write() , _writev() , _final()
Обработка записанных данных, затем чтение результатаTransform_transform() , _flush() , _final()

Код реализации потока никогда не должен вызывать "публичные" методы потока, предназначенные для использования потребителями (как описано в разделе API для потребителей потоков). Это может привести к нежелательным побочным эффектам в коде приложения, потребляющем поток.

Избегайте переопределения публичных методов, таких как write(), end(), cork(), uncork(), read() и destroy(), или генерации внутренних событий, таких как 'error', 'data', 'end', 'finish' и 'close' через .emit(). Это может нарушить текущие и будущие инварианты потока, что приведет к проблемам с поведением и/или совместимостью с другими потоками, утилитами потоков и ожиданиями пользователей.

Упрощенное построение

Добавлено в: v1.2.0

Для многих простых случаев можно создавать поток без использования наследования. Этого можно достичь путем непосредственного создания экземпляров объектов stream.Writable, stream.Readable, stream.Duplex или stream.Transform и передачи соответствующих методов в качестве параметров конструктора.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  construct(callback) {
    // Инициализация состояния и загрузка ресурсов...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Освобождение ресурсов...
  },
})

Реализация потока записи

Класс stream.Writable расширяется для реализации потока Writable.

Пользовательские потоки Writable должны вызывать конструктор new stream.Writable([options]) и реализовывать метод writable._write() и/или writable._writev().

new stream.Writable([options])

[История]

ВерсияИзменения
v22.0.0Увеличение значения highWaterMark по умолчанию.
v15.5.0Поддержка передачи AbortSignal.
v14.0.0Изменение значения параметра autoDestroy по умолчанию на true.
v11.2.0, v10.16.0Добавление параметра autoDestroy для автоматического вызова destroy() потока при возникновении события 'finish' или ошибок.
v10.0.0Добавление параметра emitClose для указания, должно ли событие 'close' генерироваться при уничтожении.
  • options <Object>
    • highWaterMark <number> Уровень буфера, при котором stream.write() начинает возвращать false. По умолчанию: 65536 (64 KiB) или 16 для потоков с objectMode.
    • decodeStrings <boolean> Кодировать ли строки, переданные в stream.write(), в Buffer (с кодировкой, указанной в вызове stream.write()) перед передачей их в stream._write(). Другие типы данных не преобразуются (т. е. Buffer не декодируются в строки). Установка в false предотвратит преобразование строк. По умолчанию: true.
    • defaultEncoding <string> Кодировка по умолчанию, используемая, когда кодировка не указана в качестве аргумента для stream.write(). По умолчанию: 'utf8'.
    • objectMode <boolean> Является ли stream.write(anyObj) допустимой операцией. При установке этого параметра становится возможным записывать значения JavaScript, отличные от строки, <Buffer>, <TypedArray> или <DataView>, если это поддерживается реализацией потока. По умолчанию: false.
    • emitClose <boolean> Должен ли поток генерировать событие 'close' после того, как он был уничтожен. По умолчанию: true.
    • write <Function> Реализация для метода stream._write().
    • writev <Function> Реализация для метода stream._writev().
    • destroy <Function> Реализация для метода stream._destroy().
    • final <Function> Реализация для метода stream._final().
    • construct <Function> Реализация для метода stream._construct().
    • autoDestroy <boolean> Должен ли этот поток автоматически вызывать .destroy() сам по себе после завершения. По умолчанию: true.
    • signal <AbortSignal> Сигнал, представляющий возможное отключение.
js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor(options) {
    // Вызывает конструктор stream.Writable().
    super(options)
    // ...
  }
}

Или, при использовании конструкторов в стиле pre-ES6:

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options)
  Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

Или, используя упрощенный подход к конструктору:

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

Вызов abort для AbortController, соответствующего переданному AbortSignal, будет работать так же, как вызов .destroy(new AbortError()) для потока записи.

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// Позже, прервать операцию, закрыв поток
controller.abort()

writable._construct(callback)

Добавлено в: v15.0.0

  • callback <Function> Вызовите эту функцию (необязательно с аргументом ошибки), когда поток завершит инициализацию.

Метод _construct() НИКОГДА не должен вызываться напрямую. Он может быть реализован дочерними классами, и если это так, он будет вызываться только внутренними методами класса Writable.

Эта необязательная функция будет вызвана через такт после возврата конструктора потока, откладывая любые вызовы _write(), _final() и _destroy() до тех пор, пока не будет вызван callback. Это полезно для инициализации состояния или асинхронной инициализации ресурсов, прежде чем поток станет доступен для использования.

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[История]

ВерсияИзменения
v12.11.0_write() является необязательным при предоставлении _writev().
  • chunk <Buffer> | <string> | <any> Buffer, который будет записан, преобразованный из строки, переданной в stream.write(). Если параметр decodeStrings потока имеет значение false или поток работает в объективном режиме, фрагмент не будет преобразован и будет тем, что было передано в stream.write().
  • encoding <string> Если фрагмент является строкой, то encoding — это кодировка этой строки. Если фрагмент — это Buffer или поток работает в объективном режиме, encoding может игнорироваться.
  • callback <Function> Вызовите эту функцию (необязательно с аргументом ошибки), когда обработка предоставленного фрагмента будет завершена.

Все реализации потока Writable должны предоставлять метод writable._write() и/или writable._writev() для отправки данных в базовый ресурс.

Потоки Transform предоставляют свою собственную реализацию writable._write().

Эту функцию НИКОГДА не следует вызывать непосредственно кодом приложения. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Writable.

Функция callback должна вызываться синхронно внутри writable._write() или асинхронно (т. е. в другом такте), чтобы сигнализировать о том, что запись завершилась успешно или завершилась с ошибкой. Первый аргумент, переданный в callback, должен быть объектом Error, если вызов завершился неудачей, или null, если запись прошла успешно.

Все вызовы writable.write(), которые происходят между моментом вызова writable._write() и вызова callback, приведут к буферизации записанных данных. Когда callback вызывается, поток может генерировать событие 'drain'. Если реализация потока способна обрабатывать несколько фрагментов данных одновременно, следует реализовать метод writable._writev().

Если свойство decodeStrings явно установлено в false в параметрах конструктора, то chunk останется тем же объектом, который передается в .write(), и может быть строкой, а не Buffer. Это необходимо для поддержки реализаций, которые имеют оптимизированную обработку для определенных кодировок строковых данных. В этом случае аргумент encoding будет указывать кодировку строки. В противном случае аргумент encoding можно спокойно игнорировать.

Метод writable._write() имеет префикс подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться непосредственно пользовательскими программами.

writable._writev(chunks, callback)

  • chunks <Object[]> Записываемые данные. Значение представляет собой массив <Object>, каждый из которых представляет отдельный фрагмент данных для записи. Свойства этих объектов:

    • chunk <Buffer> | <string> Экземпляр буфера или строка, содержащая записываемые данные. chunk будет строкой, если Writable был создан с параметром decodeStrings, установленным в false, и в write() была передана строка.
    • encoding <string> Кодировка символов chunk. Если chunk является Buffer, encoding будет 'buffer'.
  • callback <Function> Функция обратного вызова (необязательно с аргументом ошибки), которая вызывается по завершении обработки предоставленных фрагментов.

Эту функцию НИ В КОЕМ СЛУЧАЕ НЕ ДОЛЖЕН вызывать код приложения напрямую. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Writable.

Метод writable._writev() может быть реализован дополнительно или вместо writable._write() в реализациях потоков, способных обрабатывать несколько фрагментов данных одновременно. Если он реализован и есть буферизованные данные из предыдущих записей, будет вызван _writev(), а не _write().

Метод writable._writev() имеет префикс подчеркивания, поскольку является внутренним для класса, который его определяет, и никогда не должен вызываться пользовательскими программами напрямую.

writable._destroy(err, callback)

Добавлено в: v8.0.0

  • err <Error> Возможная ошибка.
  • callback <Function> Функция обратного вызова, принимающая необязательный аргумент ошибки.

Метод _destroy() вызывается методом writable.destroy(). Он может быть переопределен дочерними классами, но его нельзя вызывать напрямую.

writable._final(callback)

Добавлено в: v8.0.0

  • callback <Function> Эта функция вызывается (необязательно с аргументом ошибки) после завершения записи оставшихся данных.

Метод _final() не должен вызываться напрямую. Он может быть реализован дочерними классами, и если это так, то будет вызываться только внутренними методами класса Writable.

Эта необязательная функция будет вызвана перед закрытием потока, откладывая событие 'finish' до вызова callback. Это полезно для закрытия ресурсов или записи буферизованных данных перед завершением потока.

Ошибки во время записи

Ошибки, возникающие во время обработки методов writable._write(), writable._writev() и writable._final(), должны распространяться путем вызова обратного вызова и передачи ошибки в качестве первого аргумента. Выбрасывание Error изнутри этих методов или ручное излучение события 'error' приводит к неопределенному поведению.

Если поток Readable перенаправляется в поток Writable, когда Writable излучает ошибку, поток Readable будет отсоединен.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  },
})

Пример потока writable

Следующий пример иллюстрирует довольно упрощенную (и несколько бессмысленную) реализацию пользовательского потока Writable. Хотя этот конкретный экземпляр потока Writable не имеет какой-либо реальной практической пользы, пример иллюстрирует каждый из необходимых элементов пользовательского экземпляра потока Writable:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  }
}

Декодирование буферов в записываемом потоке

Декодирование буферов — распространенная задача, например, при использовании трансформаторов, входными данными которых является строка. Это нетривиальный процесс при использовании кодировки многобайтовых символов, таких как UTF-8. В следующем примере показано, как декодировать многобайтовые строки с помощью StringDecoder и Writable.

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

Реализация читаемого потока

Класс stream.Readable расширяется для реализации потока Readable.

Пользовательские потоки Readable должны вызывать конструктор new stream.Readable([options]) и реализовывать метод readable._read().

new stream.Readable([options])

[История]

ВерсияИзменения
v22.0.0увеличение значения highWaterMark по умолчанию.
v15.5.0поддержка передачи AbortSignal.
v14.0.0изменение значения параметра autoDestroy по умолчанию на true.
v11.2.0, v10.16.0добавление параметра autoDestroy для автоматического вызова destroy() потока при генерации события 'end' или ошибок.
  • options <Object>
    • highWaterMark <number> Максимальное количество байтов, которые можно хранить во внутреннем буфере, прежде чем прекратить чтение из базового ресурса. По умолчанию: 65536 (64 KiB) или 16 для потоков objectMode.
    • encoding <string> Если указано, буферы будут декодированы в строки с использованием указанной кодировки. По умолчанию: null.
    • objectMode <boolean> Должен ли этот поток вести себя как поток объектов. Это означает, что stream.read(n) возвращает одно значение вместо Buffer размера n. По умолчанию: false.
    • emitClose <boolean> Должен ли поток генерировать событие 'close' после того, как он будет уничтожен. По умолчанию: true.
    • read <Function> Реализация метода stream._read().
    • destroy <Function> Реализация метода stream._destroy().
    • construct <Function> Реализация метода stream._construct().
    • autoDestroy <boolean> Должен ли этот поток автоматически вызывать .destroy() сам по себе после завершения. По умолчанию: true.
    • signal <AbortSignal> Сигнал, представляющий возможное отключение.
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // Вызывает конструктор stream.Readable(options).
    super(options)
    // ...
  }
}

Или, при использовании конструкторов в стиле pre-ES6:

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

Или, используя упрощенный подход к конструктору:

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

Вызов abort для AbortController, соответствующего переданному AbortSignal, будет вести себя так же, как вызов .destroy(new AbortError()) для созданного читаемого объекта.

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// Позже прервите операцию, закрыв поток
controller.abort()

readable._construct(callback)

Добавлено в: v15.0.0

  • callback <Function> Вызов этой функции (необязательно с аргументом ошибки) после завершения инициализации потока.

Метод _construct() НЕ ДОЛЖЕН вызываться напрямую. Он может быть реализован дочерними классами, и в этом случае будет вызываться только внутренними методами класса Readable.

Эта необязательная функция будет запланирована на следующий такт конструктором потока, откладывая любые вызовы _read() и _destroy() до вызова callback. Это полезно для инициализации состояния или асинхронной инициализации ресурсов, прежде чем поток станет доступен для использования.

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

Добавлено в: v0.9.4

  • size <number> Количество байтов для асинхронного чтения

Эту функцию НЕ ДОЛЖЕН вызывать код приложения напрямую. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable.

Все реализации потока Readable должны предоставлять реализацию метода readable._read() для извлечения данных из базового ресурса.

Когда вызывается readable._read(), если данные доступны из ресурса, реализация должна начать отправку этих данных в очередь чтения с помощью метода this.push(dataChunk). _read() будет вызываться снова после каждого вызова this.push(dataChunk), как только поток будет готов принять больше данных. _read() может продолжить чтение из ресурса и отправку данных до тех пор, пока readable.push() не вернет false. Только когда _read() вызывается снова после того, как он остановился, он должен возобновить отправку дополнительных данных в очередь.

После того, как метод readable._read() был вызван, он не будет вызываться снова, пока не будут отправлены дополнительные данные через метод readable.push(). Пустые данные, такие как пустые буферы и строки, не приведут к вызову readable._read().

Аргумент size является рекомендательным. Для реализаций, где "чтение" - это единственная операция, которая возвращает данные, можно использовать аргумент size для определения того, сколько данных нужно извлечь. Другие реализации могут игнорировать этот аргумент и просто предоставлять данные, как только они станут доступны. Нет необходимости "ждать", пока не станет доступно size байтов, прежде чем вызывать stream.push(chunk).

Метод readable._read() имеет префикс подчеркивания, потому что он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.

readable._destroy(err, callback)

Добавлено в: v8.0.0

  • err <Error> Возможная ошибка.
  • callback <Function> Функция обратного вызова, принимающая необязательный аргумент ошибки.

Метод _destroy() вызывается методом readable.destroy(). Его можно переопределить в дочерних классах, но нельзя вызывать напрямую.

readable.push(chunk[, encoding])

[История]

ВерсияИзменения
v22.0.0, v20.13.0Аргумент chunk теперь может быть экземпляром TypedArray или DataView.
v8.0.0Аргумент chunk теперь может быть экземпляром Uint8Array.
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Фрагмент данных для добавления в очередь чтения. Для потоков, не работающих в объектном режиме, chunk должен быть <string>, <Buffer>, <TypedArray> или <DataView>. Для потоков в объектном режиме chunk может быть любым значением JavaScript.
  • encoding <string> Кодировка строковых фрагментов. Должна быть допустимой кодировкой Buffer, например, 'utf8' или 'ascii'.
  • Возвращает: <boolean> true, если можно продолжать добавлять дополнительные фрагменты данных; false в противном случае.

Когда chunk является <Buffer>, <TypedArray>, <DataView> или <string>, фрагмент данных будет добавлен во внутренний буфер для использования потребителями потока. Передача chunk как null сигнализирует о конце потока (EOF), после чего больше данных записать нельзя.

Когда Readable работает в приостановленном режиме, данные, добавленные с помощью readable.push(), можно считать, вызвав метод readable.read(), когда возникает событие 'readable'.

Когда Readable работает в непрерывном режиме, данные, добавленные с помощью readable.push(), будут доставлены путем генерации события 'data'.

Метод readable.push() разработан для максимальной гибкости. Например, при обёртывании низкоуровневого источника, предоставляющего какой-либо механизм приостановки/возобновления и обратный вызов данных, низкоуровневый источник может быть обернут пользовательским экземпляром Readable:

js
// `_source` — это объект с методами readStop() и readStart(),
// а также членом `ondata`, который вызывается при наличии данных, и
// членом `onend`, который вызывается по окончании данных.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // Каждый раз, когда есть данные, добавляем их во внутренний буфер.
    this._source.ondata = chunk => {
      // Если push() возвращает false, то прекращаем чтение из источника.
      if (!this.push(chunk)) this._source.readStop()
    }

    // Когда источник заканчивается, добавляем фрагмент `null`, сигнализирующий о конце файла.
    this._source.onend = () => {
      this.push(null)
    }
  }
  // _read() будет вызываться, когда поток захочет получить больше данных.
  // Аргумент advisory size в этом случае игнорируется.
  _read(size) {
    this._source.readStart()
  }
}

Метод readable.push() используется для добавления содержимого во внутренний буфер. Он может управляться методом readable._read().

Для потоков, не работающих в объектном режиме, если параметр chunk метода readable.push() равен undefined, он будет рассматриваться как пустая строка или буфер. См. readable.push('') для получения дополнительной информации.

Ошибки при чтении

Ошибки, возникающие во время обработки readable._read(), должны распространяться через метод readable.destroy(err). Выбрасывание Error изнутри readable._read() или ручная эмиссия события 'error' приводят к неопределённому поведению.

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // Выполнить некоторые действия.
    }
  },
})

Пример потока подсчёта

Ниже приведён базовый пример потока Readable, который излучает числовые значения от 1 до 1 000 000 в порядке возрастания, а затем завершается.

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

Реализация дуплексного потока

Поток Duplex — это поток, который реализует как Readable, так и Writable, например, соединение сокета TCP.

Поскольку JavaScript не поддерживает множественное наследование, класс stream.Duplex расширяется для реализации потока Duplex (в отличие от расширения классов stream.Readable и stream.Writable).

Класс stream.Duplex прототипически наследуется от stream.Readable и паразитически от stream.Writable, но instanceof будет работать корректно для обоих базовых классов из-за переопределения Symbol.hasInstance в stream.Writable.

Пользовательские потоки Duplex должны вызывать конструктор new stream.Duplex([options]) и реализовывать оба метода readable._read() и writable._write().

new stream.Duplex(options)

[История]

ВерсияИзменения
v8.4.0Теперь поддерживаются параметры readableHighWaterMark и writableHighWaterMark.
  • options <Объект> Передаётся как конструкторам Writable, так и Readable. Также имеет следующие поля:
    • allowHalfOpen <булево> Если установлено в false, то поток автоматически завершит запись, когда завершится чтение. По умолчанию: true.
    • readable <булево> Указывает, должен ли Duplex быть читаемым. По умолчанию: true.
    • writable <булево> Указывает, должен ли Duplex быть записываемым. По умолчанию: true.
    • readableObjectMode <булево> Устанавливает objectMode для читаемой стороны потока. Не оказывает влияния, если objectMode равен true. По умолчанию: false.
    • writableObjectMode <булево> Устанавливает objectMode для записываемой стороны потока. Не оказывает влияния, если objectMode равен true. По умолчанию: false.
    • readableHighWaterMark <число> Устанавливает highWaterMark для читаемой стороны потока. Не оказывает влияния, если указан highWaterMark.
    • writableHighWaterMark <число> Устанавливает highWaterMark для записываемой стороны потока. Не оказывает влияния, если указан highWaterMark.
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

Или, при использовании конструкторов в стиле pre-ES6:

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

Или, используя упрощённый подход к конструктору:

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

При использовании pipeline:

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Принимает строковый ввод вместо буферов
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // Убеждаемся, что это валидный JSON.
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('failed', err)
    } else {
      console.log('completed')
    }
  }
)

Пример дуплексного потока

Ниже приведён простой пример Duplex-потока, который обертывает гипотетический низкоуровневый исходный объект, в который можно записывать данные и из которого можно считывать данные, хотя и с использованием API, несовместимого с потоками Node.js. Следующий пример иллюстрирует простой пример Duplex-потока, который буферизует входящие записанные данные через интерфейс Writable, которые затем считываются через интерфейс Readable.

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // Исходный объект работает только со строками.
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

Самый важный аспект Duplex-потока заключается в том, что стороны Readable и Writable работают независимо друг от друга, несмотря на то, что сосуществуют в одном экземпляре объекта.

Дуплексные потоки в объектом режиме

Для Duplex-потоков objectMode может быть установлен исключительно для стороны Readable или Writable с помощью опций readableObjectMode и writableObjectMode соответственно.

В следующем примере, например, создаётся новый поток Transform (который является типом потока Duplex), имеющий сторону Writable в объектом режиме, которая принимает числа JavaScript, которые преобразуются в шестнадцатеричные строки на стороне Readable.

js
const { Transform } = require('node:stream')

// Все потоки Transform также являются Duplex потоками.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Приведение чанка к числу, если необходимо.
    chunk |= 0

    // Преобразование чанка во что-то другое.
    const data = chunk.toString(16)

    // Добавление данных в очередь чтения.
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// Выводит: 01
myTransform.write(10)
// Выводит: 0a
myTransform.write(100)
// Выводит: 64

Реализация потока преобразования

Поток Transform — это поток Duplex, в котором выходные данные каким-либо образом вычисляются из входных данных. Примерами являются потоки zlib или crypto, которые сжимают, шифруют или дешифруют данные.

Нет необходимости, чтобы размер выходных данных был равен размеру входных данных, количество фрагментов было одинаковым или они поступали одновременно. Например, поток Hash будет иметь только один фрагмент выходных данных, который предоставляется по завершении ввода. Поток zlib будет производить выходные данные, которые значительно меньше или больше, чем входные.

Класс stream.Transform расширяется для реализации потока Transform.

Класс stream.Transform прототипически наследуется от stream.Duplex и реализует собственные версии методов writable._write() и readable._read(). Пользовательские реализации Transform должны реализовывать метод transform._transform() и могут также реализовывать метод transform._flush().

Следует соблюдать осторожность при использовании потоков Transform, поскольку данные, записанные в поток, могут привести к приостановке части Writable потока, если выходные данные со стороны Readable не потребляются.

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

Или, при использовании конструкторов в стиле pre-ES6:

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

Или, используя упрощенный подход к конструктору:

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

Событие: 'end'

Событие 'end' происходит от класса stream.Readable. Событие 'end' генерируется после вывода всех данных, что происходит после вызова обратного вызова в transform._flush(). В случае ошибки 'end' не должно генерироваться.

Событие: 'finish'

Событие 'finish' происходит от класса stream.Writable. Событие 'finish' генерируется после вызова stream.end() и обработки всех фрагментов методом stream._transform(). В случае ошибки 'finish' не должно генерироваться.

transform._flush(callback)

  • callback <Function> Функция обратного вызова (опционально с аргументом ошибки и данными), которая вызывается после очистки оставшихся данных.

Эта функция НИ В КОЕМ СЛУЧАЕ не должна вызываться кодом приложения напрямую. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable.

В некоторых случаях операции преобразования могут потребовать вывода дополнительных данных в конце потока. Например, поток сжатия zlib будет хранить некоторое количество внутреннего состояния, используемого для оптимального сжатия вывода. Однако, когда поток заканчивается, эти дополнительные данные должны быть очищены, чтобы сжатые данные были полными.

Пользовательские реализации Transform могут реализовывать метод transform._flush(). Он будет вызван, когда нет больше записываемых данных для обработки, но до того, как будет сгенерировано событие 'end', сигнализирующее о конце потока Readable.

Внутри реализации transform._flush() метод transform.push() может вызываться ноль или более раз, в зависимости от ситуации. Функция callback должна быть вызвана по завершении операции очистки.

Метод transform._flush() имеет префикс подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> Преобразуемый буфер, преобразованный из строки, переданной в stream.write(). Если параметр decodeStrings потока имеет значение false или поток работает в объектном режиме, фрагмент не будет преобразован и будет таким же, как переданный в stream.write().
  • encoding <string> Если фрагмент является строкой, то это тип кодировки. Если фрагмент является буфером, то это специальное значение 'buffer'. В этом случае игнорируйте его.
  • callback <Function> Функция обратного вызова (необязательно с аргументом ошибки и данными), которая вызывается после обработки предоставленного фрагмента chunk.

Эту функцию НИ В КОЕМ СЛУЧАЕ нельзя вызывать непосредственно из кода приложения. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable.

Все реализации потоков Transform должны предоставлять метод _transform() для приема входных данных и вывода результата. Реализация transform._transform() обрабатывает записываемые байты, вычисляет выходные данные, а затем передает эти выходные данные в читаемую часть с помощью метода transform.push().

Метод transform.push() может вызываться от нуля и более раз для генерации выходных данных из одного входного фрагмента, в зависимости от того, сколько данных должно быть выведено в результате обработки фрагмента.

Возможно, что никакие выходные данные не будут сгенерированы из любого данного входного фрагмента данных.

Функция callback должна вызываться только тогда, когда текущий фрагмент полностью обработан. Первый аргумент, переданный в callback, должен быть объектом Error, если произошла ошибка во время обработки входных данных, или null в противном случае. Если второй аргумент передается в callback, он будет передан в метод transform.push(), но только если первый аргумент является ложным. Другими словами, следующие примеры эквивалентны:

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

Метод transform._transform() имеет префикс подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться непосредственно пользовательскими программами.

transform._transform() никогда не вызывается параллельно; потоки реализуют механизм очереди, и для получения следующего фрагмента необходимо вызвать callback, синхронно или асинхронно.

Класс: stream.PassThrough

Класс stream.PassThrough — это тривиальная реализация потока Transform, который просто передает входные байты на выход. Его основное назначение — примеры и тестирование, но есть некоторые случаи использования, когда stream.PassThrough полезен как строительный блок для новых типов потоков.

Дополнительные заметки

Совместимость потоков с асинхронными генераторами и асинхронными итераторами

С поддержкой асинхронных генераторов и итераторов в JavaScript асинхронные генераторы на данный момент являются фактически полноценной языковой конструкцией потока.

Ниже приведены некоторые распространенные примеры взаимодействия использования потоков Node.js с асинхронными генераторами и асинхронными итераторами.

Потребление читаемых потоков с помощью асинхронных итераторов

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

Асинхронные итераторы регистрируют постоянный обработчик ошибок в потоке, чтобы предотвратить любые необработанные ошибки после уничтожения.

Создание читаемых потоков с помощью асинхронных генераторов

Читаемый поток Node.js может быть создан из асинхронного генератора с помощью вспомогательного метода Readable.from():

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

Передача данных в записываемые потоки из асинхронных итераторов

При записи в записываемый поток из асинхронного итератора обеспечьте правильную обработку противодавления и ошибок. stream.pipeline() абстрагирует обработку противодавления и связанных с ним ошибок:

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// Шаблон обратного вызова
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'value returned')
  }
}).on('close', () => {
  ac.abort()
})

// Шаблон Promise
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'value returned')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

Совместимость со старыми версиями Node.js

До Node.js 0.10 интерфейс потока Readable был проще, но и менее мощным и полезным.

  • Вместо ожидания вызовов метода stream.read(), события 'data' начинали генерироваться немедленно. Приложениям, которым требовалось выполнить некоторую работу для определения способа обработки данных, приходилось сохранять считанные данные в буферы, чтобы данные не были потеряны.
  • Метод stream.pause() был рекомендательным, а не гарантированным. Это означало, что по-прежнему необходимо было быть готовым получать события 'data' даже когда поток находился в приостановленном состоянии.

В Node.js 0.10 был добавлен класс Readable. Для обратной совместимости со старыми программами Node.js потоки Readable переключаются в "режим потока", когда добавляется обработчик событий 'data' или вызывается метод stream.resume(). Эффект заключается в том, что даже при не использовании нового метода stream.read() и события 'readable', больше не нужно беспокоиться о потере фрагментов 'data'.

Хотя большинство приложений будут продолжать работать нормально, это вносит пограничный случай в следующих условиях:

  • Не добавлен обработчик событий 'data'.
  • Метод stream.resume() никогда не вызывается.
  • Поток не передаётся ни в один записываемый пункт назначения.

Например, рассмотрим следующий код:

js
// ВНИМАНИЕ!  НЕ РАБОТАЕТ!
net
  .createServer(socket => {
    // Мы добавляем обработчик 'end', но никогда не потребляем данные.
    socket.on('end', () => {
      // Сюда никогда не дойдёт.
      socket.end('The message was received but was not processed.\n')
    })
  })
  .listen(1337)

До Node.js 0.10 входящие данные сообщения просто отбрасывались. Однако в Node.js 0.10 и выше сокет остаётся приостановленным навсегда.

Решение в этой ситуации — вызов метода stream.resume() для начала потока данных:

js
// Решение.
net
  .createServer(socket => {
    socket.on('end', () => {
      socket.end('The message was received but was not processed.\n')
    })

    // Запуск потока данных, отбрасывая их.
    socket.resume()
  })
  .listen(1337)

В дополнение к новым потокам Readable, переключающимся в режим потока, потоки в стиле до 0.10 могут быть обернуты в класс Readable с помощью метода readable.wrap().

readable.read(0)

Существуют случаи, когда необходимо инициировать обновление базовых механизмов потока чтения, не потребляя при этом никаких данных. В таких случаях можно вызвать readable.read(0), который всегда возвращает null.

Если внутренний буфер чтения меньше highWaterMark, и поток в данный момент не читается, то вызов stream.read(0) инициирует низкоуровневый вызов stream._read().

Хотя большинству приложений это практически никогда не понадобится, существуют ситуации в Node.js, где это делается, особенно во внутренних компонентах класса потока Readable.

readable.push('')

Использование readable.push('') не рекомендуется.

Передача нулевого байта <string>, <Buffer>, <TypedArray> или <DataView> в поток, который не находится в объектном режиме, имеет интересный побочный эффект. Поскольку это является вызовом readable.push(), вызов завершит процесс чтения. Однако, поскольку аргумент является пустой строкой, данные не добавляются в буфер чтения, поэтому пользователю нечего потреблять.

Расхождение highWaterMark после вызова readable.setEncoding()

Использование readable.setEncoding() изменит поведение работы highWaterMark в не объектном режиме.

Обычно размер текущего буфера измеряется относительно highWaterMark в байтах. Однако после вызова setEncoding() функция сравнения начнет измерять размер буфера в символах.

В обычных случаях с latin1 или ascii это не проблема. Но рекомендуется учитывать это поведение при работе со строками, которые могут содержать многобайтовые символы.