Stream
[Stabile: 2 - Stabile]
Stabile: 2 Stabilità: 2 - Stabile
Codice Sorgente: lib/stream.js
Uno stream è un'interfaccia astratta per lavorare con dati in streaming in Node.js. Il modulo node:stream
fornisce un'API per implementare l'interfaccia stream.
Node.js fornisce molti oggetti stream. Ad esempio, una richiesta a un server HTTP e process.stdout
sono entrambi istanze di stream.
Gli stream possono essere leggibili, scrivibili o entrambi. Tutti gli stream sono istanze di EventEmitter
.
Per accedere al modulo node:stream
:
const stream = require('node:stream')
Il modulo node:stream
è utile per creare nuovi tipi di istanze di stream. Di solito non è necessario utilizzare il modulo node:stream
per consumare stream.
Organizzazione di questo documento
Questo documento contiene due sezioni principali e una terza sezione per le note. La prima sezione spiega come utilizzare gli stream esistenti all'interno di un'applicazione. La seconda sezione spiega come creare nuovi tipi di stream.
Tipi di stream
Esistono quattro tipi di stream fondamentali in Node.js:
Writable
: stream in cui è possibile scrivere dati (ad esempio,fs.createWriteStream()
).Readable
: stream da cui è possibile leggere dati (ad esempio,fs.createReadStream()
).Duplex
: stream che sono siaReadable
cheWritable
(ad esempio,net.Socket
).Transform
: streamDuplex
che possono modificare o trasformare i dati mentre vengono scritti e letti (ad esempio,zlib.createDeflate()
).
Inoltre, questo modulo include le funzioni di utilità stream.duplexPair()
, stream.pipeline()
, stream.finished()
stream.Readable.from()
e stream.addAbortSignal()
.
API Stream Promises
Aggiunto in: v15.0.0
L'API stream/promises
fornisce un set alternativo di funzioni di utilità asincrone per gli stream che restituiscono oggetti Promise
invece di utilizzare callback. L'API è accessibile tramite require('node:stream/promises')
o require('node:stream').promises
.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[Cronologia]
Versione | Modifiche |
---|---|
v18.0.0, v17.2.0, v16.14.0 | Aggiunta l'opzione end , che può essere impostata su false per evitare la chiusura automatica dello stream di destinazione quando termina la sorgente. |
v15.0.0 | Aggiunto in: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- Restituisce: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- Restituisce: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- Restituisce: <Promise> | <AsyncIterable>
options
<Object> Opzioni pipelinesignal
<AbortSignal>end
<boolean> Termina lo stream di destinazione quando termina lo stream di origine. Gli stream di trasformazione vengono sempre terminati, anche se questo valore èfalse
. Predefinito:true
.
Restituisce: <Promise> Si completa quando la pipeline è completata.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Pipeline completata.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline completata.')
Per utilizzare un AbortSignal
, passarlo all'interno di un oggetto options, come ultimo argomento. Quando il segnale viene interrotto, destroy
verrà chiamato sulla pipeline sottostante, con un AbortError
.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
L'API pipeline
supporta anche i generatori asincroni:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Lavora con stringhe invece di `Buffer`.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Pipeline completata.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Lavora con stringhe invece di `Buffer`.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Pipeline completata.')
Ricordarsi di gestire l'argomento signal
passato al generatore asincrono. Soprattutto nel caso in cui il generatore asincrono sia la sorgente della pipeline (ovvero il primo argomento) o la pipeline non si completerà mai.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline completata.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline completata.')
L'API pipeline
fornisce anche una versione con callback:
stream.finished(stream[, options])
[Cronologia]
Versione | Modifiche |
---|---|
v19.5.0, v18.14.0 | Aggiunto supporto per ReadableStream e WritableStream . |
v19.1.0, v18.13.0 | Aggiunta l'opzione cleanup . |
v15.0.0 | Aggiunta in: v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Un flusso/webstream leggibile e/o scrivibile.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> Setrue
, rimuove gli ascoltatori registrati da questa funzione prima che la promise venga soddisfatta. Default:false
.
Restituisce: <Promise> Viene soddisfatta quando il flusso non è più leggibile o scrivibile.
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream ha terminato la lettura.')
}
run().catch(console.error)
rs.resume() // Svuota il flusso.
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream ha terminato la lettura.')
}
run().catch(console.error)
rs.resume() // Svuota il flusso.
L'API finished
fornisce anche una versione con callback.
stream.finished()
lascia listener di eventi pendenti (in particolare 'error'
, 'end'
, 'finish'
e 'close'
) dopo che la promise restituita è risolta o rifiutata. La ragione di ciò è che eventi 'error'
inaspettati (a causa di implementazioni di flusso errate) non causano arresti anomali inaspettati. Se questo è un comportamento indesiderato, options.cleanup
dovrebbe essere impostato su true
:
await finished(rs, { cleanup: true })
Modalità oggetto
Tutti i flussi creati dalle API di Node.js operano esclusivamente su stringhe, oggetti <Buffer>, <TypedArray> e <DataView>:
Stringhe
eBuffer
sono i tipi più comuni utilizzati con i flussi.TypedArray
eDataView
consentono di gestire dati binari con tipi comeInt32Array
oUint8Array
. Quando si scrive un TypedArray o un DataView in un flusso, Node.js elabora i byte grezzi.
È possibile, tuttavia, che le implementazioni di flusso funzionino con altri tipi di valori JavaScript (ad eccezione di null
, che ha uno scopo speciale all'interno dei flussi). Tali flussi sono considerati operanti in "modalità oggetto".
Le istanze di flusso vengono commutate in modalità oggetto utilizzando l'opzione objectMode
quando viene creato il flusso. Tentare di commutare un flusso esistente in modalità oggetto non è sicuro.
Buffering
Sia i flussi Writable
che Readable
memorizzeranno i dati in un buffer interno.
La quantità di dati potenzialmente memorizzati nel buffer dipende dall'opzione highWaterMark
passata al costruttore del flusso. Per i flussi normali, l'opzione highWaterMark
specifica un numero totale di byte. Per i flussi che operano in modalità oggetto, highWaterMark
specifica un numero totale di oggetti. Per i flussi che operano su (ma non decodificano) stringhe, highWaterMark
specifica un numero totale di unità di codice UTF-16.
I dati vengono memorizzati nel buffer nei flussi Readable
quando l'implementazione chiama stream.push(chunk)
. Se il consumer del flusso non chiama stream.read()
, i dati rimarranno nella coda interna fino a quando non saranno consumati.
Una volta che la dimensione totale del buffer di lettura interno raggiunge la soglia specificata da highWaterMark
, il flusso interromperà temporaneamente la lettura dei dati dalla risorsa sottostante fino a quando i dati attualmente memorizzati nel buffer non potranno essere consumati (cioè, il flusso smetterà di chiamare il metodo interno readable._read()
utilizzato per riempire il buffer di lettura).
I dati vengono memorizzati nel buffer nei flussi Writable
quando il metodo writable.write(chunk)
viene chiamato ripetutamente. Mentre la dimensione totale del buffer di scrittura interno è inferiore alla soglia impostata da highWaterMark
, le chiamate a writable.write()
restituiranno true
. Una volta che la dimensione del buffer interno raggiunge o supera highWaterMark
, verrà restituito false
.
Un obiettivo chiave dell'API stream
, in particolare il metodo stream.pipe()
, è limitare la memorizzazione nel buffer dei dati a livelli accettabili in modo che le sorgenti e le destinazioni con velocità diverse non sovraccarichino la memoria disponibile.
L'opzione highWaterMark
è una soglia, non un limite: detta la quantità di dati che un flusso memorizza nel buffer prima di smettere di richiedere altri dati. In generale, non impone una limitazione di memoria rigorosa. Implementazioni di flusso specifiche possono scegliere di imporre limiti più rigorosi, ma ciò è facoltativo.
Poiché i flussi Duplex
e Transform
sono sia Readable
che Writable
, ognuno mantiene due buffer interni separati utilizzati per la lettura e la scrittura, consentendo a ciascuna parte di operare indipendentemente dall'altra mantenendo un flusso di dati appropriato ed efficiente. Ad esempio, le istanze net.Socket
sono flussi Duplex
il cui lato Readable
consente il consumo dei dati ricevuti da la socket e il cui lato Writable
consente di scrivere dati sulla socket. Poiché i dati possono essere scritti sulla socket a una velocità maggiore o minore rispetto alla ricezione dei dati, ciascun lato dovrebbe operare (e memorizzare nel buffer) indipendentemente dall'altro.
La meccanica della memorizzazione interna nel buffer è un dettaglio di implementazione interna e può essere modificata in qualsiasi momento. Tuttavia, per alcune implementazioni avanzate, i buffer interni possono essere recuperati utilizzando writable.writableBuffer
o readable.readableBuffer
. L'utilizzo di queste proprietà non documentate è sconsigliato.
API per i consumer di stream
Quasi tutte le applicazioni Node.js, per quanto semplici, utilizzano gli stream in qualche modo. Segue un esempio di utilizzo degli stream in un'applicazione Node.js che implementa un server HTTP:
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` è un http.IncomingMessage, che è uno stream leggibile.
// `res` è un http.ServerResponse, che è uno stream scrivibile.
let body = ''
// Ottieni i dati come stringhe utf8.
// Se non viene impostata una codifica, verranno ricevuti oggetti Buffer.
req.setEncoding('utf8')
// Gli stream leggibili emettono eventi 'data' una volta aggiunto un listener.
req.on('data', chunk => {
body += chunk
})
// L'evento 'end' indica che l'intero corpo è stato ricevuto.
req.on('end', () => {
try {
const data = JSON.parse(body)
// Riscrivi qualcosa di interessante per l'utente:
res.write(typeof data)
res.end()
} catch (er) {
// uh oh! json errato!
res.statusCode = 400
return res.end(`errore: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// errore: Unexpected token 'o', "not json" is not valid JSON
Gli stream Writable
(come res
nell'esempio) espongono metodi come write()
e end()
che vengono utilizzati per scrivere dati sullo stream.
Gli stream Readable
utilizzano l'API EventEmitter
per notificare al codice dell'applicazione quando i dati sono disponibili per essere letti dallo stream. Questi dati disponibili possono essere letti dallo stream in diversi modi.
Sia gli stream Writable
che Readable
utilizzano l'API EventEmitter
in vari modi per comunicare lo stato corrente dello stream.
Gli stream Duplex
e Transform
sono sia Writable
che Readable
.
Le applicazioni che scrivono dati su uno stream o consumano dati da uno stream non sono tenute a implementare direttamente le interfacce dello stream e generalmente non avranno motivo di chiamare require('node:stream')
.
Gli sviluppatori che desiderano implementare nuovi tipi di stream dovrebbero fare riferimento alla sezione API per gli implementatori di stream.
Stream scrivibili
Gli stream scrivibili sono un'astrazione per una destinazione in cui vengono scritti i dati.
Esempi di stream Writable
includono:
- Richieste HTTP, sul client
- Risposte HTTP, sul server
- Stream di scrittura fs
- Stream zlib
- Stream crypto
- Socket TCP
- child process stdin
process.stdout
,process.stderr
Alcuni di questi esempi sono in realtà stream Duplex
che implementano l'interfaccia Writable
.
Tutti gli stream Writable
implementano l'interfaccia definita dalla classe stream.Writable
.
Sebbene istanze specifiche di stream Writable
possano differire in vari modi, tutti gli stream Writable
seguono lo stesso schema di utilizzo fondamentale illustrato nell'esempio seguente:
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')
Classe: stream.Writable
Aggiunto in: v0.9.4
Evento: 'close'
[Cronologia]
Versione | Modifiche |
---|---|
v10.0.0 | Aggiunta opzione emitClose per specificare se 'close' viene emesso su destroy. |
v0.9.4 | Aggiunto in: v0.9.4 |
L'evento 'close'
viene emesso quando lo stream e qualsiasi risorsa sottostante (ad esempio, un descrittore di file) sono stati chiusi. L'evento indica che non saranno emessi più eventi e che non si verificheranno ulteriori calcoli.
Uno stream Writable
emetterà sempre l'evento 'close'
se viene creato con l'opzione emitClose
.
Evento: 'drain'
Aggiunto in: v0.9.4
Se una chiamata a stream.write(chunk)
restituisce false
, l'evento 'drain'
verrà emesso quando sarà opportuno riprendere la scrittura di dati nello stream.
// Scrive i dati nello stream scrivibile fornito un milione di volte.
// Attenzione alla contro-pressione.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// Ultima volta!
writer.write(data, encoding, callback)
} else {
// Vediamo se dovremmo continuare o aspettare.
// Non passare la callback, perché non abbiamo ancora finito.
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// Doveva fermarsi prima!
// Scrivi ancora una volta che si svuota.
writer.once('drain', write)
}
}
}
Evento: 'error'
Aggiunto in: v0.9.4
L'evento 'error'
viene emesso se si verifica un errore durante la scrittura o il piping dei dati. La callback dell'ascoltatore riceve un singolo argomento Error
quando viene chiamata.
Il flusso viene chiuso quando viene emesso l'evento 'error'
a meno che l'opzione autoDestroy
non sia stata impostata su false
durante la creazione del flusso.
Dopo 'error'
, non dovrebbero essere emessi altri eventi oltre a 'close'
(inclusi gli eventi 'error'
).
Evento: 'finish'
Aggiunto in: v0.9.4
L'evento 'finish'
viene emesso dopo che il metodo stream.end()
è stato chiamato e tutti i dati sono stati scaricati nel sistema sottostante.
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('All writes are now complete.')
})
writer.end('This is the end\n')
Evento: 'pipe'
Aggiunto in: v0.9.4
src
<stream.Readable> flusso sorgente che sta effettuando il piping verso questo flusso scrivibile
L'evento 'pipe'
viene emesso quando il metodo stream.pipe()
viene chiamato su un flusso leggibile, aggiungendo questo flusso scrivibile al suo insieme di destinazioni.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('Something is piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
Evento: 'unpipe'
Aggiunto in: v0.9.4
src
<stream.Readable> Il flusso sorgente che ha unpiped questo flusso scrivibile
L'evento 'unpipe'
viene emesso quando il metodo stream.unpipe()
viene chiamato su un flusso Readable
, rimuovendo questo Writable
dal suo insieme di destinazioni.
Questo viene anche emesso nel caso in cui questo flusso Writable
emetta un errore quando un flusso Readable
effettua il piping in esso.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('Something has stopped piping into the writer.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
Aggiunto in: v0.11.2
Il metodo writable.cork()
forza tutti i dati scritti ad essere bufferizzati in memoria. I dati bufferizzati saranno scaricati quando vengono chiamati i metodi stream.uncork()
o stream.end()
.
Lo scopo principale di writable.cork()
è quello di gestire una situazione in cui diversi piccoli chunk vengono scritti nello stream in rapida successione. Invece di inoltrarli immediatamente alla destinazione sottostante, writable.cork()
bufferizza tutti i chunk fino a quando non viene chiamato writable.uncork()
, che li passerà tutti a writable._writev()
, se presente. Questo previene una situazione di blocco head-of-line in cui i dati vengono bufferizzati mentre si aspetta che venga elaborato il primo piccolo chunk. Tuttavia, l'utilizzo di writable.cork()
senza implementare writable._writev()
potrebbe avere un effetto negativo sulla produttività.
Vedi anche: writable.uncork()
, writable._writev()
.
writable.destroy([error])
[Cronologia]
Versione | Modifiche |
---|---|
v14.0.0 | Funziona come no-op su uno stream che è già stato distrutto. |
v8.0.0 | Aggiunto in: v8.0.0 |
Distrugge lo stream. Opzionalmente emette un evento 'error'
e un evento 'close'
(a meno che emitClose
non sia impostato su false
). Dopo questa chiamata, lo stream scrivibile è terminato e le chiamate successive a write()
o end()
si tradurranno in un errore ERR_STREAM_DESTROYED
. Questo è un modo distruttivo e immediato per distruggere uno stream. Le chiamate precedenti a write()
potrebbero non essere state scaricate e potrebbero attivare un errore ERR_STREAM_DESTROYED
. Utilizzare end()
invece di destroy se i dati devono essere scaricati prima della chiusura, o attendere l'evento 'drain'
prima di distruggere lo stream.
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
Una volta che destroy()
è stato chiamato, qualsiasi ulteriore chiamata sarà un no-op e non saranno emessi ulteriori errori, eccetto quelli da _destroy()
, come 'error'
.
Gli implementatori non devono sovrascrivere questo metodo, ma devono invece implementare writable._destroy()
.
writable.closed
Aggiunto in: v18.0.0
È true
dopo che 'close'
è stato emesso.
writable.destroyed
Aggiunto in: v8.0.0
È true
dopo che writable.destroy()
è stato chiamato.
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0, v20.13.0 | L'argomento chunk può ora essere un'istanza di TypedArray o DataView . |
v15.0.0 | Il callback viene invocato prima di 'finish' o in caso di errore. |
v14.0.0 | Il callback viene invocato se viene emesso 'finish' o 'error'. |
v10.0.0 | Questo metodo ora restituisce un riferimento a writable . |
v8.0.0 | L'argomento chunk può ora essere un'istanza di Uint8Array . |
v0.9.4 | Aggiunto in: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Dati opzionali da scrivere. Per i flussi che non operano in modalità oggetto,chunk
deve essere una <string>, <Buffer>, <TypedArray> o <DataView>. Per i flussi in modalità oggetto,chunk
può essere qualsiasi valore JavaScript diverso danull
.encoding
<string> La codifica sechunk
è una stringacallback
<Function> Callback per quando il flusso è terminato.- Restituisce: <this>
Chiamare il metodo writable.end()
segnala che nessun altro dato verrà scritto nel Writable
. Gli argomenti chunk
ed encoding
opzionali consentono di scrivere un ultimo ulteriore blocco di dati immediatamente prima di chiudere il flusso.
Chiamare il metodo stream.write()
dopo aver chiamato stream.end()
genererà un errore.
// Scrive 'hello, ' e poi termina con 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// Scrivere altro ora non è consentito!
writable.setDefaultEncoding(encoding)
[Cronologia]
Versione | Modifiche |
---|---|
v6.1.0 | Questo metodo ora restituisce un riferimento a writable . |
v0.11.15 | Aggiunto in: v0.11.15 |
Il metodo writable.setDefaultEncoding()
imposta la codifica predefinita encoding
per un flusso Writable
.
writable.uncork()
Aggiunto in: v0.11.2
Il metodo writable.uncork()
svuota tutti i dati in buffer da quando è stato chiamato stream.cork()
.
Quando si utilizzano writable.cork()
e writable.uncork()
per gestire il buffering delle scritture su un flusso, differire le chiamate a writable.uncork()
usando process.nextTick()
. Ciò consente il raggruppamento di tutte le chiamate writable.write()
che si verificano all'interno di una determinata fase del loop eventi di Node.js.
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())
Se il metodo writable.cork()
viene chiamato più volte su un flusso, lo stesso numero di chiamate a writable.uncork()
deve essere chiamato per svuotare i dati in buffer.
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
stream.uncork()
// I dati non verranno svuotati fino a quando uncork() non verrà chiamato una seconda volta.
stream.uncork()
})
Vedi anche: writable.cork()
.
writable.writable
Aggiunto in: v11.4.0
È true
se è sicuro chiamare writable.write()
, il che significa che il flusso non è stato distrutto, ha generato errori o terminato.
writable.writableAborted
Aggiunto in: v18.0.0, v16.17.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Restituisce se il flusso è stato distrutto o ha generato errori prima di emettere 'finish'
.
writable.writableEnded
Aggiunto in: v12.9.0
È true
dopo che writable.end()
è stato chiamato. Questa proprietà non indica se i dati sono stati svuotati, per questo utilizzare writable.writableFinished
invece.
writable.writableCorked
Aggiunto in: v13.2.0, v12.16.0
Numero di volte che writable.uncork()
deve essere chiamato per scollegare completamente lo stream.
writable.errored
Aggiunto in: v18.0.0
Restituisce un errore se lo stream è stato distrutto con un errore.
writable.writableFinished
Aggiunto in: v12.6.0
È impostato su true
immediatamente prima che l'evento 'finish'
venga emesso.
writable.writableHighWaterMark
Aggiunto in: v9.3.0
Restituisce il valore di highWaterMark
passato durante la creazione di questo Writable
.
writable.writableLength
Aggiunto in: v9.4.0
Questa proprietà contiene il numero di byte (o oggetti) nella coda pronti per essere scritti. Il valore fornisce dati di introspezione sullo stato di highWaterMark
.
writable.writableNeedDrain
Aggiunto in: v15.2.0, v14.17.0
È true
se il buffer dello stream è pieno e lo stream emetterà 'drain'
.
writable.writableObjectMode
Aggiunto in: v12.3.0
Getter per la proprietà objectMode
di un dato stream Writable
.
writable[Symbol.asyncDispose]()
Aggiunto in: v22.4.0, v20.16.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Chiama writable.destroy()
con un AbortError
e restituisce una promise che si risolve quando lo stream è terminato.
writable.write(chunk[, encoding][, callback])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0, v20.13.0 | L'argomento chunk può ora essere un'istanza di TypedArray o DataView . |
v8.0.0 | L'argomento chunk può ora essere un'istanza di Uint8Array . |
v6.0.0 | Passare null come parametro chunk sarà sempre considerato non valido ora, anche in modalità oggetto. |
v0.9.4 | Aggiunto in: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Dati opzionali da scrivere. Per gli stream che non operano in modalità oggetto,chunk
deve essere una <string>, <Buffer>, <TypedArray> o <DataView>. Per gli stream in modalità oggetto,chunk
può essere qualsiasi valore JavaScript diverso danull
.encoding
<string> | <null> La codifica, sechunk
è una stringa. Default:'utf8'
callback
<Function> Callback per quando questo blocco di dati viene svuotato.- Restituisce: <boolean>
false
se lo stream desidera che il codice chiamante aspetti che l'evento'drain'
venga emesso prima di continuare a scrivere dati aggiuntivi; altrimentitrue
.
Il metodo writable.write()
scrive alcuni dati nello stream e chiama il callback
fornito una volta che i dati sono stati completamente gestiti. Se si verifica un errore, il callback
verrà chiamato con l'errore come suo primo argomento. Il callback
viene chiamato in modo asincrono e prima che venga emesso 'error'
.
Il valore restituito è true
se il buffer interno è inferiore al highWaterMark
configurato quando lo stream è stato creato dopo aver ammesso chunk
. Se viene restituito false
, ulteriori tentativi di scrittura di dati nello stream dovrebbero interrompersi fino a quando non viene emesso l'evento 'drain'
.
Mentre uno stream non sta svuotando, le chiamate a write()
metteranno in buffer chunk
e restituiranno false. Una volta che tutti i chunk attualmente in buffer sono stati svuotati (accettati per la consegna dal sistema operativo), verrà emesso l'evento 'drain'
. Una volta che write()
restituisce false, non scrivere più chunk fino a quando non viene emesso l'evento 'drain'
. Sebbene sia consentito chiamare write()
su uno stream che non sta svuotando, Node.js metterà in buffer tutti i chunk scritti fino a quando non si verificherà l'utilizzo massimo della memoria, a quel punto si interromperà incondizionatamente. Anche prima che si interrompa, l'elevato utilizzo della memoria causerà scarse prestazioni del garbage collector e un elevato RSS (che in genere non viene rilasciato al sistema, nemmeno dopo che la memoria non è più necessaria). Poiché le socket TCP potrebbero non svuotarsi mai se il peer remoto non legge i dati, la scrittura di una socket che non sta svuotando potrebbe portare a una vulnerabilità sfruttabile da remoto.
La scrittura di dati mentre lo stream non sta svuotando è particolarmente problematica per un Transform
, perché gli stream Transform
sono in pausa per impostazione predefinita fino a quando non vengono collegati o viene aggiunto un gestore di eventi 'data'
o 'readable'
.
Se i dati da scrivere possono essere generati o recuperati su richiesta, si consiglia di incapsulare la logica in un Readable
e utilizzare stream.pipe()
. Tuttavia, se si preferisce chiamare write()
, è possibile rispettare la contropressione ed evitare problemi di memoria utilizzando l'evento 'drain'
:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// Aspetta che cb venga chiamato prima di effettuare qualsiasi altra scrittura.
write('hello', () => {
console.log('Scrittura completata, esegui altre scritture ora.')
})
Uno stream Writable
in modalità oggetto ignorerà sempre l'argomento encoding
.
Flussi leggibili
I flussi leggibili sono un'astrazione per una sorgente da cui vengono consumati i dati.
Esempi di flussi Readable
includono:
- Risposte HTTP, sul client
- Richieste HTTP, sul server
- flussi di lettura fs
- flussi zlib
- flussi crypto
- socket TCP
- stdout e stderr del processo figlio
process.stdin
Tutti i flussi Readable
implementano l'interfaccia definita dalla classe stream.Readable
.
Due modalità di lettura
I flussi Readable
operano effettivamente in una di due modalità: flowing e paused. Queste modalità sono separate dalla modalità oggetto. Un flusso Readable
può essere in modalità oggetto o meno, indipendentemente dal fatto che sia in modalità flowing o paused.
- In modalità flowing, i dati vengono letti automaticamente dal sistema sottostante e forniti a un'applicazione il più rapidamente possibile utilizzando eventi tramite l'interfaccia
EventEmitter
. - In modalità paused, il metodo
stream.read()
deve essere chiamato esplicitamente per leggere i blocchi di dati dal flusso.
Tutti i flussi Readable
iniziano in modalità paused, ma possono essere commutati in modalità flowing in uno dei seguenti modi:
- Aggiungendo un gestore eventi
'data'
. - Chiamando il metodo
stream.resume()
. - Chiamando il metodo
stream.pipe()
per inviare i dati a unWritable
.
Il Readable
può tornare in modalità paused usando uno dei seguenti:
- Se non ci sono destinazioni pipe, chiamando il metodo
stream.pause()
. - Se ci sono destinazioni pipe, rimuovendo tutte le destinazioni pipe. Più destinazioni pipe possono essere rimosse chiamando il metodo
stream.unpipe()
.
Il concetto importante da ricordare è che un Readable
non genererà dati finché non viene fornito un meccanismo per consumare o ignorare tali dati. Se il meccanismo di consumo è disabilitato o rimosso, il Readable
tenterà di smettere di generare i dati.
Per motivi di compatibilità con le versioni precedenti, la rimozione dei gestori eventi 'data'
non metterà automaticamente in pausa il flusso. Inoltre, se ci sono destinazioni pipe, chiamare stream.pause()
non garantirà che il flusso rimarrà in pausa una volta che tali destinazioni si svuotano e richiedono più dati.
Se un Readable
viene commutato in modalità flowing e non ci sono consumer disponibili per gestire i dati, tali dati andranno persi. Ciò può verificarsi, ad esempio, quando il metodo readable.resume()
viene chiamato senza un listener associato all'evento 'data'
, o quando un gestore eventi 'data'
viene rimosso dal flusso.
Aggiungere un gestore eventi 'readable'
fa automaticamente smettere di fluire il flusso e i dati devono essere consumati tramite readable.read()
. Se il gestore eventi 'readable'
viene rimosso, il flusso ricomincerà a fluire se c'è un gestore eventi 'data'
.
Tre stati
Le "due modalità" di funzionamento di un flusso Readable
sono un'astrazione semplificata per la più complicata gestione dello stato interno che avviene all'interno dell'implementazione del flusso Readable
.
Specificamente, in qualsiasi momento, ogni Readable
si trova in uno dei tre stati possibili:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
Quando readable.readableFlowing
è null
, non viene fornito alcun meccanismo per consumare i dati del flusso. Pertanto, il flusso non genererà dati. In questo stato, l'aggiunta di un listener per l'evento 'data'
, la chiamata al metodo readable.pipe()
, o la chiamata al metodo readable.resume()
cambierà readable.readableFlowing
in true
, facendo sì che Readable
inizi ad emettere attivamente eventi man mano che i dati vengono generati.
La chiamata a readable.pause()
, readable.unpipe()
, o la ricezione di backpressure farà sì che readable.readableFlowing
venga impostato su false
, interrompendo temporaneamente il flusso di eventi ma non interrompendo la generazione di dati. In questo stato, l'aggiunta di un listener per l'evento 'data'
non cambierà readable.readableFlowing
in true
.
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing è ora false.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing è ancora false.
pass.write('ok') // Non emetterà 'data'.
pass.resume() // Deve essere chiamato per far sì che il flusso emetta 'data'.
// readableFlowing è ora true.
Mentre readable.readableFlowing
è false
, i dati potrebbero essere in accumulo nel buffer interno del flusso.
Scegliere uno stile di API
L'API del flusso Readable
si è evoluta attraverso diverse versioni di Node.js e fornisce diversi metodi per consumare i dati del flusso. In generale, gli sviluppatori dovrebbero scegliere un solo metodo per consumare i dati e non dovrebbero mai utilizzare più metodi per consumare dati da un singolo flusso. Specificamente, l'utilizzo di una combinazione di on('data')
, on('readable')
, pipe()
, o iteratori asincroni potrebbe portare a comportamenti non intuitivi.
Classe: stream.Readable
Aggiunto in: v0.9.4
Evento: 'close'
[Cronologia]
Versione | Modifiche |
---|---|
v10.0.0 | Aggiunta l'opzione emitClose per specificare se 'close' viene emesso su distruzione. |
v0.9.4 | Aggiunto in: v0.9.4 |
L'evento 'close'
viene emesso quando lo stream e le sue risorse sottostanti (ad esempio, un descrittore di file) sono stati chiusi. L'evento indica che non saranno emessi più eventi e che non si verificheranno ulteriori elaborazioni.
Uno stream Readable
emetterà sempre l'evento 'close'
se viene creato con l'opzione emitClose
.
Evento: 'data'
Aggiunto in: v0.9.4
chunk
<Buffer> | <string> | <any> Il blocco di dati. Per gli stream che non funzionano in modalità oggetto, il blocco sarà una stringa o unBuffer
. Per gli stream che sono in modalità oggetto, il blocco può essere qualsiasi valore JavaScript diverso danull
.
L'evento 'data'
viene emesso ogni volta che lo stream rilascia la proprietà di un blocco di dati a un consumer. Ciò può accadere ogni volta che lo stream viene commutato in modalità di flusso chiamando readable.pipe()
, readable.resume()
, o collegando una callback listener all'evento 'data'
. L'evento 'data'
verrà inoltre emesso ogni volta che viene chiamato il metodo readable.read()
ed è disponibile un blocco di dati da restituire.
Collegare un listener dell'evento 'data'
a uno stream che non è stato esplicitamente messo in pausa cambierà lo stream in modalità di flusso. I dati verranno quindi passati non appena saranno disponibili.
La callback del listener riceverà il blocco di dati come stringa se è stata specificata una codifica predefinita per lo stream usando il metodo readable.setEncoding()
; altrimenti i dati verranno passati come Buffer
.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Ricevuti ${chunk.length} byte di dati.`)
})
Evento: 'end'
Aggiunto in: v0.9.4
L'evento 'end'
viene emesso quando non ci sono più dati da consumare dallo stream.
L'evento 'end'
non verrà emesso a meno che i dati non siano completamente consumati. Questo può essere ottenuto passando lo stream in modalità flowing, o chiamando stream.read()
ripetutamente fino a quando tutti i dati non sono stati consumati.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Ricevuti ${chunk.length} byte di dati.`)
})
readable.on('end', () => {
console.log('Non ci saranno più dati.')
})
Evento: 'error'
Aggiunto in: v0.9.4
L'evento 'error'
può essere emesso da un'implementazione Readable
in qualsiasi momento. Tipicamente, questo può accadere se lo stream sottostante non è in grado di generare dati a causa di un errore interno sottostante, o quando un'implementazione dello stream tenta di inserire un chunk di dati non valido.
La callback dell'ascoltatore riceverà un singolo oggetto Error
.
Evento: 'pause'
Aggiunto in: v0.9.4
L'evento 'pause'
viene emesso quando viene chiamato stream.pause()
e readableFlowing
non è false
.
Evento: 'readable'
[Cronologia]
Versione | Modifiche |
---|---|
v10.0.0 | L''readable' viene sempre emesso nel tick successivo dopo che .push() è stato chiamato. |
v10.0.0 | L'utilizzo di 'readable' richiede la chiamata a .read() . |
v0.9.4 | Aggiunto in: v0.9.4 |
L'evento 'readable'
viene emesso quando ci sono dati disponibili da leggere dallo stream, fino al limite massimo configurato (state.highWaterMark
). Effettivamente, indica che lo stream ha nuove informazioni all'interno del buffer. Se i dati sono disponibili all'interno di questo buffer, stream.read()
può essere chiamato per recuperare quei dati. Inoltre, l'evento 'readable'
può anche essere emesso quando è stata raggiunta la fine dello stream.
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// Ci sono alcuni dati da leggere ora.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
Se è stata raggiunta la fine dello stream, chiamare stream.read()
restituirà null
e innescherà l'evento 'end'
. Questo è vero anche se non c'erano mai dati da leggere. Ad esempio, nell'esempio seguente, foo.txt
è un file vuoto:
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
L'output dell'esecuzione di questo script è:
$ node test.js
readable: null
end
In alcuni casi, l'aggancio di un ascoltatore per l'evento 'readable'
causerà la lettura di una certa quantità di dati in un buffer interno.
In generale, i meccanismi readable.pipe()
ed evento 'data'
sono più facili da capire dell'evento 'readable'
. Tuttavia, la gestione di 'readable'
potrebbe comportare un aumento della produttività.
Se vengono utilizzati contemporaneamente sia 'readable'
che 'data'
, 'readable'
ha la precedenza nel controllo del flusso, ovvero 'data'
verrà emesso solo quando viene chiamato stream.read()
. La proprietà readableFlowing
diventerà false
. Se ci sono ascoltatori 'data'
quando 'readable'
viene rimosso, lo stream inizierà a fluire, ovvero gli eventi 'data'
verranno emessi senza chiamare .resume()
.
Evento: 'resume'
Aggiunto in: v0.9.4
L'evento 'resume'
viene emesso quando viene chiamato stream.resume()
e readableFlowing
non è true
.
readable.destroy([error])
[Cronologia]
Versione | Modifiche |
---|---|
v14.0.0 | Funziona come no-op su un flusso che è già stato distrutto. |
v8.0.0 | Aggiunto in: v8.0.0 |
Distrugge il flusso. Facoltativamente emette un evento 'error'
e un evento 'close'
(a meno che emitClose
non sia impostato su false
). Dopo questa chiamata, il flusso leggibile rilascerà tutte le risorse interne e le chiamate successive a push()
verranno ignorate.
Una volta chiamato destroy()
, ulteriori chiamate saranno no-op e non saranno emessi ulteriori errori, eccetto quelli da _destroy()
, come 'error'
.
Gli implementatori non dovrebbero sovrascrivere questo metodo, ma implementare invece readable._destroy()
.
readable.closed
Aggiunto in: v18.0.0
È true
dopo che 'close'
è stato emesso.
readable.destroyed
Aggiunto in: v8.0.0
È true
dopo che readable.destroy()
è stato chiamato.
readable.isPaused()
Aggiunto in: v0.11.14
- Restituisce: <boolean>
Il metodo readable.isPaused()
restituisce lo stato operativo corrente del Readable
. Questo viene utilizzato principalmente dal meccanismo che sta alla base del metodo readable.pipe()
. Nella maggior parte dei casi tipici, non ci sarà motivo di utilizzare questo metodo direttamente.
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
Aggiunto in: v0.9.4
- Restituisce: <this>
Il metodo readable.pause()
farà sì che uno stream in modalità flowing smetta di emettere eventi 'data'
, uscendo dalla modalità flowing. Qualsiasi dato che diventi disponibile rimarrà nel buffer interno.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Ricevuti ${chunk.length} byte di dati.`)
readable.pause()
console.log('Non ci saranno ulteriori dati per 1 secondo.')
setTimeout(() => {
console.log('Ora i dati riprenderanno a fluire.')
readable.resume()
}, 1000)
})
Il metodo readable.pause()
non ha effetto se è presente un listener dell'evento 'readable'
.
readable.pipe(destination[, options])
Aggiunto in: v0.9.4
destination
<stream.Writable> La destinazione per la scrittura dei datioptions
<Object> Opzioni di pipeend
<boolean> Termina lo scrittore quando il lettore termina. Default:true
.
Restituisce: <stream.Writable> La destinazione, permettendo una catena di pipe se si tratta di uno stream
Duplex
oTransform
Il metodo readable.pipe()
aggancia uno stream Writable
al readable
, facendolo passare automaticamente in modalità flowing e spingendo tutti i suoi dati nello stream Writable
agganciato. Il flusso di dati sarà gestito automaticamente in modo che lo stream Writable
di destinazione non sia sopraffatto da uno stream Readable
più veloce.
L'esempio seguente invia tutti i dati da readable
in un file chiamato file.txt
:
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Tutti i dati da readable vanno in 'file.txt'.
readable.pipe(writable)
È possibile agganciare più stream Writable
a un singolo stream Readable
.
Il metodo readable.pipe()
restituisce un riferimento allo stream destinazione, rendendo possibile impostare catene di stream in pipe:
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
Per impostazione predefinita, stream.end()
viene chiamato sullo stream Writable
di destinazione quando lo stream Readable
di origine emette 'end'
, in modo che la destinazione non sia più scrivibile. Per disabilitare questo comportamento predefinito, l'opzione end
può essere passata come false
, facendo sì che lo stream di destinazione rimanga aperto:
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Arrivederci\n')
})
Un'importante avvertenza è che se lo stream Readable
emette un errore durante l'elaborazione, la destinazione Writable
non viene chiusa automaticamente. Se si verifica un errore, sarà necessario chiudere manualmente ogni stream per evitare perdite di memoria.
Gli stream Writable
process.stderr
e process.stdout
non vengono mai chiusi fino all'uscita del processo Node.js, indipendentemente dalle opzioni specificate.
readable.read([size])
Aggiunto in: v0.9.4
size
<number> Argomento opzionale per specificare quanta dati leggere.- Restituisce: <string> | <Buffer> | <null> | <any>
Il metodo readable.read()
legge i dati dal buffer interno e li restituisce. Se non sono disponibili dati da leggere, viene restituito null
. Per impostazione predefinita, i dati vengono restituiti come oggetto Buffer
a meno che non sia stata specificata una codifica usando il metodo readable.setEncoding()
o lo stream stia operando in modalità oggetto.
L'argomento opzionale size
specifica un numero specifico di byte da leggere. Se non sono disponibili size
byte da leggere, verrà restituito null
a meno che lo stream non sia terminato, nel qual caso verranno restituiti tutti i dati rimanenti nel buffer interno.
Se l'argomento size
non è specificato, verranno restituiti tutti i dati contenuti nel buffer interno.
L'argomento size
deve essere minore o uguale a 1 GiB.
Il metodo readable.read()
dovrebbe essere chiamato solo su stream Readable
che operano in modalità in pausa. In modalità flowing, readable.read()
viene chiamato automaticamente fino a quando il buffer interno non è completamente svuotato.
const readable = getReadableStreamSomehow()
// 'readable' può essere attivato più volte mentre i dati vengono memorizzati nel buffer
readable.on('readable', () => {
let chunk
console.log('Lo stream è leggibile (nuovi dati ricevuti nel buffer)')
// Usa un ciclo per assicurarti di leggere tutti i dati attualmente disponibili
while (null !== (chunk = readable.read())) {
console.log(`Letti ${chunk.length} byte di dati...`)
}
})
// 'end' verrà attivato una volta quando non ci saranno più dati disponibili
readable.on('end', () => {
console.log('Raggiunto la fine dello stream.')
})
Ogni chiamata a readable.read()
restituisce un blocco di dati o null
, a significare che non ci sono più dati da leggere in quel momento. Questi blocchi non vengono concatenati automaticamente. Poiché una singola chiamata read()
non restituisce tutti i dati, potrebbe essere necessario utilizzare un ciclo while per leggere continuamente i blocchi fino a quando non vengono recuperati tutti i dati. Quando si legge un file di grandi dimensioni, .read()
potrebbe restituire temporaneamente null
, indicando che ha consumato tutto il contenuto in buffer ma potrebbero esserci altri dati ancora da memorizzare nel buffer. In tali casi, viene emesso un nuovo evento 'readable'
una volta che ci sono più dati nel buffer, e l'evento 'end'
segnala la fine della trasmissione dei dati.
Pertanto, per leggere l'intero contenuto di un file da un readable
, è necessario raccogliere i blocchi attraverso più eventi 'readable'
:
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
Uno stream Readable
in modalità oggetto restituirà sempre un singolo elemento da una chiamata a readable.read(size)
, indipendentemente dal valore dell'argomento size
.
Se il metodo readable.read()
restituisce un blocco di dati, verrà emesso anche un evento 'data'
.
Chiamare stream.read([size])
dopo che l'evento 'end'
è stato emesso restituirà null
. Non verrà sollevato alcun errore di runtime.
readable.readable
Aggiunto in: v11.4.0
È true
se è sicuro chiamare readable.read()
, il che significa che il flusso non è stato distrutto o ha emesso 'error'
o 'end'
.
readable.readableAborted
Aggiunto in: v16.8.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Restituisce se il flusso è stato distrutto o ha generato un errore prima di emettere 'end'
.
readable.readableDidRead
Aggiunto in: v16.7.0, v14.18.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Restituisce se 'data'
è stato emesso.
readable.readableEncoding
Aggiunto in: v12.7.0
Getter per la proprietà encoding
di un dato flusso Readable
. La proprietà encoding
può essere impostata usando il metodo readable.setEncoding()
.
readable.readableEnded
Aggiunto in: v12.9.0
Diventa true
quando viene emesso l'evento 'end'
.
readable.errored
Aggiunto in: v18.0.0
Restituisce un errore se il flusso è stato distrutto con un errore.
readable.readableFlowing
Aggiunto in: v9.4.0
Questa proprietà riflette lo stato corrente di un flusso Readable
come descritto nella sezione Tre stati.
readable.readableHighWaterMark
Aggiunto in: v9.3.0
Restituisce il valore di highWaterMark
passato durante la creazione di questo Readable
.
readable.readableLength
Aggiunto in: v9.4.0
Questa proprietà contiene il numero di byte (o oggetti) nella coda pronti per essere letti. Il valore fornisce dati di introspezione sullo stato di highWaterMark
.
readable.readableObjectMode
Aggiunto in: v12.3.0
Getter per la proprietà objectMode
di un dato stream Readable
.
readable.resume()
[Cronologia]
Versione | Modifiche |
---|---|
v10.0.0 | Il metodo resume() non ha effetto se è presente un listener per l'evento 'readable' . |
v0.9.4 | Aggiunto in: v0.9.4 |
- Restituisce: <this>
Il metodo readable.resume()
fa sì che uno stream Readable
esplicitamente in pausa riprenda l'emissione di eventi 'data'
, passando lo stream in modalità flowing.
Il metodo readable.resume()
può essere utilizzato per consumare completamente i dati da uno stream senza elaborare effettivamente nessuno di quei dati:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Raggiunto la fine, ma non è stato letto nulla.')
})
Il metodo readable.resume()
non ha effetto se è presente un listener per l'evento 'readable'
.
readable.setEncoding(encoding)
Aggiunto in: v0.9.4
Il metodo readable.setEncoding()
imposta la codifica dei caratteri per i dati letti dallo stream Readable
.
Per impostazione predefinita, non viene assegnata alcuna codifica e i dati dello stream saranno restituiti come oggetti Buffer
. Impostando una codifica, i dati dello stream vengono restituiti come stringhe della codifica specificata anziché come oggetti Buffer
. Ad esempio, chiamare readable.setEncoding('utf8')
farà sì che i dati di output vengano interpretati come dati UTF-8 e passati come stringhe. Chiamare readable.setEncoding('hex')
farà sì che i dati vengano codificati in formato stringa esadecimale.
Lo stream Readable
gestirà correttamente i caratteri multi-byte forniti tramite lo stream che altrimenti verrebbero decodificati in modo errato se semplicemente estratti dallo stream come oggetti Buffer
.
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('Ricevuti %d caratteri di dati stringa:', chunk.length)
})
readable.unpipe([destination])
Aggiunto in: v0.9.4
destination
<stream.Writable> Stream specifico opzionale da scollegare- Restituisce: <this>
Il metodo readable.unpipe()
stacca uno stream Writable
precedentemente collegato usando il metodo stream.pipe()
.
Se destination
non è specificato, vengono staccati tutti i pipe.
Se destination
è specificato, ma nessun pipe è impostato per esso, il metodo non fa nulla.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Tutti i dati da readable vanno in 'file.txt',
// ma solo per il primo secondo.
readable.pipe(writable)
setTimeout(() => {
console.log('Stop writing to file.txt.')
readable.unpipe(writable)
console.log('Manually close the file stream.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0, v20.13.0 | L'argomento chunk può ora essere un'istanza di TypedArray o DataView . |
v8.0.0 | L'argomento chunk può ora essere un'istanza di Uint8Array . |
v0.9.11 | Aggiunto in: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Frammento di dati da inserire nella coda di lettura. Per gli stream che non operano in modalità oggetto,chunk
deve essere una <string>, <Buffer>, <TypedArray>, <DataView> onull
. Per gli stream in modalità oggetto,chunk
può essere qualsiasi valore JavaScript.encoding
<string> Codifica dei frammenti di stringa. Deve essere una codificaBuffer
valida, come'utf8'
o'ascii'
.
Passare chunk
come null
segnala la fine dello stream (EOF) e si comporta allo stesso modo di readable.push(null)
, dopo di che non è possibile scrivere più dati. Il segnale EOF viene inserito alla fine del buffer e tutti i dati in buffer verranno comunque svuotati.
Il metodo readable.unshift()
inserisce un frammento di dati nel buffer interno. Questo è utile in determinate situazioni in cui uno stream viene consumato da codice che deve "dis-consumare" una certa quantità di dati che ha estratto ottimisticamente dalla sorgente, in modo che i dati possano essere trasmessi ad un'altra parte.
Il metodo stream.unshift(chunk)
non può essere chiamato dopo che l'evento 'end'
è stato emesso, altrimenti verrà generato un errore di runtime.
Gli sviluppatori che usano stream.unshift()
dovrebbero spesso considerare di passare all'utilizzo di uno stream Transform
invece. Vedi la sezione API per gli sviluppatori di stream per maggiori informazioni.
// Estrai un'intestazione delimitata da \n\n.
// Usa unshift() se ne ottieni troppo.
// Chiama la callback con (errore, intestazione, stream).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// Trovata la delimitazione dell'intestazione.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// Rimuovi l'ascoltatore 'readable' prima di unshifting.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// Ora il corpo del messaggio può essere letto dallo stream.
callback(null, header, stream)
return
}
// Lettura ancora dell'intestazione.
header += str
}
}
}
A differenza di stream.push(chunk)
, stream.unshift(chunk)
non terminerà il processo di lettura reimpostando lo stato di lettura interno dello stream. Ciò può causare risultati imprevisti se readable.unshift()
viene chiamato durante una lettura (ad esempio, dall'interno di un'implementazione stream._read()
su uno stream personalizzato). Seguire la chiamata a readable.unshift()
con un immediato stream.push('')
reimposterà correttamente lo stato di lettura, tuttavia è meglio evitare semplicemente di chiamare readable.unshift()
durante l'esecuzione di una lettura.
readable.wrap(stream)
Aggiunto in: v0.9.4
Prima di Node.js 0.10, i flussi non implementavano l'intera API del modulo node:stream
come attualmente definita. (Vedi Compatibilità per maggiori informazioni.)
Quando si utilizza una libreria Node.js più vecchia che emette eventi 'data'
e ha un metodo stream.pause()
che è solo consigliativo, il metodo readable.wrap()
può essere utilizzato per creare un flusso Readable
che utilizza il vecchio flusso come sua sorgente dati.
Sarà raramente necessario utilizzare readable.wrap()
, ma il metodo è stato fornito per comodità nell'interazione con applicazioni e librerie Node.js più vecchie.
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // ecc.
})
readable[Symbol.asyncIterator]()
[Cronologia]
Versione | Modifiche |
---|---|
v11.14.0 | Il supporto di Symbol.asyncIterator non è più sperimentale. |
v10.0.0 | Aggiunto in: v10.0.0 |
- Restituisce: <AsyncIterator> per consumare completamente il flusso.
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
Se il ciclo termina con un break
, return
o un throw
, il flusso verrà distrutto. In altri termini, iterare su un flusso consumerà completamente il flusso. Il flusso verrà letto in chunk di dimensioni pari all'opzione highWaterMark
. Nell'esempio di codice sopra, i dati saranno in un singolo chunk se il file ha meno di 64 KiB di dati perché non viene fornita alcuna opzione highWaterMark
a fs.createReadStream()
.
readable[Symbol.asyncDispose]()
Aggiunto in: v20.4.0, v18.18.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Chiama readable.destroy()
con un AbortError
e restituisce una promise che si completa quando lo stream è terminato.
readable.compose(stream[, options])
Aggiunto in: v19.1.0, v18.13.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Duplex> uno stream composto con lo stream
stream
.
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // stampa ['this', 'is', 'compose', 'as', 'operator']
Vedi stream.compose
per maggiori informazioni.
readable.iterator([options])
Aggiunto in: v16.3.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
options
<Object>destroyOnReturn
<boolean> Quando impostato sufalse
, chiamarereturn
sull'iteratore asincrono, o uscire da un ciclofor await...of
usandobreak
,return
othrow
non distruggerà lo stream. Default:true
.
Restituisce: <AsyncIterator> per consumare lo stream.
L'iteratore creato da questo metodo offre agli utenti la possibilità di annullare la distruzione dello stream se il ciclo for await...of
viene lasciato con return
, break
o throw
, o se l'iteratore dovrebbe distruggere lo stream se lo stream ha emesso un errore durante l'iterazione.
const { Readable } = require('node:stream')
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // Stamperà 2 e poi 3
}
console.log(readable.destroyed) // True, lo stream è stato completamente consumato
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]))
await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}
showBoth()
readable.map(fn[, options])
[Cronologia]
Versione | Modifiche |
---|---|
v20.7.0, v18.19.0 | Aggiunto highWaterMark nelle opzioni. |
v17.4.0, v16.14.0 | Aggiunto in: v17.4.0, v16.14.0 |
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Funzione> | <AsyncFunction> una funzione da mappare su ogni chunk nel flusso.data
<qualsiasi> un chunk di dati dal flusso.options
<Oggetto>signal
<AbortSignal> interrotto se il flusso viene distrutto, consentendo di interrompere precocemente la chiamatafn
.
options
<Oggetto>concurrency
<numero> il numero massimo di invocazioni concorrenti difn
da chiamare contemporaneamente sul flusso. Default:1
.highWaterMark
<numero> quanti elementi mettere in buffer in attesa del consumo da parte dell'utente degli elementi mappati. Default:concurrency * 2 - 1
.signal
<AbortSignal> permette di distruggere il flusso se il segnale viene interrotto.
Restituisce: <Readable> un flusso mappato con la funzione
fn
.
Questo metodo permette di mappare il flusso. La funzione fn
verrà chiamata per ogni chunk nel flusso. Se la funzione fn
restituisce una promise, tale promise verrà await
ata prima di essere passata al flusso di risultato.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Con un mapper sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
console.log(chunk) // 2, 4, 6, 8
}
// Con un mapper asincrono, effettuando al massimo 2 query contemporaneamente.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
domain => resolver.resolve4(domain),
{ concurrency: 2 }
)
for await (const result of dnsResults) {
console.log(result) // Stampa il risultato DNS di resolver.resolve4.
}
readable.filter(fn[, options])
[Cronologia]
Versione | Modifiche |
---|---|
v20.7.0, v18.19.0 | Aggiunto highWaterMark nelle opzioni. |
v17.4.0, v16.14.0 | Aggiunto in: v17.4.0, v16.14.0 |
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Funzione> | <AsyncFunction> una funzione per filtrare i chunk dal flusso.data
<qualsiasi> un chunk di dati dal flusso.options
<Oggetto>signal
<AbortSignal> interrotto se il flusso viene distrutto, consentendo di interrompere la chiamatafn
in anticipo.
options
<Oggetto>concurrency
<numero> il numero massimo di invocazioni concorrenti difn
da chiamare sul flusso contemporaneamente. Predefinito:1
.highWaterMark
<numero> quanti elementi bufferizzare mentre si attende il consumo degli elementi filtrati da parte dell'utente. Predefinito:concurrency * 2 - 1
.signal
<AbortSignal> consente di distruggere il flusso se il segnale viene interrotto.
Restituisce: <Readable> un flusso filtrato con il predicato
fn
.
Questo metodo consente di filtrare il flusso. Per ogni chunk nel flusso verrà chiamata la funzione fn
e, se restituisce un valore truthy, il chunk verrà passato al flusso di risultato. Se la funzione fn
restituisce una promise, tale promise verrà await
ata.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Con un predicato sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// Con un predicato asincrono, effettuando al massimo 2 query contemporaneamente.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address.ttl > 60
},
{ concurrency: 2 }
)
for await (const result of dnsResults) {
// Registra i domini con più di 60 secondi sul record DNS risolto.
console.log(result)
}
readable.forEach(fn[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione da chiamare su ogni chunk del flusso.data
<any> un chunk di dati dal flusso.options
<Object>signal
<AbortSignal> interrotto se il flusso viene distrutto, consentendo di interrompere precocemente la chiamatafn
.
options
<Object>concurrency
<number> il massimo numero di invocazioni concorrenti difn
da chiamare sul flusso contemporaneamente. Predefinito:1
.signal
<AbortSignal> consente di distruggere il flusso se il segnale viene interrotto.
Restituisce: <Promise> una promise per quando il flusso è terminato.
Questo metodo consente di iterare un flusso. Per ogni chunk nel flusso verrà chiamata la funzione fn
. Se la funzione fn
restituisce una promise, quella promise verrà await
ed.
Questo metodo è diverso dai cicli for await...of
in quanto può elaborare i chunk in modo concorrente. Inoltre, un'iterazione forEach
può essere interrotta solo passando un'opzione signal
e interrompendo il relativo AbortController
, mentre for await...of
può essere interrotto con break
o return
. In entrambi i casi il flusso verrà distrutto.
Questo metodo è diverso dall'ascolto dell'evento 'data'
in quanto utilizza l'evento readable
nel meccanismo sottostante e può limitare il numero di chiamate concorrenti fn
.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Con un predicato sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// Con un predicato asincrono, effettuando al massimo 2 query contemporaneamente.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
await dnsResults.forEach(result => {
// Registra il risultato, simile a `for await (const result of dnsResults)`
console.log(result)
})
console.log('done') // Il flusso è terminato
readable.toArray([options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
options
<Object>signal
<AbortSignal> permette di annullare l'operazione toArray se il segnale viene interrotto.
Restituisce: <Promise> una promise contenente un array con il contenuto dello stream.
Questo metodo permette di ottenere facilmente il contenuto di uno stream.
Poiché questo metodo legge l'intero stream in memoria, annulla i vantaggi degli stream. È destinato all'interoperabilità e alla convenienza, non come modo principale per consumare gli stream.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]
// Effettua query DNS in concorrenza usando .map e raccoglie
// i risultati in un array usando toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
.map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
.toArray()
readable.some(fn[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione da chiamare su ogni chunk dello stream.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto permettendo di interrompere la chiamatafn
anticipatamente.
options
<Object>concurrency
<number> il massimo numero di invocazioni concorrenti difn
da chiamare sullo stream contemporaneamente. Default:1
.signal
<AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Promise> una promise che restituisce
true
sefn
ha restituito un valore truthy per almeno uno dei chunk.
Questo metodo è simile a Array.prototype.some
e chiama fn
su ogni chunk nello stream finché il valore restituito atteso non è true
(o qualsiasi valore truthy). Una volta che il valore restituito atteso di una chiamata fn
su un chunk è truthy, lo stream viene distrutto e la promise viene completata con true
. Se nessuna delle chiamate fn
sui chunk restituisce un valore truthy, la promise viene completata con false
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false
// Con un predicato asincrono, effettuando al massimo 2 controlli file alla volta.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(anyBigFile) // `true` se qualsiasi file nell'elenco è più grande di 1MB
console.log('fatto') // Lo stream è terminato
readable.find(fn[, options])
Aggiunto in: v17.5.0, v16.17.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione da chiamare su ogni chunk del flusso.data
<any> un chunk di dati dal flusso.options
<Object>signal
<AbortSignal> abortito se il flusso viene distrutto, consentendo di abortire precocemente la chiamatafn
.
options
<Object>concurrency
<number> il numero massimo di invocazioni concorrenti difn
da chiamare sul flusso contemporaneamente. Default:1
.signal
<AbortSignal> permette di distruggere il flusso se il segnale viene abortito.
Restituisce: <Promise> una promise che restituisce il primo chunk per cui
fn
ha restituito un valore truthy, oppureundefined
se nessun elemento è stato trovato.
Questo metodo è simile a Array.prototype.find
e chiama fn
su ogni chunk nel flusso per trovare un chunk con un valore truthy per fn
. Una volta che il valore di ritorno atteso di una chiamata fn
è truthy, il flusso viene distrutto e la promise viene completata con il valore per cui fn
ha restituito un valore truthy. Se tutte le chiamate fn
sui chunk restituiscono un valore falsy, la promise viene completata con undefined
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined
// Con un predicato asincrono, effettuando al massimo 2 controlli file contemporaneamente.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(foundBigFile) // Nome del file di grandi dimensioni, se presente un file nell'elenco di dimensioni superiori a 1 MB
console.log('fatto') // Il flusso è terminato
readable.every(fn[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione da chiamare su ogni chunk del flusso.data
<any> un chunk di dati dal flusso.options
<Object>signal
<AbortSignal> abortito se il flusso viene distrutto, consentendo di abortire la chiamatafn
in anticipo.
options
<Object>concurrency
<number> il massimo numero di invocazioni concorrenti difn
da chiamare sul flusso contemporaneamente. Default:1
.signal
<AbortSignal> permette di distruggere il flusso se il segnale viene abortito.
Restituisce: <Promise> una promise che restituisce
true
sefn
ha restituito un valore truthy per tutti i chunk.
Questo metodo è simile a Array.prototype.every
e chiama fn
su ogni chunk nel flusso per verificare se tutti i valori restituiti attesi sono valori truthy per fn
. Una volta che un valore restituito atteso da una chiamata fn
su un chunk è falsy, il flusso viene distrutto e la promise viene completata con false
. Se tutte le chiamate fn
sui chunk restituiscono un valore truthy, la promise viene completata con true
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true
// Con un predicato asincrono, effettuando al massimo 2 controlli file contemporaneamente.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
// `true` se tutti i file nell'elenco sono più grandi di 1MiB
console.log(allBigFiles)
console.log('fatto') // Il flusso è terminato
readable.flatMap(fn[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> una funzione per mappare ogni chunk nel flusso.data
<any> un chunk di dati dal flusso.options
<Object>signal
<AbortSignal> abortito se il flusso viene distrutto, consentendo di abortire la chiamatafn
in anticipo.
options
<Object>concurrency
<number> il numero massimo di invocazioni concorrenti difn
da chiamare sul flusso contemporaneamente. Default:1
.signal
<AbortSignal> consente di distruggere il flusso se il segnale viene abortito.
Restituisce: <Readable> un flusso flat-mappato con la funzione
fn
.
Questo metodo restituisce un nuovo flusso applicando la callback data a ciascun chunk del flusso e quindi appiattendone il risultato.
È possibile restituire un flusso o un altro iterable o async iterable da fn
e i flussi risultanti verranno uniti (appiattiti) nel flusso restituito.
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'
// Con un mapper sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// Con un mapper asincrono, combina il contenuto di 4 file
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
createReadStream(fileName)
)
for await (const result of concatResult) {
// Questo conterrà il contenuto (tutti i chunk) di tutti e 4 i file
console.log(result)
}
readable.drop(limit[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
limit
<number> il numero di chunk da rimuovere dal readable.options
<Object>signal
<AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Readable> uno stream con
limit
chunk rimossi.
Questo metodo restituisce un nuovo stream con i primi limit
chunk rimossi.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
limit
<number> il numero di chunk da prendere dal readable.options
<Object>signal
<AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Readable> uno stream con
limit
chunk presi.
Questo metodo restituisce un nuovo stream con i primi limit
chunk.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione di riduzione da chiamare su ogni chunk nello stream.previous
<any> il valore ottenuto dall'ultima chiamata afn
o il valoreinitial
se specificato o il primo chunk dello stream altrimenti.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto, permettendo di interrompere precocemente la chiamatafn
.
initial
<any> il valore iniziale da usare nella riduzione.options
<Object>signal
<AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Promise> una promise per il valore finale della riduzione.
Questo metodo chiama fn
su ogni chunk dello stream in ordine, passandogli il risultato del calcolo sull'elemento precedente. Restituisce una promise per il valore finale della riduzione.
Se non viene fornito alcun valore initial
, il primo chunk dello stream viene usato come valore iniziale. Se lo stream è vuoto, la promise viene rifiutata con un TypeError
con la proprietà del codice ERR_INVALID_ARGS
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file))
return totalSize + size
}, 0)
console.log(folderSize)
La funzione di riduzione itera l'elemento dello stream uno per uno, il che significa che non c'è alcun parametro concurrency
o parallelismo. Per eseguire una reduce
in modo concorrente, è possibile estrarre la funzione async nel metodo readable.map
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir)
.map(file => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0)
console.log(folderSize)
Stream Duplex e Transform
Classe: stream.Duplex
[Cronologia]
Versione | Modifiche |
---|---|
v6.8.0 | Le istanze di Duplex ora restituiscono true quando si verifica instanceof stream.Writable . |
v0.9.4 | Aggiunto in: v0.9.4 |
Gli stream Duplex sono stream che implementano sia l'interfaccia Readable
che Writable
.
Esempi di stream Duplex
includono:
duplex.allowHalfOpen
Aggiunto in: v0.9.4
Se false
, lo stream terminerà automaticamente il lato scrivibile quando termina il lato leggibile. Impostato inizialmente dall'opzione del costruttore allowHalfOpen
, che di default è true
.
Questo può essere modificato manualmente per cambiare il comportamento half-open di un'istanza di stream Duplex
esistente, ma deve essere modificato prima che venga emesso l'evento 'end'
.
Classe: stream.Transform
Aggiunto in: v0.9.4
Gli stream Transform sono stream Duplex
in cui l'output è in qualche modo correlato all'input. Come tutti gli stream Duplex
, gli stream Transform
implementano sia l'interfaccia Readable
che Writable
.
Esempi di stream Transform
includono:
transform.destroy([error])
[Cronologia]
Versione | Modifiche |
---|---|
v14.0.0 | Funziona come no-op su uno stream che è già stato distrutto. |
v8.0.0 | Aggiunto in: v8.0.0 |
Distrugge lo stream e, facoltativamente, emette un evento 'error'
. Dopo questa chiamata, lo stream di trasformazione rilascerà tutte le risorse interne. Gli implementatori non dovrebbero sovrascrivere questo metodo, ma invece implementare readable._destroy()
. L'implementazione predefinita di _destroy()
per Transform
emette anche 'close'
a meno che emitClose
non sia impostato su false.
Una volta che destroy()
è stato chiamato, ulteriori chiamate saranno un no-op e non saranno emessi ulteriori errori, eccetto quelli provenienti da _destroy()
, come 'error'
.
stream.duplexPair([options])
Aggiunto in: v22.6.0, v20.17.0
options
<Object> Un valore da passare a entrambi i costruttoriDuplex
, per impostare opzioni come il buffering.- Restituisce: <Array> di due istanze
Duplex
.
La funzione di utilità duplexPair
restituisce un Array con due elementi, ognuno dei quali è un flusso Duplex
connesso all'altro lato:
const [sideA, sideB] = duplexPair()
Qualsiasi cosa venga scritta in un flusso diventa leggibile nell'altro. Fornisce un comportamento analogo a una connessione di rete, in cui i dati scritti dal client diventano leggibili dal server e viceversa.
I flussi Duplex sono simmetrici; uno o l'altro può essere utilizzato senza alcuna differenza di comportamento.
stream.finished(stream[, options], callback)
[Cronologia]
Versione | Modifiche |
---|---|
v19.5.0 | Aggiunto supporto per ReadableStream e WritableStream . |
v15.11.0 | È stata aggiunta l'opzione signal . |
v14.0.0 | finished(stream, cb) attenderà l'evento 'close' prima di invocare la callback. L'implementazione tenta di rilevare i flussi legacy e applica questo comportamento solo ai flussi che si prevede emettano 'close' . |
v14.0.0 | L'emissione di 'close' prima di 'end' su un flusso Readable causerà un errore ERR_STREAM_PREMATURE_CLOSE . |
v14.0.0 | La callback verrà invocata sui flussi che hanno già terminato prima della chiamata a finished(stream, cb) . |
v10.0.0 | Aggiunto in: v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Un flusso/webstream leggibile e/o scrivibile.options
<Object>error
<boolean> Se impostato sufalse
, una chiamata aemit('error', err)
non è trattata come terminata. Default:true
.readable
<boolean> Quando impostato sufalse
, la callback verrà chiamata quando il flusso termina anche se il flusso potrebbe essere ancora leggibile. Default:true
.writable
<boolean> Quando impostato sufalse
, la callback verrà chiamata quando il flusso termina anche se il flusso potrebbe essere ancora scrivibile. Default:true
.signal
<AbortSignal> permette di interrompere l'attesa della fine del flusso. Il flusso sottostante non verrà interrotto se il segnale viene interrotto. La callback verrà chiamata con unAbortError
. Tutti gli ascoltatori registrati aggiunti da questa funzione verranno rimossi.
callback
<Function> Una funzione di callback che accetta un argomento di errore opzionale.Restituisce: <Function> Una funzione di pulizia che rimuove tutti gli ascoltatori registrati.
Una funzione per ricevere una notifica quando un flusso non è più leggibile, scrivibile o ha riscontrato un errore o un evento di chiusura prematura.
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('Stream fallito.', err)
} else {
console.log('Stream ha finito di leggere.')
}
})
rs.resume() // Svuota il flusso.
Particolarmente utile negli scenari di gestione degli errori in cui un flusso viene distrutto prematuramente (come una richiesta HTTP interrotta) e non emetterà 'end'
o 'finish'
.
L'API finished
fornisce una versione promise.
stream.finished()
lascia gli ascoltatori di eventi in sospeso (in particolare 'error'
, 'end'
, 'finish'
e 'close'
) dopo che la callback
è stata invocata. La ragione di ciò è che eventi 'error'
inaspettati (a causa di implementazioni di flusso errate) non causano arresti anomali imprevisti. Se questo è un comportamento indesiderato, è necessario invocare la funzione di pulizia restituita nella callback:
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[Cronologia]
Versione | Modifiche |
---|---|
v19.7.0, v18.16.0 | Aggiunto supporto per webstreams. |
v18.0.0 | Il passaggio di un callback non valido all'argomento callback ora genera ERR_INVALID_ARG_TYPE invece di ERR_INVALID_CALLBACK . |
v14.0.0 | pipeline(..., cb) attenderà l'evento 'close' prima di invocare il callback. L'implementazione tenta di rilevare i flussi legacy e applica questo comportamento solo ai flussi che si prevede emettano 'close' . |
v13.10.0 | Aggiunto supporto per i generatori asincroni. |
v10.0.0 | Aggiunto in: v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- Restituisce: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- Restituisce: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- Restituisce: <AsyncIterable> | <Promise>
callback
<Function> Chiamato quando la pipeline è completamente terminata.err
<Error>val
Valore risolto diPromise
restituito dadestination
.
Restituisce: <Stream>
Un metodo del modulo per effettuare il piping tra flussi e generatori, inoltrando gli errori e pulendo correttamente, e fornendo un callback al completamento della pipeline.
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Utilizzare l'API pipeline per collegare facilmente una serie di flussi
// insieme e ricevere una notifica quando la pipeline è completamente terminata.
// Una pipeline per comprimere in gzip un file tar potenzialmente enorme in modo efficiente:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('Pipeline fallita.', err)
} else {
console.log('Pipeline completata.')
}
})
L'API pipeline
fornisce una versione promise.
stream.pipeline()
chiamerà stream.destroy(err)
su tutti i flussi tranne:
- Flussi
Readable
che hanno emesso'end'
o'close'
. - Flussi
Writable
che hanno emesso'finish'
o'close'
.
stream.pipeline()
lascia listener di eventi in sospeso sui flussi dopo che il callback
è stato invocato. Nel caso di riutilizzo dei flussi dopo un errore, questo può causare perdite di listener di eventi ed errori soppressi. Se l'ultimo flusso è leggibile, i listener di eventi in sospeso verranno rimossi in modo che l'ultimo flusso possa essere consumato in seguito.
stream.pipeline()
chiude tutti i flussi quando viene sollevato un errore. L'utilizzo di IncomingRequest
con pipeline
potrebbe portare a un comportamento inaspettato una volta che distrugge il socket senza inviare la risposta prevista. Vedi l'esempio seguente:
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // File non trovato
// questo messaggio non può essere inviato una volta che `pipeline` ha già distrutto il socket
return res.end('error!!!')
}
})
})
stream.compose(...streams)
[Cronologia]
Versione | Modifiche |
---|---|
v21.1.0, v20.10.0 | Aggiunto supporto per la classe stream. |
v19.8.0, v18.16.0 | Aggiunto supporto per webstreams. |
v16.9.0 | Aggiunto in: v16.9.0 |
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - stream.compose
è sperimentale.
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- Restituisce: <stream.Duplex>
Combina due o più stream in un flusso Duplex
che scrive sul primo stream e legge dall'ultimo. Ogni stream fornito viene inserito nel successivo, usando stream.pipeline
. Se uno qualsiasi degli stream genera un errore, tutti vengono distrutti, incluso lo stream Duplex
esterno.
Poiché stream.compose
restituisce un nuovo stream che a sua volta può (e dovrebbe) essere inserito in altri stream, abilita la composizione. Al contrario, quando si passano stream a stream.pipeline
, in genere il primo stream è un flusso leggibile e l'ultimo un flusso scrivibile, formando un circuito chiuso.
Se viene passata una Function
, questa deve essere un metodo factory che accetta un source
Iterable
.
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // stampa 'HELLOWORLD'
stream.compose
può essere usato per convertire iterabili asincroni, generatori e funzioni in stream.
AsyncIterable
si converte in unDuplex
leggibile. Non può generarenull
.AsyncGeneratorFunction
si converte in unDuplex
trasformante leggibile/scrivibile. Deve accettare un sourceAsyncIterable
come primo parametro. Non può generarenull
.AsyncFunction
si converte in unDuplex
scrivibile. Deve restituirenull
oundefined
.
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// Converte AsyncIterable in Duplex leggibile.
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// Converte AsyncGenerator in Duplex trasformante.
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// Converte AsyncFunction in Duplex scrivibile.
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // stampa 'HELLOWORLD'
Vedi readable.compose(stream)
per stream.compose
come operatore.
stream.Readable.from(iterable[, options])
Aggiunto in: v12.3.0, v10.17.0
iterable
<Iterable> Oggetto che implementa il protocollo iterabileSymbol.asyncIterator
oSymbol.iterator
. Emette un evento 'error' se viene passato un valore null.options
<Object> Opzioni fornite anew stream.Readable([options])
. Per impostazione predefinita,Readable.from()
imposteràoptions.objectMode
sutrue
, a meno che ciò non venga esplicitamente escluso impostandooptions.objectMode
sufalse
.- Restituisce: <stream.Readable>
Un metodo di utilità per creare stream leggibili da iteratori.
const { Readable } = require('node:stream')
async function* generate() {
yield 'hello'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
La chiamata Readable.from(string)
o Readable.from(buffer)
non itererà le stringhe o i buffer per motivi di prestazioni.
Se viene passato come argomento un oggetto Iterable
contenente promise, potrebbe verificarsi un'eccezione non gestita.
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Eccezione non gestita
])
stream.Readable.fromWeb(readableStream[, options])
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Restituisce: <stream.Readable>
stream.Readable.isDisturbed(stream)
Aggiunto in: v16.8.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
stream
<stream.Readable> | <ReadableStream>- Restituisce:
boolean
Restituisce se il flusso è stato letto o annullato.
stream.isErrored(stream)
Aggiunto in: v17.3.0, v16.14.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- Restituisce: <boolean>
Restituisce se il flusso ha incontrato un errore.
stream.isReadable(stream)
Aggiunto in: v17.4.0, v16.14.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
stream
<Readable> | <Duplex> | <ReadableStream>- Restituisce: <boolean>
Restituisce se il flusso è leggibile.
stream.Readable.toWeb(streamReadable[, options])
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> La dimensione massima della coda interna (delReadableStream
creato) prima che venga applicata la contro-pressione nella lettura dal datostream.Readable
. Se non viene fornito alcun valore, verrà preso dal datostream.Readable
.size
<Function> Una funzione che indica la dimensione del dato chunk di dati. Se non viene fornito alcun valore, la dimensione sarà1
per tutti i chunk.chunk
<any>- Restituisce: <number>
Restituisce: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Restituisce: <stream.Writable>
stream.Writable.toWeb(streamWritable)
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
streamWritable
<stream.Writable>- Restituisce: <WritableStream>
stream.Duplex.from(src)
[Cronologia]
Versione | Modifiche |
---|---|
v19.5.0, v18.17.0 | L'argomento src può ora essere un ReadableStream o un WritableStream . |
v16.8.0 | Aggiunto in: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
Un metodo di utilità per la creazione di stream duplex.
Stream
converte lo stream scrivibile inDuplex
scrivibile e lo stream leggibile inDuplex
.Blob
converte inDuplex
leggibile.string
converte inDuplex
leggibile.ArrayBuffer
converte inDuplex
leggibile.AsyncIterable
converte in unDuplex
leggibile. Non può restituirenull
.AsyncGeneratorFunction
converte in unDuplex
di trasformazione leggibile/scrivibile. Deve accettare unAsyncIterable
di origine come primo parametro. Non può restituirenull
.AsyncFunction
converte in unDuplex
scrivibile. Deve restituirenull
oundefined
.Object ({ writable, readable })
convertereadable
ewritable
inStream
e poi li combina inDuplex
doveDuplex
scriverà suwritable
e leggerà dareadable
.Promise
converte inDuplex
leggibile. Il valorenull
viene ignorato.ReadableStream
converte inDuplex
leggibile.WritableStream
converte inDuplex
scrivibile.- Restituisce: <stream.Duplex>
Se viene passato come argomento un oggetto Iterable
contenente promesse, potrebbe causare un'eccezione non gestita.
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Eccezione non gestita
])
stream.Duplex.fromWeb(pair[, options])
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>Restituisce: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
streamDuplex
<stream.Duplex>- Restituisce: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[Cronologia]
Versione | Modifiche |
---|---|
v19.7.0, v18.16.0 | Aggiunto supporto per ReadableStream e WritableStream . |
v15.4.0 | Aggiunto in: v15.4.0 |
signal
<AbortSignal> Un segnale che rappresenta una possibile cancellazionestream
<Stream> | <ReadableStream> | <WritableStream> Un flusso a cui allegare un segnale.
Allega un AbortSignal a un flusso leggibile o scrivibile. Questo permette al codice di controllare la distruzione del flusso usando un AbortController
.
Chiamare abort
sull' AbortController
corrispondente all' AbortSignal
passato si comporterà nello stesso modo di chiamare .destroy(new AbortError())
sul flusso, e controller.error(new AbortError())
per i webstreams.
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Successivamente, interrompe l'operazione chiudendo il flusso
controller.abort()
Oppure usando un AbortSignal
con un flusso leggibile come iterabile asincrono:
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // imposta un timeout
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// L'operazione è stata annullata
} else {
throw e
}
}
})()
Oppure usando un AbortSignal
con un ReadableStream:
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// L'operazione è stata annullata
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
Aggiunto in: v19.9.0, v18.17.0
Restituisce il valore highWaterMark
predefinito utilizzato dagli stream. Il valore predefinito è 65536
(64 KiB), oppure 16
per objectMode
.
stream.setDefaultHighWaterMark(objectMode, value)
Aggiunto in: v19.9.0, v18.17.0
Imposta il valore highWaterMark
predefinito utilizzato dagli stream.
API per gli sviluppatori di stream
L'API del modulo node:stream
è stata progettata per rendere semplice l'implementazione di stream utilizzando il modello di ereditarietà prototipale di JavaScript.
Innanzitutto, uno sviluppatore di stream dichiarerebbe una nuova classe JavaScript che estende una delle quattro classi di stream di base (stream.Writable
, stream.Readable
, stream.Duplex
o stream.Transform
), assicurandosi di chiamare il costruttore della classe padre appropriato:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
Quando si estendono gli stream, tenere presente quali opzioni l'utente può e deve fornire prima di inoltrarle al costruttore di base. Ad esempio, se l'implementazione fa ipotesi riguardo alle opzioni autoDestroy
ed emitClose
, non consentire all'utente di sovrascriverle. Essere espliciti su quali opzioni vengono inoltrate invece di inoltrare implicitamente tutte le opzioni.
La nuova classe stream deve quindi implementare uno o più metodi specifici, a seconda del tipo di stream che viene creato, come dettagliato nella tabella seguente:
Caso d'uso | Classe | Metodo(i) da implementare |
---|---|---|
Sola lettura | Readable | _read() |
Sola scrittura | Writable | _write() , _writev() , _final() |
Lettura e scrittura | Duplex | _read() , _write() , _writev() , _final() |
Operare sui dati scritti, quindi leggere il risultato | Transform | _transform() , _flush() , _final() |
Il codice di implementazione di uno stream non deve mai chiamare i metodi "pubblici" di uno stream che sono destinati all'uso da parte dei consumatori (come descritto nella sezione API per i consumatori di stream). In tal caso, potrebbero verificarsi effetti collaterali negativi nel codice dell'applicazione che utilizza lo stream.
Evitare di sovrascrivere metodi pubblici come write()
, end()
, cork()
, uncork()
, read()
e destroy()
, o di emettere eventi interni come 'error'
, 'data'
, 'end'
, 'finish'
e 'close'
tramite .emit()
. In tal caso, si potrebbero violare gli invarianti correnti e futuri dello stream, causando problemi di comportamento e/o compatibilità con altri stream, utilità di stream e aspettative dell'utente.
Costruzione semplificata
Aggiunto in: v1.2.0
Per molti casi semplici, è possibile creare un flusso senza ricorrere all'ereditarietà. Questo può essere ottenuto creando direttamente istanze degli oggetti stream.Writable
, stream.Readable
, stream.Duplex
o stream.Transform
e passando metodi appropriati come opzioni del costruttore.
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// Inizializza lo stato e carica le risorse...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Libera le risorse...
},
})
Implementare un flusso scrivibile
La classe stream.Writable
viene estesa per implementare un flusso Writable
.
I flussi Writable
personalizzati devono chiamare il costruttore new stream.Writable([options])
e implementare il metodo writable._write()
e/o writable._writev()
.
new stream.Writable([options])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0 | Aumenta il valore predefinito di highWaterMark. |
v15.5.0 | Supporta il passaggio di un AbortSignal. |
v14.0.0 | Cambia il valore predefinito dell'opzione autoDestroy in true . |
v11.2.0, v10.16.0 | Aggiunge l'opzione autoDestroy per distruggere automaticamente il flusso quando emette 'finish' o errori. |
v10.0.0 | Aggiunge l'opzione emitClose per specificare se 'close' viene emesso alla distruzione. |
options
<Oggetto>highWaterMark
<numero> Livello del buffer quandostream.write()
inizia a restituirefalse
. Predefinito:65536
(64 KiB), o16
per i flussiobjectMode
.decodeStrings
<booleano> Se codificare le stringhe passate astream.write()
inBuffer
(con la codifica specificata nella chiamatastream.write()
) prima di passarle astream._write()
. Altri tipi di dati non vengono convertiti (ad esempio, iBuffer
non vengono decodificati in stringhe). L'impostazione su false impedirà la conversione delle stringhe. Predefinito:true
.defaultEncoding
<stringa> La codifica predefinita utilizzata quando nessuna codifica è specificata come argomento astream.write()
. Predefinito:'utf8'
.objectMode
<booleano> Sestream.write(anyObj)
è un'operazione valida. Quando impostato, diventa possibile scrivere valori JavaScript diversi da stringa, <Buffer>, <TypedArray> o <DataView> se supportato dall'implementazione del flusso. Predefinito:false
.emitClose
<booleano> Se il flusso dovrebbe emettere'close'
dopo essere stato distrutto. Predefinito:true
.write
<Funzione> Implementazione per il metodostream._write()
.writev
<Funzione> Implementazione per il metodostream._writev()
.destroy
<Funzione> Implementazione per il metodostream._destroy()
.final
<Funzione> Implementazione per il metodostream._final()
.construct
<Funzione> Implementazione per il metodostream._construct()
.autoDestroy
<booleano> Se questo flusso dovrebbe chiamare automaticamente.destroy()
su se stesso dopo la fine. Predefinito:true
.signal
<AbortSignal> Un segnale che rappresenta una possibile cancellazione.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// Chiama il costruttore stream.Writable().
super(options)
// ...
}
}
Oppure, quando si utilizzano costruttori in stile pre-ES6:
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
Oppure, usando l'approccio del costruttore semplificato:
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
Chiamare abort
sull'oggetto AbortController
corrispondente all'oggetto AbortSignal
passato si comporterà allo stesso modo di chiamare .destroy(new AbortError())
sul flusso scrivibile.
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// Più tardi, interrompi l'operazione chiudendo il flusso
controller.abort()
writable._construct(callback)
Aggiunto in: v15.0.0
callback
<Function> Chiama questa funzione (facoltativamente con un argomento di errore) quando lo stream ha terminato l'inizializzazione.
Il metodo _construct()
NON DEVE essere chiamato direttamente. Può essere implementato dalle classi figlie e, in tal caso, verrà chiamato solo dai metodi interni della classe Writable
.
Questa funzione opzionale verrà chiamata in un tick dopo che il costruttore dello stream è ritornato, ritardando qualsiasi chiamata a _write()
, _final()
e _destroy()
fino a quando callback
non viene chiamato. Questo è utile per inizializzare lo stato o inizializzare asincronamente le risorse prima che lo stream possa essere utilizzato.
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[Cronologia]
Versione | Modifiche |
---|---|
v12.11.0 | _write() è facoltativo quando si fornisce _writev(). |
chunk
<Buffer> | <string> | <any> IlBuffer
da scrivere, convertito dallastring
passata astream.write()
. Se l'opzionedecodeStrings
dello stream èfalse
o lo stream sta operando in modalità oggetto, il chunk non verrà convertito e sarà ciò che è stato passato astream.write()
.encoding
<string> Se il chunk è una stringa, alloraencoding
è la codifica dei caratteri di quella stringa. Se il chunk è unBuffer
, o se lo stream sta operando in modalità oggetto,encoding
può essere ignorato.callback
<Function> Chiama questa funzione (facoltativamente con un argomento di errore) quando l'elaborazione è completa per il chunk fornito.
Tutte le implementazioni dello stream Writable
devono fornire un metodo writable._write()
e/o writable._writev()
per inviare dati alla risorsa sottostante.
Gli stream Transform
forniscono la propria implementazione di writable._write()
.
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlie e chiamata solo dai metodi interni della classe Writable
.
La funzione callback
deve essere chiamata sincronicamente all'interno di writable._write()
o asincronicamente (cioè tick diverso) per segnalare che la scrittura è stata completata correttamente o è fallita con un errore. Il primo argomento passato al callback
deve essere l'oggetto Error
se la chiamata è fallita o null
se la scrittura è riuscita.
Tutte le chiamate a writable.write()
che si verificano tra il momento in cui viene chiamato writable._write()
e il momento in cui viene chiamato callback
faranno sì che i dati scritti vengano memorizzati nel buffer. Quando viene richiamato callback
, lo stream potrebbe emettere un evento 'drain'
. Se un'implementazione di stream è in grado di elaborare più chunk di dati contemporaneamente, il metodo writable._writev()
dovrebbe essere implementato.
Se la proprietà decodeStrings
è impostata esplicitamente su false
nelle opzioni del costruttore, allora chunk
rimarrà lo stesso oggetto passato a .write()
, e potrebbe essere una stringa piuttosto che un Buffer
. Questo per supportare implementazioni che hanno una gestione ottimizzata per alcune codifiche di dati stringa. In tal caso, l'argomento encoding
indicherà la codifica dei caratteri della stringa. Altrimenti, l'argomento encoding
può essere ignorato in sicurezza.
Il metodo writable._write()
è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
writable._writev(chunks, callback)
chunks
<Object[]> I dati da scrivere. Il valore è un array di <Object> che rappresentano ciascuno un blocco discreto di dati da scrivere. Le proprietà di questi oggetti sono:chunk
<Buffer> | <string> Un'istanza di buffer o una stringa contenente i dati da scrivere.chunk
sarà una stringa seWritable
è stato creato con l'opzionedecodeStrings
impostata sufalse
ed è stata passata una stringa awrite()
.encoding
<string> La codifica a caratteri dichunk
. Sechunk
è unBuffer
,encoding
sarà'buffer'
.
callback
<Function> Una funzione di callback (facoltativamente con un argomento di errore) da invocare al completamento dell'elaborazione per i blocchi forniti.
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlio e chiamata solo dai metodi interni della classe Writable
.
Il metodo writable._writev()
può essere implementato in aggiunta o in alternativa a writable._write()
nelle implementazioni di stream in grado di elaborare più blocchi di dati contemporaneamente. Se implementato e se ci sono dati in buffer da scritture precedenti, verrà chiamato _writev()
invece di _write()
.
Il metodo writable._writev()
è prefissato con un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
writable._destroy(err, callback)
Aggiunto in: v8.0.0
err
<Error> Un possibile errore.callback
<Function> Una funzione di callback che accetta un argomento di errore facoltativo.
Il metodo _destroy()
viene chiamato da writable.destroy()
. Può essere sovrascritto dalle classi figlio, ma non deve essere chiamato direttamente.
writable._final(callback)
Aggiunto in: v8.0.0
callback
<Function> Chiama questa funzione (facoltativamente con un argomento di errore) al termine della scrittura di tutti i dati rimanenti.
Il metodo _final()
non deve essere chiamato direttamente. Può essere implementato dalle classi figlio e, in tal caso, verrà chiamato solo dai metodi interni della classe Writable
.
Questa funzione facoltativa verrà chiamata prima della chiusura del flusso, ritardando l'evento 'finish'
fino a quando callback
non viene chiamato. Questo è utile per chiudere le risorse o scrivere i dati in buffer prima della fine di un flusso.
Errori durante la scrittura
Gli errori che si verificano durante l'elaborazione dei metodi writable._write()
, writable._writev()
e writable._final()
devono essere propagati invocando la callback e passando l'errore come primo argomento. Generare un Error
all'interno di questi metodi o emettere manualmente un evento 'error'
comporta un comportamento indefinito.
Se un flusso Readable
viene inserito in un flusso Writable
quando Writable
emette un errore, il flusso Readable
verrà disinserito.
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
Un esempio di flusso scrivibile
Di seguito è illustrata un'implementazione di flusso Writable
personalizzato piuttosto semplicistica (e in qualche modo inutile). Sebbene questa specifica istanza di flusso Writable
non sia di alcuna particolare utilità reale, l'esempio illustra ciascuno degli elementi necessari di un'istanza di flusso Writable
personalizzata:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
Decodifica dei buffer in uno stream scrivibile
La decodifica dei buffer è un'operazione comune, ad esempio quando si utilizzano trasformatori il cui input è una stringa. Questo non è un processo banale quando si utilizza una codifica a caratteri multi-byte, come UTF-8. L'esempio seguente mostra come decodificare stringhe multi-byte usando StringDecoder
e Writable
.
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
Implementare uno stream leggibile
La classe stream.Readable
viene estesa per implementare uno stream Readable
.
Gli stream Readable
personalizzati devono chiamare il costruttore new stream.Readable([options])
e implementare il metodo readable._read()
.
new stream.Readable([options])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0 | aumento del valore predefinito di highWaterMark. |
v15.5.0 | supporto per il passaggio di un AbortSignal. |
v14.0.0 | modifica del valore predefinito dell'opzione autoDestroy a true . |
v11.2.0, v10.16.0 | aggiunta dell'opzione autoDestroy per distruggere automaticamente lo stream quando emette 'end' o errori. |
options
<Oggetto>highWaterMark
<numero> Il numero massimo di byte da memorizzare nel buffer interno prima di smettere di leggere dalla risorsa sottostante. Predefinito:65536
(64 KiB), o16
per gli streamobjectMode
.encoding
<stringa> Se specificato, i buffer verranno decodificati in stringhe usando la codifica specificata. Predefinito:null
.objectMode
<booleano> Se questo stream dovrebbe comportarsi come uno stream di oggetti. Ciò significa chestream.read(n)
restituisce un singolo valore invece di unBuffer
di dimensionen
. Predefinito:false
.emitClose
<booleano> Se lo stream dovrebbe emettere'close'
dopo essere stato distrutto. Predefinito:true
.read
<Funzione> Implementazione per il metodostream._read()
.destroy
<Funzione> Implementazione per il metodostream._destroy()
.construct
<Funzione> Implementazione per il metodostream._construct()
.autoDestroy
<booleano> Se questo stream dovrebbe chiamare automaticamente.destroy()
su se stesso dopo la terminazione. Predefinito:true
.signal
<AbortSignal> Un segnale che rappresenta una possibile cancellazione.
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// Chiama il costruttore stream.Readable(options).
super(options)
// ...
}
}
Oppure, quando si usano costruttori in stile pre-ES6:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
Oppure, usando l'approccio del costruttore semplificato:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
Chiamare abort
sull' AbortController
corrispondente all' AbortSignal
passato si comporterà nello stesso modo di chiamare .destroy(new AbortError())
sul leggibile creato.
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// Più tardi, interrompi l'operazione chiudendo lo stream
controller.abort()
readable._construct(callback)
Aggiunto in: v15.0.0
callback
<Function> Chiama questa funzione (facoltativamente con un argomento di errore) quando lo stream ha terminato l'inizializzazione.
Il metodo _construct()
NON DEVE essere chiamato direttamente. Può essere implementato dalle classi figlie e, in tal caso, sarà chiamato solo dai metodi interni della classe Readable
.
Questa funzione opzionale sarà pianificata nel prossimo tick dal costruttore dello stream, ritardando qualsiasi chiamata _read()
e _destroy()
fino a quando callback
non verrà chiamato. Questo è utile per inizializzare lo stato o inizializzare asincronicamente le risorse prima che lo stream possa essere utilizzato.
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
Aggiunto in: v0.9.4
size
<number> Numero di byte da leggere asincronicamente
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlie e chiamata solo dai metodi interni della classe Readable
.
Tutte le implementazioni dello stream Readable
devono fornire un'implementazione del metodo readable._read()
per recuperare i dati dalla risorsa sottostante.
Quando viene chiamato readable._read()
, se i dati sono disponibili dalla risorsa, l'implementazione dovrebbe iniziare a inserire quei dati nella coda di lettura usando il metodo this.push(dataChunk)
. _read()
verrà chiamato di nuovo dopo ogni chiamata a this.push(dataChunk)
una volta che lo stream è pronto per accettare più dati. _read()
può continuare a leggere dalla risorsa e inserire dati finché readable.push()
non restituisce false
. Solo quando _read()
viene chiamato di nuovo dopo che si è fermato, dovrebbe riprendere a inserire dati aggiuntivi nella coda.
Una volta che il metodo readable._read()
è stato chiamato, non verrà chiamato di nuovo fino a quando non verranno inseriti più dati tramite il metodo readable.push()
. I dati vuoti come buffer e stringhe vuoti non faranno sì che readable._read()
venga chiamato.
L'argomento size
è indicativo. Per le implementazioni in cui una "lettura" è una singola operazione che restituisce dati, è possibile utilizzare l'argomento size
per determinare quanti dati recuperare. Altre implementazioni possono ignorare questo argomento e semplicemente fornire dati quando diventano disponibili. Non è necessario "attendere" fino a quando non sono disponibili size
byte prima di chiamare stream.push(chunk)
.
Il metodo readable._read()
è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
readable._destroy(err, callback)
Aggiunto in: v8.0.0
err
<Error> Un possibile errore.callback
<Function> Una funzione di callback che accetta un argomento di errore opzionale.
Il metodo _destroy()
viene chiamato da readable.destroy()
. Può essere sovrascritto dalle classi figlie, ma non deve essere chiamato direttamente.
readable.push(chunk[, encoding])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0, v20.13.0 | L'argomento chunk può ora essere un'istanza di TypedArray o DataView . |
v8.0.0 | L'argomento chunk può ora essere un'istanza di Uint8Array . |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Frammento di dati da inserire nella coda di lettura. Per i flussi che non operano in modalità oggetto,chunk
deve essere una <string>, <Buffer>, <TypedArray> o <DataView>. Per i flussi in modalità oggetto,chunk
può essere qualsiasi valore JavaScript.encoding
<string> Codifica dei frammenti di stringa. Deve essere una codificaBuffer
valida, come'utf8'
o'ascii'
.- Restituisce: <boolean>
true
se è possibile continuare a inserire ulteriori frammenti di dati;false
altrimenti.
Quando chunk
è un <Buffer>, <TypedArray>, <DataView> o <string>, il frammento di dati verrà aggiunto alla coda interna per i consumatori del flusso. Passando chunk
come null
si segnala la fine del flusso (EOF), dopo di che non è più possibile scrivere altri dati.
Quando il Readable
opera in modalità sospesa, i dati aggiunti con readable.push()
possono essere letti chiamando il metodo readable.read()
quando viene emesso l'evento 'readable'
.
Quando il Readable
opera in modalità di flusso, i dati aggiunti con readable.push()
verranno consegnati emettendo un evento 'data'
.
Il metodo readable.push()
è progettato per essere il più flessibile possibile. Ad esempio, quando si incapsula una sorgente di livello inferiore che fornisce una qualche forma di meccanismo di pausa/riepilogo e una callback di dati, la sorgente di livello inferiore può essere incapsulata dall'istanza Readable
personalizzata:
// `_source` è un oggetto con i metodi readStop() e readStart(),
// e un membro `ondata` che viene chiamato quando ha dati, e
// un membro `onend` che viene chiamato quando i dati sono finiti.
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// Ogni volta che ci sono dati, inserirli nel buffer interno.
this._source.ondata = chunk => {
// Se push() restituisce false, allora smetti di leggere dalla sorgente.
if (!this.push(chunk)) this._source.readStop()
}
// Quando la sorgente termina, inserisci il frammento `null` che segnala la fine del flusso.
this._source.onend = () => {
this.push(null)
}
}
// _read() verrà chiamato quando il flusso desidera estrarre più dati.
// L'argomento size consigliato viene ignorato in questo caso.
_read(size) {
this._source.readStart()
}
}
Il metodo readable.push()
viene utilizzato per inserire il contenuto nel buffer interno. Può essere guidato dal metodo readable._read()
.
Per i flussi che non operano in modalità oggetto, se il parametro chunk
di readable.push()
è undefined
, verrà trattato come una stringa o un buffer vuoto. Vedere readable.push('')
per maggiori informazioni.
Errori durante la lettura
Gli errori che si verificano durante l'elaborazione di readable._read()
devono essere propagati attraverso il metodo readable.destroy(err)
. Sollevare un Error
all'interno di readable._read()
o emettere manualmente un evento 'error'
comporta un comportamento indefinito.
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// Esegui alcune operazioni.
}
},
})
Esempio di stream di conteggio
Segue un esempio di base di uno stream Readable
che emette i numeri da 1 a 1.000.000 in ordine ascendente, e poi termina.
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
Implementare uno stream duplex
Uno stream Duplex
è uno stream che implementa sia Readable
che Writable
, come ad esempio una connessione socket TCP.
Poiché JavaScript non supporta l'ereditarietà multipla, la classe stream.Duplex
viene estesa per implementare uno stream Duplex
(invece di estendere le classi stream.Readable
e stream.Writable
).
La classe stream.Duplex
eredita prototipicamente da stream.Readable
e parassitariamente da stream.Writable
, ma instanceof
funzionerà correttamente per entrambe le classi di base grazie alla sovrascrittura di Symbol.hasInstance
su stream.Writable
.
Gli stream Duplex
personalizzati devono chiamare il costruttore new stream.Duplex([options])
e implementare sia il metodo readable._read()
che il metodo writable._write()
.
new stream.Duplex(options)
[Cronologia]
Versione | Modifiche |
---|---|
v8.4.0 | Le opzioni readableHighWaterMark e writableHighWaterMark sono ora supportate. |
options
<Oggetto> Passato sia ai costruttoriWritable
cheReadable
. Ha anche i seguenti campi:allowHalfOpen
<booleano> Se impostato sufalse
, il flusso terminerà automaticamente il lato scrivibile quando termina il lato leggibile. Predefinito:true
.readable
<booleano> Imposta se ilDuplex
deve essere leggibile. Predefinito:true
.writable
<booleano> Imposta se ilDuplex
deve essere scrivibile. Predefinito:true
.readableObjectMode
<booleano> ImpostaobjectMode
per il lato leggibile del flusso. Non ha effetto seobjectMode
ètrue
. Predefinito:false
.writableObjectMode
<booleano> ImpostaobjectMode
per il lato scrivibile del flusso. Non ha effetto seobjectMode
ètrue
. Predefinito:false
.readableHighWaterMark
<numero> ImpostahighWaterMark
per il lato leggibile del flusso. Non ha effetto se è fornitohighWaterMark
.writableHighWaterMark
<numero> ImpostahighWaterMark
per il lato scrivibile del flusso. Non ha effetto se è fornitohighWaterMark
.
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
Oppure, quando si utilizzano costruttori in stile pre-ES6:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
Oppure, utilizzando l'approccio del costruttore semplificato:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
Quando si utilizza la pipeline:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // Accetta input stringa invece di buffer
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// Assicurarsi che sia un JSON valido.
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('errore', err)
} else {
console.log('completato')
}
}
)
Esempio di stream duplex
L'esempio seguente illustra un semplice esempio di stream Duplex
che incapsula un ipotetico oggetto sorgente di livello inferiore in cui è possibile scrivere dati e da cui è possibile leggere dati, sebbene utilizzando un'API non compatibile con gli stream di Node.js. L'esempio seguente illustra un semplice esempio di stream Duplex
che mette in buffer i dati in ingresso scritti tramite l'interfaccia Writable
che viene letta tramite l'interfaccia Readable
.
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// La sorgente sottostante gestisce solo le stringhe.
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
L'aspetto più importante di uno stream Duplex
è che i lati Readable
e Writable
operano indipendentemente l'uno dall'altro nonostante coesistano all'interno di una singola istanza di oggetto.
Stream duplex in modalità oggetto
Per gli stream Duplex
, objectMode
può essere impostato esclusivamente per il lato Readable
o Writable
utilizzando rispettivamente le opzioni readableObjectMode
e writableObjectMode
.
Nell'esempio seguente, ad esempio, viene creato un nuovo stream Transform
(che è un tipo di stream Duplex
) che ha un lato Writable
in modalità oggetto che accetta numeri JavaScript che vengono convertiti in stringhe esadecimali sul lato Readable
.
const { Transform } = require('node:stream')
// Tutti gli stream Transform sono anche stream Duplex.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Costringe il chunk a un numero se necessario.
chunk |= 0
// Trasforma il chunk in qualcos'altro.
const data = chunk.toString(16)
// Inserisce i dati nella coda leggibile.
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// Stampa: 01
myTransform.write(10)
// Stampa: 0a
myTransform.write(100)
// Stampa: 64
Implementare un flusso di trasformazione
Un flusso Transform
è un flusso Duplex
in cui l'output viene calcolato in qualche modo dall'input. Esempi includono flussi zlib o flussi crypto che comprimono, crittografano o decrittografano i dati.
Non è richiesto che l'output abbia le stesse dimensioni dell'input, lo stesso numero di chunk o arrivi allo stesso tempo. Ad esempio, un flusso Hash
avrà solo un singolo chunk di output fornito quando l'input è terminato. Un flusso zlib
produrrà un output che è molto più piccolo o molto più grande del suo input.
La classe stream.Transform
viene estesa per implementare un flusso Transform
.
La classe stream.Transform
eredita prototipicamente da stream.Duplex
e implementa le proprie versioni dei metodi writable._write()
e readable._read()
. Le implementazioni personalizzate di Transform
devono implementare il metodo transform._transform()
e possono anche implementare il metodo transform._flush()
.
È necessario prestare attenzione quando si utilizzano flussi Transform
in quanto i dati scritti nel flusso possono causare la sospensione del lato Writable
del flusso se l'output sul lato Readable
non viene consumato.
new stream.Transform([options])
options
<Object> Passato sia ai costruttoriWritable
cheReadable
. Ha anche i seguenti campi:transform
<Function> Implementazione per il metodostream._transform()
.flush
<Function> Implementazione per il metodostream._flush()
.
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
Oppure, quando si utilizzano costruttori in stile pre-ES6:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
Oppure, utilizzando l'approccio del costruttore semplificato:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
Evento: 'end'
L'evento 'end'
proviene dalla classe stream.Readable
. L'evento 'end'
viene emesso dopo che tutti i dati sono stati elaborati, il che avviene dopo che la callback in transform._flush()
è stata chiamata. In caso di errore, 'end'
non dovrebbe essere emesso.
Evento: 'finish'
L'evento 'finish'
proviene dalla classe stream.Writable
. L'evento 'finish'
viene emesso dopo che stream.end()
è stato chiamato e tutti i chunk sono stati elaborati da stream._transform()
. In caso di errore, 'finish'
non dovrebbe essere emesso.
transform._flush(callback)
callback
<Funzione> Una funzione di callback (facoltativamente con un argomento di errore e dati) da chiamare quando i dati rimanenti sono stati svuotati.
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlio e chiamata solo dai metodi interni della classe Readable
.
In alcuni casi, un'operazione di trasformazione potrebbe aver bisogno di emettere un bit aggiuntivo di dati alla fine del flusso. Ad esempio, un flusso di compressione zlib
memorizzerà una quantità di stato interno utilizzato per comprimere in modo ottimale l'output. Quando il flusso termina, tuttavia, quei dati aggiuntivi devono essere svuotati in modo che i dati compressi siano completi.
Le implementazioni personalizzate di Transform
possono implementare il metodo transform._flush()
. Questo verrà chiamato quando non ci sono più dati scritti da consumare, ma prima che l'evento 'end'
venga emesso segnalando la fine del flusso Readable
.
All'interno dell'implementazione di transform._flush()
, il metodo transform.push()
può essere chiamato zero o più volte, a seconda dei casi. La funzione callback
deve essere chiamata quando l'operazione di svuotamento è completa.
Il metodo transform._flush()
è prefissato con un underscore perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> IlBuffer
da trasformare, convertito dallastring
passata astream.write()
. Se l'opzionedecodeStrings
dello stream èfalse
o lo stream opera in modalità oggetto, il chunk non verrà convertito e sarà quello passato astream.write()
.encoding
<string> Se il chunk è una stringa, questo è il tipo di codifica. Se il chunk è un buffer, questo è il valore speciale'buffer'
. Ignorarlo in quel caso.callback
<Function> Una funzione di callback (facoltativamente con un argomento di errore e dati) da chiamare dopo che ilchunk
fornito è stato elaborato.
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlie e chiamata solo dai metodi interni della classe Readable
.
Tutte le implementazioni dello stream Transform
devono fornire un metodo _transform()
per accettare input e produrre output. L'implementazione di transform._transform()
gestisce i byte in scrittura, calcola un output, quindi passa quell'output alla parte leggibile usando il metodo transform.push()
.
Il metodo transform.push()
può essere chiamato zero o più volte per generare output da un singolo chunk di input, a seconda di quanto deve essere output come risultato del chunk.
È possibile che nessun output venga generato da un dato chunk di dati di input.
La funzione callback
deve essere chiamata solo quando il chunk corrente è completamente consumato. Il primo argomento passato al callback
deve essere un oggetto Error
se si è verificato un errore durante l'elaborazione dell'input o null
altrimenti. Se viene passato un secondo argomento al callback
, verrà inoltrato al metodo transform.push()
, ma solo se il primo argomento è falso. In altre parole, i seguenti sono equivalenti:
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
Il metodo transform._transform()
è prefissato con un underscore perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
transform._transform()
non viene mai chiamato in parallelo; gli stream implementano un meccanismo di coda e per ricevere il chunk successivo, callback
deve essere chiamato, sincronicamente o asincronicamente.
Classe: stream.PassThrough
La classe stream.PassThrough
è un'implementazione banale di uno stream Transform
che semplicemente passa i byte di input all'output. Il suo scopo è principalmente quello di fornire esempi e test, ma ci sono alcuni casi d'uso in cui stream.PassThrough
è utile come componente base per nuovi tipi di stream.
Note aggiuntive
Compatibilità degli stream con generatori asincroni e iteratori asincroni
Con il supporto di generatori e iteratori asincroni in JavaScript, i generatori asincroni sono a questo punto effettivamente una struttura di stream di primo livello a livello di linguaggio.
Di seguito sono riportati alcuni casi comuni di interoperabilità tra gli stream di Node.js e i generatori e gli iteratori asincroni.
Consumare stream leggibili con iteratori asincroni
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
Gli iteratori asincroni registrano un gestore di errori permanente sullo stream per prevenire eventuali errori non gestiti post-distruzione.
Creare stream leggibili con generatori asincroni
Uno stream leggibile di Node.js può essere creato da un generatore asincrono usando il metodo di utilità Readable.from()
:
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
Piping verso stream scrivibili da iteratori asincroni
Quando si scrive su uno stream scrivibile da un iteratore asincrono, assicurarsi di gestire correttamente la backpressure e gli errori. stream.pipeline()
astrae la gestione della backpressure e degli errori correlati alla backpressure:
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// Modello Callback
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'value returned')
}
}).on('close', () => {
ac.abort()
})
// Modello Promise
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'value returned')
})
.catch(err => {
console.error(err)
ac.abort()
})
Compatibilità con versioni precedenti di Node.js
Prima di Node.js 0.10, l'interfaccia della stream Readable
era più semplice, ma anche meno potente e meno utile.
- Invece di attendere le chiamate al metodo
stream.read()
, gli eventi'data'
iniziavano a essere emessi immediatamente. Le applicazioni che necessitavano di eseguire una certa quantità di lavoro per decidere come gestire i dati erano obbligate a memorizzare i dati letti in buffer, in modo che i dati non andassero persi. - Il metodo
stream.pause()
era consigliativo, piuttosto che garantito. Ciò significava che era comunque necessario essere preparati a ricevere eventi'data'
anche quando la stream era in stato di pausa.
In Node.js 0.10, è stata aggiunta la classe Readable
. Per la compatibilità con i programmi Node.js precedenti, le stream Readable
passano alla "modalità flowing" quando viene aggiunto un gestore eventi 'data'
, o quando viene chiamato il metodo stream.resume()
. L'effetto è che, anche senza utilizzare il nuovo metodo stream.read()
e l'evento 'readable'
, non è più necessario preoccuparsi di perdere chunk 'data'
.
Sebbene la maggior parte delle applicazioni continui a funzionare normalmente, questo introduce un caso limite nelle seguenti condizioni:
- Non viene aggiunto alcun listener di eventi
'data'
. - Il metodo
stream.resume()
non viene mai chiamato. - La stream non viene inviata a nessuna destinazione scrivibile.
Ad esempio, considera il seguente codice:
// ATTENZIONE! NON FUNZIONA!
net
.createServer(socket => {
// Aggiungiamo un listener 'end', ma non consumiamo mai i dati.
socket.on('end', () => {
// Non arriverà mai qui.
socket.end('Il messaggio è stato ricevuto ma non è stato elaborato.\n')
})
})
.listen(1337)
Prima di Node.js 0.10, i dati del messaggio in arrivo sarebbero stati semplicemente scartati. Tuttavia, in Node.js 0.10 e successive, il socket rimane in pausa per sempre.
La soluzione in questa situazione è chiamare il metodo stream.resume()
per iniziare il flusso di dati:
// Soluzione.
net
.createServer(socket => {
socket.on('end', () => {
socket.end('Il messaggio è stato ricevuto ma non è stato elaborato.\n')
})
// Avvia il flusso di dati, scartandolo.
socket.resume()
})
.listen(1337)
Oltre al passaggio delle nuove stream Readable
alla modalità flowing, le stream in stile pre-0.10 possono essere racchiuse in una classe Readable
usando il metodo readable.wrap()
.
readable.read(0)
Ci sono alcuni casi in cui è necessario innescare un aggiornamento dei meccanismi sottostanti dello stream leggibile, senza effettivamente consumare alcun dato. In questi casi, è possibile chiamare readable.read(0)
, che restituirà sempre null
.
Se il buffer di lettura interno è inferiore a highWaterMark
, e lo stream non sta attualmente leggendo, allora chiamare stream.read(0)
innescherà una chiamata di basso livello a stream._read()
.
Sebbene la maggior parte delle applicazioni non abbia quasi mai bisogno di farlo, ci sono situazioni all'interno di Node.js in cui questo viene fatto, in particolare negli interni della classe stream Readable
.
readable.push('')
L'utilizzo di readable.push('')
non è raccomandato.
Inserire uno <string>, <Buffer>, <TypedArray> o <DataView> a zero byte in uno stream che non è in modalità oggetto ha un effetto collaterale interessante. Poiché è una chiamata a readable.push()
, la chiamata terminerà il processo di lettura. Tuttavia, poiché l'argomento è una stringa vuota, nessun dato viene aggiunto al buffer leggibile, quindi non c'è nulla che un utente possa consumare.
Discrepanza di highWaterMark
dopo aver chiamato readable.setEncoding()
L'utilizzo di readable.setEncoding()
cambierà il comportamento del funzionamento di highWaterMark
in modalità non oggetto.
In genere, la dimensione del buffer corrente viene misurata rispetto a highWaterMark
in byte. Tuttavia, dopo che setEncoding()
è stato chiamato, la funzione di confronto inizierà a misurare la dimensione del buffer in caratteri.
Questo non è un problema nei casi comuni con latin1
o ascii
. Ma si consiglia di essere consapevoli di questo comportamento quando si lavora con stringhe che potrebbero contenere caratteri multibyte.