Поток
[Стабильно: 2 - Стабильно]
Стабильно: 2 Стабильность: 2 - Стабильно
Исходный код: lib/stream.js
Поток — это абстрактный интерфейс для работы со потоковыми данными в Node.js. Модуль node:stream
предоставляет API для реализации интерфейса потока.
Node.js предоставляет множество объектов потоков. Например, запрос к HTTP-серверу и process.stdout
являются экземплярами потоков.
Потоки могут быть читаемыми, записываемыми или и теми, и другими. Все потоки являются экземплярами EventEmitter
.
Для доступа к модулю node:stream
:
const stream = require('node:stream')
Модуль node:stream
полезен для создания новых типов экземпляров потоков. Обычно нет необходимости использовать модуль node:stream
для потребления потоков.
Структура этого документа
Этот документ содержит два основных раздела и третий раздел для заметок. В первом разделе объясняется, как использовать существующие потоки в приложении. Во втором разделе объясняется, как создавать новые типы потоков.
Типы потоков
В Node.js есть четыре основных типа потоков:
Writable
: потоки, в которые можно записывать данные (например,fs.createWriteStream()
).Readable
: потоки, из которых можно читать данные (например,fs.createReadStream()
).Duplex
: потоки, которые являются одновременноReadable
иWritable
(например,net.Socket
).Transform
: потокиDuplex
, которые могут изменять или преобразовывать данные по мере их записи и чтения (например,zlib.createDeflate()
).
Кроме того, этот модуль включает в себя служебные функции stream.duplexPair()
, stream.pipeline()
, stream.finished()
, stream.Readable.from()
и stream.addAbortSignal()
.
API потоков Promise
Добавлено в: v15.0.0
API stream/promises
предоставляет альтернативный набор асинхронных служебных функций для потоков, которые возвращают объекты Promise
, а не используют обратные вызовы. API доступен через require('node:stream/promises')
или require('node:stream').promises
.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[История]
Версия | Изменения |
---|---|
v18.0.0, v17.2.0, v16.14.0 | Добавлен параметр end , который можно установить в false , чтобы предотвратить автоматическое закрытие выходного потока по завершении исходного. |
v15.0.0 | Добавлено в: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- Возвращает: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- Возвращает: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- Возвращает: <Promise> | <AsyncIterable>
options
<Object> Параметры конвейераsignal
<AbortSignal>end
<boolean> Завершить выходной поток по завершении исходного потока. Потоки преобразования всегда завершаются, даже если это значение равноfalse
. По умолчанию:true
.
Возвращает: <Promise> Выполняется, когда конвейер завершен.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Конвейер завершен.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Конвейер завершен.')
Для использования AbortSignal
передайте его в объект options в качестве последнего аргумента. Когда сигнал прерывается, destroy
будет вызван для базового конвейера с ошибкой AbortError
.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
API pipeline
также поддерживает асинхронные генераторы:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Работа со строками, а не с `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Конвейер завершен.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Работа со строками, а не с `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Конвейер завершен.')
Не забудьте обработать аргумент signal
, переданный в асинхронный генератор. Особенно в случае, когда асинхронный генератор является источником для конвейера (т.е. первый аргумент), иначе конвейер никогда не завершится.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Конвейер завершен.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Конвейер завершен.')
API pipeline
предоставляет версию с обратным вызовом:
stream.finished(stream[, options])
[История]
Версия | Изменения |
---|---|
v19.5.0, v18.14.0 | Добавлена поддержка ReadableStream и WritableStream . |
v19.1.0, v18.13.0 | Добавлен параметр cleanup . |
v15.0.0 | Добавлено в: v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Читаемый и/или записываемый поток/веб-поток.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> Еслиtrue
, удаляет прослушиватели, зарегистрированные этой функцией, до выполнения обещания. По умолчанию:false
.
Возвращает: <Promise> Выполняется, когда поток больше не является читаемым или записываемым.
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Поток завершил чтение.')
}
run().catch(console.error)
rs.resume() // Очистить поток.
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Поток завершил чтение.')
}
run().catch(console.error)
rs.resume() // Очистить поток.
API finished
также предоставляет версию с обратным вызовом.
stream.finished()
оставляет висячие обработчики событий (в частности, 'error'
, 'end'
, 'finish'
и 'close'
) после того, как возвращаемое обещание выполнено или отклонено. Причина этого в том, что непредвиденные события 'error'
(из-за неправильной реализации потока) не вызывают неожиданных сбоев. Если это нежелательное поведение, то options.cleanup
следует установить в true
:
await finished(rs, { cleanup: true })
Режим объектов
Все потоки, создаваемые API Node.js, работают исключительно со строками, объектами <Buffer>, <TypedArray> и <DataView>:
Strings
иBuffers
являются наиболее распространёнными типами, используемыми с потоками.TypedArray
иDataView
позволяют обрабатывать двоичные данные с типами, такими какInt32Array
илиUint8Array
. При записи TypedArray или DataView в поток Node.js обрабатывает необработанные байты.
Однако реализации потоков могут работать с другими типами значений JavaScript (за исключением null
, который имеет специальное назначение в потоках). Такие потоки считаются работающими в «режиме объектов».
Экземпляры потоков переключаются в режим объектов с помощью опции objectMode
при создании потока. Попытка переключить существующий поток в режим объектов небезопасна.
Буферизация
И потоки Writable
, и потоки Readable
будут хранить данные во внутреннем буфере.
Объём потенциально буферизуемых данных зависит от опции highWaterMark
, передаваемой в конструктор потока. Для обычных потоков опция highWaterMark
указывает общее количество байтов. Для потоков, работающих в режиме объектов, highWaterMark
указывает общее количество объектов. Для потоков, работающих со строками (но не декодирующих их), highWaterMark
указывает общее количество кодовых единиц UTF-16.
Данные буферизуются в потоках Readable
, когда реализация вызывает stream.push(chunk)
. Если потребитель потока не вызывает stream.read()
, данные будут находиться во внутренней очереди до тех пор, пока они не будут использованы.
Как только общий размер внутреннего буфера чтения достигнет порога, заданного highWaterMark
, поток временно прекратит чтение данных из базового ресурса, пока не будут использованы текущие буферизованные данные (то есть поток прекратит вызывать внутренний метод readable._read()
, который используется для заполнения буфера чтения).
Данные буферизуются в потоках Writable
, когда метод writable.write(chunk)
вызывается многократно. Пока общий размер внутреннего буфера записи меньше порога, установленного highWaterMark
, вызовы writable.write()
будут возвращать true
. Как только размер внутреннего буфера достигнет или превысит highWaterMark
, будет возвращено false
.
Ключевая цель API stream
, в частности метода stream.pipe()
, — ограничить буферизацию данных до приемлемого уровня, чтобы источники и назначение с различной скоростью не перегружали доступную память.
Опция highWaterMark
является порогом, а не ограничением: она определяет объём данных, которые поток буферизует, прежде чем перестанет запрашивать больше данных. Она не устанавливает строгое ограничение памяти в целом. Конкретные реализации потоков могут выбирать более строгие ограничения, но это необязательно.
Поскольку потоки Duplex
и Transform
являются одновременно Readable
и Writable
, каждый из них поддерживает два отдельных внутренних буфера, используемых для чтения и записи, что позволяет каждой стороне работать независимо от другой, поддерживая при этом соответствующий и эффективный поток данных. Например, экземпляры net.Socket
являются потоками Duplex
, чья сторона Readable
позволяет потреблять данные, полученные от сокета, а чья сторона Writable
позволяет записывать данные в сокет. Поскольку данные могут записываться в сокет с большей или меньшей скоростью, чем данные принимаются, каждая сторона должна работать (и буферизовать) независимо от другой.
Механизм внутреннего буферирования является внутренней деталью реализации и может быть изменён в любое время. Однако для некоторых расширенных реализаций внутренние буферы могут быть получены с помощью writable.writableBuffer
или readable.readableBuffer
. Использование этих не документированных свойств не рекомендуется.
API для потребителей потоков
Почти все приложения Node.js, независимо от их сложности, используют потоки. Ниже приведён пример использования потоков в приложении Node.js, реализующем HTTP-сервер:
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` — это http.IncomingMessage, читаемый поток.
// `res` — это http.ServerResponse, записывающий поток.
let body = ''
// Получение данных как строки utf8.
// Если кодировка не задана, будут получены объекты Buffer.
req.setEncoding('utf8')
// Читаемые потоки генерируют события 'data' после добавления слушателя.
req.on('data', chunk => {
body += chunk
})
// Событие 'end' указывает на то, что всё тело было получено.
req.on('end', () => {
try {
const data = JSON.parse(body)
// Отправить пользователю что-нибудь интересное:
res.write(typeof data)
res.end()
} catch (er) {
// Ой! Плохой JSON!
res.statusCode = 400
return res.end(`error: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Потоки Writable
(например, res
в примере) предоставляют методы, такие как write()
и end()
, которые используются для записи данных в поток.
Потоки Readable
используют API EventEmitter
для уведомления кода приложения о доступности данных для чтения из потока. Доступные данные могут быть считаны из потока несколькими способами.
И Writable
, и Readable
потоки используют API EventEmitter
различными способами для передачи текущего состояния потока.
Потоки Duplex
и Transform
являются одновременно Writable
и Readable
.
Приложения, которые записывают данные в поток или считывают данные из потока, не обязаны напрямую реализовывать интерфейсы потока и, как правило, не будут иметь причин вызывать require('node:stream')
.
Разработчики, желающие реализовать новые типы потоков, должны обратиться к разделу API для разработчиков потоков.
Записывающие потоки
Записывающие потоки — это абстракция для назначения, в которое записываются данные.
Примеры потоков Writable
:
- HTTP-запросы на стороне клиента
- HTTP-ответы на стороне сервера
- потоки записи fs
- потоки zlib
- потоки crypto
- TCP-сокеты
- стандартный ввод дочернего процесса
process.stdout
,process.stderr
Некоторые из этих примеров на самом деле являются потоками Duplex
, которые реализуют интерфейс Writable
.
Все потоки Writable
реализуют интерфейс, определённый классом stream.Writable
.
Хотя конкретные экземпляры потоков Writable
могут различаться различными способами, все потоки Writable
следуют одной и той же фундаментальной схеме использования, как показано в примере ниже:
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')
Класс: stream.Writable
Добавлено в: v0.9.4
Событие: 'close'
[История]
Версия | Изменения |
---|---|
v10.0.0 | Добавлен параметр emitClose для указания, должно ли событие 'close' генерироваться при уничтожении. |
v0.9.4 | Добавлено в: v0.9.4 |
Событие 'close'
генерируется, когда поток и любые его базовые ресурсы (например, дескриптор файла) закрыты. Событие указывает на то, что больше не будет генерироваться событий и не будет выполняться дальнейших вычислений.
Поток Writable
всегда будет генерировать событие 'close'
, если он создан с параметром emitClose
.
Событие: 'drain'
Добавлено в: v0.9.4
Если вызов stream.write(chunk)
возвращает false
, событие 'drain'
будет генерироваться, когда будет целесообразно возобновить запись данных в поток.
// Запись данных в предоставленный записывающий поток миллион раз.
// Будьте внимательны к противодавлению.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// Последний раз!
writer.write(data, encoding, callback)
} else {
// Посмотрим, должны ли мы продолжать или подождать.
// Не передавайте обратный вызов, потому что мы еще не закончили.
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// Пришлось остановиться раньше!
// Запишем еще немного, когда произойдет сброс.
writer.once('drain', write)
}
}
}
Событие: 'error'
Добавлено в: v0.9.4
Событие 'error'
генерируется, если произошла ошибка во время записи или передачи данных. Обратный вызов прослушивателя получает один аргумент Error
при вызове.
Поток закрывается, когда генерируется событие 'error'
, если только параметр autoDestroy
не был установлен в false
при создании потока.
После 'error'
не должно генерироваться больше никаких событий, кроме 'close'
(включая события 'error'
).
Событие: 'finish'
Добавлено в: v0.9.4
Событие 'finish'
генерируется после вызова метода stream.end()
, и все данные были выгружены в базовую систему.
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('All writes are now complete.')
})
writer.end('This is the end\n')
Событие: 'pipe'
Добавлено в: v0.9.4
src
<stream.Readable> исходный поток, который передает данные в этот записывающий поток
Событие 'pipe'
генерируется, когда вызывается метод stream.pipe()
для читаемого потока, добавляя этот записывающий поток к его набору получателей.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('Something is piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
Событие: 'unpipe'
Добавлено в: v0.9.4
src
<stream.Readable> Исходный поток, который отсоединил этот записывающий поток
Событие 'unpipe'
генерируется, когда вызывается метод stream.unpipe()
для потока Readable
, удаляя этот Writable
из его набора получателей.
Это также генерируется в случае, если этот поток Writable
генерирует ошибку, когда поток Readable
передает данные в него.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('Something has stopped piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
Добавлено в: v0.11.2
Метод writable.cork()
принудительно буферизует все записанные данные в памяти. Забуференные данные будут сброшены при вызове методов stream.uncork()
или stream.end()
.
Основное назначение writable.cork()
— обработка ситуации, когда в поток быстро и последовательно записывается несколько небольших фрагментов. Вместо немедленной передачи их в конечный пункт назначения, writable.cork()
буферизует все фрагменты до вызова writable.uncork()
, который передаст их все в writable._writev()
, если он присутствует. Это предотвращает блокировку из-за первого фрагмента, когда данные буферизуются в ожидании обработки первого небольшого фрагмента. Однако использование writable.cork()
без реализации writable._writev()
может негативно повлиять на пропускную способность.
См. также: writable.uncork()
, writable._writev()
.
writable.destroy([error])
[История]
Версия | Изменения |
---|---|
v14.0.0 | Работает как операция без действия в потоке, который уже был уничтожен. |
v8.0.0 | Добавлено в: v8.0.0 |
Уничтожает поток. При необходимости генерирует событие 'error'
и событие 'close'
(если emitClose
не установлено в false
). После этого вызова запись в записываемый поток завершается, и последующие вызовы write()
или end()
приведут к ошибке ERR_STREAM_DESTROYED
. Это деструктивный и немедленный способ уничтожения потока. Предыдущие вызовы write()
могли не быть обработаны и могут вызвать ошибку ERR_STREAM_DESTROYED
. Используйте end()
вместо destroy
, если данные должны быть сброшены перед закрытием, или дождитесь события 'drain'
перед уничтожением потока.
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
После вызова destroy()
любые дальнейшие вызовы будут операциями без действия, и никаких дальнейших ошибок, кроме ошибок из _destroy()
, не будут генерироваться как 'error'
.
Разработчикам не следует переопределять этот метод, а вместо этого следует реализовать writable._destroy()
.
writable.closed
Добавлено в: v18.0.0
Имеет значение true
после того, как событие 'close'
было отправлено.
writable.destroyed
Добавлено в: v8.0.0
Имеет значение true
после вызова writable.destroy()
.
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[История]
Версия | Изменения |
---|---|
v22.0.0, v20.13.0 | Аргумент chunk теперь может быть экземпляром TypedArray или DataView . |
v15.0.0 | Обратный вызов callback вызывается до 'finish' или при ошибке. |
v14.0.0 | Обратный вызов callback вызывается, если отправлено 'finish' или 'error'. |
v10.0.0 | Этот метод теперь возвращает ссылку на writable . |
v8.0.0 | Аргумент chunk теперь может быть экземпляром Uint8Array . |
v0.9.4 | Добавлено в: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Необязательные данные для записи. Для потоков, не работающих в объектном режиме,chunk
должен быть <string>, <Buffer>, <TypedArray> или <DataView>. Для потоков в объектном режимеchunk
может быть любым значением JavaScript, кромеnull
.encoding
<string> Кодировка, еслиchunk
является строкойcallback
<Function> Обратный вызов при завершении потока.- Возвращает: <this>
Вызов метода writable.end()
сигнализирует о том, что больше никаких данных не будет записано в Writable
. Необязательные аргументы chunk
и encoding
позволяют записать один последний дополнительный фрагмент данных непосредственно перед закрытием потока.
Вызов метода stream.write()
после вызова stream.end()
вызовет ошибку.
// Записываем 'hello, ' и затем завершаем запись 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// Запись больше не допускается!
writable.setDefaultEncoding(encoding)
[История]
Версия | Изменения |
---|---|
v6.1.0 | Этот метод теперь возвращает ссылку на writable . |
v0.11.15 | Добавлено в: v0.11.15 |
Метод writable.setDefaultEncoding()
устанавливает кодировку по умолчанию encoding
для потока Writable
.
writable.uncork()
Добавлено в: v0.11.2
Метод writable.uncork()
сбрасывает все данные, буферизованные с момента вызова stream.cork()
.
При использовании writable.cork()
и writable.uncork()
для управления буферизацией записи в поток, откладывайте вызовы writable.uncork()
с помощью process.nextTick()
. Это позволяет группировать все вызовы writable.write()
, которые происходят в течение данной фазы цикла событий Node.js.
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())
Если метод writable.cork()
вызывается несколько раз для одного потока, для сброса буферизованных данных необходимо вызвать writable.uncork()
столько же раз.
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
stream.uncork()
// Данные не будут сброшены, пока uncork() не будет вызван второй раз.
stream.uncork()
})
См. также: writable.cork()
.
writable.writable
Добавлено в: v11.4.0
Равно true
, если безопасно вызывать writable.write()
, то есть поток не был уничтожен, не произошла ошибка и он не завершен.
writable.writableAborted
Добавлено в: v18.0.0, v16.17.0
[Стабильно: 1 - Экспериментально]
Стабильно: 1 Стабильность: 1 - Экспериментально
Возвращает, был ли поток уничтожен или произошла ошибка до выдачи события 'finish'
.
writable.writableEnded
Добавлено в: v12.9.0
Принимает значение true
после вызова writable.end()
. Это свойство не указывает, были ли данные очищены; для этого используйте writable.writableFinished
.
writable.writableCorked
Добавлено в: v13.2.0, v12.16.0
Количество вызовов writable.uncork()
, необходимых для полного снятия блокировки потока.
writable.errored
Добавлено в: v18.0.0
Возвращает ошибку, если поток был уничтожен с ошибкой.
writable.writableFinished
Добавлено в: v12.6.0
Устанавливается в true
непосредственно перед событием 'finish'
.
writable.writableHighWaterMark
Добавлено в: v9.3.0
Возвращает значение highWaterMark
, переданное при создании этого Writable
.
writable.writableLength
Добавлено в: v9.4.0
Это свойство содержит количество байтов (или объектов) в очереди, готовых к записи. Значение предоставляет информацию о состоянии highWaterMark
.
writable.writableNeedDrain
Добавлено в: v15.2.0, v14.17.0
Принимает значение true
, если буфер потока заполнен и поток будет генерировать событие 'drain'
.
writable.writableObjectMode
Добавлено в: v12.3.0
Геттер для свойства objectMode
заданного потока Writable
.
writable[Symbol.asyncDispose]()
Добавлено в: v22.4.0, v20.16.0
[Стабильность: 1 - Экспериментально]
Стабильность: 1 Стабильность: 1 - Экспериментально
Вызывает writable.destroy()
с AbortError
и возвращает промис, который выполняется после завершения потока.
writable.write(chunk[, encoding][, callback])
[История]
Версия | Изменения |
---|---|
v22.0.0, v20.13.0 | Аргумент chunk теперь может быть экземпляром TypedArray или DataView . |
v8.0.0 | Аргумент chunk теперь может быть экземпляром Uint8Array . |
v6.0.0 | Передача null в качестве параметра chunk теперь всегда будет считаться недействительной, даже в объективном режиме. |
v0.9.4 | Добавлено в: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Необязательные данные для записи. Для потоков, не работающих в объективном режиме,chunk
должен быть <string>, <Buffer>, <TypedArray> или <DataView>. Для потоков в объективном режимеchunk
может быть любым значением JavaScript, кромеnull
.encoding
<string> | <null> Кодировка, еслиchunk
является строкой. По умолчанию:'utf8'
callback
<Function> Обратный вызов после того, как этот фрагмент данных будет очищен.- Возвращает: <boolean>
false
, если поток желает, чтобы вызывающий код ожидал события'drain'
, прежде чем продолжать запись дополнительных данных; в противном случаеtrue
.
Метод writable.write()
записывает некоторые данные в поток и вызывает предоставленный callback
после того, как данные будут полностью обработаны. Если возникает ошибка, callback
будет вызван с ошибкой в качестве первого аргумента. callback
вызывается асинхронно и до того, как будет выброшено событие 'error'
.
Возвращаемое значение равно true
, если внутренний буфер меньше highWaterMark
, настроенного при создании потока после принятия chunk
. Если возвращается false
, дальнейшие попытки записи данных в поток должны быть остановлены до тех пор, пока не будет вызвано событие 'drain'
.
Пока поток не осушается, вызовы write()
будут буферизовать chunk
и возвращать false. После того, как все текущие буферизованные фрагменты будут осушены (приняты для доставки операционной системой), будет вызвано событие 'drain'
. После того, как write()
вернет false, не записывайте больше фрагментов, пока не будет вызвано событие 'drain'
. Хотя вызов write()
в потоке, который не осушается, разрешен, Node.js будет буферизовать все записанные фрагменты до тех пор, пока не будет достигнуто максимальное использование памяти, после чего он безусловно прервется. Даже до того, как он прервется, высокое использование памяти приведет к низкой производительности сборщика мусора и высокому RSS (который обычно не возвращается в систему, даже после того, как память больше не требуется). Поскольку TCP-сокеты могут никогда не осушаться, если удаленный узел не читает данные, запись в сокет, который не осушается, может привести к уязвимости, которую можно использовать удаленно.
Запись данных, пока поток не осушается, особенно проблематична для Transform
, потому что потоки Transform
приостанавливаются по умолчанию, пока они не будут переданы по конвейеру или не будет добавлен обработчик событий 'data'
или 'readable'
.
Если данные для записи могут быть сгенерированы или получены по запросу, рекомендуется инкапсулировать логику в Readable
и использовать stream.pipe()
. Однако, если вы предпочитаете вызывать write()
, можно учитывать противодавление и избегать проблем с памятью, используя событие 'drain'
:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// Ожидание вызова cb перед выполнением любой другой записи.
write('hello', () => {
console.log('Запись завершена, выполните больше записей сейчас.')
})
Поток Writable
в объективном режиме всегда будет игнорировать аргумент encoding
.
Читаемые потоки
Читаемые потоки — это абстракция для источника, из которого потребляются данные.
Примеры потоков Readable
:
- HTTP-ответы на стороне клиента
- HTTP-запросы на стороне сервера
- Потоки чтения fs
- Потоки zlib
- Потоки crypto
- TCP-сокеты
- stdout и stderr дочернего процесса
process.stdin
Все потоки Readable
реализуют интерфейс, определенный классом stream.Readable
.
Два режима чтения
Потоки Readable
фактически работают в одном из двух режимов: потоковом и приостановленном. Эти режимы отделены от режима объекта. Поток Readable
может находиться в режиме объекта или нет, независимо от того, находится ли он в потоковом режиме или в приостановленном режиме.
- В потоковом режиме данные автоматически считываются из базовой системы и предоставляются приложению как можно быстрее с помощью событий через интерфейс
EventEmitter
. - В приостановленном режиме метод
stream.read()
должен вызываться явно для чтения фрагментов данных из потока.
Все потоки Readable
начинаются в приостановленном режиме, но могут быть переключены в потоковый режим одним из следующих способов:
- Добавление обработчика событий
'data'
. - Вызов метода
stream.resume()
. - Вызов метода
stream.pipe()
для отправки данных вWritable
.
Readable
может переключиться обратно в приостановленный режим одним из следующих способов:
- Если нет целевых мест для передачи данных, вызвав метод
stream.pause()
. - Если есть целевые места для передачи данных, удалив все целевые места для передачи данных. Несколько целевых мест для передачи данных могут быть удалены вызовом метода
stream.unpipe()
.
Важно помнить, что Readable
не будет генерировать данные, пока не будет предоставлен механизм для их потребления или игнорирования. Если механизм потребления отключен или удален, Readable
попытается прекратить генерацию данных.
В целях обратной совместимости удаление обработчиков событий 'data'
не будет автоматически приостанавливать поток. Кроме того, если есть целевые места для передачи данных, то вызов stream.pause()
не гарантирует, что поток останется приостановленным после того, как эти места опустеют и запросят больше данных.
Если Readable
переключается в потоковый режим, и нет потребителей, доступных для обработки данных, эти данные будут потеряны. Это может произойти, например, когда вызывается метод readable.resume()
без обработчика, прикрепленного к событию 'data'
, или когда обработчик события 'data'
удален из потока.
Добавление обработчика событий 'readable'
автоматически останавливает поток данных, и данные должны потребляться через readable.read()
. Если обработчик событий 'readable'
удален, то поток снова начнет работать, если есть обработчик событий 'data'
.
Три состояния
Два режима работы потока Readable
— это упрощенное представление более сложного внутреннего управления состоянием, происходящего внутри реализации потока Readable
.
В частности, в любой момент времени каждый Readable
находится в одном из трех возможных состояний:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
Когда readable.readableFlowing
равно null
, механизм потребления данных потока не предоставляется. Поэтому поток не будет генерировать данные. В этом состоянии подключение обработчика события 'data'
, вызов метода readable.pipe()
или вызов метода readable.resume()
переключат readable.readableFlowing
на true
, заставив Readable
начать активно генерировать события по мере создания данных.
Вызов readable.pause()
, readable.unpipe()
или получение обратного давления приведут к тому, что readable.readableFlowing
будет установлен как false
, временно приостановив поток событий, но не приостановив генерацию данных. В этом состоянии подключение обработчика события 'data'
не переключит readable.readableFlowing
на true
.
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing теперь false.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing по-прежнему false.
pass.write('ok') // Не будет генерировать 'data'.
pass.resume() // Необходимо вызвать для того, чтобы поток генерировал 'data'.
// readableFlowing теперь true.
Пока readable.readableFlowing
равно false
, данные могут накапливаться во внутреннем буфере потока.
Выберите один стиль API
API потока Readable
развивался в нескольких версиях Node.js и предоставляет несколько способов потребления данных потока. В целом, разработчики должны выбирать один из способов потребления данных и никогда не должны использовать несколько способов для потребления данных из одного потока. В частности, использование комбинации on('data')
, on('readable')
, pipe()
или асинхронных итераторов может привести к неинтуитивному поведению.
Класс: stream.Readable
Добавлено в: v0.9.4
Событие: 'close'
[История]
Версия | Изменения |
---|---|
v10.0.0 | Добавлен параметр emitClose для указания, должно ли событие 'close' генерироваться при уничтожении. |
v0.9.4 | Добавлено в: v0.9.4 |
Событие 'close'
генерируется, когда поток и любые его базовые ресурсы (например, дескриптор файла) закрыты. Событие указывает, что больше событий генерироваться не будет, и дальнейшие вычисления выполняться не будут.
Поток Readable
всегда будет генерировать событие 'close'
, если он создан с параметром emitClose
.
Событие: 'data'
Добавлено в: v0.9.4
chunk
<Buffer> | <string> | <any> Фрагмент данных. Для потоков, не работающих в объектном режиме, фрагмент будет строкой илиBuffer
. Для потоков, работающих в объектном режиме, фрагмент может быть любым значением JavaScript, кромеnull
.
Событие 'data'
генерируется всякий раз, когда поток отказывается от владения фрагментом данных в пользу потребителя. Это может произойти, когда поток переключается в режим потока вызовом readable.pipe()
, readable.resume()
или путем добавления обработчика обратного вызова к событию 'data'
. Событие 'data'
также будет генерироваться всякий раз, когда вызывается метод readable.read()
, и доступен фрагмент данных для возврата.
Добавление обработчика события 'data'
к потоку, который не был явно приостановлен, переведет поток в режим потока. Данные будут передаваться, как только они станут доступны.
Обработчик обратного вызова получит фрагмент данных в виде строки, если для потока указана кодировка по умолчанию с помощью метода readable.setEncoding()
; в противном случае данные будут переданы как Buffer
.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Получено ${chunk.length} байт данных.`)
})
Событие: 'end'
Добавлено в: v0.9.4
Событие 'end'
генерируется, когда больше нет данных для потребления из потока.
Событие 'end'
не будет генерироваться, если данные не будут полностью потреблены. Этого можно добиться, переведя поток в режим потока или вызвав stream.read()
многократно, пока все данные не будут потреблены.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Получено ${chunk.length} байт данных.`)
})
readable.on('end', () => {
console.log('Больше данных не будет.')
})
Событие: 'error'
Добавлено в: v0.9.4
Событие 'error'
может быть генерировано реализацией Readable
в любое время. Как правило, это может произойти, если базовый поток не может генерировать данные из-за внутренней ошибки или когда реализация потока пытается передать недействительный фрагмент данных.
Функция обратного вызова обработчика получит один объект Error
.
Событие: 'pause'
Добавлено в: v0.9.4
Событие 'pause'
генерируется, когда вызывается stream.pause()
, а readableFlowing
не равно false
.
Событие: 'readable'
[История]
Версия | Изменения |
---|---|
v10.0.0 | Событие 'readable' всегда генерируется на следующем тике после вызова .push() . |
v10.0.0 | Использование 'readable' требует вызова .read() . |
v0.9.4 | Добавлено в: v0.9.4 |
Событие 'readable'
генерируется, когда доступны данные для чтения из потока, вплоть до установленной верхней границы уровня воды (state.highWaterMark
). Фактически, это указывает на то, что поток содержит новую информацию в буфере. Если данные доступны в этом буфере, можно вызвать stream.read()
для извлечения этих данных. Кроме того, событие 'readable'
также может быть генерировано, когда достигнут конец потока.
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// Сейчас есть данные для чтения.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
Если достигнут конец потока, вызов stream.read()
вернет null
и вызовет событие 'end'
. Это также верно, если данных для чтения никогда не было. Например, в следующем примере foo.txt
— это пустой файл:
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
Вывод выполнения этого скрипта:
$ node test.js
readable: null
end
В некоторых случаях подключение обработчика события 'readable'
приведет к чтению некоторого количества данных во внутренний буфер.
В целом, механизмы readable.pipe()
и события 'data'
проще для понимания, чем событие 'readable'
. Однако обработка 'readable'
может привести к увеличению пропускной способности.
Если одновременно используются 'readable'
и 'data'
, 'readable'
имеет приоритет в управлении потоком, т.е. 'data'
будет генерироваться только при вызове stream.read()
. Свойство readableFlowing
станет false
. Если есть обработчики 'data'
, когда 'readable'
удален, поток начнет работать, т.е. события 'data'
будут генерироваться без вызова .resume()
.
Событие: 'resume'
Добавлено в: v0.9.4
Событие 'resume'
генерируется, когда вызывается stream.resume()
, а readableFlowing
не равно true
.
readable.destroy([error])
[История]
Версия | Изменения |
---|---|
v14.0.0 | Работает как no-op для потока, который уже был уничтожен. |
v8.0.0 | Добавлено в: v8.0.0 |
error
<Error> Ошибка, которая будет передана в качестве полезной нагрузки в событии'error'
- Возвращает: <this>
Уничтожает поток. По желанию генерирует событие 'error'
и событие 'close'
(если emitClose
не установлено в false
). После этого вызова читаемый поток освободит все внутренние ресурсы, и последующие вызовы push()
будут игнорироваться.
После вызова destroy()
любые дальнейшие вызовы будут no-op, и не будет генерироваться никаких дальнейших ошибок, за исключением ошибок из _destroy()
, которые могут быть выведены как 'error'
.
Разработчики не должны переопределять этот метод, а вместо этого должны реализовать readable._destroy()
.
readable.closed
Добавлено в: v18.0.0
Равно true
после того, как было сгенерировано событие 'close'
.
readable.destroyed
Добавлено в: v8.0.0
Равно true
после вызова readable.destroy()
.
readable.isPaused()
Добавлено в: v0.11.14
- Возвращает: <boolean>
Метод readable.isPaused()
возвращает текущее состояние работы Readable
. Он используется в основном механизмом, лежащим в основе метода readable.pipe()
. В большинстве типичных случаев нет необходимости использовать этот метод напрямую.
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
Добавлен в: v0.9.4
- Возвращает: <this>
Метод readable.pause()
заставляет поток в режиме потока прекратить отправку событий 'data'
, выключая режим потока. Любые доступные данные останутся во внутреннем буфере.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Получено ${chunk.length} байт данных.`)
readable.pause()
console.log('В течение 1 секунды дополнительных данных не будет.')
setTimeout(() => {
console.log('Теперь данные начнут поступать снова.')
readable.resume()
}, 1000)
})
Метод readable.pause()
не оказывает никакого эффекта, если есть обработчик событий 'readable'
.
readable.pipe(destination[, options])
Добавлен в: v0.9.4
destination
<stream.Writable> Цель для записи данныхoptions
<Object> Параметры конвейераend
<boolean> Завершить запись, когда завершается чтение. По умолчанию:true
.
Возвращает: <stream.Writable> destination, что позволяет создавать цепочки конвейеров, если это поток
Duplex
илиTransform
Метод readable.pipe()
присоединяет поток Writable
к readable
, заставляя его автоматически переключаться в режим потока и передавать все свои данные в присоединенный поток Writable
. Поток данных будет автоматически управляться, чтобы поток назначения Writable
не был перегружен более быстрым потоком Readable
.
В следующем примере все данные из readable
записываются в файл с именем file.txt
:
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Все данные из readable записываются в 'file.txt'.
readable.pipe(writable)
Можно присоединять несколько потоков Writable
к одному потоку Readable
.
Метод readable.pipe()
возвращает ссылку на поток destination, что позволяет создавать цепочки связанных потоков:
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
По умолчанию, stream.end()
вызывается в потоке назначения Writable
, когда исходный поток Readable
отправляет событие 'end'
, так что поток назначения больше не будет доступен для записи. Чтобы отключить это поведение по умолчанию, можно передать параметр end
со значением false
, что приведет к тому, что поток назначения останется открытым:
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Goodbye\n')
})
Важно отметить, что если поток Readable
отправляет ошибку во время обработки, поток назначения Writable
не закрывается автоматически. Если возникает ошибка, необходимо вручную закрыть каждый поток, чтобы предотвратить утечки памяти.
Потоки Writable
process.stderr
и process.stdout
никогда не закрываются до завершения процесса Node.js, независимо от указанных параметров.
readable.read([size])
Добавлено в: v0.9.4
size
<number> Необязательный аргумент, указывающий, сколько данных нужно прочитать.- Возвращает: <string> | <Buffer> | <null> | <any>
Метод readable.read()
считывает данные из внутреннего буфера и возвращает их. Если доступных для чтения данных нет, возвращается null
. По умолчанию данные возвращаются в виде объекта Buffer
, если только кодировка не была указана с помощью метода readable.setEncoding()
или поток не работает в объектном режиме.
Необязательный аргумент size
указывает конкретное количество байтов для чтения. Если size
байтов недоступно для чтения, возвращается null
, за исключением случая завершения потока, в котором случае будут возвращены все оставшиеся данные во внутреннем буфере.
Если аргумент size
не указан, возвращаются все данные, содержащиеся во внутреннем буфере.
Аргумент size
должен быть меньше или равен 1 ГиБ.
Метод readable.read()
должен вызываться только для потоков Readable
, работающих в приостановленном режиме. В режиме потока readable.read()
вызывается автоматически, пока внутренний буфер полностью не опустошен.
const readable = getReadableStreamSomehow()
// 'readable' может срабатывать несколько раз по мере буферизации данных
readable.on('readable', () => {
let chunk
console.log('Поток доступен для чтения (новые данные получены в буфер)')
// Используем цикл, чтобы убедиться, что мы прочитали все доступные данные
while (null !== (chunk = readable.read())) {
console.log(`Прочитано ${chunk.length} байт данных...`)
}
})
// 'end' будет вызван один раз, когда больше не будет доступных данных
readable.on('end', () => {
console.log('Достигнут конец потока.')
})
Каждый вызов readable.read()
возвращает фрагмент данных или null
, что означает, что в данный момент больше нет данных для чтения. Эти фрагменты не объединяются автоматически. Поскольку один вызов read()
не возвращает все данные, для непрерывного чтения фрагментов до получения всех данных может потребоваться использование цикла while. При чтении большого файла .read()
может временно возвращать null
, указывая на то, что он использовал все буферизованное содержимое, но могут быть еще данные, которые еще не буферизованы. В таких случаях новое событие 'readable'
генерируется, как только в буфере появляются новые данные, а событие 'end'
сигнализирует о завершении передачи данных.
Поэтому для чтения всего содержимого файла из readable
необходимо собирать фрагменты из нескольких событий 'readable'
:
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
Поток Readable
в объектном режиме всегда возвращает один элемент из вызова readable.read(size)
, независимо от значения аргумента size
.
Если метод readable.read()
возвращает фрагмент данных, событие 'data'
также будет генерироваться.
Вызов stream.read([size])
после того, как событие 'end'
было сгенерировано, вернет null
. Ошибка времени выполнения не будет вызвана.
readable.readable
Добавлен в: v11.4.0
Имеет значение true
, если безопасен вызов readable.read()
, что означает, что поток не был уничтожен и не выдал события 'error'
или 'end'
.
readable.readableAborted
Добавлен в: v16.8.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
Возвращает, был ли поток уничтожен или произошла ошибка до выдачи события 'end'
.
readable.readableDidRead
Добавлен в: v16.7.0, v14.18.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
Возвращает, было ли выдано событие 'data'
.
readable.readableEncoding
Добавлен в: v12.7.0
Геттер для свойства encoding
данного потока Readable
. Свойство encoding
может быть установлено с помощью метода readable.setEncoding()
.
readable.readableEnded
Добавлен в: v12.9.0
Принимает значение true
, когда выдаётся событие 'end'
.
readable.errored
Добавлен в: v18.0.0
Возвращает ошибку, если поток был уничтожен с ошибкой.
readable.readableFlowing
Добавлен в: v9.4.0
Это свойство отражает текущее состояние потока Readable
, как описано в разделе Три состояния.
readable.readableHighWaterMark
Добавлен в: v9.3.0
Возвращает значение highWaterMark
, переданное при создании этого Readable
.
readable.readableLength
Добавлен в: v9.4.0
Это свойство содержит количество байтов (или объектов) в очереди, готовых к чтению. Значение предоставляет данные для интроспекции относительно состояния highWaterMark
.
readable.readableObjectMode
Добавлен в: v12.3.0
Геттер для свойства objectMode
данного потока Readable
.
readable.resume()
[История]
Версия | Изменения |
---|---|
v10.0.0 | resume() не оказывает никакого эффекта, если есть слушатель события 'readable' . |
v0.9.4 | Добавлено в: v0.9.4 |
- Возвращает: <this>
Метод readable.resume()
заставляет явно приостановленный поток Readable
возобновить излучение событий 'data'
, переключая поток в режим потока.
Метод readable.resume()
может использоваться для полного потребления данных из потока без фактической обработки каких-либо из этих данных:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Достигнут конец, но ничего не прочитано.')
})
Метод readable.resume()
не оказывает никакого эффекта, если есть слушатель события 'readable'
.
readable.setEncoding(encoding)
Добавлен в: v0.9.4
Метод readable.setEncoding()
устанавливает кодировку символов для данных, считываемых из потока Readable
.
По умолчанию кодировка не назначена, и данные потока будут возвращены в виде объектов Buffer
. Установка кодировки приводит к тому, что данные потока возвращаются в виде строк указанной кодировки, а не в виде объектов Buffer
. Например, вызов readable.setEncoding('utf8')
приведет к тому, что выходные данные будут интерпретироваться как данные UTF-8 и передаваться как строки. Вызов readable.setEncoding('hex')
приведет к кодированию данных в шестнадцатеричном строковом формате.
Поток Readable
будет правильно обрабатывать многобайтовые символы, передаваемые через поток, которые в противном случае были бы неправильно декодированы, если бы просто извлекались из потока как объекты Buffer
.
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('Получено %d символов строковых данных:', chunk.length)
})
readable.unpipe([destination])
Добавлено в: v0.9.4
destination
<stream.Writable> Необязательный конкретный поток для отключения- Возвращает: <this>
Метод readable.unpipe()
отсоединяет поток Writable
, ранее подключенный с помощью метода stream.pipe()
.
Если destination
не указан, то отсоединяются все каналы.
Если destination
указан, но для него не установлен канал, метод ничего не делает.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Все данные из readable поступают в 'file.txt',
// но только в течение первой секунды.
readable.pipe(writable)
setTimeout(() => {
console.log('Прекращение записи в file.txt.')
readable.unpipe(writable)
console.log('Ручное закрытие потока файла.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[История]
Версия | Изменения |
---|---|
v22.0.0, v20.13.0 | Аргумент chunk теперь может быть экземпляром TypedArray или DataView . |
v8.0.0 | Аргумент chunk теперь может быть экземпляром Uint8Array . |
v0.9.11 | Добавлено в: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Фрагмент данных для добавления в начало очереди чтения. Для потоков, не работающих в объективном режиме,chunk
должен быть <string>, <Buffer>, <TypedArray>, <DataView> илиnull
. Для потоков в объективном режимеchunk
может быть любым значением JavaScript.encoding
<string> Кодировка строковых фрагментов. Должна быть допустимой кодировкойBuffer
, например,'utf8'
или'ascii'
.
Передача chunk
как null
сигнализирует о конце потока (EOF) и ведет себя так же, как readable.push(null)
, после чего больше данных записать нельзя. Сигнал EOF помещается в конец буфера, и все буферизованные данные все равно будут отправлены.
Метод readable.unshift()
добавляет фрагмент данных в начало внутреннего буфера. Это полезно в некоторых ситуациях, когда поток потребляется кодом, которому нужно «отменить потребление» некоторого количества данных, которые он оптимистично извлек из источника, чтобы эти данные можно было передать другой стороне.
Метод stream.unshift(chunk)
нельзя вызывать после того, как событие 'end'
было отправлено, иначе будет выброшено исключение времени выполнения.
Разработчикам, использующим stream.unshift()
, часто следует рассмотреть возможность перехода на использование потока Transform
вместо этого. Дополнительные сведения см. в разделе API для разработчиков потоков.
// Извлечь заголовок, ограниченный \n\n.
// Использовать unshift(), если получим слишком много.
// Вызвать обратный вызов с (ошибка, заголовок, поток).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// Найдена граница заголовка.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// Удалить прослушиватель 'readable' перед unshifting.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// Теперь тело сообщения можно прочитать из потока.
callback(null, header, stream)
return
}
// Продолжается чтение заголовка.
header += str
}
}
}
В отличие от stream.push(chunk)
, stream.unshift(chunk)
не завершит процесс чтения, сбросив внутреннее состояние чтения потока. Это может привести к неожиданным результатам, если readable.unshift()
вызывается во время чтения (т. е. из реализации stream._read()
в пользовательском потоке). Вызов readable.unshift()
с последующим немедленным вызовом stream.push('')
правильно сбросит состояние чтения, однако лучше просто избегать вызова readable.unshift()
, находясь в процессе чтения.
readable.wrap(stream)
Добавлено в: v0.9.4
До Node.js 0.10 потоки не реализовывали весь API модуля node:stream
в том виде, в котором он определен сейчас. (См. Совместимость для получения дополнительной информации.)
При использовании старой библиотеки Node.js, которая генерирует события 'data'
и имеет метод stream.pause()
, который является только рекомендательным, метод readable.wrap()
может использоваться для создания потока Readable
, который использует старый поток в качестве источника данных.
В редких случаях потребуется использовать readable.wrap()
, но этот метод был предоставлен для удобства взаимодействия со старыми приложениями и библиотеками Node.js.
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // и т.д.
})
readable[Symbol.asyncIterator]()
[История]
Версия | Изменения |
---|---|
v11.14.0 | Поддержка Symbol.asyncIterator больше не является экспериментальной. |
v10.0.0 | Добавлено в: v10.0.0 |
- Возвращает: <AsyncIterator> для полного потребления потока.
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
Если цикл завершается с помощью break
, return
или throw
, поток будет уничтожен. Иными словами, итерация по потоку полностью его потребляет. Поток будет считываться фрагментами размером, равным параметру highWaterMark
. В приведенном выше примере кода данные будут находиться в одном фрагменте, если файл содержит менее 64 КБ данных, поскольку параметр highWaterMark
не указан для fs.createReadStream()
.
readable[Symbol.asyncDispose]()
Добавлено в: v20.4.0, v18.18.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
Вызывает readable.destroy()
с AbortError
и возвращает промис, который выполняется, когда поток завершен.
readable.compose(stream[, options])
Добавлено в: v19.1.0, v18.13.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
stream
<Поток> | <Итерируемый объект> | <Асинхронный итерируемый объект> | <Функция>options
<Объект>signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерван.
Возвращает: <Duplex> поток, составленный с потоком
stream
.
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // выведет ['this', 'is', 'compose', 'as', 'operator']
См. stream.compose
для получения дополнительной информации.
readable.iterator([options])
Добавлено в: v16.3.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
options
<Объект>destroyOnReturn
<булево> Если установлено значениеfalse
, вызовreturn
в асинхронном итераторе или выход из итерацииfor await...of
с помощьюbreak
,return
илиthrow
не уничтожит поток. По умолчанию:true
.
Возвращает: <Асинхронный итератор> для потребления потока.
Итератор, созданный этим методом, предоставляет пользователям возможность отменить уничтожение потока, если цикл for await...of
завершен с помощью return
, break
или throw
, или если итератор должен уничтожить поток, если поток выдал ошибку во время итерации.
const { Readable } = require('node:stream')
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // Выведет 2, а затем 3
}
console.log(readable.destroyed) // True, поток был полностью потреблён
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]))
await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}
showBoth()
readable.map(fn[, options])
[История]
Версия | Изменения |
---|---|
v20.7.0, v18.19.0 | добавлен highWaterMark в options. |
v17.4.0, v16.14.0 | Добавлено в: v17.4.0, v16.14.0 |
[Стабильность: 1 - Экспериментально]
Стабильность: 1 Стабильность: 1 - Экспериментально
fn
<Function> | <AsyncFunction> функция для отображения каждого фрагмента в потоке.data
<any> фрагмент данных из потока.options
<Object>signal
<AbortSignal> прерывается, если поток уничтожается, что позволяет прервать вызовfn
раньше времени.
options
<Object>concurrency
<number> максимальное количество одновременных вызововfn
в потоке. По умолчанию:1
.highWaterMark
<number> сколько элементов буферизовать в ожидании потребления пользователем отображаемых элементов. По умолчанию:concurrency * 2 - 1
.signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерван.
Возвращает: <Readable> поток, отображенный с помощью функции
fn
.
Этот метод позволяет отображать поток. Функция fn
будет вызываться для каждого фрагмента в потоке. Если функция fn
возвращает promise — этот promise будет await
ed перед передачей в результирующий поток.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// С синхронным отображателем.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
console.log(chunk) // 2, 4, 6, 8
}
// С асинхронным отображателем, выполняя не более 2 запросов одновременно.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
domain => resolver.resolve4(domain),
{ concurrency: 2 }
)
for await (const result of dnsResults) {
console.log(result) // Выводит результат DNS resolver.resolve4.
}
readable.filter(fn[, options])
[История]
Версия | Изменения |
---|---|
v20.7.0, v18.19.0 | Добавлен highWaterMark в options. |
v17.4.0, v16.14.0 | Добавлено в: v17.4.0, v16.14.0 |
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
fn
<Функция> | <AsyncFunction> функция для фильтрации фрагментов из потока.data
<любой> фрагмент данных из потока.options
<Объект>signal
<AbortSignal> прерывается, если поток уничтожен, что позволяет прервать вызовfn
раньше времени.
options
<Объект>concurrency
<число> максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.highWaterMark
<число> сколько элементов буферизовать в ожидании потребления отфильтрованных элементов пользователем. По умолчанию:concurrency * 2 - 1
.signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерван.
Возвращает: <Readable> поток, отфильтрованный с помощью предиката
fn
.
Этот метод позволяет фильтровать поток. Для каждого фрагмента в потоке будет вызвана функция fn
, и если она возвращает истинное значение, фрагмент будет передан в результирующий поток. Если функция fn
возвращает promise, этот promise будет await
ed.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// С синхронным предикатом.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// С асинхронным предикатом, выполняя максимум 2 запроса одновременно.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address.ttl > 60
},
{ concurrency: 2 }
)
for await (const result of dnsResults) {
// Выводит домены с временем жизни записи DNS более 60 секунд.
console.log(result)
}
readable.forEach(fn[, options])
Добавлено в: v17.5.0, v16.15.0
[Стабильно: 1 - Экспериментально]
Стабильно: 1 Стабильность: 1 - Экспериментально
fn
<Function> | <AsyncFunction> функция, вызываемая для каждого фрагмента потока.data
<any> фрагмент данных из потока.options
<Object>signal
<AbortSignal> прерывается, если поток уничтожен, что позволяет прервать вызовfn
раньше времени.
options
<Object>concurrency
<number> максимальное количество одновременных вызововfn
для потока. По умолчанию:1
.signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерван.
Возвращает: <Promise> промис, который завершается, когда поток завершен.
Этот метод позволяет итерировать поток. Для каждого фрагмента в потоке будет вызвана функция fn
. Если функция fn
возвращает промис, этот промис будет await
ed.
Этот метод отличается от циклов for await...of
тем, что он может обрабатывать фрагменты одновременно. Кроме того, итерация forEach
может быть остановлена только путем передачи параметра signal
и прерывания связанного AbortController
, тогда как for await...of
может быть остановлена с помощью break
или return
. В любом случае поток будет уничтожен.
Этот метод отличается от прослушивания события 'data'
тем, что он использует событие readable
в базовом механизме и может ограничивать количество одновременных вызовов fn
.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// С синхронным предикатом.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// С асинхронным предикатом, выполняя максимум 2 запроса одновременно.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
await dnsResults.forEach(result => {
// Выводит результат, аналогично `for await (const result of dnsResults)`
console.log(result)
})
console.log('done') // Поток завершен
readable.toArray([options])
Добавлено в: v17.5.0, v16.15.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
options
<Object>signal
<AbortSignal> позволяет отменить операцию toArray, если сигнал прерывается.
Возвращает: <Promise> promise, содержащий массив с содержимым потока.
Этот метод позволяет легко получить содержимое потока.
Поскольку этот метод считывает весь поток в память, он сводит на нет преимущества потоков. Он предназначен для обеспечения совместимости и удобства, а не в качестве основного способа использования потоков.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]
// Выполняем запросы DNS одновременно с помощью .map и собираем
// результаты в массив с помощью toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
.map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
.toArray()
readable.some(fn[, options])
Добавлено в: v17.5.0, v16.15.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
fn
<Function> | <AsyncFunction> функция, вызываемая для каждого фрагмента потока.data
<any> фрагмент данных из потока.options
<Object>signal
<AbortSignal> прерывается, если поток уничтожается, что позволяет прервать вызовfn
раньше времени.
options
<Object>concurrency
<number> максимальное количество одновременных вызововfn
для потока одновременно. По умолчанию:1
.signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерывается.
Возвращает: <Promise> promise, вычисляющийся в
true
, еслиfn
возвратил истинное значение хотя бы для одного из фрагментов.
Этот метод похож на Array.prototype.some
и вызывает fn
для каждого фрагмента в потоке, пока ожидаемое возвращаемое значение не станет true
(или любым истинным значением). Как только ожидаемое возвращаемое значение вызова fn
для фрагмента является истинным, поток уничтожается, и promise выполняется с true
. Если ни один из вызовов fn
для фрагментов не возвращает истинное значение, promise выполняется с false
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// С синхронным предикатом.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false
// С асинхронным предикатом, выполняя не более 2 проверок файлов одновременно.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(anyBigFile) // `true`, если любой файл в списке больше 1 МБ
console.log('done') // Поток завершен
readable.find(fn[, options])
Добавлено в: v17.5.0, v16.17.0
[Стабильно: 1 - Экспериментально]
Стабильно: 1 Стабильность: 1 - Экспериментально
fn
<Function> | <AsyncFunction> функция, вызываемая для каждого фрагмента потока.data
<any> фрагмент данных из потока.options
<Object>signal
<AbortSignal> прерывается, если поток уничтожается, что позволяет преждевременно прервать вызовfn
.
options
<Object>concurrency
<number> максимальное количество одновременных вызововfn
в потоке. По умолчанию:1
.signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерван.
Возвращает: <Promise> promise, вычисляемый в первый фрагмент, для которого
fn
вычислил истинное значение, илиundefined
, если элемент не найден.
Этот метод аналогичен Array.prototype.find
и вызывает fn
для каждого фрагмента в потоке, чтобы найти фрагмент с истинным значением для fn
. Как только ожидаемое возвращаемое значение вызова fn
истинно, поток уничтожается, и promise выполняется со значением, для которого fn
вернул истинное значение. Если все вызовы fn
для фрагментов возвращают ложное значение, promise выполняется с undefined
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// С синхронным предикатом.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined
// С асинхронным предикатом, выполняя не более 2 проверок файлов одновременно.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(foundBigFile) // Имя файла большого файла, если какой-либо файл в списке больше 1 МБ
console.log('done') // Поток завершен
readable.every(fn[, options])
Добавлено в: v17.5.0, v16.15.0
[Стабильно: 1 - Экспериментально]
Стабильно: 1 Стабильность: 1 - Экспериментально
fn
<Function> | <AsyncFunction> функция, вызываемая для каждого фрагмента потока.data
<any> фрагмент данных из потока.options
<Object>signal
<AbortSignal> прерывается, если поток уничтожен, что позволяет прервать вызовfn
раньше времени.
options
<Object>concurrency
<number> максимальное количество одновременных вызововfn
в потоке. По умолчанию:1
.signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерывается.
Возвращает: <Promise> promise, вычисляющийся в
true
, еслиfn
вернул истинное значение для всех фрагментов.
Этот метод похож на Array.prototype.every
и вызывает fn
для каждого фрагмента в потоке, чтобы проверить, являются ли все ожидаемые возвращаемые значения истинными значениями для fn
. Как только возвращаемое значение вызова fn
для фрагмента является ложным, поток уничтожается, и promise выполняется с false
. Если все вызовы fn
для фрагментов возвращают истинное значение, promise выполняется с true
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// С синхронным предикатом.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true
// С асинхронным предикатом, выполняя не более 2 проверок файлов одновременно.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
// `true`, если все файлы в списке больше 1 MiB
console.log(allBigFiles)
console.log('done') // Поток завершен
readable.flatMap(fn[, options])
Добавлено в: v17.5.0, v16.15.0
[Стабильно: 1 - Экспериментально]
Стабильно: 1 Стабильность: 1 - Экспериментально
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> функция для отображения каждого фрагмента в потоке.data
<any> фрагмент данных из потока.options
<Object>signal
<AbortSignal> прерывается, если поток уничтожается, что позволяет прервать вызовfn
раньше времени.
options
<Object>concurrency
<number> максимальное количество одновременных вызововfn
в потоке. По умолчанию:1
.signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерывается.
Возвращает: <Readable> поток, преобразованный с помощью функции
fn
.
Этот метод возвращает новый поток, применяя заданный обратный вызов к каждому фрагменту потока, а затем выравнивая результат.
Можно возвращать поток или другую итерируемую или асинхронную итерируемую сущность из fn
, и результирующие потоки будут объединены (выровнены) в возвращаемый поток.
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'
// С синхронным отображателем.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// С асинхронным отображателем, объединяем содержимое 4 файлов
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
createReadStream(fileName)
)
for await (const result of concatResult) {
// Это будет содержать содержимое (все фрагменты) всех 4 файлов
console.log(result)
}
readable.drop(limit[, options])
Добавлено в: v17.5.0, v16.15.0
[Стабильность: 1 - Экспериментально]
Стабильность: 1 Стабильность: 1 - Экспериментально
limit
<number> количество фрагментов, которые нужно пропустить в потоке чтения.options
<Object>signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерван.
Возвращает: <Readable> поток с пропущенными
limit
фрагментами.
Этот метод возвращает новый поток с пропущенными первыми limit
фрагментами.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])
Добавлено в: v17.5.0, v16.15.0
[Стабильность: 1 - Экспериментально]
Стабильность: 1 Стабильность: 1 - Экспериментально
limit
<number> количество фрагментов, которые нужно взять из потока чтения.options
<Object>signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерван.
Возвращает: <Readable> поток с взятыми
limit
фрагментами.
Этот метод возвращает новый поток с первыми limit
фрагментами.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])
Добавлено в: v17.5.0, v16.15.0
[Стабильность: 1 - Экспериментально]
Стабильность: 1 Стабильность: 1 - Экспериментально
fn
<Function> | <AsyncFunction> функция-редуктор, вызываемая для каждого фрагмента в потоке.previous
<any> значение, полученное из последнего вызоваfn
или значениеinitial
, если оно указано, или первый фрагмент потока в противном случае.data
<any> фрагмент данных из потока.options
<Object>signal
<AbortSignal> прерывается, если поток уничтожен, что позволяет прервать вызовfn
раньше времени.
initial
<any> начальное значение для использования в редукции.options
<Object>signal
<AbortSignal> позволяет уничтожить поток, если сигнал прерван.
Возвращает: <Promise> промис для конечного значения редукции.
Этот метод вызывает fn
для каждого фрагмента потока по порядку, передавая ему результат вычисления предыдущего элемента. Он возвращает промис для конечного значения редукции.
Если значение initial
не указано, в качестве начального значения используется первый фрагмент потока. Если поток пуст, промис отклоняется с TypeError
со свойством кода ERR_INVALID_ARGS
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file))
return totalSize + size
}, 0)
console.log(folderSize)
Функция-редуктор итерирует элементы потока по одному, что означает, что нет параметра concurrency
или параллелизма. Для выполнения reduce
одновременно можно извлечь асинхронную функцию в метод readable.map
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir)
.map(file => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0)
console.log(folderSize)
Дуплексные и преобразующие потоки
Класс: stream.Duplex
[История]
Версия | Изменения |
---|---|
v6.8.0 | Экземпляры Duplex теперь возвращают true при проверке instanceof stream.Writable . |
v0.9.4 | Добавлено в: v0.9.4 |
Дуплексные потоки — это потоки, реализующие как интерфейс Readable
, так и Writable
.
Примеры Duplex
потоков:
duplex.allowHalfOpen
Добавлен в: v0.9.4
Если false
, то поток автоматически завершит запись, когда завершится чтение. Изначально устанавливается с помощью параметра конструктора allowHalfOpen
, который по умолчанию имеет значение true
.
Это можно изменить вручную, чтобы изменить поведение полуоткрытого соединения существующего экземпляра потока Duplex
, но это должно быть изменено до того, как будет вызвано событие 'end'
.
Класс: stream.Transform
Добавлен в: v0.9.4
Преобразующие потоки — это Duplex
потоки, где вывод каким-либо образом связан с вводом. Как и все Duplex
потоки, Transform
потоки реализуют как интерфейс Readable
, так и Writable
.
Примеры Transform
потоков:
transform.destroy([error])
[История]
Версия | Изменения |
---|---|
v14.0.0 | Работает как no-op в потоке, который уже был уничтожен. |
v8.0.0 | Добавлено в: v8.0.0 |
Уничтожает поток и, при необходимости, генерирует событие 'error'
. После этого вызова преобразующий поток освободит все внутренние ресурсы. Разработчики не должны переопределять этот метод, а вместо этого реализовать readable._destroy()
. Реализация по умолчанию _destroy()
для Transform
также генерирует 'close'
, если emitClose
не установлен в false
.
После вызова destroy()
любые дальнейшие вызовы будут являться no-op, и никакие другие ошибки, кроме ошибок из _destroy()
, не будут генерироваться как 'error'
.
stream.duplexPair([options])
Добавлено в: v22.6.0, v20.17.0
options
<Object> Значение, передаваемое обоим конструкторамDuplex
для установки параметров, таких как буферизация.- Возвращает: <Array> из двух экземпляров
Duplex
.
Вспомогательная функция duplexPair
возвращает массив из двух элементов, каждый из которых представляет собой поток Duplex
, подключенный к другой стороне:
const [sideA, sideB] = duplexPair()
Все, что записывается в один поток, становится доступным для чтения в другом. Он обеспечивает поведение, аналогичное сетевому соединению, где данные, записанные клиентом, становятся доступными для чтения сервером и наоборот.
Потоки Duplex симметричны; один или другой может использоваться без каких-либо различий в поведении.
stream.finished(stream[, options], callback)
[История]
Версия | Изменения |
---|---|
v19.5.0 | Добавлена поддержка ReadableStream и WritableStream . |
v15.11.0 | Добавлен параметр signal . |
v14.0.0 | finished(stream, cb) будет ожидать события 'close' перед вызовом обратного вызова. Реализация пытается обнаружить устаревшие потоки и применять это поведение только к потокам, которые, как ожидается, будут генерировать событие 'close' . |
v14.0.0 | Генерация события 'close' до 'end' в потоке Readable вызовет ошибку ERR_STREAM_PREMATURE_CLOSE . |
v14.0.0 | Обратный вызов будет вызван для потоков, которые уже завершены до вызова finished(stream, cb) . |
v10.0.0 | Добавлено в: v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Читаемый и/или записываемый поток/веб-поток.options
<Object>error
<boolean> Если установлено вfalse
, то вызовemit('error', err)
не рассматривается как завершенный. По умолчанию:true
.readable
<boolean> Если установлено вfalse
, обратный вызов будет вызван при завершении потока, даже если поток может быть еще доступен для чтения. По умолчанию:true
.writable
<boolean> Если установлено вfalse
, обратный вызов будет вызван при завершении потока, даже если поток может быть еще доступен для записи. По умолчанию:true
.signal
<AbortSignal> позволяет прервать ожидание завершения потока. Базовый поток не будет прерван, если сигнал будет прерван. Обратный вызов будет вызван сAbortError
. Все зарегистрированные слушатели, добавленные этой функцией, также будут удалены.
callback
<Function> Функция обратного вызова, которая принимает необязательный аргумент ошибки.Возвращает: <Function> Функция очистки, которая удаляет всех зарегистрированных слушателей.
Функция для получения уведомления о том, что поток больше не доступен для чтения, записи или произошла ошибка или событие преждевременного закрытия.
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('Поток завершился с ошибкой.', err)
} else {
console.log('Поток завершил чтение.')
}
})
rs.resume() // Осушаем поток.
Особенно полезно в сценариях обработки ошибок, когда поток уничтожается преждевременно (например, прерванный HTTP-запрос) и не генерирует события 'end'
или 'finish'
.
API finished
предоставляет версию с promise.
stream.finished()
оставляет висячие обработчики событий (в частности, 'error'
, 'end'
, 'finish'
и 'close'
) после вызова callback
. Причина этого заключается в том, что непредвиденные события 'error'
(из-за неправильных реализаций потоков) не вызывают неожиданных сбоев. Если это нежелательное поведение, то необходимо вызвать возвращаемую функцию очистки в обратном вызове:
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[История]
Версия | Изменения |
---|---|
v19.7.0, v18.16.0 | Добавлена поддержка webstreams. |
v18.0.0 | Передача недопустимого обратного вызова в аргумент callback теперь вызывает ERR_INVALID_ARG_TYPE вместо ERR_INVALID_CALLBACK . |
v14.0.0 | pipeline(..., cb) будет ждать события 'close' , прежде чем вызывать обратный вызов. Реализация пытается обнаружить устаревшие потоки и применять это поведение только к потокам, которые, как ожидается, будут генерировать событие 'close' . |
v13.10.0 | Добавлена поддержка асинхронных генераторов. |
v10.0.0 | Добавлено в: v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- Возвращает: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- Возвращает: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- Возвращает: <AsyncIterable> | <Promise>
callback
<Function> Вызывается, когда конвейер полностью завершен.err
<Error>val
Значение, разрешенноеPromise
, возвращенноеdestination
.
Возвращает: <Stream>
Метод модуля для передачи данных между потоками и генераторами, перенаправляющий ошибки, обеспечивающий правильную очистку и предоставляющий обратный вызов по завершении конвейера.
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Используйте API pipeline для простого соединения последовательности потоков
// и получения уведомления о полном завершении конвейера.
// Конвейер для эффективной gzip-сжатия потенциально огромного tar-файла:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('Конвейер завершился с ошибкой.', err)
} else {
console.log('Конвейер завершен успешно.')
}
})
API pipeline
предоставляет версию с promise.
stream.pipeline()
вызовет stream.destroy(err)
для всех потоков, за исключением:
- Потоков
Readable
, которые выпустили событие'end'
или'close'
. - Потоков
Writable
, которые выпустили событие'finish'
или'close'
.
stream.pipeline()
оставляет висячие обработчики событий в потоках после вызова обратного вызова. В случае повторного использования потоков после сбоя это может привести к утечкам обработчиков событий и пропущенным ошибкам. Если последний поток является читаемым, висячие обработчики событий будут удалены, чтобы последний поток можно было использовать позже.
stream.pipeline()
закрывает все потоки, когда возникает ошибка. Использование IncomingRequest
с pipeline
может привести к неожиданному поведению, поскольку оно уничтожит сокет, не отправив ожидаемый ответ. См. пример ниже:
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // Нет такого файла
// Это сообщение не может быть отправлено, так как `pipeline` уже уничтожил сокет
return res.end('error!!!')
}
})
})
stream.compose(...streams)
[История]
Версия | Изменения |
---|---|
v21.1.0, v20.10.0 | Добавлена поддержка класса stream. |
v19.8.0, v18.16.0 | Добавлена поддержка webstreams. |
v16.9.0 | Добавлено в: v16.9.0 |
[Стабильность: 1 - Экспериментально]
Стабильность: 1 Стабильность: 1 - stream.compose
является экспериментальным.
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- Возвращает: <stream.Duplex>
Объединяет два или более потока в поток Duplex
, который записывает в первый поток и считывает из последнего. Каждый предоставленный поток передаётся в следующий с использованием stream.pipeline
. Если в любом из потоков возникает ошибка, то все они уничтожаются, включая внешний поток Duplex
.
Поскольку stream.compose
возвращает новый поток, который, в свою очередь, может (и должен) быть передан в другие потоки, он позволяет использовать композицию. В отличие от этого, при передаче потоков в stream.pipeline
, обычно первый поток является читаемым потоком, а последний — записываемым, образуя замкнутую цепь.
Если передана функция Function
, она должна быть методом-фабрикой, принимающим source
Iterable
.
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // выведет 'HELLOWORLD'
stream.compose
может использоваться для преобразования асинхронных итерируемых объектов, генераторов и функций в потоки.
AsyncIterable
преобразуется в читаемыйDuplex
. Не может возвращатьnull
.AsyncGeneratorFunction
преобразуется в читаемый/записываемый transformDuplex
. Должен принимать исходныйAsyncIterable
в качестве первого параметра. Не может возвращатьnull
.AsyncFunction
преобразуется в записываемыйDuplex
. Должен возвращать либоnull
, либоundefined
.
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// Преобразование AsyncIterable в читаемый Duplex.
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// Преобразование AsyncGenerator в transform Duplex.
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// Преобразование AsyncFunction в записываемый Duplex.
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // выведет 'HELLOWORLD'
См. readable.compose(stream)
для stream.compose
как оператора.
stream.Readable.from(iterable[, options])
Добавлено в: v12.3.0, v10.17.0
iterable
<Iterable> Объект, реализующий протокол итерируемого объектаSymbol.asyncIterator
илиSymbol.iterator
. Генерирует событие 'error', если передано значение null.options
<Object> Параметры, предоставляемыеnew stream.Readable([options])
. По умолчаниюReadable.from()
установитoptions.objectMode
вtrue
, если это явно не отменено установкойoptions.objectMode
вfalse
.- Возвращает: <stream.Readable>
Вспомогательный метод для создания читаемых потоков из итераторов.
const { Readable } = require('node:stream')
async function* generate() {
yield 'hello'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
Вызов Readable.from(string)
или Readable.from(buffer)
не будет итерировать строки или буферы для соответствия семантике других потоков по причинам производительности.
Если в качестве аргумента передаётся объект Iterable
, содержащий промисы, это может привести к необработанному отклонению.
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Необработанное отклонение
])
stream.Readable.fromWeb(readableStream[, options])
Добавлено в: v17.0.0
[Стабильность: 1 - Экспериментально]
Стабильность: 1 Стабильность: 1 - Экспериментально
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Возвращает: <stream.Readable>
stream.Readable.isDisturbed(stream)
Добавлен в: v16.8.0
[Стабильность: 1 — Экспериментальный]
Стабильность: 1 Стабильность: 1 — Экспериментальный
stream
<stream.Readable> | <ReadableStream>- Возвращает:
boolean
Возвращает, был ли поток прочитан или отменен.
stream.isErrored(stream)
Добавлен в: v17.3.0, v16.14.0
[Стабильность: 1 — Экспериментальный]
Стабильность: 1 Стабильность: 1 — Экспериментальный
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- Возвращает: <boolean>
Возвращает, произошла ли ошибка в потоке.
stream.isReadable(stream)
Добавлен в: v17.4.0, v16.14.0
[Стабильность: 1 — Экспериментальный]
Стабильность: 1 Стабильность: 1 — Экспериментальный
stream
<Readable> | <Duplex> | <ReadableStream>- Возвращает: <boolean>
Возвращает, является ли поток читаемым.
stream.Readable.toWeb(streamReadable[, options])
Добавлен в: v17.0.0
[Стабильность: 1 — Экспериментальный]
Стабильность: 1 Стабильность: 1 — Экспериментальный
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> Максимальный размер внутренней очереди (созданногоReadableStream
) перед применением противодавления при чтении из заданногоstream.Readable
. Если значение не указано, оно будет взято из заданногоstream.Readable
.size
<Function> Функция, определяющая размер заданного фрагмента данных. Если значение не указано, размер будет равен1
для всех фрагментов.chunk
<any>- Возвращает: <number>
Возвращает: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
Добавлено в: v17.0.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Возвращает: <stream.Writable>
stream.Writable.toWeb(streamWritable)
Добавлено в: v17.0.0
[Стабильность: 1 - Экспериментальный]
Стабильность: 1 Стабильность: 1 - Экспериментальный
streamWritable
<stream.Writable>- Возвращает: <WritableStream>
stream.Duplex.from(src)
[История]
Версия | Изменения |
---|---|
v19.5.0, v18.17.0 | Аргумент src теперь может быть ReadableStream или WritableStream . |
v16.8.0 | Добавлено в: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
Вспомогательный метод для создания дуплексных потоков.
Stream
преобразует выходной поток в выходнойDuplex
и входной поток вDuplex
.Blob
преобразуется во входнойDuplex
.string
преобразуется во входнойDuplex
.ArrayBuffer
преобразуется во входнойDuplex
.AsyncIterable
преобразуется во входнойDuplex
. Не может возвращатьnull
.AsyncGeneratorFunction
преобразуется во входной/выходной transformDuplex
. Должен принимать исходныйAsyncIterable
в качестве первого параметра. Не может возвращатьnull
.AsyncFunction
преобразуется в выходнойDuplex
. Должен возвращать либоnull
, либоundefined
.Object ({ writable, readable })
преобразуетreadable
иwritable
вStream
, а затем объединяет их вDuplex
, гдеDuplex
будет записывать вwritable
и читать изreadable
.Promise
преобразуется во входнойDuplex
. Значениеnull
игнорируется.ReadableStream
преобразуется во входнойDuplex
.WritableStream
преобразуется в выходнойDuplex
.- Возвращает: <stream.Duplex>
Если в качестве аргумента передаётся объект Iterable
, содержащий промисы, это может привести к необработанному отклонению.
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Необработанное отклонение
])
stream.Duplex.fromWeb(pair[, options])
Добавлено в: v17.0.0
[Стабильность: 1 - Экспериментально]
Стабильность: 1 Стабильность: 1 - Экспериментально
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>Возвращает: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
Добавлено в: v17.0.0
[Стабильно: 1 - Экспериментально]
Стабильно: 1 Стабильность: 1 - Экспериментально
streamDuplex
<stream.Duplex>- Возвращает: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[История]
Версия | Изменения |
---|---|
v19.7.0, v18.16.0 | Добавлена поддержка ReadableStream и WritableStream . |
v15.4.0 | Добавлено в: v15.4.0 |
signal
<AbortSignal> Сигнал, представляющий возможное отключениеstream
<Stream> | <ReadableStream> | <WritableStream> Поток, к которому нужно прикрепить сигнал.
Прикрепляет AbortSignal к читаемому или записываемому потоку. Это позволяет коду управлять уничтожением потока с помощью AbortController
.
Вызов abort
для AbortController
, соответствующего переданному AbortSignal
, будет работать так же, как вызов .destroy(new AbortError())
для потока, и controller.error(new AbortError())
для веб-потоков.
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Позже прервите операцию, закрыв поток
controller.abort()
Или используя AbortSignal
с читаемым потоком как асинхронную итерируемую сущность:
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // установить таймаут
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// Операция была отменена
} else {
throw e
}
}
})()
Или используя AbortSignal
с ReadableStream:
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// Операция была отменена
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
Добавлен в: v19.9.0, v18.17.0
Возвращает значение default highWaterMark, используемое потоками. По умолчанию равно 65536
(64 KiB) или 16
для objectMode
.
stream.setDefaultHighWaterMark(objectMode, value)
Добавлен в: v19.9.0, v18.17.0
Устанавливает значение default highWaterMark, используемое потоками.
API для разработчиков потоков
API модуля node:stream
разработан таким образом, чтобы упростить реализацию потоков с использованием модели прототипного наследования JavaScript.
Сначала разработчик потока объявляет новый класс JavaScript, который расширяет один из четырех основных классов потоков (stream.Writable
, stream.Readable
, stream.Duplex
или stream.Transform
), обязательно вызывая соответствующий конструктор родительского класса:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
При расширении потоков учитывайте, какие параметры пользователь может и должен предоставлять, прежде чем передавать их базовому конструктору. Например, если реализация делает предположения относительно параметров autoDestroy
и emitClose
, не позволяйте пользователю переопределять их. Явно указывайте, какие параметры передаются, вместо неявной передачи всех параметров.
Затем новый класс потока должен реализовать один или несколько конкретных методов в зависимости от типа создаваемого потока, как подробно описано в таблице ниже:
Случай использования | Класс | Метод(ы) для реализации |
---|---|---|
Только чтение | Readable | _read() |
Только запись | Writable | _write() , _writev() , _final() |
Чтение и запись | Duplex | _read() , _write() , _writev() , _final() |
Обработка записанных данных, затем чтение результата | Transform | _transform() , _flush() , _final() |
Код реализации потока никогда не должен вызывать "публичные" методы потока, предназначенные для использования потребителями (как описано в разделе API для потребителей потоков). Это может привести к нежелательным побочным эффектам в коде приложения, потребляющем поток.
Избегайте переопределения публичных методов, таких как write()
, end()
, cork()
, uncork()
, read()
и destroy()
, или генерации внутренних событий, таких как 'error'
, 'data'
, 'end'
, 'finish'
и 'close'
через .emit()
. Это может нарушить текущие и будущие инварианты потока, что приведет к проблемам с поведением и/или совместимостью с другими потоками, утилитами потоков и ожиданиями пользователей.
Упрощенное построение
Добавлено в: v1.2.0
Для многих простых случаев можно создавать поток без использования наследования. Этого можно достичь путем непосредственного создания экземпляров объектов stream.Writable
, stream.Readable
, stream.Duplex
или stream.Transform
и передачи соответствующих методов в качестве параметров конструктора.
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// Инициализация состояния и загрузка ресурсов...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Освобождение ресурсов...
},
})
Реализация потока записи
Класс stream.Writable
расширяется для реализации потока Writable
.
Пользовательские потоки Writable
должны вызывать конструктор new stream.Writable([options])
и реализовывать метод writable._write()
и/или writable._writev()
.
new stream.Writable([options])
[История]
Версия | Изменения |
---|---|
v22.0.0 | Увеличение значения highWaterMark по умолчанию. |
v15.5.0 | Поддержка передачи AbortSignal. |
v14.0.0 | Изменение значения параметра autoDestroy по умолчанию на true . |
v11.2.0, v10.16.0 | Добавление параметра autoDestroy для автоматического вызова destroy() потока при возникновении события 'finish' или ошибок. |
v10.0.0 | Добавление параметра emitClose для указания, должно ли событие 'close' генерироваться при уничтожении. |
options
<Object>highWaterMark
<number> Уровень буфера, при которомstream.write()
начинает возвращатьfalse
. По умолчанию:65536
(64 KiB) или16
для потоков сobjectMode
.decodeStrings
<boolean> Кодировать ли строки, переданные вstream.write()
, вBuffer
(с кодировкой, указанной в вызовеstream.write()
) перед передачей их вstream._write()
. Другие типы данных не преобразуются (т. е.Buffer
не декодируются в строки). Установка вfalse
предотвратит преобразование строк. По умолчанию:true
.defaultEncoding
<string> Кодировка по умолчанию, используемая, когда кодировка не указана в качестве аргумента дляstream.write()
. По умолчанию:'utf8'
.objectMode
<boolean> Является лиstream.write(anyObj)
допустимой операцией. При установке этого параметра становится возможным записывать значения JavaScript, отличные от строки, <Buffer>, <TypedArray> или <DataView>, если это поддерживается реализацией потока. По умолчанию:false
.emitClose
<boolean> Должен ли поток генерировать событие'close'
после того, как он был уничтожен. По умолчанию:true
.write
<Function> Реализация для методаstream._write()
.writev
<Function> Реализация для методаstream._writev()
.destroy
<Function> Реализация для методаstream._destroy()
.final
<Function> Реализация для методаstream._final()
.construct
<Function> Реализация для методаstream._construct()
.autoDestroy
<boolean> Должен ли этот поток автоматически вызывать.destroy()
сам по себе после завершения. По умолчанию:true
.signal
<AbortSignal> Сигнал, представляющий возможное отключение.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// Вызывает конструктор stream.Writable().
super(options)
// ...
}
}
Или, при использовании конструкторов в стиле pre-ES6:
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
Или, используя упрощенный подход к конструктору:
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
Вызов abort
для AbortController
, соответствующего переданному AbortSignal
, будет работать так же, как вызов .destroy(new AbortError())
для потока записи.
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// Позже, прервать операцию, закрыв поток
controller.abort()
writable._construct(callback)
Добавлено в: v15.0.0
callback
<Function> Вызовите эту функцию (необязательно с аргументом ошибки), когда поток завершит инициализацию.
Метод _construct()
НИКОГДА не должен вызываться напрямую. Он может быть реализован дочерними классами, и если это так, он будет вызываться только внутренними методами класса Writable
.
Эта необязательная функция будет вызвана через такт после возврата конструктора потока, откладывая любые вызовы _write()
, _final()
и _destroy()
до тех пор, пока не будет вызван callback
. Это полезно для инициализации состояния или асинхронной инициализации ресурсов, прежде чем поток станет доступен для использования.
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[История]
Версия | Изменения |
---|---|
v12.11.0 | _write() является необязательным при предоставлении _writev(). |
chunk
<Buffer> | <string> | <any>Buffer
, который будет записан, преобразованный из строки, переданной вstream.write()
. Если параметрdecodeStrings
потока имеет значениеfalse
или поток работает в объективном режиме, фрагмент не будет преобразован и будет тем, что было передано вstream.write()
.encoding
<string> Если фрагмент является строкой, тоencoding
— это кодировка этой строки. Если фрагмент — этоBuffer
или поток работает в объективном режиме,encoding
может игнорироваться.callback
<Function> Вызовите эту функцию (необязательно с аргументом ошибки), когда обработка предоставленного фрагмента будет завершена.
Все реализации потока Writable
должны предоставлять метод writable._write()
и/или writable._writev()
для отправки данных в базовый ресурс.
Потоки Transform
предоставляют свою собственную реализацию writable._write()
.
Эту функцию НИКОГДА не следует вызывать непосредственно кодом приложения. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Writable
.
Функция callback
должна вызываться синхронно внутри writable._write()
или асинхронно (т. е. в другом такте), чтобы сигнализировать о том, что запись завершилась успешно или завершилась с ошибкой. Первый аргумент, переданный в callback
, должен быть объектом Error
, если вызов завершился неудачей, или null
, если запись прошла успешно.
Все вызовы writable.write()
, которые происходят между моментом вызова writable._write()
и вызова callback
, приведут к буферизации записанных данных. Когда callback
вызывается, поток может генерировать событие 'drain'
. Если реализация потока способна обрабатывать несколько фрагментов данных одновременно, следует реализовать метод writable._writev()
.
Если свойство decodeStrings
явно установлено в false
в параметрах конструктора, то chunk
останется тем же объектом, который передается в .write()
, и может быть строкой, а не Buffer
. Это необходимо для поддержки реализаций, которые имеют оптимизированную обработку для определенных кодировок строковых данных. В этом случае аргумент encoding
будет указывать кодировку строки. В противном случае аргумент encoding
можно спокойно игнорировать.
Метод writable._write()
имеет префикс подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться непосредственно пользовательскими программами.
writable._writev(chunks, callback)
chunks
<Object[]> Записываемые данные. Значение представляет собой массив <Object>, каждый из которых представляет отдельный фрагмент данных для записи. Свойства этих объектов:chunk
<Buffer> | <string> Экземпляр буфера или строка, содержащая записываемые данные.chunk
будет строкой, еслиWritable
был создан с параметромdecodeStrings
, установленным вfalse
, и вwrite()
была передана строка.encoding
<string> Кодировка символовchunk
. Еслиchunk
являетсяBuffer
,encoding
будет'buffer'
.
callback
<Function> Функция обратного вызова (необязательно с аргументом ошибки), которая вызывается по завершении обработки предоставленных фрагментов.
Эту функцию НИ В КОЕМ СЛУЧАЕ НЕ ДОЛЖЕН вызывать код приложения напрямую. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Writable
.
Метод writable._writev()
может быть реализован дополнительно или вместо writable._write()
в реализациях потоков, способных обрабатывать несколько фрагментов данных одновременно. Если он реализован и есть буферизованные данные из предыдущих записей, будет вызван _writev()
, а не _write()
.
Метод writable._writev()
имеет префикс подчеркивания, поскольку является внутренним для класса, который его определяет, и никогда не должен вызываться пользовательскими программами напрямую.
writable._destroy(err, callback)
Добавлено в: v8.0.0
err
<Error> Возможная ошибка.callback
<Function> Функция обратного вызова, принимающая необязательный аргумент ошибки.
Метод _destroy()
вызывается методом writable.destroy()
. Он может быть переопределен дочерними классами, но его нельзя вызывать напрямую.
writable._final(callback)
Добавлено в: v8.0.0
callback
<Function> Эта функция вызывается (необязательно с аргументом ошибки) после завершения записи оставшихся данных.
Метод _final()
не должен вызываться напрямую. Он может быть реализован дочерними классами, и если это так, то будет вызываться только внутренними методами класса Writable
.
Эта необязательная функция будет вызвана перед закрытием потока, откладывая событие 'finish'
до вызова callback
. Это полезно для закрытия ресурсов или записи буферизованных данных перед завершением потока.
Ошибки во время записи
Ошибки, возникающие во время обработки методов writable._write()
, writable._writev()
и writable._final()
, должны распространяться путем вызова обратного вызова и передачи ошибки в качестве первого аргумента. Выбрасывание Error
изнутри этих методов или ручное излучение события 'error'
приводит к неопределенному поведению.
Если поток Readable
перенаправляется в поток Writable
, когда Writable
излучает ошибку, поток Readable
будет отсоединен.
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
Пример потока writable
Следующий пример иллюстрирует довольно упрощенную (и несколько бессмысленную) реализацию пользовательского потока Writable
. Хотя этот конкретный экземпляр потока Writable
не имеет какой-либо реальной практической пользы, пример иллюстрирует каждый из необходимых элементов пользовательского экземпляра потока Writable
:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
Декодирование буферов в записываемом потоке
Декодирование буферов — распространенная задача, например, при использовании трансформаторов, входными данными которых является строка. Это нетривиальный процесс при использовании кодировки многобайтовых символов, таких как UTF-8. В следующем примере показано, как декодировать многобайтовые строки с помощью StringDecoder
и Writable
.
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
Реализация читаемого потока
Класс stream.Readable
расширяется для реализации потока Readable
.
Пользовательские потоки Readable
должны вызывать конструктор new stream.Readable([options])
и реализовывать метод readable._read()
.
new stream.Readable([options])
[История]
Версия | Изменения |
---|---|
v22.0.0 | увеличение значения highWaterMark по умолчанию. |
v15.5.0 | поддержка передачи AbortSignal. |
v14.0.0 | изменение значения параметра autoDestroy по умолчанию на true . |
v11.2.0, v10.16.0 | добавление параметра autoDestroy для автоматического вызова destroy() потока при генерации события 'end' или ошибок. |
options
<Object>highWaterMark
<number> Максимальное количество байтов, которые можно хранить во внутреннем буфере, прежде чем прекратить чтение из базового ресурса. По умолчанию:65536
(64 KiB) или16
для потоковobjectMode
.encoding
<string> Если указано, буферы будут декодированы в строки с использованием указанной кодировки. По умолчанию:null
.objectMode
<boolean> Должен ли этот поток вести себя как поток объектов. Это означает, чтоstream.read(n)
возвращает одно значение вместоBuffer
размераn
. По умолчанию:false
.emitClose
<boolean> Должен ли поток генерировать событие'close'
после того, как он будет уничтожен. По умолчанию:true
.read
<Function> Реализация методаstream._read()
.destroy
<Function> Реализация методаstream._destroy()
.construct
<Function> Реализация методаstream._construct()
.autoDestroy
<boolean> Должен ли этот поток автоматически вызывать.destroy()
сам по себе после завершения. По умолчанию:true
.signal
<AbortSignal> Сигнал, представляющий возможное отключение.
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// Вызывает конструктор stream.Readable(options).
super(options)
// ...
}
}
Или, при использовании конструкторов в стиле pre-ES6:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
Или, используя упрощенный подход к конструктору:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
Вызов abort
для AbortController
, соответствующего переданному AbortSignal
, будет вести себя так же, как вызов .destroy(new AbortError())
для созданного читаемого объекта.
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// Позже прервите операцию, закрыв поток
controller.abort()
readable._construct(callback)
Добавлено в: v15.0.0
callback
<Function> Вызов этой функции (необязательно с аргументом ошибки) после завершения инициализации потока.
Метод _construct()
НЕ ДОЛЖЕН вызываться напрямую. Он может быть реализован дочерними классами, и в этом случае будет вызываться только внутренними методами класса Readable
.
Эта необязательная функция будет запланирована на следующий такт конструктором потока, откладывая любые вызовы _read()
и _destroy()
до вызова callback
. Это полезно для инициализации состояния или асинхронной инициализации ресурсов, прежде чем поток станет доступен для использования.
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
Добавлено в: v0.9.4
size
<number> Количество байтов для асинхронного чтения
Эту функцию НЕ ДОЛЖЕН вызывать код приложения напрямую. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable
.
Все реализации потока Readable
должны предоставлять реализацию метода readable._read()
для извлечения данных из базового ресурса.
Когда вызывается readable._read()
, если данные доступны из ресурса, реализация должна начать отправку этих данных в очередь чтения с помощью метода this.push(dataChunk)
. _read()
будет вызываться снова после каждого вызова this.push(dataChunk)
, как только поток будет готов принять больше данных. _read()
может продолжить чтение из ресурса и отправку данных до тех пор, пока readable.push()
не вернет false
. Только когда _read()
вызывается снова после того, как он остановился, он должен возобновить отправку дополнительных данных в очередь.
После того, как метод readable._read()
был вызван, он не будет вызываться снова, пока не будут отправлены дополнительные данные через метод readable.push()
. Пустые данные, такие как пустые буферы и строки, не приведут к вызову readable._read()
.
Аргумент size
является рекомендательным. Для реализаций, где "чтение" - это единственная операция, которая возвращает данные, можно использовать аргумент size
для определения того, сколько данных нужно извлечь. Другие реализации могут игнорировать этот аргумент и просто предоставлять данные, как только они станут доступны. Нет необходимости "ждать", пока не станет доступно size
байтов, прежде чем вызывать stream.push(chunk)
.
Метод readable._read()
имеет префикс подчеркивания, потому что он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.
readable._destroy(err, callback)
Добавлено в: v8.0.0
err
<Error> Возможная ошибка.callback
<Function> Функция обратного вызова, принимающая необязательный аргумент ошибки.
Метод _destroy()
вызывается методом readable.destroy()
. Его можно переопределить в дочерних классах, но нельзя вызывать напрямую.
readable.push(chunk[, encoding])
[История]
Версия | Изменения |
---|---|
v22.0.0, v20.13.0 | Аргумент chunk теперь может быть экземпляром TypedArray или DataView . |
v8.0.0 | Аргумент chunk теперь может быть экземпляром Uint8Array . |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Фрагмент данных для добавления в очередь чтения. Для потоков, не работающих в объектном режиме,chunk
должен быть <string>, <Buffer>, <TypedArray> или <DataView>. Для потоков в объектном режимеchunk
может быть любым значением JavaScript.encoding
<string> Кодировка строковых фрагментов. Должна быть допустимой кодировкойBuffer
, например,'utf8'
или'ascii'
.- Возвращает: <boolean>
true
, если можно продолжать добавлять дополнительные фрагменты данных;false
в противном случае.
Когда chunk
является <Buffer>, <TypedArray>, <DataView> или <string>, фрагмент данных будет добавлен во внутренний буфер для использования потребителями потока. Передача chunk
как null
сигнализирует о конце потока (EOF), после чего больше данных записать нельзя.
Когда Readable
работает в приостановленном режиме, данные, добавленные с помощью readable.push()
, можно считать, вызвав метод readable.read()
, когда возникает событие 'readable'
.
Когда Readable
работает в непрерывном режиме, данные, добавленные с помощью readable.push()
, будут доставлены путем генерации события 'data'
.
Метод readable.push()
разработан для максимальной гибкости. Например, при обёртывании низкоуровневого источника, предоставляющего какой-либо механизм приостановки/возобновления и обратный вызов данных, низкоуровневый источник может быть обернут пользовательским экземпляром Readable
:
// `_source` — это объект с методами readStop() и readStart(),
// а также членом `ondata`, который вызывается при наличии данных, и
// членом `onend`, который вызывается по окончании данных.
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// Каждый раз, когда есть данные, добавляем их во внутренний буфер.
this._source.ondata = chunk => {
// Если push() возвращает false, то прекращаем чтение из источника.
if (!this.push(chunk)) this._source.readStop()
}
// Когда источник заканчивается, добавляем фрагмент `null`, сигнализирующий о конце файла.
this._source.onend = () => {
this.push(null)
}
}
// _read() будет вызываться, когда поток захочет получить больше данных.
// Аргумент advisory size в этом случае игнорируется.
_read(size) {
this._source.readStart()
}
}
Метод readable.push()
используется для добавления содержимого во внутренний буфер. Он может управляться методом readable._read()
.
Для потоков, не работающих в объектном режиме, если параметр chunk
метода readable.push()
равен undefined
, он будет рассматриваться как пустая строка или буфер. См. readable.push('')
для получения дополнительной информации.
Ошибки при чтении
Ошибки, возникающие во время обработки readable._read()
, должны распространяться через метод readable.destroy(err)
. Выбрасывание Error
изнутри readable._read()
или ручная эмиссия события 'error'
приводят к неопределённому поведению.
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// Выполнить некоторые действия.
}
},
})
Пример потока подсчёта
Ниже приведён базовый пример потока Readable
, который излучает числовые значения от 1 до 1 000 000 в порядке возрастания, а затем завершается.
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
Реализация дуплексного потока
Поток Duplex
— это поток, который реализует как Readable
, так и Writable
, например, соединение сокета TCP.
Поскольку JavaScript не поддерживает множественное наследование, класс stream.Duplex
расширяется для реализации потока Duplex
(в отличие от расширения классов stream.Readable
и stream.Writable
).
Класс stream.Duplex
прототипически наследуется от stream.Readable
и паразитически от stream.Writable
, но instanceof
будет работать корректно для обоих базовых классов из-за переопределения Symbol.hasInstance
в stream.Writable
.
Пользовательские потоки Duplex
должны вызывать конструктор new stream.Duplex([options])
и реализовывать оба метода readable._read()
и writable._write()
.
new stream.Duplex(options)
[История]
Версия | Изменения |
---|---|
v8.4.0 | Теперь поддерживаются параметры readableHighWaterMark и writableHighWaterMark . |
options
<Объект> Передаётся как конструкторамWritable
, так иReadable
. Также имеет следующие поля:allowHalfOpen
<булево> Если установлено вfalse
, то поток автоматически завершит запись, когда завершится чтение. По умолчанию:true
.readable
<булево> Указывает, должен лиDuplex
быть читаемым. По умолчанию:true
.writable
<булево> Указывает, должен лиDuplex
быть записываемым. По умолчанию:true
.readableObjectMode
<булево> УстанавливаетobjectMode
для читаемой стороны потока. Не оказывает влияния, еслиobjectMode
равенtrue
. По умолчанию:false
.writableObjectMode
<булево> УстанавливаетobjectMode
для записываемой стороны потока. Не оказывает влияния, еслиobjectMode
равенtrue
. По умолчанию:false
.readableHighWaterMark
<число> УстанавливаетhighWaterMark
для читаемой стороны потока. Не оказывает влияния, если указанhighWaterMark
.writableHighWaterMark
<число> УстанавливаетhighWaterMark
для записываемой стороны потока. Не оказывает влияния, если указанhighWaterMark
.
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
Или, при использовании конструкторов в стиле pre-ES6:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
Или, используя упрощённый подход к конструктору:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
При использовании pipeline:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // Принимает строковый ввод вместо буферов
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// Убеждаемся, что это валидный JSON.
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('failed', err)
} else {
console.log('completed')
}
}
)
Пример дуплексного потока
Ниже приведён простой пример Duplex
-потока, который обертывает гипотетический низкоуровневый исходный объект, в который можно записывать данные и из которого можно считывать данные, хотя и с использованием API, несовместимого с потоками Node.js. Следующий пример иллюстрирует простой пример Duplex
-потока, который буферизует входящие записанные данные через интерфейс Writable
, которые затем считываются через интерфейс Readable
.
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// Исходный объект работает только со строками.
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
Самый важный аспект Duplex
-потока заключается в том, что стороны Readable
и Writable
работают независимо друг от друга, несмотря на то, что сосуществуют в одном экземпляре объекта.
Дуплексные потоки в объектом режиме
Для Duplex
-потоков objectMode
может быть установлен исключительно для стороны Readable
или Writable
с помощью опций readableObjectMode
и writableObjectMode
соответственно.
В следующем примере, например, создаётся новый поток Transform
(который является типом потока Duplex
), имеющий сторону Writable
в объектом режиме, которая принимает числа JavaScript, которые преобразуются в шестнадцатеричные строки на стороне Readable
.
const { Transform } = require('node:stream')
// Все потоки Transform также являются Duplex потоками.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Приведение чанка к числу, если необходимо.
chunk |= 0
// Преобразование чанка во что-то другое.
const data = chunk.toString(16)
// Добавление данных в очередь чтения.
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// Выводит: 01
myTransform.write(10)
// Выводит: 0a
myTransform.write(100)
// Выводит: 64
Реализация потока преобразования
Поток Transform
— это поток Duplex
, в котором выходные данные каким-либо образом вычисляются из входных данных. Примерами являются потоки zlib или crypto, которые сжимают, шифруют или дешифруют данные.
Нет необходимости, чтобы размер выходных данных был равен размеру входных данных, количество фрагментов было одинаковым или они поступали одновременно. Например, поток Hash
будет иметь только один фрагмент выходных данных, который предоставляется по завершении ввода. Поток zlib
будет производить выходные данные, которые значительно меньше или больше, чем входные.
Класс stream.Transform
расширяется для реализации потока Transform
.
Класс stream.Transform
прототипически наследуется от stream.Duplex
и реализует собственные версии методов writable._write()
и readable._read()
. Пользовательские реализации Transform
должны реализовывать метод transform._transform()
и могут также реализовывать метод transform._flush()
.
Следует соблюдать осторожность при использовании потоков Transform
, поскольку данные, записанные в поток, могут привести к приостановке части Writable
потока, если выходные данные со стороны Readable
не потребляются.
new stream.Transform([options])
options
<Object> Передается как конструкторамWritable
, так иReadable
. Также имеет следующие поля:transform
<Function> Реализация для методаstream._transform()
.flush
<Function> Реализация для методаstream._flush()
.
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
Или, при использовании конструкторов в стиле pre-ES6:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
Или, используя упрощенный подход к конструктору:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
Событие: 'end'
Событие 'end'
происходит от класса stream.Readable
. Событие 'end'
генерируется после вывода всех данных, что происходит после вызова обратного вызова в transform._flush()
. В случае ошибки 'end'
не должно генерироваться.
Событие: 'finish'
Событие 'finish'
происходит от класса stream.Writable
. Событие 'finish'
генерируется после вызова stream.end()
и обработки всех фрагментов методом stream._transform()
. В случае ошибки 'finish'
не должно генерироваться.
transform._flush(callback)
callback
<Function> Функция обратного вызова (опционально с аргументом ошибки и данными), которая вызывается после очистки оставшихся данных.
Эта функция НИ В КОЕМ СЛУЧАЕ не должна вызываться кодом приложения напрямую. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable
.
В некоторых случаях операции преобразования могут потребовать вывода дополнительных данных в конце потока. Например, поток сжатия zlib
будет хранить некоторое количество внутреннего состояния, используемого для оптимального сжатия вывода. Однако, когда поток заканчивается, эти дополнительные данные должны быть очищены, чтобы сжатые данные были полными.
Пользовательские реализации Transform
могут реализовывать метод transform._flush()
. Он будет вызван, когда нет больше записываемых данных для обработки, но до того, как будет сгенерировано событие 'end'
, сигнализирующее о конце потока Readable
.
Внутри реализации transform._flush()
метод transform.push()
может вызываться ноль или более раз, в зависимости от ситуации. Функция callback
должна быть вызвана по завершении операции очистки.
Метод transform._flush()
имеет префикс подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться напрямую пользовательскими программами.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> Преобразуемый буфер, преобразованный из строки, переданной вstream.write()
. Если параметрdecodeStrings
потока имеет значениеfalse
или поток работает в объектном режиме, фрагмент не будет преобразован и будет таким же, как переданный вstream.write()
.encoding
<string> Если фрагмент является строкой, то это тип кодировки. Если фрагмент является буфером, то это специальное значение'buffer'
. В этом случае игнорируйте его.callback
<Function> Функция обратного вызова (необязательно с аргументом ошибки и данными), которая вызывается после обработки предоставленного фрагментаchunk
.
Эту функцию НИ В КОЕМ СЛУЧАЕ нельзя вызывать непосредственно из кода приложения. Она должна быть реализована дочерними классами и вызываться только внутренними методами класса Readable
.
Все реализации потоков Transform
должны предоставлять метод _transform()
для приема входных данных и вывода результата. Реализация transform._transform()
обрабатывает записываемые байты, вычисляет выходные данные, а затем передает эти выходные данные в читаемую часть с помощью метода transform.push()
.
Метод transform.push()
может вызываться от нуля и более раз для генерации выходных данных из одного входного фрагмента, в зависимости от того, сколько данных должно быть выведено в результате обработки фрагмента.
Возможно, что никакие выходные данные не будут сгенерированы из любого данного входного фрагмента данных.
Функция callback
должна вызываться только тогда, когда текущий фрагмент полностью обработан. Первый аргумент, переданный в callback
, должен быть объектом Error
, если произошла ошибка во время обработки входных данных, или null
в противном случае. Если второй аргумент передается в callback
, он будет передан в метод transform.push()
, но только если первый аргумент является ложным. Другими словами, следующие примеры эквивалентны:
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
Метод transform._transform()
имеет префикс подчеркивания, поскольку он является внутренним для класса, который его определяет, и никогда не должен вызываться непосредственно пользовательскими программами.
transform._transform()
никогда не вызывается параллельно; потоки реализуют механизм очереди, и для получения следующего фрагмента необходимо вызвать callback
, синхронно или асинхронно.
Класс: stream.PassThrough
Класс stream.PassThrough
— это тривиальная реализация потока Transform
, который просто передает входные байты на выход. Его основное назначение — примеры и тестирование, но есть некоторые случаи использования, когда stream.PassThrough
полезен как строительный блок для новых типов потоков.
Дополнительные заметки
Совместимость потоков с асинхронными генераторами и асинхронными итераторами
С поддержкой асинхронных генераторов и итераторов в JavaScript асинхронные генераторы на данный момент являются фактически полноценной языковой конструкцией потока.
Ниже приведены некоторые распространенные примеры взаимодействия использования потоков Node.js с асинхронными генераторами и асинхронными итераторами.
Потребление читаемых потоков с помощью асинхронных итераторов
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
Асинхронные итераторы регистрируют постоянный обработчик ошибок в потоке, чтобы предотвратить любые необработанные ошибки после уничтожения.
Создание читаемых потоков с помощью асинхронных генераторов
Читаемый поток Node.js может быть создан из асинхронного генератора с помощью вспомогательного метода Readable.from()
:
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
Передача данных в записываемые потоки из асинхронных итераторов
При записи в записываемый поток из асинхронного итератора обеспечьте правильную обработку противодавления и ошибок. stream.pipeline()
абстрагирует обработку противодавления и связанных с ним ошибок:
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// Шаблон обратного вызова
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'value returned')
}
}).on('close', () => {
ac.abort()
})
// Шаблон Promise
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'value returned')
})
.catch(err => {
console.error(err)
ac.abort()
})
Совместимость со старыми версиями Node.js
До Node.js 0.10 интерфейс потока Readable
был проще, но и менее мощным и полезным.
- Вместо ожидания вызовов метода
stream.read()
, события'data'
начинали генерироваться немедленно. Приложениям, которым требовалось выполнить некоторую работу для определения способа обработки данных, приходилось сохранять считанные данные в буферы, чтобы данные не были потеряны. - Метод
stream.pause()
был рекомендательным, а не гарантированным. Это означало, что по-прежнему необходимо было быть готовым получать события'data'
даже когда поток находился в приостановленном состоянии.
В Node.js 0.10 был добавлен класс Readable
. Для обратной совместимости со старыми программами Node.js потоки Readable
переключаются в "режим потока", когда добавляется обработчик событий 'data'
или вызывается метод stream.resume()
. Эффект заключается в том, что даже при не использовании нового метода stream.read()
и события 'readable'
, больше не нужно беспокоиться о потере фрагментов 'data'
.
Хотя большинство приложений будут продолжать работать нормально, это вносит пограничный случай в следующих условиях:
- Не добавлен обработчик событий
'data'
. - Метод
stream.resume()
никогда не вызывается. - Поток не передаётся ни в один записываемый пункт назначения.
Например, рассмотрим следующий код:
// ВНИМАНИЕ! НЕ РАБОТАЕТ!
net
.createServer(socket => {
// Мы добавляем обработчик 'end', но никогда не потребляем данные.
socket.on('end', () => {
// Сюда никогда не дойдёт.
socket.end('The message was received but was not processed.\n')
})
})
.listen(1337)
До Node.js 0.10 входящие данные сообщения просто отбрасывались. Однако в Node.js 0.10 и выше сокет остаётся приостановленным навсегда.
Решение в этой ситуации — вызов метода stream.resume()
для начала потока данных:
// Решение.
net
.createServer(socket => {
socket.on('end', () => {
socket.end('The message was received but was not processed.\n')
})
// Запуск потока данных, отбрасывая их.
socket.resume()
})
.listen(1337)
В дополнение к новым потокам Readable
, переключающимся в режим потока, потоки в стиле до 0.10 могут быть обернуты в класс Readable
с помощью метода readable.wrap()
.
readable.read(0)
Существуют случаи, когда необходимо инициировать обновление базовых механизмов потока чтения, не потребляя при этом никаких данных. В таких случаях можно вызвать readable.read(0)
, который всегда возвращает null
.
Если внутренний буфер чтения меньше highWaterMark
, и поток в данный момент не читается, то вызов stream.read(0)
инициирует низкоуровневый вызов stream._read()
.
Хотя большинству приложений это практически никогда не понадобится, существуют ситуации в Node.js, где это делается, особенно во внутренних компонентах класса потока Readable
.
readable.push('')
Использование readable.push('')
не рекомендуется.
Передача нулевого байта <string>, <Buffer>, <TypedArray> или <DataView> в поток, который не находится в объектном режиме, имеет интересный побочный эффект. Поскольку это является вызовом readable.push()
, вызов завершит процесс чтения. Однако, поскольку аргумент является пустой строкой, данные не добавляются в буфер чтения, поэтому пользователю нечего потреблять.
Расхождение highWaterMark
после вызова readable.setEncoding()
Использование readable.setEncoding()
изменит поведение работы highWaterMark
в не объектном режиме.
Обычно размер текущего буфера измеряется относительно highWaterMark
в байтах. Однако после вызова setEncoding()
функция сравнения начнет измерять размер буфера в символах.
В обычных случаях с latin1
или ascii
это не проблема. Но рекомендуется учитывать это поведение при работе со строками, которые могут содержать многобайтовые символы.