Stream
[Estável: 2 - Estável]
Estável: 2 Estabilidade: 2 - Estável
Código-fonte: lib/stream.js
Um stream é uma interface abstrata para trabalhar com dados em streaming no Node.js. O módulo node:stream
fornece uma API para implementar a interface de stream.
Existem muitos objetos de stream fornecidos pelo Node.js. Por exemplo, uma solicitação a um servidor HTTP e process.stdout
são ambos instâncias de stream.
Os streams podem ser legíveis, graváveis ou ambos. Todos os streams são instâncias de EventEmitter
.
Para acessar o módulo node:stream
:
const stream = require('node:stream')
O módulo node:stream
é útil para criar novos tipos de instâncias de stream. Normalmente, não é necessário usar o módulo node:stream
para consumir streams.
Organização deste documento
Este documento contém duas seções principais e uma terceira seção para notas. A primeira seção explica como usar streams existentes dentro de um aplicativo. A segunda seção explica como criar novos tipos de streams.
Tipos de streams
Existem quatro tipos de streams fundamentais no Node.js:
Writable
: streams para os quais os dados podem ser escritos (por exemplo,fs.createWriteStream()
).Readable
: streams dos quais os dados podem ser lidos (por exemplo,fs.createReadStream()
).Duplex
: streams que sãoReadable
eWritable
(por exemplo,net.Socket
).Transform
: streamsDuplex
que podem modificar ou transformar os dados à medida que são escritos e lidos (por exemplo,zlib.createDeflate()
).
Além disso, este módulo inclui as funções utilitárias stream.duplexPair()
, stream.pipeline()
, stream.finished()
stream.Readable.from()
e stream.addAbortSignal()
.
Streams Promises API
Adicionado em: v15.0.0
A API stream/promises
fornece um conjunto alternativo de funções de utilitário assíncronas para streams que retornam objetos Promise
em vez de usar callbacks. A API é acessível via require('node:stream/promises')
ou require('node:stream').promises
.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[Histórico]
Versão | Alterações |
---|---|
v18.0.0, v17.2.0, v16.14.0 | Adiciona a opção end , que pode ser definida como false para evitar o fechamento automático do stream de destino quando a origem termina. |
v15.0.0 | Adicionada em: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- Retorna: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- Retorna: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- Retorna: <Promise> | <AsyncIterable>
options
<Object> Opções do Pipelinesignal
<AbortSignal>end
<boolean> Encerra o stream de destino quando o stream de origem termina. Os streams de transformação são sempre encerrados, mesmo que este valor sejafalse
. Padrão:true
.
Retorna: <Promise> Cumpre quando o pipeline estiver completo.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline succeeded.')
Para usar um AbortSignal
, passe-o dentro de um objeto de opções, como o último argumento. Quando o sinal for abortado, destroy
será chamado no pipeline subjacente, com um AbortError
.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
A API pipeline
também suporta geradores assíncronos:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Trabalha com strings em vez de `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Trabalha com strings em vez de `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Pipeline succeeded.')
Lembre-se de lidar com o argumento signal
passado para o gerador assíncrono. Especialmente no caso em que o gerador assíncrono é a fonte do pipeline (ou seja, o primeiro argumento) ou o pipeline nunca será concluído.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline succeeded.')
A API pipeline
fornece versão com callback:
stream.finished(stream[, options])
[Histórico]
Versão | Alterações |
---|---|
v19.5.0, v18.14.0 | Adicionou suporte para ReadableStream e WritableStream . |
v19.1.0, v18.13.0 | A opção cleanup foi adicionada. |
v15.0.0 | Adicionada em: v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Um fluxo/fluxo da web legível e/ou gravável.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> Setrue
, remove os ouvintes registrados por esta função antes da promessa ser cumprida. Padrão:false
.
Retorna: <Promise> Cumpre quando o fluxo não é mais legível ou gravável.
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream terminou a leitura.')
}
run().catch(console.error)
rs.resume() // Esvazia o fluxo.
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream terminou a leitura.')
}
run().catch(console.error)
rs.resume() // Esvazia o fluxo.
A API finished
também fornece uma versão de callback.
stream.finished()
deixa ouvintes de eventos pendentes (em particular 'error'
, 'end'
, 'finish'
e 'close'
) depois que a promessa retornada é resolvida ou rejeitada. A razão para isso é que eventos 'error'
inesperados (devido a implementações de fluxo incorretas) não causam travamentos inesperados. Se este for um comportamento indesejado, options.cleanup
deve ser definido como true
:
await finished(rs, { cleanup: true })
Modo objeto
Todos os fluxos criados pelas APIs do Node.js operam exclusivamente em strings, <Buffer>, <TypedArray> e <DataView> objetos:
Strings
eBuffers
são os tipos mais comuns usados com fluxos.TypedArray
eDataView
permitem que você manipule dados binários com tipos comoInt32Array
ouUint8Array
. Quando você grava um TypedArray ou DataView em um fluxo, o Node.js processa os bytes brutos.
No entanto, é possível que as implementações de fluxo funcionem com outros tipos de valores JavaScript (com exceção de null
, que tem um propósito especial dentro dos fluxos). Esses fluxos são considerados como operando em "modo objeto".
As instâncias de fluxo são alteradas para o modo objeto usando a opção objectMode
quando o fluxo é criado. Tentar alternar um fluxo existente para o modo objeto não é seguro.
Buffering
Tanto os fluxos Writable
quanto Readable
armazenarão dados em um buffer interno.
A quantidade de dados potencialmente armazenados em buffer depende da opção highWaterMark
passada para o construtor do fluxo. Para fluxos normais, a opção highWaterMark
especifica um número total de bytes. Para fluxos operando em modo objeto, o highWaterMark
especifica um número total de objetos. Para fluxos operando em (mas não decodificando) strings, o highWaterMark
especifica um número total de unidades de código UTF-16.
Os dados são armazenados em buffer em fluxos Readable
quando a implementação chama stream.push(chunk)
. Se o consumidor do Stream não chamar stream.read()
, os dados permanecerão na fila interna até serem consumidos.
Assim que o tamanho total do buffer de leitura interno atingir o limite especificado por highWaterMark
, o fluxo interromperá temporariamente a leitura de dados do recurso subjacente até que os dados atualmente armazenados em buffer possam ser consumidos (ou seja, o fluxo parará de chamar o método interno readable._read()
usado para preencher o buffer de leitura).
Os dados são armazenados em buffer em fluxos Writable
quando o método writable.write(chunk)
é chamado repetidamente. Enquanto o tamanho total do buffer de gravação interno estiver abaixo do limite definido por highWaterMark
, as chamadas para writable.write()
retornarão true
. Assim que o tamanho do buffer interno atingir ou exceder o highWaterMark
, false
será retornado.
Um objetivo principal da API stream
, particularmente o método stream.pipe()
, é limitar o armazenamento em buffer de dados a níveis aceitáveis, de modo que fontes e destinos com velocidades diferentes não sobrecarreguem a memória disponível.
A opção highWaterMark
é um limite, não um limite estrito: ela dita a quantidade de dados que um fluxo armazena em buffer antes de parar de solicitar mais dados. Ela não impõe uma limitação de memória estrita em geral. Implementações de fluxo específicas podem optar por impor limites mais rígidos, mas isso é opcional.
Como os fluxos Duplex
e Transform
são Readable
e Writable
, cada um mantém dois buffers internos separados usados para leitura e gravação, permitindo que cada lado opere independentemente do outro, mantendo um fluxo apropriado e eficiente de dados. Por exemplo, as instâncias net.Socket
são fluxos Duplex
cujo lado Readable
permite o consumo de dados recebidos de a socket e cujo lado Writable
permite a gravação de dados para a socket. Como os dados podem ser escritos para a socket a uma taxa mais rápida ou mais lenta do que os dados são recebidos, cada lado deve operar (e armazenar em buffer) independentemente do outro.
A mecânica do buffering interno é um detalhe de implementação interna e pode ser alterada a qualquer momento. No entanto, para certas implementações avançadas, os buffers internos podem ser recuperados usando writable.writableBuffer
ou readable.readableBuffer
. O uso dessas propriedades não documentadas é desencorajado.
API para consumidores de stream
Quase todos os aplicativos Node.js, por mais simples que sejam, usam streams de alguma forma. O seguinte é um exemplo de uso de streams em um aplicativo Node.js que implementa um servidor HTTP:
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` é um http.IncomingMessage, que é um stream legível.
// `res` é um http.ServerResponse, que é um stream gravável.
let body = ''
// Obter os dados como strings utf8.
// Se uma codificação não for definida, objetos Buffer serão recebidos.
req.setEncoding('utf8')
// Streams legíveis emitem eventos 'data' assim que um listener é adicionado.
req.on('data', chunk => {
body += chunk
})
// O evento 'end' indica que todo o corpo foi recebido.
req.on('end', () => {
try {
const data = JSON.parse(body)
// Escrever algo interessante de volta para o usuário:
res.write(typeof data)
res.end()
} catch (er) {
// opa! json inválido!
res.statusCode = 400
return res.end(`error: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Streams Writable
(como res
no exemplo) expõem métodos como write()
e end()
que são usados para escrever dados no stream.
Streams Readable
usam a API EventEmitter
para notificar o código do aplicativo quando os dados estão disponíveis para leitura no stream. Esses dados disponíveis podem ser lidos do stream de várias maneiras.
Tanto os streams Writable
quanto os streams Readable
usam a API EventEmitter
de várias maneiras para comunicar o estado atual do stream.
Streams Duplex
e Transform
são ambos Writable
e Readable
.
Aplicativos que estão escrevendo dados ou consumindo dados de um stream não precisam implementar as interfaces de stream diretamente e geralmente não terão motivo para chamar require('node:stream')
.
Desenvolvedores que desejam implementar novos tipos de streams devem consultar a seção API para implementadores de stream.
Fluxos graváveis
Fluxos graváveis são uma abstração para um destino para o qual os dados são escritos.
Exemplos de fluxos Writable
incluem:
- Pedidos HTTP, no cliente
- Respostas HTTP, no servidor
- Fluxos de gravação fs
- Fluxos zlib
- Fluxos criptográficos
- Sockets TCP
- Entrada padrão do processo filho
process.stdout
,process.stderr
Alguns desses exemplos são, na verdade, fluxos Duplex
que implementam a interface Writable
.
Todos os fluxos Writable
implementam a interface definida pela classe stream.Writable
.
Embora instâncias específicas de fluxos Writable
possam diferir de várias maneiras, todos os fluxos Writable
seguem o mesmo padrão de uso fundamental, como ilustrado no exemplo abaixo:
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')
Classe: stream.Writable
Adicionado em: v0.9.4
Evento: 'close'
[Histórico]
Versão | Alterações |
---|---|
v10.0.0 | Adiciona a opção emitClose para especificar se 'close' é emitido em destroy. |
v0.9.4 | Adicionada em: v0.9.4 |
O evento 'close'
é emitido quando o fluxo e quaisquer recursos subjacentes (um descritor de arquivo, por exemplo) foram fechados. O evento indica que nenhum outro evento será emitido e nenhum cálculo adicional ocorrerá.
Um fluxo Writable
sempre emitirá o evento 'close'
se for criado com a opção emitClose
.
Evento: 'drain'
Adicionado em: v0.9.4
Se uma chamada para stream.write(chunk)
retornar false
, o evento 'drain'
será emitido quando for apropriado retomar a gravação de dados no fluxo.
// Escreva os dados no fluxo gravável fornecido um milhão de vezes.
// Esteja atento à contrapressão.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// Última vez!
writer.write(data, encoding, callback)
} else {
// Veja se devemos continuar ou esperar.
// Não passe o callback, porque ainda não terminamos.
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// Teve que parar cedo!
// Escreva mais uma vez que esvaiar.
writer.once('drain', write)
}
}
}
Evento: 'error'
Adicionado em: v0.9.4
O evento 'error'
é emitido se ocorrer um erro durante a escrita ou o envio de dados por pipe. O callback do listener recebe um único argumento Error
quando chamado.
O fluxo é fechado quando o evento 'error'
é emitido, a menos que a opção autoDestroy
tenha sido definida como false
ao criar o fluxo.
Após 'error'
, nenhum outro evento além de 'close'
deve ser emitido (incluindo eventos 'error'
).
Evento: 'finish'
Adicionado em: v0.9.4
O evento 'finish'
é emitido depois que o método stream.end()
foi chamado e todos os dados foram enviados para o sistema subjacente.
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('Todas as escritas estão agora completas.')
})
writer.end('Este é o fim\n')
Evento: 'pipe'
Adicionado em: v0.9.4
src
<stream.Readable> fluxo de origem que está sendo enviado por pipe para este gravável
O evento 'pipe'
é emitido quando o método stream.pipe()
é chamado em um fluxo legível, adicionando este gravável ao seu conjunto de destinos.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('Algo está sendo enviado por pipe para o gravador.')
assert.equal(src, reader)
})
reader.pipe(writer)
Evento: 'unpipe'
Adicionado em: v0.9.4
src
<stream.Readable> O fluxo de origem que desfez o pipe deste gravável
O evento 'unpipe'
é emitido quando o método stream.unpipe()
é chamado em um fluxo Readable
, removendo este Writable
do seu conjunto de destinos.
Isso também é emitido caso este fluxo Writable
emita um erro quando um fluxo Readable
o utiliza por pipe.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('Algo parou de ser enviado por pipe para o gravador.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
Adicionado em: v0.11.2
O método writable.cork()
força que todos os dados escritos sejam armazenados em buffer na memória. Os dados em buffer serão liberados quando os métodos stream.uncork()
ou stream.end()
forem chamados.
A intenção principal de writable.cork()
é acomodar uma situação em que vários pequenos chunks são escritos para o stream em rápida sucessão. Em vez de encaminhá-los imediatamente para o destino subjacente, writable.cork()
armazena em buffer todos os chunks até que writable.uncork()
seja chamado, o que passará todos eles para writable._writev()
, se presente. Isso evita uma situação de bloqueio head-of-line onde os dados estão sendo armazenados em buffer enquanto aguardam o processamento do primeiro pequeno chunk. No entanto, o uso de writable.cork()
sem implementar writable._writev()
pode ter um efeito adverso na taxa de transferência.
Veja também: writable.uncork()
, writable._writev()
.
writable.destroy([error])
[Histórico]
Versão | Alterações |
---|---|
v14.0.0 | Funciona como uma operação sem efeito em um stream que já foi destruído. |
v8.0.0 | Adicionada em: v8.0.0 |
Destrói o stream. Opcionalmente, emite um evento 'error'
e emite um evento 'close'
(a menos que emitClose
esteja definido como false
). Após esta chamada, o stream gravável terminou e chamadas subsequentes a write()
ou end()
resultarão em um erro ERR_STREAM_DESTROYED
. Esta é uma maneira destrutiva e imediata de destruir um stream. Chamadas anteriores a write()
podem não ter sido drenadas e podem disparar um erro ERR_STREAM_DESTROYED
. Use end()
em vez de destruir se os dados devem ser liberados antes do fechamento, ou aguarde o evento 'drain'
antes de destruir o stream.
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
Uma vez que destroy()
foi chamado, quaisquer outras chamadas serão uma operação sem efeito e nenhum outro erro, exceto de _destroy()
, poderá ser emitido como 'error'
.
Os implementadores não devem substituir este método, mas sim implementar writable._destroy()
.
writable.closed
Adicionado em: v18.0.0
É true
depois que 'close'
foi emitido.
writable.destroyed
Adicionado em: v8.0.0
É true
depois que writable.destroy()
foi chamado.
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[Histórico]
Versão | Alterações |
---|---|
v22.0.0, v20.13.0 | O argumento chunk agora pode ser uma instância de TypedArray ou DataView . |
v15.0.0 | O callback é chamado antes de 'finish' ou em caso de erro. |
v14.0.0 | O callback é chamado se 'finish' ou 'error' for emitido. |
v10.0.0 | Este método agora retorna uma referência para writable . |
v8.0.0 | O argumento chunk agora pode ser uma instância de Uint8Array . |
v0.9.4 | Adicionada em: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Dados opcionais para escrever. Para fluxos que não operam em modo objeto,chunk
deve ser uma <string>, <Buffer>, <TypedArray> ou <DataView>. Para fluxos em modo objeto,chunk
pode ser qualquer valor JavaScript diferente denull
.encoding
<string> A codificação sechunk
for uma stringcallback
<Function> Callback para quando o fluxo terminar.- Retorna: <this>
Chamar o método writable.end()
sinaliza que nenhum dado adicional será escrito no Writable
. Os argumentos opcionais chunk
e encoding
permitem que um último bloco de dados adicional seja escrito imediatamente antes de fechar o fluxo.
Chamar o método stream.write()
depois de chamar stream.end()
gerará um erro.
// Escreve 'hello, ' e depois termina com 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// Escrever mais agora não é permitido!
writable.setDefaultEncoding(encoding)
[Histórico]
Versão | Alterações |
---|---|
v6.1.0 | Este método agora retorna uma referência para writable . |
v0.11.15 | Adicionado em: v0.11.15 |
O método writable.setDefaultEncoding()
define a codificação encoding
padrão para um fluxo Writable
.
writable.uncork()
Adicionado em: v0.11.2
O método writable.uncork()
limpa todos os dados em buffer desde que stream.cork()
foi chamado.
Quando usar writable.cork()
e writable.uncork()
para gerenciar o buffer de gravações em um fluxo, adiar chamadas para writable.uncork()
usando process.nextTick()
. Fazer isso permite o agrupamento de todas as chamadas writable.write()
que ocorrem dentro de uma determinada fase do loop de eventos do Node.js.
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())
Se o método writable.cork()
for chamado várias vezes em um fluxo, o mesmo número de chamadas para writable.uncork()
deve ser chamado para limpar os dados em buffer.
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
stream.uncork()
// Os dados não serão limpos até que uncork() seja chamado pela segunda vez.
stream.uncork()
})
Veja também: writable.cork()
.
writable.writable
Adicionado em: v11.4.0
É true
se for seguro chamar writable.write()
, o que significa que o fluxo não foi destruído, ocorreu um erro ou foi encerrado.
writable.writableAborted
Adicionado em: v18.0.0, v16.17.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
Retorna se o fluxo foi destruído ou ocorreu um erro antes de emitir 'finish'
.
writable.writableEnded
Adicionado em: v12.9.0
É true
depois que writable.end()
foi chamado. Esta propriedade não indica se os dados foram liberados; para isso, use writable.writableFinished
em vez disso.
writable.writableCorked
Adicionado em: v13.2.0, v12.16.0
Número de vezes que writable.uncork()
precisa ser chamado para destampar completamente o fluxo.
writable.errored
Adicionado em: v18.0.0
Retorna um erro se o fluxo foi destruído com um erro.
writable.writableFinished
Adicionado em: v12.6.0
É definido como true
imediatamente antes do evento 'finish'
ser emitido.
writable.writableHighWaterMark
Adicionado em: v9.3.0
Retorna o valor de highWaterMark
passado ao criar este Writable
.
writable.writableLength
Adicionado em: v9.4.0
Esta propriedade contém o número de bytes (ou objetos) na fila prontos para serem escritos. O valor fornece dados de introspecção sobre o estado do highWaterMark
.
writable.writableNeedDrain
Adicionado em: v15.2.0, v14.17.0
É true
se o buffer do fluxo estiver cheio e o fluxo emitir 'drain'
.
writable.writableObjectMode
Adicionado em: v12.3.0
Getter para a propriedade objectMode
de um fluxo Writable
fornecido.
writable[Symbol.asyncDispose]()
Adicionado em: v22.4.0, v20.16.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
Chama writable.destroy()
com um AbortError
e retorna uma promessa que se cumpre quando o fluxo terminar.
writable.write(chunk[, encoding][, callback])
[Histórico]
Versão | Alterações |
---|---|
v22.0.0, v20.13.0 | O argumento chunk agora pode ser uma instância de TypedArray ou DataView . |
v8.0.0 | O argumento chunk agora pode ser uma instância de Uint8Array . |
v6.0.0 | Passar null como parâmetro chunk será sempre considerado inválido agora, mesmo no modo objeto. |
v0.9.4 | Adicionada em: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Dados opcionais para gravar. Para fluxos que não operam no modo objeto,chunk
deve ser uma <string>, <Buffer>, <TypedArray> ou <DataView>. Para fluxos em modo objeto,chunk
pode ser qualquer valor JavaScript diferente denull
.encoding
<string> | <null> A codificação, sechunk
for uma string. Padrão:'utf8'
callback
<Function> Callback para quando este bloco de dados for liberado.- Retorna: <boolean>
false
se o fluxo desejar que o código de chamada aguarde o evento'drain'
ser emitido antes de continuar a gravar dados adicionais; caso contrário,true
.
O método writable.write()
grava alguns dados no fluxo e chama o callback
fornecido assim que os dados forem totalmente processados. Se ocorrer um erro, o callback
será chamado com o erro como seu primeiro argumento. O callback
é chamado assincronamente e antes de 'error'
ser emitido.
O valor retornado é true
se o buffer interno for menor que o highWaterMark
configurado quando o fluxo foi criado após admitir chunk
. Se false
for retornado, novas tentativas de gravar dados no fluxo devem parar até que o evento 'drain'
seja emitido.
Enquanto um fluxo não está drenando, as chamadas para write()
armazenarão em buffer chunk
e retornarão false. Assim que todos os chunks atualmente armazenados em buffer forem drenados (aceitos para entrega pelo sistema operacional), o evento 'drain'
será emitido. Depois que write()
retornar false, não grave mais chunks até que o evento 'drain'
seja emitido. Embora chamar write()
em um fluxo que não está drenando seja permitido, o Node.js armazenará em buffer todos os chunks gravados até que o uso máximo de memória ocorra, momento em que ele abortará incondicionalmente. Mesmo antes de abortar, o alto uso de memória causará baixo desempenho do coletor de lixo e alto RSS (que normalmente não é liberado de volta para o sistema, mesmo depois que a memória não for mais necessária). Como as sockets TCP podem nunca drenar se o peer remoto não ler os dados, gravar em uma socket que não está drenando pode levar a uma vulnerabilidade explorável remotamente.
Gravar dados enquanto o fluxo não está drenando é particularmente problemático para um Transform
, porque os fluxos Transform
são pausados por padrão até que sejam canalizados ou um manipulador de eventos 'data'
ou 'readable'
seja adicionado.
Se os dados a serem escritos puderem ser gerados ou buscados sob demanda, recomenda-se encapsular a lógica em um Readable
e usar stream.pipe()
. No entanto, se chamar write()
for preferível, é possível respeitar a contrapressão e evitar problemas de memória usando o evento 'drain'
:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// Aguarde até que cb seja chamado antes de fazer qualquer outra gravação.
write('hello', () => {
console.log('Gravação concluída, faça mais gravações agora.')
})
Um fluxo Writable
no modo objeto sempre ignorará o argumento encoding
.
Fluxos legíveis
Fluxos legíveis são uma abstração para uma fonte da qual os dados são consumidos.
Exemplos de fluxos Readable
incluem:
- Respostas HTTP, no cliente
- Solicitações HTTP, no servidor
- Fluxos de leitura fs
- Fluxos zlib
- Fluxos criptográficos
- Sockets TCP
- Saída padrão e stderr do processo filho
process.stdin
Todos os fluxos Readable
implementam a interface definida pela classe stream.Readable
.
Dois modos de leitura
Os fluxos Readable
operam efetivamente em um dos dois modos: fluxo e pausa. Esses modos são separados do modo objeto. Um fluxo Readable
pode estar no modo objeto ou não, independentemente de estar no modo de fluxo ou no modo de pausa.
- No modo de fluxo, os dados são lidos do sistema subjacente automaticamente e fornecidos a um aplicativo o mais rápido possível usando eventos por meio da interface
EventEmitter
. - No modo de pausa, o método
stream.read()
deve ser chamado explicitamente para ler blocos de dados do fluxo.
Todos os fluxos Readable
começam no modo de pausa, mas podem ser alterados para o modo de fluxo de uma das seguintes maneiras:
- Adicionando um manipulador de eventos
'data'
. - Chamando o método
stream.resume()
. - Chamando o método
stream.pipe()
para enviar os dados para umWritable
.
O Readable
pode alternar de volta para o modo de pausa usando um dos seguintes:
- Se não houver destinos de pipe, chamando o método
stream.pause()
. - Se houver destinos de pipe, removendo todos os destinos de pipe. Vários destinos de pipe podem ser removidos chamando o método
stream.unpipe()
.
O conceito importante a ser lembrado é que um Readable
não gerará dados até que um mecanismo para consumir ou ignorar esses dados seja fornecido. Se o mecanismo de consumo for desabilitado ou removido, o Readable
tentará parar de gerar os dados.
Por razões de compatibilidade com versões anteriores, a remoção dos manipuladores de eventos 'data'
não pausará automaticamente o fluxo. Além disso, se houver destinos de pipe, chamar stream.pause()
não garantirá que o fluxo permanecerá pausado depois que esses destinos esvaiarem e solicitarem mais dados.
Se um Readable
for alterado para o modo de fluxo e não houver consumidores disponíveis para lidar com os dados, esses dados serão perdidos. Isso pode ocorrer, por exemplo, quando o método readable.resume()
é chamado sem um ouvinte anexado ao evento 'data'
, ou quando um manipulador de eventos 'data'
é removido do fluxo.
Adicionar um manipulador de eventos 'readable'
faz com que o fluxo pare automaticamente, e os dados devem ser consumidos via readable.read()
. Se o manipulador de eventos 'readable'
for removido, o fluxo começará novamente se houver um manipulador de eventos 'data'
.
Três estados
Os "dois modos" de operação para um fluxo Readable
são uma abstração simplificada para a gestão de estado interno mais complicada que ocorre na implementação do fluxo Readable
.
Especificamente, em qualquer ponto no tempo, cada Readable
está em um dos três estados possíveis:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
Quando readable.readableFlowing
é null
, nenhum mecanismo para consumir os dados do fluxo é fornecido. Portanto, o fluxo não gerará dados. Enquanto estiver neste estado, anexar um listener para o evento 'data'
, chamar o método readable.pipe()
ou chamar o método readable.resume()
mudará readable.readableFlowing
para true
, fazendo com que o Readable
comece a emitir eventos ativamente à medida que os dados são gerados.
Chamar readable.pause()
, readable.unpipe()
ou receber backpressure fará com que readable.readableFlowing
seja definido como false
, interrompendo temporariamente o fluxo de eventos, mas não interrompendo a geração de dados. Enquanto estiver neste estado, anexar um listener para o evento 'data'
não mudará readable.readableFlowing
para true
.
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing is now false.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing is still false.
pass.write('ok') // Will not emit 'data'.
pass.resume() // Must be called to make stream emit 'data'.
// readableFlowing is now true.
Enquanto readable.readableFlowing
for false
, dados podem estar se acumulando no buffer interno do fluxo.
Escolha um estilo de API
A API do fluxo Readable
evoluiu ao longo de várias versões do Node.js e fornece vários métodos de consumo de dados do fluxo. Em geral, os desenvolvedores devem escolher um dos métodos de consumo de dados e nunca devem usar vários métodos para consumir dados de um único fluxo. Especificamente, usar uma combinação de on('data')
, on('readable')
, pipe()
ou iteradores assíncronos pode levar a um comportamento não intuitivo.
Classe: stream.Readable
Adicionado em: v0.9.4
Evento: 'close'
[Histórico]
Versão | Alterações |
---|---|
v10.0.0 | Adiciona a opção emitClose para especificar se 'close' é emitido em destroy. |
v0.9.4 | Adicionada em: v0.9.4 |
O evento 'close'
é emitido quando o fluxo e quaisquer recursos subjacentes (um descritor de arquivo, por exemplo) foram fechados. O evento indica que nenhum outro evento será emitido e nenhuma computação adicional ocorrerá.
Um fluxo Readable
sempre emitirá o evento 'close'
se for criado com a opção emitClose
.
Evento: 'data'
Adicionado em: v0.9.4
chunk
<Buffer> | <string> | <any> O bloco de dados. Para fluxos que não estão operando no modo objeto, o bloco será uma string ouBuffer
. Para fluxos que estão no modo objeto, o bloco pode ser qualquer valor JavaScript diferente denull
.
O evento 'data'
é emitido sempre que o fluxo está renunciando à propriedade de um bloco de dados para um consumidor. Isso pode ocorrer sempre que o fluxo é comutado no modo de fluxo chamando readable.pipe()
, readable.resume()
, ou anexando um callback de ouvinte ao evento 'data'
. O evento 'data'
também será emitido sempre que o método readable.read()
for chamado e um bloco de dados estiver disponível para ser retornado.
Anexar um ouvinte de evento 'data'
a um fluxo que não foi explicitamente pausado mudará o fluxo para o modo de fluxo. Os dados serão então passados assim que estiverem disponíveis.
O callback do ouvinte receberá o bloco de dados como uma string se uma codificação padrão tiver sido especificada para o fluxo usando o método readable.setEncoding()
; caso contrário, os dados serão passados como um Buffer
.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Recebido ${chunk.length} bytes de dados.`)
})
Evento: 'end'
Adicionado em: v0.9.4
O evento 'end'
é emitido quando não há mais dados para serem consumidos do fluxo.
O evento 'end'
não será emitido a menos que os dados sejam completamente consumidos. Isso pode ser alcançado mudando o fluxo para o modo de fluxo, ou chamando stream.read()
repetidamente até que todos os dados tenham sido consumidos.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Recebido ${chunk.length} bytes de dados.`)
})
readable.on('end', () => {
console.log('Não haverá mais dados.')
})
Evento: 'error'
Adicionado em: v0.9.4
O evento 'error'
pode ser emitido por uma implementação Readable
a qualquer momento. Normalmente, isso pode ocorrer se o fluxo subjacente não conseguir gerar dados devido a uma falha interna subjacente, ou quando uma implementação de fluxo tenta enviar um bloco de dados inválido.
O callback do listener receberá um único objeto Error
.
Evento: 'pause'
Adicionado em: v0.9.4
O evento 'pause'
é emitido quando stream.pause()
é chamado e readableFlowing
não é false
.
Evento: 'readable'
[Histórico]
Versão | Alterações |
---|---|
v10.0.0 | O 'readable' é sempre emitido no próximo tick após a chamada de .push() . |
v10.0.0 | Usar 'readable' requer chamar .read() . |
v0.9.4 | Adicionada em: v0.9.4 |
O evento 'readable'
é emitido quando há dados disponíveis para serem lidos do fluxo, até a marca d'água alta configurada (state.highWaterMark
). Efetivamente, indica que o fluxo tem novas informações dentro do buffer. Se os dados estiverem disponíveis dentro deste buffer, stream.read()
pode ser chamado para recuperar esses dados. Além disso, o evento 'readable'
também pode ser emitido quando o fim do fluxo for atingido.
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// Há alguns dados para ler agora.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
Se o fim do fluxo for atingido, chamar stream.read()
retornará null
e acionará o evento 'end'
. Isso também é verdadeiro se nunca houve dados a serem lidos. Por exemplo, no exemplo a seguir, foo.txt
é um arquivo vazio:
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
A saída da execução deste script é:
$ node test.js
readable: null
end
Em alguns casos, anexar um listener para o evento 'readable'
fará com que uma certa quantidade de dados seja lida para um buffer interno.
Em geral, os mecanismos de evento readable.pipe()
e 'data'
são mais fáceis de entender do que o evento 'readable'
. No entanto, o tratamento de 'readable'
pode resultar em maior taxa de transferência.
Se ambos 'readable'
e 'data'
forem usados ao mesmo tempo, 'readable'
terá precedência no controle do fluxo, ou seja, 'data'
será emitido apenas quando stream.read()
for chamado. A propriedade readableFlowing
se tornará false
. Se houver listeners 'data'
quando 'readable'
for removido, o fluxo começará a fluir, ou seja, os eventos 'data'
serão emitidos sem chamar .resume()
.
Evento: 'resume'
Adicionado em: v0.9.4
O evento 'resume'
é emitido quando stream.resume()
é chamado e readableFlowing
não é true
.
readable.destroy([error])
[Histórico]
Versão | Alterações |
---|---|
v14.0.0 | Funciona como uma operação sem efeito colateral em um stream que já foi destruído. |
v8.0.0 | Adicionada em: v8.0.0 |
Destrói o stream. Opcionalmente emite um evento 'error'
e emite um evento 'close'
(a menos que emitClose
esteja definido como false
). Após esta chamada, o stream legível liberará quaisquer recursos internos e chamadas subsequentes a push()
serão ignoradas.
Assim que destroy()
for chamado, quaisquer outras chamadas serão uma operação sem efeito colateral e nenhum outro erro, exceto de _destroy()
, poderá ser emitido como 'error'
.
Os implementadores não devem substituir este método, mas sim implementar readable._destroy()
.
readable.closed
Adicionado em: v18.0.0
É true
depois que 'close'
foi emitido.
readable.destroyed
Adicionado em: v8.0.0
É true
depois que readable.destroy()
foi chamado.
readable.isPaused()
Adicionado em: v0.11.14
- Retorna: <boolean>
O método readable.isPaused()
retorna o estado operacional atual do Readable
. Isso é usado principalmente pelo mecanismo que está por trás do método readable.pipe()
. Na maioria dos casos típicos, não haverá razão para usar este método diretamente.
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
Adicionado em: v0.9.4
- Retorna: <this>
O método readable.pause()
fará com que um fluxo no modo de fluxo pare de emitir eventos 'data'
, saindo do modo de fluxo. Quaisquer dados que se tornarem disponíveis permanecerão no buffer interno.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Recebido ${chunk.length} bytes de dados.`)
readable.pause()
console.log('Não haverá dados adicionais por 1 segundo.')
setTimeout(() => {
console.log('Agora os dados começarão a fluir novamente.')
readable.resume()
}, 1000)
})
O método readable.pause()
não tem efeito se houver um ouvinte de eventos 'readable'
.
readable.pipe(destination[, options])
Adicionado em: v0.9.4
destination
<stream.Writable> O destino para gravação de dadosoptions
<Object> Opções de pipeend
<boolean> Encerra o escritor quando o leitor termina. Padrão:true
.
Retorna: <stream.Writable> O destino, permitindo uma cadeia de pipes se for um fluxo
Duplex
ouTransform
O método readable.pipe()
anexa um fluxo Writable
ao readable
, fazendo com que ele alterne automaticamente para o modo de fluxo e empurre todos os seus dados para o Writable
anexado. O fluxo de dados será gerenciado automaticamente para que o fluxo de destino Writable
não seja sobrecarregado por um fluxo Readable
mais rápido.
O exemplo a seguir envia todos os dados do readable
para um arquivo chamado file.txt
:
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Todos os dados de readable vão para 'file.txt'.
readable.pipe(writable)
É possível anexar vários fluxos Writable
a um único fluxo Readable
.
O método readable.pipe()
retorna uma referência ao fluxo destino, tornando possível configurar cadeias de fluxos conectados:
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
Por padrão, stream.end()
é chamado no fluxo de destino Writable
quando o fluxo de origem Readable
emite 'end'
, de modo que o destino não seja mais gravável. Para desabilitar esse comportamento padrão, a opção end
pode ser passada como false
, fazendo com que o fluxo de destino permaneça aberto:
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Goodbye\n')
})
Uma ressalva importante é que se o fluxo Readable
emitir um erro durante o processamento, o destino Writable
não é fechado automaticamente. Se ocorrer um erro, será necessário fechar manualmente cada fluxo para evitar vazamentos de memória.
Os fluxos Writable
process.stderr
e process.stdout
nunca são fechados até que o processo Node.js termine, independentemente das opções especificadas.
readable.read([size])
Adicionado em: v0.9.4
size
<number> Argumento opcional para especificar quanta informação ler.- Retorna: <string> | <Buffer> | <null> | <any>
O método readable.read()
lê dados do buffer interno e os retorna. Se não houver dados disponíveis para leitura, null
é retornado. Por padrão, os dados são retornados como um objeto Buffer
, a menos que uma codificação tenha sido especificada usando o método readable.setEncoding()
ou o stream esteja operando em modo objeto.
O argumento opcional size
especifica um número específico de bytes a serem lidos. Se size
bytes não estiverem disponíveis para leitura, null
será retornado, a menos que o stream tenha terminado, caso em que todos os dados restantes no buffer interno serão retornados.
Se o argumento size
não for especificado, todos os dados contidos no buffer interno serão retornados.
O argumento size
deve ser menor ou igual a 1 GiB.
O método readable.read()
só deve ser chamado em streams Readable
operando em modo pausado. No modo de fluxo, readable.read()
é chamado automaticamente até que o buffer interno esteja totalmente esvaziado.
const readable = getReadableStreamSomehow()
// 'readable' pode ser acionado várias vezes à medida que os dados são armazenados em buffer
readable.on('readable', () => {
let chunk
console.log('Stream é legível (novos dados recebidos no buffer)')
// Use um loop para garantir que lemos todos os dados atualmente disponíveis
while (null !== (chunk = readable.read())) {
console.log(`Lidos ${chunk.length} bytes de dados...`)
}
})
// 'end' será acionado uma vez quando não houver mais dados disponíveis
readable.on('end', () => {
console.log('Chegou ao fim do stream.')
})
Cada chamada para readable.read()
retorna um pedaço de dados ou null
, significando que não há mais dados para ler naquele momento. Esses pedaços não são concatenados automaticamente. Como uma única chamada read()
não retorna todos os dados, pode ser necessário usar um loop while
para ler continuamente os pedaços até que todos os dados sejam recuperados. Ao ler um arquivo grande, .read()
pode retornar null
temporariamente, indicando que consumiu todo o conteúdo em buffer, mas pode haver mais dados ainda a serem armazenados em buffer. Nesses casos, um novo evento 'readable'
é emitido assim que houver mais dados no buffer, e o evento 'end'
significa o fim da transmissão de dados.
Portanto, para ler todo o conteúdo de um arquivo de um readable
, é necessário coletar pedaços em vários eventos 'readable'
:
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
Um stream Readable
em modo objeto sempre retornará um único item de uma chamada para readable.read(size)
, independentemente do valor do argumento size
.
Se o método readable.read()
retornar um pedaço de dados, um evento 'data'
também será emitido.
Chamar stream.read([size])
depois que o evento 'end'
foi emitido retornará null
. Nenhum erro de tempo de execução será levantado.
readable.readable
Adicionado em: v11.4.0
É true
se for seguro chamar readable.read()
, o que significa que o stream não foi destruído ou emitiu 'error'
ou 'end'
.
readable.readableAborted
Adicionado em: v16.8.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
Retorna se o stream foi destruído ou apresentou erro antes de emitir 'end'
.
readable.readableDidRead
Adicionado em: v16.7.0, v14.18.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
Retorna se 'data'
foi emitido.
readable.readableEncoding
Adicionado em: v12.7.0
Getter para a propriedade encoding
de um dado stream Readable
. A propriedade encoding
pode ser definida usando o método readable.setEncoding()
.
readable.readableEnded
Adicionado em: v12.9.0
Torna-se true
quando o evento 'end'
é emitido.
readable.errored
Adicionado em: v18.0.0
Retorna um erro se o stream foi destruído com um erro.
readable.readableFlowing
Adicionado em: v9.4.0
Esta propriedade reflete o estado atual de um stream Readable
como descrito na seção Três estados.
readable.readableHighWaterMark
Adicionado em: v9.3.0
Retorna o valor de highWaterMark
passado ao criar este Readable
.
readable.readableLength
Adicionado em: v9.4.0
Esta propriedade contém o número de bytes (ou objetos) na fila prontos para serem lidos. O valor fornece dados de introspecção sobre o estado do highWaterMark
.
readable.readableObjectMode
Adicionado em: v12.3.0
Getter para a propriedade objectMode
de um dado fluxo Readable
.
readable.resume()
[Histórico]
Versão | Alterações |
---|---|
v10.0.0 | O resume() não tem efeito se houver um ouvinte de evento 'readable' . |
v0.9.4 | Adicionada em: v0.9.4 |
- Retorna: <this>
O método readable.resume()
faz com que um fluxo Readable
explicitamente pausado retome a emissão de eventos 'data'
, alternando o fluxo para o modo de fluxo.
O método readable.resume()
pode ser usado para consumir completamente os dados de um fluxo sem realmente processar nenhum desses dados:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Alcançou o fim, mas não leu nada.')
})
O método readable.resume()
não tem efeito se houver um ouvinte de evento 'readable'
.
readable.setEncoding(encoding)
Adicionado em: v0.9.4
O método readable.setEncoding()
define a codificação de caracteres para os dados lidos do fluxo Readable
.
Por padrão, nenhuma codificação é atribuída e os dados do fluxo serão retornados como objetos Buffer
. Definir uma codificação faz com que os dados do fluxo sejam retornados como strings da codificação especificada em vez de como objetos Buffer
. Por exemplo, chamar readable.setEncoding('utf8')
fará com que os dados de saída sejam interpretados como dados UTF-8 e passados como strings. Chamar readable.setEncoding('hex')
fará com que os dados sejam codificados no formato de string hexadecimal.
O fluxo Readable
tratará adequadamente os caracteres de vários bytes entregues através do fluxo que, de outra forma, seriam decodificados incorretamente se simplesmente puxados do fluxo como objetos Buffer
.
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('Obteve %d caracteres de dados de string:', chunk.length)
})
readable.unpipe([destination])
Adicionado em: v0.9.4
destination
<stream.Writable> Stream específico opcional para desvincular- Retorna: <this>
O método readable.unpipe()
destaca um stream Writable
previamente anexado usando o método stream.pipe()
.
Se o destination
não for especificado, então todos os pipes serão destacados.
Se o destination
for especificado, mas nenhum pipe for configurado para ele, então o método não faz nada.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Todos os dados de readable vão para 'file.txt',
// mas apenas pelo primeiro segundo.
readable.pipe(writable)
setTimeout(() => {
console.log('Parar de escrever em file.txt.')
readable.unpipe(writable)
console.log('Fechar manualmente o stream de arquivo.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[Histórico]
Versão | Alterações |
---|---|
v22.0.0, v20.13.0 | O argumento chunk agora pode ser uma instância de TypedArray ou DataView . |
v8.0.0 | O argumento chunk agora pode ser uma instância de Uint8Array . |
v0.9.11 | Adicionada em: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Fragmento de dados para inserir na fila de leitura. Para streams que não operam no modo objeto,chunk
deve ser uma <string>, <Buffer>, <TypedArray>, <DataView> ounull
. Para streams no modo objeto,chunk
pode ser qualquer valor JavaScript.encoding
<string> Codificação de fragmentos de string. Deve ser uma codificaçãoBuffer
válida, como'utf8'
ou'ascii'
.
Passando chunk
como null
sinaliza o fim do stream (EOF) e se comporta da mesma forma que readable.push(null)
, após o qual nenhum dado poderá mais ser escrito. O sinal EOF é colocado no final do buffer e quaisquer dados em buffer ainda serão liberados.
O método readable.unshift()
insere um fragmento de dados de volta no buffer interno. Isso é útil em certas situações em que um stream está sendo consumido por código que precisa "des-consumir" uma certa quantidade de dados que ele retirou otimisticamente da fonte, para que os dados possam ser repassados a outra parte.
O método stream.unshift(chunk)
não pode ser chamado após o evento 'end'
ter sido emitido ou um erro de tempo de execução será lançado.
Desenvolvedores que usam stream.unshift()
geralmente devem considerar a mudança para o uso de um stream Transform
em vez disso. Consulte a seção API para implementadores de stream para obter mais informações.
// Extrair um cabeçalho delimitado por \n\n.
// Usar unshift() se recebermos muito.
// Chamar o callback com (erro, cabeçalho, stream).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// Encontrou o limite do cabeçalho.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// Remover o listener 'readable' antes de unshift.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// Agora o corpo da mensagem pode ser lido do stream.
callback(null, header, stream)
return
}
// Ainda lendo o cabeçalho.
header += str
}
}
}
Ao contrário de stream.push(chunk)
, stream.unshift(chunk)
não terminará o processo de leitura redefinindo o estado interno de leitura do stream. Isso pode causar resultados inesperados se readable.unshift()
for chamado durante uma leitura (ou seja, de dentro de uma implementação stream._read()
em um stream personalizado). Seguir a chamada para readable.unshift()
com um stream.push('')
imediato redefinirá o estado de leitura adequadamente, no entanto, é melhor simplesmente evitar chamar readable.unshift()
enquanto estiver no processo de realizar uma leitura.
readable.wrap(stream)
Adicionado em: v0.9.4
Antes do Node.js 0.10, os streams não implementavam toda a API do módulo node:stream
como atualmente definida. (Consulte Compatibilidade para obter mais informações.)
Ao usar uma biblioteca Node.js mais antiga que emite eventos 'data'
e possui um método stream.pause()
que é apenas consultivo, o método readable.wrap()
pode ser usado para criar um stream Readable
que usa o stream antigo como sua fonte de dados.
Raramente será necessário usar readable.wrap()
, mas o método foi fornecido como uma conveniência para interagir com aplicativos e bibliotecas Node.js mais antigos.
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // etc.
})
readable[Symbol.asyncIterator]()
[Histórico]
Versão | Alterações |
---|---|
v11.14.0 | O suporte a Symbol.asyncIterator não é mais experimental. |
v10.0.0 | Adicionada em: v10.0.0 |
- Retorna: <AsyncIterator> para consumir totalmente o stream.
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
Se o loop terminar com um break
, return
ou um throw
, o stream será destruído. Em outras palavras, iterar sobre um stream consumirá o stream completamente. O stream será lido em chunks de tamanho igual à opção highWaterMark
. No exemplo de código acima, os dados estarão em um único chunk se o arquivo tiver menos de 64 KiB de dados, pois nenhuma opção highWaterMark
é fornecida para fs.createReadStream()
.
readable[Symbol.asyncDispose]()
Adicionado em: v20.4.0, v18.18.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
Chama readable.destroy()
com um AbortError
e retorna uma promessa que se cumpre quando o stream termina.
readable.compose(stream[, options])
Adicionado em: v19.1.0, v18.13.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Duplex> um stream composto com o stream
stream
.
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // imprime ['this', 'is', 'compose', 'as', 'operator']
Veja stream.compose
para mais informações.
readable.iterator([options])
Adicionado em: v16.3.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
options
<Object>destroyOnReturn
<boolean> Quando definido comofalse
, chamarreturn
no iterador assíncrono, ou sair de uma iteraçãofor await...of
usandobreak
,return
outhrow
não destruirá o stream. Padrão:true
.
Retorna: <AsyncIterator> para consumir o stream.
O iterador criado por este método dá aos usuários a opção de cancelar a destruição do stream se o loop for await...of
for interrompido por return
, break
ou throw
, ou se o iterador deve destruir o stream se o stream emitiu um erro durante a iteração.
const { Readable } = require('node:stream')
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // Imprimirá 2 e depois 3
}
console.log(readable.destroyed) // True, stream foi totalmente consumido
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]))
await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}
showBoth()
readable.map(fn[, options])
[Histórico]
Versão | Alterações |
---|---|
v20.7.0, v18.19.0 | adicionado highWaterMark nas opções. |
v17.4.0, v16.14.0 | Adicionado em: v17.4.0, v16.14.0 |
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
fn
<Função> | <AsyncFunction> uma função para mapear cada chunk no stream.data
<qualquer> um chunk de dados do stream.options
<Objeto>signal
<AbortSignal> abortado se o stream for destruído, permitindo abortar a chamadafn
antecipadamente.
options
<Objeto>concurrency
<número> a invocação concorrente máxima defn
a ser chamada no stream de uma vez. Padrão:1
.highWaterMark
<número> quantos itens armazenar em buffer enquanto aguarda o consumo pelo usuário dos itens mapeados. Padrão:concurrency * 2 - 1
.signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Readable> um stream mapeado com a função
fn
.
Este método permite mapear o stream. A função fn
será chamada para cada chunk no stream. Se a função fn
retornar uma promise, essa promise será await
ed antes de ser passada para o stream de resultado.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Com um mapeador síncrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
console.log(chunk) // 2, 4, 6, 8
}
// Com um mapeador assíncrono, fazendo no máximo 2 consultas por vez.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
domain => resolver.resolve4(domain),
{ concurrency: 2 }
)
for await (const result of dnsResults) {
console.log(result) // Registra o resultado DNS de resolver.resolve4.
}
readable.filter(fn[, options])
[Histórico]
Versão | Alterações |
---|---|
v20.7.0, v18.19.0 | adicionou highWaterMark nas opções. |
v17.4.0, v16.14.0 | Adicionada em: v17.4.0, v16.14.0 |
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
fn
<Function> | <AsyncFunction> uma função para filtrar chunks do stream.data
<any> um chunk de dados do stream.options
<Object>signal
<AbortSignal> aborta se o stream for destruído, permitindo abortar a chamadafn
antecipadamente.
options
<Object>concurrency
<number> a invocação concorrente máxima defn
a ser chamada no stream de uma só vez. Padrão:1
.highWaterMark
<number> quantos itens bufferizar enquanto aguarda o consumo pelo usuário dos itens filtrados. Padrão:concurrency * 2 - 1
.signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Readable> um stream filtrado com o predicado
fn
.
Este método permite filtrar o stream. Para cada chunk no stream, a função fn
será chamada e, se retornar um valor truthy, o chunk será passado para o stream de resultado. Se a função fn
retornar uma promise, essa promise será await
ed.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Com um predicado síncrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// Com um predicado assíncrono, fazendo no máximo 2 consultas por vez.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address.ttl > 60
},
{ concurrency: 2 }
)
for await (const result of dnsResults) {
// Registra domínios com mais de 60 segundos no registo DNS resolvido.
console.log(result)
}
readable.forEach(fn[, options])
Adicionado em: v17.5.0, v16.15.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
fn
<Function> | <AsyncFunction> uma função a ser chamada em cada pedaço do stream.data
<any> um pedaço de dados do stream.options
<Object>signal
<AbortSignal> abortado se o stream for destruído, permitindo abortar a chamadafn
antecipadamente.
options
<Object>concurrency
<number> a invocação concorrente máxima defn
a ser chamada no stream de uma vez. Padrão:1
.signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Promise> uma promise para quando o stream tiver terminado.
Este método permite iterar um stream. Para cada pedaço no stream, a função fn
será chamada. Se a função fn
retornar uma promise, essa promise será await
ed.
Este método é diferente dos loops for await...of
no sentido de que ele pode opcionalmente processar pedaços concorrentemente. Além disso, uma iteração forEach
só pode ser interrompida passando uma opção signal
e abortando o AbortController
relacionado, enquanto for await...of
pode ser interrompido com break
ou return
. Em qualquer caso, o stream será destruído.
Este método é diferente de ouvir o evento 'data'
porque usa o evento readable
na maquinaria subjacente e pode limitar o número de chamadas concorrentes de fn
.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Com um predicado síncrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// Com um predicado assíncrono, fazendo no máximo 2 consultas por vez.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
await dnsResults.forEach(result => {
// Registra o resultado, semelhante a `for await (const result of dnsResults)`
console.log(result)
})
console.log('done') // O stream terminou
readable.toArray([options])
Adicionado em: v17.5.0, v16.15.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
options
<Object>signal
<AbortSignal> permite cancelar a operação toArray se o sinal for abortado.
Retorna: <Promise> uma promessa contendo um array com o conteúdo do stream.
Este método permite obter facilmente o conteúdo de um stream.
Como este método lê todo o stream na memória, ele nega os benefícios dos streams. Ele é destinado à interoperabilidade e conveniência, não como a maneira principal de consumir streams.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]
// Faz consultas dns concorrentemente usando .map e coleta
// os resultados em um array usando toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
.map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
.toArray()
readable.some(fn[, options])
Adicionado em: v17.5.0, v16.15.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
fn
<Function> | <AsyncFunction> uma função para chamar em cada chunk do stream.data
<any> um chunk de dados do stream.options
<Object>signal
<AbortSignal> abortado se o stream for destruído, permitindo abortar a chamadafn
antecipadamente.
options
<Object>concurrency
<number> a invocação concorrente máxima defn
para chamar no stream de uma vez. Padrão:1
.signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Promise> uma promessa que resulta em
true
sefn
retornou um valor truthy para pelo menos um dos chunks.
Este método é semelhante ao Array.prototype.some
e chama fn
em cada chunk do stream até que o valor retornado aguardado seja true
(ou qualquer valor truthy). Assim que uma chamada fn
em um chunk tiver um valor de retorno aguardado truthy, o stream é destruído e a promessa é cumprida com true
. Se nenhuma das chamadas fn
nos chunks retornar um valor truthy, a promessa é cumprida com false
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Com um predicado síncrono.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false
// Com um predicado assíncrono, fazendo no máximo 2 verificações de arquivo por vez.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(anyBigFile) // `true` se qualquer arquivo na lista for maior que 1MB
console.log('done') // O Stream terminou
readable.find(fn[, options])
Adicionado em: v17.5.0, v16.17.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
fn
<Function> | <AsyncFunction> uma função a ser chamada em cada parte do fluxo.data
<any> uma parte de dados do fluxo.options
<Object>signal
<AbortSignal> aborta se o fluxo for destruído, permitindo abortar a chamada defn
antecipadamente.
options
<Object>concurrency
<number> a invocação simultânea máxima defn
a ser chamada no fluxo de uma vez. Padrão:1
.signal
<AbortSignal> permite destruir o fluxo se o sinal for abortado.
Retorna: <Promise> uma promise que avalia para a primeira parte para a qual
fn
avaliou com um valor verdadeiro, ouundefined
se nenhum elemento foi encontrado.
Este método é semelhante a Array.prototype.find
e chama fn
em cada parte do fluxo para encontrar uma parte com um valor verdadeiro para fn
. Assim que o valor retornado aguardado de uma chamada fn
for verdadeiro, o fluxo é destruído e a promise é cumprida com o valor para o qual fn
retornou um valor verdadeiro. Se todas as chamadas fn
nas partes retornarem um valor falso, a promise é cumprida com undefined
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Com um predicado síncrono.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined
// Com um predicado assíncrono, fazendo no máximo 2 verificações de arquivo por vez.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(foundBigFile) // Nome do arquivo grande, se algum arquivo na lista for maior que 1MB
console.log('done') // O fluxo terminou
readable.every(fn[, options])
Adicionado em: v17.5.0, v16.15.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
fn
<Function> | <AsyncFunction> uma função para chamar em cada pedaço do fluxo.data
<any> um pedaço de dados do fluxo.options
<Object>signal
<AbortSignal> abortado se o fluxo for destruído, permitindo abortar a chamadafn
antecipadamente.
options
<Object>concurrency
<number> a invocação concorrente máxima defn
para chamar no fluxo de uma vez. Padrão:1
.signal
<AbortSignal> permite destruir o fluxo se o sinal for abortado.
Retorna: <Promise> uma promise que resulta em
true
sefn
retornar um valor truthy para todos os pedaços.
Este método é semelhante a Array.prototype.every
e chama fn
em cada pedaço do fluxo para verificar se todos os valores de retorno aguardados são valores truthy para fn
. Assim que uma chamada fn
em um pedaço de valor de retorno aguardado for falsy, o fluxo é destruído e a promise é cumprida com false
. Se todas as chamadas fn
nos pedaços retornarem um valor truthy, a promise é cumprida com true
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Com um predicado síncrono.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true
// Com um predicado assíncrono, fazendo no máximo 2 verificações de arquivo por vez.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
// `true` se todos os arquivos na lista forem maiores que 1MiB
console.log(allBigFiles)
console.log('done') // O fluxo terminou
readable.flatMap(fn[, options])
Adicionado em: v17.5.0, v16.15.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> uma função para mapear cada chunk no stream.data
<any> um chunk de dados do stream.options
<Object>signal
<AbortSignal> abortado se o stream for destruído, permitindo abortar a chamadafn
antecipadamente.
options
<Object>concurrency
<number> a invocação concorrente máxima defn
a ser chamada no stream de uma só vez. Padrão:1
.signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Readable> um stream mapeado com a função
fn
.
Este método retorna um novo stream aplicando o callback dado a cada chunk do stream e então achatando o resultado.
É possível retornar um stream ou outro iterável ou iterável assíncrono de fn
e os streams resultantes serão mesclados (achatados) no stream retornado.
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'
// Com um mapeador síncrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// Com um mapeador assíncrono, combine o conteúdo de 4 arquivos
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
createReadStream(fileName)
)
for await (const result of concatResult) {
// Isto conterá o conteúdo (todos os chunks) de todos os 4 arquivos
console.log(result)
}
readable.drop(limit[, options])
Adicionado em: v17.5.0, v16.15.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
limit
<number> o número de chunks para descartar do readable.options
<Object>signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Readable> um stream com
limit
chunks descartados.
Este método retorna um novo stream com os primeiros limit
chunks descartados.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])
Adicionado em: v17.5.0, v16.15.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
limit
<number> o número de chunks para pegar do readable.options
<Object>signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Readable> um stream com
limit
chunks pegos.
Este método retorna um novo stream com os primeiros limit
chunks.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])
Adicionado em: v17.5.0, v16.15.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
fn
<Function> | <AsyncFunction> uma função redutora para chamar em cada chunk no stream.previous
<any> o valor obtido da última chamada parafn
ou o valorinitial
se especificado ou o primeiro chunk do stream caso contrário.data
<any> um chunk de dados do stream.options
<Object>signal
<AbortSignal> abortado se o stream for destruído, permitindo abortar a chamadafn
antecipadamente.
initial
<any> o valor inicial a ser usado na redução.options
<Object>signal
<AbortSignal> permite destruir o stream se o sinal for abortado.
Retorna: <Promise> uma promise para o valor final da redução.
Este método chama fn
em cada chunk do stream em ordem, passando-lhe o resultado do cálculo no elemento anterior. Ele retorna uma promise para o valor final da redução.
Se nenhum valor initial
for fornecido, o primeiro chunk do stream será usado como valor inicial. Se o stream estiver vazio, a promise será rejeitada com um TypeError
com a propriedade de código ERR_INVALID_ARGS
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file))
return totalSize + size
}, 0)
console.log(folderSize)
A função redutora itera o elemento do stream elemento por elemento, o que significa que não há parâmetro concurrency
ou paralelismo. Para executar uma redução concorrentemente, você pode extrair a função assíncrona para o método readable.map
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir)
.map(file => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0)
console.log(folderSize)
Fluxos Duplex e de transformação
Classe: stream.Duplex
[Histórico]
Versão | Alterações |
---|---|
v6.8.0 | Instâncias de Duplex agora retornam true ao verificar instanceof stream.Writable . |
v0.9.4 | Adicionado em: v0.9.4 |
Fluxos Duplex são fluxos que implementam ambas as interfaces Readable
e Writable
.
Exemplos de fluxos Duplex
incluem:
duplex.allowHalfOpen
Adicionado em: v0.9.4
Se false
, o fluxo terminará automaticamente o lado gravável quando o lado legível terminar. Definido inicialmente pela opção do construtor allowHalfOpen
, que é true
por padrão.
Isso pode ser alterado manualmente para alterar o comportamento de meio aberto de uma instância de fluxo Duplex
existente, mas deve ser alterado antes do evento 'end'
ser emitido.
Classe: stream.Transform
Adicionado em: v0.9.4
Fluxos de transformação são fluxos Duplex
onde a saída está de alguma forma relacionada à entrada. Como todos os fluxos Duplex
, os fluxos Transform
implementam ambas as interfaces Readable
e Writable
.
Exemplos de fluxos Transform
incluem:
transform.destroy([error])
[Histórico]
Versão | Alterações |
---|---|
v14.0.0 | Funciona como uma operação sem efeito em um fluxo que já foi destruído. |
v8.0.0 | Adicionado em: v8.0.0 |
Destrói o fluxo e, opcionalmente, emite um evento 'error'
. Após esta chamada, o fluxo de transformação liberará quaisquer recursos internos. Os implementadores não devem substituir este método, mas sim implementar readable._destroy()
. A implementação padrão de _destroy()
para Transform
também emite 'close'
a menos que emitClose
seja definido como falso.
Uma vez que destroy()
foi chamado, quaisquer outras chamadas serão uma operação sem efeito e nenhum outro erro, exceto de _destroy()
, pode ser emitido como 'error'
.
stream.duplexPair([options])
Adicionado em: v22.6.0, v20.17.0
options
<Object> Um valor para passar para ambos os construtoresDuplex
, para definir opções como buffer.- Retorna: <Array> de duas instâncias
Duplex
.
A função utilitária duplexPair
retorna um Array com dois itens, cada um sendo um fluxo Duplex
conectado ao outro lado:
const [sideA, sideB] = duplexPair()
Tudo o que é escrito em um fluxo fica legível no outro. Ele fornece um comportamento análogo a uma conexão de rede, onde os dados escritos pelo cliente se tornam legíveis pelo servidor, e vice-versa.
Os fluxos Duplex são simétricos; um ou outro pode ser usado sem nenhuma diferença de comportamento.
stream.finished(stream[, options], callback)
[Histórico]
Versão | Alterações |
---|---|
v19.5.0 | Adicionou suporte para ReadableStream e WritableStream . |
v15.11.0 | A opção signal foi adicionada. |
v14.0.0 | O finished(stream, cb) aguardará o evento 'close' antes de invocar o callback. A implementação tenta detectar fluxos legados e aplica este comportamento apenas a fluxos que se espera que emitam 'close' . |
v14.0.0 | Emitir 'close' antes de 'end' em um fluxo Readable causará um erro ERR_STREAM_PREMATURE_CLOSE . |
v14.0.0 | O callback será invocado em fluxos que já terminaram antes da chamada para finished(stream, cb) . |
v10.0.0 | Adicionada em: v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Um fluxo/webstream legível e/ou gravável.options
<Object>error
<boolean> Se definido comofalse
, uma chamada paraemit('error', err)
não é tratada como finalizada. Padrão:true
.readable
<boolean> Quando definido comofalse
, o callback será chamado quando o fluxo terminar, mesmo que o fluxo ainda possa ser legível. Padrão:true
.writable
<boolean> Quando definido comofalse
, o callback será chamado quando o fluxo terminar, mesmo que o fluxo ainda possa ser gravável. Padrão:true
.signal
<AbortSignal> permite abortar a espera pelo término do fluxo. O fluxo subjacente não será abortado se o sinal for abortado. O callback será chamado com umAbortError
. Todos os ouvintes registrados adicionados por esta função também serão removidos.
callback
<Function> Uma função de callback que recebe um argumento de erro opcional.Retorna: <Function> Uma função de limpeza que remove todos os ouvintes registrados.
Uma função para ser notificada quando um fluxo não for mais legível, gravável ou tiver experimentado um erro ou um evento de fechamento prematuro.
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('Stream falhou.', err)
} else {
console.log('Stream terminou a leitura.')
}
})
rs.resume() // Drene o fluxo.
Especialmente útil em cenários de tratamento de erros em que um fluxo é destruído prematuramente (como uma solicitação HTTP abortada) e não emitirá 'end'
ou 'finish'
.
A API finished
fornece versão promise.
stream.finished()
deixa ouvintes de eventos pendentes (em particular 'error'
, 'end'
, 'finish'
e 'close'
) após a invocação do callback
. A razão para isso é que eventos 'error'
inesperados (devido a implementações de fluxo incorretas) não causam travamentos inesperados. Se este for um comportamento indesejado, a função de limpeza retornada precisa ser invocada no callback:
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[Histórico]
Versão | Alterações |
---|---|
v19.7.0, v18.16.0 | Adicionou suporte para webstreams. |
v18.0.0 | Passar um callback inválido para o argumento callback agora lança ERR_INVALID_ARG_TYPE em vez de ERR_INVALID_CALLBACK . |
v14.0.0 | O pipeline(..., cb) aguardará o evento 'close' antes de invocar o callback. A implementação tenta detectar streams legadas e aplica este comportamento apenas a streams que se espera que emitam 'close' . |
v13.10.0 | Adicionou suporte para geradores assíncronos. |
v10.0.0 | Adicionado em: v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- Retorna: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- Retorna: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- Retorna: <AsyncIterable> | <Promise>
callback
<Function> Chamado quando o pipeline está totalmente concluído.err
<Error>val
Valor resolvido dePromise
retornado pordestination
.
Retorna: <Stream>
Um método de módulo para conectar streams e geradores, encaminhando erros e limpando adequadamente, e fornecendo um callback quando o pipeline estiver completo.
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Use a API pipeline para conectar facilmente uma série de streams
// juntas e receber notificação quando o pipeline estiver totalmente concluído.
// Um pipeline para compactar um arquivo tar potencialmente enorme de forma eficiente:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('Pipeline falhou.', err)
} else {
console.log('Pipeline concluído com sucesso.')
}
})
A API pipeline
fornece uma versão promise.
stream.pipeline()
chamará stream.destroy(err)
em todas as streams, exceto:
- Streams
Readable
que emitiram'end'
ou'close'
. - Streams
Writable
que emitiram'finish'
ou'close'
.
stream.pipeline()
deixa ouvintes de eventos pendentes nas streams após a invocação do callback
. No caso de reutilização de streams após falha, isso pode causar vazamentos de ouvintes de eventos e erros ignorados. Se a última stream for legível, os ouvintes de eventos pendentes serão removidos para que a última stream possa ser consumida posteriormente.
stream.pipeline()
fecha todas as streams quando um erro é gerado. O uso de IncomingRequest
com pipeline
pode levar a um comportamento inesperado, pois destrói o socket sem enviar a resposta esperada. Veja o exemplo abaixo:
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // Arquivo não encontrado
// esta mensagem não pode ser enviada uma vez que `pipeline` já destruiu o socket
return res.end('error!!!')
}
})
})
stream.compose(...streams)
[Histórico]
Versão | Alterações |
---|---|
v21.1.0, v20.10.0 | Adicionou suporte para a classe stream. |
v19.8.0, v18.16.0 | Adicionou suporte para webstreams. |
v16.9.0 | Adicionada em: v16.9.0 |
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - stream.compose
é experimental.
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- Retorna: <stream.Duplex>
Combina dois ou mais streams em um stream Duplex
que escreve para o primeiro stream e lê do último. Cada stream fornecido é canalizado para o próximo, usando stream.pipeline
. Se algum dos streams apresentar erro, todos são destruídos, incluindo o stream Duplex
externo.
Como stream.compose
retorna um novo stream que por sua vez pode (e deve) ser canalizado para outros streams, ele permite a composição. Em contraste, ao passar streams para stream.pipeline
, tipicamente o primeiro stream é um stream legível e o último um stream gravável, formando um circuito fechado.
Se uma Function
for passada, ela deve ser um método de fábrica que recebe um source
Iterable
.
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // imprime 'HELLOWORLD'
stream.compose
pode ser usado para converter iteráveis assíncronos, geradores e funções em streams.
AsyncIterable
converte em umDuplex
legível. Não pode produzirnull
.AsyncGeneratorFunction
converte em umDuplex
de transformação legível/gravável. Deve receber umAsyncIterable
de origem como primeiro parâmetro. Não pode produzirnull
.AsyncFunction
converte em umDuplex
gravável. Deve retornarnull
ouundefined
.
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// Converter AsyncIterable em Duplex legível.
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// Converter AsyncGenerator em Duplex de transformação.
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// Converter AsyncFunction em Duplex gravável.
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // imprime 'HELLOWORLD'
Veja readable.compose(stream)
para stream.compose
como operador.
stream.Readable.from(iterable[, options])
Adicionado em: v12.3.0, v10.17.0
iterable
<Iterable> Objeto que implementa o protocolo iterávelSymbol.asyncIterator
ouSymbol.iterator
. Emite um evento 'error' se um valor nulo for passado.options
<Object> Opções fornecidas paranew stream.Readable([options])
. Por padrão,Readable.from()
definiráoptions.objectMode
comotrue
, a menos que isso seja explicitamente desabilitado definindooptions.objectMode
comofalse
.- Retorna: <stream.Readable>
Um método utilitário para criar streams legíveis a partir de iteradores.
const { Readable } = require('node:stream')
async function* generate() {
yield 'hello'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
Chamar Readable.from(string)
ou Readable.from(buffer)
não fará com que as strings ou buffers sejam iterados para corresponder à semântica de outros streams por razões de desempenho.
Se um objeto Iterable
contendo promises for passado como argumento, pode resultar em uma rejeição não tratada.
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rejeição não tratada
])
stream.Readable.fromWeb(readableStream[, options])
Adicionado em: v17.0.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Retorna: <stream.Readable>
stream.Readable.isDisturbed(stream)
Adicionado em: v16.8.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
stream
<stream.Readable> | <ReadableStream>- Retorna:
boolean
Retorna se o stream foi lido ou cancelado.
stream.isErrored(stream)
Adicionado em: v17.3.0, v16.14.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- Retorna: <boolean>
Retorna se o stream encontrou um erro.
stream.isReadable(stream)
Adicionado em: v17.4.0, v16.14.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
stream
<Readable> | <Duplex> | <ReadableStream>- Retorna: <boolean>
Retorna se o stream é legível.
stream.Readable.toWeb(streamReadable[, options])
Adicionado em: v17.0.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> O tamanho máximo da fila interna (doReadableStream
criado) antes que a contrapressão seja aplicada na leitura dostream.Readable
fornecido. Se nenhum valor for fornecido, ele será retirado dostream.Readable
fornecido.size
<Function> Uma função que define o tamanho do chunk de dados fornecido. Se nenhum valor for fornecido, o tamanho será1
para todos os chunks.chunk
<any>- Retorna: <number>
Retorna: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
Adicionado em: v17.0.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Retorna: <stream.Writable>
stream.Writable.toWeb(streamWritable)
Adicionado em: v17.0.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
streamWritable
<stream.Writable>- Retorna: <WritableStream>
stream.Duplex.from(src)
[Histórico]
Versão | Alterações |
---|---|
v19.5.0, v18.17.0 | O argumento src agora pode ser um ReadableStream ou WritableStream . |
v16.8.0 | Adicionada em: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
Um método utilitário para criar fluxos duplex.
Stream
converte o fluxo gravável emDuplex
gravável e o fluxo legível emDuplex
.Blob
converte emDuplex
legível.string
converte emDuplex
legível.ArrayBuffer
converte emDuplex
legível.AsyncIterable
converte em umDuplex
legível. Não pode gerarnull
.AsyncGeneratorFunction
converte em umDuplex
de transformação legível/gravável. Deve receber umAsyncIterable
de origem como primeiro parâmetro. Não pode gerarnull
.AsyncFunction
converte em umDuplex
gravável. Deve retornarnull
ouundefined
.Object ({ writable, readable })
convertereadable
ewritable
emStream
e, em seguida, combina-os emDuplex
, onde oDuplex
gravará nowritable
e lerá doreadable
.Promise
converte emDuplex
legível. O valornull
é ignorado.ReadableStream
converte emDuplex
legível.WritableStream
converte emDuplex
gravável.- Retorna: <stream.Duplex>
Se um objeto Iterable
contendo promises for passado como argumento, pode resultar em uma rejeição não tratada.
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rejeição não tratada
])
stream.Duplex.fromWeb(pair[, options])
Adicionado em: v17.0.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>Retorna: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
Adicionado em: v17.0.0
[Estável: 1 - Experimental]
Estável: 1 Estabilidade: 1 - Experimental
streamDuplex
<stream.Duplex>- Retorna: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[Histórico]
Versão | Alterações |
---|---|
v19.7.0, v18.16.0 | Suporte adicionado para ReadableStream e WritableStream . |
v15.4.0 | Adicionada em: v15.4.0 |
signal
<AbortSignal> Um sinal representando possível cancelamentostream
<Stream> | <ReadableStream> | <WritableStream> Um fluxo para anexar um sinal.
Anexa um AbortSignal
a um fluxo legível ou gravável. Isso permite que o código controle a destruição do fluxo usando um AbortController
.
Chamar abort
no AbortController
correspondente ao AbortSignal
passado se comportará da mesma maneira que chamar .destroy(new AbortError())
no fluxo, e controller.error(new AbortError())
para fluxos web.
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Mais tarde, interrompa a operação fechando o fluxo
controller.abort()
Ou usando um AbortSignal
com um fluxo legível como um iterável assíncrono:
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // definir um tempo limite
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// A operação foi cancelada
} else {
throw e
}
}
})()
Ou usando um AbortSignal
com um ReadableStream
:
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// A operação foi cancelada
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
Adicionado em: v19.9.0, v18.17.0
Retorna o highWaterMark
padrão usado pelos streams. O padrão é 65536
(64 KiB), ou 16
para objectMode
.
stream.setDefaultHighWaterMark(objectMode, value)
Adicionado em: v19.9.0, v18.17.0
Define o highWaterMark
padrão usado pelos streams.
API para implementadores de stream
A API do módulo node:stream
foi projetada para facilitar a implementação de streams usando o modelo de herança prototípica do JavaScript.
Primeiro, um desenvolvedor de stream declararia uma nova classe JavaScript que estende uma das quatro classes básicas de stream (stream.Writable
, stream.Readable
, stream.Duplex
ou stream.Transform
), certificando-se de chamar o construtor da classe pai apropriado:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
Ao estender streams, lembre-se de quais opções o usuário pode e deve fornecer antes de encaminhá-las para o construtor base. Por exemplo, se a implementação fizer suposições em relação às opções autoDestroy
e emitClose
, não permita que o usuário as substitua. Seja explícito sobre quais opções são encaminhadas em vez de encaminhar implicitamente todas as opções.
A nova classe de stream deve então implementar um ou mais métodos específicos, dependendo do tipo de stream que está sendo criado, conforme detalhado na tabela abaixo:
Caso de uso | Classe | Método(s) a implementar |
---|---|---|
Somente leitura | Readable | _read() |
Somente escrita | Writable | _write() , _writev() , _final() |
Leitura e escrita | Duplex | _read() , _write() , _writev() , _final() |
Operar em dados escritos, então ler o resultado | Transform | _transform() , _flush() , _final() |
O código de implementação para um stream nunca deve chamar os métodos "públicos" de um stream que são destinados ao uso por consumidores (conforme descrito na seção API para consumidores de stream). Fazer isso pode levar a efeitos colaterais adversos no código do aplicativo que consome o stream.
Evite substituir métodos públicos como write()
, end()
, cork()
, uncork()
, read()
e destroy()
, ou emitir eventos internos como 'error'
, 'data'
, 'end'
, 'finish'
e 'close'
através de .emit()
. Fazer isso pode quebrar invariantes de stream atuais e futuros, levando a problemas de comportamento e/ou compatibilidade com outros streams, utilitários de stream e expectativas do usuário.
Construção simplificada
Adicionado em: v1.2.0
Para muitos casos simples, é possível criar um fluxo sem depender de herança. Isso pode ser feito criando diretamente instâncias dos objetos stream.Writable
, stream.Readable
, stream.Duplex
ou stream.Transform
e passando métodos apropriados como opções do construtor.
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// Inicializar estado e carregar recursos...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Liberar recursos...
},
})
Implementando um fluxo gravável
A classe stream.Writable
é estendida para implementar um fluxo Writable
.
Fluxos Writable
personalizados devem chamar o construtor new stream.Writable([options])
e implementar o método writable._write()
e/ou writable._writev()
.
new stream.Writable([options])
[Histórico]
Versão | Alterações |
---|---|
v22.0.0 | Aumentar highWaterMark padrão. |
v15.5.0 | Suporte para passar um AbortSignal . |
v14.0.0 | Alterar o padrão da opção autoDestroy para true . |
v11.2.0, v10.16.0 | Adicionar a opção autoDestroy para destruir automaticamente o fluxo quando ele emitir 'finish' ou erros. |
v10.0.0 | Adicionar a opção emitClose para especificar se 'close' é emitido na destruição. |
options
<Object>highWaterMark
<number> Nível do buffer quandostream.write()
começa a retornarfalse
. Padrão:65536
(64 KiB), ou16
para fluxosobjectMode
.decodeStrings
<boolean> Se codificarstring
s passadas parastream.write()
emBuffer
s (com a codificação especificada na chamadastream.write()
) antes de passá-las parastream._write()
. Outros tipos de dados não são convertidos (ou seja,Buffer
s não são decodificados emstring
s). Definir como falso impedirá questring
s sejam convertidas. Padrão:true
.defaultEncoding
<string> A codificação padrão usada quando nenhuma codificação é especificada como argumento parastream.write()
. Padrão:'utf8'
.objectMode
<boolean> Sestream.write(anyObj)
é uma operação válida. Quando definido, torna-se possível escrever valores JavaScript além de string, <Buffer>, <TypedArray> ou <DataView> se suportado pela implementação do fluxo. Padrão:false
.emitClose
<boolean> Se o fluxo deve emitir'close'
depois de ter sido destruído. Padrão:true
.write
<Function> Implementação para o métodostream._write()
.writev
<Function> Implementação para o métodostream._writev()
.destroy
<Function> Implementação para o métodostream._destroy()
.final
<Function> Implementação para o métodostream._final()
.construct
<Function> Implementação para o métodostream._construct()
.autoDestroy
<boolean> Se este fluxo deve chamar automaticamente.destroy()
nele mesmo após o término. Padrão:true
.signal
<AbortSignal> Um sinal representando possível cancelamento.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// Chama o construtor stream.Writable().
super(options)
// ...
}
}
Ou, quando usar construtores de estilo pré-ES6:
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
Ou, usando a abordagem de construtor simplificada:
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
Chamar abort
no AbortController
correspondente ao AbortSignal
passado se comportará da mesma maneira que chamar .destroy(new AbortError())
no fluxo gravável.
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// Mais tarde, interrompa a operação fechando o fluxo
controller.abort()
writable._construct(callback)
Adicionado em: v15.0.0
callback
<Function> Chame esta função (opcionalmente com um argumento de erro) quando o stream tiver terminado a inicialização.
O método _construct()
NÃO DEVE ser chamado diretamente. Pode ser implementado por classes filhas e, se assim for, será chamado apenas pelos métodos internos da classe Writable
.
Esta função opcional será chamada em um tick após o construtor do stream ter retornado, atrasando quaisquer chamadas _write()
, _final()
e _destroy()
até que callback
seja chamado. Isso é útil para inicializar o estado ou inicializar recursos assincronamente antes que o stream possa ser usado.
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[Histórico]
Versão | Alterações |
---|---|
v12.11.0 | _write() é opcional ao fornecer _writev(). |
chunk
<Buffer> | <string> | <any> OBuffer
a ser escrito, convertido a partir dastring
passada parastream.write()
. Se a opçãodecodeStrings
do stream forfalse
ou se o stream estiver operando no modo objeto, o chunk não será convertido e será o que foi passado parastream.write()
.encoding
<string> Se o chunk for uma string, entãoencoding
é a codificação de caracteres dessa string. Se o chunk for umBuffer
, ou se o stream estiver operando no modo objeto,encoding
pode ser ignorado.callback
<Function> Chame esta função (opcionalmente com um argumento de erro) quando o processamento estiver completo para o chunk fornecido.
Todas as implementações de stream Writable
devem fornecer um método writable._write()
e/ou writable._writev()
para enviar dados para o recurso subjacente.
Os streams Transform
fornecem sua própria implementação de writable._write()
.
Esta função NÃO DEVE ser chamada diretamente pelo código da aplicação. Deve ser implementada por classes filhas e chamada apenas pelos métodos internos da classe Writable
.
A função callback
deve ser chamada sincronicamente dentro de writable._write()
ou assincronamente (ou seja, tick diferente) para sinalizar que a gravação foi concluída com sucesso ou falhou com um erro. O primeiro argumento passado para o callback
deve ser o objeto Error
se a chamada falhou ou null
se a gravação foi bem-sucedida.
Todas as chamadas para writable.write()
que ocorrem entre o momento em que writable._write()
é chamado e o callback
é chamado farão com que os dados escritos sejam armazenados em buffer. Quando o callback
é chamado, o stream pode emitir um evento 'drain'
. Se uma implementação de stream for capaz de processar vários chunks de dados ao mesmo tempo, o método writable._writev()
deve ser implementado.
Se a propriedade decodeStrings
for explicitamente definida como false
nas opções do construtor, então chunk
permanecerá o mesmo objeto que é passado para .write()
, e pode ser uma string em vez de um Buffer
. Isso é para suportar implementações que têm um tratamento otimizado para certas codificações de dados de string. Nesse caso, o argumento encoding
indicará a codificação de caracteres da string. Caso contrário, o argumento encoding
pode ser seguramente ignorado.
O método writable._write()
tem o prefixo de um sublinhado porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.
writable._writev(chunks, callback)
chunks
<Object[]> Os dados a serem escritos. O valor é um array de <Object>, cada um representando um chunk discreto de dados a serem escritos. As propriedades desses objetos são:chunk
<Buffer> | <string> Uma instância de buffer ou string contendo os dados a serem escritos. Ochunk
será uma string se oWritable
foi criado com a opçãodecodeStrings
definida comofalse
e uma string foi passada parawrite()
.encoding
<string> A codificação de caracteres dochunk
. Sechunk
for umBuffer
, aencoding
será'buffer'
.
callback
<Function> Uma função de callback (opcionalmente com um argumento de erro) a ser invocada quando o processamento estiver completo para os chunks fornecidos.
Esta função NÃO DEVE ser chamada diretamente pelo código da aplicação. Ela deve ser implementada pelas classes filhas e chamada apenas pelos métodos internos da classe Writable
.
O método writable._writev()
pode ser implementado em adição ou como alternativa a writable._write()
em implementações de stream que são capazes de processar múltiplos chunks de dados ao mesmo tempo. Se implementado e se houver dados em buffer de escritas anteriores, _writev()
será chamado em vez de _write()
.
O método writable._writev()
é prefixado com um underscore porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.
writable._destroy(err, callback)
Adicionado em: v8.0.0
err
<Error> Um possível erro.callback
<Function> Uma função de callback que recebe um argumento de erro opcional.
O método _destroy()
é chamado por writable.destroy()
. Ele pode ser sobrescrito por classes filhas, mas não deve ser chamado diretamente.
writable._final(callback)
Adicionado em: v8.0.0
callback
<Function> Chame esta função (opcionalmente com um argumento de erro) quando terminar de escrever quaisquer dados restantes.
O método _final()
não deve ser chamado diretamente. Ele pode ser implementado por classes filhas e, se assim for, será chamado apenas pelos métodos internos da classe Writable
.
Esta função opcional será chamada antes do fechamento do fluxo, atrasando o evento 'finish'
até que callback
seja chamado. Isso é útil para fechar recursos ou escrever dados em buffer antes do término de um fluxo.
Erros durante a escrita
Erros que ocorrem durante o processamento dos métodos writable._write()
, writable._writev()
e writable._final()
devem ser propagados invocando o callback e passando o erro como o primeiro argumento. Lançar um Error
de dentro desses métodos ou emitir manualmente um evento 'error'
resulta em comportamento indefinido.
Se um fluxo Readable
for conectado a um fluxo Writable
quando Writable
emitir um erro, o fluxo Readable
será desconectado.
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
Um exemplo de fluxo gravável
O seguinte ilustra uma implementação de fluxo Writable
personalizado bastante simplista (e um tanto sem sentido). Embora esta instância de fluxo Writable
específica não seja de nenhuma utilidade real em particular, o exemplo ilustra cada um dos elementos necessários de uma instância de fluxo Writable
personalizado:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
Decodificando buffers em um fluxo gravável
Decodificar buffers é uma tarefa comum, por exemplo, ao usar transformadores cuja entrada é uma string. Este não é um processo trivial quando se usa codificação de caracteres multibyte, como UTF-8. O exemplo a seguir mostra como decodificar strings multibyte usando StringDecoder
e Writable
.
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
Implementando um fluxo legível
A classe stream.Readable
é estendida para implementar um fluxo Readable
.
Fluxos Readable
personalizados devem chamar o construtor new stream.Readable([options])
e implementar o método readable._read()
.
new stream.Readable([options])
[Histórico]
Versão | Alterações |
---|---|
v22.0.0 | Aumento do highWaterMark padrão. |
v15.5.0 | Suporte para passar um AbortSignal . |
v14.0.0 | Altera o padrão da opção autoDestroy para true . |
v11.2.0, v10.16.0 | Adiciona a opção autoDestroy para automaticamente destroy() o fluxo quando ele emite 'end' ou erros. |
options
<Object>highWaterMark
<number> O máximo número de bytes para armazenar no buffer interno antes de parar de ler do recurso subjacente. Padrão:65536
(64 KiB), ou16
para fluxosobjectMode
.encoding
<string> Se especificado, os buffers serão decodificados para strings usando a codificação especificada. Padrão:null
.objectMode
<boolean> Se este fluxo deve se comportar como um fluxo de objetos. Significando questream.read(n)
retorna um único valor em vez de umBuffer
de tamanhon
. Padrão:false
.emitClose
<boolean> Se o fluxo deve emitir'close'
depois de ter sido destruído. Padrão:true
.read
<Function> Implementação para o métodostream._read()
.destroy
<Function> Implementação para o métodostream._destroy()
.construct
<Function> Implementação para o métodostream._construct()
.autoDestroy
<boolean> Se este fluxo deve chamar automaticamente.destroy()
em si mesmo após o término. Padrão:true
.signal
<AbortSignal> Um sinal representando uma possível cancelamento.
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// Chama o construtor stream.Readable(options).
super(options)
// ...
}
}
Ou, quando usar construtores de estilo pré-ES6:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
Ou, usando a abordagem de construtor simplificada:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
Chamar abort
no AbortController
correspondente ao AbortSignal
passado se comportará da mesma forma que chamar .destroy(new AbortError())
no readable
criado.
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// Mais tarde, interrompa a operação fechando o fluxo
controller.abort()
readable._construct(callback)
Adicionado em: v15.0.0
callback
<Function> Chame esta função (opcionalmente com um argumento de erro) quando o stream terminar de inicializar.
O método _construct()
NÃO deve ser chamado diretamente. Ele pode ser implementado por classes filhas e, se for o caso, será chamado apenas pelos métodos internos da classe Readable
.
Esta função opcional será agendada no próximo tick pelo construtor do stream, atrasando quaisquer chamadas _read()
e _destroy()
até que callback
seja chamado. Isso é útil para inicializar o estado ou inicializar recursos assincronamente antes que o stream possa ser usado.
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
Adicionado em: v0.9.4
size
<number> Número de bytes a serem lidos assincronamente
Esta função NÃO deve ser chamada diretamente pelo código do aplicativo. Ela deve ser implementada por classes filhas e chamada apenas pelos métodos internos da classe Readable
.
Todas as implementações de stream Readable
devem fornecer uma implementação do método readable._read()
para buscar dados do recurso subjacente.
Quando readable._read()
é chamado, se dados estiverem disponíveis no recurso, a implementação deve começar a inserir esses dados na fila de leitura usando o método this.push(dataChunk)
. _read()
será chamado novamente após cada chamada para this.push(dataChunk)
assim que o stream estiver pronto para aceitar mais dados. _read()
pode continuar lendo do recurso e inserindo dados até que readable.push()
retorne false
. Somente quando _read()
for chamado novamente depois de ter parado, ele deve retomar a inserção de dados adicionais na fila.
Assim que o método readable._read()
tiver sido chamado, ele não será chamado novamente até que mais dados sejam inseridos por meio do método readable.push()
. Dados vazios, como buffers e strings vazios, não farão com que readable._read()
seja chamado.
O argumento size
é consultivo. Para implementações em que uma "leitura" é uma única operação que retorna dados, pode-se usar o argumento size
para determinar quantos dados buscar. Outras implementações podem ignorar este argumento e simplesmente fornecer dados sempre que estiverem disponíveis. Não há necessidade de "esperar" até que size
bytes estejam disponíveis antes de chamar stream.push(chunk)
.
O método readable._read()
tem um prefixo de sublinhado porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.
readable._destroy(err, callback)
Adicionado em: v8.0.0
err
<Error> Um possível erro.callback
<Function> Uma função de callback que recebe um argumento de erro opcional.
O método _destroy()
é chamado por readable.destroy()
. Ele pode ser sobrescrito por classes filhas, mas não deve ser chamado diretamente.
readable.push(chunk[, encoding])
[Histórico]
Versão | Alterações |
---|---|
v22.0.0, v20.13.0 | O argumento chunk agora pode ser uma instância de TypedArray ou DataView . |
v8.0.0 | O argumento chunk agora pode ser uma instância de Uint8Array . |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Trecho de dados para inserir na fila de leitura. Para fluxos que não operam no modo objeto,chunk
deve ser uma <string>, <Buffer>, <TypedArray> ou <DataView>. Para fluxos em modo objeto,chunk
pode ser qualquer valor JavaScript.encoding
<string> Codificação de trechos de string. Deve ser uma codificaçãoBuffer
válida, como'utf8'
ou'ascii'
.- Retorna: <boolean>
true
se trechos adicionais de dados podem continuar a ser inseridos;false
caso contrário.
Quando chunk
é um <Buffer>, <TypedArray>, <DataView> ou <string>, o trecho de dados será adicionado à fila interna para que os usuários do fluxo possam consumir. Passar chunk
como null
sinaliza o fim do fluxo (EOF), após o qual nenhum dado poderá mais ser escrito.
Quando o Readable
está operando no modo pausado, os dados adicionados com readable.push()
podem ser lidos chamando o método readable.read()
quando o evento 'readable'
for emitido.
Quando o Readable
está operando no modo fluindo, os dados adicionados com readable.push()
serão entregues emitindo um evento 'data'
.
O método readable.push()
é projetado para ser o mais flexível possível. Por exemplo, ao encapsular uma fonte de nível inferior que fornece algum tipo de mecanismo de pausa/reprise e um callback de dados, a fonte de nível inferior pode ser encapsulada pela instância Readable
personalizada:
// `_source` é um objeto com métodos readStop() e readStart(),
// e um membro `ondata` que é chamado quando há dados, e
// um membro `onend` que é chamado quando os dados terminam.
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// Toda vez que houver dados, insira-os no buffer interno.
this._source.ondata = chunk => {
// Se push() retornar false, pare de ler da fonte.
if (!this.push(chunk)) this._source.readStop()
}
// Quando a fonte terminar, insira o trecho `null` que sinaliza o EOF.
this._source.onend = () => {
this.push(null)
}
}
// _read() será chamado quando o fluxo quiser puxar mais dados.
// O argumento de tamanho consultivo é ignorado neste caso.
_read(size) {
this._source.readStart()
}
}
O método readable.push()
é usado para inserir o conteúdo no buffer interno. Ele pode ser acionado pelo método readable._read()
.
Para fluxos que não operam no modo objeto, se o parâmetro chunk
de readable.push()
for undefined
, ele será tratado como uma string ou buffer vazio. Veja readable.push('')
para mais informações.
Erros durante a leitura
Erros que ocorrem durante o processamento de readable._read()
devem ser propagados através do método readable.destroy(err)
. Lançar um Error
de dentro de readable._read()
ou emitir manualmente um evento 'error'
resulta em comportamento indefinido.
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// Faça algum trabalho.
}
},
})
Um exemplo de stream de contagem
O seguinte é um exemplo básico de um stream Readable
que emite os numerais de 1 a 1.000.000 em ordem crescente e depois termina.
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
Implementando um stream duplex
Um stream Duplex
é aquele que implementa tanto Readable
quanto Writable
, como uma conexão de socket TCP.
Como JavaScript não possui suporte para herança múltipla, a classe stream.Duplex
é estendida para implementar um stream Duplex
(ao contrário de estender as classes stream.Readable
e stream.Writable
).
A classe stream.Duplex
herda prototipicamente de stream.Readable
e parasitariamente de stream.Writable
, mas instanceof
funcionará corretamente para ambas as classes base devido à substituição de Symbol.hasInstance
em stream.Writable
.
Streams Duplex
personalizados devem chamar o construtor new stream.Duplex([options])
e implementar ambos os métodos readable._read()
e writable._write()
.
new stream.Duplex(options)
[Histórico]
Versão | Alterações |
---|---|
v8.4.0 | As opções readableHighWaterMark e writableHighWaterMark agora são suportadas. |
options
<Object> Passado para os construtoresWritable
eReadable
. Também possui os seguintes campos:allowHalfOpen
<boolean> Se definido comofalse
, o fluxo encerrará automaticamente o lado gravável quando o lado legível terminar. Padrão:true
.readable
<boolean> Define se oDuplex
deve ser legível. Padrão:true
.writable
<boolean> Define se oDuplex
deve ser gravável. Padrão:true
.readableObjectMode
<boolean> DefineobjectMode
para o lado legível do fluxo. Não tem efeito seobjectMode
fortrue
. Padrão:false
.writableObjectMode
<boolean> DefineobjectMode
para o lado gravável do fluxo. Não tem efeito seobjectMode
fortrue
. Padrão:false
.readableHighWaterMark
<number> DefinehighWaterMark
para o lado legível do fluxo. Não tem efeito sehighWaterMark
for fornecido.writableHighWaterMark
<number> DefinehighWaterMark
para o lado gravável do fluxo. Não tem efeito sehighWaterMark
for fornecido.
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
Ou, quando usar construtores de estilo pré-ES6:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
Ou, usando a abordagem de construtor simplificada:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
Quando usar pipeline:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // Aceita entrada de string em vez de Buffers
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// Certifique-se de que seja json válido.
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('falhou', err)
} else {
console.log('concluído')
}
}
)
Um exemplo de stream duplex
O seguinte ilustra um exemplo simples de um stream Duplex
que encapsula um objeto de origem de nível inferior hipotético para o qual os dados podem ser escritos e de onde os dados podem ser lidos, embora usando uma API que não é compatível com streams Node.js. O seguinte ilustra um exemplo simples de um stream Duplex
que armazena em buffer os dados escritos recebidos por meio da interface Writable
que é lida novamente por meio da interface Readable
.
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// A origem subjacente lida apenas com strings.
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
O aspecto mais importante de um stream Duplex
é que os lados Readable
e Writable
operam independentemente um do outro, apesar de coexistirem em uma única instância de objeto.
Streams duplex em modo de objeto
Para streams Duplex
, objectMode
pode ser definido exclusivamente para o lado Readable
ou Writable
usando as opções readableObjectMode
e writableObjectMode
, respectivamente.
No exemplo a seguir, por exemplo, um novo stream Transform
(que é um tipo de stream Duplex
) é criado que possui um lado Writable
em modo de objeto que aceita números JavaScript que são convertidos em strings hexadecimais no lado Readable
.
const { Transform } = require('node:stream')
// Todos os streams Transform também são streams Duplex.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coerce o chunk para um número, se necessário.
chunk |= 0
// Transforme o chunk em algo diferente.
const data = chunk.toString(16)
// Envie os dados para a fila legível.
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// Imprime: 01
myTransform.write(10)
// Imprime: 0a
myTransform.write(100)
// Imprime: 64
Implementando um fluxo de transformação
Um fluxo Transform
é um fluxo Duplex
onde a saída é computada de alguma forma a partir da entrada. Exemplos incluem fluxos zlib ou fluxos crypto que comprimem, criptografam ou descriptografam dados.
Não há nenhum requisito de que a saída tenha o mesmo tamanho que a entrada, o mesmo número de chunks ou chegue ao mesmo tempo. Por exemplo, um fluxo Hash
terá apenas um único chunk de saída, que é fornecido quando a entrada termina. Um fluxo zlib
produzirá uma saída que é muito menor ou muito maior que sua entrada.
A classe stream.Transform
é estendida para implementar um fluxo Transform
.
A classe stream.Transform
herda prototipicamente de stream.Duplex
e implementa suas próprias versões dos métodos writable._write()
e readable._read()
. Implementações personalizadas de Transform
devem implementar o método transform._transform()
e podem também implementar o método transform._flush()
.
É preciso ter cuidado ao usar fluxos Transform
, pois os dados escritos no fluxo podem fazer com que o lado Writable
do fluxo seja pausado se a saída no lado Readable
não for consumida.
new stream.Transform([options])
options
<Object> Passado para os construtoresWritable
eReadable
. Também possui os seguintes campos:transform
<Function> Implementação para o métodostream._transform()
.flush
<Function> Implementação para o métodostream._flush()
.
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
Ou, ao usar construtores de estilo pré-ES6:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
Ou, usando a abordagem de construtor simplificada:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
Evento: 'end'
O evento 'end'
é da classe stream.Readable
. O evento 'end'
é emitido depois que todos os dados foram enviados, o que ocorre depois que o callback em transform._flush()
foi chamado. No caso de um erro, 'end'
não deve ser emitido.
Evento: 'finish'
O evento 'finish'
é da classe stream.Writable
. O evento 'finish'
é emitido depois que stream.end()
é chamado e todos os chunks foram processados por stream._transform()
. No caso de um erro, 'finish'
não deve ser emitido.
transform._flush(callback)
callback
<Function> Uma função de callback (opcionalmente com um argumento de erro e dados) a ser chamada quando os dados restantes foram liberados.
Esta função NÃO DEVE ser chamada diretamente pelo código da aplicação. Ela deve ser implementada por classes filhas e chamada apenas pelos métodos internos da classe Readable
.
Em alguns casos, uma operação de transformação pode precisar emitir um pouco de dados adicionais no final do stream. Por exemplo, um stream de compressão zlib
armazenará uma quantidade de estado interno usado para comprimir o resultado de forma otimizada. No entanto, quando o stream termina, esses dados adicionais precisam ser liberados para que os dados comprimidos estejam completos.
Implementações personalizadas de Transform
podem implementar o método transform._flush()
. Este será chamado quando não houver mais dados escritos para serem consumidos, mas antes do evento 'end'
ser emitido, sinalizando o fim do stream Readable
.
Dentro da implementação transform._flush()
, o método transform.push()
pode ser chamado zero ou mais vezes, conforme apropriado. A função callback
deve ser chamada quando a operação de liberação estiver completa.
O método transform._flush()
é prefixado com um underscore porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> OBuffer
a ser transformado, convertido a partir dastring
passada parastream.write()
. Se a opçãodecodeStrings
do stream forfalse
ou o stream estiver operando no modo objeto, o chunk não será convertido e será o que foi passado parastream.write()
.encoding
<string> Se o chunk for uma string, este é o tipo de codificação. Se o chunk for um buffer, este é o valor especial'buffer'
. Ignore-o nesse caso.callback
<Function> Uma função de callback (opcionalmente com um argumento de erro e dados) a ser chamada após ochunk
fornecido ter sido processado.
Esta função NÃO DEVE ser chamada diretamente pelo código do aplicativo. Ela deve ser implementada por classes filhas e chamada apenas pelos métodos internos da classe Readable
.
Todas as implementações de stream Transform
devem fornecer um método _transform()
para aceitar entrada e produzir saída. A implementação transform._transform()
lida com os bytes sendo escritos, calcula uma saída e, em seguida, passa essa saída para a parte legível usando o método transform.push()
.
O método transform.push()
pode ser chamado zero ou mais vezes para gerar saída de um único chunk de entrada, dependendo de quanto deve ser gerado como resultado do chunk.
É possível que nenhuma saída seja gerada a partir de qualquer chunk de dados de entrada.
A função callback
deve ser chamada apenas quando o chunk atual for completamente consumido. O primeiro argumento passado para o callback
deve ser um objeto Error
se ocorreu um erro durante o processamento da entrada ou null
caso contrário. Se um segundo argumento for passado para o callback
, ele será encaminhado para o método transform.push()
, mas apenas se o primeiro argumento for falso. Em outras palavras, os seguintes são equivalentes:
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
O método transform._transform()
é prefixado com um sublinhado porque é interno à classe que o define e nunca deve ser chamado diretamente por programas de usuário.
transform._transform()
nunca é chamado em paralelo; os streams implementam um mecanismo de fila e, para receber o próximo chunk, callback
deve ser chamado, sincronicamente ou assincronicamente.
Classe: stream.PassThrough
A classe stream.PassThrough
é uma implementação trivial de um stream Transform
que simplesmente passa os bytes de entrada para a saída. Seu propósito é principalmente para exemplos e testes, mas existem alguns casos de uso em que stream.PassThrough
é útil como um bloco de construção para novos tipos de streams.
Notas adicionais
Compatibilidade de Streams com geradores assíncronos e iteradores assíncronos
Com o suporte de geradores assíncronos e iteradores em JavaScript, os geradores assíncronos são efetivamente uma construção de stream de nível de linguagem de primeira classe neste momento.
Alguns casos comuns de interoperabilidade usando streams Node.js com geradores assíncronos e iteradores assíncronos são fornecidos abaixo.
Consumindo streams legíveis com iteradores assíncronos
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
Iteradores assíncronos registram um manipulador de erros permanente no stream para evitar quaisquer erros não tratados pós-destruição.
Criando streams legíveis com geradores assíncronos
Um stream legível Node.js pode ser criado a partir de um gerador assíncrono usando o método utilitário Readable.from()
:
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
Encadeando para streams graváveis a partir de iteradores assíncronos
Ao gravar em um stream gravável a partir de um iterador assíncrono, garanta o tratamento correto da contrapressão e erros. stream.pipeline()
abstrai o tratamento da contrapressão e erros relacionados à contrapressão:
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// Padrão de Callback
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'value returned')
}
}).on('close', () => {
ac.abort()
})
// Padrão de Promise
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'value returned')
})
.catch(err => {
console.error(err)
ac.abort()
})
Compatibilidade com versões antigas do Node.js
Antes do Node.js 0.10, a interface de stream Readable
era mais simples, mas também menos poderosa e menos útil.
- Em vez de esperar por chamadas ao método
stream.read()
, os eventos'data'
começariam a ser emitidos imediatamente. Aplicações que precisariam realizar alguma quantidade de trabalho para decidir como lidar com os dados eram obrigadas a armazenar os dados lidos em buffers para que os dados não fossem perdidos. - O método
stream.pause()
era consultivo, e não garantido. Isso significava que ainda era necessário estar preparado para receber eventos'data'
mesmo quando o stream estava em um estado pausado.
No Node.js 0.10, a classe Readable
foi adicionada. Para compatibilidade com versões anteriores de programas Node.js, os streams Readable
mudam para o "modo de fluxo" quando um manipulador de eventos 'data'
é adicionado, ou quando o método stream.resume()
é chamado. O efeito é que, mesmo sem usar o novo método stream.read()
e o evento 'readable'
, não é mais necessário se preocupar com a perda de pedaços de 'data'
.
Embora a maioria das aplicações continue funcionando normalmente, isso introduz um caso limite nas seguintes condições:
- Nenhum ouvinte de evento
'data'
é adicionado. - O método
stream.resume()
nunca é chamado. - O stream não é enviado para nenhum destino gravável.
Por exemplo, considere o seguinte código:
// AVISO! QUEBRADO!
net
.createServer(socket => {
// Adicionamos um ouvinte 'end', mas nunca consumimos os dados.
socket.on('end', () => {
// Nunca chegará aqui.
socket.end('A mensagem foi recebida, mas não foi processada.\n')
})
})
.listen(1337)
Antes do Node.js 0.10, os dados da mensagem de entrada seriam simplesmente descartados. No entanto, no Node.js 0.10 e posteriores, o socket permanece pausado para sempre.
A solução alternativa nessa situação é chamar o método stream.resume()
para iniciar o fluxo de dados:
// Solução alternativa.
net
.createServer(socket => {
socket.on('end', () => {
socket.end('A mensagem foi recebida, mas não foi processada.\n')
})
// Inicia o fluxo de dados, descartando-o.
socket.resume()
})
.listen(1337)
Além dos novos streams Readable
mudando para o modo de fluxo, streams no estilo anterior a 0.10 podem ser encapsulados em uma classe Readable
usando o método readable.wrap()
.
readable.read(0)
Existem alguns casos em que é necessário disparar uma atualização dos mecanismos de stream legível subjacentes, sem realmente consumir nenhum dado. Nesses casos, é possível chamar readable.read(0)
, que sempre retornará null
.
Se o buffer de leitura interno estiver abaixo do highWaterMark
, e o stream não estiver lendo atualmente, então chamar stream.read(0)
disparará uma chamada de baixo nível stream._read()
.
Embora a maioria dos aplicativos quase nunca precise fazer isso, existem situações dentro do Node.js em que isso é feito, particularmente nas internas da classe de stream Readable
.
readable.push('')
O uso de readable.push('')
não é recomendado.
Inserir uma <string> de zero byte, <Buffer>, <TypedArray> ou <DataView> em um stream que não está no modo objeto tem um efeito colateral interessante. Como é uma chamada para readable.push()
, a chamada encerrará o processo de leitura. No entanto, como o argumento é uma string vazia, nenhum dado é adicionado ao buffer legível, portanto não há nada para o usuário consumir.
Discrepância de highWaterMark
após chamar readable.setEncoding()
O uso de readable.setEncoding()
mudará o comportamento de como o highWaterMark
opera no modo não objeto.
Normalmente, o tamanho do buffer atual é medido em relação ao highWaterMark
em bytes. No entanto, depois que setEncoding()
é chamado, a função de comparação começará a medir o tamanho do buffer em caracteres.
Isso não é um problema em casos comuns com latin1
ou ascii
. Mas é aconselhável estar atento a esse comportamento ao trabalhar com strings que podem conter caracteres multibyte.