Stream
[Stabile: 2 - Stabile]
Stabile: 2 Stabilità: 2 - Stabile
Codice sorgente: lib/stream.js
Uno stream è un'interfaccia astratta per lavorare con dati in streaming in Node.js. Il modulo node:stream
fornisce un'API per implementare l'interfaccia stream.
Ci sono molti oggetti stream forniti da Node.js. Ad esempio, una richiesta a un server HTTP e process.stdout
sono entrambe istanze di stream.
Gli stream possono essere leggibili, scrivibili o entrambi. Tutti gli stream sono istanze di EventEmitter
.
Per accedere al modulo node:stream
:
const stream = require('node:stream');
Il modulo node:stream
è utile per creare nuovi tipi di istanze stream. Di solito non è necessario utilizzare il modulo node:stream
per consumare stream.
Organizzazione di questo documento
Questo documento contiene due sezioni principali e una terza sezione per le note. La prima sezione spiega come utilizzare gli stream esistenti all'interno di un'applicazione. La seconda sezione spiega come creare nuovi tipi di stream.
Tipi di stream
Esistono quattro tipi di stream fondamentali all'interno di Node.js:
Writable
: stream in cui è possibile scrivere dati (ad esempio,fs.createWriteStream()
).Readable
: stream da cui è possibile leggere dati (ad esempio,fs.createReadStream()
).Duplex
: stream che sono siaReadable
cheWritable
(ad esempio,net.Socket
).Transform
: streamDuplex
che possono modificare o trasformare i dati mentre vengono scritti e letti (ad esempio,zlib.createDeflate()
).
Inoltre, questo modulo include le funzioni di utilità stream.duplexPair()
, stream.pipeline()
, stream.finished()
stream.Readable.from()
e stream.addAbortSignal()
.
API Streams Promises
Aggiunto in: v15.0.0
L'API stream/promises
fornisce un insieme alternativo di funzioni di utilità asincrone per gli stream che restituiscono oggetti Promise
invece di utilizzare i callback. L'API è accessibile tramite require('node:stream/promises')
o require('node:stream').promises
.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[Cronologia]
Versione | Modifiche |
---|---|
v18.0.0, v17.2.0, v16.14.0 | Aggiunta l'opzione end , che può essere impostata su false per impedire la chiusura automatica dello stream di destinazione quando la sorgente termina. |
v15.0.0 | Aggiunto in: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- Restituisce: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- Restituisce: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- Restituisce: <Promise> | <AsyncIterable>
options
<Object> Opzioni della pipelinesignal
<AbortSignal>end
<boolean> Termina lo stream di destinazione quando termina lo stream di origine. Gli stream di trasformazione terminano sempre, anche se questo valore èfalse
. Predefinito:true
.
Restituisce: <Promise> Si adempie quando la pipeline è completa.
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
Per utilizzare un AbortSignal
, passalo all'interno di un oggetto di opzioni, come ultimo argomento. Quando il segnale viene interrotto, destroy
verrà chiamato sulla pipeline sottostante, con un AbortError
.
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
const ac = new AbortController();
const signal = ac.signal;
setImmediate(() => ac.abort());
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}
run().catch(console.error); // AbortError
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
{ signal },
);
} catch (err) {
console.error(err); // AbortError
}
L'API pipeline
supporta anche i generatori async:
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
Ricorda di gestire l'argomento signal
passato al generatore async. Soprattutto nel caso in cui il generatore async è la sorgente per la pipeline (cioè il primo argomento) oppure la pipeline non verrà mai completata.
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
L'API pipeline
fornisce la versione callback:
stream.finished(stream[, options])
[Cronologia]
Versione | Modifiche |
---|---|
v19.5.0, v18.14.0 | Aggiunto il supporto per ReadableStream e WritableStream . |
v19.1.0, v18.13.0 | Aggiunta l'opzione cleanup . |
v15.0.0 | Aggiunto in: v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Uno stream/webstream leggibile e/o scrivibile.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> Setrue
, rimuove i listener registrati da questa funzione prima che la promise sia soddisfatta. Predefinito:false
.
Restituisce: <Promise> Si completa quando lo stream non è più leggibile o scrivibile.
const { finished } = require('node:stream/promises');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Lo stream ha terminato la lettura.');
}
run().catch(console.error);
rs.resume(); // Scarica lo stream.
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
const rs = createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Lo stream ha terminato la lettura.');
}
run().catch(console.error);
rs.resume(); // Scarica lo stream.
L'API finished
fornisce anche una versione di callback.
stream.finished()
lascia event listeners pendenti (in particolare 'error'
, 'end'
, 'finish'
e 'close'
) dopo che la promise restituita è stata risolta o rifiutata. La ragione di ciò è che gli eventi 'error'
imprevisti (a causa di implementazioni di stream non corrette) non causino arresti anomali imprevisti. Se questo è un comportamento indesiderato, allora options.cleanup
dovrebbe essere impostato su true
:
await finished(rs, { cleanup: true });
Modalità oggetto
Tutti gli stream creati dalle API di Node.js operano esclusivamente su stringhe, oggetti <Buffer>, <TypedArray> e <DataView>:
Strings
eBuffers
sono i tipi più comuni utilizzati con gli stream.TypedArray
eDataView
consentono di gestire dati binari con tipi comeInt32Array
oUint8Array
. Quando si scrive un TypedArray o DataView in uno stream, Node.js elabora i byte raw.
È possibile, tuttavia, che le implementazioni di stream funzionino con altri tipi di valori JavaScript (ad eccezione di null
, che ha uno scopo speciale all'interno degli stream). Tali stream sono considerati operanti in "modalità oggetto".
Le istanze di stream vengono commutate in modalità oggetto utilizzando l'opzione objectMode
quando viene creato lo stream. Tentare di commutare uno stream esistente in modalità oggetto non è sicuro.
Buffering
Sia gli stream Writable
che Readable
memorizzano i dati in un buffer interno.
La quantità di dati potenzialmente bufferizzati dipende dall'opzione highWaterMark
passata al costruttore dello stream. Per gli stream normali, l'opzione highWaterMark
specifica un numero totale di byte. Per gli stream che operano in modalità oggetto, highWaterMark
specifica un numero totale di oggetti. Per gli stream che operano su (ma non decodificano) stringhe, highWaterMark
specifica un numero totale di unità di codice UTF-16.
I dati vengono bufferizzati negli stream Readable
quando l'implementazione chiama stream.push(chunk)
. Se il consumer dello Stream non chiama stream.read()
, i dati rimarranno nella coda interna fino a quando non verranno consumati.
Una volta che la dimensione totale del buffer di lettura interno raggiunge la soglia specificata da highWaterMark
, lo stream smetterà temporaneamente di leggere i dati dalla risorsa sottostante fino a quando i dati attualmente bufferizzati non potranno essere consumati (ovvero, lo stream smetterà di chiamare il metodo interno readable._read()
utilizzato per riempire il buffer di lettura).
I dati vengono bufferizzati negli stream Writable
quando il metodo writable.write(chunk)
viene chiamato ripetutamente. Mentre la dimensione totale del buffer di scrittura interno è inferiore alla soglia impostata da highWaterMark
, le chiamate a writable.write()
restituiranno true
. Una volta che la dimensione del buffer interno raggiunge o supera highWaterMark
, verrà restituito false
.
Un obiettivo chiave dell'API stream
, in particolare il metodo stream.pipe()
, è limitare il buffering dei dati a livelli accettabili in modo che sorgenti e destinazioni di velocità diverse non sovraccarichino la memoria disponibile.
L'opzione highWaterMark
è una soglia, non un limite: indica la quantità di dati che uno stream bufferizza prima di smettere di chiedere ulteriori dati. In generale, non impone una rigorosa limitazione della memoria. Implementazioni di stream specifiche possono scegliere di imporre limiti più rigidi, ma farlo è facoltativo.
Poiché gli stream Duplex
e Transform
sono sia Readable
che Writable
, ognuno mantiene due buffer interni separati utilizzati per la lettura e la scrittura, consentendo a ciascun lato di operare indipendentemente dall'altro mantenendo un flusso di dati appropriato ed efficiente. Ad esempio, le istanze di net.Socket
sono stream Duplex
il cui lato Readable
consente il consumo di dati ricevuti dalla socket e il cui lato Writable
consente la scrittura di dati nella socket. Poiché i dati possono essere scritti nella socket a una velocità maggiore o minore rispetto a quella con cui vengono ricevuti, ogni lato deve operare (e bufferizzare) indipendentemente dall'altro.
La meccanica del buffering interno è un dettaglio di implementazione interno e può essere modificata in qualsiasi momento. Tuttavia, per alcune implementazioni avanzate, i buffer interni possono essere recuperati utilizzando writable.writableBuffer
o readable.readableBuffer
. L'uso di queste proprietà non documentate è sconsigliato.
API per i consumatori di stream
Quasi tutte le applicazioni Node.js, non importa quanto semplici, utilizzano stream in qualche modo. Il seguente è un esempio di utilizzo degli stream in un'applicazione Node.js che implementa un server HTTP:
const http = require('node:http');
const server = http.createServer((req, res) => {
// `req` è un http.IncomingMessage, che è uno stream leggibile.
// `res` è un http.ServerResponse, che è uno stream scrivibile.
let body = '';
// Ottieni i dati come stringhe utf8.
// Se una codifica non è impostata, verranno ricevuti oggetti Buffer.
req.setEncoding('utf8');
// Gli stream leggibili emettono eventi 'data' una volta aggiunto un listener.
req.on('data', (chunk) => {
body += chunk;
});
// L'evento 'end' indica che l'intero corpo è stato ricevuto.
req.on('end', () => {
try {
const data = JSON.parse(body);
// Rispondi con qualcosa di interessante all'utente:
res.write(typeof data);
res.end();
} catch (er) {
// uh oh! json non valido!
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
Gli stream Writable
(come res
nell'esempio) espongono metodi come write()
e end()
che vengono utilizzati per scrivere dati sullo stream.
Gli stream Readable
utilizzano l'API EventEmitter
per notificare al codice dell'applicazione quando i dati sono disponibili per essere letti dallo stream. Tali dati disponibili possono essere letti dallo stream in diversi modi.
Sia gli stream Writable
che Readable
utilizzano l'API EventEmitter
in vari modi per comunicare lo stato corrente dello stream.
Gli stream Duplex
e Transform
sono sia Writable
che Readable
.
Le applicazioni che scrivono o consumano dati da uno stream non sono tenute a implementare direttamente le interfacce di stream e generalmente non avranno motivo di chiamare require('node:stream')
.
Gli sviluppatori che desiderano implementare nuovi tipi di stream dovrebbero fare riferimento alla sezione API per gli implementatori di stream.
Flussi scrivibili
I flussi scrivibili sono un'astrazione per una destinazione verso cui i dati vengono scritti.
Esempi di flussi Writable
includono:
- Richieste HTTP, sul client
- Risposte HTTP, sul server
- Flussi di scrittura fs
- Flussi zlib
- Flussi crittografici
- Socket TCP
- Stdin del processo figlio
process.stdout
,process.stderr
Alcuni di questi esempi sono in realtà flussi Duplex
che implementano l'interfaccia Writable
.
Tutti i flussi Writable
implementano l'interfaccia definita dalla classe stream.Writable
.
Mentre le istanze specifiche di flussi Writable
possono differire in vari modi, tutti i flussi Writable
seguono lo stesso schema di utilizzo fondamentale come illustrato nell'esempio seguente:
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
Classe: stream.Writable
Aggiunto in: v0.9.4
Evento: 'close'
[Cronologia]
Versione | Cambiamenti |
---|---|
v10.0.0 | Aggiunta l'opzione emitClose per specificare se 'close' viene emesso alla distruzione. |
v0.9.4 | Aggiunto in: v0.9.4 |
L'evento 'close'
viene emesso quando il flusso e qualsiasi sua risorsa sottostante (un descrittore di file, ad esempio) sono stati chiusi. L'evento indica che non verranno emessi ulteriori eventi e non si verificherà alcun ulteriore calcolo.
Un flusso Writable
emetterà sempre l'evento 'close'
se viene creato con l'opzione emitClose
.
Evento: 'drain'
Aggiunto in: v0.9.4
Se una chiamata a stream.write(chunk)
restituisce false
, l'evento 'drain'
verrà emesso quando sarà appropriato riprendere la scrittura dei dati nel flusso.
// Scrivi i dati al flusso scrivibile fornito un milione di volte.
// Sii attento alla contropressione.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000;
write();
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Ultima volta!
writer.write(data, encoding, callback);
} else {
// Vedi se dovremmo continuare o aspettare.
// Non passare la callback, perché non abbiamo ancora finito.
ok = writer.write(data, encoding);
}
} while (i > 0 && ok);
if (i > 0) {
// Dovevo fermarmi prima!
// Scrivi ancora una volta che si svuota.
writer.once('drain', write);
}
}
}
Evento: 'error'
Aggiunto in: v0.9.4
L'evento 'error'
viene emesso se si è verificato un errore durante la scrittura o il trasferimento di dati. Il callback del listener riceve un singolo argomento Error
quando viene chiamato.
Lo stream viene chiuso quando viene emesso l'evento 'error'
a meno che l'opzione autoDestroy
non sia stata impostata su false
durante la creazione dello stream.
Dopo 'error'
, nessun altro evento oltre a 'close'
dovrebbe essere emesso (inclusi gli eventi 'error'
).
Evento: 'finish'
Aggiunto in: v0.9.4
L'evento 'finish'
viene emesso dopo che il metodo stream.end()
è stato chiamato e tutti i dati sono stati scaricati nel sistema sottostante.
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
console.log('Tutte le scritture sono ora complete.');
});
writer.end('Questa è la fine\n');
Evento: 'pipe'
Aggiunto in: v0.9.4
src
<stream.Readable> stream sorgente che sta eseguendo il piping verso questo scrivibile
L'evento 'pipe'
viene emesso quando il metodo stream.pipe()
viene chiamato su uno stream readable, aggiungendo questo scrivibile al suo insieme di destinazioni.
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
console.log('Qualcosa sta eseguendo il piping nello writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
Evento: 'unpipe'
Aggiunto in: v0.9.4
src
<stream.Readable> Lo stream sorgente che ha rimosso il pipe da questo scrivibile
L'evento 'unpipe'
viene emesso quando il metodo stream.unpipe()
viene chiamato su uno stream Readable
, rimuovendo questo Writable
dal suo insieme di destinazioni.
Questo viene emesso anche nel caso in cui questo stream Writable
emetta un errore quando uno stream Readable
esegue il pipe al suo interno.
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
console.log('Qualcosa ha smesso di eseguire il piping nello writer.');
assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()
Aggiunto in: v0.11.2
Il metodo writable.cork()
forza tutti i dati scritti a essere memorizzati nel buffer in memoria. I dati bufferizzati verranno scaricati quando vengono chiamati i metodi stream.uncork()
o stream.end()
.
L'intento principale di writable.cork()
è di gestire una situazione in cui diversi piccoli chunk vengono scritti nello stream in rapida successione. Invece di inoltrarli immediatamente alla destinazione sottostante, writable.cork()
bufferizza tutti i chunk fino a quando non viene chiamato writable.uncork()
, che li passerà tutti a writable._writev()
, se presente. Ciò impedisce una situazione di blocco head-of-line in cui i dati vengono bufferizzati mentre si attende che il primo piccolo chunk venga elaborato. Tuttavia, l'uso di writable.cork()
senza implementare writable._writev()
può avere un effetto negativo sulla velocità effettiva.
Vedi anche: writable.uncork()
, writable._writev()
.
writable.destroy([error])
[Cronologia]
Versione | Modifiche |
---|---|
v14.0.0 | Funziona come no-op su uno stream che è già stato distrutto. |
v8.0.0 | Aggiunto in: v8.0.0 |
Distrugge lo stream. Facoltativamente, emette un evento 'error'
ed emette un evento 'close'
(a meno che emitClose
non sia impostato su false
). Dopo questa chiamata, lo stream scrivibile è terminato e le successive chiamate a write()
o end()
comporteranno un errore ERR_STREAM_DESTROYED
. Questo è un modo distruttivo e immediato per distruggere uno stream. Le chiamate precedenti a write()
potrebbero non essere state scaricate e potrebbero attivare un errore ERR_STREAM_DESTROYED
. Utilizzare end()
invece di destroy se i dati devono essere scaricati prima della chiusura oppure attendere l'evento 'drain'
prima di distruggere lo stream.
const { Writable } = require('node:stream');
const myStream = new Writable();
const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.on('error', function wontHappen() {});
const { Writable } = require('node:stream');
const myStream = new Writable();
myStream.destroy();
myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED
Una volta che destroy()
è stato chiamato, ulteriori chiamate saranno no-op e nessun ulteriore errore eccetto da _destroy()
può essere emesso come 'error'
.
Gli implementatori non devono sovrascrivere questo metodo, ma implementare invece writable._destroy()
.
writable.closed
Aggiunto in: v18.0.0
È true
dopo che è stato emesso 'close'
.
writable.destroyed
Aggiunto in: v8.0.0
È true
dopo che è stato chiamato writable.destroy()
.
const { Writable } = require('node:stream');
const myStream = new Writable();
console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true
writable.end([chunk[, encoding]][, callback])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0, v20.13.0 | L'argomento chunk ora può essere un'istanza di TypedArray o DataView . |
v15.0.0 | La callback viene richiamata prima di 'finish' o in caso di errore. |
v14.0.0 | La callback viene richiamata se vengono emessi 'finish' o 'error'. |
v10.0.0 | Questo metodo ora restituisce un riferimento a writable . |
v8.0.0 | L'argomento chunk ora può essere un'istanza di Uint8Array . |
v0.9.4 | Aggiunto in: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Dati opzionali da scrivere. Per i flussi che non operano in modalità oggetto,chunk
deve essere una <string>, <Buffer>, <TypedArray> o <DataView>. Per i flussi in modalità oggetto,chunk
può essere qualsiasi valore JavaScript diverso danull
.encoding
<string> La codifica sechunk
è una stringacallback
<Function> Callback per quando il flusso è terminato.- Restituisce: <this>
La chiamata al metodo writable.end()
segnala che non verranno più scritti dati nel Writable
. Gli argomenti opzionali chunk
e encoding
consentono di scrivere un ulteriore blocco di dati finale immediatamente prima di chiudere il flusso.
La chiamata al metodo stream.write()
dopo aver chiamato stream.end()
genererà un errore.
// Scrivi 'hello, ' e poi termina con 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Scrivere altro ora non è consentito!
writable.setDefaultEncoding(encoding)
[Cronologia]
Versione | Modifiche |
---|---|
v6.1.0 | Questo metodo ora restituisce un riferimento a writable . |
v0.11.15 | Aggiunto in: v0.11.15 |
Il metodo writable.setDefaultEncoding()
imposta la encoding
predefinita per un flusso Writable
.
writable.uncork()
Aggiunto in: v0.11.2
Il metodo writable.uncork()
scarica tutti i dati bufferizzati da quando è stato chiamato stream.cork()
.
Quando si utilizzano writable.cork()
e writable.uncork()
per gestire il buffering delle scritture in un flusso, differire le chiamate a writable.uncork()
utilizzando process.nextTick()
. In questo modo è possibile raggruppare tutte le chiamate writable.write()
che si verificano all'interno di una determinata fase del ciclo di eventi di Node.js.
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
Se il metodo writable.cork()
viene chiamato più volte su un flusso, è necessario chiamare lo stesso numero di volte writable.uncork()
per scaricare i dati bufferizzati.
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
stream.uncork();
// I dati non verranno scaricati finché uncork() non viene chiamato una seconda volta.
stream.uncork();
});
Vedi anche: writable.cork()
.
writable.writable
Aggiunto in: v11.4.0
È true
se è sicuro chiamare writable.write()
, il che significa che il flusso non è stato distrutto, ha generato un errore o è terminato.
writable.writableAborted
Aggiunto in: v18.0.0, v16.17.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Restituisce se il flusso è stato distrutto o ha generato un errore prima di emettere 'finish'
.
writable.writableEnded
Aggiunto in: v12.9.0
È true
dopo che è stato chiamato writable.end()
. Questa proprietà non indica se i dati sono stati scaricati; per questo, utilizzare invece writable.writableFinished
.
writable.writableCorked
Aggiunto in: v13.2.0, v12.16.0
Numero di volte in cui è necessario chiamare writable.uncork()
per sbloccare completamente lo stream.
writable.errored
Aggiunto in: v18.0.0
Restituisce un errore se lo stream è stato distrutto con un errore.
writable.writableFinished
Aggiunto in: v12.6.0
È impostato su true
immediatamente prima che venga emesso l'evento 'finish'
.
writable.writableHighWaterMark
Aggiunto in: v9.3.0
Restituisce il valore di highWaterMark
passato durante la creazione di questo Writable
.
writable.writableLength
Aggiunto in: v9.4.0
Questa proprietà contiene il numero di byte (o oggetti) nella coda pronti per essere scritti. Il valore fornisce dati di introspezione relativi allo stato di highWaterMark
.
writable.writableNeedDrain
Aggiunto in: v15.2.0, v14.17.0
È true
se il buffer dello stream è pieno e lo stream emetterà 'drain'
.
writable.writableObjectMode
Aggiunto in: v12.3.0
Getter per la proprietà objectMode
di un determinato stream Writable
.
writable[Symbol.asyncDispose]()
Aggiunto in: v22.4.0, v20.16.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Chiama writable.destroy()
con un AbortError
e restituisce una promise che si risolve quando lo stream è terminato.
writable.write(chunk[, encoding][, callback])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0, v20.13.0 | L'argomento chunk ora può essere un'istanza di TypedArray o DataView . |
v8.0.0 | L'argomento chunk ora può essere un'istanza di Uint8Array . |
v6.0.0 | Passare null come parametro chunk sarà sempre considerato non valido, anche in modalità oggetto. |
v0.9.4 | Aggiunto in: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Dati facoltativi da scrivere. Per gli stream che non operano in modalità oggetto,chunk
deve essere un <string>, <Buffer>, <TypedArray> o <DataView>. Per gli stream in modalità oggetto,chunk
può essere qualsiasi valore JavaScript diverso danull
.encoding
<string> | <null> La codifica, sechunk
è una stringa. Predefinito:'utf8'
callback
<Function> Callback per quando questo chunk di dati viene scaricato.- Restituisce: <boolean>
false
se lo stream desidera che il codice chiamante attenda che l'evento'drain'
venga emesso prima di continuare a scrivere dati aggiuntivi; altrimentitrue
.
Il metodo writable.write()
scrive alcuni dati nello stream e richiama la callback
fornita una volta che i dati sono stati completamente gestiti. Se si verifica un errore, la callback
verrà richiamata con l'errore come primo argomento. La callback
viene richiamata in modo asincrono e prima che venga emesso 'error'
.
Il valore di ritorno è true
se il buffer interno è inferiore a highWaterMark
configurato quando lo stream è stato creato dopo aver ammesso chunk
. Se viene restituito false
, ulteriori tentativi di scrittura di dati nello stream devono interrompersi fino all'emissione dell'evento 'drain'
.
Mentre uno stream non si sta svuotando, le chiamate a write()
bufferizzeranno chunk
e restituiranno false. Una volta che tutti i chunk attualmente bufferizzati sono stati svuotati (accettati per la consegna dal sistema operativo), verrà emesso l'evento 'drain'
. Una volta che write()
restituisce false, non scrivere più chunk fino all'emissione dell'evento 'drain'
. Sebbene sia consentito chiamare write()
su uno stream che non si sta svuotando, Node.js bufferizzerà tutti i chunk scritti fino a quando non si verifica il massimo utilizzo della memoria, a quel punto si interromperà incondizionatamente. Anche prima che si interrompa, l'elevato utilizzo della memoria causerà scarse prestazioni del garbage collector e un RSS elevato (che in genere non viene rilasciato al sistema, anche dopo che la memoria non è più richiesta). Poiché i socket TCP potrebbero non svuotarsi mai se il peer remoto non legge i dati, scrivere su un socket che non si sta svuotando potrebbe portare a una vulnerabilità sfruttabile da remoto.
La scrittura di dati mentre lo stream non si sta svuotando è particolarmente problematica per un Transform
, perché gli stream Transform
vengono messi in pausa per impostazione predefinita finché non vengono piped o viene aggiunto un gestore di eventi 'data'
o 'readable'
.
Se i dati da scrivere possono essere generati o recuperati su richiesta, si consiglia di incapsulare la logica in un Readable
e utilizzare stream.pipe()
. Tuttavia, se si preferisce chiamare write()
, è possibile rispettare la contropressione ed evitare problemi di memoria utilizzando l'evento 'drain'
:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb);
} else {
process.nextTick(cb);
}
}
// Aspetta che cb venga chiamata prima di eseguire qualsiasi altra scrittura.
write('hello', () => {
console.log('Scrittura completata, esegui altre scritture ora.');
});
Uno stream Writable
in modalità oggetto ignorerà sempre l'argomento encoding
.
Stream leggibili
Gli stream leggibili sono un'astrazione per una sorgente da cui vengono consumati i dati.
Esempi di stream Readable
includono:
- Risposte HTTP, sul client
- Richieste HTTP, sul server
- Stream di lettura fs
- Stream zlib
- Stream crittografici
- Socket TCP
- stdout e stderr del processo figlio
process.stdin
Tutti gli stream Readable
implementano l'interfaccia definita dalla classe stream.Readable
.
Due modalità di lettura
Gli stream Readable
operano effettivamente in una delle due modalità: flowing e paused. Queste modalità sono separate dalla modalità oggetto. Uno stream Readable
può essere in modalità oggetto o meno, indipendentemente dal fatto che sia in modalità flowing o in modalità paused.
- In modalità flowing, i dati vengono letti automaticamente dal sistema sottostante e forniti a un'applicazione il più rapidamente possibile utilizzando eventi tramite l'interfaccia
EventEmitter
. - In modalità paused, il metodo
stream.read()
deve essere chiamato esplicitamente per leggere blocchi di dati dallo stream.
Tutti gli stream Readable
iniziano in modalità paused, ma possono essere commutati in modalità flowing in uno dei seguenti modi:
- Aggiungendo un gestore di eventi
'data'
. - Chiamando il metodo
stream.resume()
. - Chiamando il metodo
stream.pipe()
per inviare i dati a uno streamWritable
.
Lo stream Readable
può tornare in modalità paused utilizzando uno dei seguenti metodi:
- Se non ci sono destinazioni di pipe, chiamando il metodo
stream.pause()
. - Se ci sono destinazioni di pipe, rimuovendo tutte le destinazioni di pipe. È possibile rimuovere più destinazioni di pipe chiamando il metodo
stream.unpipe()
.
Il concetto importante da ricordare è che uno stream Readable
non genererà dati fino a quando non viene fornito un meccanismo per consumare o ignorare tali dati. Se il meccanismo di consumo è disabilitato o rimosso, lo stream Readable
cercherà di interrompere la generazione dei dati.
Per motivi di compatibilità con le versioni precedenti, la rimozione dei gestori di eventi 'data'
non metterà automaticamente in pausa lo stream. Inoltre, se ci sono destinazioni piped, la chiamata a stream.pause()
non garantirà che lo stream rimanga in pausa una volta che tali destinazioni si scaricano e richiedono più dati.
Se uno stream Readable
viene commutato in modalità flowing e non sono disponibili consumer per gestire i dati, tali dati andranno persi. Ciò può verificarsi, ad esempio, quando viene chiamato il metodo readable.resume()
senza un listener collegato all'evento 'data'
, o quando un gestore di eventi 'data'
viene rimosso dallo stream.
L'aggiunta di un gestore di eventi 'readable'
fa sì che lo stream smetta automaticamente di fluire e i dati devono essere consumati tramite readable.read()
. Se il gestore di eventi 'readable'
viene rimosso, lo stream ricomincerà a fluire se è presente un gestore di eventi 'data'
.
Tre stati
I "due modi" di funzionamento per uno stream Readable
sono un'astrazione semplificata per la gestione dello stato interno più complessa che avviene all'interno dell'implementazione dello stream Readable
.
Nello specifico, in un dato momento, ogni Readable
si trova in uno dei tre possibili stati:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
Quando readable.readableFlowing
è null
, non viene fornito alcun meccanismo per consumare i dati dello stream. Pertanto, lo stream non genererà dati. Mentre si trova in questo stato, collegare un listener per l'evento 'data'
, chiamare il metodo readable.pipe()
o chiamare il metodo readable.resume()
cambierà readable.readableFlowing
in true
, facendo sì che il Readable
inizi a emettere attivamente eventi man mano che i dati vengono generati.
Chiamare readable.pause()
, readable.unpipe()
o ricevere contropressione farà sì che readable.readableFlowing
venga impostato come false
, interrompendo temporaneamente il flusso di eventi ma non interrompendo la generazione di dati. Mentre si trova in questo stato, collegare un listener per l'evento 'data'
non cambierà readable.readableFlowing
in true
.
const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing è ora false.
pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing è ancora false.
pass.write('ok'); // Non emetterà 'data'.
pass.resume(); // Deve essere chiamato per fare in modo che lo stream emetta 'data'.
// readableFlowing è ora true.
Mentre readable.readableFlowing
è false
, i dati potrebbero accumularsi all'interno del buffer interno dello stream.
Scegli uno stile API
L'API stream Readable
si è evoluta attraverso molteplici versioni di Node.js e fornisce molteplici metodi per consumare i dati dello stream. In generale, gli sviluppatori dovrebbero scegliere uno dei metodi per consumare i dati e non dovrebbero mai utilizzare più metodi per consumare i dati da un singolo stream. Nello specifico, utilizzare una combinazione di on('data')
, on('readable')
, pipe()
o iteratori asincroni potrebbe portare a un comportamento non intuitivo.
Classe: stream.Readable
Aggiunto in: v0.9.4
Evento: 'close'
[Cronologia]
Versione | Modifiche |
---|---|
v10.0.0 | Aggiunta l'opzione emitClose per specificare se 'close' viene emesso durante la distruzione. |
v0.9.4 | Aggiunto in: v0.9.4 |
L'evento 'close'
viene emesso quando lo stream e una qualsiasi delle sue risorse sottostanti (ad esempio, un descrittore di file) sono state chiuse. L'evento indica che non verranno emessi altri eventi e non si verificheranno ulteriori calcoli.
Uno stream Readable
emetterà sempre l'evento 'close'
se viene creato con l'opzione emitClose
.
Evento: 'data'
Aggiunto in: v0.9.4
chunk
<Buffer> | <string> | <any> Il chunk di dati. Per gli stream che non operano in modalità oggetto, il chunk sarà una stringa o unBuffer
. Per gli stream che sono in modalità oggetto, il chunk può essere qualsiasi valore JavaScript diverso danull
.
L'evento 'data'
viene emesso ogni volta che lo stream cede la proprietà di un chunk di dati a un consumer. Ciò può verificarsi ogni volta che lo stream viene commutato in modalità di flusso chiamando readable.pipe()
, readable.resume()
o collegando una callback listener all'evento 'data'
. L'evento 'data'
verrà emesso anche ogni volta che viene chiamato il metodo readable.read()
e un chunk di dati è disponibile per essere restituito.
Collegare un listener di eventi 'data'
a uno stream che non è stato esplicitamente messo in pausa commuterà lo stream in modalità di flusso. I dati verranno quindi passati non appena saranno disponibili.
Alla callback listener verrà passato il chunk di dati come stringa se è stata specificata una codifica predefinita per lo stream utilizzando il metodo readable.setEncoding()
; altrimenti i dati verranno passati come Buffer
.
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Ricevuti ${chunk.length} byte di dati.`);
});
Evento: 'end'
Aggiunto in: v0.9.4
L'evento 'end'
viene emesso quando non ci sono più dati da consumare dal flusso.
L'evento 'end'
non verrà emesso a meno che i dati non siano completamente consumati. Ciò può essere ottenuto commutando il flusso in modalità flowing o chiamando stream.read()
ripetutamente fino a quando tutti i dati non sono stati consumati.
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
console.log('There will be no more data.');
});
Evento: 'error'
Aggiunto in: v0.9.4
L'evento 'error'
può essere emesso da un'implementazione Readable
in qualsiasi momento. In genere, ciò può verificarsi se il flusso sottostante non è in grado di generare dati a causa di un errore interno sottostante o quando un'implementazione del flusso tenta di inviare un chunk di dati non valido.
Al callback del listener verrà passato un singolo oggetto Error
.
Evento: 'pause'
Aggiunto in: v0.9.4
L'evento 'pause'
viene emesso quando viene chiamato stream.pause()
e readableFlowing
non è false
.
Evento: 'readable'
[Cronologia]
Versione | Modifiche |
---|---|
v10.0.0 | 'readable' viene sempre emesso nel tick successivo dopo che è stato chiamato .push() . |
v10.0.0 | L'utilizzo di 'readable' richiede la chiamata di .read() . |
v0.9.4 | Aggiunto in: v0.9.4 |
L'evento 'readable'
viene emesso quando ci sono dati disponibili per essere letti dal flusso, fino al limite massimo configurato (state.highWaterMark
). In effetti, indica che il flusso ha nuove informazioni all'interno del buffer. Se i dati sono disponibili all'interno di questo buffer, è possibile chiamare stream.read()
per recuperare tali dati. Inoltre, l'evento 'readable'
può anche essere emesso quando è stata raggiunta la fine del flusso.
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
// There is some data to read now.
let data;
while ((data = this.read()) !== null) {
console.log(data);
}
});
Se è stata raggiunta la fine del flusso, la chiamata a stream.read()
restituirà null
e attiverà l'evento 'end'
. Questo è vero anche se non ci sono mai stati dati da leggere. Ad esempio, nel seguente esempio, foo.txt
è un file vuoto:
const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
console.log('end');
});
L'output dell'esecuzione di questo script è:
$ node test.js
readable: null
end
In alcuni casi, l'aggiunta di un listener per l'evento 'readable'
farà sì che una certa quantità di dati venga letta in un buffer interno.
In generale, i meccanismi readable.pipe()
e dell'evento 'data'
sono più facili da capire rispetto all'evento 'readable'
. Tuttavia, la gestione di 'readable'
potrebbe comportare un aumento della velocità effettiva.
Se sia 'readable'
che 'data'
vengono utilizzati contemporaneamente, 'readable'
ha la precedenza nel controllo del flusso, ovvero 'data'
verrà emesso solo quando viene chiamato stream.read()
. La proprietà readableFlowing
diventerebbe false
. Se ci sono listener 'data'
quando viene rimosso 'readable'
, il flusso inizierà a fluire, ovvero gli eventi 'data'
verranno emessi senza chiamare .resume()
.
Evento: 'resume'
Aggiunto in: v0.9.4
L'evento 'resume'
viene emesso quando viene chiamato stream.resume()
e readableFlowing
non è true
.
readable.destroy([error])
[Cronologia]
Versione | Modifiche |
---|---|
v14.0.0 | Funziona come no-op su uno stream che è già stato distrutto. |
v8.0.0 | Aggiunto in: v8.0.0 |
Distrugge lo stream. Facoltativamente emette un evento 'error'
ed emette un evento 'close'
(a meno che emitClose
non sia impostato su false
). Dopo questa chiamata, lo stream leggibile rilascerà qualsiasi risorsa interna e le successive chiamate a push()
verranno ignorate.
Una volta che destroy()
è stato chiamato, qualsiasi ulteriore chiamata sarà un no-op e nessun ulteriore errore tranne che da _destroy()
potrà essere emesso come 'error'
.
Gli implementatori non devono sovrascrivere questo metodo, ma invece implementare readable._destroy()
.
readable.closed
Aggiunto in: v18.0.0
È true
dopo che 'close'
è stato emesso.
readable.destroyed
Aggiunto in: v8.0.0
È true
dopo che readable.destroy()
è stato chiamato.
readable.isPaused()
Aggiunto in: v0.11.14
- Restituisce: <boolean>
Il metodo readable.isPaused()
restituisce lo stato operativo corrente di Readable
. Questo è utilizzato principalmente dal meccanismo alla base del metodo readable.pipe()
. Nella maggior parte dei casi tipici, non ci sarà alcun motivo per utilizzare questo metodo direttamente.
const readable = new stream.Readable();
readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()
Aggiunto in: v0.9.4
- Restituisce: <this>
Il metodo readable.pause()
farà sì che uno stream in modalità flowing smetta di emettere eventi 'data'
, uscendo dalla modalità flowing. Qualsiasi dato che diventa disponibile rimarrà nel buffer interno.
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Ricevuti ${chunk.length} byte di dati.`);
readable.pause();
console.log('Non ci saranno ulteriori dati per 1 secondo.');
setTimeout(() => {
console.log('Ora i dati inizieranno a fluire di nuovo.');
readable.resume();
}, 1000);
});
Il metodo readable.pause()
non ha alcun effetto se è presente un listener di eventi 'readable'
.
readable.pipe(destination[, options])
Aggiunto in: v0.9.4
destination
<stream.Writable> La destinazione per la scrittura dei datioptions
<Object> Opzioni di pipeend
<boolean> Termina il writer quando il reader termina. Predefinito:true
.
Restituisce: <stream.Writable> La destinazione, consentendo una catena di pipe se è uno stream
Duplex
oTransform
Il metodo readable.pipe()
collega uno stream Writable
al readable
, facendolo passare automaticamente in modalità flowing e inviando tutti i suoi dati al Writable
collegato. Il flusso di dati sarà gestito automaticamente in modo che lo stream Writable
di destinazione non sia sopraffatto da uno stream Readable
più veloce.
L'esempio seguente invia tramite pipe tutti i dati dal readable
a un file denominato file.txt
:
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// Tutti i dati da readable vanno in 'file.txt'.
readable.pipe(writable);
È possibile collegare più stream Writable
a un singolo stream Readable
.
Il metodo readable.pipe()
restituisce un riferimento allo stream di destinazione, rendendo possibile impostare catene di stream piped:
const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
Per impostazione predefinita, stream.end()
viene chiamato sullo stream Writable
di destinazione quando lo stream Readable
di origine emette 'end'
, in modo che la destinazione non sia più scrivibile. Per disabilitare questo comportamento predefinito, l'opzione end
può essere passata come false
, facendo in modo che lo stream di destinazione rimanga aperto:
reader.pipe(writer, { end: false });
reader.on('end', () => {
writer.end('Goodbye\n');
});
Un avvertimento importante è che se lo stream Readable
emette un errore durante l'elaborazione, la destinazione Writable
non viene chiusa automaticamente. Se si verifica un errore, sarà necessario chiudere manualmente ogni stream per prevenire perdite di memoria.
Gli stream Writable
process.stderr
e process.stdout
non vengono mai chiusi fino a quando il processo Node.js non si chiude, indipendentemente dalle opzioni specificate.
readable.read([size])
Aggiunto in: v0.9.4
size
<number> Argomento opzionale per specificare quanti dati leggere.- Restituisce: <string> | <Buffer> | <null> | <any>
Il metodo readable.read()
legge i dati dal buffer interno e li restituisce. Se non sono disponibili dati da leggere, viene restituito null
. Per impostazione predefinita, i dati vengono restituiti come oggetto Buffer
a meno che non sia stata specificata una codifica utilizzando il metodo readable.setEncoding()
o lo stream non stia operando in modalità oggetto.
L'argomento opzionale size
specifica un numero specifico di byte da leggere. Se non sono disponibili size
byte da leggere, verrà restituito null
a meno che lo stream non sia terminato, nel qual caso verranno restituiti tutti i dati rimanenti nel buffer interno.
Se l'argomento size
non è specificato, verranno restituiti tutti i dati contenuti nel buffer interno.
L'argomento size
deve essere minore o uguale a 1 GiB.
Il metodo readable.read()
deve essere chiamato solo su stream Readable
che operano in modalità in pausa. In modalità fluente, readable.read()
viene chiamato automaticamente fino a quando il buffer interno non è completamente svuotato.
const readable = getReadableStreamSomehow();
// 'readable' potrebbe essere attivato più volte man mano che i dati vengono memorizzati nel buffer
readable.on('readable', () => {
let chunk;
console.log('Lo stream è leggibile (nuovi dati ricevuti nel buffer)');
// Utilizzare un ciclo per assicurarsi di leggere tutti i dati attualmente disponibili
while (null !== (chunk = readable.read())) {
console.log(`Lettura di ${chunk.length} byte di dati...`);
}
});
// 'end' verrà attivato una volta quando non sono più disponibili dati
readable.on('end', () => {
console.log('Raggiunta la fine dello stream.');
});
Ogni chiamata a readable.read()
restituisce un blocco di dati o null
, indicando che non ci sono più dati da leggere in quel momento. Questi blocchi non vengono concatenati automaticamente. Poiché una singola chiamata a read()
non restituisce tutti i dati, potrebbe essere necessario utilizzare un ciclo while per leggere continuamente i blocchi fino a quando non vengono recuperati tutti i dati. Durante la lettura di un file di grandi dimensioni, .read()
potrebbe restituire temporaneamente null
, indicando che ha consumato tutto il contenuto memorizzato nel buffer, ma potrebbero esserci ancora più dati da memorizzare nel buffer. In tali casi, viene emesso un nuovo evento 'readable'
una volta che ci sono più dati nel buffer e l'evento 'end'
indica la fine della trasmissione dei dati.
Pertanto, per leggere l'intero contenuto di un file da un readable
, è necessario raccogliere blocchi su più eventi 'readable'
:
const chunks = [];
readable.on('readable', () => {
let chunk;
while (null !== (chunk = readable.read())) {
chunks.push(chunk);
}
});
readable.on('end', () => {
const content = chunks.join('');
});
Uno stream Readable
in modalità oggetto restituirà sempre un singolo elemento da una chiamata a readable.read(size)
, indipendentemente dal valore dell'argomento size
.
Se il metodo readable.read()
restituisce un blocco di dati, verrà emesso anche un evento 'data'
.
La chiamata a stream.read([size])
dopo che l'evento 'end'
è stato emesso restituirà null
. Non verrà generato alcun errore di runtime.
readable.readable
Aggiunto in: v11.4.0
È true
se è sicuro chiamare readable.read()
, il che significa che lo stream non è stato distrutto o non ha emesso 'error'
o 'end'
.
readable.readableAborted
Aggiunto in: v16.8.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Restituisce se lo stream è stato distrutto o ha generato un errore prima di emettere 'end'
.
readable.readableDidRead
Aggiunto in: v16.7.0, v14.18.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Restituisce se 'data'
è stato emesso.
readable.readableEncoding
Aggiunto in: v12.7.0
Getter per la proprietà encoding
di un dato stream Readable
. La proprietà encoding
può essere impostata utilizzando il metodo readable.setEncoding()
.
readable.readableEnded
Aggiunto in: v12.9.0
Diventa true
quando viene emesso l'evento 'end'
.
readable.errored
Aggiunto in: v18.0.0
Restituisce l'errore se lo stream è stato distrutto con un errore.
readable.readableFlowing
Aggiunto in: v9.4.0
Questa proprietà riflette lo stato attuale di uno stream Readable
come descritto nella sezione Tre stati.
readable.readableHighWaterMark
Aggiunto in: v9.3.0
Restituisce il valore di highWaterMark
passato durante la creazione di questo Readable
.
readable.readableLength
Aggiunto in: v9.4.0
Questa proprietà contiene il numero di byte (o oggetti) nella coda pronti per essere letti. Il valore fornisce dati di introspezione riguardanti lo stato di highWaterMark
.
readable.readableObjectMode
Aggiunto in: v12.3.0
Getter per la proprietà objectMode
di un determinato stream Readable
.
readable.resume()
[Cronologia]
Versione | Modifiche |
---|---|
v10.0.0 | resume() non ha alcun effetto se è in ascolto un evento 'readable' . |
v0.9.4 | Aggiunto in: v0.9.4 |
- Restituisce: <this>
Il metodo readable.resume()
fa sì che uno stream Readable
esplicitamente in pausa riprenda a emettere eventi 'data'
, portando lo stream in modalità flowing.
Il metodo readable.resume()
può essere utilizzato per consumare completamente i dati da uno stream senza elaborare effettivamente nessuno di quei dati:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Raggiunta la fine, ma non ho letto nulla.');
});
Il metodo readable.resume()
non ha effetto se c'è un listener di eventi 'readable'
.
readable.setEncoding(encoding)
Aggiunto in: v0.9.4
Il metodo readable.setEncoding()
imposta la codifica dei caratteri per i dati letti dallo stream Readable
.
Per impostazione predefinita, non viene assegnata alcuna codifica e i dati dello stream verranno restituiti come oggetti Buffer
. L'impostazione di una codifica fa sì che i dati dello stream vengano restituiti come stringhe della codifica specificata anziché come oggetti Buffer
. Ad esempio, chiamando readable.setEncoding('utf8')
i dati di output verranno interpretati come dati UTF-8 e passati come stringhe. Chiamando readable.setEncoding('hex')
i dati verranno codificati in formato stringa esadecimale.
Lo stream Readable
gestirà correttamente i caratteri multi-byte forniti attraverso lo stream che altrimenti verrebbero decodificati in modo errato se semplicemente estratti dallo stream come oggetti Buffer
.
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
assert.equal(typeof chunk, 'string');
console.log('Ottenuti %d caratteri di dati stringa:', chunk.length);
});
readable.unpipe([destination])
Aggiunta in: v0.9.4
destination
<stream.Writable> Stream specifico opzionale da scollegare.- Restituisce: <this>
Il metodo readable.unpipe()
scollega uno stream Writable
precedentemente collegato utilizzando il metodo stream.pipe()
.
Se destination
non è specificato, allora tutti i pipe vengono scollegati.
Se destination
è specificato, ma non è impostato alcun pipe per esso, allora il metodo non fa nulla.
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// Tutti i dati da readable vanno in 'file.txt',
// ma solo per il primo secondo.
readable.pipe(writable);
setTimeout(() => {
console.log('Smetti di scrivere su file.txt.');
readable.unpipe(writable);
console.log('Chiudi manualmente lo stream del file.');
writable.end();
}, 1000);
readable.unshift(chunk[, encoding])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0, v20.13.0 | L'argomento chunk ora può essere un'istanza di TypedArray o DataView . |
v8.0.0 | L'argomento chunk ora può essere un'istanza di Uint8Array . |
v0.9.11 | Aggiunta in: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Chunk di dati da reinserire nella coda di lettura. Per gli stream che non operano in modalità oggetto,chunk
deve essere un <string>, <Buffer>, <TypedArray>, <DataView> onull
. Per gli stream in modalità oggetto,chunk
può essere qualsiasi valore JavaScript.encoding
<string> Codifica dei chunk di stringa. Deve essere una codificaBuffer
valida, come'utf8'
o'ascii'
.
Passare chunk
come null
segnala la fine dello stream (EOF) e si comporta come readable.push(null)
, dopo il quale non è possibile scrivere altri dati. Il segnale EOF viene inserito alla fine del buffer e tutti i dati memorizzati nel buffer verranno comunque scaricati.
Il metodo readable.unshift()
reinserisce un chunk di dati nel buffer interno. Questo è utile in certe situazioni in cui uno stream viene utilizzato da codice che ha bisogno di "annullare il consumo" di una certa quantità di dati che ha estratto in modo ottimistico dalla sorgente, in modo che i dati possano essere passati a qualche altra parte.
Il metodo stream.unshift(chunk)
non può essere chiamato dopo che l'evento 'end'
è stato emesso o verrà generato un errore di runtime.
Gli sviluppatori che utilizzano stream.unshift()
dovrebbero spesso considerare di passare all'uso di uno stream Transform
. Vedere la sezione API per gli implementatori di stream per maggiori informazioni.
// Estrai un'intestazione delimitata da \n\n.
// Usa unshift() se otteniamo troppo.
// Chiama il callback con (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
const decoder = new StringDecoder('utf8');
let header = '';
function onReadable() {
let chunk;
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk);
if (str.includes('\n\n')) {
// Trovato il limite dell'intestazione.
const split = str.split(/\n\n/);
header += split.shift();
const remaining = split.join('\n\n');
const buf = Buffer.from(remaining, 'utf8');
stream.removeListener('error', callback);
// Rimuovi il listener 'readable' prima di unshifting.
stream.removeListener('readable', onReadable);
if (buf.length)
stream.unshift(buf);
// Ora il corpo del messaggio può essere letto dallo stream.
callback(null, header, stream);
return;
}
// Stiamo ancora leggendo l'intestazione.
header += str;
}
}
}
A differenza di stream.push(chunk)
, stream.unshift(chunk)
non terminerà il processo di lettura resettando lo stato di lettura interno dello stream. Ciò può causare risultati inattesi se readable.unshift()
viene chiamato durante una lettura (cioè dall'interno di un'implementazione stream._read()
su uno stream personalizzato). Seguire la chiamata a readable.unshift()
con un stream.push('')
immediato ripristinerà lo stato di lettura in modo appropriato, tuttavia è meglio evitare semplicemente di chiamare readable.unshift()
mentre si è in procinto di eseguire una lettura.
readable.wrap(stream)
Aggiunto in: v0.9.4
Prima di Node.js 0.10, gli stream non implementavano l'intera API del modulo node:stream
come è definita attualmente. (Vedere Compatibilità per maggiori informazioni.)
Quando si utilizza una libreria Node.js più vecchia che emette eventi 'data'
e ha un metodo stream.pause()
che è solo consultivo, il metodo readable.wrap()
può essere utilizzato per creare uno stream Readable
che utilizza il vecchio stream come sorgente dati.
Sarà raramente necessario utilizzare readable.wrap()
, ma il metodo è stato fornito come una comodità per interagire con le applicazioni e librerie Node.js più vecchie.
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);
myReader.on('readable', () => {
myReader.read(); // ecc.
});
readable[Symbol.asyncIterator]()
[Cronologia]
Versione | Modifiche |
---|---|
v11.14.0 | Il supporto a Symbol.asyncIterator non è più sperimentale. |
v10.0.0 | Aggiunto in: v10.0.0 |
- Restituisce: <AsyncIterator> per consumare completamente lo stream.
const fs = require('node:fs');
async function print(readable) {
readable.setEncoding('utf8');
let data = '';
for await (const chunk of readable) {
data += chunk;
}
console.log(data);
}
print(fs.createReadStream('file')).catch(console.error);
Se il ciclo termina con un break
, return
, o un throw
, lo stream verrà distrutto. In altri termini, l'iterazione su uno stream consumerà lo stream completamente. Lo stream verrà letto in chunk di dimensioni pari all'opzione highWaterMark
. Nell'esempio di codice sopra, i dati saranno in un singolo chunk se il file ha meno di 64 KiB di dati perché non viene fornita alcuna opzione highWaterMark
a fs.createReadStream()
.
readable[Symbol.asyncDispose]()
Aggiunto in: v20.4.0, v18.18.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
Chiama readable.destroy()
con un AbortError
e restituisce una promise che si adempie quando lo stream è terminato.
readable.compose(stream[, options])
Aggiunto in: v19.1.0, v18.13.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Duplex> uno stream composto con lo stream
stream
.
import { Readable } from 'node:stream';
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ');
for (const word of words) {
yield word;
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
Vedere stream.compose
per maggiori informazioni.
readable.iterator([options])
Aggiunto in: v16.3.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
options
<Object>destroyOnReturn
<boolean> Quando impostato sufalse
, chiamarereturn
sull'iteratore asincrono, o uscire da un'iterazionefor await...of
usando unbreak
,return
, othrow
non distruggerà lo stream. Predefinito:true
.
Restituisce: <AsyncIterator> per consumare lo stream.
L'iteratore creato da questo metodo offre agli utenti la possibilità di annullare la distruzione dello stream se il ciclo for await...of
viene chiuso da return
, break
o throw
, oppure se l'iteratore deve distruggere lo stream se lo stream ha emesso un errore durante l'iterazione.
const { Readable } = require('node:stream');
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk); // Will print 2 and then 3
}
console.log(readable.destroyed); // True, stream was totally consumed
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk); // 1
break;
}
console.log(readable.destroyed); // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]));
await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}
showBoth();
readable.map(fn[, options])
[Cronologia]
Versione | Modifiche |
---|---|
v20.7.0, v18.19.0 | aggiunto highWaterMark nelle opzioni. |
v17.4.0, v16.14.0 | Aggiunto in: v17.4.0, v16.14.0 |
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione per mappare ogni chunk nello stream.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto permettendo di interrompere anticipatamente la chiamatafn
.
options
<Object>concurrency
<number> il numero massimo di invocazioni concorrenti difn
da chiamare sullo stream contemporaneamente. Predefinito:1
.highWaterMark
<number> quanti elementi memorizzare nel buffer durante l'attesa del consumo da parte dell'utente degli elementi mappati. Predefinito:concurrency * 2 - 1
.signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Readable> uno stream mappato con la funzione
fn
.
Questo metodo consente di mappare lo stream. La funzione fn
verrà chiamata per ogni chunk nello stream. Se la funzione fn
restituisce una promise, tale promise verrà await
ed prima di essere passata allo stream di risultati.
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// Con un mapper sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
console.log(chunk); // 2, 4, 6, 8
}
// Con un mapper asincrono, eseguendo al massimo 2 query alla volta.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
console.log(result); // Registra il risultato DNS di resolver.resolve4.
}
readable.filter(fn[, options])
[Storia]
Versione | Modifiche |
---|---|
v20.7.0, v18.19.0 | aggiunto highWaterMark nelle opzioni. |
v17.4.0, v16.14.0 | Aggiunto in: v17.4.0, v16.14.0 |
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione per filtrare i chunk dallo stream.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto consentendo di interrompere anticipatamente la chiamatafn
.
options
<Object>concurrency
<number> il numero massimo di invocazioni concorrenti difn
da chiamare sullo stream contemporaneamente. Predefinito:1
.highWaterMark
<number> quanti elementi bufferizzare durante l'attesa del consumo da parte dell'utente degli elementi filtrati. Predefinito:concurrency * 2 - 1
.signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Readable> uno stream filtrato con il predicato
fn
.
Questo metodo consente di filtrare lo stream. Per ogni chunk nello stream, verrà chiamata la funzione fn
e, se restituisce un valore truthy, il chunk verrà passato allo stream di risultati. Se la funzione fn
restituisce una promise, quella promise verrà await
ed.
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// Con un predicato sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// Con un predicato asincrono, che esegue al massimo 2 query alla volta.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).filter(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
// Registra i domini con più di 60 secondi sul record DNS risolto.
console.log(result);
}
readable.forEach(fn[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione da chiamare su ogni blocco dello stream.data
<any> un blocco di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto consentendo di interrompere anticipatamente la chiamatafn
.
options
<Object>concurrency
<number> il numero massimo di invocazioni concorrenti difn
da chiamare sullo stream contemporaneamente. Predefinito:1
.signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Promise> una promise per quando lo stream ha terminato.
Questo metodo consente di iterare uno stream. Per ogni blocco nello stream verrà chiamata la funzione fn
. Se la funzione fn
restituisce una promise, quella promise verrà await
ed.
Questo metodo è diverso dai cicli for await...of
in quanto può facoltativamente elaborare i blocchi contemporaneamente. Inoltre, un'iterazione forEach
può essere interrotta solo avendo passato un'opzione signal
e interrompendo il relativo AbortController
mentre for await...of
può essere interrotto con break
o return
. In entrambi i casi lo stream verrà distrutto.
Questo metodo è diverso dall'ascolto dell'evento 'data'
in quanto utilizza l'evento readable
nel meccanismo sottostante e può limitare il numero di chiamate fn
concorrenti.
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
// Con un predicato sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
console.log(chunk); // 3, 4
}
// Con un predicato asincrono, effettuando al massimo 2 query alla volta.
const resolver = new Resolver();
const dnsResults = Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
// Registra il risultato, simile a `for await (const result of dnsResults)`
console.log(result);
});
console.log('done'); // Lo stream ha terminato
readable.toArray([options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
options
<Object>signal
<AbortSignal> permette di annullare l'operazione toArray se il segnale viene interrotto.
Restituisce: <Promise> una promise contenente un array con il contenuto dello stream.
Questo metodo permette di ottenere facilmente il contenuto di uno stream.
Poiché questo metodo legge l'intero stream in memoria, nega i vantaggi degli stream. È inteso per l'interoperabilità e la comodità, non come il modo principale per consumare gli stream.
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
// Esegui query DNS in parallelo usando .map e raccogli
// i risultati in un array usando toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
'www.linuxfoundation.org',
]).map(async (domain) => {
const { address } = await resolver.resolve4(domain, { ttl: true });
return address;
}, { concurrency: 2 }).toArray();
readable.some(fn[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione da chiamare su ogni chunk dello stream.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto permettendo di interrompere la chiamata afn
in anticipo.
options
<Object>concurrency
<number> il numero massimo di invocazioni concorrenti difn
da chiamare sullo stream contemporaneamente. Predefinito:1
.signal
<AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Promise> una promise che valuta a
true
sefn
ha restituito un valore truthy per almeno uno dei chunk.
Questo metodo è simile a Array.prototype.some
e chiama fn
su ogni chunk nello stream finché il valore di ritorno atteso è true
(o qualsiasi valore truthy). Una volta che una chiamata fn
su un chunk con valore di ritorno atteso è truthy, lo stream viene distrutto e la promise viene soddisfatta con true
. Se nessuna delle chiamate fn
sui chunk restituisce un valore truthy, la promise viene soddisfatta con false
.
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
// Con un predicato asincrono, effettuando al massimo 2 controlli di file alla volta.
const anyBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).some(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` se un qualsiasi file nella lista è più grande di 1MB
console.log('done'); // Lo Stream è terminato
readable.find(fn[, options])
Aggiunto in: v17.5.0, v16.17.0
[Stable: 1 - Sperimentale]
Stable: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione da chiamare su ogni chunk dello stream.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto, consentendo di interrompere anticipatamente la chiamatafn
.
options
<Object>concurrency
<number> il numero massimo di invocazioni concorrenti difn
da chiamare contemporaneamente sullo stream. Predefinito:1
.signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Promise> una promise che valuta il primo chunk per il quale
fn
ha valutato con un valore truthy, oundefined
se non è stato trovato alcun elemento.
Questo metodo è simile a Array.prototype.find
e chiama fn
su ogni chunk nello stream per trovare un chunk con un valore truthy per fn
. Una volta che il valore di ritorno atteso di una chiamata fn
è truthy, lo stream viene distrutto e la promise viene soddisfatta con il valore per il quale fn
ha restituito un valore truthy. Se tutte le chiamate fn
sui chunk restituiscono un valore falsy, la promise viene soddisfatta con undefined
.
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
// Con un predicato asincrono, eseguendo al massimo 2 controlli di file alla volta.
const foundBigFile = await Readable.from([
'file1',
'file2',
'file3',
]).find(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // Nome del file grande, se un qualsiasi file nella lista è più grande di 1MB
console.log('done'); // Lo stream è terminato
readable.every(fn[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione da chiamare su ogni chunk dello stream.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto consentendo di interrompere anticipatamente la chiamata afn
.
options
<Object>concurrency
<number> il numero massimo di invocazioni simultanee difn
da chiamare sullo stream contemporaneamente. Predefinito:1
.signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Promise> una promise che valuta a
true
sefn
ha restituito un valore truthy per tutti i chunk.
Questo metodo è simile a Array.prototype.every
e chiama fn
su ogni chunk nello stream per verificare se tutti i valori di ritorno attesi sono valori truthy per fn
. Una volta che una chiamata a fn
su un valore di ritorno atteso di un chunk è falsy, lo stream viene distrutto e la promise viene soddisfatta con false
. Se tutte le chiamate fn
sui chunk restituiscono un valore truthy, la promise viene soddisfatta con true
.
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';
// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
// Con un predicato asincrono, effettuando al massimo 2 controlli di file alla volta.
const allBigFiles = await Readable.from([
'file1',
'file2',
'file3',
]).every(async (fileName) => {
const stats = await stat(fileName);
return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` se tutti i file nell'elenco sono più grandi di 1MiB
console.log(allBigFiles);
console.log('fatto'); // Lo stream è terminato
readable.flatMap(fn[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> una funzione da mappare su ogni chunk nello stream.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto, consentendo di interrompere anticipatamente la chiamata afn
.
options
<Object>concurrency
<number> il numero massimo di invocazioni concorrenti difn
da chiamare sullo stream contemporaneamente. Predefinito:1
.signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Readable> uno stream flat-mapped con la funzione
fn
.
Questo metodo restituisce un nuovo stream applicando la callback fornita a ciascun chunk dello stream e quindi appiattendo il risultato.
È possibile restituire uno stream o un altro iterabile o iterabile asincrono da fn
e gli stream di risultati verranno uniti (appiattiti) nello stream restituito.
import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';
// Con un mapper sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// Con un mapper asincrono, combina il contenuto di 4 file
const concatResult = Readable.from([
'./1.mjs',
'./2.mjs',
'./3.mjs',
'./4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
// Questo conterrà il contenuto (tutti i chunk) di tutti e 4 i file
console.log(result);
}
readable.drop(limit[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stable: 1 - Experimental]
Stable: 1 Stabilità: 1 - Sperimentale
limit
<number> il numero di chunk da eliminare dal readable.options
<Object>signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Readable> uno stream con
limit
chunk eliminati.
Questo metodo restituisce un nuovo stream con i primi limit
chunk eliminati.
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
readable.take(limit[, options])
Aggiunto in: v17.5.0, v16.15.0
[Stable: 1 - Experimental]
Stable: 1 Stabilità: 1 - Sperimentale
limit
<number> il numero di chunk da prendere dal readable.options
<Object>signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Readable> uno stream con
limit
chunk presi.
Questo metodo restituisce un nuovo stream con i primi limit
chunk.
import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
readable.reduce(fn[, initial[, options]])
Aggiunto in: v17.5.0, v16.15.0
[Stable: 1 - Experimental]
Stable: 1 Stabilità: 1 - Sperimentale
fn
<Function> | <AsyncFunction> una funzione riduttore da chiamare su ogni chunk nello stream.previous
<any> il valore ottenuto dall'ultima chiamata afn
o il valoreinitial
se specificato o il primo chunk dello stream altrimenti.data
<any> un chunk di dati dallo stream.options
<Object>signal
<AbortSignal> interrotto se lo stream viene distrutto, consentendo di interrompere la chiamatafn
in anticipo.
initial
<any> il valore iniziale da utilizzare nella riduzione.options
<Object>signal
<AbortSignal> consente di distruggere lo stream se il segnale viene interrotto.
Restituisce: <Promise> una promise per il valore finale della riduzione.
Questo metodo chiama fn
su ogni chunk dello stream in ordine, passandogli il risultato del calcolo sull'elemento precedente. Restituisce una promise per il valore finale della riduzione.
Se non viene fornito alcun valore initial
, il primo chunk dello stream viene utilizzato come valore iniziale. Se lo stream è vuoto, la promise viene rifiutata con un TypeError
con la proprietà del codice ERR_INVALID_ARGS
.
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file));
return totalSize + size;
}, 0);
console.log(folderSize);
La funzione riduttore itera lo stream elemento per elemento, il che significa che non esiste alcun parametro concurrency
o parallelismo. Per eseguire una reduce
contemporaneamente, puoi estrarre la funzione async nel metodo readable.map
.
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0);
console.log(folderSize);
Flussi duplex e di trasformazione
Classe: stream.Duplex
[Cronologia]
Versione | Modifiche |
---|---|
v6.8.0 | Le istanze di Duplex ora restituiscono true quando si controlla instanceof stream.Writable . |
v0.9.4 | Aggiunto in: v0.9.4 |
I flussi Duplex sono flussi che implementano sia le interfacce Readable
che Writable
.
Esempi di flussi Duplex
includono:
duplex.allowHalfOpen
Aggiunto in: v0.9.4
Se false
, il flusso terminerà automaticamente il lato scrivibile quando termina il lato leggibile. Impostato inizialmente dall'opzione del costruttore allowHalfOpen
, che ha come valore predefinito true
.
Questo può essere modificato manualmente per cambiare il comportamento semiaperto di un'istanza di flusso Duplex
esistente, ma deve essere modificato prima che venga emesso l'evento 'end'
.
Classe: stream.Transform
Aggiunto in: v0.9.4
I flussi di trasformazione sono flussi Duplex
in cui l'output è in qualche modo correlato all'input. Come tutti i flussi Duplex
, i flussi Transform
implementano sia le interfacce Readable
che Writable
.
Esempi di flussi Transform
includono:
transform.destroy([error])
[Cronologia]
Versione | Modifiche |
---|---|
v14.0.0 | Funziona come no-op su un flusso che è già stato distrutto. |
v8.0.0 | Aggiunto in: v8.0.0 |
Distrugge il flusso e, facoltativamente, emette un evento 'error'
. Dopo questa chiamata, il flusso di trasformazione rilascerà tutte le risorse interne. Gli implementatori non devono sovrascrivere questo metodo, ma implementare invece readable._destroy()
. L'implementazione predefinita di _destroy()
per Transform
emette anche 'close'
a meno che emitClose
non sia impostato su false.
Una volta che destroy()
è stato chiamato, qualsiasi ulteriore chiamata sarà un no-op e nessun ulteriore errore tranne da _destroy()
può essere emesso come 'error'
.
stream.duplexPair([options])
Aggiunto in: v22.6.0, v20.17.0
options
<Object> Un valore da passare a entrambi i costruttoriDuplex
, per impostare opzioni come il buffering.- Restituisce: <Array> di due istanze
Duplex
.
La funzione di utilità duplexPair
restituisce un Array con due elementi, ognuno dei quali è uno stream Duplex
connesso all'altro lato:
const [ sideA, sideB ] = duplexPair();
Qualsiasi cosa venga scritta su uno stream diventa leggibile sull'altro. Fornisce un comportamento analogo a una connessione di rete, in cui i dati scritti dal client diventano leggibili dal server e viceversa.
Gli stream Duplex sono simmetrici; uno o l'altro può essere utilizzato senza alcuna differenza di comportamento.
stream.finished(stream[, options], callback)
[Cronologia]
Versione | Modifiche |
---|---|
v19.5.0 | Aggiunto il supporto per ReadableStream e WritableStream . |
v15.11.0 | È stata aggiunta l'opzione signal . |
v14.0.0 | finished(stream, cb) aspetterà l'evento 'close' prima di invocare il callback. L'implementazione cerca di rilevare gli stream legacy e applica questo comportamento solo agli stream che si prevede emettano 'close' . |
v14.0.0 | L'emissione di 'close' prima di 'end' su uno stream Readable causerà un errore ERR_STREAM_PREMATURE_CLOSE . |
v14.0.0 | Il callback verrà invocato su stream che sono già terminati prima della chiamata a finished(stream, cb) . |
v10.0.0 | Aggiunto in: v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Uno stream/webstream leggibile e/o scrivibile.options
<Object>error
<boolean> Se impostato sufalse
, una chiamata aemit('error', err)
non viene trattata come terminata. Predefinito:true
.readable
<boolean> Quando impostato sufalse
, il callback verrà chiamato quando lo stream termina anche se lo stream potrebbe essere ancora leggibile. Predefinito:true
.writable
<boolean> Quando impostato sufalse
, il callback verrà chiamato quando lo stream termina anche se lo stream potrebbe essere ancora scrivibile. Predefinito:true
.signal
<AbortSignal> consente di interrompere l'attesa per la fine dello stream. Lo stream sottostante non verrà interrotto se il segnale viene interrotto. Il callback verrà chiamato con unAbortError
. Verranno rimossi anche tutti i listener registrati aggiunti da questa funzione.
callback
<Function> Una funzione di callback che accetta un argomento di errore opzionale.Restituisce: <Function> Una funzione di pulizia che rimuove tutti i listener registrati.
Una funzione per essere notificati quando uno stream non è più leggibile, scrivibile o ha subito un errore o un evento di chiusura prematura.
const { finished } = require('node:stream');
const fs = require('node:fs');
const rs = fs.createReadStream('archive.tar');
finished(rs, (err) => {
if (err) {
console.error('Stream failed.', err);
} else {
console.log('Stream is done reading.');
}
});
rs.resume(); // Scarica lo stream.
Particolarmente utile negli scenari di gestione degli errori in cui uno stream viene distrutto prematuramente (come una richiesta HTTP interrotta) e non emetterà 'end'
o 'finish'
.
L'API finished
fornisce la versione promise.
stream.finished()
lascia listener di eventi pendenti (in particolare 'error'
, 'end'
, 'finish'
e 'close'
) dopo che callback
è stato invocato. La ragione di ciò è che eventi 'error'
inattesi (a causa di implementazioni di stream errate) non causino arresti anomali imprevisti. Se questo è un comportamento indesiderato, allora la funzione di pulizia restituita deve essere invocata nel callback:
const cleanup = finished(rs, (err) => {
cleanup();
// ...
});
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[Cronologia]
Versione | Modifiche |
---|---|
v19.7.0, v18.16.0 | Aggiunto il supporto per i webstream. |
v18.0.0 | Passare una callback non valida all'argomento callback ora genera ERR_INVALID_ARG_TYPE invece di ERR_INVALID_CALLBACK . |
v14.0.0 | pipeline(..., cb) attenderà l'evento 'close' prima di invocare la callback. L'implementazione cerca di rilevare gli stream legacy e applica questo comportamento solo agli stream che dovrebbero emettere 'close' . |
v13.10.0 | Aggiunto il supporto per i generatori async. |
v10.0.0 | Aggiunto in: v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- Restituisce: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- Restituisce: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- Restituisce: <AsyncIterable> | <Promise>
callback
<Function> Chiamata quando la pipeline è completamente terminata.err
<Error>val
Valore risolto diPromise
restituito dadestination
.
Restituisce: <Stream>
Un metodo del modulo per collegare tramite pipe tra stream e generatori inoltrando gli errori ed eseguendo una pulizia corretta e fornendo una callback quando la pipeline è completa.
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Usa l'API pipeline per collegare facilmente una serie di stream
// insieme e ricevere una notifica quando la pipeline è completamente terminata.
// Una pipeline per comprimere in gzip un file tar potenzialmente enorme in modo efficiente:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline fallita.', err);
} else {
console.log('Pipeline riuscita.');
}
},
);
L'API pipeline
fornisce una versione promise.
stream.pipeline()
chiamerà stream.destroy(err)
su tutti gli stream eccetto:
- Stream
Readable
che hanno emesso'end'
o'close'
. - Stream
Writable
che hanno emesso'finish'
o'close'
.
stream.pipeline()
lascia event listener pendenti sugli stream dopo che la callback
è stata invocata. In caso di riutilizzo degli stream dopo un errore, ciò può causare perdite di event listener ed errori nascosti. Se l'ultimo stream è leggibile, gli event listener pendenti verranno rimossi in modo che l'ultimo stream possa essere consumato in seguito.
stream.pipeline()
chiude tutti gli stream quando viene generato un errore. L'utilizzo di IncomingRequest
con pipeline
potrebbe portare a un comportamento imprevisto poiché distruggerebbe il socket senza inviare la risposta prevista. Vedi l'esempio qui sotto:
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // Nessun file di questo tipo
// questo messaggio non può essere inviato una volta che `pipeline` ha già distrutto il socket
return res.end('errore!!!');
}
});
});
stream.compose(...streams)
[Cronologia]
Versione | Modifiche |
---|---|
v21.1.0, v20.10.0 | Aggiunto il supporto per la classe stream. |
v19.8.0, v18.16.0 | Aggiunto il supporto per i webstream. |
v16.9.0 | Aggiunto in: v16.9.0 |
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - stream.compose
è sperimentale.
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- Restituisce: <stream.Duplex>
Combina due o più stream in uno stream Duplex
che scrive nel primo stream e legge dall'ultimo. Ogni stream fornito viene collegato al successivo, utilizzando stream.pipeline
. Se uno qualsiasi degli stream genera un errore, tutti vengono distrutti, incluso lo stream Duplex
esterno.
Poiché stream.compose
restituisce un nuovo stream che a sua volta può (e deve) essere collegato ad altri stream, abilita la composizione. Al contrario, quando si passano stream a stream.pipeline
, in genere il primo stream è uno stream leggibile e l'ultimo è uno stream scrivibile, formando un circuito chiuso.
Se viene passata una Function
, deve essere un metodo factory che accetta un Iterable
source
.
import { compose, Transform } from 'node:stream';
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // stampa 'HELLOWORLD'
stream.compose
può essere utilizzato per convertire iterabili asincroni, generatori e funzioni in stream.
AsyncIterable
si converte in unDuplex
leggibile. Non può restituirenull
.AsyncGeneratorFunction
si converte in unDuplex
transform leggibile/scrivibile. Deve accettare unAsyncIterable
source come primo parametro. Non può restituirenull
.AsyncFunction
si converte in unDuplex
scrivibile. Deve restituirenull
oundefined
.
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Converte AsyncIterable in Duplex leggibile.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Converte AsyncGenerator in Duplex transform.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Converte AsyncFunction in Duplex scrivibile.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // stampa 'HELLOWORLD'
Vedi readable.compose(stream)
per stream.compose
come operatore.
stream.Readable.from(iterable[, options])
Aggiunto in: v12.3.0, v10.17.0
iterable
<Iterable> Oggetto che implementa il protocollo iterabileSymbol.asyncIterator
oSymbol.iterator
. Emette un evento 'error' se viene passato un valore null.options
<Object> Opzioni fornite anew stream.Readable([options])
. Per impostazione predefinita,Readable.from()
imposteràoptions.objectMode
sutrue
, a meno che non si escluda esplicitamente impostandooptions.objectMode
sufalse
.- Restituisce: <stream.Readable>
Un metodo di utilità per creare stream leggibili dagli iteratori.
const { Readable } = require('node:stream');
async function * generate() {
yield 'hello';
yield 'streams';
}
const readable = Readable.from(generate());
readable.on('data', (chunk) => {
console.log(chunk);
});
Chiamare Readable.from(string)
o Readable.from(buffer)
non farà iterare le stringhe o i buffer per corrispondere alla semantica degli altri stream per motivi di prestazioni.
Se un oggetto Iterable
contenente promise viene passato come argomento, potrebbe comportare un rifiuto non gestito.
const { Readable } = require('node:stream');
Readable.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rifiuto non gestito
]);
stream.Readable.fromWeb(readableStream[, options])
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Restituisce: <stream.Readable>
stream.Readable.isDisturbed(stream)
Aggiunto in: v16.8.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
stream
<stream.Readable> | <ReadableStream>- Restituisce:
boolean
Restituisce se lo stream è stato letto o annullato.
stream.isErrored(stream)
Aggiunto in: v17.3.0, v16.14.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- Restituisce: <boolean>
Restituisce se lo stream ha riscontrato un errore.
stream.isReadable(stream)
Aggiunto in: v17.4.0, v16.14.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
stream
<Readable> | <Duplex> | <ReadableStream>- Restituisce: <boolean>
Restituisce se lo stream è leggibile.
stream.Readable.toWeb(streamReadable[, options])
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> La dimensione massima della coda interna (dellaReadableStream
creata) prima che la contropressione venga applicata durante la lettura dallastream.Readable
data. Se non viene fornito alcun valore, verrà preso dallastream.Readable
data.size
<Function> Una funzione che determina la dimensione del blocco di dati dato. Se non viene fornito alcun valore, la dimensione sarà1
per tutti i blocchi.chunk
<any>- Restituisce: <number>
Restituisce: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Restituisce: <stream.Writable>
stream.Writable.toWeb(streamWritable)
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
streamWritable
<stream.Writable>- Restituisce: <WritableStream>
stream.Duplex.from(src)
[Cronologia]
Versione | Modifiche |
---|---|
v19.5.0, v18.17.0 | L'argomento src può ora essere un ReadableStream o WritableStream . |
v16.8.0 | Aggiunto in: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
Un metodo di utilità per la creazione di stream duplex.
Stream
converte lo stream scrivibile inDuplex
scrivibile e lo stream leggibile inDuplex
.Blob
converte inDuplex
leggibile.string
converte inDuplex
leggibile.ArrayBuffer
converte inDuplex
leggibile.AsyncIterable
converte in unDuplex
leggibile. Non può restituirenull
.AsyncGeneratorFunction
converte in unDuplex
di trasformazione leggibile/scrivibile. Deve prendere unAsyncIterable
sorgente come primo parametro. Non può restituirenull
.AsyncFunction
converte in unDuplex
scrivibile. Deve restituirenull
oundefined
Object ({ writable, readable })
convertereadable
ewritable
inStream
e quindi li combina inDuplex
dove ilDuplex
scriverà sulwritable
e leggerà dalreadable
.Promise
converte inDuplex
leggibile. Il valorenull
viene ignorato.ReadableStream
converte inDuplex
leggibile.WritableStream
converte inDuplex
scrivibile.- Restituisce: <stream.Duplex>
Se un oggetto Iterable
contenente promesse viene passato come argomento, potrebbe comportare un rifiuto non gestito.
const { Duplex } = require('node:stream');
Duplex.from([
new Promise((resolve) => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rifiuto non gestito
]);
stream.Duplex.fromWeb(pair[, options])
Aggiunto in: v17.0.0
[Stable: 1 - Sperimentale]
Stable: 1 Stabilità: 1 - Sperimentale
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>Restituisce: <stream.Duplex>
import { Duplex } from 'node:stream';
import {
ReadableStream,
WritableStream,
} from 'node:stream/web';
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
for await (const chunk of duplex) {
console.log('readable', chunk);
}
const { Duplex } = require('node:stream');
const {
ReadableStream,
WritableStream,
} = require('node:stream/web');
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world');
},
});
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk);
},
});
const pair = {
readable,
writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));
stream.Duplex.toWeb(streamDuplex)
Aggiunto in: v17.0.0
[Stabile: 1 - Sperimentale]
Stabile: 1 Stabilità: 1 - Sperimentale
streamDuplex
<stream.Duplex>- Restituisce: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream';
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
const { value } = await readable.getReader().read();
console.log('readable', value);
const { Duplex } = require('node:stream');
const duplex = Duplex({
objectMode: true,
read() {
this.push('world');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('writable', chunk);
callback();
},
});
const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');
readable.getReader().read().then((result) => {
console.log('readable', result.value);
});
stream.addAbortSignal(signal, stream)
[Cronologia]
Versione | Modifiche |
---|---|
v19.7.0, v18.16.0 | Aggiunto il supporto per ReadableStream e WritableStream . |
v15.4.0 | Aggiunto in: v15.4.0 |
signal
<AbortSignal> Un segnale che rappresenta una possibile cancellazionestream
<Stream> | <ReadableStream> | <WritableStream> Uno stream a cui collegare un segnale.
Allega un AbortSignal a un flusso leggibile o scrivibile. Ciò consente al codice di controllare la distruzione del flusso utilizzando un AbortController
.
La chiamata a abort
sull'AbortController
corrispondente all'AbortSignal
passato si comporterà allo stesso modo della chiamata a .destroy(new AbortError())
sullo stream e controller.error(new AbortError())
per i webstream.
const fs = require('node:fs');
const controller = new AbortController();
const read = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
// Successivamente, interrompi l'operazione chiudendo lo stream
controller.abort();
Oppure usa un AbortSignal
con un flusso leggibile come iterabile asincrono:
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // imposta un timeout
const stream = addAbortSignal(
controller.signal,
fs.createReadStream(('object.json')),
);
(async () => {
try {
for await (const chunk of stream) {
await process(chunk);
}
} catch (e) {
if (e.name === 'AbortError') {
// L'operazione è stata annullata
} else {
throw e;
}
}
})();
Oppure usa un AbortSignal
con un ReadableStream:
const controller = new AbortController();
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
},
});
addAbortSignal(controller.signal, rs);
finished(rs, (err) => {
if (err) {
if (err.name === 'AbortError') {
// L'operazione è stata annullata
}
}
});
const reader = rs.getReader();
reader.read().then(({ value, done }) => {
console.log(value); // hello
console.log(done); // false
controller.abort();
});
stream.getDefaultHighWaterMark(objectMode)
Aggiunto in: v19.9.0, v18.17.0
Restituisce l'highWaterMark predefinito utilizzato dagli stream. Il valore predefinito è 65536
(64 KiB) o 16
per objectMode
.
stream.setDefaultHighWaterMark(objectMode, value)
Aggiunto in: v19.9.0, v18.17.0
Imposta l'highWaterMark predefinito utilizzato dagli stream.
API per gli implementatori di stream
L'API del modulo node:stream
è stata progettata per rendere possibile l'implementazione semplice degli stream utilizzando il modello di ereditarietà prototipale di JavaScript.
Innanzitutto, uno sviluppatore di stream dichiarerebbe una nuova classe JavaScript che estende una delle quattro classi di stream di base (stream.Writable
, stream.Readable
, stream.Duplex
o stream.Transform
), assicurandosi di chiamare il costruttore della classe padre appropriato:
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark });
// ...
}
}
Quando si estendono gli stream, tieni presente quali opzioni l'utente può e deve fornire prima di inoltrarle al costruttore di base. Ad esempio, se l'implementazione fa supposizioni in merito alle opzioni autoDestroy
e emitClose
, non consentire all'utente di sovrascriverle. Sii esplicito su quali opzioni vengono inoltrate invece di inoltrarle implicitamente tutte.
La nuova classe di stream deve quindi implementare uno o più metodi specifici, a seconda del tipo di stream che viene creato, come dettagliato nella tabella seguente:
Caso d'uso | Classe | Metodo/i da implementare |
---|---|---|
Sola lettura | Readable | _read() |
Sola scrittura | Writable | _write() , _writev() , _final() |
Lettura e scrittura | Duplex | _read() , _write() , _writev() , _final() |
Opera sui dati scritti, quindi legge il risultato | Transform | _transform() , _flush() , _final() |
Il codice di implementazione per uno stream non dovrebbe mai chiamare i metodi "pubblici" di uno stream destinati all'uso da parte dei consumatori (come descritto nella sezione API per i consumatori di stream). In tal caso, potrebbero verificarsi effetti collaterali negativi nel codice dell'applicazione che utilizza lo stream. |
Evita di sovrascrivere metodi pubblici come write()
, end()
, cork()
, uncork()
, read()
e destroy()
o di emettere eventi interni come 'error'
, 'data'
, 'end'
, 'finish'
e 'close'
tramite .emit()
. In tal caso, è possibile interrompere gli invarianti dello stream attuali e futuri, causando problemi di comportamento e/o compatibilità con altri stream, utilità di stream e aspettative dell'utente.
Costruzione semplificata
Aggiunto in: v1.2.0
Per molti casi semplici, è possibile creare uno stream senza fare affidamento sull'ereditarietà. Questo può essere realizzato creando direttamente istanze degli oggetti stream.Writable
, stream.Readable
, stream.Duplex
o stream.Transform
e passando metodi appropriati come opzioni del costruttore.
const { Writable } = require('node:stream');
const myWritable = new Writable({
construct(callback) {
// Inizializza lo stato e carica le risorse...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Libera le risorse...
},
});
Implementazione di uno stream scrivibile
La classe stream.Writable
viene estesa per implementare uno stream Writable
.
Gli stream Writable
personalizzati devono chiamare il costruttore new stream.Writable([options])
e implementare il metodo writable._write()
e/o writable._writev()
.
new stream.Writable([options])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0 | aumenta il valore predefinito di highWaterMark. |
v15.5.0 | supporta il passaggio di un AbortSignal. |
v14.0.0 | Modifica l'opzione autoDestroy predefinita in true . |
v11.2.0, v10.16.0 | Aggiunta l'opzione autoDestroy per destroy() automaticamente lo stream quando emette 'finish' o errori. |
v10.0.0 | Aggiunta l'opzione emitClose per specificare se 'close' viene emesso durante la distruzione. |
options
<Object>highWaterMark
<number> Livello del buffer quandostream.write()
inizia a restituirefalse
. Predefinito:65536
(64 KiB), o16
per streamobjectMode
.decodeStrings
<boolean> Se codificare lestring
passate astream.write()
inBuffer
(con la codifica specificata nella chiamatastream.write()
) prima di passarle astream._write()
. Altri tipi di dati non vengono convertiti (ad esempio, iBuffer
non vengono decodificati instring
). Impostare su false impedirà la conversione dellestring
. Predefinito:true
.defaultEncoding
<string> La codifica predefinita che viene utilizzata quando non viene specificata alcuna codifica come argomento perstream.write()
. Predefinito:'utf8'
.objectMode
<boolean> Sestream.write(anyObj)
è un'operazione valida o meno. Quando impostato, diventa possibile scrivere valori JavaScript diversi da stringa, <Buffer>, <TypedArray> o <DataView> se supportato dall'implementazione dello stream. Predefinito:false
.emitClose
<boolean> Se lo stream deve emettere o meno'close'
dopo essere stato distrutto. Predefinito:true
.write
<Function> Implementazione per il metodostream._write()
.writev
<Function> Implementazione per il metodostream._writev()
.destroy
<Function> Implementazione per il metodostream._destroy()
.final
<Function> Implementazione per il metodostream._final()
.construct
<Function> Implementazione per il metodostream._construct()
.autoDestroy
<boolean> Se questo stream debba chiamare automaticamente.destroy()
su se stesso dopo la fine. Predefinito:true
.signal
<AbortSignal> Un segnale che rappresenta una possibile cancellazione.
const { Writable } = require('node:stream');
class MyWritable extends Writable {
constructor(options) {
// Chiama il costruttore stream.Writable().
super(options);
// ...
}
}
Oppure, quando si utilizzano costruttori in stile pre-ES6:
const { Writable } = require('node:stream');
const util = require('node:util');
function MyWritable(options) {
if (!(this instanceof MyWritable))
return new MyWritable(options);
Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
Oppure, utilizzando l'approccio del costruttore semplificato:
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
});
Chiamare abort
sull'AbortController
corrispondente al AbortSignal
passato si comporterà allo stesso modo della chiamata .destroy(new AbortError())
sullo stream scrivibile.
const { Writable } = require('node:stream');
const controller = new AbortController();
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
});
// Successivamente, interrompi l'operazione chiudendo lo stream
controller.abort();
writable._construct(callback)
Aggiunto in: v15.0.0
callback
<Function> Chiama questa funzione (opzionalmente con un argomento di errore) quando lo stream ha terminato l'inizializzazione.
Il metodo _construct()
NON DEVE essere chiamato direttamente. Può essere implementato da classi figlie e, in tal caso, verrà chiamato solo dai metodi interni della classe Writable
.
Questa funzione opzionale verrà chiamata in un tick dopo che il costruttore dello stream ha restituito, ritardando qualsiasi chiamata a _write()
, _final()
e _destroy()
fino a quando non viene chiamata callback
. Questo è utile per inizializzare lo stato o inizializzare asincronamente le risorse prima che lo stream possa essere utilizzato.
const { Writable } = require('node:stream');
const fs = require('node:fs');
class WriteStream extends Writable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback);
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
writable._write(chunk, encoding, callback)
[Cronologia]
Versione | Modifiche |
---|---|
v12.11.0 | _write() è opzionale quando si fornisce _writev(). |
chunk
<Buffer> | <string> | <any> IlBuffer
da scrivere, convertito dallastring
passata astream.write()
. Se l'opzionedecodeStrings
dello stream èfalse
o lo stream sta operando in modalità oggetto, il chunk non verrà convertito e sarà qualsiasi cosa sia stata passata astream.write()
.encoding
<string> Se il chunk è una stringa, alloraencoding
è la codifica dei caratteri di quella stringa. Se chunk è unBuffer
, o se lo stream sta operando in modalità oggetto,encoding
può essere ignorato.callback
<Function> Chiama questa funzione (opzionalmente con un argomento di errore) quando l'elaborazione è completa per il chunk fornito.
Tutte le implementazioni dello stream Writable
devono fornire un metodo writable._write()
e/o writable._writev()
per inviare dati alla risorsa sottostante.
Gli stream Transform
forniscono la propria implementazione di writable._write()
.
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata da classi figlie e chiamata solo dai metodi interni della classe Writable
.
La funzione callback
deve essere chiamata sincronicamente all'interno di writable._write()
o asincronicamente (cioè tick diverso) per segnalare se la scrittura è stata completata con successo o non è riuscita con un errore. Il primo argomento passato al callback
deve essere l'oggetto Error
se la chiamata non è riuscita o null
se la scrittura è riuscita.
Tutte le chiamate a writable.write()
che si verificano tra il momento in cui viene chiamato writable._write()
e il callback
viene chiamato faranno sì che i dati scritti vengano memorizzati nel buffer. Quando viene richiamato il callback
, lo stream potrebbe emettere un evento 'drain'
. Se un'implementazione dello stream è in grado di elaborare più chunk di dati contemporaneamente, è necessario implementare il metodo writable._writev()
.
Se la proprietà decodeStrings
è impostata esplicitamente su false
nelle opzioni del costruttore, allora chunk
rimarrà lo stesso oggetto passato a .write()
e potrebbe essere una stringa anziché un Buffer
. Questo per supportare le implementazioni che hanno una gestione ottimizzata per determinate codifiche di dati stringa. In tal caso, l'argomento encoding
indicherà la codifica dei caratteri della stringa. Altrimenti, l'argomento encoding
può essere tranquillamente ignorato.
Il metodo writable._write()
è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
writable._writev(chunks, callback)
chunks
<Object[]> I dati da scrivere. Il valore è un array di <Object> che rappresentano ciascuno un blocco discreto di dati da scrivere. Le proprietà di questi oggetti sono:chunk
<Buffer> | <string> Un'istanza buffer o una stringa contenente i dati da scrivere. Ilchunk
sarà una stringa se ilWritable
è stato creato con l'opzionedecodeStrings
impostata sufalse
ed è stata passata una stringa awrite()
.encoding
<string> La codifica dei caratteri delchunk
. Sechunk
è unBuffer
, laencoding
sarà'buffer'
.
callback
<Function> Una funzione di callback (opzionalmente con un argomento di errore) da invocare quando l'elaborazione è completa per i chunk forniti.
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlie e chiamata solo dai metodi interni della classe Writable
.
Il metodo writable._writev()
può essere implementato in aggiunta o in alternativa a writable._write()
nelle implementazioni di stream in grado di elaborare più chunk di dati contemporaneamente. Se implementato e se ci sono dati memorizzati nel buffer da scritture precedenti, verrà chiamato _writev()
invece di _write()
.
Il metodo writable._writev()
è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
writable._destroy(err, callback)
Aggiunto in: v8.0.0
err
<Error> Un possibile errore.callback
<Function> Una funzione di callback che accetta un argomento di errore opzionale.
Il metodo _destroy()
viene chiamato da writable.destroy()
. Può essere sovrascritto dalle classi figlie ma non deve essere chiamato direttamente.
writable._final(callback)
Aggiunto in: v8.0.0
callback
<Funzione> Chiama questa funzione (opzionalmente con un argomento di errore) quando hai finito di scrivere tutti i dati rimanenti.
Il metodo _final()
non deve essere chiamato direttamente. Può essere implementato dalle classi figlie e, in tal caso, verrà chiamato solo dai metodi interni della classe Writable
.
Questa funzione opzionale verrà chiamata prima che lo stream si chiuda, ritardando l'evento 'finish'
fino a quando non viene chiamata callback
. Questo è utile per chiudere le risorse o scrivere dati memorizzati nel buffer prima che uno stream termini.
Errori durante la scrittura
Gli errori che si verificano durante l'elaborazione dei metodi writable._write()
, writable._writev()
e writable._final()
devono essere propagati invocando la callback e passando l'errore come primo argomento. Lanciare un Error
dall'interno di questi metodi o emettere manualmente un evento 'error'
provoca un comportamento indefinito.
Se uno stream Readable
esegue il pipe in uno stream Writable
quando Writable
emette un errore, lo stream Readable
verrà scollegato.
const { Writable } = require('node:stream');
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
},
});
Un esempio di stream scrivibile
Quanto segue illustra un'implementazione di stream Writable
personalizzata piuttosto semplicistica (e un po' inutile). Sebbene questa specifica istanza di stream Writable
non sia di alcuna particolare utilità reale, l'esempio illustra ciascuno degli elementi richiesti di un'istanza di stream Writable
personalizzata:
const { Writable } = require('node:stream');
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'));
} else {
callback();
}
}
}
Decodifica dei buffer in un flusso scrivibile
La decodifica dei buffer è un'operazione comune, ad esempio, quando si utilizzano trasformatori la cui input è una stringa. Questo non è un processo banale quando si utilizzano codifiche di caratteri multi-byte, come UTF-8. L'esempio seguente mostra come decodificare stringhe multi-byte utilizzando StringDecoder
e Writable
.
const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');
class StringWritable extends Writable {
constructor(options) {
super(options);
this._decoder = new StringDecoder(options?.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();
w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);
console.log(w.data); // currency: €
Implementazione di un flusso leggibile
La classe stream.Readable
viene estesa per implementare un flusso Readable
.
I flussi Readable
personalizzati devono chiamare il costruttore new stream.Readable([options])
e implementare il metodo readable._read()
.
new stream.Readable([options])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0 | aumenta highWaterMark predefinito. |
v15.5.0 | supporta il passaggio di un AbortSignal. |
v14.0.0 | Cambia l'opzione autoDestroy predefinita in true . |
v11.2.0, v10.16.0 | Aggiunge l'opzione autoDestroy per destroy() automaticamente il flusso quando emette 'end' o errori. |
options
<Object>highWaterMark
<number> Il massimo numero di byte da memorizzare nel buffer interno prima di cessare la lettura dalla risorsa sottostante. Predefinito:65536
(64 KiB), o16
per i flussiobjectMode
.encoding
<string> Se specificato, i buffer verranno decodificati in stringhe utilizzando la codifica specificata. Predefinito:null
.objectMode
<boolean> Indica se questo flusso deve comportarsi come un flusso di oggetti. Ciò significa chestream.read(n)
restituisce un singolo valore invece di unBuffer
di dimensionen
. Predefinito:false
.emitClose
<boolean> Indica se il flusso deve emettere'close'
dopo essere stato distrutto. Predefinito:true
.read
<Function> Implementazione per il metodostream._read()
.destroy
<Function> Implementazione per il metodostream._destroy()
.construct
<Function> Implementazione per il metodostream._construct()
.autoDestroy
<boolean> Indica se questo flusso deve chiamare automaticamente.destroy()
su se stesso dopo la fine. Predefinito:true
.signal
<AbortSignal> Un segnale che rappresenta una possibile cancellazione.
const { Readable } = require('node:stream');
class MyReadable extends Readable {
constructor(options) {
// Chiama il costruttore stream.Readable(options).
super(options);
// ...
}
}
Oppure, quando si utilizzano costruttori in stile pre-ES6:
const { Readable } = require('node:stream');
const util = require('node:util');
function MyReadable(options) {
if (!(this instanceof MyReadable))
return new MyReadable(options);
Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
Oppure, utilizzando l'approccio del costruttore semplificato:
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
// ...
},
});
Chiamare abort
sull'AbortController
corrispondente all'AbortSignal
passato si comporterà allo stesso modo della chiamata di .destroy(new AbortError())
sul flusso leggibile creato.
const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
});
// Successivamente, interrompere l'operazione chiudendo il flusso
controller.abort();
readable._construct(callback)
Aggiunto in: v15.0.0
callback
<Function> Chiama questa funzione (opzionalmente con un argomento di errore) quando lo stream ha terminato l'inizializzazione.
Il metodo _construct()
NON DEVE essere chiamato direttamente. Può essere implementato dalle classi figlio e, in tal caso, verrà chiamato solo dai metodi interni della classe Readable
.
Questa funzione opzionale verrà pianificata nel prossimo tick dal costruttore dello stream, ritardando qualsiasi chiamata a _read()
e _destroy()
fino a quando non viene chiamata callback
. Ciò è utile per inizializzare lo stato o inizializzare asincronamente le risorse prima che lo stream possa essere utilizzato.
const { Readable } = require('node:stream');
const fs = require('node:fs');
class ReadStream extends Readable {
constructor(filename) {
super();
this.filename = filename;
this.fd = null;
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err);
} else {
this.fd = fd;
callback();
}
});
}
_read(n) {
const buf = Buffer.alloc(n);
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err);
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
}
});
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, (er) => callback(er || err));
} else {
callback(err);
}
}
}
readable._read(size)
Aggiunto in: v0.9.4
size
<number> Numero di byte da leggere in modo asincrono
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlio e chiamata solo dai metodi interni della classe Readable
.
Tutte le implementazioni di stream Readable
devono fornire un'implementazione del metodo readable._read()
per recuperare i dati dalla risorsa sottostante.
Quando viene chiamato readable._read()
, se i dati sono disponibili dalla risorsa, l'implementazione deve iniziare a inserire tali dati nella coda di lettura utilizzando il metodo this.push(dataChunk)
. _read()
verrà chiamato di nuovo dopo ogni chiamata a this.push(dataChunk)
una volta che lo stream è pronto per accettare più dati. _read()
può continuare a leggere dalla risorsa e a inserire dati fino a quando readable.push()
non restituisce false
. Solo quando _read()
viene chiamato di nuovo dopo che si è fermato, dovrebbe riprendere a inserire dati aggiuntivi nella coda.
Una volta che il metodo readable._read()
è stato chiamato, non verrà chiamato di nuovo fino a quando non verranno inseriti altri dati tramite il metodo readable.push()
. Dati vuoti come buffer e stringhe vuote non causeranno la chiamata di readable._read()
.
L'argomento size
è indicativo. Per le implementazioni in cui una "lettura" è una singola operazione che restituisce dati, è possibile utilizzare l'argomento size
per determinare la quantità di dati da recuperare. Altre implementazioni possono ignorare questo argomento e semplicemente fornire i dati quando diventano disponibili. Non è necessario "attendere" che siano disponibili size
byte prima di chiamare stream.push(chunk)
.
Il metodo readable._read()
è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
readable._destroy(err, callback)
Aggiunto in: v8.0.0
err
<Error> Un possibile errore.callback
<Function> Una funzione di callback che accetta un argomento di errore opzionale.
Il metodo _destroy()
viene chiamato da readable.destroy()
. Può essere sovrascritto dalle classi figlio ma non deve essere chiamato direttamente.
readable.push(chunk[, encoding])
[Cronologia]
Versione | Modifiche |
---|---|
v22.0.0, v20.13.0 | L'argomento chunk ora può essere un'istanza di TypedArray o DataView . |
v8.0.0 | L'argomento chunk ora può essere un'istanza di Uint8Array . |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Blocco di dati da inserire nella coda di lettura. Per gli stream che non operano in modalità oggetto,chunk
deve essere un <string>, <Buffer>, <TypedArray> o <DataView>. Per gli stream in modalità oggetto,chunk
può essere qualsiasi valore JavaScript.encoding
<string> Codifica dei blocchi di stringa. Deve essere una codificaBuffer
valida, come'utf8'
o'ascii'
.- Restituisce: <boolean>
true
se è possibile continuare a inserire blocchi di dati aggiuntivi;false
altrimenti.
Quando chunk
è un <Buffer>, <TypedArray>, <DataView> o <string>, il chunk
di dati verrà aggiunto alla coda interna affinché gli utenti dello stream lo consumino. Passare chunk
come null
segnala la fine dello stream (EOF), dopo di che non è possibile scrivere altri dati.
Quando Readable
opera in modalità in pausa, i dati aggiunti con readable.push()
possono essere letti chiamando il metodo readable.read()
quando viene emesso l'evento 'readable'
.
Quando Readable
opera in modalità flowing, i dati aggiunti con readable.push()
verranno forniti emettendo un evento 'data'
.
Il metodo readable.push()
è progettato per essere il più flessibile possibile. Ad esempio, quando si incapsula una sorgente di livello inferiore che fornisce una qualche forma di meccanismo di pausa/ripresa e un callback di dati, la sorgente di basso livello può essere incapsulata dall'istanza Readable
personalizzata:
// `_source` è un oggetto con i metodi readStop() e readStart() e
// un membro `ondata` che viene chiamato quando ha dati e
// un membro `onend` che viene chiamato quando i dati sono terminati.
class SourceWrapper extends Readable {
constructor(options) {
super(options);
this._source = getLowLevelSourceObject();
// Ogni volta che ci sono dati, inseriscili nel buffer interno.
this._source.ondata = (chunk) => {
// Se push() restituisce false, interrompi la lettura dalla sorgente.
if (!this.push(chunk))
this._source.readStop();
};
// Quando la sorgente termina, inserisci il blocco `null` che segnala EOF.
this._source.onend = () => {
this.push(null);
};
}
// _read() verrà chiamato quando lo stream vuole estrarre più dati.
// L'argomento advisory size viene ignorato in questo caso.
_read(size) {
this._source.readStart();
}
}
Il metodo readable.push()
viene utilizzato per inserire il contenuto nel buffer interno. Può essere guidato dal metodo readable._read()
.
Per gli stream che non operano in modalità oggetto, se il parametro chunk
di readable.push()
è undefined
, verrà trattato come stringa o buffer vuoti. Vedi readable.push('')
per maggiori informazioni.
Error durante la lettura
Gli errori che si verificano durante l'elaborazione di readable._read()
devono essere propagati attraverso il metodo readable.destroy(err)
. Lanciare un Error
dall'interno di readable._read()
o emettere manualmente un evento 'error'
comporta un comportamento non definito.
const { Readable } = require('node:stream');
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition();
if (err) {
this.destroy(err);
} else {
// Do some work.
}
},
});
Un esempio di stream di conteggio
Quello che segue è un esempio base di uno stream Readable
che emette i numeri da 1 a 1.000.000 in ordine crescente e poi termina.
const { Readable } = require('node:stream');
class Counter extends Readable {
constructor(opt) {
super(opt);
this._max = 1000000;
this._index = 1;
}
_read() {
const i = this._index++;
if (i > this._max)
this.push(null);
else {
const str = String(i);
const buf = Buffer.from(str, 'ascii');
this.push(buf);
}
}
}
Implementazione di uno stream duplex
Uno stream Duplex
è uno che implementa sia Readable
che Writable
, come una connessione socket TCP.
Poiché JavaScript non supporta l'ereditarietà multipla, la classe stream.Duplex
viene estesa per implementare uno stream Duplex
(invece di estendere le classi stream.Readable
e stream.Writable
).
La classe stream.Duplex
eredita prototipicamente da stream.Readable
e parassiticamente da stream.Writable
, ma instanceof
funzionerà correttamente per entrambe le classi base a causa della sovrascrittura di Symbol.hasInstance
su stream.Writable
.
Gli stream Duplex
personalizzati devono chiamare il costruttore new stream.Duplex([options])
e implementare sia i metodi readable._read()
che writable._write()
.
new stream.Duplex(options)
[Cronologia]
Versione | Modifiche |
---|---|
v8.4.0 | Le opzioni readableHighWaterMark e writableHighWaterMark sono ora supportate. |
options
<Object> Passato sia ai costruttoriWritable
cheReadable
. Ha anche i seguenti campi:allowHalfOpen
<boolean> Se impostato sufalse
, lo stream terminerà automaticamente il lato scrivibile quando il lato leggibile termina. Predefinito:true
.readable
<boolean> Imposta se ilDuplex
debba essere leggibile. Predefinito:true
.writable
<boolean> Imposta se ilDuplex
debba essere scrivibile. Predefinito:true
.readableObjectMode
<boolean> ImpostaobjectMode
per il lato leggibile dello stream. Non ha effetto seobjectMode
ètrue
. Predefinito:false
.writableObjectMode
<boolean> ImpostaobjectMode
per il lato scrivibile dello stream. Non ha effetto seobjectMode
ètrue
. Predefinito:false
.readableHighWaterMark
<number> ImpostahighWaterMark
per il lato leggibile dello stream. Non ha effetto sehighWaterMark
è fornito.writableHighWaterMark
<number> ImpostahighWaterMark
per il lato scrivibile dello stream. Non ha effetto sehighWaterMark
è fornito.
const { Duplex } = require('node:stream');
class MyDuplex extends Duplex {
constructor(options) {
super(options);
// ...
}
}
Oppure, quando si utilizzano costruttori in stile pre-ES6:
const { Duplex } = require('node:stream');
const util = require('node:util');
function MyDuplex(options) {
if (!(this instanceof MyDuplex))
return new MyDuplex(options);
Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
Oppure, utilizzando l'approccio del costruttore semplificato:
const { Duplex } = require('node:stream');
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
});
Quando si utilizza pipeline:
const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');
pipeline(
fs.createReadStream('object.json')
.setEncoding('utf8'),
new Transform({
decodeStrings: false, // Accept string input rather than Buffers
construct(callback) {
this.data = '';
callback();
},
transform(chunk, encoding, callback) {
this.data += chunk;
callback();
},
flush(callback) {
try {
// Make sure is valid json.
JSON.parse(this.data);
this.push(this.data);
callback();
} catch (err) {
callback(err);
}
},
}),
fs.createWriteStream('valid-object.json'),
(err) => {
if (err) {
console.error('failed', err);
} else {
console.log('completed');
}
},
);
Un esempio di stream duplex
Il seguente illustra un semplice esempio di stream Duplex
che avvolge un ipotetico oggetto sorgente di livello inferiore in cui i dati possono essere scritti e da cui i dati possono essere letti, anche se utilizzando un'API non compatibile con gli stream di Node.js. Il seguente illustra un semplice esempio di stream Duplex
che memorizza nel buffer i dati scritti in entrata tramite l'interfaccia Writable
che viene riletta tramite l'interfaccia Readable
.
const { Duplex } = require('node:stream');
const kSource = Symbol('source');
class MyDuplex extends Duplex {
constructor(source, options) {
super(options);
this[kSource] = source;
}
_write(chunk, encoding, callback) {
// La sorgente sottostante gestisce solo stringhe.
if (Buffer.isBuffer(chunk))
chunk = chunk.toString();
this[kSource].writeSomeData(chunk);
callback();
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding));
});
}
}
L'aspetto più importante di uno stream Duplex
è che i lati Readable
e Writable
operano indipendentemente l'uno dall'altro nonostante coesistano all'interno di una singola istanza di oggetto.
Stream duplex in modalità oggetto
Per gli stream Duplex
, objectMode
può essere impostato esclusivamente per il lato Readable
o Writable
utilizzando rispettivamente le opzioni readableObjectMode
e writableObjectMode
.
Nell'esempio seguente, ad esempio, viene creato un nuovo stream Transform
(che è un tipo di stream Duplex
) che ha un lato Writable
in modalità oggetto che accetta numeri JavaScript che vengono convertiti in stringhe esadecimali sul lato Readable
.
const { Transform } = require('node:stream');
// Tutti gli stream Transform sono anche Stream Duplex.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Coerce the chunk to a number if necessary.
chunk |= 0;
// Transform the chunk into something else.
const data = chunk.toString(16);
// Push the data onto the readable queue.
callback(null, '0'.repeat(data.length % 2) + data);
},
});
myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));
myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64
Implementazione di uno stream di trasformazione
Uno stream Transform
è uno stream Duplex
in cui l'output viene calcolato in qualche modo dall'input. Esempi includono gli stream zlib o gli stream crypto che comprimono, crittografano o decrittografano i dati.
Non è necessario che l'output abbia le stesse dimensioni dell'input, lo stesso numero di chunk o arrivi nello stesso momento. Ad esempio, uno stream Hash
avrà sempre e solo un singolo chunk di output che viene fornito quando l'input è terminato. Uno stream zlib
produrrà un output che è molto più piccolo o molto più grande del suo input.
La classe stream.Transform
viene estesa per implementare uno stream Transform
.
La classe stream.Transform
eredita prototipicamente da stream.Duplex
e implementa le proprie versioni dei metodi writable._write()
e readable._read()
. Le implementazioni Transform
personalizzate devono implementare il metodo transform._transform()
e possono anche implementare il metodo transform._flush()
.
È necessario prestare attenzione quando si utilizzano stream Transform
poiché i dati scritti nello stream possono causare la pausa del lato Writable
dello stream se l'output sul lato Readable
non viene consumato.
new stream.Transform([options])
options
<Object> Passato sia ai costruttoriWritable
cheReadable
. Ha anche i seguenti campi:transform
<Function> Implementazione per il metodostream._transform()
.flush
<Function> Implementazione per il metodostream._flush()
.
const { Transform } = require('node:stream');
class MyTransform extends Transform {
constructor(options) {
super(options);
// ...
}
}
Oppure, quando si utilizzano costruttori in stile pre-ES6:
const { Transform } = require('node:stream');
const util = require('node:util');
function MyTransform(options) {
if (!(this instanceof MyTransform))
return new MyTransform(options);
Transform.call(this, options);
}
util.inherits(MyTransform, Transform);
Oppure, utilizzando l'approccio semplificato del costruttore:
const { Transform } = require('node:stream');
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
});
Evento: 'end'
L'evento 'end'
proviene dalla classe stream.Readable
. L'evento 'end'
viene emesso dopo che tutti i dati sono stati emessi, il che si verifica dopo che è stata chiamata la callback in transform._flush()
. In caso di errore, 'end'
non deve essere emesso.
Evento: 'finish'
L'evento 'finish'
proviene dalla classe stream.Writable
. L'evento 'finish'
viene emesso dopo che stream.end()
è stato chiamato e tutti i chunk sono stati elaborati da stream._transform()
. In caso di errore, 'finish'
non deve essere emesso.
transform._flush(callback)
callback
<Function> Una funzione di callback (opzionalmente con un argomento di errore e dati) da chiamare quando i dati rimanenti sono stati scaricati.
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlie e chiamata solo dai metodi interni della classe Readable
.
In alcuni casi, un'operazione di trasformazione potrebbe aver bisogno di emettere un bit aggiuntivo di dati alla fine dello stream. Ad esempio, uno stream di compressione zlib
memorizzerà una quantità di stato interno utilizzato per comprimere in modo ottimale l'output. Quando lo stream termina, tuttavia, quei dati aggiuntivi devono essere scaricati in modo che i dati compressi siano completi.
Le implementazioni personalizzate di Transform
possono implementare il metodo transform._flush()
. Questo verrà chiamato quando non ci sono più dati scritti da consumare, ma prima che l'evento 'end'
venga emesso segnalando la fine dello stream Readable
.
All'interno dell'implementazione di transform._flush()
, il metodo transform.push()
può essere chiamato zero o più volte, a seconda dei casi. La funzione callback
deve essere chiamata al termine dell'operazione di flush.
Il metodo transform._flush()
è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> IlBuffer
da trasformare, convertito dallastring
passata astream.write()
. Se l'opzionedecodeStrings
dello stream èfalse
o lo stream opera in modalità oggetto, il chunk non sarà convertito e sarà qualsiasi cosa sia stata passata astream.write()
.encoding
<string> Se il chunk è una stringa, allora questo è il tipo di codifica. Se il chunk è un buffer, allora questo è il valore speciale'buffer'
. Ignoralo in quel caso.callback
<Function> Una funzione di callback (opzionalmente con un argomento di errore e dati) da chiamare dopo che ilchunk
fornito è stato elaborato.
Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlio e chiamata solo dai metodi interni della classe Readable
.
Tutte le implementazioni dello stream Transform
devono fornire un metodo _transform()
per accettare input e produrre output. L'implementazione transform._transform()
gestisce i byte che vengono scritti, calcola un output, quindi passa quell'output alla parte leggibile usando il metodo transform.push()
.
Il metodo transform.push()
può essere chiamato zero o più volte per generare output da un singolo chunk di input, a seconda di quanto deve essere emesso come risultato del chunk.
È possibile che non venga generato alcun output da un dato chunk di dati di input.
La funzione callback
deve essere chiamata solo quando il chunk corrente è completamente consumato. Il primo argomento passato alla callback
deve essere un oggetto Error
se si è verificato un errore durante l'elaborazione dell'input o null
altrimenti. Se un secondo argomento viene passato alla callback
, verrà inoltrato al metodo transform.push()
, ma solo se il primo argomento è falsy. In altre parole, i seguenti sono equivalenti:
transform.prototype._transform = function(data, encoding, callback) {
this.push(data);
callback();
};
transform.prototype._transform = function(data, encoding, callback) {
callback(null, data);
};
Il metodo transform._transform()
è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.
transform._transform()
non viene mai chiamato in parallelo; gli stream implementano un meccanismo di coda e, per ricevere il chunk successivo, è necessario chiamare callback
, sia sincrona che asincrona.
Classe: stream.PassThrough
La classe stream.PassThrough
è un'implementazione banale di uno stream Transform
che passa semplicemente i byte di input all'output. Il suo scopo è principalmente per esempi e test, ma ci sono alcuni casi d'uso in cui stream.PassThrough
è utile come elemento costitutivo per nuovi tipi di stream.
Note aggiuntive
Compatibilità degli stream con generatori asincroni e iteratori asincroni
Con il supporto dei generatori e iteratori asincroni in JavaScript, i generatori asincroni sono effettivamente una costruzione di stream di prima classe a livello di linguaggio a questo punto.
Di seguito sono forniti alcuni casi comuni di interoperabilità dell'utilizzo di stream Node.js con generatori asincroni e iteratori asincroni.
Consumare stream leggibili con iteratori asincroni
(async function() {
for await (const chunk of readable) {
console.log(chunk);
}
})();
Gli iteratori asincroni registrano un gestore di errori permanente sullo stream per prevenire eventuali errori post-distruzione non gestiti.
Creazione di stream leggibili con generatori asincroni
Uno stream leggibile di Node.js può essere creato da un generatore asincrono utilizzando il metodo di utilità Readable.from()
:
const { Readable } = require('node:stream');
const ac = new AbortController();
const signal = ac.signal;
async function * generate() {
yield 'a';
await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}
const readable = Readable.from(generate());
readable.on('close', () => {
ac.abort();
});
readable.on('data', (chunk) => {
console.log(chunk);
});
Piping verso stream scrivibili da iteratori asincroni
Quando si scrive su uno stream scrivibile da un iteratore asincrono, assicurarsi della corretta gestione della contropressione e degli errori. stream.pipeline()
astrae la gestione della contropressione e degli errori correlati alla contropressione:
const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');
const writable = fs.createWriteStream('./file');
const ac = new AbortController();
const signal = ac.signal;
const iterator = createIterator({ signal });
// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err);
} else {
console.log(value, 'value returned');
}
}).on('close', () => {
ac.abort();
});
// Promise Pattern
pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
.catch((err) => {
console.error(err);
ac.abort();
});
Compatibilità con versioni precedenti di Node.js
Prima di Node.js 0.10, l'interfaccia dello stream Readable
era più semplice, ma anche meno potente e meno utile.
- Invece di aspettare le chiamate al metodo
stream.read()
, gli eventi'data'
iniziavano a essere emessi immediatamente. Le applicazioni che avrebbero dovuto eseguire una certa quantità di lavoro per decidere come gestire i dati erano tenute a memorizzare i dati letti in buffer in modo che i dati non andassero persi. - Il metodo
stream.pause()
era consultivo, piuttosto che garantito. Ciò significava che era comunque necessario essere preparati a ricevere eventi'data'
anche quando lo stream era in uno stato di pausa.
In Node.js 0.10, è stata aggiunta la classe Readable
. Per la compatibilità con le versioni precedenti dei programmi Node.js, gli stream Readable
passano in "flowing mode" quando viene aggiunto un gestore di eventi 'data'
, oppure quando viene chiamato il metodo stream.resume()
. L'effetto è che, anche quando non si utilizzano il nuovo metodo stream.read()
e l'evento 'readable'
, non è più necessario preoccuparsi di perdere chunk di 'data'
.
Sebbene la maggior parte delle applicazioni continuerà a funzionare normalmente, ciò introduce un caso limite nelle seguenti condizioni:
- Non viene aggiunto alcun listener di eventi
'data'
. - Il metodo
stream.resume()
non viene mai chiamato. - Lo stream non viene piped a nessuna destinazione scrivibile.
Ad esempio, considera il seguente codice:
// ATTENZIONE! ROTTO!
net.createServer((socket) => {
// Aggiungiamo un listener 'end', ma non consumiamo mai i dati.
socket.on('end', () => {
// Non arriverà mai qui.
socket.end('Il messaggio è stato ricevuto ma non è stato elaborato.\n');
});
}).listen(1337);
Prima di Node.js 0.10, i dati del messaggio in entrata sarebbero semplicemente stati scartati. Tuttavia, in Node.js 0.10 e versioni successive, il socket rimane in pausa per sempre.
La soluzione alternativa in questa situazione è chiamare il metodo stream.resume()
per avviare il flusso di dati:
// Soluzione alternativa.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('Il messaggio è stato ricevuto ma non è stato elaborato.\n');
});
// Avvia il flusso di dati, scartandoli.
socket.resume();
}).listen(1337);
Oltre ai nuovi stream Readable
che passano in flowing mode, gli stream in stile pre-0.10 possono essere avvolti in una classe Readable
utilizzando il metodo readable.wrap()
.
readable.read(0)
Ci sono alcuni casi in cui è necessario attivare un aggiornamento dei meccanismi di flusso leggibile sottostanti, senza consumare effettivamente alcun dato. In tali casi, è possibile chiamare readable.read(0)
, che restituirà sempre null
.
Se il buffer di lettura interno è inferiore a highWaterMark
e il flusso non è attualmente in lettura, la chiamata a stream.read(0)
attiverà una chiamata stream._read()
di basso livello.
Sebbene la maggior parte delle applicazioni non avrà quasi mai bisogno di farlo, ci sono situazioni all'interno di Node.js in cui ciò viene fatto, in particolare all'interno della classe di flusso Readable
.
readable.push('')
L'uso di readable.push('')
non è raccomandato.
L'invio di uno <string>, <Buffer>, <TypedArray> o <DataView> a zero byte a un flusso che non è in modalità oggetto ha un interessante effetto collaterale. Poiché è una chiamata a readable.push()
, la chiamata terminerà il processo di lettura. Tuttavia, poiché l'argomento è una stringa vuota, nessun dato viene aggiunto al buffer leggibile, quindi non c'è nulla che un utente possa consumare.
Discrepanza highWaterMark
dopo la chiamata a readable.setEncoding()
L'uso di readable.setEncoding()
cambierà il comportamento del modo in cui highWaterMark
opera in modalità non oggetto.
In genere, la dimensione del buffer corrente viene misurata rispetto a highWaterMark
in byte. Tuttavia, dopo che setEncoding()
è stato chiamato, la funzione di confronto inizierà a misurare la dimensione del buffer in caratteri.
Questo non è un problema nei casi comuni con latin1
o ascii
. Ma si consiglia di essere consapevoli di questo comportamento quando si lavora con stringhe che potrebbero contenere caratteri multi-byte.