Skip to content

Stream

[Estável: 2 - Estável]

Estável: 2 Estabilidade: 2 - Estável

Código-fonte: lib/stream.js

Um stream é uma interface abstrata para trabalhar com dados em streaming no Node.js. O módulo node:stream fornece uma API para implementar a interface de stream.

Existem muitos objetos de stream fornecidos pelo Node.js. Por exemplo, uma solicitação a um servidor HTTP e process.stdout são ambos instâncias de stream.

Os streams podem ser legíveis, graváveis ou ambos. Todos os streams são instâncias de EventEmitter.

Para acessar o módulo node:stream:

js
const stream = require('node:stream')

O módulo node:stream é útil para criar novos tipos de instâncias de stream. Normalmente, não é necessário usar o módulo node:stream para consumir streams.

Organização deste documento

Este documento contém duas seções principais e uma terceira seção para notas. A primeira seção explica como usar streams existentes dentro de um aplicativo. A segunda seção explica como criar novos tipos de streams.

Tipos de streams

Existem quatro tipos de streams fundamentais no Node.js:

Além disso, este módulo inclui as funções utilitárias stream.duplexPair(), stream.pipeline(), stream.finished() stream.Readable.from() e stream.addAbortSignal().

Streams Promises API

Adicionado em: v15.0.0

A API stream/promises fornece um conjunto alternativo de funções de utilitário assíncronas para streams que retornam objetos Promise em vez de usar callbacks. A API é acessível via require('node:stream/promises') ou require('node:stream').promises.

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[Histórico]

VersãoAlterações
v18.0.0, v17.2.0, v16.14.0Adiciona a opção end, que pode ser definida como false para evitar o fechamento automático do stream de destino quando a origem termina.
v15.0.0Adicionada em: v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')

Para usar um AbortSignal, passe-o dentro de um objeto de opções, como o último argumento. Quando o sinal for abortado, destroy será chamado no pipeline subjacente, com um AbortError.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

A API pipeline também suporta geradores assíncronos:

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // Trabalha com strings em vez de `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // Trabalha com strings em vez de `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')

Lembre-se de lidar com o argumento signal passado para o gerador assíncrono. Especialmente no caso em que o gerador assíncrono é a fonte do pipeline (ou seja, o primeiro argumento) ou o pipeline nunca será concluído.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Pipeline succeeded.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')

A API pipeline fornece versão com callback:

stream.finished(stream[, options])

[Histórico]

VersãoAlterações
v19.5.0, v18.14.0Adicionou suporte para ReadableStream e WritableStream.
v19.1.0, v18.13.0A opção cleanup foi adicionada.
v15.0.0Adicionada em: v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream terminou a leitura.')
}

run().catch(console.error)
rs.resume() // Esvazia o fluxo.
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream terminou a leitura.')
}

run().catch(console.error)
rs.resume() // Esvazia o fluxo.

A API finished também fornece uma versão de callback.

stream.finished() deixa ouvintes de eventos pendentes (em particular 'error', 'end', 'finish' e 'close') depois que a promessa retornada é resolvida ou rejeitada. A razão para isso é que eventos 'error' inesperados (devido a implementações de fluxo incorretas) não causam travamentos inesperados. Se este for um comportamento indesejado, options.cleanup deve ser definido como true:

js
await finished(rs, { cleanup: true })

Modo objeto

Todos os fluxos criados pelas APIs do Node.js operam exclusivamente em strings, <Buffer>, <TypedArray> e <DataView> objetos:

  • Strings e Buffers são os tipos mais comuns usados com fluxos.
  • TypedArray e DataView permitem que você manipule dados binários com tipos como Int32Array ou Uint8Array. Quando você grava um TypedArray ou DataView em um fluxo, o Node.js processa os bytes brutos.

No entanto, é possível que as implementações de fluxo funcionem com outros tipos de valores JavaScript (com exceção de null, que tem um propósito especial dentro dos fluxos). Esses fluxos são considerados como operando em "modo objeto".

As instâncias de fluxo são alteradas para o modo objeto usando a opção objectMode quando o fluxo é criado. Tentar alternar um fluxo existente para o modo objeto não é seguro.

Buffering

Tanto os fluxos Writable quanto Readable armazenarão dados em um buffer interno.

A quantidade de dados potencialmente armazenados em buffer depende da opção highWaterMark passada para o construtor do fluxo. Para fluxos normais, a opção highWaterMark especifica um número total de bytes. Para fluxos operando em modo objeto, o highWaterMark especifica um número total de objetos. Para fluxos operando em (mas não decodificando) strings, o highWaterMark especifica um número total de unidades de código UTF-16.

Os dados são armazenados em buffer em fluxos Readable quando a implementação chama stream.push(chunk). Se o consumidor do Stream não chamar stream.read(), os dados permanecerão na fila interna até serem consumidos.

Assim que o tamanho total do buffer de leitura interno atingir o limite especificado por highWaterMark, o fluxo interromperá temporariamente a leitura de dados do recurso subjacente até que os dados atualmente armazenados em buffer possam ser consumidos (ou seja, o fluxo parará de chamar o método interno readable._read() usado para preencher o buffer de leitura).

Os dados são armazenados em buffer em fluxos Writable quando o método writable.write(chunk) é chamado repetidamente. Enquanto o tamanho total do buffer de gravação interno estiver abaixo do limite definido por highWaterMark, as chamadas para writable.write() retornarão true. Assim que o tamanho do buffer interno atingir ou exceder o highWaterMark, false será retornado.

Um objetivo principal da API stream, particularmente o método stream.pipe(), é limitar o armazenamento em buffer de dados a níveis aceitáveis, de modo que fontes e destinos com velocidades diferentes não sobrecarreguem a memória disponível.

A opção highWaterMark é um limite, não um limite estrito: ela dita a quantidade de dados que um fluxo armazena em buffer antes de parar de solicitar mais dados. Ela não impõe uma limitação de memória estrita em geral. Implementações de fluxo específicas podem optar por impor limites mais rígidos, mas isso é opcional.

Como os fluxos Duplex e Transform são Readable e Writable, cada um mantém dois buffers internos separados usados para leitura e gravação, permitindo que cada lado opere independentemente do outro, mantendo um fluxo apropriado e eficiente de dados. Por exemplo, as instâncias net.Socket são fluxos Duplex cujo lado Readable permite o consumo de dados recebidos de a socket e cujo lado Writable permite a gravação de dados para a socket. Como os dados podem ser escritos para a socket a uma taxa mais rápida ou mais lenta do que os dados são recebidos, cada lado deve operar (e armazenar em buffer) independentemente do outro.

A mecânica do buffering interno é um detalhe de implementação interna e pode ser alterada a qualquer momento. No entanto, para certas implementações avançadas, os buffers internos podem ser recuperados usando writable.writableBuffer ou readable.readableBuffer. O uso dessas propriedades não documentadas é desencorajado.

API para consumidores de stream

Quase todos os aplicativos Node.js, por mais simples que sejam, usam streams de alguma forma. O seguinte é um exemplo de uso de streams em um aplicativo Node.js que implementa um servidor HTTP:

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` é um http.IncomingMessage, que é um stream legível.
  // `res` é um http.ServerResponse, que é um stream gravável.

  let body = ''
  // Obter os dados como strings utf8.
  // Se uma codificação não for definida, objetos Buffer serão recebidos.
  req.setEncoding('utf8')

  // Streams legíveis emitem eventos 'data' assim que um listener é adicionado.
  req.on('data', chunk => {
    body += chunk
  })

  // O evento 'end' indica que todo o corpo foi recebido.
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // Escrever algo interessante de volta para o usuário:
      res.write(typeof data)
      res.end()
    } catch (er) {
      // opa! json inválido!
      res.statusCode = 400
      return res.end(`error: ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON

Streams Writable (como res no exemplo) expõem métodos como write() e end() que são usados para escrever dados no stream.

Streams Readable usam a API EventEmitter para notificar o código do aplicativo quando os dados estão disponíveis para leitura no stream. Esses dados disponíveis podem ser lidos do stream de várias maneiras.

Tanto os streams Writable quanto os streams Readable usam a API EventEmitter de várias maneiras para comunicar o estado atual do stream.

Streams Duplex e Transform são ambos Writable e Readable.

Aplicativos que estão escrevendo dados ou consumindo dados de um stream não precisam implementar as interfaces de stream diretamente e geralmente não terão motivo para chamar require('node:stream').

Desenvolvedores que desejam implementar novos tipos de streams devem consultar a seção API para implementadores de stream.

Fluxos graváveis

Fluxos graváveis são uma abstração para um destino para o qual os dados são escritos.

Exemplos de fluxos Writable incluem:

Alguns desses exemplos são, na verdade, fluxos Duplex que implementam a interface Writable.

Todos os fluxos Writable implementam a interface definida pela classe stream.Writable.

Embora instâncias específicas de fluxos Writable possam diferir de várias maneiras, todos os fluxos Writable seguem o mesmo padrão de uso fundamental, como ilustrado no exemplo abaixo:

js
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')

Classe: stream.Writable

Adicionado em: v0.9.4

Evento: 'close'

[Histórico]

VersãoAlterações
v10.0.0Adiciona a opção emitClose para especificar se 'close' é emitido em destroy.
v0.9.4Adicionada em: v0.9.4

O evento 'close' é emitido quando o fluxo e quaisquer recursos subjacentes (um descritor de arquivo, por exemplo) foram fechados. O evento indica que nenhum outro evento será emitido e nenhum cálculo adicional ocorrerá.

Um fluxo Writable sempre emitirá o evento 'close' se for criado com a opção emitClose.

Evento: 'drain'

Adicionado em: v0.9.4

Se uma chamada para stream.write(chunk) retornar false, o evento 'drain' será emitido quando for apropriado retomar a gravação de dados no fluxo.

js
// Escreva os dados no fluxo gravável fornecido um milhão de vezes.
// Esteja atento à contrapressão.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // Última vez!
        writer.write(data, encoding, callback)
      } else {
        // Veja se devemos continuar ou esperar.
        // Não passe o callback, porque ainda não terminamos.
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // Teve que parar cedo!
      // Escreva mais uma vez que esvaiar.
      writer.once('drain', write)
    }
  }
}
Evento: 'error'

Adicionado em: v0.9.4

O evento 'error' é emitido se ocorrer um erro durante a escrita ou o envio de dados por pipe. O callback do listener recebe um único argumento Error quando chamado.

O fluxo é fechado quando o evento 'error' é emitido, a menos que a opção autoDestroy tenha sido definida como false ao criar o fluxo.

Após 'error', nenhum outro evento além de 'close' deve ser emitido (incluindo eventos 'error').

Evento: 'finish'

Adicionado em: v0.9.4

O evento 'finish' é emitido depois que o método stream.end() foi chamado e todos os dados foram enviados para o sistema subjacente.

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('Todas as escritas estão agora completas.')
})
writer.end('Este é o fim\n')
Evento: 'pipe'

Adicionado em: v0.9.4

  • src <stream.Readable> fluxo de origem que está sendo enviado por pipe para este gravável

O evento 'pipe' é emitido quando o método stream.pipe() é chamado em um fluxo legível, adicionando este gravável ao seu conjunto de destinos.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('Algo está sendo enviado por pipe para o gravador.')
  assert.equal(src, reader)
})
reader.pipe(writer)
Evento: 'unpipe'

Adicionado em: v0.9.4

O evento 'unpipe' é emitido quando o método stream.unpipe() é chamado em um fluxo Readable, removendo este Writable do seu conjunto de destinos.

Isso também é emitido caso este fluxo Writable emita um erro quando um fluxo Readable o utiliza por pipe.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('Algo parou de ser enviado por pipe para o gravador.')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

Adicionado em: v0.11.2

O método writable.cork() força que todos os dados escritos sejam armazenados em buffer na memória. Os dados em buffer serão liberados quando os métodos stream.uncork() ou stream.end() forem chamados.

A intenção principal de writable.cork() é acomodar uma situação em que vários pequenos chunks são escritos para o stream em rápida sucessão. Em vez de encaminhá-los imediatamente para o destino subjacente, writable.cork() armazena em buffer todos os chunks até que writable.uncork() seja chamado, o que passará todos eles para writable._writev(), se presente. Isso evita uma situação de bloqueio head-of-line onde os dados estão sendo armazenados em buffer enquanto aguardam o processamento do primeiro pequeno chunk. No entanto, o uso de writable.cork() sem implementar writable._writev() pode ter um efeito adverso na taxa de transferência.

Veja também: writable.uncork(), writable._writev().

writable.destroy([error])

[Histórico]

VersãoAlterações
v14.0.0Funciona como uma operação sem efeito em um stream que já foi destruído.
v8.0.0Adicionada em: v8.0.0
  • error <Error> Opcional, um erro para emitir com o evento 'error'.
  • Retorna: <this>

Destrói o stream. Opcionalmente, emite um evento 'error' e emite um evento 'close' (a menos que emitClose esteja definido como false). Após esta chamada, o stream gravável terminou e chamadas subsequentes a write() ou end() resultarão em um erro ERR_STREAM_DESTROYED. Esta é uma maneira destrutiva e imediata de destruir um stream. Chamadas anteriores a write() podem não ter sido drenadas e podem disparar um erro ERR_STREAM_DESTROYED. Use end() em vez de destruir se os dados devem ser liberados antes do fechamento, ou aguarde o evento 'drain' antes de destruir o stream.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

Uma vez que destroy() foi chamado, quaisquer outras chamadas serão uma operação sem efeito e nenhum outro erro, exceto de _destroy(), poderá ser emitido como 'error'.

Os implementadores não devem substituir este método, mas sim implementar writable._destroy().

writable.closed

Adicionado em: v18.0.0

É true depois que 'close' foi emitido.

writable.destroyed

Adicionado em: v8.0.0

É true depois que writable.destroy() foi chamado.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[Histórico]

VersãoAlterações
v22.0.0, v20.13.0O argumento chunk agora pode ser uma instância de TypedArray ou DataView.
v15.0.0O callback é chamado antes de 'finish' ou em caso de erro.
v14.0.0O callback é chamado se 'finish' ou 'error' for emitido.
v10.0.0Este método agora retorna uma referência para writable.
v8.0.0O argumento chunk agora pode ser uma instância de Uint8Array.
v0.9.4Adicionada em: v0.9.4

Chamar o método writable.end() sinaliza que nenhum dado adicional será escrito no Writable. Os argumentos opcionais chunk e encoding permitem que um último bloco de dados adicional seja escrito imediatamente antes de fechar o fluxo.

Chamar o método stream.write() depois de chamar stream.end() gerará um erro.

js
// Escreve 'hello, ' e depois termina com 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// Escrever mais agora não é permitido!
writable.setDefaultEncoding(encoding)

[Histórico]

VersãoAlterações
v6.1.0Este método agora retorna uma referência para writable.
v0.11.15Adicionado em: v0.11.15

O método writable.setDefaultEncoding() define a codificação encoding padrão para um fluxo Writable.

writable.uncork()

Adicionado em: v0.11.2

O método writable.uncork() limpa todos os dados em buffer desde que stream.cork() foi chamado.

Quando usar writable.cork() e writable.uncork() para gerenciar o buffer de gravações em um fluxo, adiar chamadas para writable.uncork() usando process.nextTick(). Fazer isso permite o agrupamento de todas as chamadas writable.write() que ocorrem dentro de uma determinada fase do loop de eventos do Node.js.

js
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())

Se o método writable.cork() for chamado várias vezes em um fluxo, o mesmo número de chamadas para writable.uncork() deve ser chamado para limpar os dados em buffer.

js
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
  stream.uncork()
  // Os dados não serão limpos até que uncork() seja chamado pela segunda vez.
  stream.uncork()
})

Veja também: writable.cork().

writable.writable

Adicionado em: v11.4.0

É true se for seguro chamar writable.write(), o que significa que o fluxo não foi destruído, ocorreu um erro ou foi encerrado.

writable.writableAborted

Adicionado em: v18.0.0, v16.17.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Retorna se o fluxo foi destruído ou ocorreu um erro antes de emitir 'finish'.

writable.writableEnded

Adicionado em: v12.9.0

É true depois que writable.end() foi chamado. Esta propriedade não indica se os dados foram liberados; para isso, use writable.writableFinished em vez disso.

writable.writableCorked

Adicionado em: v13.2.0, v12.16.0

Número de vezes que writable.uncork() precisa ser chamado para destampar completamente o fluxo.

writable.errored

Adicionado em: v18.0.0

Retorna um erro se o fluxo foi destruído com um erro.

writable.writableFinished

Adicionado em: v12.6.0

É definido como true imediatamente antes do evento 'finish' ser emitido.

writable.writableHighWaterMark

Adicionado em: v9.3.0

Retorna o valor de highWaterMark passado ao criar este Writable.

writable.writableLength

Adicionado em: v9.4.0

Esta propriedade contém o número de bytes (ou objetos) na fila prontos para serem escritos. O valor fornece dados de introspecção sobre o estado do highWaterMark.

writable.writableNeedDrain

Adicionado em: v15.2.0, v14.17.0

É true se o buffer do fluxo estiver cheio e o fluxo emitir 'drain'.

writable.writableObjectMode

Adicionado em: v12.3.0

Getter para a propriedade objectMode de um fluxo Writable fornecido.

writable[Symbol.asyncDispose]()

Adicionado em: v22.4.0, v20.16.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Chama writable.destroy() com um AbortError e retorna uma promessa que se cumpre quando o fluxo terminar.

writable.write(chunk[, encoding][, callback])

[Histórico]

VersãoAlterações
v22.0.0, v20.13.0O argumento chunk agora pode ser uma instância de TypedArray ou DataView.
v8.0.0O argumento chunk agora pode ser uma instância de Uint8Array.
v6.0.0Passar null como parâmetro chunk será sempre considerado inválido agora, mesmo no modo objeto.
v0.9.4Adicionada em: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> Dados opcionais para gravar. Para fluxos que não operam no modo objeto, chunk deve ser uma <string>, <Buffer>, <TypedArray> ou <DataView>. Para fluxos em modo objeto, chunk pode ser qualquer valor JavaScript diferente de null.
  • encoding <string> | <null> A codificação, se chunk for uma string. Padrão: 'utf8'
  • callback <Function> Callback para quando este bloco de dados for liberado.
  • Retorna: <boolean> false se o fluxo desejar que o código de chamada aguarde o evento 'drain' ser emitido antes de continuar a gravar dados adicionais; caso contrário, true.

O método writable.write() grava alguns dados no fluxo e chama o callback fornecido assim que os dados forem totalmente processados. Se ocorrer um erro, o callback será chamado com o erro como seu primeiro argumento. O callback é chamado assincronamente e antes de 'error' ser emitido.

O valor retornado é true se o buffer interno for menor que o highWaterMark configurado quando o fluxo foi criado após admitir chunk. Se false for retornado, novas tentativas de gravar dados no fluxo devem parar até que o evento 'drain' seja emitido.

Enquanto um fluxo não está drenando, as chamadas para write() armazenarão em buffer chunk e retornarão false. Assim que todos os chunks atualmente armazenados em buffer forem drenados (aceitos para entrega pelo sistema operacional), o evento 'drain' será emitido. Depois que write() retornar false, não grave mais chunks até que o evento 'drain' seja emitido. Embora chamar write() em um fluxo que não está drenando seja permitido, o Node.js armazenará em buffer todos os chunks gravados até que o uso máximo de memória ocorra, momento em que ele abortará incondicionalmente. Mesmo antes de abortar, o alto uso de memória causará baixo desempenho do coletor de lixo e alto RSS (que normalmente não é liberado de volta para o sistema, mesmo depois que a memória não for mais necessária). Como as sockets TCP podem nunca drenar se o peer remoto não ler os dados, gravar em uma socket que não está drenando pode levar a uma vulnerabilidade explorável remotamente.

Gravar dados enquanto o fluxo não está drenando é particularmente problemático para um Transform, porque os fluxos Transform são pausados por padrão até que sejam canalizados ou um manipulador de eventos 'data' ou 'readable' seja adicionado.

Se os dados a serem escritos puderem ser gerados ou buscados sob demanda, recomenda-se encapsular a lógica em um Readable e usar stream.pipe(). No entanto, se chamar write() for preferível, é possível respeitar a contrapressão e evitar problemas de memória usando o evento 'drain':

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Aguarde até que cb seja chamado antes de fazer qualquer outra gravação.
write('hello', () => {
  console.log('Gravação concluída, faça mais gravações agora.')
})

Um fluxo Writable no modo objeto sempre ignorará o argumento encoding.

Fluxos legíveis

Fluxos legíveis são uma abstração para uma fonte da qual os dados são consumidos.

Exemplos de fluxos Readable incluem:

Todos os fluxos Readable implementam a interface definida pela classe stream.Readable.

Dois modos de leitura

Os fluxos Readable operam efetivamente em um dos dois modos: fluxo e pausa. Esses modos são separados do modo objeto. Um fluxo Readable pode estar no modo objeto ou não, independentemente de estar no modo de fluxo ou no modo de pausa.

  • No modo de fluxo, os dados são lidos do sistema subjacente automaticamente e fornecidos a um aplicativo o mais rápido possível usando eventos por meio da interface EventEmitter.
  • No modo de pausa, o método stream.read() deve ser chamado explicitamente para ler blocos de dados do fluxo.

Todos os fluxos Readable começam no modo de pausa, mas podem ser alterados para o modo de fluxo de uma das seguintes maneiras:

O Readable pode alternar de volta para o modo de pausa usando um dos seguintes:

  • Se não houver destinos de pipe, chamando o método stream.pause().
  • Se houver destinos de pipe, removendo todos os destinos de pipe. Vários destinos de pipe podem ser removidos chamando o método stream.unpipe().

O conceito importante a ser lembrado é que um Readable não gerará dados até que um mecanismo para consumir ou ignorar esses dados seja fornecido. Se o mecanismo de consumo for desabilitado ou removido, o Readable tentará parar de gerar os dados.

Por razões de compatibilidade com versões anteriores, a remoção dos manipuladores de eventos 'data' não pausará automaticamente o fluxo. Além disso, se houver destinos de pipe, chamar stream.pause() não garantirá que o fluxo permanecerá pausado depois que esses destinos esvaiarem e solicitarem mais dados.

Se um Readable for alterado para o modo de fluxo e não houver consumidores disponíveis para lidar com os dados, esses dados serão perdidos. Isso pode ocorrer, por exemplo, quando o método readable.resume() é chamado sem um ouvinte anexado ao evento 'data', ou quando um manipulador de eventos 'data' é removido do fluxo.

Adicionar um manipulador de eventos 'readable' faz com que o fluxo pare automaticamente, e os dados devem ser consumidos via readable.read(). Se o manipulador de eventos 'readable' for removido, o fluxo começará novamente se houver um manipulador de eventos 'data'.

Três estados

Os "dois modos" de operação para um fluxo Readable são uma abstração simplificada para a gestão de estado interno mais complicada que ocorre na implementação do fluxo Readable.

Especificamente, em qualquer ponto no tempo, cada Readable está em um dos três estados possíveis:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

Quando readable.readableFlowing é null, nenhum mecanismo para consumir os dados do fluxo é fornecido. Portanto, o fluxo não gerará dados. Enquanto estiver neste estado, anexar um listener para o evento 'data', chamar o método readable.pipe() ou chamar o método readable.resume() mudará readable.readableFlowing para true, fazendo com que o Readable comece a emitir eventos ativamente à medida que os dados são gerados.

Chamar readable.pause(), readable.unpipe() ou receber backpressure fará com que readable.readableFlowing seja definido como false, interrompendo temporariamente o fluxo de eventos, mas não interrompendo a geração de dados. Enquanto estiver neste estado, anexar um listener para o evento 'data' não mudará readable.readableFlowing para true.

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing is now false.

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing is still false.
pass.write('ok') // Will not emit 'data'.
pass.resume() // Must be called to make stream emit 'data'.
// readableFlowing is now true.

Enquanto readable.readableFlowing for false, dados podem estar se acumulando no buffer interno do fluxo.

Escolha um estilo de API

A API do fluxo Readable evoluiu ao longo de várias versões do Node.js e fornece vários métodos de consumo de dados do fluxo. Em geral, os desenvolvedores devem escolher um dos métodos de consumo de dados e nunca devem usar vários métodos para consumir dados de um único fluxo. Especificamente, usar uma combinação de on('data'), on('readable'), pipe() ou iteradores assíncronos pode levar a um comportamento não intuitivo.

Classe: stream.Readable

Adicionado em: v0.9.4

Evento: 'close'

[Histórico]

VersãoAlterações
v10.0.0Adiciona a opção emitClose para especificar se 'close' é emitido em destroy.
v0.9.4Adicionada em: v0.9.4

O evento 'close' é emitido quando o fluxo e quaisquer recursos subjacentes (um descritor de arquivo, por exemplo) foram fechados. O evento indica que nenhum outro evento será emitido e nenhuma computação adicional ocorrerá.

Um fluxo Readable sempre emitirá o evento 'close' se for criado com a opção emitClose.

Evento: 'data'

Adicionado em: v0.9.4

  • chunk <Buffer> | <string> | <any> O bloco de dados. Para fluxos que não estão operando no modo objeto, o bloco será uma string ou Buffer. Para fluxos que estão no modo objeto, o bloco pode ser qualquer valor JavaScript diferente de null.

O evento 'data' é emitido sempre que o fluxo está renunciando à propriedade de um bloco de dados para um consumidor. Isso pode ocorrer sempre que o fluxo é comutado no modo de fluxo chamando readable.pipe(), readable.resume(), ou anexando um callback de ouvinte ao evento 'data'. O evento 'data' também será emitido sempre que o método readable.read() for chamado e um bloco de dados estiver disponível para ser retornado.

Anexar um ouvinte de evento 'data' a um fluxo que não foi explicitamente pausado mudará o fluxo para o modo de fluxo. Os dados serão então passados assim que estiverem disponíveis.

O callback do ouvinte receberá o bloco de dados como uma string se uma codificação padrão tiver sido especificada para o fluxo usando o método readable.setEncoding(); caso contrário, os dados serão passados como um Buffer.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Recebido ${chunk.length} bytes de dados.`)
})
Evento: 'end'

Adicionado em: v0.9.4

O evento 'end' é emitido quando não há mais dados para serem consumidos do fluxo.

O evento 'end' não será emitido a menos que os dados sejam completamente consumidos. Isso pode ser alcançado mudando o fluxo para o modo de fluxo, ou chamando stream.read() repetidamente até que todos os dados tenham sido consumidos.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Recebido ${chunk.length} bytes de dados.`)
})
readable.on('end', () => {
  console.log('Não haverá mais dados.')
})
Evento: 'error'

Adicionado em: v0.9.4

O evento 'error' pode ser emitido por uma implementação Readable a qualquer momento. Normalmente, isso pode ocorrer se o fluxo subjacente não conseguir gerar dados devido a uma falha interna subjacente, ou quando uma implementação de fluxo tenta enviar um bloco de dados inválido.

O callback do listener receberá um único objeto Error.

Evento: 'pause'

Adicionado em: v0.9.4

O evento 'pause' é emitido quando stream.pause() é chamado e readableFlowing não é false.

Evento: 'readable'

[Histórico]

VersãoAlterações
v10.0.0O 'readable' é sempre emitido no próximo tick após a chamada de .push().
v10.0.0Usar 'readable' requer chamar .read().
v0.9.4Adicionada em: v0.9.4

O evento 'readable' é emitido quando há dados disponíveis para serem lidos do fluxo, até a marca d'água alta configurada (state.highWaterMark). Efetivamente, indica que o fluxo tem novas informações dentro do buffer. Se os dados estiverem disponíveis dentro deste buffer, stream.read() pode ser chamado para recuperar esses dados. Além disso, o evento 'readable' também pode ser emitido quando o fim do fluxo for atingido.

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // Há alguns dados para ler agora.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

Se o fim do fluxo for atingido, chamar stream.read() retornará null e acionará o evento 'end'. Isso também é verdadeiro se nunca houve dados a serem lidos. Por exemplo, no exemplo a seguir, foo.txt é um arquivo vazio:

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

A saída da execução deste script é:

bash
$ node test.js
readable: null
end

Em alguns casos, anexar um listener para o evento 'readable' fará com que uma certa quantidade de dados seja lida para um buffer interno.

Em geral, os mecanismos de evento readable.pipe() e 'data' são mais fáceis de entender do que o evento 'readable'. No entanto, o tratamento de 'readable' pode resultar em maior taxa de transferência.

Se ambos 'readable' e 'data' forem usados ao mesmo tempo, 'readable' terá precedência no controle do fluxo, ou seja, 'data' será emitido apenas quando stream.read() for chamado. A propriedade readableFlowing se tornará false. Se houver listeners 'data' quando 'readable' for removido, o fluxo começará a fluir, ou seja, os eventos 'data' serão emitidos sem chamar .resume().

Evento: 'resume'

Adicionado em: v0.9.4

O evento 'resume' é emitido quando stream.resume() é chamado e readableFlowing não é true.

readable.destroy([error])

[Histórico]

VersãoAlterações
v14.0.0Funciona como uma operação sem efeito colateral em um stream que já foi destruído.
v8.0.0Adicionada em: v8.0.0
  • error <Error> Erro que será passado como payload no evento 'error'
  • Retorna: <this>

Destrói o stream. Opcionalmente emite um evento 'error' e emite um evento 'close' (a menos que emitClose esteja definido como false). Após esta chamada, o stream legível liberará quaisquer recursos internos e chamadas subsequentes a push() serão ignoradas.

Assim que destroy() for chamado, quaisquer outras chamadas serão uma operação sem efeito colateral e nenhum outro erro, exceto de _destroy(), poderá ser emitido como 'error'.

Os implementadores não devem substituir este método, mas sim implementar readable._destroy().

readable.closed

Adicionado em: v18.0.0

É true depois que 'close' foi emitido.

readable.destroyed

Adicionado em: v8.0.0

É true depois que readable.destroy() foi chamado.

readable.isPaused()

Adicionado em: v0.11.14

O método readable.isPaused() retorna o estado operacional atual do Readable. Isso é usado principalmente pelo mecanismo que está por trás do método readable.pipe(). Na maioria dos casos típicos, não haverá razão para usar este método diretamente.

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

Adicionado em: v0.9.4

O método readable.pause() fará com que um fluxo no modo de fluxo pare de emitir eventos 'data', saindo do modo de fluxo. Quaisquer dados que se tornarem disponíveis permanecerão no buffer interno.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Recebido ${chunk.length} bytes de dados.`)
  readable.pause()
  console.log('Não haverá dados adicionais por 1 segundo.')
  setTimeout(() => {
    console.log('Agora os dados começarão a fluir novamente.')
    readable.resume()
  }, 1000)
})

O método readable.pause() não tem efeito se houver um ouvinte de eventos 'readable'.

readable.pipe(destination[, options])

Adicionado em: v0.9.4

O método readable.pipe() anexa um fluxo Writable ao readable, fazendo com que ele alterne automaticamente para o modo de fluxo e empurre todos os seus dados para o Writable anexado. O fluxo de dados será gerenciado automaticamente para que o fluxo de destino Writable não seja sobrecarregado por um fluxo Readable mais rápido.

O exemplo a seguir envia todos os dados do readable para um arquivo chamado file.txt:

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Todos os dados de readable vão para 'file.txt'.
readable.pipe(writable)

É possível anexar vários fluxos Writable a um único fluxo Readable.

O método readable.pipe() retorna uma referência ao fluxo destino, tornando possível configurar cadeias de fluxos conectados:

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

Por padrão, stream.end() é chamado no fluxo de destino Writable quando o fluxo de origem Readable emite 'end', de modo que o destino não seja mais gravável. Para desabilitar esse comportamento padrão, a opção end pode ser passada como false, fazendo com que o fluxo de destino permaneça aberto:

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Goodbye\n')
})

Uma ressalva importante é que se o fluxo Readable emitir um erro durante o processamento, o destino Writable não é fechado automaticamente. Se ocorrer um erro, será necessário fechar manualmente cada fluxo para evitar vazamentos de memória.

Os fluxos Writable process.stderr e process.stdout nunca são fechados até que o processo Node.js termine, independentemente das opções especificadas.

readable.read([size])

Adicionado em: v0.9.4

O método readable.read() lê dados do buffer interno e os retorna. Se não houver dados disponíveis para leitura, null é retornado. Por padrão, os dados são retornados como um objeto Buffer, a menos que uma codificação tenha sido especificada usando o método readable.setEncoding() ou o stream esteja operando em modo objeto.

O argumento opcional size especifica um número específico de bytes a serem lidos. Se size bytes não estiverem disponíveis para leitura, null será retornado, a menos que o stream tenha terminado, caso em que todos os dados restantes no buffer interno serão retornados.

Se o argumento size não for especificado, todos os dados contidos no buffer interno serão retornados.

O argumento size deve ser menor ou igual a 1 GiB.

O método readable.read() só deve ser chamado em streams Readable operando em modo pausado. No modo de fluxo, readable.read() é chamado automaticamente até que o buffer interno esteja totalmente esvaziado.

js
const readable = getReadableStreamSomehow()

// 'readable' pode ser acionado várias vezes à medida que os dados são armazenados em buffer
readable.on('readable', () => {
  let chunk
  console.log('Stream é legível (novos dados recebidos no buffer)')
  // Use um loop para garantir que lemos todos os dados atualmente disponíveis
  while (null !== (chunk = readable.read())) {
    console.log(`Lidos ${chunk.length} bytes de dados...`)
  }
})

// 'end' será acionado uma vez quando não houver mais dados disponíveis
readable.on('end', () => {
  console.log('Chegou ao fim do stream.')
})

Cada chamada para readable.read() retorna um pedaço de dados ou null, significando que não há mais dados para ler naquele momento. Esses pedaços não são concatenados automaticamente. Como uma única chamada read() não retorna todos os dados, pode ser necessário usar um loop while para ler continuamente os pedaços até que todos os dados sejam recuperados. Ao ler um arquivo grande, .read() pode retornar null temporariamente, indicando que consumiu todo o conteúdo em buffer, mas pode haver mais dados ainda a serem armazenados em buffer. Nesses casos, um novo evento 'readable' é emitido assim que houver mais dados no buffer, e o evento 'end' significa o fim da transmissão de dados.

Portanto, para ler todo o conteúdo de um arquivo de um readable, é necessário coletar pedaços em vários eventos 'readable':

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

Um stream Readable em modo objeto sempre retornará um único item de uma chamada para readable.read(size), independentemente do valor do argumento size.

Se o método readable.read() retornar um pedaço de dados, um evento 'data' também será emitido.

Chamar stream.read([size]) depois que o evento 'end' foi emitido retornará null. Nenhum erro de tempo de execução será levantado.

readable.readable

Adicionado em: v11.4.0

É true se for seguro chamar readable.read(), o que significa que o stream não foi destruído ou emitiu 'error' ou 'end'.

readable.readableAborted

Adicionado em: v16.8.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Retorna se o stream foi destruído ou apresentou erro antes de emitir 'end'.

readable.readableDidRead

Adicionado em: v16.7.0, v14.18.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Retorna se 'data' foi emitido.

readable.readableEncoding

Adicionado em: v12.7.0

Getter para a propriedade encoding de um dado stream Readable. A propriedade encoding pode ser definida usando o método readable.setEncoding().

readable.readableEnded

Adicionado em: v12.9.0

Torna-se true quando o evento 'end' é emitido.

readable.errored

Adicionado em: v18.0.0

Retorna um erro se o stream foi destruído com um erro.

readable.readableFlowing

Adicionado em: v9.4.0

Esta propriedade reflete o estado atual de um stream Readable como descrito na seção Três estados.

readable.readableHighWaterMark

Adicionado em: v9.3.0

Retorna o valor de highWaterMark passado ao criar este Readable.

readable.readableLength

Adicionado em: v9.4.0

Esta propriedade contém o número de bytes (ou objetos) na fila prontos para serem lidos. O valor fornece dados de introspecção sobre o estado do highWaterMark.

readable.readableObjectMode

Adicionado em: v12.3.0

Getter para a propriedade objectMode de um dado fluxo Readable.

readable.resume()

[Histórico]

VersãoAlterações
v10.0.0O resume() não tem efeito se houver um ouvinte de evento 'readable'.
v0.9.4Adicionada em: v0.9.4

O método readable.resume() faz com que um fluxo Readable explicitamente pausado retome a emissão de eventos 'data', alternando o fluxo para o modo de fluxo.

O método readable.resume() pode ser usado para consumir completamente os dados de um fluxo sem realmente processar nenhum desses dados:

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Alcançou o fim, mas não leu nada.')
  })

O método readable.resume() não tem efeito se houver um ouvinte de evento 'readable'.

readable.setEncoding(encoding)

Adicionado em: v0.9.4

O método readable.setEncoding() define a codificação de caracteres para os dados lidos do fluxo Readable.

Por padrão, nenhuma codificação é atribuída e os dados do fluxo serão retornados como objetos Buffer. Definir uma codificação faz com que os dados do fluxo sejam retornados como strings da codificação especificada em vez de como objetos Buffer. Por exemplo, chamar readable.setEncoding('utf8') fará com que os dados de saída sejam interpretados como dados UTF-8 e passados como strings. Chamar readable.setEncoding('hex') fará com que os dados sejam codificados no formato de string hexadecimal.

O fluxo Readable tratará adequadamente os caracteres de vários bytes entregues através do fluxo que, de outra forma, seriam decodificados incorretamente se simplesmente puxados do fluxo como objetos Buffer.

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('Obteve %d caracteres de dados de string:', chunk.length)
})
readable.unpipe([destination])

Adicionado em: v0.9.4

O método readable.unpipe() destaca um stream Writable previamente anexado usando o método stream.pipe().

Se o destination não for especificado, então todos os pipes serão destacados.

Se o destination for especificado, mas nenhum pipe for configurado para ele, então o método não faz nada.

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Todos os dados de readable vão para 'file.txt',
// mas apenas pelo primeiro segundo.
readable.pipe(writable)
setTimeout(() => {
  console.log('Parar de escrever em file.txt.')
  readable.unpipe(writable)
  console.log('Fechar manualmente o stream de arquivo.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[Histórico]

VersãoAlterações
v22.0.0, v20.13.0O argumento chunk agora pode ser uma instância de TypedArray ou DataView.
v8.0.0O argumento chunk agora pode ser uma instância de Uint8Array.
v0.9.11Adicionada em: v0.9.11

Passando chunk como null sinaliza o fim do stream (EOF) e se comporta da mesma forma que readable.push(null), após o qual nenhum dado poderá mais ser escrito. O sinal EOF é colocado no final do buffer e quaisquer dados em buffer ainda serão liberados.

O método readable.unshift() insere um fragmento de dados de volta no buffer interno. Isso é útil em certas situações em que um stream está sendo consumido por código que precisa "des-consumir" uma certa quantidade de dados que ele retirou otimisticamente da fonte, para que os dados possam ser repassados a outra parte.

O método stream.unshift(chunk) não pode ser chamado após o evento 'end' ter sido emitido ou um erro de tempo de execução será lançado.

Desenvolvedores que usam stream.unshift() geralmente devem considerar a mudança para o uso de um stream Transform em vez disso. Consulte a seção API para implementadores de stream para obter mais informações.

js
// Extrair um cabeçalho delimitado por \n\n.
// Usar unshift() se recebermos muito.
// Chamar o callback com (erro, cabeçalho, stream).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // Encontrou o limite do cabeçalho.
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // Remover o listener 'readable' antes de unshift.
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // Agora o corpo da mensagem pode ser lido do stream.
        callback(null, header, stream)
        return
      }
      // Ainda lendo o cabeçalho.
      header += str
    }
  }
}

Ao contrário de stream.push(chunk), stream.unshift(chunk) não terminará o processo de leitura redefinindo o estado interno de leitura do stream. Isso pode causar resultados inesperados se readable.unshift() for chamado durante uma leitura (ou seja, de dentro de uma implementação stream._read() em um stream personalizado). Seguir a chamada para readable.unshift() com um stream.push('') imediato redefinirá o estado de leitura adequadamente, no entanto, é melhor simplesmente evitar chamar readable.unshift() enquanto estiver no processo de realizar uma leitura.

readable.wrap(stream)

Adicionado em: v0.9.4

Antes do Node.js 0.10, os streams não implementavam toda a API do módulo node:stream como atualmente definida. (Consulte Compatibilidade para obter mais informações.)

Ao usar uma biblioteca Node.js mais antiga que emite eventos 'data' e possui um método stream.pause() que é apenas consultivo, o método readable.wrap() pode ser usado para criar um stream Readable que usa o stream antigo como sua fonte de dados.

Raramente será necessário usar readable.wrap(), mas o método foi fornecido como uma conveniência para interagir com aplicativos e bibliotecas Node.js mais antigos.

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // etc.
})
readable[Symbol.asyncIterator]()

[Histórico]

VersãoAlterações
v11.14.0O suporte a Symbol.asyncIterator não é mais experimental.
v10.0.0Adicionada em: v10.0.0
js
const fs = require('node:fs')

async function print(readable) {
  readable.setEncoding('utf8')
  let data = ''
  for await (const chunk of readable) {
    data += chunk
  }
  console.log(data)
}

print(fs.createReadStream('file')).catch(console.error)

Se o loop terminar com um break, return ou um throw, o stream será destruído. Em outras palavras, iterar sobre um stream consumirá o stream completamente. O stream será lido em chunks de tamanho igual à opção highWaterMark. No exemplo de código acima, os dados estarão em um único chunk se o arquivo tiver menos de 64 KiB de dados, pois nenhuma opção highWaterMark é fornecida para fs.createReadStream().

readable[Symbol.asyncDispose]()

Adicionado em: v20.4.0, v18.18.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Chama readable.destroy() com um AbortError e retorna uma promessa que se cumpre quando o stream termina.

readable.compose(stream[, options])

Adicionado em: v19.1.0, v18.13.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

js
import { Readable } from 'node:stream'

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ')

    for (const word of words) {
      yield word
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()

console.log(words) // imprime ['this', 'is', 'compose', 'as', 'operator']

Veja stream.compose para mais informações.

readable.iterator([options])

Adicionado em: v16.3.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • options <Object>

    • destroyOnReturn <boolean> Quando definido como false, chamar return no iterador assíncrono, ou sair de uma iteração for await...of usando break, return ou throw não destruirá o stream. Padrão: true.
  • Retorna: <AsyncIterator> para consumir o stream.

O iterador criado por este método dá aos usuários a opção de cancelar a destruição do stream se o loop for await...of for interrompido por return, break ou throw, ou se o iterador deve destruir o stream se o stream emitiu um erro durante a iteração.

js
const { Readable } = require('node:stream')

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // Imprimirá 2 e depois 3
  }

  console.log(readable.destroyed) // True, stream foi totalmente consumido
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]))
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}

showBoth()
readable.map(fn[, options])

[Histórico]

VersãoAlterações
v20.7.0, v18.19.0adicionado highWaterMark nas opções.
v17.4.0, v16.14.0Adicionado em: v17.4.0, v16.14.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • fn <Função> | <AsyncFunction> uma função para mapear cada chunk no stream.

    • data <qualquer> um chunk de dados do stream.
    • options <Objeto>
    • signal <AbortSignal> abortado se o stream for destruído, permitindo abortar a chamada fn antecipadamente.
  • options <Objeto>

    • concurrency <número> a invocação concorrente máxima de fn a ser chamada no stream de uma vez. Padrão: 1.
    • highWaterMark <número> quantos itens armazenar em buffer enquanto aguarda o consumo pelo usuário dos itens mapeados. Padrão: concurrency * 2 - 1.
    • signal <AbortSignal> permite destruir o stream se o sinal for abortado.
  • Retorna: <Readable> um stream mapeado com a função fn.

Este método permite mapear o stream. A função fn será chamada para cada chunk no stream. Se a função fn retornar uma promise, essa promise será awaited antes de ser passada para o stream de resultado.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Com um mapeador síncrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
  console.log(chunk) // 2, 4, 6, 8
}
// Com um mapeador assíncrono, fazendo no máximo 2 consultas por vez.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  domain => resolver.resolve4(domain),
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  console.log(result) // Registra o resultado DNS de resolver.resolve4.
}
readable.filter(fn[, options])

[Histórico]

VersãoAlterações
v20.7.0, v18.19.0adicionou highWaterMark nas opções.
v17.4.0, v16.14.0Adicionada em: v17.4.0, v16.14.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • fn <Function> | <AsyncFunction> uma função para filtrar chunks do stream.

    • data <any> um chunk de dados do stream.
    • options <Object>
    • signal <AbortSignal> aborta se o stream for destruído, permitindo abortar a chamada fn antecipadamente.
  • options <Object>

    • concurrency <number> a invocação concorrente máxima de fn a ser chamada no stream de uma só vez. Padrão: 1.
    • highWaterMark <number> quantos itens bufferizar enquanto aguarda o consumo pelo usuário dos itens filtrados. Padrão: concurrency * 2 - 1.
    • signal <AbortSignal> permite destruir o stream se o sinal for abortado.
  • Retorna: <Readable> um stream filtrado com o predicado fn.

Este método permite filtrar o stream. Para cada chunk no stream, a função fn será chamada e, se retornar um valor truthy, o chunk será passado para o stream de resultado. Se a função fn retornar uma promise, essa promise será awaited.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Com um predicado síncrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// Com um predicado assíncrono, fazendo no máximo 2 consultas por vez.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address.ttl > 60
  },
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  // Registra domínios com mais de 60 segundos no registo DNS resolvido.
  console.log(result)
}
readable.forEach(fn[, options])

Adicionado em: v17.5.0, v16.15.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • fn <Function> | <AsyncFunction> uma função a ser chamada em cada pedaço do stream.

    • data <any> um pedaço de dados do stream.
    • options <Object>
    • signal <AbortSignal> abortado se o stream for destruído, permitindo abortar a chamada fn antecipadamente.
  • options <Object>

    • concurrency <number> a invocação concorrente máxima de fn a ser chamada no stream de uma vez. Padrão: 1.
    • signal <AbortSignal> permite destruir o stream se o sinal for abortado.
  • Retorna: <Promise> uma promise para quando o stream tiver terminado.

Este método permite iterar um stream. Para cada pedaço no stream, a função fn será chamada. Se a função fn retornar uma promise, essa promise será awaited.

Este método é diferente dos loops for await...of no sentido de que ele pode opcionalmente processar pedaços concorrentemente. Além disso, uma iteração forEach só pode ser interrompida passando uma opção signal e abortando o AbortController relacionado, enquanto for await...of pode ser interrompido com break ou return. Em qualquer caso, o stream será destruído.

Este método é diferente de ouvir o evento 'data' porque usa o evento readable na maquinaria subjacente e pode limitar o número de chamadas concorrentes de fn.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Com um predicado síncrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// Com um predicado assíncrono, fazendo no máximo 2 consultas por vez.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address
  },
  { concurrency: 2 }
)
await dnsResults.forEach(result => {
  // Registra o resultado, semelhante a `for await (const result of dnsResults)`
  console.log(result)
})
console.log('done') // O stream terminou
readable.toArray([options])

Adicionado em: v17.5.0, v16.15.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • options <Object>

    • signal <AbortSignal> permite cancelar a operação toArray se o sinal for abortado.
  • Retorna: <Promise> uma promessa contendo um array com o conteúdo do stream.

Este método permite obter facilmente o conteúdo de um stream.

Como este método lê todo o stream na memória, ele nega os benefícios dos streams. Ele é destinado à interoperabilidade e conveniência, não como a maneira principal de consumir streams.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]

// Faz consultas dns concorrentemente usando .map e coleta
// os resultados em um array usando toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
  .map(
    async domain => {
      const { address } = await resolver.resolve4(domain, { ttl: true })
      return address
    },
    { concurrency: 2 }
  )
  .toArray()
readable.some(fn[, options])

Adicionado em: v17.5.0, v16.15.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • fn <Function> | <AsyncFunction> uma função para chamar em cada chunk do stream.

    • data <any> um chunk de dados do stream.
    • options <Object>
    • signal <AbortSignal> abortado se o stream for destruído, permitindo abortar a chamada fn antecipadamente.
  • options <Object>

    • concurrency <number> a invocação concorrente máxima de fn para chamar no stream de uma vez. Padrão: 1.
    • signal <AbortSignal> permite destruir o stream se o sinal for abortado.
  • Retorna: <Promise> uma promessa que resulta em true se fn retornou um valor truthy para pelo menos um dos chunks.

Este método é semelhante ao Array.prototype.some e chama fn em cada chunk do stream até que o valor retornado aguardado seja true (ou qualquer valor truthy). Assim que uma chamada fn em um chunk tiver um valor de retorno aguardado truthy, o stream é destruído e a promessa é cumprida com true. Se nenhuma das chamadas fn nos chunks retornar um valor truthy, a promessa é cumprida com false.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Com um predicado síncrono.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false

// Com um predicado assíncrono, fazendo no máximo 2 verificações de arquivo por vez.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(anyBigFile) // `true` se qualquer arquivo na lista for maior que 1MB
console.log('done') // O Stream terminou
readable.find(fn[, options])

Adicionado em: v17.5.0, v16.17.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • fn <Function> | <AsyncFunction> uma função a ser chamada em cada parte do fluxo.

    • data <any> uma parte de dados do fluxo.
    • options <Object>
    • signal <AbortSignal> aborta se o fluxo for destruído, permitindo abortar a chamada de fn antecipadamente.
  • options <Object>

    • concurrency <number> a invocação simultânea máxima de fn a ser chamada no fluxo de uma vez. Padrão: 1.
    • signal <AbortSignal> permite destruir o fluxo se o sinal for abortado.
  • Retorna: <Promise> uma promise que avalia para a primeira parte para a qual fn avaliou com um valor verdadeiro, ou undefined se nenhum elemento foi encontrado.

Este método é semelhante a Array.prototype.find e chama fn em cada parte do fluxo para encontrar uma parte com um valor verdadeiro para fn. Assim que o valor retornado aguardado de uma chamada fn for verdadeiro, o fluxo é destruído e a promise é cumprida com o valor para o qual fn retornou um valor verdadeiro. Se todas as chamadas fn nas partes retornarem um valor falso, a promise é cumprida com undefined.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Com um predicado síncrono.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined

// Com um predicado assíncrono, fazendo no máximo 2 verificações de arquivo por vez.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(foundBigFile) // Nome do arquivo grande, se algum arquivo na lista for maior que 1MB
console.log('done') // O fluxo terminou
readable.every(fn[, options])

Adicionado em: v17.5.0, v16.15.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • fn <Function> | <AsyncFunction> uma função para chamar em cada pedaço do fluxo.

    • data <any> um pedaço de dados do fluxo.
    • options <Object>
    • signal <AbortSignal> abortado se o fluxo for destruído, permitindo abortar a chamada fn antecipadamente.
  • options <Object>

    • concurrency <number> a invocação concorrente máxima de fn para chamar no fluxo de uma vez. Padrão: 1.
    • signal <AbortSignal> permite destruir o fluxo se o sinal for abortado.
  • Retorna: <Promise> uma promise que resulta em true se fn retornar um valor truthy para todos os pedaços.

Este método é semelhante a Array.prototype.every e chama fn em cada pedaço do fluxo para verificar se todos os valores de retorno aguardados são valores truthy para fn. Assim que uma chamada fn em um pedaço de valor de retorno aguardado for falsy, o fluxo é destruído e a promise é cumprida com false. Se todas as chamadas fn nos pedaços retornarem um valor truthy, a promise é cumprida com true.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Com um predicado síncrono.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true

// Com um predicado assíncrono, fazendo no máximo 2 verificações de arquivo por vez.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
// `true` se todos os arquivos na lista forem maiores que 1MiB
console.log(allBigFiles)
console.log('done') // O fluxo terminou
readable.flatMap(fn[, options])

Adicionado em: v17.5.0, v16.15.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Este método retorna um novo stream aplicando o callback dado a cada chunk do stream e então achatando o resultado.

É possível retornar um stream ou outro iterável ou iterável assíncrono de fn e os streams resultantes serão mesclados (achatados) no stream retornado.

js
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'

// Com um mapeador síncrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
  console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// Com um mapeador assíncrono, combine o conteúdo de 4 arquivos
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
  createReadStream(fileName)
)
for await (const result of concatResult) {
  // Isto conterá o conteúdo (todos os chunks) de todos os 4 arquivos
  console.log(result)
}
readable.drop(limit[, options])

Adicionado em: v17.5.0, v16.15.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • limit <number> o número de chunks para descartar do readable.

  • options <Object>

    • signal <AbortSignal> permite destruir o stream se o sinal for abortado.
  • Retorna: <Readable> um stream com limit chunks descartados.

Este método retorna um novo stream com os primeiros limit chunks descartados.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])

Adicionado em: v17.5.0, v16.15.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • limit <number> o número de chunks para pegar do readable.

  • options <Object>

    • signal <AbortSignal> permite destruir o stream se o sinal for abortado.
  • Retorna: <Readable> um stream com limit chunks pegos.

Este método retorna um novo stream com os primeiros limit chunks.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])

Adicionado em: v17.5.0, v16.15.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • fn <Function> | <AsyncFunction> uma função redutora para chamar em cada chunk no stream.

    • previous <any> o valor obtido da última chamada para fn ou o valor initial se especificado ou o primeiro chunk do stream caso contrário.
    • data <any> um chunk de dados do stream.
    • options <Object>
    • signal <AbortSignal> abortado se o stream for destruído, permitindo abortar a chamada fn antecipadamente.
  • initial <any> o valor inicial a ser usado na redução.

  • options <Object>

    • signal <AbortSignal> permite destruir o stream se o sinal for abortado.
  • Retorna: <Promise> uma promise para o valor final da redução.

Este método chama fn em cada chunk do stream em ordem, passando-lhe o resultado do cálculo no elemento anterior. Ele retorna uma promise para o valor final da redução.

Se nenhum valor initial for fornecido, o primeiro chunk do stream será usado como valor inicial. Se o stream estiver vazio, a promise será rejeitada com um TypeError com a propriedade de código ERR_INVALID_ARGS.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
  const { size } = await stat(join(directoryPath, file))
  return totalSize + size
}, 0)

console.log(folderSize)

A função redutora itera o elemento do stream elemento por elemento, o que significa que não há parâmetro concurrency ou paralelismo. Para executar uma redução concorrentemente, você pode extrair a função assíncrona para o método readable.map.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir)
  .map(file => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0)

console.log(folderSize)

Fluxos Duplex e de transformação

Classe: stream.Duplex

[Histórico]

VersãoAlterações
v6.8.0Instâncias de Duplex agora retornam true ao verificar instanceof stream.Writable.
v0.9.4Adicionado em: v0.9.4

Fluxos Duplex são fluxos que implementam ambas as interfaces Readable e Writable.

Exemplos de fluxos Duplex incluem:

duplex.allowHalfOpen

Adicionado em: v0.9.4

Se false, o fluxo terminará automaticamente o lado gravável quando o lado legível terminar. Definido inicialmente pela opção do construtor allowHalfOpen, que é true por padrão.

Isso pode ser alterado manualmente para alterar o comportamento de meio aberto de uma instância de fluxo Duplex existente, mas deve ser alterado antes do evento 'end' ser emitido.

Classe: stream.Transform

Adicionado em: v0.9.4

Fluxos de transformação são fluxos Duplex onde a saída está de alguma forma relacionada à entrada. Como todos os fluxos Duplex, os fluxos Transform implementam ambas as interfaces Readable e Writable.

Exemplos de fluxos Transform incluem:

transform.destroy([error])

[Histórico]

VersãoAlterações
v14.0.0Funciona como uma operação sem efeito em um fluxo que já foi destruído.
v8.0.0Adicionado em: v8.0.0

Destrói o fluxo e, opcionalmente, emite um evento 'error'. Após esta chamada, o fluxo de transformação liberará quaisquer recursos internos. Os implementadores não devem substituir este método, mas sim implementar readable._destroy(). A implementação padrão de _destroy() para Transform também emite 'close' a menos que emitClose seja definido como falso.

Uma vez que destroy() foi chamado, quaisquer outras chamadas serão uma operação sem efeito e nenhum outro erro, exceto de _destroy(), pode ser emitido como 'error'.

stream.duplexPair([options])

Adicionado em: v22.6.0, v20.17.0

  • options <Object> Um valor para passar para ambos os construtores Duplex, para definir opções como buffer.
  • Retorna: <Array> de duas instâncias Duplex.

A função utilitária duplexPair retorna um Array com dois itens, cada um sendo um fluxo Duplex conectado ao outro lado:

js
const [sideA, sideB] = duplexPair()

Tudo o que é escrito em um fluxo fica legível no outro. Ele fornece um comportamento análogo a uma conexão de rede, onde os dados escritos pelo cliente se tornam legíveis pelo servidor, e vice-versa.

Os fluxos Duplex são simétricos; um ou outro pode ser usado sem nenhuma diferença de comportamento.

stream.finished(stream[, options], callback)

[Histórico]

VersãoAlterações
v19.5.0Adicionou suporte para ReadableStream e WritableStream.
v15.11.0A opção signal foi adicionada.
v14.0.0O finished(stream, cb) aguardará o evento 'close' antes de invocar o callback. A implementação tenta detectar fluxos legados e aplica este comportamento apenas a fluxos que se espera que emitam 'close'.
v14.0.0Emitir 'close' antes de 'end' em um fluxo Readable causará um erro ERR_STREAM_PREMATURE_CLOSE.
v14.0.0O callback será invocado em fluxos que já terminaram antes da chamada para finished(stream, cb).
v10.0.0Adicionada em: v10.0.0
  • stream <Stream> | <ReadableStream> | <WritableStream> Um fluxo/webstream legível e/ou gravável.

  • options <Object>

    • error <boolean> Se definido como false, uma chamada para emit('error', err) não é tratada como finalizada. Padrão: true.
    • readable <boolean> Quando definido como false, o callback será chamado quando o fluxo terminar, mesmo que o fluxo ainda possa ser legível. Padrão: true.
    • writable <boolean> Quando definido como false, o callback será chamado quando o fluxo terminar, mesmo que o fluxo ainda possa ser gravável. Padrão: true.
    • signal <AbortSignal> permite abortar a espera pelo término do fluxo. O fluxo subjacente não será abortado se o sinal for abortado. O callback será chamado com um AbortError. Todos os ouvintes registrados adicionados por esta função também serão removidos.
  • callback <Function> Uma função de callback que recebe um argumento de erro opcional.

  • Retorna: <Function> Uma função de limpeza que remove todos os ouvintes registrados.

Uma função para ser notificada quando um fluxo não for mais legível, gravável ou tiver experimentado um erro ou um evento de fechamento prematuro.

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('Stream falhou.', err)
  } else {
    console.log('Stream terminou a leitura.')
  }
})

rs.resume() // Drene o fluxo.

Especialmente útil em cenários de tratamento de erros em que um fluxo é destruído prematuramente (como uma solicitação HTTP abortada) e não emitirá 'end' ou 'finish'.

A API finished fornece versão promise.

stream.finished() deixa ouvintes de eventos pendentes (em particular 'error', 'end', 'finish' e 'close') após a invocação do callback. A razão para isso é que eventos 'error' inesperados (devido a implementações de fluxo incorretas) não causam travamentos inesperados. Se este for um comportamento indesejado, a função de limpeza retornada precisa ser invocada no callback:

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[Histórico]

VersãoAlterações
v19.7.0, v18.16.0Adicionou suporte para webstreams.
v18.0.0Passar um callback inválido para o argumento callback agora lança ERR_INVALID_ARG_TYPE em vez de ERR_INVALID_CALLBACK.
v14.0.0O pipeline(..., cb) aguardará o evento 'close' antes de invocar o callback. A implementação tenta detectar streams legadas e aplica este comportamento apenas a streams que se espera que emitam 'close'.
v13.10.0Adicionou suporte para geradores assíncronos.
v10.0.0Adicionado em: v10.0.0

Um método de módulo para conectar streams e geradores, encaminhando erros e limpando adequadamente, e fornecendo um callback quando o pipeline estiver completo.

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// Use a API pipeline para conectar facilmente uma série de streams
// juntas e receber notificação quando o pipeline estiver totalmente concluído.

// Um pipeline para compactar um arquivo tar potencialmente enorme de forma eficiente:

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('Pipeline falhou.', err)
  } else {
    console.log('Pipeline concluído com sucesso.')
  }
})

A API pipeline fornece uma versão promise.

stream.pipeline() chamará stream.destroy(err) em todas as streams, exceto:

  • Streams Readable que emitiram 'end' ou 'close'.
  • Streams Writable que emitiram 'finish' ou 'close'.

stream.pipeline() deixa ouvintes de eventos pendentes nas streams após a invocação do callback. No caso de reutilização de streams após falha, isso pode causar vazamentos de ouvintes de eventos e erros ignorados. Se a última stream for legível, os ouvintes de eventos pendentes serão removidos para que a última stream possa ser consumida posteriormente.

stream.pipeline() fecha todas as streams quando um erro é gerado. O uso de IncomingRequest com pipeline pode levar a um comportamento inesperado, pois destrói o socket sem enviar a resposta esperada. Veja o exemplo abaixo:

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // Arquivo não encontrado
      // esta mensagem não pode ser enviada uma vez que `pipeline` já destruiu o socket
      return res.end('error!!!')
    }
  })
})

stream.compose(...streams)

[Histórico]

VersãoAlterações
v21.1.0, v20.10.0Adicionou suporte para a classe stream.
v19.8.0, v18.16.0Adicionou suporte para webstreams.
v16.9.0Adicionada em: v16.9.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - stream.compose é experimental.

Combina dois ou mais streams em um stream Duplex que escreve para o primeiro stream e lê do último. Cada stream fornecido é canalizado para o próximo, usando stream.pipeline. Se algum dos streams apresentar erro, todos são destruídos, incluindo o stream Duplex externo.

Como stream.compose retorna um novo stream que por sua vez pode (e deve) ser canalizado para outros streams, ele permite a composição. Em contraste, ao passar streams para stream.pipeline, tipicamente o primeiro stream é um stream legível e o último um stream gravável, formando um circuito fechado.

Se uma Function for passada, ela deve ser um método de fábrica que recebe um source Iterable.

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // imprime 'HELLOWORLD'

stream.compose pode ser usado para converter iteráveis assíncronos, geradores e funções em streams.

  • AsyncIterable converte em um Duplex legível. Não pode produzir null.
  • AsyncGeneratorFunction converte em um Duplex de transformação legível/gravável. Deve receber um AsyncIterable de origem como primeiro parâmetro. Não pode produzir null.
  • AsyncFunction converte em um Duplex gravável. Deve retornar null ou undefined.
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// Converter AsyncIterable em Duplex legível.
const s1 = compose(
  (async function* () {
    yield 'Hello'
    yield 'World'
  })()
)

// Converter AsyncGenerator em Duplex de transformação.
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// Converter AsyncFunction em Duplex gravável.
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // imprime 'HELLOWORLD'

Veja readable.compose(stream) para stream.compose como operador.

stream.Readable.from(iterable[, options])

Adicionado em: v12.3.0, v10.17.0

  • iterable <Iterable> Objeto que implementa o protocolo iterável Symbol.asyncIterator ou Symbol.iterator. Emite um evento 'error' se um valor nulo for passado.
  • options <Object> Opções fornecidas para new stream.Readable([options]). Por padrão, Readable.from() definirá options.objectMode como true, a menos que isso seja explicitamente desabilitado definindo options.objectMode como false.
  • Retorna: <stream.Readable>

Um método utilitário para criar streams legíveis a partir de iteradores.

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

Chamar Readable.from(string) ou Readable.from(buffer) não fará com que as strings ou buffers sejam iterados para corresponder à semântica de outros streams por razões de desempenho.

Se um objeto Iterable contendo promises for passado como argumento, pode resultar em uma rejeição não tratada.

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rejeição não tratada
])

stream.Readable.fromWeb(readableStream[, options])

Adicionado em: v17.0.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

stream.Readable.isDisturbed(stream)

Adicionado em: v16.8.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Retorna se o stream foi lido ou cancelado.

stream.isErrored(stream)

Adicionado em: v17.3.0, v16.14.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Retorna se o stream encontrou um erro.

stream.isReadable(stream)

Adicionado em: v17.4.0, v16.14.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

Retorna se o stream é legível.

stream.Readable.toWeb(streamReadable[, options])

Adicionado em: v17.0.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

  • streamReadable <stream.Readable>

  • options <Object>

    • strategy <Object>
    • highWaterMark <number> O tamanho máximo da fila interna (do ReadableStream criado) antes que a contrapressão seja aplicada na leitura do stream.Readable fornecido. Se nenhum valor for fornecido, ele será retirado do stream.Readable fornecido.
    • size <Function> Uma função que define o tamanho do chunk de dados fornecido. Se nenhum valor for fornecido, o tamanho será 1 para todos os chunks.
    • chunk <any>
    • Retorna: <number>
  • Retorna: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

Adicionado em: v17.0.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

stream.Writable.toWeb(streamWritable)

Adicionado em: v17.0.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

stream.Duplex.from(src)

[Histórico]

VersãoAlterações
v19.5.0, v18.17.0O argumento src agora pode ser um ReadableStream ou WritableStream.
v16.8.0Adicionada em: v16.8.0

Um método utilitário para criar fluxos duplex.

  • Stream converte o fluxo gravável em Duplex gravável e o fluxo legível em Duplex.
  • Blob converte em Duplex legível.
  • string converte em Duplex legível.
  • ArrayBuffer converte em Duplex legível.
  • AsyncIterable converte em um Duplex legível. Não pode gerar null.
  • AsyncGeneratorFunction converte em um Duplex de transformação legível/gravável. Deve receber um AsyncIterable de origem como primeiro parâmetro. Não pode gerar null.
  • AsyncFunction converte em um Duplex gravável. Deve retornar null ou undefined.
  • Object ({ writable, readable }) converte readable e writable em Stream e, em seguida, combina-os em Duplex, onde o Duplex gravará no writable e lerá do readable.
  • Promise converte em Duplex legível. O valor null é ignorado.
  • ReadableStream converte em Duplex legível.
  • WritableStream converte em Duplex gravável.
  • Retorna: <stream.Duplex>

Se um objeto Iterable contendo promises for passado como argumento, pode resultar em uma rejeição não tratada.

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rejeição não tratada
])

stream.Duplex.fromWeb(pair[, options])

Adicionado em: v17.0.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

Adicionado em: v17.0.0

[Estável: 1 - Experimental]

Estável: 1 Estabilidade: 1 - Experimental

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[Histórico]

VersãoAlterações
v19.7.0, v18.16.0Suporte adicionado para ReadableStream e WritableStream.
v15.4.0Adicionada em: v15.4.0

Anexa um AbortSignal a um fluxo legível ou gravável. Isso permite que o código controle a destruição do fluxo usando um AbortController.

Chamar abort no AbortController correspondente ao AbortSignal passado se comportará da mesma maneira que chamar .destroy(new AbortError()) no fluxo, e controller.error(new AbortError()) para fluxos web.

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Mais tarde, interrompa a operação fechando o fluxo
controller.abort()

Ou usando um AbortSignal com um fluxo legível como um iterável assíncrono:

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // definir um tempo limite
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // A operação foi cancelada
    } else {
      throw e
    }
  }
})()

Ou usando um AbortSignal com um ReadableStream:

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // A operação foi cancelada
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hello
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

Adicionado em: v19.9.0, v18.17.0

Retorna o highWaterMark padrão usado pelos streams. O padrão é 65536 (64 KiB), ou 16 para objectMode.

stream.setDefaultHighWaterMark(objectMode, value)

Adicionado em: v19.9.0, v18.17.0

Define o highWaterMark padrão usado pelos streams.

API para implementadores de stream

A API do módulo node:stream foi projetada para facilitar a implementação de streams usando o modelo de herança prototípica do JavaScript.

Primeiro, um desenvolvedor de stream declararia uma nova classe JavaScript que estende uma das quatro classes básicas de stream (stream.Writable, stream.Readable, stream.Duplex ou stream.Transform), certificando-se de chamar o construtor da classe pai apropriado:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

Ao estender streams, lembre-se de quais opções o usuário pode e deve fornecer antes de encaminhá-las para o construtor base. Por exemplo, se a implementação fizer suposições em relação às opções autoDestroy e emitClose, não permita que o usuário as substitua. Seja explícito sobre quais opções são encaminhadas em vez de encaminhar implicitamente todas as opções.

A nova classe de stream deve então implementar um ou mais métodos específicos, dependendo do tipo de stream que está sendo criado, conforme detalhado na tabela abaixo:

Caso de usoClasseMétodo(s) a implementar
Somente leituraReadable_read()
Somente escritaWritable_write() , _writev() , _final()
Leitura e escritaDuplex_read() , _write() , _writev() , _final()
Operar em dados escritos, então ler o resultadoTransform_transform() , _flush() , _final()

O código de implementação para um stream nunca deve chamar os métodos "públicos" de um stream que são destinados ao uso por consumidores (conforme descrito na seção API para consumidores de stream). Fazer isso pode levar a efeitos colaterais adversos no código do aplicativo que consome o stream.

Evite substituir métodos públicos como write(), end(), cork(), uncork(), read() e destroy(), ou emitir eventos internos como 'error', 'data', 'end', 'finish' e 'close' através de .emit(). Fazer isso pode quebrar invariantes de stream atuais e futuros, levando a problemas de comportamento e/ou compatibilidade com outros streams, utilitários de stream e expectativas do usuário.

Construção simplificada

Adicionado em: v1.2.0

Para muitos casos simples, é possível criar um fluxo sem depender de herança. Isso pode ser feito criando diretamente instâncias dos objetos stream.Writable, stream.Readable, stream.Duplex ou stream.Transform e passando métodos apropriados como opções do construtor.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  construct(callback) {
    // Inicializar estado e carregar recursos...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Liberar recursos...
  },
})

Implementando um fluxo gravável

A classe stream.Writable é estendida para implementar um fluxo Writable.

Fluxos Writable personalizados devem chamar o construtor new stream.Writable([options]) e implementar o método writable._write() e/ou writable._writev().

new stream.Writable([options])

[Histórico]

VersãoAlterações
v22.0.0Aumentar highWaterMark padrão.
v15.5.0Suporte para passar um AbortSignal.
v14.0.0Alterar o padrão da opção autoDestroy para true.
v11.2.0, v10.16.0Adicionar a opção autoDestroy para destruir automaticamente o fluxo quando ele emitir 'finish' ou erros.
v10.0.0Adicionar a opção emitClose para especificar se 'close' é emitido na destruição.
js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor(options) {
    // Chama o construtor stream.Writable().
    super(options)
    // ...
  }
}

Ou, quando usar construtores de estilo pré-ES6:

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options)
  Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

Ou, usando a abordagem de construtor simplificada:

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

Chamar abort no AbortController correspondente ao AbortSignal passado se comportará da mesma maneira que chamar .destroy(new AbortError()) no fluxo gravável.

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// Mais tarde, interrompa a operação fechando o fluxo
controller.abort()

writable._construct(callback)

Adicionado em: v15.0.0

  • callback <Function> Chame esta função (opcionalmente com um argumento de erro) quando o stream tiver terminado a inicialização.

O método _construct() NÃO DEVE ser chamado diretamente. Pode ser implementado por classes filhas e, se assim for, será chamado apenas pelos métodos internos da classe Writable.

Esta função opcional será chamada em um tick após o construtor do stream ter retornado, atrasando quaisquer chamadas _write(), _final() e _destroy() até que callback seja chamado. Isso é útil para inicializar o estado ou inicializar recursos assincronamente antes que o stream possa ser usado.

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[Histórico]

VersãoAlterações
v12.11.0_write() é opcional ao fornecer _writev().
  • chunk <Buffer> | <string> | <any> O Buffer a ser escrito, convertido a partir da string passada para stream.write(). Se a opção decodeStrings do stream for false ou se o stream estiver operando no modo objeto, o chunk não será convertido e será o que foi passado para stream.write().
  • encoding <string> Se o chunk for uma string, então encoding é a codificação de caracteres dessa string. Se o chunk for um Buffer, ou se o stream estiver operando no modo objeto, encoding pode ser ignorado.
  • callback <Function> Chame esta função (opcionalmente com um argumento de erro) quando o processamento estiver completo para o chunk fornecido.

Todas as implementações de stream Writable devem fornecer um método writable._write() e/ou writable._writev() para enviar dados para o recurso subjacente.

Os streams Transform fornecem sua própria implementação de writable._write().

Esta função NÃO DEVE ser chamada diretamente pelo código da aplicação. Deve ser implementada por classes filhas e chamada apenas pelos métodos internos da classe Writable.

A função callback deve ser chamada sincronicamente dentro de writable._write() ou assincronamente (ou seja, tick diferente) para sinalizar que a gravação foi concluída com sucesso ou falhou com um erro. O primeiro argumento passado para o callback deve ser o objeto Error se a chamada falhou ou null se a gravação foi bem-sucedida.

Todas as chamadas para writable.write() que ocorrem entre o momento em que writable._write() é chamado e o callback é chamado farão com que os dados escritos sejam armazenados em buffer. Quando o callback é chamado, o stream pode emitir um evento 'drain'. Se uma implementação de stream for capaz de processar vários chunks de dados ao mesmo tempo, o método writable._writev() deve ser implementado.

Se a propriedade decodeStrings for explicitamente definida como false nas opções do construtor, então chunk permanecerá o mesmo objeto que é passado para .write(), e pode ser uma string em vez de um Buffer. Isso é para suportar implementações que têm um tratamento otimizado para certas codificações de dados de string. Nesse caso, o argumento encoding indicará a codificação de caracteres da string. Caso contrário, o argumento encoding pode ser seguramente ignorado.

O método writable._write() tem o prefixo de um sublinhado porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.

writable._writev(chunks, callback)

  • chunks <Object[]> Os dados a serem escritos. O valor é um array de <Object>, cada um representando um chunk discreto de dados a serem escritos. As propriedades desses objetos são:

    • chunk <Buffer> | <string> Uma instância de buffer ou string contendo os dados a serem escritos. O chunk será uma string se o Writable foi criado com a opção decodeStrings definida como false e uma string foi passada para write().
    • encoding <string> A codificação de caracteres do chunk. Se chunk for um Buffer, a encoding será 'buffer'.
  • callback <Function> Uma função de callback (opcionalmente com um argumento de erro) a ser invocada quando o processamento estiver completo para os chunks fornecidos.

Esta função NÃO DEVE ser chamada diretamente pelo código da aplicação. Ela deve ser implementada pelas classes filhas e chamada apenas pelos métodos internos da classe Writable.

O método writable._writev() pode ser implementado em adição ou como alternativa a writable._write() em implementações de stream que são capazes de processar múltiplos chunks de dados ao mesmo tempo. Se implementado e se houver dados em buffer de escritas anteriores, _writev() será chamado em vez de _write().

O método writable._writev() é prefixado com um underscore porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.

writable._destroy(err, callback)

Adicionado em: v8.0.0

  • err <Error> Um possível erro.
  • callback <Function> Uma função de callback que recebe um argumento de erro opcional.

O método _destroy() é chamado por writable.destroy(). Ele pode ser sobrescrito por classes filhas, mas não deve ser chamado diretamente.

writable._final(callback)

Adicionado em: v8.0.0

  • callback <Function> Chame esta função (opcionalmente com um argumento de erro) quando terminar de escrever quaisquer dados restantes.

O método _final() não deve ser chamado diretamente. Ele pode ser implementado por classes filhas e, se assim for, será chamado apenas pelos métodos internos da classe Writable.

Esta função opcional será chamada antes do fechamento do fluxo, atrasando o evento 'finish' até que callback seja chamado. Isso é útil para fechar recursos ou escrever dados em buffer antes do término de um fluxo.

Erros durante a escrita

Erros que ocorrem durante o processamento dos métodos writable._write(), writable._writev() e writable._final() devem ser propagados invocando o callback e passando o erro como o primeiro argumento. Lançar um Error de dentro desses métodos ou emitir manualmente um evento 'error' resulta em comportamento indefinido.

Se um fluxo Readable for conectado a um fluxo Writable quando Writable emitir um erro, o fluxo Readable será desconectado.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  },
})

Um exemplo de fluxo gravável

O seguinte ilustra uma implementação de fluxo Writable personalizado bastante simplista (e um tanto sem sentido). Embora esta instância de fluxo Writable específica não seja de nenhuma utilidade real em particular, o exemplo ilustra cada um dos elementos necessários de uma instância de fluxo Writable personalizado:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  }
}

Decodificando buffers em um fluxo gravável

Decodificar buffers é uma tarefa comum, por exemplo, ao usar transformadores cuja entrada é uma string. Este não é um processo trivial quando se usa codificação de caracteres multibyte, como UTF-8. O exemplo a seguir mostra como decodificar strings multibyte usando StringDecoder e Writable.

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

Implementando um fluxo legível

A classe stream.Readable é estendida para implementar um fluxo Readable.

Fluxos Readable personalizados devem chamar o construtor new stream.Readable([options]) e implementar o método readable._read().

new stream.Readable([options])

[Histórico]

VersãoAlterações
v22.0.0Aumento do highWaterMark padrão.
v15.5.0Suporte para passar um AbortSignal.
v14.0.0Altera o padrão da opção autoDestroy para true.
v11.2.0, v10.16.0Adiciona a opção autoDestroy para automaticamente destroy() o fluxo quando ele emite 'end' ou erros.
  • options <Object>
    • highWaterMark <number> O máximo número de bytes para armazenar no buffer interno antes de parar de ler do recurso subjacente. Padrão: 65536 (64 KiB), ou 16 para fluxos objectMode.
    • encoding <string> Se especificado, os buffers serão decodificados para strings usando a codificação especificada. Padrão: null.
    • objectMode <boolean> Se este fluxo deve se comportar como um fluxo de objetos. Significando que stream.read(n) retorna um único valor em vez de um Buffer de tamanho n. Padrão: false.
    • emitClose <boolean> Se o fluxo deve emitir 'close' depois de ter sido destruído. Padrão: true.
    • read <Function> Implementação para o método stream._read().
    • destroy <Function> Implementação para o método stream._destroy().
    • construct <Function> Implementação para o método stream._construct().
    • autoDestroy <boolean> Se este fluxo deve chamar automaticamente .destroy() em si mesmo após o término. Padrão: true.
    • signal <AbortSignal> Um sinal representando uma possível cancelamento.
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // Chama o construtor stream.Readable(options).
    super(options)
    // ...
  }
}

Ou, quando usar construtores de estilo pré-ES6:

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

Ou, usando a abordagem de construtor simplificada:

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

Chamar abort no AbortController correspondente ao AbortSignal passado se comportará da mesma forma que chamar .destroy(new AbortError()) no readable criado.

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// Mais tarde, interrompa a operação fechando o fluxo
controller.abort()

readable._construct(callback)

Adicionado em: v15.0.0

  • callback <Function> Chame esta função (opcionalmente com um argumento de erro) quando o stream terminar de inicializar.

O método _construct() NÃO deve ser chamado diretamente. Ele pode ser implementado por classes filhas e, se for o caso, será chamado apenas pelos métodos internos da classe Readable.

Esta função opcional será agendada no próximo tick pelo construtor do stream, atrasando quaisquer chamadas _read() e _destroy() até que callback seja chamado. Isso é útil para inicializar o estado ou inicializar recursos assincronamente antes que o stream possa ser usado.

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

Adicionado em: v0.9.4

  • size <number> Número de bytes a serem lidos assincronamente

Esta função NÃO deve ser chamada diretamente pelo código do aplicativo. Ela deve ser implementada por classes filhas e chamada apenas pelos métodos internos da classe Readable.

Todas as implementações de stream Readable devem fornecer uma implementação do método readable._read() para buscar dados do recurso subjacente.

Quando readable._read() é chamado, se dados estiverem disponíveis no recurso, a implementação deve começar a inserir esses dados na fila de leitura usando o método this.push(dataChunk). _read() será chamado novamente após cada chamada para this.push(dataChunk) assim que o stream estiver pronto para aceitar mais dados. _read() pode continuar lendo do recurso e inserindo dados até que readable.push() retorne false. Somente quando _read() for chamado novamente depois de ter parado, ele deve retomar a inserção de dados adicionais na fila.

Assim que o método readable._read() tiver sido chamado, ele não será chamado novamente até que mais dados sejam inseridos por meio do método readable.push(). Dados vazios, como buffers e strings vazios, não farão com que readable._read() seja chamado.

O argumento size é consultivo. Para implementações em que uma "leitura" é uma única operação que retorna dados, pode-se usar o argumento size para determinar quantos dados buscar. Outras implementações podem ignorar este argumento e simplesmente fornecer dados sempre que estiverem disponíveis. Não há necessidade de "esperar" até que size bytes estejam disponíveis antes de chamar stream.push(chunk).

O método readable._read() tem um prefixo de sublinhado porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.

readable._destroy(err, callback)

Adicionado em: v8.0.0

  • err <Error> Um possível erro.
  • callback <Function> Uma função de callback que recebe um argumento de erro opcional.

O método _destroy() é chamado por readable.destroy(). Ele pode ser sobrescrito por classes filhas, mas não deve ser chamado diretamente.

readable.push(chunk[, encoding])

[Histórico]

VersãoAlterações
v22.0.0, v20.13.0O argumento chunk agora pode ser uma instância de TypedArray ou DataView.
v8.0.0O argumento chunk agora pode ser uma instância de Uint8Array.
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Trecho de dados para inserir na fila de leitura. Para fluxos que não operam no modo objeto, chunk deve ser uma <string>, <Buffer>, <TypedArray> ou <DataView>. Para fluxos em modo objeto, chunk pode ser qualquer valor JavaScript.
  • encoding <string> Codificação de trechos de string. Deve ser uma codificação Buffer válida, como 'utf8' ou 'ascii'.
  • Retorna: <boolean> true se trechos adicionais de dados podem continuar a ser inseridos; false caso contrário.

Quando chunk é um <Buffer>, <TypedArray>, <DataView> ou <string>, o trecho de dados será adicionado à fila interna para que os usuários do fluxo possam consumir. Passar chunk como null sinaliza o fim do fluxo (EOF), após o qual nenhum dado poderá mais ser escrito.

Quando o Readable está operando no modo pausado, os dados adicionados com readable.push() podem ser lidos chamando o método readable.read() quando o evento 'readable' for emitido.

Quando o Readable está operando no modo fluindo, os dados adicionados com readable.push() serão entregues emitindo um evento 'data'.

O método readable.push() é projetado para ser o mais flexível possível. Por exemplo, ao encapsular uma fonte de nível inferior que fornece algum tipo de mecanismo de pausa/reprise e um callback de dados, a fonte de nível inferior pode ser encapsulada pela instância Readable personalizada:

js
// `_source` é um objeto com métodos readStop() e readStart(),
// e um membro `ondata` que é chamado quando há dados, e
// um membro `onend` que é chamado quando os dados terminam.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // Toda vez que houver dados, insira-os no buffer interno.
    this._source.ondata = chunk => {
      // Se push() retornar false, pare de ler da fonte.
      if (!this.push(chunk)) this._source.readStop()
    }

    // Quando a fonte terminar, insira o trecho `null` que sinaliza o EOF.
    this._source.onend = () => {
      this.push(null)
    }
  }
  // _read() será chamado quando o fluxo quiser puxar mais dados.
  // O argumento de tamanho consultivo é ignorado neste caso.
  _read(size) {
    this._source.readStart()
  }
}

O método readable.push() é usado para inserir o conteúdo no buffer interno. Ele pode ser acionado pelo método readable._read().

Para fluxos que não operam no modo objeto, se o parâmetro chunk de readable.push() for undefined, ele será tratado como uma string ou buffer vazio. Veja readable.push('') para mais informações.

Erros durante a leitura

Erros que ocorrem durante o processamento de readable._read() devem ser propagados através do método readable.destroy(err). Lançar um Error de dentro de readable._read() ou emitir manualmente um evento 'error' resulta em comportamento indefinido.

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // Faça algum trabalho.
    }
  },
})

Um exemplo de stream de contagem

O seguinte é um exemplo básico de um stream Readable que emite os numerais de 1 a 1.000.000 em ordem crescente e depois termina.

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

Implementando um stream duplex

Um stream Duplex é aquele que implementa tanto Readable quanto Writable, como uma conexão de socket TCP.

Como JavaScript não possui suporte para herança múltipla, a classe stream.Duplex é estendida para implementar um stream Duplex (ao contrário de estender as classes stream.Readable e stream.Writable).

A classe stream.Duplex herda prototipicamente de stream.Readable e parasitariamente de stream.Writable, mas instanceof funcionará corretamente para ambas as classes base devido à substituição de Symbol.hasInstance em stream.Writable.

Streams Duplex personalizados devem chamar o construtor new stream.Duplex([options]) e implementar ambos os métodos readable._read() e writable._write().

new stream.Duplex(options)

[Histórico]

VersãoAlterações
v8.4.0As opções readableHighWaterMark e writableHighWaterMark agora são suportadas.
  • options <Object> Passado para os construtores Writable e Readable. Também possui os seguintes campos:
    • allowHalfOpen <boolean> Se definido como false, o fluxo encerrará automaticamente o lado gravável quando o lado legível terminar. Padrão: true.
    • readable <boolean> Define se o Duplex deve ser legível. Padrão: true.
    • writable <boolean> Define se o Duplex deve ser gravável. Padrão: true.
    • readableObjectMode <boolean> Define objectMode para o lado legível do fluxo. Não tem efeito se objectMode for true. Padrão: false.
    • writableObjectMode <boolean> Define objectMode para o lado gravável do fluxo. Não tem efeito se objectMode for true. Padrão: false.
    • readableHighWaterMark <number> Define highWaterMark para o lado legível do fluxo. Não tem efeito se highWaterMark for fornecido.
    • writableHighWaterMark <number> Define highWaterMark para o lado gravável do fluxo. Não tem efeito se highWaterMark for fornecido.
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

Ou, quando usar construtores de estilo pré-ES6:

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

Ou, usando a abordagem de construtor simplificada:

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

Quando usar pipeline:

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Aceita entrada de string em vez de Buffers
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // Certifique-se de que seja json válido.
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('falhou', err)
    } else {
      console.log('concluído')
    }
  }
)

Um exemplo de stream duplex

O seguinte ilustra um exemplo simples de um stream Duplex que encapsula um objeto de origem de nível inferior hipotético para o qual os dados podem ser escritos e de onde os dados podem ser lidos, embora usando uma API que não é compatível com streams Node.js. O seguinte ilustra um exemplo simples de um stream Duplex que armazena em buffer os dados escritos recebidos por meio da interface Writable que é lida novamente por meio da interface Readable.

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // A origem subjacente lida apenas com strings.
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

O aspecto mais importante de um stream Duplex é que os lados Readable e Writable operam independentemente um do outro, apesar de coexistirem em uma única instância de objeto.

Streams duplex em modo de objeto

Para streams Duplex, objectMode pode ser definido exclusivamente para o lado Readable ou Writable usando as opções readableObjectMode e writableObjectMode, respectivamente.

No exemplo a seguir, por exemplo, um novo stream Transform (que é um tipo de stream Duplex) é criado que possui um lado Writable em modo de objeto que aceita números JavaScript que são convertidos em strings hexadecimais no lado Readable.

js
const { Transform } = require('node:stream')

// Todos os streams Transform também são streams Duplex.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce o chunk para um número, se necessário.
    chunk |= 0

    // Transforme o chunk em algo diferente.
    const data = chunk.toString(16)

    // Envie os dados para a fila legível.
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// Imprime: 01
myTransform.write(10)
// Imprime: 0a
myTransform.write(100)
// Imprime: 64

Implementando um fluxo de transformação

Um fluxo Transform é um fluxo Duplex onde a saída é computada de alguma forma a partir da entrada. Exemplos incluem fluxos zlib ou fluxos crypto que comprimem, criptografam ou descriptografam dados.

Não há nenhum requisito de que a saída tenha o mesmo tamanho que a entrada, o mesmo número de chunks ou chegue ao mesmo tempo. Por exemplo, um fluxo Hash terá apenas um único chunk de saída, que é fornecido quando a entrada termina. Um fluxo zlib produzirá uma saída que é muito menor ou muito maior que sua entrada.

A classe stream.Transform é estendida para implementar um fluxo Transform.

A classe stream.Transform herda prototipicamente de stream.Duplex e implementa suas próprias versões dos métodos writable._write() e readable._read(). Implementações personalizadas de Transform devem implementar o método transform._transform() e podem também implementar o método transform._flush().

É preciso ter cuidado ao usar fluxos Transform, pois os dados escritos no fluxo podem fazer com que o lado Writable do fluxo seja pausado se a saída no lado Readable não for consumida.

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

Ou, ao usar construtores de estilo pré-ES6:

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

Ou, usando a abordagem de construtor simplificada:

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

Evento: 'end'

O evento 'end' é da classe stream.Readable. O evento 'end' é emitido depois que todos os dados foram enviados, o que ocorre depois que o callback em transform._flush() foi chamado. No caso de um erro, 'end' não deve ser emitido.

Evento: 'finish'

O evento 'finish' é da classe stream.Writable. O evento 'finish' é emitido depois que stream.end() é chamado e todos os chunks foram processados por stream._transform(). No caso de um erro, 'finish' não deve ser emitido.

transform._flush(callback)

  • callback <Function> Uma função de callback (opcionalmente com um argumento de erro e dados) a ser chamada quando os dados restantes foram liberados.

Esta função NÃO DEVE ser chamada diretamente pelo código da aplicação. Ela deve ser implementada por classes filhas e chamada apenas pelos métodos internos da classe Readable.

Em alguns casos, uma operação de transformação pode precisar emitir um pouco de dados adicionais no final do stream. Por exemplo, um stream de compressão zlib armazenará uma quantidade de estado interno usado para comprimir o resultado de forma otimizada. No entanto, quando o stream termina, esses dados adicionais precisam ser liberados para que os dados comprimidos estejam completos.

Implementações personalizadas de Transform podem implementar o método transform._flush(). Este será chamado quando não houver mais dados escritos para serem consumidos, mas antes do evento 'end' ser emitido, sinalizando o fim do stream Readable.

Dentro da implementação transform._flush(), o método transform.push() pode ser chamado zero ou mais vezes, conforme apropriado. A função callback deve ser chamada quando a operação de liberação estiver completa.

O método transform._flush() é prefixado com um underscore porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> O Buffer a ser transformado, convertido a partir da string passada para stream.write(). Se a opção decodeStrings do stream for false ou o stream estiver operando no modo objeto, o chunk não será convertido e será o que foi passado para stream.write().
  • encoding <string> Se o chunk for uma string, este é o tipo de codificação. Se o chunk for um buffer, este é o valor especial 'buffer'. Ignore-o nesse caso.
  • callback <Function> Uma função de callback (opcionalmente com um argumento de erro e dados) a ser chamada após o chunk fornecido ter sido processado.

Esta função NÃO DEVE ser chamada diretamente pelo código do aplicativo. Ela deve ser implementada por classes filhas e chamada apenas pelos métodos internos da classe Readable.

Todas as implementações de stream Transform devem fornecer um método _transform() para aceitar entrada e produzir saída. A implementação transform._transform() lida com os bytes sendo escritos, calcula uma saída e, em seguida, passa essa saída para a parte legível usando o método transform.push().

O método transform.push() pode ser chamado zero ou mais vezes para gerar saída de um único chunk de entrada, dependendo de quanto deve ser gerado como resultado do chunk.

É possível que nenhuma saída seja gerada a partir de qualquer chunk de dados de entrada.

A função callback deve ser chamada apenas quando o chunk atual for completamente consumido. O primeiro argumento passado para o callback deve ser um objeto Error se ocorreu um erro durante o processamento da entrada ou null caso contrário. Se um segundo argumento for passado para o callback, ele será encaminhado para o método transform.push(), mas apenas se o primeiro argumento for falso. Em outras palavras, os seguintes são equivalentes:

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

O método transform._transform() é prefixado com um sublinhado porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.

transform._transform() nunca é chamado em paralelo; os streams implementam um mecanismo de fila e, para receber o próximo chunk, callback deve ser chamado, sincronicamente ou assincronicamente.

Classe: stream.PassThrough

A classe stream.PassThrough é uma implementação trivial de um stream Transform que simplesmente passa os bytes de entrada para a saída. Seu propósito é principalmente para exemplos e testes, mas existem alguns casos de uso em que stream.PassThrough é útil como um bloco de construção para novos tipos de streams.

Notas adicionais

Compatibilidade de Streams com geradores assíncronos e iteradores assíncronos

Com o suporte de geradores assíncronos e iteradores em JavaScript, os geradores assíncronos são efetivamente uma construção de stream de nível de linguagem de primeira classe neste momento.

Alguns casos comuns de interoperabilidade usando streams Node.js com geradores assíncronos e iteradores assíncronos são fornecidos abaixo.

Consumindo streams legíveis com iteradores assíncronos

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

Iteradores assíncronos registram um manipulador de erros permanente no stream para evitar quaisquer erros não tratados pós-destruição.

Criando streams legíveis com geradores assíncronos

Um stream legível Node.js pode ser criado a partir de um gerador assíncrono usando o método utilitário Readable.from():

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

Encadeando para streams graváveis a partir de iteradores assíncronos

Ao gravar em um stream gravável a partir de um iterador assíncrono, garanta o tratamento correto da contrapressão e erros. stream.pipeline() abstrai o tratamento da contrapressão e erros relacionados à contrapressão:

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// Padrão de Callback
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'value returned')
  }
}).on('close', () => {
  ac.abort()
})

// Padrão de Promise
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'value returned')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

Compatibilidade com versões antigas do Node.js

Antes do Node.js 0.10, a interface de stream Readable era mais simples, mas também menos poderosa e menos útil.

  • Em vez de esperar por chamadas ao método stream.read(), os eventos 'data' começariam a ser emitidos imediatamente. Aplicações que precisariam realizar alguma quantidade de trabalho para decidir como lidar com os dados eram obrigadas a armazenar os dados lidos em buffers para que os dados não fossem perdidos.
  • O método stream.pause() era consultivo, e não garantido. Isso significava que ainda era necessário estar preparado para receber eventos 'data' mesmo quando o stream estava em um estado pausado.

No Node.js 0.10, a classe Readable foi adicionada. Para compatibilidade com versões anteriores de programas Node.js, os streams Readable mudam para o "modo de fluxo" quando um manipulador de eventos 'data' é adicionado, ou quando o método stream.resume() é chamado. O efeito é que, mesmo sem usar o novo método stream.read() e o evento 'readable', não é mais necessário se preocupar com a perda de pedaços de 'data'.

Embora a maioria das aplicações continue funcionando normalmente, isso introduz um caso limite nas seguintes condições:

  • Nenhum ouvinte de evento 'data' é adicionado.
  • O método stream.resume() nunca é chamado.
  • O stream não é enviado para nenhum destino gravável.

Por exemplo, considere o seguinte código:

js
// AVISO! QUEBRADO!
net
  .createServer(socket => {
    // Adicionamos um ouvinte 'end', mas nunca consumimos os dados.
    socket.on('end', () => {
      // Nunca chegará aqui.
      socket.end('A mensagem foi recebida, mas não foi processada.\n')
    })
  })
  .listen(1337)

Antes do Node.js 0.10, os dados da mensagem de entrada seriam simplesmente descartados. No entanto, no Node.js 0.10 e posteriores, o socket permanece pausado para sempre.

A solução alternativa nessa situação é chamar o método stream.resume() para iniciar o fluxo de dados:

js
// Solução alternativa.
net
  .createServer(socket => {
    socket.on('end', () => {
      socket.end('A mensagem foi recebida, mas não foi processada.\n')
    })

    // Inicia o fluxo de dados, descartando-o.
    socket.resume()
  })
  .listen(1337)

Além dos novos streams Readable mudando para o modo de fluxo, streams no estilo anterior a 0.10 podem ser encapsulados em uma classe Readable usando o método readable.wrap().

readable.read(0)

Existem alguns casos em que é necessário disparar uma atualização dos mecanismos de stream legível subjacentes, sem realmente consumir nenhum dado. Nesses casos, é possível chamar readable.read(0), que sempre retornará null.

Se o buffer de leitura interno estiver abaixo do highWaterMark, e o stream não estiver lendo atualmente, então chamar stream.read(0) disparará uma chamada de baixo nível stream._read().

Embora a maioria dos aplicativos quase nunca precise fazer isso, existem situações dentro do Node.js em que isso é feito, particularmente nas internas da classe de stream Readable.

readable.push('')

O uso de readable.push('') não é recomendado.

Inserir uma <string> de zero byte, <Buffer>, <TypedArray> ou <DataView> em um stream que não está no modo objeto tem um efeito colateral interessante. Como é uma chamada para readable.push(), a chamada encerrará o processo de leitura. No entanto, como o argumento é uma string vazia, nenhum dado é adicionado ao buffer legível, portanto não há nada para o usuário consumir.

Discrepância de highWaterMark após chamar readable.setEncoding()

O uso de readable.setEncoding() mudará o comportamento de como o highWaterMark opera no modo não objeto.

Normalmente, o tamanho do buffer atual é medido em relação ao highWaterMark em bytes. No entanto, depois que setEncoding() é chamado, a função de comparação começará a medir o tamanho do buffer em caracteres.

Isso não é um problema em casos comuns com latin1 ou ascii. Mas é aconselhável estar atento a esse comportamento ao trabalhar com strings que podem conter caracteres multibyte.