Skip to content

Stream

[Estable: 2 - Estable]

Estable: 2 Estabilidad: 2 - Estable

Código fuente: lib/stream.js

Un stream es una interfaz abstracta para trabajar con datos en streaming en Node.js. El módulo node:stream proporciona una API para implementar la interfaz de stream.

Node.js proporciona muchos objetos stream. Por ejemplo, una solicitud a un servidor HTTP y process.stdout son ambas instancias de stream.

Los streams pueden ser legibles, escribibles o ambos. Todos los streams son instancias de EventEmitter.

Para acceder al módulo node:stream:

js
const stream = require('node:stream')

El módulo node:stream es útil para crear nuevos tipos de instancias de stream. Generalmente, no es necesario utilizar el módulo node:stream para consumir streams.

Organización de este documento

Este documento contiene dos secciones principales y una tercera sección para notas. La primera sección explica cómo utilizar los streams existentes dentro de una aplicación. La segunda sección explica cómo crear nuevos tipos de streams.

Tipos de streams

Hay cuatro tipos de streams fundamentales dentro de Node.js:

Adicionalmente, este módulo incluye las funciones de utilidad stream.duplexPair(), stream.pipeline(), stream.finished() stream.Readable.from(), y stream.addAbortSignal().

API de Promesas de Streams

Añadido en: v15.0.0

La API stream/promises proporciona un conjunto alternativo de funciones de utilidad asíncronas para streams que devuelven objetos Promise en lugar de usar callbacks. Se puede acceder a la API a través de require('node:stream/promises') o require('node:stream').promises.

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[Historial]

VersiónCambios
v18.0.0, v17.2.0, v16.14.0Añadida la opción end, que se puede establecer en false para evitar que el stream de destino se cierre automáticamente cuando finaliza el origen.
v15.0.0Añadido en: v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')

Para usar un AbortSignal, páselo dentro de un objeto de opciones, como último argumento. Cuando se aborta la señal, se llamará a destroy en la pipeline subyacente, con un AbortError.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

La API pipeline también admite generadores asíncronos:

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // Trabaja con cadenas en lugar de `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // Trabaja con cadenas en lugar de `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')

Recuerda manejar el argumento signal pasado al generador asíncrono. Especialmente en el caso de que el generador asíncrono sea el origen de la pipeline (es decir, el primer argumento) o la pipeline nunca se completará.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')

La API pipeline proporciona versión de callback:

stream.finished(stream[, options])

[Historial]

VersiónCambios
v19.5.0, v18.14.0Se agregó soporte para ReadableStream y WritableStream.
v19.1.0, v18.13.0Se agregó la opción cleanup.
v15.0.0Agregado en: v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('La lectura del flujo ha terminado.')
}

run().catch(console.error)
rs.resume() // Vaciar el flujo.
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('La lectura del flujo ha terminado.')
}

run().catch(console.error)
rs.resume() // Vaciar el flujo.

La API finished también proporciona una versión de callback.

stream.finished() deja listeners de eventos colgando (en particular 'error', 'end', 'finish' y 'close') después de que la promesa devuelta se resuelve o rechaza. La razón de esto es para que los eventos 'error' inesperados (debido a implementaciones incorrectas de flujo) no causen fallos inesperados. Si este es un comportamiento no deseado, entonces options.cleanup debe establecerse en true:

js
await finished(rs, { cleanup: true })

Modo objeto

Todos los flujos creados por las API de Node.js operan exclusivamente con cadenas, objetos <Buffer>, <TypedArray> y <DataView>:

  • Las Strings y los Buffers son los tipos más comunes utilizados con los flujos.
  • TypedArray y DataView te permiten manejar datos binarios con tipos como Int32Array o Uint8Array. Cuando escribes un TypedArray o DataView en un flujo, Node.js procesa los bytes sin procesar.

Es posible, sin embargo, que las implementaciones de flujo funcionen con otros tipos de valores de JavaScript (con la excepción de null, que tiene un propósito especial dentro de los flujos). Se considera que dichos flujos operan en "modo objeto".

Las instancias de flujo se cambian al modo objeto utilizando la opción objectMode cuando se crea el flujo. Intentar cambiar un flujo existente al modo objeto no es seguro.

Almacenamiento en búfer

Tanto los flujos Writable como los Readable almacenarán datos en un búfer interno.

La cantidad de datos que se pueden almacenar en búfer depende de la opción highWaterMark que se pasa al constructor del flujo. Para los flujos normales, la opción highWaterMark especifica un número total de bytes. Para los flujos que operan en modo objeto, highWaterMark especifica un número total de objetos. Para los flujos que operan con (pero no decodifican) cadenas, highWaterMark especifica un número total de unidades de código UTF-16.

Los datos se almacenan en búfer en los flujos Readable cuando la implementación llama a stream.push(chunk). Si el consumidor del flujo no llama a stream.read(), los datos permanecerán en la cola interna hasta que se consuman.

Una vez que el tamaño total del búfer de lectura interno alcanza el umbral especificado por highWaterMark, el flujo dejará temporalmente de leer datos del recurso subyacente hasta que los datos almacenados en búfer actualmente se puedan consumir (es decir, el flujo dejará de llamar al método interno readable._read() que se utiliza para llenar el búfer de lectura).

Los datos se almacenan en búfer en los flujos Writable cuando se llama repetidamente al método writable.write(chunk). Mientras que el tamaño total del búfer de escritura interno esté por debajo del umbral establecido por highWaterMark, las llamadas a writable.write() devolverán true. Una vez que el tamaño del búfer interno alcance o supere el highWaterMark, se devolverá false.

Un objetivo clave de la API stream, particularmente el método stream.pipe(), es limitar el almacenamiento en búfer de datos a niveles aceptables de tal manera que las fuentes y los destinos de diferentes velocidades no sobrecarguen la memoria disponible.

La opción highWaterMark es un umbral, no un límite: dicta la cantidad de datos que un flujo almacena en búfer antes de dejar de solicitar más datos. No impone una limitación estricta de memoria en general. Las implementaciones de flujo específicas pueden optar por imponer límites más estrictos, pero hacerlo es opcional.

Debido a que los flujos Duplex y Transform son tanto Readable como Writable, cada uno mantiene dos búferes internos separados utilizados para leer y escribir, lo que permite que cada lado funcione independientemente del otro mientras se mantiene un flujo de datos adecuado y eficiente. Por ejemplo, las instancias de net.Socket son flujos Duplex cuyo lado Readable permite el consumo de datos recibidos del socket y cuyo lado Writable permite escribir datos al socket. Dado que los datos se pueden escribir en el socket a una velocidad más rápida o más lenta que la velocidad a la que se reciben los datos, cada lado debe operar (y almacenar en búfer) independientemente del otro.

La mecánica del almacenamiento en búfer interno es un detalle de implementación interna y se puede cambiar en cualquier momento. Sin embargo, para ciertas implementaciones avanzadas, los búferes internos se pueden recuperar utilizando writable.writableBuffer o readable.readableBuffer. Se desaconseja el uso de estas propiedades no documentadas.

API para consumidores de flujos

Casi todas las aplicaciones de Node.js, sin importar cuán simples sean, utilizan flujos de alguna manera. El siguiente es un ejemplo del uso de flujos en una aplicación Node.js que implementa un servidor HTTP:

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` es un http.IncomingMessage, que es un flujo de lectura.
  // `res` es un http.ServerResponse, que es un flujo de escritura.

  let body = ''
  // Obtiene los datos como cadenas utf8.
  // Si no se establece una codificación, se recibirán objetos Buffer.
  req.setEncoding('utf8')

  // Los flujos legibles emiten eventos 'data' una vez que se añade un listener.
  req.on('data', chunk => {
    body += chunk
  })

  // El evento 'end' indica que se ha recibido todo el cuerpo.
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // Escribe algo interesante al usuario:
      res.write(typeof data)
      res.end()
    } catch (er) {
      // ¡oh no! ¡json incorrecto!
      res.statusCode = 400
      return res.end(`error: ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON

Los flujos Writable (como res en el ejemplo) exponen métodos como write() y end() que se utilizan para escribir datos en el flujo.

Los flujos Readable utilizan la API EventEmitter para notificar al código de la aplicación cuando hay datos disponibles para ser leídos del flujo. Esos datos disponibles se pueden leer del flujo de varias maneras.

Tanto los flujos Writable como Readable utilizan la API EventEmitter de diversas maneras para comunicar el estado actual del flujo.

Los flujos Duplex y Transform son tanto Writable como Readable.

Las aplicaciones que escriben o consumen datos de un flujo no están obligadas a implementar las interfaces de flujo directamente y generalmente no tendrán ninguna razón para llamar a require('node:stream').

Los desarrolladores que deseen implementar nuevos tipos de flujos deben consultar la sección API para implementadores de flujos.

Streams de escritura

Los streams de escritura son una abstracción para un destino al que se escriben los datos.

Ejemplos de streams Writable incluyen:

Algunos de estos ejemplos son en realidad streams Duplex que implementan la interfaz Writable.

Todos los streams Writable implementan la interfaz definida por la clase stream.Writable.

Si bien las instancias específicas de streams Writable pueden diferir de varias maneras, todos los streams Writable siguen el mismo patrón de uso fundamental como se ilustra en el ejemplo a continuación:

js
const myStream = getWritableStreamSomehow()
myStream.write('algunos datos')
myStream.write('algunos datos más')
myStream.end('terminó de escribir datos')

Clase: stream.Writable

Añadido en: v0.9.4

Evento: 'close'

[Historial]

VersiónCambios
v10.0.0Se añadió la opción emitClose para especificar si se emite 'close' al destruir.
v0.9.4Añadido en: v0.9.4

El evento 'close' se emite cuando el stream y cualquiera de sus recursos subyacentes (un descriptor de archivo, por ejemplo) se han cerrado. El evento indica que no se emitirán más eventos y no se producirá ningún cálculo adicional.

Un stream Writable siempre emitirá el evento 'close' si se crea con la opción emitClose.

Evento: 'drain'

Añadido en: v0.9.4

Si una llamada a stream.write(chunk) devuelve false, el evento 'drain' se emitirá cuando sea apropiado reanudar la escritura de datos en el stream.

js
// Escribe los datos en el stream grabable proporcionado un millón de veces.
// Presta atención a la contrapresión.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // ¡Última vez!
        writer.write(data, encoding, callback)
      } else {
        // Mira si debemos continuar o esperar.
        // No pases el callback, porque aún no hemos terminado.
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // ¡Tuvo que parar antes!
      // Escribe un poco más una vez que se drene.
      writer.once('drain', write)
    }
  }
}
Evento: 'error'

Añadido en: v0.9.4

El evento 'error' se emite si se produce un error al escribir o canalizar datos. Al llamar al callback del listener, se le pasa un único argumento Error.

El stream se cierra cuando se emite el evento 'error' a menos que la opción autoDestroy se haya establecido en false al crear el stream.

Después de 'error', no se deberían emitir más eventos que 'close' (incluidos los eventos 'error').

Evento: 'finish'

Añadido en: v0.9.4

El evento 'finish' se emite después de que se haya llamado al método stream.end() y todos los datos se hayan vaciado en el sistema subyacente.

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hola, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('Todas las escrituras ya están completas.')
})
writer.end('Este es el final\n')
Evento: 'pipe'

Añadido en: v0.9.4

El evento 'pipe' se emite cuando se llama al método stream.pipe() en un stream legible, añadiendo este grabable a su conjunto de destinos.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('Algo se está canalizando en el escritor.')
  assert.equal(src, reader)
})
reader.pipe(writer)
Evento: 'unpipe'

Añadido en: v0.9.4

El evento 'unpipe' se emite cuando se llama al método stream.unpipe() en un stream Readable, eliminando este Writable de su conjunto de destinos.

También se emite en caso de que este stream Writable emita un error cuando un stream Readable se canaliza hacia él.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('Algo ha dejado de canalizar en el escritor.')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

Añadido en: v0.11.2

El método writable.cork() fuerza a que todos los datos escritos se almacenen en la memoria. Los datos almacenados se vaciarán cuando se llamen los métodos stream.uncork() o stream.end().

La intención principal de writable.cork() es dar cabida a una situación en la que se escriben varios fragmentos pequeños en el stream en rápida sucesión. En lugar de reenviarlos inmediatamente al destino subyacente, writable.cork() almacena en búfer todos los fragmentos hasta que se llama a writable.uncork(), que los pasará todos a writable._writev(), si está presente. Esto evita una situación de bloqueo de cabeza de línea en la que los datos se almacenan en búfer mientras se espera a que se procese el primer fragmento pequeño. Sin embargo, el uso de writable.cork() sin implementar writable._writev() puede tener un efecto adverso en el rendimiento.

Véase también: writable.uncork(), writable._writev().

writable.destroy([error])

[Historial]

VersiónCambios
v14.0.0Funciona como una no-operación en un stream que ya ha sido destruido.
v8.0.0Añadido en: v8.0.0
  • error <Error> Opcional, un error para emitir con el evento 'error'.
  • Devuelve: <this>

Destruye el stream. Opcionalmente, emite un evento 'error' y emite un evento 'close' (a menos que emitClose esté establecido en false). Después de esta llamada, el stream grabable ha terminado y las llamadas posteriores a write() o end() darán como resultado un error ERR_STREAM_DESTROYED. Esta es una forma destructiva e inmediata de destruir un stream. Es posible que las llamadas anteriores a write() no se hayan vaciado y pueden desencadenar un error ERR_STREAM_DESTROYED. Utiliza end() en lugar de destroy si los datos deben vaciarse antes de cerrar o espera al evento 'drain' antes de destruir el stream.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

Una vez que se ha llamado a destroy(), cualquier llamada posterior será una no-operación y no se emitirán más errores que los de _destroy() como 'error'.

Los implementadores no deberían sobreescribir este método, sino implementar writable._destroy().

writable.closed

Añadido en: v18.0.0

Es true después de que se haya emitido 'close'.

writable.destroyed

Añadido en: v8.0.0

Es true después de que se haya llamado a writable.destroy().

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[Historial]

VersiónCambios
v22.0.0, v20.13.0El argumento chunk ahora puede ser una instancia de TypedArray o DataView.
v15.0.0Se invoca al callback antes de 'finish' o en caso de error.
v14.0.0Se invoca al callback si se emite 'finish' o 'error'.
v10.0.0Este método ahora devuelve una referencia a writable.
v8.0.0El argumento chunk ahora puede ser una instancia de Uint8Array.
v0.9.4Añadido en: v0.9.4

Llamar al método writable.end() indica que no se escribirán más datos en el Writable. Los argumentos opcionales chunk y encoding permiten escribir un último fragmento adicional de datos inmediatamente antes de cerrar el stream.

Llamar al método stream.write() después de llamar a stream.end() generará un error.

js
// Escribe 'hola, ' y luego termina con '¡mundo!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hola, ')
file.end('¡mundo!')
// ¡Ahora no está permitido escribir más!
writable.setDefaultEncoding(encoding)

[Historial]

VersiónCambios
v6.1.0Este método ahora devuelve una referencia a writable.
v0.11.15Añadido en: v0.11.15

El método writable.setDefaultEncoding() establece la encoding por defecto para un stream Writable.

writable.uncork()

Añadido en: v0.11.2

El método writable.uncork() vacía todos los datos almacenados en búfer desde que se llamó a stream.cork().

Cuando se utilizan writable.cork() y writable.uncork() para gestionar el almacenamiento en búfer de las escrituras en un stream, aplazar las llamadas a writable.uncork() utilizando process.nextTick(). Esto permite agrupar todas las llamadas writable.write() que se producen dentro de una fase determinada del bucle de eventos de Node.js.

js
stream.cork()
stream.write('algunos ')
stream.write('datos ')
process.nextTick(() => stream.uncork())

Si el método writable.cork() se llama varias veces en un stream, debe llamarse el mismo número de veces a writable.uncork() para vaciar los datos almacenados en el búfer.

js
stream.cork()
stream.write('algunos ')
stream.cork()
stream.write('datos ')
process.nextTick(() => {
  stream.uncork()
  // Los datos no se vaciarán hasta que se llame a uncork() por segunda vez.
  stream.uncork()
})

Véase también: writable.cork().

writable.writable

Añadido en: v11.4.0

Es true si es seguro llamar a writable.write(), lo que significa que el stream no ha sido destruido, no ha tenido errores o no ha terminado.

writable.writableAborted

Añadido en: v18.0.0, v16.17.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

Devuelve si el stream fue destruido o tuvo un error antes de emitir 'finish'.

writable.writableEnded

Añadido en: v12.9.0

Es true después de que se haya llamado a writable.end(). Esta propiedad no indica si los datos se han vaciado, para ello utilice writable.writableFinished en su lugar.

writable.writableCorked

Añadido en: v13.2.0, v12.16.0

Número de veces que hay que llamar a writable.uncork() para desconectar completamente el stream.

writable.errored

Añadido en: v18.0.0

Devuelve el error si el stream ha sido destruido con un error.

writable.writableFinished

Añadido en: v12.6.0

Se establece en true inmediatamente antes de que se emita el evento 'finish'.

writable.writableHighWaterMark

Añadido en: v9.3.0

Devuelve el valor de highWaterMark pasado al crear este Writable.

writable.writableLength

Añadido en: v9.4.0

Esta propiedad contiene el número de bytes (u objetos) en la cola listos para ser escritos. El valor proporciona datos de introspección sobre el estado de highWaterMark.

writable.writableNeedDrain

Añadido en: v15.2.0, v14.17.0

Es true si el búfer del stream se ha llenado y el stream emitirá 'drain'.

writable.writableObjectMode

Añadido en: v12.3.0

Getter para la propiedad objectMode de un stream Writable dado.

writable[Symbol.asyncDispose]()

Añadido en: v22.4.0, v20.16.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

Llama a writable.destroy() con un AbortError y devuelve una promesa que se cumple cuando el stream ha terminado.

writable.write(chunk[, encoding][, callback])

[Historial]

VersiónCambios
v22.0.0, v20.13.0El argumento chunk ahora puede ser una instancia de TypedArray o DataView.
v8.0.0El argumento chunk ahora puede ser una instancia de Uint8Array.
v6.0.0Pasar null como parámetro chunk siempre se considerará inválido ahora, incluso en modo objeto.
v0.9.4Añadido en: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> Datos opcionales para escribir. Para los streams que no operan en modo objeto, chunk debe ser un <string>, <Buffer>, <TypedArray> o <DataView>. Para los streams en modo objeto, chunk puede ser cualquier valor de JavaScript que no sea null.
  • encoding <string> | <null> La codificación, si chunk es una string. Por defecto: 'utf8'
  • callback <Function> Callback para cuando se haya vaciado este fragmento de datos.
  • Devuelve: <boolean> false si el stream desea que el código de llamada espere a que se emita el evento 'drain' antes de seguir escribiendo datos adicionales; de lo contrario, true.

El método writable.write() escribe algunos datos en el stream y llama al callback proporcionado una vez que los datos se han gestionado por completo. Si se produce un error, se llamará a callback con el error como primer argumento. Se llama al callback de forma asíncrona y antes de que se emita 'error'.

El valor devuelto es true si el búfer interno es menor que el highWaterMark configurado cuando se creó el stream después de admitir chunk. Si se devuelve false, los intentos adicionales de escribir datos en el stream deberían detenerse hasta que se emita el evento 'drain'.

Mientras un stream no se está drenando, las llamadas a write() almacenarán en búfer chunk y devolverán false. Una vez que todos los fragmentos almacenados actualmente en búfer se hayan drenado (aceptados para la entrega por el sistema operativo), se emitirá el evento 'drain'. Una vez que write() devuelve false, no escribas más fragmentos hasta que se emita el evento 'drain'. Si bien está permitido llamar a write() en un stream que no se está drenando, Node.js almacenará en búfer todos los fragmentos escritos hasta que se produzca el uso máximo de la memoria, momento en el que se abortará incondicionalmente. Incluso antes de que se aborte, el alto uso de memoria provocará un bajo rendimiento del recolector de basura y un alto RSS (que no suele liberarse de vuelta al sistema, incluso después de que la memoria ya no sea necesaria). Dado que los sockets TCP pueden no drenar nunca si el par remoto no lee los datos, escribir en un socket que no se está drenando puede conducir a una vulnerabilidad explotable remotamente.

Escribir datos mientras el stream no se está drenando es particularmente problemático para un Transform, porque los streams Transform se pausan por defecto hasta que se canalizan o se añade un controlador de eventos 'data' o 'readable'.

Si los datos que se van a escribir pueden generarse o recuperarse a petición, se recomienda encapsular la lógica en un Readable y utilizar stream.pipe(). Sin embargo, si se prefiere llamar a write(), es posible respetar la contrapresión y evitar problemas de memoria utilizando el evento 'drain':

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Espera a que se llame a cb antes de realizar cualquier otra escritura.
write('hola', () => {
  console.log('La escritura se ha completado, realiza más escrituras ahora.')
})

Un stream Writable en modo objeto siempre ignorará el argumento encoding.

Streams legibles

Los streams legibles son una abstracción de una fuente de la que se consumen datos.

Ejemplos de streams Readable incluyen:

Todos los streams Readable implementan la interfaz definida por la clase stream.Readable.

Dos modos de lectura

Los streams Readable operan de manera efectiva en uno de dos modos: flujo y pausa. Estos modos son independientes del modo objeto. Un stream Readable puede estar en modo objeto o no, independientemente de si está en modo flujo o en modo pausa.

  • En modo flujo, los datos se leen del sistema subyacente automáticamente y se proporcionan a una aplicación lo más rápido posible utilizando eventos a través de la interfaz EventEmitter.
  • En modo pausa, el método stream.read() debe llamarse explícitamente para leer fragmentos de datos del stream.

Todos los streams Readable comienzan en modo pausa, pero se pueden cambiar al modo flujo de una de las siguientes maneras:

El Readable puede volver al modo pausa utilizando uno de los siguientes:

  • Si no hay destinos pipe, llamando al método stream.pause().
  • Si hay destinos pipe, eliminando todos los destinos pipe. Se pueden eliminar varios destinos pipe llamando al método stream.unpipe().

El concepto importante que hay que recordar es que un Readable no generará datos hasta que se proporcione un mecanismo para consumir o ignorar esos datos. Si el mecanismo de consumo se deshabilita o se elimina, el Readable intentará dejar de generar los datos.

Por razones de compatibilidad con versiones anteriores, la eliminación de controladores de eventos 'data' no pausará automáticamente el stream. Además, si hay destinos pipe, entonces llamar a stream.pause() no garantizará que el stream permanezca pausado una vez que esos destinos se vacíen y soliciten más datos.

Si un Readable se cambia al modo flujo y no hay consumidores disponibles para manejar los datos, esos datos se perderán. Esto puede ocurrir, por ejemplo, cuando se llama al método readable.resume() sin un listener adjunto al evento 'data', o cuando se elimina un controlador de eventos 'data' del stream.

Añadir un controlador de eventos 'readable' hace que el stream deje de fluir automáticamente, y los datos tienen que ser consumidos a través de readable.read(). Si se elimina el controlador de eventos 'readable', el stream volverá a fluir si hay un controlador de eventos 'data'.

Tres estados

Los "dos modos" de operación para un flujo Readable son una abstracción simplificada de la gestión de estados internos más compleja que está ocurriendo dentro de la implementación del flujo Readable.

Específicamente, en cualquier momento dado, cada Readable está en uno de tres estados posibles:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

Cuando readable.readableFlowing es null, no se proporciona ningún mecanismo para consumir los datos del flujo. Por lo tanto, el flujo no generará datos. Mientras esté en este estado, adjuntar un listener para el evento 'data', llamar al método readable.pipe() o llamar al método readable.resume() cambiará readable.readableFlowing a true, lo que hará que el Readable comience a emitir eventos activamente a medida que se generen los datos.

Llamar a readable.pause(), readable.unpipe() o recibir contrapresión hará que readable.readableFlowing se establezca como false, deteniendo temporalmente el flujo de eventos pero no deteniendo la generación de datos. Mientras esté en este estado, adjuntar un listener para el evento 'data' no cambiará readable.readableFlowing a true.

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing ahora es false.

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing sigue siendo false.
pass.write('ok') // No emitirá 'data'.
pass.resume() // Debe ser llamado para que el flujo emita 'data'.
// readableFlowing ahora es true.

Mientras readable.readableFlowing sea false, los datos pueden acumularse dentro del búfer interno del flujo.

Elige un estilo de API

La API de flujo Readable evolucionó a través de múltiples versiones de Node.js y proporciona múltiples métodos para consumir datos de flujo. En general, los desarrolladores deben elegir uno de los métodos de consumo de datos y nunca deben usar múltiples métodos para consumir datos de un solo flujo. Específicamente, usar una combinación de on('data'), on('readable'), pipe() o iteradores asíncronos podría llevar a un comportamiento poco intuitivo.

Clase: stream.Readable

Agregado en: v0.9.4

Evento: 'close'

[Historial]

VersiónCambios
v10.0.0Agregada la opción emitClose para especificar si se emite 'close' al destruir.
v0.9.4Agregado en: v0.9.4

El evento 'close' se emite cuando el flujo y cualquiera de sus recursos subyacentes (un descriptor de archivo, por ejemplo) se han cerrado. El evento indica que no se emitirán más eventos y no se producirá más computación.

Un flujo Readable siempre emitirá el evento 'close' si se crea con la opción emitClose.

Evento: 'data'

Agregado en: v0.9.4

  • chunk <Buffer> | <string> | <any> El fragmento de datos. Para los flujos que no están operando en modo objeto, el fragmento será una cadena o Buffer. Para los flujos que están en modo objeto, el fragmento puede ser cualquier valor de JavaScript que no sea null.

El evento 'data' se emite cada vez que el flujo renuncia a la propiedad de un fragmento de datos a un consumidor. Esto puede ocurrir cuando el flujo se cambia a modo de flujo llamando a readable.pipe(), readable.resume() o adjuntando una función de devolución de llamada de escucha al evento 'data'. El evento 'data' también se emitirá siempre que se llame al método readable.read() y haya un fragmento de datos disponible para ser devuelto.

Adjuntar un escucha de evento 'data' a un flujo que no se ha pausado explícitamente cambiará el flujo al modo de flujo. Los datos se pasarán tan pronto como estén disponibles.

La función de devolución de llamada del escucha recibirá el fragmento de datos como una cadena si se ha especificado una codificación predeterminada para el flujo utilizando el método readable.setEncoding(); de lo contrario, los datos se pasarán como un Buffer.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Recibidos ${chunk.length} bytes de datos.`)
})
Evento: 'end'

Agregado en: v0.9.4

El evento 'end' se emite cuando no hay más datos para consumir del flujo.

El evento 'end' no se emitirá a menos que los datos se consuman por completo. Esto se puede lograr cambiando el flujo al modo de flujo o llamando a stream.read() repetidamente hasta que se hayan consumido todos los datos.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Recibidos ${chunk.length} bytes de datos.`)
})
readable.on('end', () => {
  console.log('No habrá más datos.')
})
Evento: 'error'

Agregado en: v0.9.4

El evento 'error' puede ser emitido por una implementación de Readable en cualquier momento. Por lo general, esto puede ocurrir si el flujo subyacente no puede generar datos debido a una falla interna subyacente, o cuando una implementación de flujo intenta insertar un fragmento de datos no válido.

La función de devolución de llamada del escucha recibirá un solo objeto Error.

Evento: 'pause'

Agregado en: v0.9.4

El evento 'pause' se emite cuando se llama a stream.pause() y readableFlowing no es false.

Evento: 'readable'

[Historial]

VersiónCambios
v10.0.0El 'readable' siempre se emite en el siguiente tick después de que se llama a .push().
v10.0.0El uso de 'readable' requiere llamar a .read().
v0.9.4Agregado en: v0.9.4

El evento 'readable' se emite cuando hay datos disponibles para ser leídos del flujo, hasta la marca de agua alta configurada (state.highWaterMark). Efectivamente, indica que el flujo tiene nueva información dentro del búfer. Si hay datos disponibles dentro de este búfer, se puede llamar a stream.read() para recuperar esos datos. Además, el evento 'readable' también se puede emitir cuando se ha alcanzado el final del flujo.

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // Hay algunos datos para leer ahora.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

Si se ha alcanzado el final del flujo, llamar a stream.read() devolverá null y activará el evento 'end'. Esto también es cierto si nunca hubo datos para leer. Por ejemplo, en el siguiente ejemplo, foo.txt es un archivo vacío:

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

La salida de la ejecución de este script es:

bash
$ node test.js
readable: null
end

En algunos casos, adjuntar un escucha para el evento 'readable' hará que se lea una cierta cantidad de datos en un búfer interno.

En general, los mecanismos readable.pipe() y evento 'data' son más fáciles de entender que el evento 'readable'. Sin embargo, el manejo de 'readable' podría resultar en un mayor rendimiento.

Si tanto 'readable' como 'data' se usan al mismo tiempo, 'readable' tiene prioridad para controlar el flujo, es decir, 'data' se emitirá solo cuando se llame a stream.read(). La propiedad readableFlowing se volvería false. Si hay escuchas 'data' cuando se elimina 'readable', el flujo comenzará a fluir, es decir, los eventos 'data' se emitirán sin llamar a .resume().

Evento: 'resume'

Agregado en: v0.9.4

El evento 'resume' se emite cuando se llama a stream.resume() y readableFlowing no es true.

readable.destroy([error])

[Historial]

VersiónCambios
v14.0.0Funciona como una operación sin efecto en un flujo que ya ha sido destruido.
v8.0.0Agregado en: v8.0.0
  • error <Error> Error que se pasará como carga útil en el evento 'error'
  • Devuelve: <this>

Destruye el flujo. Opcionalmente, emite un evento 'error' y emite un evento 'close' (a menos que emitClose se establezca en false). Después de esta llamada, el flujo legible liberará cualquier recurso interno y se ignorarán las llamadas posteriores a push().

Una vez que se ha llamado a destroy(), cualquier llamada adicional será una operación sin efecto y no se emitirán más errores que no sean de _destroy() como 'error'.

Los implementadores no deben anular este método, sino implementar readable._destroy().

readable.closed

Agregado en: v18.0.0

Es true después de que se ha emitido 'close'.

readable.destroyed

Agregado en: v8.0.0

Es true después de que se ha llamado a readable.destroy().

readable.isPaused()

Agregado en: v0.11.14

El método readable.isPaused() devuelve el estado operativo actual de Readable. Esto es utilizado principalmente por el mecanismo que subyace al método readable.pipe(). En la mayoría de los casos típicos, no habrá razón para usar este método directamente.

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

Agregado en: v0.9.4

El método readable.pause() hará que un flujo en modo de flujo deje de emitir eventos 'data', saliendo del modo de flujo. Cualquier dato que esté disponible permanecerá en el búfer interno.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Recibidos ${chunk.length} bytes de datos.`)
  readable.pause()
  console.log('No habrá datos adicionales durante 1 segundo.')
  setTimeout(() => {
    console.log('Ahora los datos comenzarán a fluir nuevamente.')
    readable.resume()
  }, 1000)
})

El método readable.pause() no tiene ningún efecto si hay un escucha de evento 'readable'.

readable.pipe(destination[, options])

Agregado en: v0.9.4

El método readable.pipe() adjunta un flujo Writable a readable, lo que hace que cambie automáticamente al modo de flujo y envíe todos sus datos al Writable adjunto. El flujo de datos se gestionará automáticamente para que el flujo Writable de destino no se vea desbordado por un flujo Readable más rápido.

El siguiente ejemplo canaliza todos los datos de readable a un archivo llamado file.txt:

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Todos los datos de readable van a 'file.txt'.
readable.pipe(writable)

Es posible adjuntar múltiples flujos Writable a un solo flujo Readable.

El método readable.pipe() devuelve una referencia al flujo destino, lo que permite configurar cadenas de flujos canalizados:

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

De forma predeterminada, se llama a stream.end() en el flujo Writable de destino cuando el flujo Readable de origen emite 'end', de modo que el destino ya no sea escribible. Para deshabilitar este comportamiento predeterminado, se puede pasar la opción end como false, lo que hace que el flujo de destino permanezca abierto:

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Adiós\n')
})

Una advertencia importante es que si el flujo Readable emite un error durante el procesamiento, el destino Writable no se cierra automáticamente. Si se produce un error, será necesario cerrar manualmente cada flujo para evitar fugas de memoria.

Los flujos Writable process.stderr y process.stdout nunca se cierran hasta que finaliza el proceso de Node.js, independientemente de las opciones especificadas.

readable.read([size])

Agregado en: v0.9.4

El método readable.read() lee los datos del búfer interno y los devuelve. Si no hay datos disponibles para ser leídos, se devuelve null. De forma predeterminada, los datos se devuelven como un objeto Buffer a menos que se haya especificado una codificación utilizando el método readable.setEncoding() o el flujo esté operando en modo de objeto.

El argumento opcional size especifica un número específico de bytes para leer. Si no hay size bytes disponibles para leer, se devolverá null a menos que el flujo haya terminado, en cuyo caso se devolverán todos los datos restantes en el búfer interno.

Si no se especifica el argumento size, se devolverán todos los datos contenidos en el búfer interno.

El argumento size debe ser menor o igual a 1 GiB.

El método readable.read() solo debe llamarse en flujos Readable que operan en modo pausado. En modo de flujo, se llama automáticamente a readable.read() hasta que el búfer interno se drena por completo.

js
const readable = getReadableStreamSomehow()

// 'readable' se puede activar varias veces a medida que los datos se almacenan en el búfer
readable.on('readable', () => {
  let chunk
  console.log('El flujo es legible (nuevos datos recibidos en el búfer)')
  // Use un bucle para asegurarse de leer todos los datos disponibles actualmente
  while (null !== (chunk = readable.read())) {
    console.log(`Leídos ${chunk.length} bytes de datos...`)
  }
})

// 'end' se activará una vez cuando no haya más datos disponibles
readable.on('end', () => {
  console.log('Se alcanzó el final del flujo.')
})

Cada llamada a readable.read() devuelve un fragmento de datos o null, lo que significa que no hay más datos para leer en ese momento. Estos fragmentos no se concatenan automáticamente. Debido a que una sola llamada a read() no devuelve todos los datos, puede ser necesario usar un bucle while para leer continuamente fragmentos hasta que se recuperen todos los datos. Al leer un archivo grande, .read() podría devolver null temporalmente, lo que indica que ha consumido todo el contenido almacenado en el búfer, pero puede haber más datos aún por almacenar en el búfer. En tales casos, se emite un nuevo evento 'readable' una vez que hay más datos en el búfer, y el evento 'end' significa el final de la transmisión de datos.

Por lo tanto, para leer todo el contenido de un archivo de un readable, es necesario recopilar fragmentos en múltiples eventos 'readable':

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

Un flujo Readable en modo objeto siempre devolverá un solo elemento de una llamada a readable.read(size), independientemente del valor del argumento size.

Si el método readable.read() devuelve un fragmento de datos, también se emitirá un evento 'data'.

Llamar a stream.read([size]) después de que se haya emitido el evento 'end' devolverá null. No se generará ningún error de tiempo de ejecución.

readable.readable

Agregado en: v11.4.0

Es true si es seguro llamar a readable.read(), lo que significa que el flujo no ha sido destruido ni ha emitido 'error' o 'end'.

readable.readableAborted

Agregado en: v16.8.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

Devuelve si el flujo fue destruido o tuvo un error antes de emitir 'end'.

readable.readableDidRead

Agregado en: v16.7.0, v14.18.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

Devuelve si se ha emitido 'data'.

readable.readableEncoding

Agregado en: v12.7.0

Obtenedor para la propiedad encoding de un flujo Readable dado. La propiedad encoding se puede configurar utilizando el método readable.setEncoding().

readable.readableEnded

Agregado en: v12.9.0

Se vuelve true cuando se emite el evento 'end'.

readable.errored

Agregado en: v18.0.0

Devuelve el error si el flujo ha sido destruido con un error.

readable.readableFlowing

Agregado en: v9.4.0

Esta propiedad refleja el estado actual de un flujo Readable como se describe en la sección Tres estados.

readable.readableHighWaterMark

Agregado en: v9.3.0

Devuelve el valor de highWaterMark pasado al crear este Readable.

readable.readableLength

Agregado en: v9.4.0

Esta propiedad contiene el número de bytes (u objetos) en la cola listos para ser leídos. El valor proporciona datos de introspección con respecto al estado de la highWaterMark.

readable.readableObjectMode

Agregado en: v12.3.0

Obtenedor para la propiedad objectMode de un flujo Readable dado.

readable.resume()

[Historial]

VersiónCambios
v10.0.0El resume() no tiene efecto si hay un escucha de evento 'readable'.
v0.9.4Agregado en: v0.9.4

El método readable.resume() hace que un flujo Readable pausado explícitamente reanude la emisión de eventos 'data', cambiando el flujo al modo de flujo.

El método readable.resume() se puede utilizar para consumir por completo los datos de un flujo sin procesar realmente ninguno de esos datos:

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Llegó al final, pero no leyó nada.')
  })

El método readable.resume() no tiene ningún efecto si hay un escucha de evento 'readable'.

readable.setEncoding(encoding)

Agregado en: v0.9.4

El método readable.setEncoding() establece la codificación de caracteres para los datos leídos del flujo Readable.

De forma predeterminada, no se asigna ninguna codificación y los datos de flujo se devolverán como objetos Buffer. Establecer una codificación hace que los datos de flujo se devuelvan como cadenas de la codificación especificada en lugar de como objetos Buffer. Por ejemplo, llamar a readable.setEncoding('utf8') hará que los datos de salida se interpreten como datos UTF-8 y se pasen como cadenas. Llamar a readable.setEncoding('hex') hará que los datos se codifiquen en formato de cadena hexadecimal.

El flujo Readable manejará correctamente los caracteres multibyte entregados a través del flujo que de otro modo se decodificarían incorrectamente si se extrajeran simplemente del flujo como objetos Buffer.

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('Obtenidos %d caracteres de datos de cadena:', chunk.length)
})
readable.unpipe([destination])

Agregado en: v0.9.4

El método readable.unpipe() desconecta un flujo Writable adjuntado previamente utilizando el método stream.pipe().

Si no se especifica el destination, se desconectan todas las tuberías.

Si se especifica el destination, pero no se ha configurado ninguna tubería para él, el método no hace nada.

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Todos los datos de readable van a 'file.txt',
// pero solo durante el primer segundo.
readable.pipe(writable)
setTimeout(() => {
  console.log('Dejar de escribir en file.txt.')
  readable.unpipe(writable)
  console.log('Cerrar manualmente el flujo de archivos.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[Historial]

VersiónCambios
v22.0.0, v20.13.0El argumento chunk ahora puede ser una instancia de TypedArray o DataView.
v8.0.0El argumento chunk ahora puede ser una instancia de Uint8Array.
v0.9.11Agregado en: v0.9.11

Pasar chunk como null señala el final del flujo (EOF) y se comporta de la misma manera que readable.push(null), después de lo cual no se pueden escribir más datos. La señal EOF se coloca al final del búfer y cualquier dato en búfer todavía se vaciará.

El método readable.unshift() vuelve a insertar un fragmento de datos en el búfer interno. Esto es útil en ciertas situaciones en las que un flujo está siendo consumido por un código que necesita "desconsumir" una cierta cantidad de datos que ha extraído de forma optimista de la fuente, de modo que los datos se puedan pasar a otra parte.

El método stream.unshift(chunk) no se puede llamar después de que se haya emitido el evento 'end' o se producirá un error de tiempo de ejecución.

Los desarrolladores que usan stream.unshift() a menudo deberían considerar cambiar al uso de un flujo Transform en su lugar. Consulte la sección API para implementadores de flujos para obtener más información.

js
// Extraer un encabezado delimitado por \n\n.
// Usar unshift() si obtenemos demasiado.
// Llamar a la función de devolución de llamada con (error, encabezado, flujo).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // Se encontró el límite del encabezado.
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // Eliminar el escucha 'readable' antes de desviar.
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // Ahora se puede leer el cuerpo del mensaje desde el flujo.
        callback(null, header, stream)
        return
      }
      // Todavía leyendo el encabezado.
      header += str
    }
  }
}

A diferencia de stream.push(chunk), stream.unshift(chunk) no terminará el proceso de lectura restableciendo el estado de lectura interno del flujo. Esto puede causar resultados inesperados si se llama a readable.unshift() durante una lectura (es decir, desde dentro de una implementación de stream._read() en un flujo personalizado). Seguir la llamada a readable.unshift() con un stream.push('') inmediato restablecerá el estado de lectura de forma adecuada, sin embargo, es mejor simplemente evitar llamar a readable.unshift() mientras se está realizando una lectura.

readable.wrap(stream)

Agregado en: v0.9.4

Antes de Node.js 0.10, los flujos no implementaban toda la API del módulo node:stream como está definida actualmente. (Consulte Compatibilidad para obtener más información).

Cuando se utiliza una biblioteca Node.js anterior que emite eventos 'data' y tiene un método stream.pause() que es solo de asesoramiento, se puede utilizar el método readable.wrap() para crear un flujo Readable que utiliza el flujo anterior como fuente de datos.

Rara vez será necesario usar readable.wrap(), pero el método se ha proporcionado como una conveniencia para interactuar con aplicaciones y bibliotecas Node.js más antiguas.

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // etc.
})
readable[Symbol.asyncIterator]()

[Historial]

VersiónCambios
v11.14.0El soporte de Symbol.asyncIterator ya no es experimental.
v10.0.0Agregado en: v10.0.0

Streams dúplex y de transformación

Clase: stream.Duplex

[Historial]

VersiónCambios
v6.8.0Las instancias de Duplex ahora devuelven true al verificar instanceof stream.Writable.
v0.9.4Agregado en: v0.9.4

Los streams dúplex son streams que implementan las interfaces Readable y Writable.

Ejemplos de streams Duplex incluyen:

duplex.allowHalfOpen

Agregado en: v0.9.4

Si es false, el stream finalizará automáticamente el lado de escritura cuando finalice el lado de lectura. Se establece inicialmente mediante la opción de constructor allowHalfOpen, que por defecto es true.

Esto se puede cambiar manualmente para modificar el comportamiento de apertura parcial de una instancia de stream Duplex existente, pero se debe cambiar antes de que se emita el evento 'end'.

Clase: stream.Transform

Añadido en: v0.9.4

Las secuencias de transformación son secuencias Duplex donde la salida está relacionada de alguna manera con la entrada. Al igual que todas las secuencias Duplex, las secuencias Transform implementan tanto las interfaces Readable como Writable.

Algunos ejemplos de secuencias Transform son:

transform.destroy([error])

[Historial]

VersiónCambios
v14.0.0Funciona como una operación no op en una secuencia que ya ha sido destruida.
v8.0.0Añadido en: v8.0.0

Destruye la secuencia y, opcionalmente, emite un evento 'error'. Después de esta llamada, la secuencia de transformación liberará cualquier recurso interno. Los implementadores no deben anular este método, sino implementar readable._destroy(). La implementación predeterminada de _destroy() para Transform también emite 'close' a menos que emitClose se establezca en falso.

Una vez que se ha llamado a destroy(), cualquier llamada posterior será una operación no op y no se podrán emitir más errores que los de _destroy() como 'error'.

stream.duplexPair([opciones])

Agregado en: v22.6.0, v20.17.0

  • opciones <Objeto> Un valor para pasar a ambos constructores Duplex, para establecer opciones como el almacenamiento en búfer.
  • Devuelve: <Array> de dos instancias Duplex.

La función de utilidad duplexPair devuelve un Array con dos elementos, cada uno siendo un flujo Duplex conectado al otro lado:

js
const [ladoA, ladoB] = duplexPair()

Lo que sea que se escriba en un flujo se hace legible en el otro. Proporciona un comportamiento análogo a una conexión de red, donde los datos escritos por el cliente se vuelven legibles por el servidor, y viceversa.

Los flujos Duplex son simétricos; uno u otro puede ser usado sin ninguna diferencia en el comportamiento.

stream.finished(stream[, opciones], callback)

[Historial]

VersiónCambios
v19.5.0Se agregó soporte para ReadableStream y WritableStream.
v15.11.0Se agregó la opción signal.
v14.0.0El finished(stream, cb) esperará el evento 'close' antes de invocar la devolución de llamada. La implementación intenta detectar flujos heredados y solo aplica este comportamiento a flujos que se espera que emitan 'close'.
v14.0.0Emitir 'close' antes de 'end' en un flujo Readable causará un error ERR_STREAM_PREMATURE_CLOSE.
v14.0.0La devolución de llamada se invocará en flujos que ya hayan terminado antes de la llamada a finished(stream, cb).
v10.0.0Agregado en: v10.0.0
  • stream <Flujo> | <ReadableStream> | <WritableStream> Un flujo/webstream legible y/o escribible.

  • opciones <Objeto>

    • error <booleano> Si se establece en false, entonces una llamada a emit('error', err) no se trata como terminada. Predeterminado: true.
    • readable <booleano> Cuando se establece en false, la devolución de llamada se llamará cuando el flujo termine, aunque el flujo aún pueda ser legible. Predeterminado: true.
    • writable <booleano> Cuando se establece en false, la devolución de llamada se llamará cuando el flujo termine, aunque el flujo aún pueda ser escribible. Predeterminado: true.
    • signal <AbortSignal> permite abortar la espera para que termine el flujo. El flujo subyacente no se abortará si se aborta la señal. La devolución de llamada se llamará con un AbortError. Todos los listeners registrados agregados por esta función también se eliminarán.
  • callback <Función> Una función de devolución de llamada que toma un argumento de error opcional.

  • Devuelve: <Función> Una función de limpieza que elimina todos los listeners registrados.

Una función para recibir una notificación cuando un flujo ya no es legible, escribible o ha experimentado un error o un evento de cierre prematuro.

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('El flujo falló.', err)
  } else {
    console.log('El flujo ha terminado de leer.')
  }
})

rs.resume() // Vaciar el flujo.

Especialmente útil en escenarios de manejo de errores donde un flujo se destruye prematuramente (como una solicitud HTTP abortada), y no emitirá 'end' o 'finish'.

La API finished proporciona versión de promesa.

stream.finished() deja listeners de eventos colgantes (en particular 'error', 'end', 'finish' y 'close') después de que se ha invocado callback. La razón de esto es para que los eventos 'error' inesperados (debido a implementaciones de flujo incorrectas) no causen fallas inesperadas. Si este es un comportamiento no deseado, entonces la función de limpieza devuelta debe invocarse en la devolución de llamada:

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[Historial]

VersiónCambios
v19.7.0, v18.16.0Se agregó soporte para webstreams.
v18.0.0Pasar una callback no válida al argumento callback ahora lanza ERR_INVALID_ARG_TYPE en lugar de ERR_INVALID_CALLBACK.
v14.0.0El pipeline(..., cb) esperará el evento 'close' antes de invocar la callback. La implementación intenta detectar flujos heredados y solo aplica este comportamiento a flujos que se espera que emitan 'close'.
v13.10.0Se añadió soporte para generadores asíncronos.
v10.0.0Añadido en: v10.0.0

Un método de módulo para conectar entre flujos y generadores reenviando errores y limpiando correctamente y proporcionando una callback cuando la pipeline está completa.

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// Usa la API de pipeline para conectar fácilmente una serie de flujos
// juntos y ser notificado cuando la pipeline esté completamente lista.

// Una pipeline para comprimir con gzip un archivo tar potencialmente enorme de manera eficiente:

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('La pipeline falló.', err)
  } else {
    console.log('La pipeline tuvo éxito.')
  }
})

La API de pipeline proporciona una versión de promesa.

stream.pipeline() llamará a stream.destroy(err) en todos los flujos excepto:

  • Flujos Readable que han emitido 'end' o 'close'.
  • Flujos Writable que han emitido 'finish' o 'close'.

stream.pipeline() deja escuchadores de eventos colgando en los flujos después de que se ha invocado la callback. En el caso de la reutilización de flujos después de un fallo, esto puede causar fugas de escuchadores de eventos y errores tragados. Si el último flujo es legible, se eliminarán los escuchadores de eventos colgantes para que el último flujo pueda consumirse más tarde.

stream.pipeline() cierra todos los flujos cuando se produce un error. El uso de IncomingRequest con pipeline podría conducir a un comportamiento inesperado una vez que destruiría el socket sin enviar la respuesta esperada. Consulta el ejemplo a continuación:

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // No existe tal archivo
      // este mensaje no se puede enviar una vez que `pipeline` ya ha destruido el socket
      return res.end('¡¡¡error!!!')
    }
  })
})

stream.compose(...streams)

[Historial]

VersiónCambios
v21.1.0, v20.10.0Se agregó soporte para la clase stream.
v19.8.0, v18.16.0Se agregó soporte para webstreams.
v16.9.0Agregado en: v16.9.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - stream.compose es experimental.

Combina dos o más streams en un stream Duplex que escribe en el primer stream y lee desde el último. Cada stream proporcionado se canaliza al siguiente, utilizando stream.pipeline. Si alguno de los streams genera un error, todos se destruyen, incluido el stream Duplex externo.

Debido a que stream.compose devuelve un nuevo stream que a su vez puede (y debe) canalizarse en otros streams, permite la composición. En contraste, al pasar streams a stream.pipeline, normalmente el primer stream es un stream legible y el último un stream de escritura, formando un circuito cerrado.

Si se pasa una Function, debe ser un método de fábrica que tome un Iterable de source.

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // imprime 'HELLOWORLD'

stream.compose se puede utilizar para convertir iterables asíncronos, generadores y funciones en streams.

  • AsyncIterable se convierte en un Duplex legible. No puede producir null.
  • AsyncGeneratorFunction se convierte en un Duplex de transformación legible/escribible. Debe tomar un AsyncIterable de source como primer parámetro. No puede producir null.
  • AsyncFunction se convierte en un Duplex de escritura. Debe devolver null o undefined.
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// Convierte AsyncIterable en un Duplex legible.
const s1 = compose(
  (async function* () {
    yield 'Hello'
    yield 'World'
  })()
)

// Convierte AsyncGenerator en un Duplex de transformación.
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// Convierte AsyncFunction en un Duplex de escritura.
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // imprime 'HELLOWORLD'

Consulta readable.compose(stream) para ver stream.compose como operador.

stream.Readable.from(iterable[, options])

Agregado en: v12.3.0, v10.17.0

  • iterable <Iterable> Objeto que implementa el protocolo iterable Symbol.asyncIterator o Symbol.iterator. Emite un evento 'error' si se pasa un valor nulo.
  • options <Object> Opciones proporcionadas a new stream.Readable([options]). Por defecto, Readable.from() establecerá options.objectMode a true, a menos que se opte explícitamente por no hacerlo estableciendo options.objectMode a false.
  • Devuelve: <stream.Readable>

Un método de utilidad para crear streams legibles a partir de iteradores.

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hola'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

Llamar a Readable.from(string) o Readable.from(buffer) no hará que las cadenas o buffers se iteren para que coincidan con la semántica de otros streams por razones de rendimiento.

Si un objeto Iterable que contiene promesas se pasa como argumento, podría resultar en un rechazo no manejado.

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rechazo no manejado
])

stream.Readable.fromWeb(readableStream[, options])

Agregado en: v17.0.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

stream.Readable.isDisturbed(stream)

Agregado en: v16.8.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

Devuelve si el stream ha sido leído o cancelado.

stream.isErrored(stream)

Agregado en: v17.3.0, v16.14.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

Devuelve si el flujo ha encontrado un error.

stream.isReadable(stream)

Agregado en: v17.4.0, v16.14.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

Devuelve si el flujo es legible.

stream.Readable.toWeb(streamReadable[, options])

Agregado en: v17.0.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

  • streamReadable <stream.Readable>

  • options <Object>

    • strategy <Object>
    • highWaterMark <number> El tamaño máximo de la cola interna (del ReadableStream creado) antes de que se aplique la contrapresión al leer del stream.Readable dado. Si no se proporciona ningún valor, se tomará del stream.Readable dado.
    • size <Function> Una función que indica el tamaño del fragmento de datos dado. Si no se proporciona ningún valor, el tamaño será 1 para todos los fragmentos.
    • chunk <any>
    • Returns: <number>
  • Returns: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

Agregado en: v17.0.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

stream.Writable.toWeb(streamWritable)

Agregado en: v17.0.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

stream.Duplex.from(src)

[Historial]

VersiónCambios
v19.5.0, v18.17.0El argumento src ahora puede ser un ReadableStream o un WritableStream.
v16.8.0Añadido en: v16.8.0

Un método de utilidad para crear streams dúplex.

  • Stream convierte un stream de escritura en un Duplex de escritura y un stream de lectura en un Duplex.
  • Blob se convierte en un Duplex de lectura.
  • string se convierte en un Duplex de lectura.
  • ArrayBuffer se convierte en un Duplex de lectura.
  • AsyncIterable se convierte en un Duplex de lectura. No puede producir null.
  • AsyncGeneratorFunction se convierte en un Duplex de transformación de lectura/escritura. Debe tomar un AsyncIterable de origen como primer parámetro. No puede producir null.
  • AsyncFunction se convierte en un Duplex de escritura. Debe devolver null o undefined
  • Object ({ writable, readable }) convierte readable y writable en Stream y luego los combina en Duplex donde el Duplex escribirá en writable y leerá desde readable.
  • Promise se convierte en un Duplex de lectura. El valor null se ignora.
  • ReadableStream se convierte en un Duplex de lectura.
  • WritableStream se convierte en un Duplex de escritura.
  • Devuelve: <stream.Duplex>

Si se pasa un objeto Iterable que contiene promesas como argumento, podría resultar en un rechazo no manejado.

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rechazo no manejado
])

stream.Duplex.fromWeb(pair[, options])

Añadido en: v17.0.0

[Stable: 1 - Experimental]

Stable: 1 Estabilidad: 1 - Experimental

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

Añadido en: v17.0.0

[Estable: 1 - Experimental]

Estable: 1 Estabilidad: 1 - Experimental

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[Historial]

VersiónCambios
v19.7.0, v18.16.0Se añadió soporte para ReadableStream y WritableStream.
v15.4.0Añadido en: v15.4.0

Adjunta una AbortSignal a un flujo de lectura o escritura. Esto permite que el código controle la destrucción del flujo utilizando un AbortController.

Llamar a abort en el AbortController correspondiente a la AbortSignal pasada se comportará de la misma manera que llamar a .destroy(new AbortError()) en el flujo, y controller.error(new AbortError()) para los flujos web.

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Más tarde, aborta la operación cerrando el flujo
controller.abort()

O usando una AbortSignal con un flujo legible como un iterable asíncrono:

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // establecer un tiempo de espera
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // La operación fue cancelada
    } else {
      throw e
    }
  }
})()

O usando un AbortSignal con un ReadableStream:

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hola')
    controller.enqueue('mundo')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // La operación fue cancelada
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hola
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

Agregado en: v19.9.0, v18.17.0

Devuelve el highWaterMark predeterminado utilizado por los streams. El valor predeterminado es 65536 (64 KiB) o 16 para objectMode.

stream.setDefaultHighWaterMark(objectMode, value)

Agregado en: v19.9.0, v18.17.0

Establece el highWaterMark predeterminado utilizado por los streams.

API para implementadores de streams

La API del módulo node:stream se ha diseñado para que sea posible implementar fácilmente streams utilizando el modelo de herencia prototípica de JavaScript.

Primero, un desarrollador de streams declararía una nueva clase de JavaScript que extiende una de las cuatro clases de streams básicas (stream.Writable, stream.Readable, stream.Duplex o stream.Transform), asegurándose de llamar al constructor de la clase padre apropiado:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

Al extender streams, tenga en cuenta qué opciones puede y debe proporcionar el usuario antes de reenviarlas al constructor base. Por ejemplo, si la implementación hace suposiciones con respecto a las opciones autoDestroy y emitClose, no permita que el usuario las anule. Sea explícito sobre qué opciones se reenvían en lugar de reenviar implícitamente todas las opciones.

La nueva clase de stream debe implementar uno o más métodos específicos, según el tipo de stream que se esté creando, como se detalla en el siguiente cuadro:

Caso de usoClaseMétodo(s) a implementar
Solo lecturaReadable_read()
Solo escrituraWritable_write() , _writev() , _final()
Lectura y escrituraDuplex_read() , _write() , _writev() , _final()
Operar con datos escritos, luego leer el resultadoTransform_transform() , _flush() , _final()

El código de implementación de un stream nunca debe llamar a los métodos "públicos" de un stream que están destinados a ser utilizados por los consumidores (como se describe en la sección API para consumidores de streams). Hacerlo puede provocar efectos secundarios adversos en el código de la aplicación que consume el stream.

Evite anular los métodos públicos como write(), end(), cork(), uncork(), read() y destroy(), o emitir eventos internos como 'error', 'data', 'end', 'finish' y 'close' a través de .emit(). Hacerlo puede romper los invariantes de los streams actuales y futuros, lo que puede generar problemas de comportamiento y/o compatibilidad con otros streams, utilidades de streams y expectativas de los usuarios.

Construcción simplificada

Añadido en: v1.2.0

Para muchos casos sencillos, es posible crear un flujo sin depender de la herencia. Esto se puede lograr creando directamente instancias de los objetos stream.Writable, stream.Readable, stream.Duplex o stream.Transform y pasando los métodos apropiados como opciones del constructor.

js
const { Writable } = require('node:stream')

const miWritable = new Writable({
  construct(callback) {
    // Inicializar el estado y cargar recursos...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Liberar recursos...
  },
})

Implementación de un flujo de escritura

La clase stream.Writable se extiende para implementar un flujo Writable.

Los flujos Writable personalizados deben llamar al constructor new stream.Writable([options]) e implementar el método writable._write() y/o writable._writev().

new stream.Writable([options])

[Historial]

VersiónCambios
v22.0.0Aumento de highWaterMark predeterminado.
v15.5.0Soporte para pasar un AbortSignal.
v14.0.0Cambio de la opción autoDestroy a true como valor predeterminado.
v11.2.0, v10.16.0Añadida la opción autoDestroy para destroy() automáticamente el flujo cuando emite 'finish' o errores.
v10.0.0Añadida la opción emitClose para especificar si se emite 'close' al destruirse.
  • options <Object>
    • highWaterMark <number> Nivel de búfer cuando stream.write() comienza a devolver false. Predeterminado: 65536 (64 KiB), o 16 para flujos objectMode.
    • decodeStrings <boolean> Si se codifican las strings pasadas a stream.write() en Buffers (con la codificación especificada en la llamada a stream.write()) antes de pasarlas a stream._write(). Otros tipos de datos no se convierten (es decir, los Buffers no se decodifican en strings). Si se establece en falso, se evitará que las strings se conviertan. Predeterminado: true.
    • defaultEncoding <string> La codificación predeterminada que se utiliza cuando no se especifica ninguna codificación como argumento para stream.write(). Predeterminado: 'utf8'.
    • objectMode <boolean> Si la operación stream.write(anyObj) es válida o no. Cuando se establece, se hace posible escribir valores de JavaScript que no sean string, <Buffer>, <TypedArray> o <DataView> si la implementación del flujo lo admite. Predeterminado: false.
    • emitClose <boolean> Si el flujo debe o no emitir 'close' después de haber sido destruido. Predeterminado: true.
    • write <Function> Implementación para el método stream._write().
    • writev <Function> Implementación para el método stream._writev().
    • destroy <Function> Implementación para el método stream._destroy().
    • final <Function> Implementación para el método stream._final().
    • construct <Function> Implementación para el método stream._construct().
    • autoDestroy <boolean> Si este flujo debe llamar automáticamente a .destroy() sobre sí mismo después de terminar. Predeterminado: true.
    • signal <AbortSignal> Una señal que representa una posible cancelación.
js
const { Writable } = require('node:stream')

class MiWritable extends Writable {
  constructor(options) {
    // Llama al constructor stream.Writable().
    super(options)
    // ...
  }
}

O bien, al usar constructores de estilo pre-ES6:

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MiWritable(options) {
  if (!(this instanceof MiWritable)) return new MiWritable(options)
  Writable.call(this, options)
}
util.inherits(MiWritable, Writable)

O bien, utilizando el enfoque del constructor simplificado:

js
const { Writable } = require('node:stream')

const miWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

Llamar a abort en el AbortController correspondiente al AbortSignal pasado se comportará de la misma manera que llamar a .destroy(new AbortError()) en el flujo de escritura.

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const miWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// Más tarde, aborta la operación cerrando el flujo
controller.abort()

writable._construct(callback)

Agregado en: v15.0.0

  • callback <Function> Llama a esta función (opcionalmente con un argumento de error) cuando la secuencia haya terminado de inicializarse.

El método _construct() NO DEBE llamarse directamente. Puede ser implementado por clases hijas, y si es así, será llamado solo por los métodos internos de la clase Writable.

Esta función opcional se llamará en un tick después de que el constructor de la secuencia haya regresado, retrasando cualquier llamada a _write(), _final() y _destroy() hasta que se llame a callback. Esto es útil para inicializar el estado o inicializar asíncronamente los recursos antes de que se pueda usar la secuencia.

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[Historial]

VersiónCambios
v12.11.0_write() es opcional cuando se proporciona _writev().
  • chunk <Buffer> | <string> | <any> El Buffer que se va a escribir, convertido desde la string pasada a stream.write(). Si la opción decodeStrings del stream es false o el stream está operando en modo objeto, el chunk no se convertirá y será lo que se pasó a stream.write().
  • encoding <string> Si el chunk es una string, entonces encoding es la codificación de caracteres de esa string. Si el chunk es un Buffer, o si el stream está operando en modo objeto, encoding puede ser ignorado.
  • callback <Function> Llama a esta función (opcionalmente con un argumento de error) cuando el procesamiento esté completo para el chunk suministrado.

Todas las implementaciones de stream Writable deben proporcionar un método writable._write() y/o writable._writev() para enviar datos al recurso subyacente.

Los streams Transform proporcionan su propia implementación de writable._write().

Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por las clases hijas, y ser llamada solo por los métodos internos de la clase Writable.

La función callback debe ser llamada de forma síncrona dentro de writable._write() o asíncrona (es decir, en un tick diferente) para indicar si la escritura se completó con éxito o falló con un error. El primer argumento pasado a la callback debe ser el objeto Error si la llamada falló o null si la escritura tuvo éxito.

Todas las llamadas a writable.write() que ocurran entre el momento en que se llama a writable._write() y se llama a la callback harán que los datos escritos se almacenen en búfer. Cuando se invoca la callback, el stream podría emitir un evento 'drain'. Si una implementación de stream es capaz de procesar múltiples chunks de datos a la vez, se debe implementar el método writable._writev().

Si la propiedad decodeStrings se establece explícitamente en false en las opciones del constructor, entonces chunk seguirá siendo el mismo objeto que se pasa a .write(), y puede ser una string en lugar de un Buffer. Esto es para admitir implementaciones que tienen un manejo optimizado para ciertas codificaciones de datos de string. En ese caso, el argumento encoding indicará la codificación de caracteres de la string. De lo contrario, el argumento encoding puede ignorarse de forma segura.

El método writable._write() tiene un prefijo con un guión bajo porque es interno a la clase que lo define, y nunca debe ser llamado directamente por los programas de usuario.

writable._writev(chunks, callback)

  • chunks <Object[]> Los datos a escribir. El valor es un array de <Object> que representan cada uno un fragmento discreto de datos a escribir. Las propiedades de estos objetos son:

    • chunk <Buffer> | <string> Una instancia de buffer o una cadena que contiene los datos a escribir. El chunk será una cadena si el Writable se creó con la opción decodeStrings establecida en false y se pasó una cadena a write().
    • encoding <string> La codificación de caracteres del chunk. Si chunk es un Buffer, el encoding será 'buffer'.
  • callback <Function> Una función de devolución de llamada (opcionalmente con un argumento de error) que se invocará cuando se complete el procesamiento de los fragmentos suministrados.

Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por las clases hijas, y ser llamada solo por los métodos internos de la clase Writable.

El método writable._writev() puede ser implementado adicional o alternativamente a writable._write() en implementaciones de flujo que sean capaces de procesar múltiples fragmentos de datos a la vez. Si se implementa y si hay datos almacenados en búfer de escrituras anteriores, se llamará a _writev() en lugar de _write().

El método writable._writev() tiene un prefijo con un guion bajo porque es interno a la clase que lo define, y nunca debe ser llamado directamente por los programas de usuario.

writable._destroy(err, callback)

Agregado en: v8.0.0

  • err <Error> Un posible error.
  • callback <Function> Una función de callback que toma un argumento de error opcional.

El método _destroy() es llamado por writable.destroy(). Puede ser sobrescrito por clases hijas pero no debe ser llamado directamente.

writable._final(callback)

Agregado en: v8.0.0

  • callback <Function> Llama a esta función (opcionalmente con un argumento de error) cuando termine de escribir los datos restantes.

El método _final() no debe ser llamado directamente. Puede ser implementado por clases hijas, y si es así, será llamado solo por los métodos internos de la clase Writable.

Esta función opcional será llamada antes de que se cierre el stream, retrasando el evento 'finish' hasta que se llame a callback. Esto es útil para cerrar recursos o escribir datos en búfer antes de que termine un stream.

Errores al escribir

Los errores que ocurran durante el procesamiento de los métodos writable._write(), writable._writev() y writable._final() deben propagarse invocando la función de callback y pasando el error como primer argumento. Lanzar un Error desde dentro de estos métodos o emitir manualmente un evento 'error' resulta en un comportamiento indefinido.

Si un flujo Readable se canaliza a un flujo Writable cuando Writable emite un error, el flujo Readable se desconectará.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  },
})

Un ejemplo de flujo writable

Lo siguiente ilustra una implementación de flujo Writable personalizado bastante simplista (y algo inútil). Si bien esta instancia específica de flujo Writable no tiene ninguna utilidad real en particular, el ejemplo ilustra cada uno de los elementos requeridos de una instancia de flujo Writable personalizado:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  }
}

Decodificación de buffers en un flujo de escritura

La decodificación de buffers es una tarea común, por ejemplo, al usar transformadores cuya entrada es una cadena. Este no es un proceso trivial cuando se usa la codificación de caracteres multibyte, como UTF-8. El siguiente ejemplo muestra cómo decodificar cadenas multibyte usando StringDecoder y Writable.

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

Implementación de un flujo legible

La clase stream.Readable se extiende para implementar un flujo Readable.

Los flujos Readable personalizados deben llamar al constructor new stream.Readable([options]) e implementar el método readable._read().

new stream.Readable([options])

[Historial]

VersiónCambios
v22.0.0Aumento del valor predeterminado de highWaterMark.
v15.5.0Soporte para pasar un AbortSignal.
v14.0.0Cambiar el valor predeterminado de la opción autoDestroy a true.
v11.2.0, v10.16.0Agregar la opción autoDestroy para destroy() automáticamente el flujo cuando emite 'end' o errores.
  • options <Object>
    • highWaterMark <number> El número de bytes máximo para almacenar en el búfer interno antes de dejar de leer del recurso subyacente. Predeterminado: 65536 (64 KiB), o 16 para flujos objectMode.
    • encoding <string> Si se especifica, los búferes se decodificarán a cadenas utilizando la codificación especificada. Predeterminado: null.
    • objectMode <boolean> Indica si este flujo debe comportarse como un flujo de objetos. Esto significa que stream.read(n) devuelve un único valor en lugar de un Buffer de tamaño n. Predeterminado: false.
    • emitClose <boolean> Indica si el flujo debe emitir 'close' después de haber sido destruido. Predeterminado: true.
    • read <Function> Implementación para el método stream._read().
    • destroy <Function> Implementación para el método stream._destroy().
    • construct <Function> Implementación para el método stream._construct().
    • autoDestroy <boolean> Indica si este flujo debe llamar automáticamente a .destroy() sobre sí mismo después de terminar. Predeterminado: true.
    • signal <AbortSignal> Una señal que representa una posible cancelación.
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // Llama al constructor stream.Readable(options).
    super(options)
    // ...
  }
}

O, al usar constructores de estilo pre-ES6:

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

O, usando el enfoque de constructor simplificado:

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

Llamar a abort en el AbortController correspondiente a la AbortSignal pasada se comportará de la misma manera que llamar a .destroy(new AbortError()) en el readable creado.

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// Más tarde, aborta la operación cerrando el flujo
controller.abort()

readable._construct(callback)

Agregado en: v15.0.0

  • callback <Function> Llama a esta función (opcionalmente con un argumento de error) cuando el stream ha terminado de inicializarse.

El método _construct() NO DEBE ser llamado directamente. Puede ser implementado por clases hijas, y si es así, será llamado solo por los métodos internos de la clase Readable.

Esta función opcional será programada en el siguiente tick por el constructor del stream, retrasando cualquier llamada a _read() y _destroy() hasta que se llame a callback. Esto es útil para inicializar el estado o inicializar asíncronamente los recursos antes de que el stream pueda ser usado.

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

Añadido en: v0.9.4

  • size <number> Número de bytes a leer de forma asíncrona

Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por las clases hijas y llamada únicamente por los métodos internos de la clase Readable.

Todas las implementaciones de flujos Readable deben proporcionar una implementación del método readable._read() para obtener datos del recurso subyacente.

Cuando se llama a readable._read(), si hay datos disponibles del recurso, la implementación debe comenzar a insertar esos datos en la cola de lectura utilizando el método this.push(dataChunk). Se volverá a llamar a _read() después de cada llamada a this.push(dataChunk) una vez que el flujo esté listo para aceptar más datos. _read() puede continuar leyendo del recurso e insertando datos hasta que readable.push() devuelva false. Solo cuando se vuelva a llamar a _read() después de que se haya detenido, debe reanudar la inserción de datos adicionales en la cola.

Una vez que se ha llamado al método readable._read(), no se volverá a llamar hasta que se inserten más datos a través del método readable.push(). Los datos vacíos, como búferes y cadenas vacías, no harán que se llame a readable._read().

El argumento size es orientativo. Para las implementaciones en las que una "lectura" es una única operación que devuelve datos, se puede utilizar el argumento size para determinar cuántos datos obtener. Otras implementaciones pueden ignorar este argumento y simplemente proporcionar datos cuando estén disponibles. No es necesario "esperar" hasta que haya size bytes disponibles antes de llamar a stream.push(chunk).

El método readable._read() tiene el prefijo con un guión bajo porque es interno a la clase que lo define y nunca debe ser llamado directamente por los programas de usuario.

readable._destroy(err, callback)

Agregado en: v8.0.0

  • err <Error> Un posible error.
  • callback <Function> Una función de callback que toma un argumento de error opcional.

El método _destroy() es llamado por readable.destroy(). Puede ser sobrescrito por clases hijas pero no debe ser llamado directamente.

readable.push(chunk[, encoding])

[Historial]

VersiónCambios
v22.0.0, v20.13.0El argumento chunk ahora puede ser una instancia de TypedArray o DataView.
v8.0.0El argumento chunk ahora puede ser una instancia de Uint8Array.
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Fragmento de datos para enviar a la cola de lectura. Para flujos que no operan en modo de objeto, chunk debe ser un <string>, <Buffer>, <TypedArray> o <DataView>. Para flujos en modo de objeto, chunk puede ser cualquier valor de JavaScript.
  • encoding <string> Codificación de fragmentos de cadena. Debe ser una codificación Buffer válida, como 'utf8' o 'ascii'.
  • Devuelve: <boolean> true si se pueden seguir enviando fragmentos de datos adicionales; false de lo contrario.

Cuando chunk es un <Buffer>, <TypedArray>, <DataView> o <string>, el chunk de datos se agregará a la cola interna para que los usuarios del flujo la consuman. Pasar chunk como null señala el final del flujo (EOF), después de lo cual no se pueden escribir más datos.

Cuando el Readable está operando en modo pausado, los datos agregados con readable.push() se pueden leer llamando al método readable.read() cuando se emite el evento 'readable'.

Cuando el Readable está operando en modo de flujo, los datos agregados con readable.push() se entregarán emitiendo un evento 'data'.

El método readable.push() está diseñado para ser lo más flexible posible. Por ejemplo, al envolver una fuente de nivel inferior que proporciona alguna forma de mecanismo de pausa/reanudación, y una devolución de llamada de datos, la fuente de nivel inferior se puede envolver mediante la instancia personalizada de Readable:

js
// `_source` es un objeto con métodos readStop() y readStart(),
// y un miembro `ondata` que se llama cuando tiene datos, y
// un miembro `onend` que se llama cuando los datos terminan.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // Cada vez que hay datos, envíalos al búfer interno.
    this._source.ondata = chunk => {
      // Si push() devuelve false, entonces deja de leer de la fuente.
      if (!this.push(chunk)) this._source.readStop()
    }

    // Cuando termina la fuente, envía el fragmento `null` que señala el EOF.
    this._source.onend = () => {
      this.push(null)
    }
  }
  // _read() se llamará cuando el flujo quiera extraer más datos.
  // El argumento de tamaño de asesoramiento se ignora en este caso.
  _read(size) {
    this._source.readStart()
  }
}

El método readable.push() se utiliza para enviar el contenido al búfer interno. Puede ser impulsado por el método readable._read().

Para flujos que no operan en modo de objeto, si el parámetro chunk de readable.push() es undefined, se tratará como una cadena o búfer vacío. Consulta readable.push('') para obtener más información.

Errores durante la lectura

Los errores que ocurran durante el procesamiento de readable._read() deben propagarse a través del método readable.destroy(err). Lanzar un Error desde dentro de readable._read() o emitir manualmente un evento 'error' resulta en un comportamiento indefinido.

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // Do some work.
    }
  },
})

Un ejemplo de stream de conteo

El siguiente es un ejemplo básico de un stream Readable que emite los numerales del 1 al 1.000.000 en orden ascendente y luego finaliza.

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

Implementación de un flujo dúplex

Un flujo Duplex es aquel que implementa tanto Readable como Writable, como una conexión de socket TCP.

Dado que JavaScript no tiene soporte para la herencia múltiple, la clase stream.Duplex se extiende para implementar un flujo Duplex (en lugar de extender las clases stream.Readable y stream.Writable).

La clase stream.Duplex hereda prototípicamente de stream.Readable y parasíticamente de stream.Writable, pero instanceof funcionará correctamente para ambas clases base debido a la anulación de Symbol.hasInstance en stream.Writable.

Los flujos Duplex personalizados deben llamar al constructor new stream.Duplex([options]) e implementar tanto los métodos readable._read() como writable._write().

new stream.Duplex(options)

[Historial]

VersiónCambios
v8.4.0Ahora se admiten las opciones readableHighWaterMark y writableHighWaterMark.
  • options <Objeto> Se pasa a los constructores Writable y Readable. También tiene los siguientes campos:
    • allowHalfOpen <booleano> Si se establece en false, la secuencia finalizará automáticamente el lado escribible cuando finalice el lado legible. Predeterminado: true.
    • readable <booleano> Establece si el Duplex debe ser legible. Predeterminado: true.
    • writable <booleano> Establece si el Duplex debe ser escribible. Predeterminado: true.
    • readableObjectMode <booleano> Establece objectMode para el lado legible de la secuencia. No tiene efecto si objectMode es true. Predeterminado: false.
    • writableObjectMode <booleano> Establece objectMode para el lado escribible de la secuencia. No tiene efecto si objectMode es true. Predeterminado: false.
    • readableHighWaterMark <número> Establece highWaterMark para el lado legible de la secuencia. No tiene efecto si se proporciona highWaterMark.
    • writableHighWaterMark <número> Establece highWaterMark para el lado escribible de la secuencia. No tiene efecto si se proporciona highWaterMark.
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

O, al usar constructores de estilo pre-ES6:

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

O, usando el enfoque de constructor simplificado:

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

Al usar pipeline:

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Aceptar entrada de cadena en lugar de Buffers
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // Asegurarse de que sea un json válido.
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('failed', err)
    } else {
      console.log('completed')
    }
  }
)

Un ejemplo de flujo dúplex

Lo siguiente ilustra un ejemplo simple de un flujo Duplex que envuelve un objeto fuente hipotético de nivel inferior al que se pueden escribir datos y del que se pueden leer datos, aunque utilizando una API que no es compatible con los flujos de Node.js. Lo siguiente ilustra un ejemplo simple de un flujo Duplex que almacena en búfer los datos escritos entrantes a través de la interfaz Writable que se lee de nuevo a través de la interfaz Readable.

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // La fuente subyacente solo trata con cadenas.
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

El aspecto más importante de un flujo Duplex es que los lados Readable y Writable operan de forma independiente entre sí a pesar de coexistir dentro de una sola instancia de objeto.

Streams dúplex en modo objeto

Para los streams Duplex, objectMode se puede establecer exclusivamente para el lado Readable o Writable utilizando las opciones readableObjectMode y writableObjectMode respectivamente.

En el siguiente ejemplo, por ejemplo, se crea un nuevo stream Transform (que es un tipo de stream Duplex) que tiene un lado Writable en modo objeto que acepta números de JavaScript que se convierten en cadenas hexadecimales en el lado Readable.

js
const { Transform } = require('node:stream')

// Todos los streams Transform también son streams Duplex.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coacciona el chunk a un número si es necesario.
    chunk |= 0

    // Transforma el chunk en otra cosa.
    const data = chunk.toString(16)

    // Empuja los datos a la cola readable.
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// Imprime: 01
myTransform.write(10)
// Imprime: 0a
myTransform.write(100)
// Imprime: 64

Implementación de un flujo de transformación

Un flujo Transform es un flujo Duplex donde la salida se calcula de alguna manera a partir de la entrada. Algunos ejemplos incluyen los flujos zlib o los flujos crypto que comprimen, encriptan o desencriptan datos.

No se requiere que la salida tenga el mismo tamaño que la entrada, la misma cantidad de fragmentos o que llegue al mismo tiempo. Por ejemplo, un flujo Hash solo tendrá un único fragmento de salida que se proporciona cuando se termina la entrada. Un flujo zlib producirá una salida que es mucho más pequeña o mucho más grande que su entrada.

La clase stream.Transform se extiende para implementar un flujo Transform.

La clase stream.Transform hereda prototípicamente de stream.Duplex e implementa sus propias versiones de los métodos writable._write() y readable._read(). Las implementaciones personalizadas de Transform deben implementar el método transform._transform() y también pueden implementar el método transform._flush().

Se debe tener cuidado al usar flujos Transform, ya que los datos escritos en el flujo pueden hacer que el lado Writable del flujo se pause si la salida en el lado Readable no se consume.

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

O, cuando se usan constructores de estilo pre-ES6:

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

O, utilizando el enfoque de constructor simplificado:

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

Evento: 'end'

El evento 'end' proviene de la clase stream.Readable. El evento 'end' se emite después de que todos los datos han sido emitidos, lo cual ocurre después de que se ha llamado a la función de callback en transform._flush(). En caso de un error, no se debe emitir 'end'.

Evento: 'finish'

El evento 'finish' proviene de la clase stream.Writable. El evento 'finish' se emite después de que se llama a stream.end() y todos los fragmentos han sido procesados por stream._transform(). En caso de un error, no se debe emitir 'finish'.

transform._flush(callback)

  • callback <Function> Una función de callback (opcionalmente con un argumento de error y datos) que se llamará cuando los datos restantes se hayan vaciado.

Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por las clases hijas, y ser llamada únicamente por los métodos internos de la clase Readable.

En algunos casos, una operación de transformación puede necesitar emitir un bit adicional de datos al final del flujo. Por ejemplo, un flujo de compresión zlib almacenará una cantidad de estado interno usado para comprimir de manera óptima la salida. Cuando el flujo termina, sin embargo, esos datos adicionales necesitan ser vaciados para que los datos comprimidos estén completos.

Las implementaciones personalizadas de Transform pueden implementar el método transform._flush(). Esto será llamado cuando no haya más datos escritos para ser consumidos, pero antes de que el evento 'end' se emita señalando el final del flujo Readable.

Dentro de la implementación de transform._flush(), el método transform.push() puede ser llamado cero o más veces, según corresponda. La función callback debe ser llamada cuando la operación de vaciado esté completa.

El método transform._flush() tiene como prefijo un guion bajo porque es interno a la clase que lo define, y nunca debe ser llamado directamente por programas de usuario.

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> El Buffer a ser transformado, convertido desde la string pasada a stream.write(). Si la opción decodeStrings del stream es false o el stream está operando en modo objeto, el chunk no será convertido y será lo que sea que fue pasado a stream.write().
  • encoding <string> Si el chunk es una string, entonces este es el tipo de codificación. Si el chunk es un buffer, entonces este es el valor especial 'buffer'. Ignóralo en ese caso.
  • callback <Function> Una función de callback (opcionalmente con un argumento de error y datos) para ser llamada después de que el chunk suministrado haya sido procesado.

Esta función NO DEBE ser llamada directamente por el código de la aplicación. Debe ser implementada por clases hijas, y llamada por los métodos internos de la clase Readable solamente.

Todas las implementaciones de streams Transform deben proveer un método _transform() para aceptar entrada y producir salida. La implementación de transform._transform() maneja los bytes que están siendo escritos, calcula una salida, y luego pasa esa salida a la porción legible utilizando el método transform.push().

El método transform.push() puede ser llamado cero o más veces para generar salida desde un solo chunk de entrada, dependiendo de cuánto debe ser la salida como resultado del chunk.

Es posible que no se genere ninguna salida de ningún chunk dado de datos de entrada.

La función callback debe ser llamada solo cuando el chunk actual ha sido completamente consumido. El primer argumento pasado al callback debe ser un objeto Error si ha ocurrido un error mientras se procesaba la entrada o null de lo contrario. Si un segundo argumento es pasado al callback, será reenviado al método transform.push(), pero solo si el primer argumento es falsy. En otras palabras, lo siguiente es equivalente:

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

El método transform._transform() tiene el prefijo de un guión bajo porque es interno a la clase que lo define, y nunca debería ser llamado directamente por programas de usuario.

transform._transform() nunca es llamado en paralelo; los streams implementan un mecanismo de cola, y para recibir el siguiente chunk, callback debe ser llamado, ya sea sincrónica o asincrónicamente.

Clase: stream.PassThrough

La clase stream.PassThrough es una implementación trivial de un stream Transform que simplemente pasa los bytes de entrada a la salida. Su propósito es principalmente para ejemplos y pruebas, pero hay algunos casos de uso donde stream.PassThrough es útil como bloque de construcción para nuevos tipos de streams.

Notas adicionales

Compatibilidad de Streams con generadores asíncronos e iteradores asíncronos

Con el soporte de generadores e iteradores asíncronos en JavaScript, los generadores asíncronos son efectivamente una construcción de stream de primer nivel a nivel de lenguaje en este momento.

A continuación, se proporcionan algunos casos comunes de interoperabilidad del uso de streams de Node.js con generadores asíncronos e iteradores asíncronos.

Consumo de streams legibles con iteradores asíncronos

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

Los iteradores asíncronos registran un manejador de errores permanente en el stream para prevenir cualquier error post-destrucción no manejado.

Creación de flujos de lectura con generadores asíncronos

Se puede crear un flujo de lectura de Node.js a partir de un generador asíncrono utilizando el método de utilidad Readable.from():

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

Canalización a flujos de escritura desde iteradores asíncronos

Al escribir en un flujo de escritura desde un iterador asíncrono, asegúrese de manejar correctamente la contrapresión y los errores. stream.pipeline() abstrae el manejo de la contrapresión y los errores relacionados con la contrapresión:

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// Patrón de Callback
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'valor devuelto')
  }
}).on('close', () => {
  ac.abort()
})

// Patrón de Promesa
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'valor devuelto')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

Compatibilidad con versiones antiguas de Node.js

Antes de Node.js 0.10, la interfaz de flujo Readable era más simple, pero también menos potente y menos útil.

  • En lugar de esperar a las llamadas al método stream.read(), los eventos 'data' comenzaban a emitirse de inmediato. Las aplicaciones que necesitaban realizar algún trabajo para decidir cómo manejar los datos debían almacenar los datos leídos en búferes para que no se perdieran.
  • El método stream.pause() era consultivo, en lugar de garantizado. Esto significaba que todavía era necesario estar preparado para recibir eventos 'data' incluso cuando el flujo estaba en estado de pausa.

En Node.js 0.10, se agregó la clase Readable. Para la compatibilidad con versiones anteriores de los programas Node.js, los flujos Readable cambian al "modo de flujo" cuando se agrega un controlador de eventos 'data', o cuando se llama al método stream.resume(). El efecto es que, incluso cuando no se usan el nuevo método stream.read() y el evento 'readable', ya no es necesario preocuparse por perder fragmentos 'data'.

Si bien la mayoría de las aplicaciones seguirán funcionando normalmente, esto introduce un caso límite en las siguientes condiciones:

  • No se agrega ningún listener de eventos 'data'.
  • Nunca se llama al método stream.resume().
  • El flujo no está conectado a ningún destino de escritura.

Por ejemplo, considere el siguiente código:

js
// ¡ADVERTENCIA! ¡ROTO!
net
  .createServer(socket => {
    // Agregamos un listener 'end', pero nunca consumimos los datos.
    socket.on('end', () => {
      // Nunca llegará aquí.
      socket.end('El mensaje fue recibido pero no fue procesado.\n')
    })
  })
  .listen(1337)

Antes de Node.js 0.10, los datos del mensaje entrante simplemente se descartaban. Sin embargo, en Node.js 0.10 y posteriores, el socket permanece en pausa para siempre.

La solución en esta situación es llamar al método stream.resume() para iniciar el flujo de datos:

js
// Solución.
net
  .createServer(socket => {
    socket.on('end', () => {
      socket.end('El mensaje fue recibido pero no fue procesado.\n')
    })

    // Inicia el flujo de datos, descartándolo.
    socket.resume()
  })
  .listen(1337)

Además de que los nuevos flujos Readable cambien al modo de flujo, los flujos de estilo pre-0.10 se pueden envolver en una clase Readable utilizando el método readable.wrap().

readable.read(0)

Hay algunos casos en los que es necesario activar una actualización de los mecanismos subyacentes de la secuencia de lectura, sin consumir realmente ningún dato. En tales casos, es posible llamar a readable.read(0), que siempre devolverá null.

Si el búfer de lectura interno está por debajo de highWaterMark y la secuencia no está leyendo actualmente, entonces llamar a stream.read(0) activará una llamada de bajo nivel a stream._read().

Si bien la mayoría de las aplicaciones casi nunca necesitarán hacer esto, existen situaciones dentro de Node.js donde se hace esto, particularmente en el funcionamiento interno de la clase Readable stream.

readable.push('')

No se recomienda el uso de readable.push('').

Enviar un <string>, <Buffer>, <TypedArray> o <DataView> de cero bytes a una secuencia que no está en modo objeto tiene un efecto secundario interesante. Debido a que es una llamada a readable.push(), la llamada finalizará el proceso de lectura. Sin embargo, debido a que el argumento es una cadena vacía, no se agrega ningún dato al búfer de lectura, por lo que no hay nada para que un usuario consuma.

Discrepancia en highWaterMark después de llamar a readable.setEncoding()

El uso de readable.setEncoding() cambiará el comportamiento de cómo opera highWaterMark en modo no objeto.

Normalmente, el tamaño del búfer actual se mide contra highWaterMark en bytes. Sin embargo, después de llamar a setEncoding(), la función de comparación comenzará a medir el tamaño del búfer en caracteres.

Esto no es un problema en casos comunes con latin1 o ascii. Pero se recomienda tener en cuenta este comportamiento cuando se trabaja con cadenas que podrían contener caracteres multibyte.