Stream
[Estable: 2 - Estable]
Estable: 2 Estabilidad: 2 - Estable
Código fuente: lib/stream.js
Un stream es una interfaz abstracta para trabajar con datos en streaming en Node.js. El módulo node:stream
proporciona una API para implementar la interfaz de stream.
Node.js proporciona muchos objetos stream. Por ejemplo, una solicitud a un servidor HTTP y process.stdout
son ambas instancias de stream.
Los streams pueden ser legibles, escribibles o ambos. Todos los streams son instancias de EventEmitter
.
Para acceder al módulo node:stream
:
const stream = require('node:stream')
El módulo node:stream
es útil para crear nuevos tipos de instancias de stream. Generalmente, no es necesario utilizar el módulo node:stream
para consumir streams.
Organización de este documento
Este documento contiene dos secciones principales y una tercera sección para notas. La primera sección explica cómo utilizar los streams existentes dentro de una aplicación. La segunda sección explica cómo crear nuevos tipos de streams.
Tipos de streams
Hay cuatro tipos de streams fundamentales dentro de Node.js:
Writable
: streams a los que se pueden escribir datos (por ejemplo,fs.createWriteStream()
).Readable
: streams desde los que se pueden leer datos (por ejemplo,fs.createReadStream()
).Duplex
: streams que son tantoReadable
comoWritable
(por ejemplo,net.Socket
).Transform
: streamsDuplex
que pueden modificar o transformar los datos a medida que se escriben y se leen (por ejemplo,zlib.createDeflate()
).
Adicionalmente, este módulo incluye las funciones de utilidad stream.duplexPair()
, stream.pipeline()
, stream.finished()
stream.Readable.from()
, y stream.addAbortSignal()
.
API de Promesas de Streams
Añadido en: v15.0.0
La API stream/promises
proporciona un conjunto alternativo de funciones de utilidad asíncronas para streams que devuelven objetos Promise
en lugar de usar callbacks. Se puede acceder a la API a través de require('node:stream/promises')
o require('node:stream').promises
.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[Historial]
Versión | Cambios |
---|---|
v18.0.0, v17.2.0, v16.14.0 | Añadida la opción end , que se puede establecer en false para evitar que el stream de destino se cierre automáticamente cuando finaliza el origen. |
v15.0.0 | Añadido en: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- Devuelve: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- Devuelve: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- Devuelve: <Promise> | <AsyncIterable>
options
<Object> Opciones de Pipelinesignal
<AbortSignal>end
<boolean> Finalizar el stream de destino cuando finalice el stream de origen. Los streams de transformación siempre finalizan, incluso si este valor esfalse
. Predeterminado:true
.
Devuelve: <Promise> Se cumple cuando la pipeline está completa.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
Para usar un AbortSignal
, páselo dentro de un objeto de opciones, como último argumento. Cuando se aborta la señal, se llamará a destroy
en la pipeline subyacente, con un AbortError
.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
La API pipeline
también admite generadores asíncronos:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Trabaja con cadenas en lugar de `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Trabaja con cadenas en lugar de `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
Recuerda manejar el argumento signal
pasado al generador asíncrono. Especialmente en el caso de que el generador asíncrono sea el origen de la pipeline (es decir, el primer argumento) o la pipeline nunca se completará.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
La API pipeline
proporciona versión de callback:
stream.finished(stream[, options])
[Historial]
Versión | Cambios |
---|---|
v19.5.0, v18.14.0 | Se agregó soporte para ReadableStream y WritableStream . |
v19.1.0, v18.13.0 | Se agregó la opción cleanup . |
v15.0.0 | Agregado en: v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Un flujo/webstream de lectura y/o escritura.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> Si estrue
, elimina los listeners registrados por esta función antes de que la promesa se cumpla. Predeterminado:false
.
Devuelve: <Promise> Se cumple cuando el flujo ya no es legible o grabable.
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('La lectura del flujo ha terminado.')
}
run().catch(console.error)
rs.resume() // Vaciar el flujo.
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('La lectura del flujo ha terminado.')
}
run().catch(console.error)
rs.resume() // Vaciar el flujo.
La API finished
también proporciona una versión de callback.
stream.finished()
deja listeners de eventos colgando (en particular 'error'
, 'end'
, 'finish'
y 'close'
) después de que la promesa devuelta se resuelve o rechaza. La razón de esto es para que los eventos 'error'
inesperados (debido a implementaciones incorrectas de flujo) no causen fallos inesperados. Si este es un comportamiento no deseado, entonces options.cleanup
debe establecerse en true
:
await finished(rs, { cleanup: true })
Modo objeto
Todos los flujos creados por las API de Node.js operan exclusivamente con cadenas, objetos <Buffer>, <TypedArray> y <DataView>:
- Las
Strings
y losBuffers
son los tipos más comunes utilizados con los flujos. TypedArray
yDataView
te permiten manejar datos binarios con tipos comoInt32Array
oUint8Array
. Cuando escribes un TypedArray o DataView en un flujo, Node.js procesa los bytes sin procesar.
Es posible, sin embargo, que las implementaciones de flujo funcionen con otros tipos de valores de JavaScript (con la excepción de null
, que tiene un propósito especial dentro de los flujos). Se considera que dichos flujos operan en "modo objeto".
Las instancias de flujo se cambian al modo objeto utilizando la opción objectMode
cuando se crea el flujo. Intentar cambiar un flujo existente al modo objeto no es seguro.
Almacenamiento en búfer
Tanto los flujos Writable
como los Readable
almacenarán datos en un búfer interno.
La cantidad de datos que se pueden almacenar en búfer depende de la opción highWaterMark
que se pasa al constructor del flujo. Para los flujos normales, la opción highWaterMark
especifica un número total de bytes. Para los flujos que operan en modo objeto, highWaterMark
especifica un número total de objetos. Para los flujos que operan con (pero no decodifican) cadenas, highWaterMark
especifica un número total de unidades de código UTF-16.
Los datos se almacenan en búfer en los flujos Readable
cuando la implementación llama a stream.push(chunk)
. Si el consumidor del flujo no llama a stream.read()
, los datos permanecerán en la cola interna hasta que se consuman.
Una vez que el tamaño total del búfer de lectura interno alcanza el umbral especificado por highWaterMark
, el flujo dejará temporalmente de leer datos del recurso subyacente hasta que los datos almacenados en búfer actualmente se puedan consumir (es decir, el flujo dejará de llamar al método interno readable._read()
que se utiliza para llenar el búfer de lectura).
Los datos se almacenan en búfer en los flujos Writable
cuando se llama repetidamente al método writable.write(chunk)
. Mientras que el tamaño total del búfer de escritura interno esté por debajo del umbral establecido por highWaterMark
, las llamadas a writable.write()
devolverán true
. Una vez que el tamaño del búfer interno alcance o supere el highWaterMark
, se devolverá false
.
Un objetivo clave de la API stream
, particularmente el método stream.pipe()
, es limitar el almacenamiento en búfer de datos a niveles aceptables de tal manera que las fuentes y los destinos de diferentes velocidades no sobrecarguen la memoria disponible.
La opción highWaterMark
es un umbral, no un límite: dicta la cantidad de datos que un flujo almacena en búfer antes de dejar de solicitar más datos. No impone una limitación estricta de memoria en general. Las implementaciones de flujo específicas pueden optar por imponer límites más estrictos, pero hacerlo es opcional.
Debido a que los flujos Duplex
y Transform
son tanto Readable
como Writable
, cada uno mantiene dos búferes internos separados utilizados para leer y escribir, lo que permite que cada lado funcione independientemente del otro mientras se mantiene un flujo de datos adecuado y eficiente. Por ejemplo, las instancias de net.Socket
son flujos Duplex
cuyo lado Readable
permite el consumo de datos recibidos del socket y cuyo lado Writable
permite escribir datos al socket. Dado que los datos se pueden escribir en el socket a una velocidad más rápida o más lenta que la velocidad a la que se reciben los datos, cada lado debe operar (y almacenar en búfer) independientemente del otro.
La mecánica del almacenamiento en búfer interno es un detalle de implementación interna y se puede cambiar en cualquier momento. Sin embargo, para ciertas implementaciones avanzadas, los búferes internos se pueden recuperar utilizando writable.writableBuffer
o readable.readableBuffer
. Se desaconseja el uso de estas propiedades no documentadas.
API para consumidores de flujos
Casi todas las aplicaciones de Node.js, sin importar cuán simples sean, utilizan flujos de alguna manera. El siguiente es un ejemplo del uso de flujos en una aplicación Node.js que implementa un servidor HTTP:
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` es un http.IncomingMessage, que es un flujo de lectura.
// `res` es un http.ServerResponse, que es un flujo de escritura.
let body = ''
// Obtiene los datos como cadenas utf8.
// Si no se establece una codificación, se recibirán objetos Buffer.
req.setEncoding('utf8')
// Los flujos legibles emiten eventos 'data' una vez que se añade un listener.
req.on('data', chunk => {
body += chunk
})
// El evento 'end' indica que se ha recibido todo el cuerpo.
req.on('end', () => {
try {
const data = JSON.parse(body)
// Escribe algo interesante al usuario:
res.write(typeof data)
res.end()
} catch (er) {
// ¡oh no! ¡json incorrecto!
res.statusCode = 400
return res.end(`error: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Los flujos Writable
(como res
en el ejemplo) exponen métodos como write()
y end()
que se utilizan para escribir datos en el flujo.
Los flujos Readable
utilizan la API EventEmitter
para notificar al código de la aplicación cuando hay datos disponibles para ser leídos del flujo. Esos datos disponibles se pueden leer del flujo de varias maneras.
Tanto los flujos Writable
como Readable
utilizan la API EventEmitter
de diversas maneras para comunicar el estado actual del flujo.
Los flujos Duplex
y Transform
son tanto Writable
como Readable
.
Las aplicaciones que escriben o consumen datos de un flujo no están obligadas a implementar las interfaces de flujo directamente y generalmente no tendrán ninguna razón para llamar a require('node:stream')
.
Los desarrolladores que deseen implementar nuevos tipos de flujos deben consultar la sección API para implementadores de flujos.
Streams de escritura
Los streams de escritura son una abstracción para un destino al que se escriben los datos.
Ejemplos de streams Writable
incluyen:
- Peticiones HTTP, en el cliente
- Respuestas HTTP, en el servidor
- Streams de escritura fs
- Streams zlib
- Streams crypto
- Sockets TCP
- stdin del proceso hijo
process.stdout
,process.stderr
Algunos de estos ejemplos son en realidad streams Duplex
que implementan la interfaz Writable
.
Todos los streams Writable
implementan la interfaz definida por la clase stream.Writable
.
Si bien las instancias específicas de streams Writable
pueden diferir de varias maneras, todos los streams Writable
siguen el mismo patrón de uso fundamental como se ilustra en el ejemplo a continuación:
const myStream = getWritableStreamSomehow()
myStream.write('algunos datos')
myStream.write('algunos datos más')
myStream.end('terminó de escribir datos')
Clase: stream.Writable
Añadido en: v0.9.4
Evento: 'close'
[Historial]
Versión | Cambios |
---|---|
v10.0.0 | Se añadió la opción emitClose para especificar si se emite 'close' al destruir. |
v0.9.4 | Añadido en: v0.9.4 |
El evento 'close'
se emite cuando el stream y cualquiera de sus recursos subyacentes (un descriptor de archivo, por ejemplo) se han cerrado. El evento indica que no se emitirán más eventos y no se producirá ningún cálculo adicional.
Un stream Writable
siempre emitirá el evento 'close'
si se crea con la opción emitClose
.
Evento: 'drain'
Añadido en: v0.9.4
Si una llamada a stream.write(chunk)
devuelve false
, el evento 'drain'
se emitirá cuando sea apropiado reanudar la escritura de datos en el stream.
// Escribe los datos en el stream grabable proporcionado un millón de veces.
// Presta atención a la contrapresión.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// ¡Última vez!
writer.write(data, encoding, callback)
} else {
// Mira si debemos continuar o esperar.
// No pases el callback, porque aún no hemos terminado.
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// ¡Tuvo que parar antes!
// Escribe un poco más una vez que se drene.
writer.once('drain', write)
}
}
}
Evento: 'error'
Añadido en: v0.9.4
El evento 'error'
se emite si se produce un error al escribir o canalizar datos. Al llamar al callback del listener, se le pasa un único argumento Error
.
El stream se cierra cuando se emite el evento 'error'
a menos que la opción autoDestroy
se haya establecido en false
al crear el stream.
Después de 'error'
, no se deberían emitir más eventos que 'close'
(incluidos los eventos 'error'
).
Evento: 'finish'
Añadido en: v0.9.4
El evento 'finish'
se emite después de que se haya llamado al método stream.end()
y todos los datos se hayan vaciado en el sistema subyacente.
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hola, #${i}!\n`)
}
writer.on('finish', () => {
console.log('Todas las escrituras ya están completas.')
})
writer.end('Este es el final\n')
Evento: 'pipe'
Añadido en: v0.9.4
src
<stream.Readable> stream de origen que se está canalizando a este grabable
El evento 'pipe'
se emite cuando se llama al método stream.pipe()
en un stream legible, añadiendo este grabable a su conjunto de destinos.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('Algo se está canalizando en el escritor.')
assert.equal(src, reader)
})
reader.pipe(writer)
Evento: 'unpipe'
Añadido en: v0.9.4
src
<stream.Readable> El stream de origen que descanalizó este grabable
El evento 'unpipe'
se emite cuando se llama al método stream.unpipe()
en un stream Readable
, eliminando este Writable
de su conjunto de destinos.
También se emite en caso de que este stream Writable
emita un error cuando un stream Readable
se canaliza hacia él.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('Algo ha dejado de canalizar en el escritor.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
Añadido en: v0.11.2
El método writable.cork()
fuerza a que todos los datos escritos se almacenen en la memoria. Los datos almacenados se vaciarán cuando se llamen los métodos stream.uncork()
o stream.end()
.
La intención principal de writable.cork()
es dar cabida a una situación en la que se escriben varios fragmentos pequeños en el stream en rápida sucesión. En lugar de reenviarlos inmediatamente al destino subyacente, writable.cork()
almacena en búfer todos los fragmentos hasta que se llama a writable.uncork()
, que los pasará todos a writable._writev()
, si está presente. Esto evita una situación de bloqueo de cabeza de línea en la que los datos se almacenan en búfer mientras se espera a que se procese el primer fragmento pequeño. Sin embargo, el uso de writable.cork()
sin implementar writable._writev()
puede tener un efecto adverso en el rendimiento.
Véase también: writable.uncork()
, writable._writev()
.
writable.destroy([error])
[Historial]
Versión | Cambios |
---|---|
v14.0.0 | Funciona como una no-operación en un stream que ya ha sido destruido. |
v8.0.0 | Añadido en: v8.0.0 |
Destruye el stream. Opcionalmente, emite un evento 'error'
y emite un evento 'close'
(a menos que emitClose
esté establecido en false
). Después de esta llamada, el stream grabable ha terminado y las llamadas posteriores a write()
o end()
darán como resultado un error ERR_STREAM_DESTROYED
. Esta es una forma destructiva e inmediata de destruir un stream. Es posible que las llamadas anteriores a write()
no se hayan vaciado y pueden desencadenar un error ERR_STREAM_DESTROYED
. Utiliza end()
en lugar de destroy si los datos deben vaciarse antes de cerrar o espera al evento 'drain'
antes de destruir el stream.
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
Una vez que se ha llamado a destroy()
, cualquier llamada posterior será una no-operación y no se emitirán más errores que los de _destroy()
como 'error'
.
Los implementadores no deberían sobreescribir este método, sino implementar writable._destroy()
.
writable.closed
Añadido en: v18.0.0
Es true
después de que se haya emitido 'close'
.
writable.destroyed
Añadido en: v8.0.0
Es true
después de que se haya llamado a writable.destroy()
.
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[Historial]
Versión | Cambios |
---|---|
v22.0.0, v20.13.0 | El argumento chunk ahora puede ser una instancia de TypedArray o DataView . |
v15.0.0 | Se invoca al callback antes de 'finish' o en caso de error. |
v14.0.0 | Se invoca al callback si se emite 'finish' o 'error'. |
v10.0.0 | Este método ahora devuelve una referencia a writable . |
v8.0.0 | El argumento chunk ahora puede ser una instancia de Uint8Array . |
v0.9.4 | Añadido en: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Datos opcionales para escribir. Para los streams que no operan en modo objeto,chunk
debe ser un <string>, <Buffer>, <TypedArray> o <DataView>. Para los streams en modo objeto,chunk
puede ser cualquier valor de JavaScript que no seanull
.encoding
<string> La codificación sichunk
es una stringcallback
<Function> Callback para cuando el stream haya terminado.- Devuelve: <this>
Llamar al método writable.end()
indica que no se escribirán más datos en el Writable
. Los argumentos opcionales chunk
y encoding
permiten escribir un último fragmento adicional de datos inmediatamente antes de cerrar el stream.
Llamar al método stream.write()
después de llamar a stream.end()
generará un error.
// Escribe 'hola, ' y luego termina con '¡mundo!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hola, ')
file.end('¡mundo!')
// ¡Ahora no está permitido escribir más!
writable.setDefaultEncoding(encoding)
[Historial]
Versión | Cambios |
---|---|
v6.1.0 | Este método ahora devuelve una referencia a writable . |
v0.11.15 | Añadido en: v0.11.15 |
El método writable.setDefaultEncoding()
establece la encoding
por defecto para un stream Writable
.
writable.uncork()
Añadido en: v0.11.2
El método writable.uncork()
vacía todos los datos almacenados en búfer desde que se llamó a stream.cork()
.
Cuando se utilizan writable.cork()
y writable.uncork()
para gestionar el almacenamiento en búfer de las escrituras en un stream, aplazar las llamadas a writable.uncork()
utilizando process.nextTick()
. Esto permite agrupar todas las llamadas writable.write()
que se producen dentro de una fase determinada del bucle de eventos de Node.js.
stream.cork()
stream.write('algunos ')
stream.write('datos ')
process.nextTick(() => stream.uncork())
Si el método writable.cork()
se llama varias veces en un stream, debe llamarse el mismo número de veces a writable.uncork()
para vaciar los datos almacenados en el búfer.
stream.cork()
stream.write('algunos ')
stream.cork()
stream.write('datos ')
process.nextTick(() => {
stream.uncork()
// Los datos no se vaciarán hasta que se llame a uncork() por segunda vez.
stream.uncork()
})
Véase también: writable.cork()
.
writable.writable
Añadido en: v11.4.0
Es true
si es seguro llamar a writable.write()
, lo que significa que el stream no ha sido destruido, no ha tenido errores o no ha terminado.
writable.writableAborted
Añadido en: v18.0.0, v16.17.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
Devuelve si el stream fue destruido o tuvo un error antes de emitir 'finish'
.
writable.writableEnded
Añadido en: v12.9.0
Es true
después de que se haya llamado a writable.end()
. Esta propiedad no indica si los datos se han vaciado, para ello utilice writable.writableFinished
en su lugar.
writable.writableCorked
Añadido en: v13.2.0, v12.16.0
Número de veces que hay que llamar a writable.uncork()
para desconectar completamente el stream.
writable.errored
Añadido en: v18.0.0
Devuelve el error si el stream ha sido destruido con un error.
writable.writableFinished
Añadido en: v12.6.0
Se establece en true
inmediatamente antes de que se emita el evento 'finish'
.
writable.writableHighWaterMark
Añadido en: v9.3.0
Devuelve el valor de highWaterMark
pasado al crear este Writable
.
writable.writableLength
Añadido en: v9.4.0
Esta propiedad contiene el número de bytes (u objetos) en la cola listos para ser escritos. El valor proporciona datos de introspección sobre el estado de highWaterMark
.
writable.writableNeedDrain
Añadido en: v15.2.0, v14.17.0
Es true
si el búfer del stream se ha llenado y el stream emitirá 'drain'
.
writable.writableObjectMode
Añadido en: v12.3.0
Getter para la propiedad objectMode
de un stream Writable
dado.
writable[Symbol.asyncDispose]()
Añadido en: v22.4.0, v20.16.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
Llama a writable.destroy()
con un AbortError
y devuelve una promesa que se cumple cuando el stream ha terminado.
writable.write(chunk[, encoding][, callback])
[Historial]
Versión | Cambios |
---|---|
v22.0.0, v20.13.0 | El argumento chunk ahora puede ser una instancia de TypedArray o DataView . |
v8.0.0 | El argumento chunk ahora puede ser una instancia de Uint8Array . |
v6.0.0 | Pasar null como parámetro chunk siempre se considerará inválido ahora, incluso en modo objeto. |
v0.9.4 | Añadido en: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Datos opcionales para escribir. Para los streams que no operan en modo objeto,chunk
debe ser un <string>, <Buffer>, <TypedArray> o <DataView>. Para los streams en modo objeto,chunk
puede ser cualquier valor de JavaScript que no seanull
.encoding
<string> | <null> La codificación, sichunk
es una string. Por defecto:'utf8'
callback
<Function> Callback para cuando se haya vaciado este fragmento de datos.- Devuelve: <boolean>
false
si el stream desea que el código de llamada espere a que se emita el evento'drain'
antes de seguir escribiendo datos adicionales; de lo contrario,true
.
El método writable.write()
escribe algunos datos en el stream y llama al callback
proporcionado una vez que los datos se han gestionado por completo. Si se produce un error, se llamará a callback
con el error como primer argumento. Se llama al callback
de forma asíncrona y antes de que se emita 'error'
.
El valor devuelto es true
si el búfer interno es menor que el highWaterMark
configurado cuando se creó el stream después de admitir chunk
. Si se devuelve false
, los intentos adicionales de escribir datos en el stream deberían detenerse hasta que se emita el evento 'drain'
.
Mientras un stream no se está drenando, las llamadas a write()
almacenarán en búfer chunk
y devolverán false. Una vez que todos los fragmentos almacenados actualmente en búfer se hayan drenado (aceptados para la entrega por el sistema operativo), se emitirá el evento 'drain'
. Una vez que write()
devuelve false, no escribas más fragmentos hasta que se emita el evento 'drain'
. Si bien está permitido llamar a write()
en un stream que no se está drenando, Node.js almacenará en búfer todos los fragmentos escritos hasta que se produzca el uso máximo de la memoria, momento en el que se abortará incondicionalmente. Incluso antes de que se aborte, el alto uso de memoria provocará un bajo rendimiento del recolector de basura y un alto RSS (que no suele liberarse de vuelta al sistema, incluso después de que la memoria ya no sea necesaria). Dado que los sockets TCP pueden no drenar nunca si el par remoto no lee los datos, escribir en un socket que no se está drenando puede conducir a una vulnerabilidad explotable remotamente.
Escribir datos mientras el stream no se está drenando es particularmente problemático para un Transform
, porque los streams Transform
se pausan por defecto hasta que se canalizan o se añade un controlador de eventos 'data'
o 'readable'
.
Si los datos que se van a escribir pueden generarse o recuperarse a petición, se recomienda encapsular la lógica en un Readable
y utilizar stream.pipe()
. Sin embargo, si se prefiere llamar a write()
, es posible respetar la contrapresión y evitar problemas de memoria utilizando el evento 'drain'
:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// Espera a que se llame a cb antes de realizar cualquier otra escritura.
write('hola', () => {
console.log('La escritura se ha completado, realiza más escrituras ahora.')
})
Un stream Writable
en modo objeto siempre ignorará el argumento encoding
.
Streams legibles
Los streams legibles son una abstracción de una fuente de la que se consumen datos.
Ejemplos de streams Readable
incluyen:
- Respuestas HTTP, en el cliente
- Peticiones HTTP, en el servidor
- Streams de lectura fs
- Streams zlib
- Streams de cifrado
- Sockets TCP
- stdout y stderr de procesos hijo
process.stdin
Todos los streams Readable
implementan la interfaz definida por la clase stream.Readable
.
Dos modos de lectura
Los streams Readable
operan de manera efectiva en uno de dos modos: flujo y pausa. Estos modos son independientes del modo objeto. Un stream Readable
puede estar en modo objeto o no, independientemente de si está en modo flujo o en modo pausa.
- En modo flujo, los datos se leen del sistema subyacente automáticamente y se proporcionan a una aplicación lo más rápido posible utilizando eventos a través de la interfaz
EventEmitter
. - En modo pausa, el método
stream.read()
debe llamarse explícitamente para leer fragmentos de datos del stream.
Todos los streams Readable
comienzan en modo pausa, pero se pueden cambiar al modo flujo de una de las siguientes maneras:
- Añadiendo un controlador de eventos
'data'
. - Llamando al método
stream.resume()
. - Llamando al método
stream.pipe()
para enviar los datos a unWritable
.
El Readable
puede volver al modo pausa utilizando uno de los siguientes:
- Si no hay destinos pipe, llamando al método
stream.pause()
. - Si hay destinos pipe, eliminando todos los destinos pipe. Se pueden eliminar varios destinos pipe llamando al método
stream.unpipe()
.
El concepto importante que hay que recordar es que un Readable
no generará datos hasta que se proporcione un mecanismo para consumir o ignorar esos datos. Si el mecanismo de consumo se deshabilita o se elimina, el Readable
intentará dejar de generar los datos.
Por razones de compatibilidad con versiones anteriores, la eliminación de controladores de eventos 'data'
no pausará automáticamente el stream. Además, si hay destinos pipe, entonces llamar a stream.pause()
no garantizará que el stream permanezca pausado una vez que esos destinos se vacíen y soliciten más datos.
Si un Readable
se cambia al modo flujo y no hay consumidores disponibles para manejar los datos, esos datos se perderán. Esto puede ocurrir, por ejemplo, cuando se llama al método readable.resume()
sin un listener adjunto al evento 'data'
, o cuando se elimina un controlador de eventos 'data'
del stream.
Añadir un controlador de eventos 'readable'
hace que el stream deje de fluir automáticamente, y los datos tienen que ser consumidos a través de readable.read()
. Si se elimina el controlador de eventos 'readable'
, el stream volverá a fluir si hay un controlador de eventos 'data'
.
Tres estados
Los "dos modos" de operación para un flujo Readable
son una abstracción simplificada de la gestión de estados internos más compleja que está ocurriendo dentro de la implementación del flujo Readable
.
Específicamente, en cualquier momento dado, cada Readable
está en uno de tres estados posibles:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
Cuando readable.readableFlowing
es null
, no se proporciona ningún mecanismo para consumir los datos del flujo. Por lo tanto, el flujo no generará datos. Mientras esté en este estado, adjuntar un listener para el evento 'data'
, llamar al método readable.pipe()
o llamar al método readable.resume()
cambiará readable.readableFlowing
a true
, lo que hará que el Readable
comience a emitir eventos activamente a medida que se generen los datos.
Llamar a readable.pause()
, readable.unpipe()
o recibir contrapresión hará que readable.readableFlowing
se establezca como false
, deteniendo temporalmente el flujo de eventos pero no deteniendo la generación de datos. Mientras esté en este estado, adjuntar un listener para el evento 'data'
no cambiará readable.readableFlowing
a true
.
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing ahora es false.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing sigue siendo false.
pass.write('ok') // No emitirá 'data'.
pass.resume() // Debe ser llamado para que el flujo emita 'data'.
// readableFlowing ahora es true.
Mientras readable.readableFlowing
sea false
, los datos pueden acumularse dentro del búfer interno del flujo.
Elige un estilo de API
La API de flujo Readable
evolucionó a través de múltiples versiones de Node.js y proporciona múltiples métodos para consumir datos de flujo. En general, los desarrolladores deben elegir uno de los métodos de consumo de datos y nunca deben usar múltiples métodos para consumir datos de un solo flujo. Específicamente, usar una combinación de on('data')
, on('readable')
, pipe()
o iteradores asíncronos podría llevar a un comportamiento poco intuitivo.
Clase: stream.Readable
Agregado en: v0.9.4
Evento: 'close'
[Historial]
Versión | Cambios |
---|---|
v10.0.0 | Agregada la opción emitClose para especificar si se emite 'close' al destruir. |
v0.9.4 | Agregado en: v0.9.4 |
El evento 'close'
se emite cuando el flujo y cualquiera de sus recursos subyacentes (un descriptor de archivo, por ejemplo) se han cerrado. El evento indica que no se emitirán más eventos y no se producirá más computación.
Un flujo Readable
siempre emitirá el evento 'close'
si se crea con la opción emitClose
.
Evento: 'data'
Agregado en: v0.9.4
chunk
<Buffer> | <string> | <any> El fragmento de datos. Para los flujos que no están operando en modo objeto, el fragmento será una cadena oBuffer
. Para los flujos que están en modo objeto, el fragmento puede ser cualquier valor de JavaScript que no seanull
.
El evento 'data'
se emite cada vez que el flujo renuncia a la propiedad de un fragmento de datos a un consumidor. Esto puede ocurrir cuando el flujo se cambia a modo de flujo llamando a readable.pipe()
, readable.resume()
o adjuntando una función de devolución de llamada de escucha al evento 'data'
. El evento 'data'
también se emitirá siempre que se llame al método readable.read()
y haya un fragmento de datos disponible para ser devuelto.
Adjuntar un escucha de evento 'data'
a un flujo que no se ha pausado explícitamente cambiará el flujo al modo de flujo. Los datos se pasarán tan pronto como estén disponibles.
La función de devolución de llamada del escucha recibirá el fragmento de datos como una cadena si se ha especificado una codificación predeterminada para el flujo utilizando el método readable.setEncoding()
; de lo contrario, los datos se pasarán como un Buffer
.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Recibidos ${chunk.length} bytes de datos.`)
})
Evento: 'end'
Agregado en: v0.9.4
El evento 'end'
se emite cuando no hay más datos para consumir del flujo.
El evento 'end'
no se emitirá a menos que los datos se consuman por completo. Esto se puede lograr cambiando el flujo al modo de flujo o llamando a stream.read()
repetidamente hasta que se hayan consumido todos los datos.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Recibidos ${chunk.length} bytes de datos.`)
})
readable.on('end', () => {
console.log('No habrá más datos.')
})
Evento: 'error'
Agregado en: v0.9.4
El evento 'error'
puede ser emitido por una implementación de Readable
en cualquier momento. Por lo general, esto puede ocurrir si el flujo subyacente no puede generar datos debido a una falla interna subyacente, o cuando una implementación de flujo intenta insertar un fragmento de datos no válido.
La función de devolución de llamada del escucha recibirá un solo objeto Error
.
Evento: 'pause'
Agregado en: v0.9.4
El evento 'pause'
se emite cuando se llama a stream.pause()
y readableFlowing
no es false
.
Evento: 'readable'
[Historial]
Versión | Cambios |
---|---|
v10.0.0 | El 'readable' siempre se emite en el siguiente tick después de que se llama a .push() . |
v10.0.0 | El uso de 'readable' requiere llamar a .read() . |
v0.9.4 | Agregado en: v0.9.4 |
El evento 'readable'
se emite cuando hay datos disponibles para ser leídos del flujo, hasta la marca de agua alta configurada (state.highWaterMark
). Efectivamente, indica que el flujo tiene nueva información dentro del búfer. Si hay datos disponibles dentro de este búfer, se puede llamar a stream.read()
para recuperar esos datos. Además, el evento 'readable'
también se puede emitir cuando se ha alcanzado el final del flujo.
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// Hay algunos datos para leer ahora.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
Si se ha alcanzado el final del flujo, llamar a stream.read()
devolverá null
y activará el evento 'end'
. Esto también es cierto si nunca hubo datos para leer. Por ejemplo, en el siguiente ejemplo, foo.txt
es un archivo vacío:
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
La salida de la ejecución de este script es:
$ node test.js
readable: null
end
En algunos casos, adjuntar un escucha para el evento 'readable'
hará que se lea una cierta cantidad de datos en un búfer interno.
En general, los mecanismos readable.pipe()
y evento 'data'
son más fáciles de entender que el evento 'readable'
. Sin embargo, el manejo de 'readable'
podría resultar en un mayor rendimiento.
Si tanto 'readable'
como 'data'
se usan al mismo tiempo, 'readable'
tiene prioridad para controlar el flujo, es decir, 'data'
se emitirá solo cuando se llame a stream.read()
. La propiedad readableFlowing
se volvería false
. Si hay escuchas 'data'
cuando se elimina 'readable'
, el flujo comenzará a fluir, es decir, los eventos 'data'
se emitirán sin llamar a .resume()
.
Evento: 'resume'
Agregado en: v0.9.4
El evento 'resume'
se emite cuando se llama a stream.resume()
y readableFlowing
no es true
.
readable.destroy([error])
[Historial]
Versión | Cambios |
---|---|
v14.0.0 | Funciona como una operación sin efecto en un flujo que ya ha sido destruido. |
v8.0.0 | Agregado en: v8.0.0 |
Destruye el flujo. Opcionalmente, emite un evento 'error'
y emite un evento 'close'
(a menos que emitClose
se establezca en false
). Después de esta llamada, el flujo legible liberará cualquier recurso interno y se ignorarán las llamadas posteriores a push()
.
Una vez que se ha llamado a destroy()
, cualquier llamada adicional será una operación sin efecto y no se emitirán más errores que no sean de _destroy()
como 'error'
.
Los implementadores no deben anular este método, sino implementar readable._destroy()
.
readable.closed
Agregado en: v18.0.0
Es true
después de que se ha emitido 'close'
.
readable.destroyed
Agregado en: v8.0.0
Es true
después de que se ha llamado a readable.destroy()
.
readable.isPaused()
Agregado en: v0.11.14
- Devuelve: <boolean>
El método readable.isPaused()
devuelve el estado operativo actual de Readable
. Esto es utilizado principalmente por el mecanismo que subyace al método readable.pipe()
. En la mayoría de los casos típicos, no habrá razón para usar este método directamente.
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
Agregado en: v0.9.4
- Devuelve: <this>
El método readable.pause()
hará que un flujo en modo de flujo deje de emitir eventos 'data'
, saliendo del modo de flujo. Cualquier dato que esté disponible permanecerá en el búfer interno.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Recibidos ${chunk.length} bytes de datos.`)
readable.pause()
console.log('No habrá datos adicionales durante 1 segundo.')
setTimeout(() => {
console.log('Ahora los datos comenzarán a fluir nuevamente.')
readable.resume()
}, 1000)
})
El método readable.pause()
no tiene ningún efecto si hay un escucha de evento 'readable'
.
readable.pipe(destination[, options])
Agregado en: v0.9.4
destination
<stream.Writable> El destino para escribir datosoptions
<Object> Opciones de tuberíaend
<boolean> Termina el escritor cuando termina el lector. Predeterminado:true
.
Devuelve: <stream.Writable> El destino, lo que permite una cadena de tuberías si es un flujo
Duplex
oTransform
El método readable.pipe()
adjunta un flujo Writable
a readable
, lo que hace que cambie automáticamente al modo de flujo y envíe todos sus datos al Writable
adjunto. El flujo de datos se gestionará automáticamente para que el flujo Writable
de destino no se vea desbordado por un flujo Readable
más rápido.
El siguiente ejemplo canaliza todos los datos de readable
a un archivo llamado file.txt
:
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Todos los datos de readable van a 'file.txt'.
readable.pipe(writable)
Es posible adjuntar múltiples flujos Writable
a un solo flujo Readable
.
El método readable.pipe()
devuelve una referencia al flujo destino, lo que permite configurar cadenas de flujos canalizados:
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
De forma predeterminada, se llama a stream.end()
en el flujo Writable
de destino cuando el flujo Readable
de origen emite 'end'
, de modo que el destino ya no sea escribible. Para deshabilitar este comportamiento predeterminado, se puede pasar la opción end
como false
, lo que hace que el flujo de destino permanezca abierto:
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Adiós\n')
})
Una advertencia importante es que si el flujo Readable
emite un error durante el procesamiento, el destino Writable
no se cierra automáticamente. Si se produce un error, será necesario cerrar manualmente cada flujo para evitar fugas de memoria.
Los flujos Writable
process.stderr
y process.stdout
nunca se cierran hasta que finaliza el proceso de Node.js, independientemente de las opciones especificadas.
readable.read([size])
Agregado en: v0.9.4
size
<number> Argumento opcional para especificar cuántos datos leer.- Devuelve: <string> | <Buffer> | <null> | <any>
El método readable.read()
lee los datos del búfer interno y los devuelve. Si no hay datos disponibles para ser leídos, se devuelve null
. De forma predeterminada, los datos se devuelven como un objeto Buffer
a menos que se haya especificado una codificación utilizando el método readable.setEncoding()
o el flujo esté operando en modo de objeto.
El argumento opcional size
especifica un número específico de bytes para leer. Si no hay size
bytes disponibles para leer, se devolverá null
a menos que el flujo haya terminado, en cuyo caso se devolverán todos los datos restantes en el búfer interno.
Si no se especifica el argumento size
, se devolverán todos los datos contenidos en el búfer interno.
El argumento size
debe ser menor o igual a 1 GiB.
El método readable.read()
solo debe llamarse en flujos Readable
que operan en modo pausado. En modo de flujo, se llama automáticamente a readable.read()
hasta que el búfer interno se drena por completo.
const readable = getReadableStreamSomehow()
// 'readable' se puede activar varias veces a medida que los datos se almacenan en el búfer
readable.on('readable', () => {
let chunk
console.log('El flujo es legible (nuevos datos recibidos en el búfer)')
// Use un bucle para asegurarse de leer todos los datos disponibles actualmente
while (null !== (chunk = readable.read())) {
console.log(`Leídos ${chunk.length} bytes de datos...`)
}
})
// 'end' se activará una vez cuando no haya más datos disponibles
readable.on('end', () => {
console.log('Se alcanzó el final del flujo.')
})
Cada llamada a readable.read()
devuelve un fragmento de datos o null
, lo que significa que no hay más datos para leer en ese momento. Estos fragmentos no se concatenan automáticamente. Debido a que una sola llamada a read()
no devuelve todos los datos, puede ser necesario usar un bucle while para leer continuamente fragmentos hasta que se recuperen todos los datos. Al leer un archivo grande, .read()
podría devolver null
temporalmente, lo que indica que ha consumido todo el contenido almacenado en el búfer, pero puede haber más datos aún por almacenar en el búfer. En tales casos, se emite un nuevo evento 'readable'
una vez que hay más datos en el búfer, y el evento 'end'
significa el final de la transmisión de datos.
Por lo tanto, para leer todo el contenido de un archivo de un readable
, es necesario recopilar fragmentos en múltiples eventos 'readable'
:
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
Un flujo Readable
en modo objeto siempre devolverá un solo elemento de una llamada a readable.read(size)
, independientemente del valor del argumento size
.
Si el método readable.read()
devuelve un fragmento de datos, también se emitirá un evento 'data'
.
Llamar a stream.read([size])
después de que se haya emitido el evento 'end'
devolverá null
. No se generará ningún error de tiempo de ejecución.
readable.readable
Agregado en: v11.4.0
Es true
si es seguro llamar a readable.read()
, lo que significa que el flujo no ha sido destruido ni ha emitido 'error'
o 'end'
.
readable.readableAborted
Agregado en: v16.8.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
Devuelve si el flujo fue destruido o tuvo un error antes de emitir 'end'
.
readable.readableDidRead
Agregado en: v16.7.0, v14.18.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
Devuelve si se ha emitido 'data'
.
readable.readableEncoding
Agregado en: v12.7.0
Obtenedor para la propiedad encoding
de un flujo Readable
dado. La propiedad encoding
se puede configurar utilizando el método readable.setEncoding()
.
readable.readableEnded
Agregado en: v12.9.0
Se vuelve true
cuando se emite el evento 'end'
.
readable.errored
Agregado en: v18.0.0
Devuelve el error si el flujo ha sido destruido con un error.
readable.readableFlowing
Agregado en: v9.4.0
Esta propiedad refleja el estado actual de un flujo Readable
como se describe en la sección Tres estados.
readable.readableHighWaterMark
Agregado en: v9.3.0
Devuelve el valor de highWaterMark
pasado al crear este Readable
.
readable.readableLength
Agregado en: v9.4.0
Esta propiedad contiene el número de bytes (u objetos) en la cola listos para ser leídos. El valor proporciona datos de introspección con respecto al estado de la highWaterMark
.
readable.readableObjectMode
Agregado en: v12.3.0
Obtenedor para la propiedad objectMode
de un flujo Readable
dado.
readable.resume()
[Historial]
Versión | Cambios |
---|---|
v10.0.0 | El resume() no tiene efecto si hay un escucha de evento 'readable' . |
v0.9.4 | Agregado en: v0.9.4 |
- Devuelve: <this>
El método readable.resume()
hace que un flujo Readable
pausado explícitamente reanude la emisión de eventos 'data'
, cambiando el flujo al modo de flujo.
El método readable.resume()
se puede utilizar para consumir por completo los datos de un flujo sin procesar realmente ninguno de esos datos:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Llegó al final, pero no leyó nada.')
})
El método readable.resume()
no tiene ningún efecto si hay un escucha de evento 'readable'
.
readable.setEncoding(encoding)
Agregado en: v0.9.4
El método readable.setEncoding()
establece la codificación de caracteres para los datos leídos del flujo Readable
.
De forma predeterminada, no se asigna ninguna codificación y los datos de flujo se devolverán como objetos Buffer
. Establecer una codificación hace que los datos de flujo se devuelvan como cadenas de la codificación especificada en lugar de como objetos Buffer
. Por ejemplo, llamar a readable.setEncoding('utf8')
hará que los datos de salida se interpreten como datos UTF-8 y se pasen como cadenas. Llamar a readable.setEncoding('hex')
hará que los datos se codifiquen en formato de cadena hexadecimal.
El flujo Readable
manejará correctamente los caracteres multibyte entregados a través del flujo que de otro modo se decodificarían incorrectamente si se extrajeran simplemente del flujo como objetos Buffer
.
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('Obtenidos %d caracteres de datos de cadena:', chunk.length)
})
readable.unpipe([destination])
Agregado en: v0.9.4
destination
<stream.Writable> Flujo específico opcional para desconectar- Devuelve: <this>
El método readable.unpipe()
desconecta un flujo Writable
adjuntado previamente utilizando el método stream.pipe()
.
Si no se especifica el destination
, se desconectan todas las tuberías.
Si se especifica el destination
, pero no se ha configurado ninguna tubería para él, el método no hace nada.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Todos los datos de readable van a 'file.txt',
// pero solo durante el primer segundo.
readable.pipe(writable)
setTimeout(() => {
console.log('Dejar de escribir en file.txt.')
readable.unpipe(writable)
console.log('Cerrar manualmente el flujo de archivos.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[Historial]
Versión | Cambios |
---|---|
v22.0.0, v20.13.0 | El argumento chunk ahora puede ser una instancia de TypedArray o DataView . |
v8.0.0 | El argumento chunk ahora puede ser una instancia de Uint8Array . |
v0.9.11 | Agregado en: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Fragmento de datos para desviar de nuevo a la cola de lectura. Para flujos que no operan en modo de objeto,chunk
debe ser un <string>, <Buffer>, <TypedArray>, <DataView> onull
. Para los flujos de modo de objeto,chunk
puede ser cualquier valor de JavaScript.encoding
<string> Codificación de fragmentos de cadena. Debe ser una codificaciónBuffer
válida, como'utf8'
o'ascii'
.
Pasar chunk
como null
señala el final del flujo (EOF) y se comporta de la misma manera que readable.push(null)
, después de lo cual no se pueden escribir más datos. La señal EOF se coloca al final del búfer y cualquier dato en búfer todavía se vaciará.
El método readable.unshift()
vuelve a insertar un fragmento de datos en el búfer interno. Esto es útil en ciertas situaciones en las que un flujo está siendo consumido por un código que necesita "desconsumir" una cierta cantidad de datos que ha extraído de forma optimista de la fuente, de modo que los datos se puedan pasar a otra parte.
El método stream.unshift(chunk)
no se puede llamar después de que se haya emitido el evento 'end'
o se producirá un error de tiempo de ejecución.
Los desarrolladores que usan stream.unshift()
a menudo deberían considerar cambiar al uso de un flujo Transform
en su lugar. Consulte la sección API para implementadores de flujos para obtener más información.
// Extraer un encabezado delimitado por \n\n.
// Usar unshift() si obtenemos demasiado.
// Llamar a la función de devolución de llamada con (error, encabezado, flujo).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// Se encontró el límite del encabezado.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// Eliminar el escucha 'readable' antes de desviar.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// Ahora se puede leer el cuerpo del mensaje desde el flujo.
callback(null, header, stream)
return
}
// Todavía leyendo el encabezado.
header += str
}
}
}
A diferencia de stream.push(chunk)
, stream.unshift(chunk)
no terminará el proceso de lectura restableciendo el estado de lectura interno del flujo. Esto puede causar resultados inesperados si se llama a readable.unshift()
durante una lectura (es decir, desde dentro de una implementación de stream._read()
en un flujo personalizado). Seguir la llamada a readable.unshift()
con un stream.push('')
inmediato restablecerá el estado de lectura de forma adecuada, sin embargo, es mejor simplemente evitar llamar a readable.unshift()
mientras se está realizando una lectura.
readable.wrap(stream)
Agregado en: v0.9.4
Antes de Node.js 0.10, los flujos no implementaban toda la API del módulo node:stream
como está definida actualmente. (Consulte Compatibilidad para obtener más información).
Cuando se utiliza una biblioteca Node.js anterior que emite eventos 'data'
y tiene un método stream.pause()
que es solo de asesoramiento, se puede utilizar el método readable.wrap()
para crear un flujo Readable
que utiliza el flujo anterior como fuente de datos.
Rara vez será necesario usar readable.wrap()
, pero el método se ha proporcionado como una conveniencia para interactuar con aplicaciones y bibliotecas Node.js más antiguas.
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // etc.
})
readable[Symbol.asyncIterator]()
[Historial]
Versión | Cambios |
---|---|
v11.14.0 | El soporte de Symbol.asyncIterator ya no es experimental. |
v10.0.0 | Agregado en: v10.0.0 |
- Devuelve: [<AsyncIterator>](https://tc39
Streams dúplex y de transformación
Clase: stream.Duplex
[Historial]
Versión | Cambios |
---|---|
v6.8.0 | Las instancias de Duplex ahora devuelven true al verificar instanceof stream.Writable . |
v0.9.4 | Agregado en: v0.9.4 |
Los streams dúplex son streams que implementan las interfaces Readable
y Writable
.
Ejemplos de streams Duplex
incluyen:
duplex.allowHalfOpen
Agregado en: v0.9.4
Si es false
, el stream finalizará automáticamente el lado de escritura cuando finalice el lado de lectura. Se establece inicialmente mediante la opción de constructor allowHalfOpen
, que por defecto es true
.
Esto se puede cambiar manualmente para modificar el comportamiento de apertura parcial de una instancia de stream Duplex
existente, pero se debe cambiar antes de que se emita el evento 'end'
.
Clase: stream.Transform
Añadido en: v0.9.4
Las secuencias de transformación son secuencias Duplex
donde la salida está relacionada de alguna manera con la entrada. Al igual que todas las secuencias Duplex
, las secuencias Transform
implementan tanto las interfaces Readable
como Writable
.
Algunos ejemplos de secuencias Transform
son:
transform.destroy([error])
[Historial]
Versión | Cambios |
---|---|
v14.0.0 | Funciona como una operación no op en una secuencia que ya ha sido destruida. |
v8.0.0 | Añadido en: v8.0.0 |
Destruye la secuencia y, opcionalmente, emite un evento 'error'
. Después de esta llamada, la secuencia de transformación liberará cualquier recurso interno. Los implementadores no deben anular este método, sino implementar readable._destroy()
. La implementación predeterminada de _destroy()
para Transform
también emite 'close'
a menos que emitClose
se establezca en falso.
Una vez que se ha llamado a destroy()
, cualquier llamada posterior será una operación no op y no se podrán emitir más errores que los de _destroy()
como 'error'
.
stream.duplexPair([opciones])
Agregado en: v22.6.0, v20.17.0
opciones
<Objeto> Un valor para pasar a ambos constructoresDuplex
, para establecer opciones como el almacenamiento en búfer.- Devuelve: <Array> de dos instancias
Duplex
.
La función de utilidad duplexPair
devuelve un Array con dos elementos, cada uno siendo un flujo Duplex
conectado al otro lado:
const [ladoA, ladoB] = duplexPair()
Lo que sea que se escriba en un flujo se hace legible en el otro. Proporciona un comportamiento análogo a una conexión de red, donde los datos escritos por el cliente se vuelven legibles por el servidor, y viceversa.
Los flujos Duplex son simétricos; uno u otro puede ser usado sin ninguna diferencia en el comportamiento.
stream.finished(stream[, opciones], callback)
[Historial]
Versión | Cambios |
---|---|
v19.5.0 | Se agregó soporte para ReadableStream y WritableStream . |
v15.11.0 | Se agregó la opción signal . |
v14.0.0 | El finished(stream, cb) esperará el evento 'close' antes de invocar la devolución de llamada. La implementación intenta detectar flujos heredados y solo aplica este comportamiento a flujos que se espera que emitan 'close' . |
v14.0.0 | Emitir 'close' antes de 'end' en un flujo Readable causará un error ERR_STREAM_PREMATURE_CLOSE . |
v14.0.0 | La devolución de llamada se invocará en flujos que ya hayan terminado antes de la llamada a finished(stream, cb) . |
v10.0.0 | Agregado en: v10.0.0 |
stream
<Flujo> | <ReadableStream> | <WritableStream> Un flujo/webstream legible y/o escribible.opciones
<Objeto>error
<booleano> Si se establece enfalse
, entonces una llamada aemit('error', err)
no se trata como terminada. Predeterminado:true
.readable
<booleano> Cuando se establece enfalse
, la devolución de llamada se llamará cuando el flujo termine, aunque el flujo aún pueda ser legible. Predeterminado:true
.writable
<booleano> Cuando se establece enfalse
, la devolución de llamada se llamará cuando el flujo termine, aunque el flujo aún pueda ser escribible. Predeterminado:true
.signal
<AbortSignal> permite abortar la espera para que termine el flujo. El flujo subyacente no se abortará si se aborta la señal. La devolución de llamada se llamará con unAbortError
. Todos los listeners registrados agregados por esta función también se eliminarán.
callback
<Función> Una función de devolución de llamada que toma un argumento de error opcional.Devuelve: <Función> Una función de limpieza que elimina todos los listeners registrados.
Una función para recibir una notificación cuando un flujo ya no es legible, escribible o ha experimentado un error o un evento de cierre prematuro.
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('El flujo falló.', err)
} else {
console.log('El flujo ha terminado de leer.')
}
})
rs.resume() // Vaciar el flujo.
Especialmente útil en escenarios de manejo de errores donde un flujo se destruye prematuramente (como una solicitud HTTP abortada), y no emitirá 'end'
o 'finish'
.
La API finished
proporciona versión de promesa.
stream.finished()
deja listeners de eventos colgantes (en particular 'error'
, 'end'
, 'finish'
y 'close'
) después de que se ha invocado callback
. La razón de esto es para que los eventos 'error'
inesperados (debido a implementaciones de flujo incorrectas) no causen fallas inesperadas. Si este es un comportamiento no deseado, entonces la función de limpieza devuelta debe invocarse en la devolución de llamada:
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[Historial]
Versión | Cambios |
---|---|
v19.7.0, v18.16.0 | Se agregó soporte para webstreams. |
v18.0.0 | Pasar una callback no válida al argumento callback ahora lanza ERR_INVALID_ARG_TYPE en lugar de ERR_INVALID_CALLBACK . |
v14.0.0 | El pipeline(..., cb) esperará el evento 'close' antes de invocar la callback. La implementación intenta detectar flujos heredados y solo aplica este comportamiento a flujos que se espera que emitan 'close' . |
v13.10.0 | Se añadió soporte para generadores asíncronos. |
v10.0.0 | Añadido en: v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- Devuelve: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- Devuelve: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- Devuelve: <AsyncIterable> | <Promise>
callback
<Function> Se llama cuando la pipeline ha terminado por completo.err
<Error>val
Valor resuelto dePromise
devuelto pordestination
.
Devuelve: <Stream>
Un método de módulo para conectar entre flujos y generadores reenviando errores y limpiando correctamente y proporcionando una callback cuando la pipeline está completa.
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Usa la API de pipeline para conectar fácilmente una serie de flujos
// juntos y ser notificado cuando la pipeline esté completamente lista.
// Una pipeline para comprimir con gzip un archivo tar potencialmente enorme de manera eficiente:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('La pipeline falló.', err)
} else {
console.log('La pipeline tuvo éxito.')
}
})
La API de pipeline
proporciona una versión de promesa.
stream.pipeline()
llamará a stream.destroy(err)
en todos los flujos excepto:
- Flujos
Readable
que han emitido'end'
o'close'
. - Flujos
Writable
que han emitido'finish'
o'close'
.
stream.pipeline()
deja escuchadores de eventos colgando en los flujos después de que se ha invocado la callback
. En el caso de la reutilización de flujos después de un fallo, esto puede causar fugas de escuchadores de eventos y errores tragados. Si el último flujo es legible, se eliminarán los escuchadores de eventos colgantes para que el último flujo pueda consumirse más tarde.
stream.pipeline()
cierra todos los flujos cuando se produce un error. El uso de IncomingRequest
con pipeline
podría conducir a un comportamiento inesperado una vez que destruiría el socket sin enviar la respuesta esperada. Consulta el ejemplo a continuación:
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // No existe tal archivo
// este mensaje no se puede enviar una vez que `pipeline` ya ha destruido el socket
return res.end('¡¡¡error!!!')
}
})
})
stream.compose(...streams)
[Historial]
Versión | Cambios |
---|---|
v21.1.0, v20.10.0 | Se agregó soporte para la clase stream. |
v19.8.0, v18.16.0 | Se agregó soporte para webstreams. |
v16.9.0 | Agregado en: v16.9.0 |
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - stream.compose
es experimental.
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- Devuelve: <stream.Duplex>
Combina dos o más streams en un stream Duplex
que escribe en el primer stream y lee desde el último. Cada stream proporcionado se canaliza al siguiente, utilizando stream.pipeline
. Si alguno de los streams genera un error, todos se destruyen, incluido el stream Duplex
externo.
Debido a que stream.compose
devuelve un nuevo stream que a su vez puede (y debe) canalizarse en otros streams, permite la composición. En contraste, al pasar streams a stream.pipeline
, normalmente el primer stream es un stream legible y el último un stream de escritura, formando un circuito cerrado.
Si se pasa una Function
, debe ser un método de fábrica que tome un Iterable
de source
.
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // imprime 'HELLOWORLD'
stream.compose
se puede utilizar para convertir iterables asíncronos, generadores y funciones en streams.
AsyncIterable
se convierte en unDuplex
legible. No puede producirnull
.AsyncGeneratorFunction
se convierte en unDuplex
de transformación legible/escribible. Debe tomar unAsyncIterable
desource
como primer parámetro. No puede producirnull
.AsyncFunction
se convierte en unDuplex
de escritura. Debe devolvernull
oundefined
.
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// Convierte AsyncIterable en un Duplex legible.
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// Convierte AsyncGenerator en un Duplex de transformación.
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// Convierte AsyncFunction en un Duplex de escritura.
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // imprime 'HELLOWORLD'
Consulta readable.compose(stream)
para ver stream.compose
como operador.
stream.Readable.from(iterable[, options])
Agregado en: v12.3.0, v10.17.0
iterable
<Iterable> Objeto que implementa el protocolo iterableSymbol.asyncIterator
oSymbol.iterator
. Emite un evento 'error' si se pasa un valor nulo.options
<Object> Opciones proporcionadas anew stream.Readable([options])
. Por defecto,Readable.from()
estableceráoptions.objectMode
atrue
, a menos que se opte explícitamente por no hacerlo estableciendooptions.objectMode
afalse
.- Devuelve: <stream.Readable>
Un método de utilidad para crear streams legibles a partir de iteradores.
const { Readable } = require('node:stream')
async function* generate() {
yield 'hola'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
Llamar a Readable.from(string)
o Readable.from(buffer)
no hará que las cadenas o buffers se iteren para que coincidan con la semántica de otros streams por razones de rendimiento.
Si un objeto Iterable
que contiene promesas se pasa como argumento, podría resultar en un rechazo no manejado.
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rechazo no manejado
])
stream.Readable.fromWeb(readableStream[, options])
Agregado en: v17.0.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Devuelve: <stream.Readable>
stream.Readable.isDisturbed(stream)
Agregado en: v16.8.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
stream
<stream.Readable> | <ReadableStream>- Devuelve:
boolean
Devuelve si el stream ha sido leído o cancelado.
stream.isErrored(stream)
Agregado en: v17.3.0, v16.14.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- Devuelve: <boolean>
Devuelve si el flujo ha encontrado un error.
stream.isReadable(stream)
Agregado en: v17.4.0, v16.14.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
stream
<Readable> | <Duplex> | <ReadableStream>- Devuelve: <boolean>
Devuelve si el flujo es legible.
stream.Readable.toWeb(streamReadable[, options])
Agregado en: v17.0.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> El tamaño máximo de la cola interna (delReadableStream
creado) antes de que se aplique la contrapresión al leer delstream.Readable
dado. Si no se proporciona ningún valor, se tomará delstream.Readable
dado.size
<Function> Una función que indica el tamaño del fragmento de datos dado. Si no se proporciona ningún valor, el tamaño será1
para todos los fragmentos.chunk
<any>- Returns: <number>
Returns: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
Agregado en: v17.0.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
writableStream
<WritableStream>options
<Objeto>decodeStrings
<booleano>highWaterMark
<número>objectMode
<booleano>signal
<AbortSignal>
Devuelve: <stream.Writable>
stream.Writable.toWeb(streamWritable)
Agregado en: v17.0.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
streamWritable
<stream.Writable>- Devuelve: <WritableStream>
stream.Duplex.from(src)
[Historial]
Versión | Cambios |
---|---|
v19.5.0, v18.17.0 | El argumento src ahora puede ser un ReadableStream o un WritableStream . |
v16.8.0 | Añadido en: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
Un método de utilidad para crear streams dúplex.
Stream
convierte un stream de escritura en unDuplex
de escritura y un stream de lectura en unDuplex
.Blob
se convierte en unDuplex
de lectura.string
se convierte en unDuplex
de lectura.ArrayBuffer
se convierte en unDuplex
de lectura.AsyncIterable
se convierte en unDuplex
de lectura. No puede producirnull
.AsyncGeneratorFunction
se convierte en unDuplex
de transformación de lectura/escritura. Debe tomar unAsyncIterable
de origen como primer parámetro. No puede producirnull
.AsyncFunction
se convierte en unDuplex
de escritura. Debe devolvernull
oundefined
Object ({ writable, readable })
conviertereadable
ywritable
enStream
y luego los combina enDuplex
donde elDuplex
escribirá enwritable
y leerá desdereadable
.Promise
se convierte en unDuplex
de lectura. El valornull
se ignora.ReadableStream
se convierte en unDuplex
de lectura.WritableStream
se convierte en unDuplex
de escritura.- Devuelve: <stream.Duplex>
Si se pasa un objeto Iterable
que contiene promesas como argumento, podría resultar en un rechazo no manejado.
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rechazo no manejado
])
stream.Duplex.fromWeb(pair[, options])
Añadido en: v17.0.0
[Stable: 1 - Experimental]
Stable: 1 Estabilidad: 1 - Experimental
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>Devuelve: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
Añadido en: v17.0.0
[Estable: 1 - Experimental]
Estable: 1 Estabilidad: 1 - Experimental
streamDuplex
<stream.Duplex>- Devuelve: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[Historial]
Versión | Cambios |
---|---|
v19.7.0, v18.16.0 | Se añadió soporte para ReadableStream y WritableStream . |
v15.4.0 | Añadido en: v15.4.0 |
signal
<AbortSignal> Una señal que representa una posible cancelaciónstream
<Stream> | <ReadableStream> | <WritableStream> Un flujo al que adjuntar una señal.
Adjunta una AbortSignal a un flujo de lectura o escritura. Esto permite que el código controle la destrucción del flujo utilizando un AbortController
.
Llamar a abort
en el AbortController
correspondiente a la AbortSignal
pasada se comportará de la misma manera que llamar a .destroy(new AbortError())
en el flujo, y controller.error(new AbortError())
para los flujos web.
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Más tarde, aborta la operación cerrando el flujo
controller.abort()
O usando una AbortSignal
con un flujo legible como un iterable asíncrono:
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // establecer un tiempo de espera
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// La operación fue cancelada
} else {
throw e
}
}
})()
O usando un AbortSignal
con un ReadableStream:
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hola')
controller.enqueue('mundo')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// La operación fue cancelada
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hola
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
Agregado en: v19.9.0, v18.17.0
Devuelve el highWaterMark predeterminado utilizado por los streams. El valor predeterminado es 65536
(64 KiB) o 16
para objectMode
.
stream.setDefaultHighWaterMark(objectMode, value)
Agregado en: v19.9.0, v18.17.0
Establece el highWaterMark predeterminado utilizado por los streams.
API para implementadores de streams
La API del módulo node:stream
se ha diseñado para que sea posible implementar fácilmente streams utilizando el modelo de herencia prototípica de JavaScript.
Primero, un desarrollador de streams declararía una nueva clase de JavaScript que extiende una de las cuatro clases de streams básicas (stream.Writable
, stream.Readable
, stream.Duplex
o stream.Transform
), asegurándose de llamar al constructor de la clase padre apropiado:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
Al extender streams, tenga en cuenta qué opciones puede y debe proporcionar el usuario antes de reenviarlas al constructor base. Por ejemplo, si la implementación hace suposiciones con respecto a las opciones autoDestroy
y emitClose
, no permita que el usuario las anule. Sea explícito sobre qué opciones se reenvían en lugar de reenviar implícitamente todas las opciones.
La nueva clase de stream debe implementar uno o más métodos específicos, según el tipo de stream que se esté creando, como se detalla en el siguiente cuadro:
Caso de uso | Clase | Método(s) a implementar |
---|---|---|
Solo lectura | Readable | _read() |
Solo escritura | Writable | _write() , _writev() , _final() |
Lectura y escritura | Duplex | _read() , _write() , _writev() , _final() |
Operar con datos escritos, luego leer el resultado | Transform | _transform() , _flush() , _final() |
El código de implementación de un stream nunca debe llamar a los métodos "públicos" de un stream que están destinados a ser utilizados por los consumidores (como se describe en la sección API para consumidores de streams). Hacerlo puede provocar efectos secundarios adversos en el código de la aplicación que consume el stream.
Evite anular los métodos públicos como write()
, end()
, cork()
, uncork()
, read()
y destroy()
, o emitir eventos internos como 'error'
, 'data'
, 'end'
, 'finish'
y 'close'
a través de .emit()
. Hacerlo puede romper los invariantes de los streams actuales y futuros, lo que puede generar problemas de comportamiento y/o compatibilidad con otros streams, utilidades de streams y expectativas de los usuarios.
Construcción simplificada
Añadido en: v1.2.0
Para muchos casos sencillos, es posible crear un flujo sin depender de la herencia. Esto se puede lograr creando directamente instancias de los objetos stream.Writable
, stream.Readable
, stream.Duplex
o stream.Transform
y pasando los métodos apropiados como opciones del constructor.
const { Writable } = require('node:stream')
const miWritable = new Writable({
construct(callback) {
// Inicializar el estado y cargar recursos...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Liberar recursos...
},
})
Implementación de un flujo de escritura
La clase stream.Writable
se extiende para implementar un flujo Writable
.
Los flujos Writable
personalizados deben llamar al constructor new stream.Writable([options])
e implementar el método writable._write()
y/o writable._writev()
.
new stream.Writable([options])
[Historial]
Versión | Cambios |
---|---|
v22.0.0 | Aumento de highWaterMark predeterminado. |
v15.5.0 | Soporte para pasar un AbortSignal. |
v14.0.0 | Cambio de la opción autoDestroy a true como valor predeterminado. |
v11.2.0, v10.16.0 | Añadida la opción autoDestroy para destroy() automáticamente el flujo cuando emite 'finish' o errores. |
v10.0.0 | Añadida la opción emitClose para especificar si se emite 'close' al destruirse. |
options
<Object>highWaterMark
<number> Nivel de búfer cuandostream.write()
comienza a devolverfalse
. Predeterminado:65536
(64 KiB), o16
para flujosobjectMode
.decodeStrings
<boolean> Si se codifican lasstring
s pasadas astream.write()
enBuffer
s (con la codificación especificada en la llamada astream.write()
) antes de pasarlas astream._write()
. Otros tipos de datos no se convierten (es decir, losBuffer
s no se decodifican enstring
s). Si se establece en falso, se evitará que lasstring
s se conviertan. Predeterminado:true
.defaultEncoding
<string> La codificación predeterminada que se utiliza cuando no se especifica ninguna codificación como argumento parastream.write()
. Predeterminado:'utf8'
.objectMode
<boolean> Si la operaciónstream.write(anyObj)
es válida o no. Cuando se establece, se hace posible escribir valores de JavaScript que no sean string, <Buffer>, <TypedArray> o <DataView> si la implementación del flujo lo admite. Predeterminado:false
.emitClose
<boolean> Si el flujo debe o no emitir'close'
después de haber sido destruido. Predeterminado:true
.write
<Function> Implementación para el métodostream._write()
.writev
<Function> Implementación para el métodostream._writev()
.destroy
<Function> Implementación para el métodostream._destroy()
.final
<Function> Implementación para el métodostream._final()
.construct
<Function> Implementación para el métodostream._construct()
.autoDestroy
<boolean> Si este flujo debe llamar automáticamente a.destroy()
sobre sí mismo después de terminar. Predeterminado:true
.signal
<AbortSignal> Una señal que representa una posible cancelación.
const { Writable } = require('node:stream')
class MiWritable extends Writable {
constructor(options) {
// Llama al constructor stream.Writable().
super(options)
// ...
}
}
O bien, al usar constructores de estilo pre-ES6:
const { Writable } = require('node:stream')
const util = require('node:util')
function MiWritable(options) {
if (!(this instanceof MiWritable)) return new MiWritable(options)
Writable.call(this, options)
}
util.inherits(MiWritable, Writable)
O bien, utilizando el enfoque del constructor simplificado:
const { Writable } = require('node:stream')
const miWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
Llamar a abort
en el AbortController
correspondiente al AbortSignal
pasado se comportará de la misma manera que llamar a .destroy(new AbortError())
en el flujo de escritura.
const { Writable } = require('node:stream')
const controller = new AbortController()
const miWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// Más tarde, aborta la operación cerrando el flujo
controller.abort()
writable._construct(callback)
Agregado en: v15.0.0
callback
<Function> Llama a esta función (opcionalmente con un argumento de error) cuando la secuencia haya terminado de inicializarse.
El método _construct()
NO DEBE llamarse directamente. Puede ser implementado por clases hijas, y si es así, será llamado solo por los métodos internos de la clase Writable
.
Esta función opcional se llamará en un tick después de que el constructor de la secuencia haya regresado, retrasando cualquier llamada a _write()
, _final()
y _destroy()
hasta que se llame a callback
. Esto es útil para inicializar el estado o inicializar asíncronamente los recursos antes de que se pueda usar la secuencia.
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[Historial]
Versión | Cambios |
---|---|
v12.11.0 | _write() es opcional cuando se proporciona _writev(). |
chunk
<Buffer> | <string> | <any> ElBuffer
que se va a escribir, convertido desde lastring
pasada astream.write()
. Si la opcióndecodeStrings
del stream esfalse
o el stream está operando en modo objeto, el chunk no se convertirá y será lo que se pasó astream.write()
.encoding
<string> Si el chunk es una string, entoncesencoding
es la codificación de caracteres de esa string. Si el chunk es unBuffer
, o si el stream está operando en modo objeto,encoding
puede ser ignorado.callback
<Function> Llama a esta función (opcionalmente con un argumento de error) cuando el procesamiento esté completo para el chunk suministrado.
Todas las implementaciones de stream Writable
deben proporcionar un método writable._write()
y/o writable._writev()
para enviar datos al recurso subyacente.
Los streams Transform
proporcionan su propia implementación de writable._write()
.
Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por las clases hijas, y ser llamada solo por los métodos internos de la clase Writable
.
La función callback
debe ser llamada de forma síncrona dentro de writable._write()
o asíncrona (es decir, en un tick diferente) para indicar si la escritura se completó con éxito o falló con un error. El primer argumento pasado a la callback
debe ser el objeto Error
si la llamada falló o null
si la escritura tuvo éxito.
Todas las llamadas a writable.write()
que ocurran entre el momento en que se llama a writable._write()
y se llama a la callback
harán que los datos escritos se almacenen en búfer. Cuando se invoca la callback
, el stream podría emitir un evento 'drain'
. Si una implementación de stream es capaz de procesar múltiples chunks de datos a la vez, se debe implementar el método writable._writev()
.
Si la propiedad decodeStrings
se establece explícitamente en false
en las opciones del constructor, entonces chunk
seguirá siendo el mismo objeto que se pasa a .write()
, y puede ser una string en lugar de un Buffer
. Esto es para admitir implementaciones que tienen un manejo optimizado para ciertas codificaciones de datos de string. En ese caso, el argumento encoding
indicará la codificación de caracteres de la string. De lo contrario, el argumento encoding
puede ignorarse de forma segura.
El método writable._write()
tiene un prefijo con un guión bajo porque es interno a la clase que lo define, y nunca debe ser llamado directamente por los programas de usuario.
writable._writev(chunks, callback)
chunks
<Object[]> Los datos a escribir. El valor es un array de <Object> que representan cada uno un fragmento discreto de datos a escribir. Las propiedades de estos objetos son:chunk
<Buffer> | <string> Una instancia de buffer o una cadena que contiene los datos a escribir. Elchunk
será una cadena si elWritable
se creó con la opcióndecodeStrings
establecida enfalse
y se pasó una cadena awrite()
.encoding
<string> La codificación de caracteres delchunk
. Sichunk
es unBuffer
, elencoding
será'buffer'
.
callback
<Function> Una función de devolución de llamada (opcionalmente con un argumento de error) que se invocará cuando se complete el procesamiento de los fragmentos suministrados.
Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por las clases hijas, y ser llamada solo por los métodos internos de la clase Writable
.
El método writable._writev()
puede ser implementado adicional o alternativamente a writable._write()
en implementaciones de flujo que sean capaces de procesar múltiples fragmentos de datos a la vez. Si se implementa y si hay datos almacenados en búfer de escrituras anteriores, se llamará a _writev()
en lugar de _write()
.
El método writable._writev()
tiene un prefijo con un guion bajo porque es interno a la clase que lo define, y nunca debe ser llamado directamente por los programas de usuario.
writable._destroy(err, callback)
Agregado en: v8.0.0
err
<Error> Un posible error.callback
<Function> Una función de callback que toma un argumento de error opcional.
El método _destroy()
es llamado por writable.destroy()
. Puede ser sobrescrito por clases hijas pero no debe ser llamado directamente.
writable._final(callback)
Agregado en: v8.0.0
callback
<Function> Llama a esta función (opcionalmente con un argumento de error) cuando termine de escribir los datos restantes.
El método _final()
no debe ser llamado directamente. Puede ser implementado por clases hijas, y si es así, será llamado solo por los métodos internos de la clase Writable
.
Esta función opcional será llamada antes de que se cierre el stream, retrasando el evento 'finish'
hasta que se llame a callback
. Esto es útil para cerrar recursos o escribir datos en búfer antes de que termine un stream.
Errores al escribir
Los errores que ocurran durante el procesamiento de los métodos writable._write()
, writable._writev()
y writable._final()
deben propagarse invocando la función de callback y pasando el error como primer argumento. Lanzar un Error
desde dentro de estos métodos o emitir manualmente un evento 'error'
resulta en un comportamiento indefinido.
Si un flujo Readable
se canaliza a un flujo Writable
cuando Writable
emite un error, el flujo Readable
se desconectará.
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
Un ejemplo de flujo writable
Lo siguiente ilustra una implementación de flujo Writable
personalizado bastante simplista (y algo inútil). Si bien esta instancia específica de flujo Writable
no tiene ninguna utilidad real en particular, el ejemplo ilustra cada uno de los elementos requeridos de una instancia de flujo Writable
personalizado:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
Decodificación de buffers en un flujo de escritura
La decodificación de buffers es una tarea común, por ejemplo, al usar transformadores cuya entrada es una cadena. Este no es un proceso trivial cuando se usa la codificación de caracteres multibyte, como UTF-8. El siguiente ejemplo muestra cómo decodificar cadenas multibyte usando StringDecoder
y Writable
.
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
Implementación de un flujo legible
La clase stream.Readable
se extiende para implementar un flujo Readable
.
Los flujos Readable
personalizados deben llamar al constructor new stream.Readable([options])
e implementar el método readable._read()
.
new stream.Readable([options])
[Historial]
Versión | Cambios |
---|---|
v22.0.0 | Aumento del valor predeterminado de highWaterMark. |
v15.5.0 | Soporte para pasar un AbortSignal. |
v14.0.0 | Cambiar el valor predeterminado de la opción autoDestroy a true . |
v11.2.0, v10.16.0 | Agregar la opción autoDestroy para destroy() automáticamente el flujo cuando emite 'end' o errores. |
options
<Object>highWaterMark
<number> El número de bytes máximo para almacenar en el búfer interno antes de dejar de leer del recurso subyacente. Predeterminado:65536
(64 KiB), o16
para flujosobjectMode
.encoding
<string> Si se especifica, los búferes se decodificarán a cadenas utilizando la codificación especificada. Predeterminado:null
.objectMode
<boolean> Indica si este flujo debe comportarse como un flujo de objetos. Esto significa questream.read(n)
devuelve un único valor en lugar de unBuffer
de tamañon
. Predeterminado:false
.emitClose
<boolean> Indica si el flujo debe emitir'close'
después de haber sido destruido. Predeterminado:true
.read
<Function> Implementación para el métodostream._read()
.destroy
<Function> Implementación para el métodostream._destroy()
.construct
<Function> Implementación para el métodostream._construct()
.autoDestroy
<boolean> Indica si este flujo debe llamar automáticamente a.destroy()
sobre sí mismo después de terminar. Predeterminado:true
.signal
<AbortSignal> Una señal que representa una posible cancelación.
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// Llama al constructor stream.Readable(options).
super(options)
// ...
}
}
O, al usar constructores de estilo pre-ES6:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
O, usando el enfoque de constructor simplificado:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
Llamar a abort
en el AbortController
correspondiente a la AbortSignal
pasada se comportará de la misma manera que llamar a .destroy(new AbortError())
en el readable creado.
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// Más tarde, aborta la operación cerrando el flujo
controller.abort()
readable._construct(callback)
Agregado en: v15.0.0
callback
<Function> Llama a esta función (opcionalmente con un argumento de error) cuando el stream ha terminado de inicializarse.
El método _construct()
NO DEBE ser llamado directamente. Puede ser implementado por clases hijas, y si es así, será llamado solo por los métodos internos de la clase Readable
.
Esta función opcional será programada en el siguiente tick por el constructor del stream, retrasando cualquier llamada a _read()
y _destroy()
hasta que se llame a callback
. Esto es útil para inicializar el estado o inicializar asíncronamente los recursos antes de que el stream pueda ser usado.
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
Añadido en: v0.9.4
size
<number> Número de bytes a leer de forma asíncrona
Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por las clases hijas y llamada únicamente por los métodos internos de la clase Readable
.
Todas las implementaciones de flujos Readable
deben proporcionar una implementación del método readable._read()
para obtener datos del recurso subyacente.
Cuando se llama a readable._read()
, si hay datos disponibles del recurso, la implementación debe comenzar a insertar esos datos en la cola de lectura utilizando el método this.push(dataChunk)
. Se volverá a llamar a _read()
después de cada llamada a this.push(dataChunk)
una vez que el flujo esté listo para aceptar más datos. _read()
puede continuar leyendo del recurso e insertando datos hasta que readable.push()
devuelva false
. Solo cuando se vuelva a llamar a _read()
después de que se haya detenido, debe reanudar la inserción de datos adicionales en la cola.
Una vez que se ha llamado al método readable._read()
, no se volverá a llamar hasta que se inserten más datos a través del método readable.push()
. Los datos vacíos, como búferes y cadenas vacías, no harán que se llame a readable._read()
.
El argumento size
es orientativo. Para las implementaciones en las que una "lectura" es una única operación que devuelve datos, se puede utilizar el argumento size
para determinar cuántos datos obtener. Otras implementaciones pueden ignorar este argumento y simplemente proporcionar datos cuando estén disponibles. No es necesario "esperar" hasta que haya size
bytes disponibles antes de llamar a stream.push(chunk)
.
El método readable._read()
tiene el prefijo con un guión bajo porque es interno a la clase que lo define y nunca debe ser llamado directamente por los programas de usuario.
readable._destroy(err, callback)
Agregado en: v8.0.0
err
<Error> Un posible error.callback
<Function> Una función de callback que toma un argumento de error opcional.
El método _destroy()
es llamado por readable.destroy()
. Puede ser sobrescrito por clases hijas pero no debe ser llamado directamente.
readable.push(chunk[, encoding])
[Historial]
Versión | Cambios |
---|---|
v22.0.0, v20.13.0 | El argumento chunk ahora puede ser una instancia de TypedArray o DataView . |
v8.0.0 | El argumento chunk ahora puede ser una instancia de Uint8Array . |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Fragmento de datos para enviar a la cola de lectura. Para flujos que no operan en modo de objeto,chunk
debe ser un <string>, <Buffer>, <TypedArray> o <DataView>. Para flujos en modo de objeto,chunk
puede ser cualquier valor de JavaScript.encoding
<string> Codificación de fragmentos de cadena. Debe ser una codificaciónBuffer
válida, como'utf8'
o'ascii'
.- Devuelve: <boolean>
true
si se pueden seguir enviando fragmentos de datos adicionales;false
de lo contrario.
Cuando chunk
es un <Buffer>, <TypedArray>, <DataView> o <string>, el chunk
de datos se agregará a la cola interna para que los usuarios del flujo la consuman. Pasar chunk
como null
señala el final del flujo (EOF), después de lo cual no se pueden escribir más datos.
Cuando el Readable
está operando en modo pausado, los datos agregados con readable.push()
se pueden leer llamando al método readable.read()
cuando se emite el evento 'readable'
.
Cuando el Readable
está operando en modo de flujo, los datos agregados con readable.push()
se entregarán emitiendo un evento 'data'
.
El método readable.push()
está diseñado para ser lo más flexible posible. Por ejemplo, al envolver una fuente de nivel inferior que proporciona alguna forma de mecanismo de pausa/reanudación, y una devolución de llamada de datos, la fuente de nivel inferior se puede envolver mediante la instancia personalizada de Readable
:
// `_source` es un objeto con métodos readStop() y readStart(),
// y un miembro `ondata` que se llama cuando tiene datos, y
// un miembro `onend` que se llama cuando los datos terminan.
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// Cada vez que hay datos, envíalos al búfer interno.
this._source.ondata = chunk => {
// Si push() devuelve false, entonces deja de leer de la fuente.
if (!this.push(chunk)) this._source.readStop()
}
// Cuando termina la fuente, envía el fragmento `null` que señala el EOF.
this._source.onend = () => {
this.push(null)
}
}
// _read() se llamará cuando el flujo quiera extraer más datos.
// El argumento de tamaño de asesoramiento se ignora en este caso.
_read(size) {
this._source.readStart()
}
}
El método readable.push()
se utiliza para enviar el contenido al búfer interno. Puede ser impulsado por el método readable._read()
.
Para flujos que no operan en modo de objeto, si el parámetro chunk
de readable.push()
es undefined
, se tratará como una cadena o búfer vacío. Consulta readable.push('')
para obtener más información.
Errores durante la lectura
Los errores que ocurran durante el procesamiento de readable._read()
deben propagarse a través del método readable.destroy(err)
. Lanzar un Error
desde dentro de readable._read()
o emitir manualmente un evento 'error'
resulta en un comportamiento indefinido.
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// Do some work.
}
},
})
Un ejemplo de stream de conteo
El siguiente es un ejemplo básico de un stream Readable
que emite los numerales del 1 al 1.000.000 en orden ascendente y luego finaliza.
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
Implementación de un flujo dúplex
Un flujo Duplex
es aquel que implementa tanto Readable
como Writable
, como una conexión de socket TCP.
Dado que JavaScript no tiene soporte para la herencia múltiple, la clase stream.Duplex
se extiende para implementar un flujo Duplex
(en lugar de extender las clases stream.Readable
y stream.Writable
).
La clase stream.Duplex
hereda prototípicamente de stream.Readable
y parasíticamente de stream.Writable
, pero instanceof
funcionará correctamente para ambas clases base debido a la anulación de Symbol.hasInstance
en stream.Writable
.
Los flujos Duplex
personalizados deben llamar al constructor new stream.Duplex([options])
e implementar tanto los métodos readable._read()
como writable._write()
.
new stream.Duplex(options)
[Historial]
Versión | Cambios |
---|---|
v8.4.0 | Ahora se admiten las opciones readableHighWaterMark y writableHighWaterMark . |
options
<Objeto> Se pasa a los constructoresWritable
yReadable
. También tiene los siguientes campos:allowHalfOpen
<booleano> Si se establece enfalse
, la secuencia finalizará automáticamente el lado escribible cuando finalice el lado legible. Predeterminado:true
.readable
<booleano> Establece si elDuplex
debe ser legible. Predeterminado:true
.writable
<booleano> Establece si elDuplex
debe ser escribible. Predeterminado:true
.readableObjectMode
<booleano> EstableceobjectMode
para el lado legible de la secuencia. No tiene efecto siobjectMode
estrue
. Predeterminado:false
.writableObjectMode
<booleano> EstableceobjectMode
para el lado escribible de la secuencia. No tiene efecto siobjectMode
estrue
. Predeterminado:false
.readableHighWaterMark
<número> EstablecehighWaterMark
para el lado legible de la secuencia. No tiene efecto si se proporcionahighWaterMark
.writableHighWaterMark
<número> EstablecehighWaterMark
para el lado escribible de la secuencia. No tiene efecto si se proporcionahighWaterMark
.
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
O, al usar constructores de estilo pre-ES6:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
O, usando el enfoque de constructor simplificado:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
Al usar pipeline:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // Aceptar entrada de cadena en lugar de Buffers
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// Asegurarse de que sea un json válido.
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('failed', err)
} else {
console.log('completed')
}
}
)
Un ejemplo de flujo dúplex
Lo siguiente ilustra un ejemplo simple de un flujo Duplex
que envuelve un objeto fuente hipotético de nivel inferior al que se pueden escribir datos y del que se pueden leer datos, aunque utilizando una API que no es compatible con los flujos de Node.js. Lo siguiente ilustra un ejemplo simple de un flujo Duplex
que almacena en búfer los datos escritos entrantes a través de la interfaz Writable
que se lee de nuevo a través de la interfaz Readable
.
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// La fuente subyacente solo trata con cadenas.
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
El aspecto más importante de un flujo Duplex
es que los lados Readable
y Writable
operan de forma independiente entre sí a pesar de coexistir dentro de una sola instancia de objeto.
Streams dúplex en modo objeto
Para los streams Duplex
, objectMode
se puede establecer exclusivamente para el lado Readable
o Writable
utilizando las opciones readableObjectMode
y writableObjectMode
respectivamente.
En el siguiente ejemplo, por ejemplo, se crea un nuevo stream Transform
(que es un tipo de stream Duplex
) que tiene un lado Writable
en modo objeto que acepta números de JavaScript que se convierten en cadenas hexadecimales en el lado Readable
.
const { Transform } = require('node:stream')
// Todos los streams Transform también son streams Duplex.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coacciona el chunk a un número si es necesario.
chunk |= 0
// Transforma el chunk en otra cosa.
const data = chunk.toString(16)
// Empuja los datos a la cola readable.
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// Imprime: 01
myTransform.write(10)
// Imprime: 0a
myTransform.write(100)
// Imprime: 64
Implementación de un flujo de transformación
Un flujo Transform
es un flujo Duplex
donde la salida se calcula de alguna manera a partir de la entrada. Algunos ejemplos incluyen los flujos zlib o los flujos crypto que comprimen, encriptan o desencriptan datos.
No se requiere que la salida tenga el mismo tamaño que la entrada, la misma cantidad de fragmentos o que llegue al mismo tiempo. Por ejemplo, un flujo Hash
solo tendrá un único fragmento de salida que se proporciona cuando se termina la entrada. Un flujo zlib
producirá una salida que es mucho más pequeña o mucho más grande que su entrada.
La clase stream.Transform
se extiende para implementar un flujo Transform
.
La clase stream.Transform
hereda prototípicamente de stream.Duplex
e implementa sus propias versiones de los métodos writable._write()
y readable._read()
. Las implementaciones personalizadas de Transform
deben implementar el método transform._transform()
y también pueden implementar el método transform._flush()
.
Se debe tener cuidado al usar flujos Transform
, ya que los datos escritos en el flujo pueden hacer que el lado Writable
del flujo se pause si la salida en el lado Readable
no se consume.
new stream.Transform([options])
options
<Object> Se pasa a ambos constructoresWritable
yReadable
. También tiene los siguientes campos:transform
<Function> Implementación para el métodostream._transform()
.flush
<Function> Implementación para el métodostream._flush()
.
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
O, cuando se usan constructores de estilo pre-ES6:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
O, utilizando el enfoque de constructor simplificado:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
Evento: 'end'
El evento 'end'
proviene de la clase stream.Readable
. El evento 'end'
se emite después de que todos los datos han sido emitidos, lo cual ocurre después de que se ha llamado a la función de callback en transform._flush()
. En caso de un error, no se debe emitir 'end'
.
Evento: 'finish'
El evento 'finish'
proviene de la clase stream.Writable
. El evento 'finish'
se emite después de que se llama a stream.end()
y todos los fragmentos han sido procesados por stream._transform()
. En caso de un error, no se debe emitir 'finish'
.
transform._flush(callback)
callback
<Function> Una función de callback (opcionalmente con un argumento de error y datos) que se llamará cuando los datos restantes se hayan vaciado.
Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por las clases hijas, y ser llamada únicamente por los métodos internos de la clase Readable
.
En algunos casos, una operación de transformación puede necesitar emitir un bit adicional de datos al final del flujo. Por ejemplo, un flujo de compresión zlib
almacenará una cantidad de estado interno usado para comprimir de manera óptima la salida. Cuando el flujo termina, sin embargo, esos datos adicionales necesitan ser vaciados para que los datos comprimidos estén completos.
Las implementaciones personalizadas de Transform
pueden implementar el método transform._flush()
. Esto será llamado cuando no haya más datos escritos para ser consumidos, pero antes de que el evento 'end'
se emita señalando el final del flujo Readable
.
Dentro de la implementación de transform._flush()
, el método transform.push()
puede ser llamado cero o más veces, según corresponda. La función callback
debe ser llamada cuando la operación de vaciado esté completa.
El método transform._flush()
tiene como prefijo un guion bajo porque es interno a la clase que lo define, y nunca debe ser llamado directamente por programas de usuario.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> ElBuffer
a ser transformado, convertido desde lastring
pasada astream.write()
. Si la opcióndecodeStrings
del stream esfalse
o el stream está operando en modo objeto, el chunk no será convertido y será lo que sea que fue pasado astream.write()
.encoding
<string> Si el chunk es una string, entonces este es el tipo de codificación. Si el chunk es un buffer, entonces este es el valor especial'buffer'
. Ignóralo en ese caso.callback
<Function> Una función de callback (opcionalmente con un argumento de error y datos) para ser llamada después de que elchunk
suministrado haya sido procesado.
Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por clases hijas, y llamada por los métodos internos de la clase Readable
solamente.
Todas las implementaciones de streams Transform
deben proveer un método _transform()
para aceptar entrada y producir salida. La implementación de transform._transform()
maneja los bytes que están siendo escritos, calcula una salida, y luego pasa esa salida a la porción legible utilizando el método transform.push()
.
El método transform.push()
puede ser llamado cero o más veces para generar salida desde un solo chunk de entrada, dependiendo de cuánto debe ser la salida como resultado del chunk.
Es posible que no se genere ninguna salida de ningún chunk dado de datos de entrada.
La función callback
debe ser llamada solo cuando el chunk actual ha sido completamente consumido. El primer argumento pasado al callback
debe ser un objeto Error
si ha ocurrido un error mientras se procesaba la entrada o null
de lo contrario. Si un segundo argumento es pasado al callback
, será reenviado al método transform.push()
, pero solo si el primer argumento es falsy. En otras palabras, lo siguiente es equivalente:
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
El método transform._transform()
tiene el prefijo de un guión bajo porque es interno a la clase que lo define, y nunca debería ser llamado directamente por programas de usuario.
transform._transform()
nunca es llamado en paralelo; los streams implementan un mecanismo de cola, y para recibir el siguiente chunk, callback
debe ser llamado, ya sea sincrónica o asincrónicamente.
Clase: stream.PassThrough
La clase stream.PassThrough
es una implementación trivial de un stream Transform
que simplemente pasa los bytes de entrada a la salida. Su propósito es principalmente para ejemplos y pruebas, pero hay algunos casos de uso donde stream.PassThrough
es útil como bloque de construcción para nuevos tipos de streams.
Notas adicionales
Compatibilidad de Streams con generadores asíncronos e iteradores asíncronos
Con el soporte de generadores e iteradores asíncronos en JavaScript, los generadores asíncronos son efectivamente una construcción de stream de primer nivel a nivel de lenguaje en este momento.
A continuación, se proporcionan algunos casos comunes de interoperabilidad del uso de streams de Node.js con generadores asíncronos e iteradores asíncronos.
Consumo de streams legibles con iteradores asíncronos
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
Los iteradores asíncronos registran un manejador de errores permanente en el stream para prevenir cualquier error post-destrucción no manejado.
Creación de flujos de lectura con generadores asíncronos
Se puede crear un flujo de lectura de Node.js a partir de un generador asíncrono utilizando el método de utilidad Readable.from()
:
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
Canalización a flujos de escritura desde iteradores asíncronos
Al escribir en un flujo de escritura desde un iterador asíncrono, asegúrese de manejar correctamente la contrapresión y los errores. stream.pipeline()
abstrae el manejo de la contrapresión y los errores relacionados con la contrapresión:
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// Patrón de Callback
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'valor devuelto')
}
}).on('close', () => {
ac.abort()
})
// Patrón de Promesa
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'valor devuelto')
})
.catch(err => {
console.error(err)
ac.abort()
})
Compatibilidad con versiones antiguas de Node.js
Antes de Node.js 0.10, la interfaz de flujo Readable
era más simple, pero también menos potente y menos útil.
- En lugar de esperar a las llamadas al método
stream.read()
, los eventos'data'
comenzaban a emitirse de inmediato. Las aplicaciones que necesitaban realizar algún trabajo para decidir cómo manejar los datos debían almacenar los datos leídos en búferes para que no se perdieran. - El método
stream.pause()
era consultivo, en lugar de garantizado. Esto significaba que todavía era necesario estar preparado para recibir eventos'data'
incluso cuando el flujo estaba en estado de pausa.
En Node.js 0.10, se agregó la clase Readable
. Para la compatibilidad con versiones anteriores de los programas Node.js, los flujos Readable
cambian al "modo de flujo" cuando se agrega un controlador de eventos 'data'
, o cuando se llama al método stream.resume()
. El efecto es que, incluso cuando no se usan el nuevo método stream.read()
y el evento 'readable'
, ya no es necesario preocuparse por perder fragmentos 'data'
.
Si bien la mayoría de las aplicaciones seguirán funcionando normalmente, esto introduce un caso límite en las siguientes condiciones:
- No se agrega ningún listener de eventos
'data'
. - Nunca se llama al método
stream.resume()
. - El flujo no está conectado a ningún destino de escritura.
Por ejemplo, considere el siguiente código:
// ¡ADVERTENCIA! ¡ROTO!
net
.createServer(socket => {
// Agregamos un listener 'end', pero nunca consumimos los datos.
socket.on('end', () => {
// Nunca llegará aquí.
socket.end('El mensaje fue recibido pero no fue procesado.\n')
})
})
.listen(1337)
Antes de Node.js 0.10, los datos del mensaje entrante simplemente se descartaban. Sin embargo, en Node.js 0.10 y posteriores, el socket permanece en pausa para siempre.
La solución en esta situación es llamar al método stream.resume()
para iniciar el flujo de datos:
// Solución.
net
.createServer(socket => {
socket.on('end', () => {
socket.end('El mensaje fue recibido pero no fue procesado.\n')
})
// Inicia el flujo de datos, descartándolo.
socket.resume()
})
.listen(1337)
Además de que los nuevos flujos Readable
cambien al modo de flujo, los flujos de estilo pre-0.10 se pueden envolver en una clase Readable
utilizando el método readable.wrap()
.
readable.read(0)
Hay algunos casos en los que es necesario activar una actualización de los mecanismos subyacentes de la secuencia de lectura, sin consumir realmente ningún dato. En tales casos, es posible llamar a readable.read(0)
, que siempre devolverá null
.
Si el búfer de lectura interno está por debajo de highWaterMark
y la secuencia no está leyendo actualmente, entonces llamar a stream.read(0)
activará una llamada de bajo nivel a stream._read()
.
Si bien la mayoría de las aplicaciones casi nunca necesitarán hacer esto, existen situaciones dentro de Node.js donde se hace esto, particularmente en el funcionamiento interno de la clase Readable
stream.
readable.push('')
No se recomienda el uso de readable.push('')
.
Enviar un <string>, <Buffer>, <TypedArray> o <DataView> de cero bytes a una secuencia que no está en modo objeto tiene un efecto secundario interesante. Debido a que es una llamada a readable.push()
, la llamada finalizará el proceso de lectura. Sin embargo, debido a que el argumento es una cadena vacía, no se agrega ningún dato al búfer de lectura, por lo que no hay nada para que un usuario consuma.
Discrepancia en highWaterMark
después de llamar a readable.setEncoding()
El uso de readable.setEncoding()
cambiará el comportamiento de cómo opera highWaterMark
en modo no objeto.
Normalmente, el tamaño del búfer actual se mide contra highWaterMark
en bytes. Sin embargo, después de llamar a setEncoding()
, la función de comparación comenzará a medir el tamaño del búfer en caracteres.
Esto no es un problema en casos comunes con latin1
o ascii
. Pero se recomienda tener en cuenta este comportamiento cuando se trabaja con cadenas que podrían contener caracteres multibyte.