Skip to content

Stream

[Stabile: 2 - Stabile]

Stabile: 2 Stabilità: 2 - Stabile

Codice Sorgente: lib/stream.js

Uno stream è un'interfaccia astratta per lavorare con dati in streaming in Node.js. Il modulo node:stream fornisce un'API per implementare l'interfaccia stream.

Node.js fornisce molti oggetti stream. Ad esempio, una richiesta a un server HTTP e process.stdout sono entrambi istanze di stream.

Gli stream possono essere leggibili, scrivibili o entrambi. Tutti gli stream sono istanze di EventEmitter.

Per accedere al modulo node:stream:

js
const stream = require('node:stream')

Il modulo node:stream è utile per creare nuovi tipi di istanze di stream. Di solito non è necessario utilizzare il modulo node:stream per consumare stream.

Organizzazione di questo documento

Questo documento contiene due sezioni principali e una terza sezione per le note. La prima sezione spiega come utilizzare gli stream esistenti all'interno di un'applicazione. La seconda sezione spiega come creare nuovi tipi di stream.

Tipi di stream

Esistono quattro tipi di stream fondamentali in Node.js:

Inoltre, questo modulo include le funzioni di utilità stream.duplexPair(), stream.pipeline(), stream.finished() stream.Readable.from() e stream.addAbortSignal().

API Stream Promises

Aggiunto in: v15.0.0

L'API stream/promises fornisce un set alternativo di funzioni di utilità asincrone per gli stream che restituiscono oggetti Promise invece di utilizzare callback. L'API è accessibile tramite require('node:stream/promises') o require('node:stream').promises.

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[Cronologia]

VersioneModifiche
v18.0.0, v17.2.0, v16.14.0Aggiunta l'opzione end, che può essere impostata su false per evitare la chiusura automatica dello stream di destinazione quando termina la sorgente.
v15.0.0Aggiunto in: v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Pipeline completata.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline completata.')

Per utilizzare un AbortSignal, passarlo all'interno di un oggetto options, come ultimo argomento. Quando il segnale viene interrotto, destroy verrà chiamato sulla pipeline sottostante, con un AbortError.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

L'API pipeline supporta anche i generatori asincroni:

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // Lavora con stringhe invece di `Buffer`.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Pipeline completata.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // Lavora con stringhe invece di `Buffer`.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Pipeline completata.')

Ricordarsi di gestire l'argomento signal passato al generatore asincrono. Soprattutto nel caso in cui il generatore asincrono sia la sorgente della pipeline (ovvero il primo argomento) o la pipeline non si completerà mai.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Pipeline completata.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline completata.')

L'API pipeline fornisce anche una versione con callback:

stream.finished(stream[, options])

[Cronologia]

VersioneModifiche
v19.5.0, v18.14.0Aggiunto supporto per ReadableStream e WritableStream.
v19.1.0, v18.13.0Aggiunta l'opzione cleanup.
v15.0.0Aggiunta in: v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream ha terminato la lettura.')
}

run().catch(console.error)
rs.resume() // Svuota il flusso.
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream ha terminato la lettura.')
}

run().catch(console.error)
rs.resume() // Svuota il flusso.

L'API finished fornisce anche una versione con callback.

stream.finished() lascia listener di eventi pendenti (in particolare 'error', 'end', 'finish' e 'close') dopo che la promise restituita è risolta o rifiutata. La ragione di ciò è che eventi 'error' inaspettati (a causa di implementazioni di flusso errate) non causano arresti anomali inaspettati. Se questo è un comportamento indesiderato, options.cleanup dovrebbe essere impostato su true:

js
await finished(rs, { cleanup: true })

Modalità oggetto

Tutti i flussi creati dalle API di Node.js operano esclusivamente su stringhe, oggetti <Buffer>, <TypedArray> e <DataView>:

  • Stringhe e Buffer sono i tipi più comuni utilizzati con i flussi.
  • TypedArray e DataView consentono di gestire dati binari con tipi come Int32Array o Uint8Array. Quando si scrive un TypedArray o un DataView in un flusso, Node.js elabora i byte grezzi.

È possibile, tuttavia, che le implementazioni di flusso funzionino con altri tipi di valori JavaScript (ad eccezione di null, che ha uno scopo speciale all'interno dei flussi). Tali flussi sono considerati operanti in "modalità oggetto".

Le istanze di flusso vengono commutate in modalità oggetto utilizzando l'opzione objectMode quando viene creato il flusso. Tentare di commutare un flusso esistente in modalità oggetto non è sicuro.

Buffering

Sia i flussi Writable che Readable memorizzeranno i dati in un buffer interno.

La quantità di dati potenzialmente memorizzati nel buffer dipende dall'opzione highWaterMark passata al costruttore del flusso. Per i flussi normali, l'opzione highWaterMark specifica un numero totale di byte. Per i flussi che operano in modalità oggetto, highWaterMark specifica un numero totale di oggetti. Per i flussi che operano su (ma non decodificano) stringhe, highWaterMark specifica un numero totale di unità di codice UTF-16.

I dati vengono memorizzati nel buffer nei flussi Readable quando l'implementazione chiama stream.push(chunk). Se il consumer del flusso non chiama stream.read(), i dati rimarranno nella coda interna fino a quando non saranno consumati.

Una volta che la dimensione totale del buffer di lettura interno raggiunge la soglia specificata da highWaterMark, il flusso interromperà temporaneamente la lettura dei dati dalla risorsa sottostante fino a quando i dati attualmente memorizzati nel buffer non potranno essere consumati (cioè, il flusso smetterà di chiamare il metodo interno readable._read() utilizzato per riempire il buffer di lettura).

I dati vengono memorizzati nel buffer nei flussi Writable quando il metodo writable.write(chunk) viene chiamato ripetutamente. Mentre la dimensione totale del buffer di scrittura interno è inferiore alla soglia impostata da highWaterMark, le chiamate a writable.write() restituiranno true. Una volta che la dimensione del buffer interno raggiunge o supera highWaterMark, verrà restituito false.

Un obiettivo chiave dell'API stream, in particolare il metodo stream.pipe(), è limitare la memorizzazione nel buffer dei dati a livelli accettabili in modo che le sorgenti e le destinazioni con velocità diverse non sovraccarichino la memoria disponibile.

L'opzione highWaterMark è una soglia, non un limite: detta la quantità di dati che un flusso memorizza nel buffer prima di smettere di richiedere altri dati. In generale, non impone una limitazione di memoria rigorosa. Implementazioni di flusso specifiche possono scegliere di imporre limiti più rigorosi, ma ciò è facoltativo.

Poiché i flussi Duplex e Transform sono sia Readable che Writable, ognuno mantiene due buffer interni separati utilizzati per la lettura e la scrittura, consentendo a ciascuna parte di operare indipendentemente dall'altra mantenendo un flusso di dati appropriato ed efficiente. Ad esempio, le istanze net.Socket sono flussi Duplex il cui lato Readable consente il consumo dei dati ricevuti da la socket e il cui lato Writable consente di scrivere dati sulla socket. Poiché i dati possono essere scritti sulla socket a una velocità maggiore o minore rispetto alla ricezione dei dati, ciascun lato dovrebbe operare (e memorizzare nel buffer) indipendentemente dall'altro.

La meccanica della memorizzazione interna nel buffer è un dettaglio di implementazione interna e può essere modificata in qualsiasi momento. Tuttavia, per alcune implementazioni avanzate, i buffer interni possono essere recuperati utilizzando writable.writableBuffer o readable.readableBuffer. L'utilizzo di queste proprietà non documentate è sconsigliato.

API per i consumer di stream

Quasi tutte le applicazioni Node.js, per quanto semplici, utilizzano gli stream in qualche modo. Segue un esempio di utilizzo degli stream in un'applicazione Node.js che implementa un server HTTP:

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` è un http.IncomingMessage, che è uno stream leggibile.
  // `res` è un http.ServerResponse, che è uno stream scrivibile.

  let body = ''
  // Ottieni i dati come stringhe utf8.
  // Se non viene impostata una codifica, verranno ricevuti oggetti Buffer.
  req.setEncoding('utf8')

  // Gli stream leggibili emettono eventi 'data' una volta aggiunto un listener.
  req.on('data', chunk => {
    body += chunk
  })

  // L'evento 'end' indica che l'intero corpo è stato ricevuto.
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // Riscrivi qualcosa di interessante per l'utente:
      res.write(typeof data)
      res.end()
    } catch (er) {
      // uh oh! json errato!
      res.statusCode = 400
      return res.end(`errore: ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// errore: Unexpected token 'o', "not json" is not valid JSON

Gli stream Writable (come res nell'esempio) espongono metodi come write() e end() che vengono utilizzati per scrivere dati sullo stream.

Gli stream Readable utilizzano l'API EventEmitter per notificare al codice dell'applicazione quando i dati sono disponibili per essere letti dallo stream. Questi dati disponibili possono essere letti dallo stream in diversi modi.

Sia gli stream Writable che Readable utilizzano l'API EventEmitter in vari modi per comunicare lo stato corrente dello stream.

Gli stream Duplex e Transform sono sia Writable che Readable.

Le applicazioni che scrivono dati su uno stream o consumano dati da uno stream non sono tenute a implementare direttamente le interfacce dello stream e generalmente non avranno motivo di chiamare require('node:stream').

Gli sviluppatori che desiderano implementare nuovi tipi di stream dovrebbero fare riferimento alla sezione API per gli implementatori di stream.

Stream scrivibili

Gli stream scrivibili sono un'astrazione per una destinazione in cui vengono scritti i dati.

Esempi di stream Writable includono:

Alcuni di questi esempi sono in realtà stream Duplex che implementano l'interfaccia Writable.

Tutti gli stream Writable implementano l'interfaccia definita dalla classe stream.Writable.

Sebbene istanze specifiche di stream Writable possano differire in vari modi, tutti gli stream Writable seguono lo stesso schema di utilizzo fondamentale illustrato nell'esempio seguente:

js
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')

Classe: stream.Writable

Aggiunto in: v0.9.4

Evento: 'close'

[Cronologia]

VersioneModifiche
v10.0.0Aggiunta opzione emitClose per specificare se 'close' viene emesso su destroy.
v0.9.4Aggiunto in: v0.9.4

L'evento 'close' viene emesso quando lo stream e qualsiasi risorsa sottostante (ad esempio, un descrittore di file) sono stati chiusi. L'evento indica che non saranno emessi più eventi e che non si verificheranno ulteriori calcoli.

Uno stream Writable emetterà sempre l'evento 'close' se viene creato con l'opzione emitClose.

Evento: 'drain'

Aggiunto in: v0.9.4

Se una chiamata a stream.write(chunk) restituisce false, l'evento 'drain' verrà emesso quando sarà opportuno riprendere la scrittura di dati nello stream.

js
// Scrive i dati nello stream scrivibile fornito un milione di volte.
// Attenzione alla contro-pressione.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // Ultima volta!
        writer.write(data, encoding, callback)
      } else {
        // Vediamo se dovremmo continuare o aspettare.
        // Non passare la callback, perché non abbiamo ancora finito.
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // Doveva fermarsi prima!
      // Scrivi ancora una volta che si svuota.
      writer.once('drain', write)
    }
  }
}
Evento: 'error'

Aggiunto in: v0.9.4

L'evento 'error' viene emesso se si verifica un errore durante la scrittura o il piping dei dati. La callback dell'ascoltatore riceve un singolo argomento Error quando viene chiamata.

Il flusso viene chiuso quando viene emesso l'evento 'error' a meno che l'opzione autoDestroy non sia stata impostata su false durante la creazione del flusso.

Dopo 'error', non dovrebbero essere emessi altri eventi oltre a 'close' (inclusi gli eventi 'error').

Evento: 'finish'

Aggiunto in: v0.9.4

L'evento 'finish' viene emesso dopo che il metodo stream.end() è stato chiamato e tutti i dati sono stati scaricati nel sistema sottostante.

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('All writes are now complete.')
})
writer.end('This is the end\n')
Evento: 'pipe'

Aggiunto in: v0.9.4

  • src <stream.Readable> flusso sorgente che sta effettuando il piping verso questo flusso scrivibile

L'evento 'pipe' viene emesso quando il metodo stream.pipe() viene chiamato su un flusso leggibile, aggiungendo questo flusso scrivibile al suo insieme di destinazioni.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('Something is piping into the writer.')
  assert.equal(src, reader)
})
reader.pipe(writer)
Evento: 'unpipe'

Aggiunto in: v0.9.4

L'evento 'unpipe' viene emesso quando il metodo stream.unpipe() viene chiamato su un flusso Readable, rimuovendo questo Writable dal suo insieme di destinazioni.

Questo viene anche emesso nel caso in cui questo flusso Writable emetta un errore quando un flusso Readable effettua il piping in esso.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('Something has stopped piping into the writer.')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

Aggiunto in: v0.11.2

Il metodo writable.cork() forza tutti i dati scritti ad essere bufferizzati in memoria. I dati bufferizzati saranno scaricati quando vengono chiamati i metodi stream.uncork() o stream.end().

Lo scopo principale di writable.cork() è quello di gestire una situazione in cui diversi piccoli chunk vengono scritti nello stream in rapida successione. Invece di inoltrarli immediatamente alla destinazione sottostante, writable.cork() bufferizza tutti i chunk fino a quando non viene chiamato writable.uncork(), che li passerà tutti a writable._writev(), se presente. Questo previene una situazione di blocco head-of-line in cui i dati vengono bufferizzati mentre si aspetta che venga elaborato il primo piccolo chunk. Tuttavia, l'utilizzo di writable.cork() senza implementare writable._writev() potrebbe avere un effetto negativo sulla produttività.

Vedi anche: writable.uncork(), writable._writev().

writable.destroy([error])

[Cronologia]

VersioneModifiche
v14.0.0Funziona come no-op su uno stream che è già stato distrutto.
v8.0.0Aggiunto in: v8.0.0
  • error <Error> Opzionale, un errore da emettere con l'evento 'error'.
  • Restituisce: <this>

Distrugge lo stream. Opzionalmente emette un evento 'error' e un evento 'close' (a meno che emitClose non sia impostato su false). Dopo questa chiamata, lo stream scrivibile è terminato e le chiamate successive a write() o end() si tradurranno in un errore ERR_STREAM_DESTROYED. Questo è un modo distruttivo e immediato per distruggere uno stream. Le chiamate precedenti a write() potrebbero non essere state scaricate e potrebbero attivare un errore ERR_STREAM_DESTROYED. Utilizzare end() invece di destroy se i dati devono essere scaricati prima della chiusura, o attendere l'evento 'drain' prima di distruggere lo stream.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

Una volta che destroy() è stato chiamato, qualsiasi ulteriore chiamata sarà un no-op e non saranno emessi ulteriori errori, eccetto quelli da _destroy(), come 'error'.

Gli implementatori non devono sovrascrivere questo metodo, ma devono invece implementare writable._destroy().

writable.closed

Aggiunto in: v18.0.0

È true dopo che 'close' è stato emesso.

writable.destroyed

Aggiunto in: v8.0.0

È true dopo che writable.destroy() è stato chiamato.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[Cronologia]

VersioneModifiche
v22.0.0, v20.13.0L'argomento chunk può ora essere un'istanza di TypedArray o DataView.
v15.0.0Il callback viene invocato prima di 'finish' o in caso di errore.
v14.0.0Il callback viene invocato se viene emesso 'finish' o 'error'.
v10.0.0Questo metodo ora restituisce un riferimento a writable.
v8.0.0L'argomento chunk può ora essere un'istanza di Uint8Array.
v0.9.4Aggiunto in: v0.9.4

Chiamare il metodo writable.end() segnala che nessun altro dato verrà scritto nel Writable. Gli argomenti chunk ed encoding opzionali consentono di scrivere un ultimo ulteriore blocco di dati immediatamente prima di chiudere il flusso.

Chiamare il metodo stream.write() dopo aver chiamato stream.end() genererà un errore.

js
// Scrive 'hello, ' e poi termina con 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// Scrivere altro ora non è consentito!
writable.setDefaultEncoding(encoding)

[Cronologia]

VersioneModifiche
v6.1.0Questo metodo ora restituisce un riferimento a writable.
v0.11.15Aggiunto in: v0.11.15

Il metodo writable.setDefaultEncoding() imposta la codifica predefinita encoding per un flusso Writable.

writable.uncork()

Aggiunto in: v0.11.2

Il metodo writable.uncork() svuota tutti i dati in buffer da quando è stato chiamato stream.cork().

Quando si utilizzano writable.cork() e writable.uncork() per gestire il buffering delle scritture su un flusso, differire le chiamate a writable.uncork() usando process.nextTick(). Ciò consente il raggruppamento di tutte le chiamate writable.write() che si verificano all'interno di una determinata fase del loop eventi di Node.js.

js
stream.cork()
stream.write('some ')
stream.write('data ')
process.nextTick(() => stream.uncork())

Se il metodo writable.cork() viene chiamato più volte su un flusso, lo stesso numero di chiamate a writable.uncork() deve essere chiamato per svuotare i dati in buffer.

js
stream.cork()
stream.write('some ')
stream.cork()
stream.write('data ')
process.nextTick(() => {
  stream.uncork()
  // I dati non verranno svuotati fino a quando uncork() non verrà chiamato una seconda volta.
  stream.uncork()
})

Vedi anche: writable.cork().

writable.writable

Aggiunto in: v11.4.0

È true se è sicuro chiamare writable.write(), il che significa che il flusso non è stato distrutto, ha generato errori o terminato.

writable.writableAborted

Aggiunto in: v18.0.0, v16.17.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Restituisce se il flusso è stato distrutto o ha generato errori prima di emettere 'finish'.

writable.writableEnded

Aggiunto in: v12.9.0

È true dopo che writable.end() è stato chiamato. Questa proprietà non indica se i dati sono stati svuotati, per questo utilizzare writable.writableFinished invece.

writable.writableCorked

Aggiunto in: v13.2.0, v12.16.0

Numero di volte che writable.uncork() deve essere chiamato per scollegare completamente lo stream.

writable.errored

Aggiunto in: v18.0.0

Restituisce un errore se lo stream è stato distrutto con un errore.

writable.writableFinished

Aggiunto in: v12.6.0

È impostato su true immediatamente prima che l'evento 'finish' venga emesso.

writable.writableHighWaterMark

Aggiunto in: v9.3.0

Restituisce il valore di highWaterMark passato durante la creazione di questo Writable.

writable.writableLength

Aggiunto in: v9.4.0

Questa proprietà contiene il numero di byte (o oggetti) nella coda pronti per essere scritti. Il valore fornisce dati di introspezione sullo stato di highWaterMark.

writable.writableNeedDrain

Aggiunto in: v15.2.0, v14.17.0

È true se il buffer dello stream è pieno e lo stream emetterà 'drain'.

writable.writableObjectMode

Aggiunto in: v12.3.0

Getter per la proprietà objectMode di un dato stream Writable.

writable[Symbol.asyncDispose]()

Aggiunto in: v22.4.0, v20.16.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Chiama writable.destroy() con un AbortError e restituisce una promise che si risolve quando lo stream è terminato.

writable.write(chunk[, encoding][, callback])

[Cronologia]

VersioneModifiche
v22.0.0, v20.13.0L'argomento chunk può ora essere un'istanza di TypedArray o DataView.
v8.0.0L'argomento chunk può ora essere un'istanza di Uint8Array.
v6.0.0Passare null come parametro chunk sarà sempre considerato non valido ora, anche in modalità oggetto.
v0.9.4Aggiunto in: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> Dati opzionali da scrivere. Per gli stream che non operano in modalità oggetto, chunk deve essere una <string>, <Buffer>, <TypedArray> o <DataView>. Per gli stream in modalità oggetto, chunk può essere qualsiasi valore JavaScript diverso da null.
  • encoding <string> | <null> La codifica, se chunk è una stringa. Default: 'utf8'
  • callback <Function> Callback per quando questo blocco di dati viene svuotato.
  • Restituisce: <boolean> false se lo stream desidera che il codice chiamante aspetti che l'evento 'drain' venga emesso prima di continuare a scrivere dati aggiuntivi; altrimenti true.

Il metodo writable.write() scrive alcuni dati nello stream e chiama il callback fornito una volta che i dati sono stati completamente gestiti. Se si verifica un errore, il callback verrà chiamato con l'errore come suo primo argomento. Il callback viene chiamato in modo asincrono e prima che venga emesso 'error'.

Il valore restituito è true se il buffer interno è inferiore al highWaterMark configurato quando lo stream è stato creato dopo aver ammesso chunk. Se viene restituito false, ulteriori tentativi di scrittura di dati nello stream dovrebbero interrompersi fino a quando non viene emesso l'evento 'drain'.

Mentre uno stream non sta svuotando, le chiamate a write() metteranno in buffer chunk e restituiranno false. Una volta che tutti i chunk attualmente in buffer sono stati svuotati (accettati per la consegna dal sistema operativo), verrà emesso l'evento 'drain'. Una volta che write() restituisce false, non scrivere più chunk fino a quando non viene emesso l'evento 'drain'. Sebbene sia consentito chiamare write() su uno stream che non sta svuotando, Node.js metterà in buffer tutti i chunk scritti fino a quando non si verificherà l'utilizzo massimo della memoria, a quel punto si interromperà incondizionatamente. Anche prima che si interrompa, l'elevato utilizzo della memoria causerà scarse prestazioni del garbage collector e un elevato RSS (che in genere non viene rilasciato al sistema, nemmeno dopo che la memoria non è più necessaria). Poiché le socket TCP potrebbero non svuotarsi mai se il peer remoto non legge i dati, la scrittura di una socket che non sta svuotando potrebbe portare a una vulnerabilità sfruttabile da remoto.

La scrittura di dati mentre lo stream non sta svuotando è particolarmente problematica per un Transform, perché gli stream Transform sono in pausa per impostazione predefinita fino a quando non vengono collegati o viene aggiunto un gestore di eventi 'data' o 'readable'.

Se i dati da scrivere possono essere generati o recuperati su richiesta, si consiglia di incapsulare la logica in un Readable e utilizzare stream.pipe(). Tuttavia, se si preferisce chiamare write(), è possibile rispettare la contropressione ed evitare problemi di memoria utilizzando l'evento 'drain':

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Aspetta che cb venga chiamato prima di effettuare qualsiasi altra scrittura.
write('hello', () => {
  console.log('Scrittura completata, esegui altre scritture ora.')
})

Uno stream Writable in modalità oggetto ignorerà sempre l'argomento encoding.

Flussi leggibili

I flussi leggibili sono un'astrazione per una sorgente da cui vengono consumati i dati.

Esempi di flussi Readable includono:

Tutti i flussi Readable implementano l'interfaccia definita dalla classe stream.Readable.

Due modalità di lettura

I flussi Readable operano effettivamente in una di due modalità: flowing e paused. Queste modalità sono separate dalla modalità oggetto. Un flusso Readable può essere in modalità oggetto o meno, indipendentemente dal fatto che sia in modalità flowing o paused.

  • In modalità flowing, i dati vengono letti automaticamente dal sistema sottostante e forniti a un'applicazione il più rapidamente possibile utilizzando eventi tramite l'interfaccia EventEmitter.
  • In modalità paused, il metodo stream.read() deve essere chiamato esplicitamente per leggere i blocchi di dati dal flusso.

Tutti i flussi Readable iniziano in modalità paused, ma possono essere commutati in modalità flowing in uno dei seguenti modi:

Il Readable può tornare in modalità paused usando uno dei seguenti:

  • Se non ci sono destinazioni pipe, chiamando il metodo stream.pause().
  • Se ci sono destinazioni pipe, rimuovendo tutte le destinazioni pipe. Più destinazioni pipe possono essere rimosse chiamando il metodo stream.unpipe().

Il concetto importante da ricordare è che un Readable non genererà dati finché non viene fornito un meccanismo per consumare o ignorare tali dati. Se il meccanismo di consumo è disabilitato o rimosso, il Readable tenterà di smettere di generare i dati.

Per motivi di compatibilità con le versioni precedenti, la rimozione dei gestori eventi 'data' non metterà automaticamente in pausa il flusso. Inoltre, se ci sono destinazioni pipe, chiamare stream.pause() non garantirà che il flusso rimarrà in pausa una volta che tali destinazioni si svuotano e richiedono più dati.

Se un Readable viene commutato in modalità flowing e non ci sono consumer disponibili per gestire i dati, tali dati andranno persi. Ciò può verificarsi, ad esempio, quando il metodo readable.resume() viene chiamato senza un listener associato all'evento 'data', o quando un gestore eventi 'data' viene rimosso dal flusso.

Aggiungere un gestore eventi 'readable' fa automaticamente smettere di fluire il flusso e i dati devono essere consumati tramite readable.read(). Se il gestore eventi 'readable' viene rimosso, il flusso ricomincerà a fluire se c'è un gestore eventi 'data'.

Tre stati

Le "due modalità" di funzionamento di un flusso Readable sono un'astrazione semplificata per la più complicata gestione dello stato interno che avviene all'interno dell'implementazione del flusso Readable.

Specificamente, in qualsiasi momento, ogni Readable si trova in uno dei tre stati possibili:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

Quando readable.readableFlowing è null, non viene fornito alcun meccanismo per consumare i dati del flusso. Pertanto, il flusso non genererà dati. In questo stato, l'aggiunta di un listener per l'evento 'data', la chiamata al metodo readable.pipe(), o la chiamata al metodo readable.resume() cambierà readable.readableFlowing in true, facendo sì che Readable inizi ad emettere attivamente eventi man mano che i dati vengono generati.

La chiamata a readable.pause(), readable.unpipe(), o la ricezione di backpressure farà sì che readable.readableFlowing venga impostato su false, interrompendo temporaneamente il flusso di eventi ma non interrompendo la generazione di dati. In questo stato, l'aggiunta di un listener per l'evento 'data' non cambierà readable.readableFlowing in true.

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing è ora false.

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing è ancora false.
pass.write('ok') // Non emetterà 'data'.
pass.resume() // Deve essere chiamato per far sì che il flusso emetta 'data'.
// readableFlowing è ora true.

Mentre readable.readableFlowing è false, i dati potrebbero essere in accumulo nel buffer interno del flusso.

Scegliere uno stile di API

L'API del flusso Readable si è evoluta attraverso diverse versioni di Node.js e fornisce diversi metodi per consumare i dati del flusso. In generale, gli sviluppatori dovrebbero scegliere un solo metodo per consumare i dati e non dovrebbero mai utilizzare più metodi per consumare dati da un singolo flusso. Specificamente, l'utilizzo di una combinazione di on('data'), on('readable'), pipe(), o iteratori asincroni potrebbe portare a comportamenti non intuitivi.

Classe: stream.Readable

Aggiunto in: v0.9.4

Evento: 'close'

[Cronologia]

VersioneModifiche
v10.0.0Aggiunta l'opzione emitClose per specificare se 'close' viene emesso su distruzione.
v0.9.4Aggiunto in: v0.9.4

L'evento 'close' viene emesso quando lo stream e le sue risorse sottostanti (ad esempio, un descrittore di file) sono stati chiusi. L'evento indica che non saranno emessi più eventi e che non si verificheranno ulteriori elaborazioni.

Uno stream Readable emetterà sempre l'evento 'close' se viene creato con l'opzione emitClose.

Evento: 'data'

Aggiunto in: v0.9.4

  • chunk <Buffer> | <string> | <any> Il blocco di dati. Per gli stream che non funzionano in modalità oggetto, il blocco sarà una stringa o un Buffer. Per gli stream che sono in modalità oggetto, il blocco può essere qualsiasi valore JavaScript diverso da null.

L'evento 'data' viene emesso ogni volta che lo stream rilascia la proprietà di un blocco di dati a un consumer. Ciò può accadere ogni volta che lo stream viene commutato in modalità di flusso chiamando readable.pipe(), readable.resume(), o collegando una callback listener all'evento 'data'. L'evento 'data' verrà inoltre emesso ogni volta che viene chiamato il metodo readable.read() ed è disponibile un blocco di dati da restituire.

Collegare un listener dell'evento 'data' a uno stream che non è stato esplicitamente messo in pausa cambierà lo stream in modalità di flusso. I dati verranno quindi passati non appena saranno disponibili.

La callback del listener riceverà il blocco di dati come stringa se è stata specificata una codifica predefinita per lo stream usando il metodo readable.setEncoding(); altrimenti i dati verranno passati come Buffer.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Ricevuti ${chunk.length} byte di dati.`)
})
Evento: 'end'

Aggiunto in: v0.9.4

L'evento 'end' viene emesso quando non ci sono più dati da consumare dallo stream.

L'evento 'end' non verrà emesso a meno che i dati non siano completamente consumati. Questo può essere ottenuto passando lo stream in modalità flowing, o chiamando stream.read() ripetutamente fino a quando tutti i dati non sono stati consumati.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Ricevuti ${chunk.length} byte di dati.`)
})
readable.on('end', () => {
  console.log('Non ci saranno più dati.')
})
Evento: 'error'

Aggiunto in: v0.9.4

L'evento 'error' può essere emesso da un'implementazione Readable in qualsiasi momento. Tipicamente, questo può accadere se lo stream sottostante non è in grado di generare dati a causa di un errore interno sottostante, o quando un'implementazione dello stream tenta di inserire un chunk di dati non valido.

La callback dell'ascoltatore riceverà un singolo oggetto Error.

Evento: 'pause'

Aggiunto in: v0.9.4

L'evento 'pause' viene emesso quando viene chiamato stream.pause() e readableFlowing non è false.

Evento: 'readable'

[Cronologia]

VersioneModifiche
v10.0.0L''readable' viene sempre emesso nel tick successivo dopo che .push() è stato chiamato.
v10.0.0L'utilizzo di 'readable' richiede la chiamata a .read().
v0.9.4Aggiunto in: v0.9.4

L'evento 'readable' viene emesso quando ci sono dati disponibili da leggere dallo stream, fino al limite massimo configurato (state.highWaterMark). Effettivamente, indica che lo stream ha nuove informazioni all'interno del buffer. Se i dati sono disponibili all'interno di questo buffer, stream.read() può essere chiamato per recuperare quei dati. Inoltre, l'evento 'readable' può anche essere emesso quando è stata raggiunta la fine dello stream.

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // Ci sono alcuni dati da leggere ora.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

Se è stata raggiunta la fine dello stream, chiamare stream.read() restituirà null e innescherà l'evento 'end'. Questo è vero anche se non c'erano mai dati da leggere. Ad esempio, nell'esempio seguente, foo.txt è un file vuoto:

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

L'output dell'esecuzione di questo script è:

bash
$ node test.js
readable: null
end

In alcuni casi, l'aggancio di un ascoltatore per l'evento 'readable' causerà la lettura di una certa quantità di dati in un buffer interno.

In generale, i meccanismi readable.pipe() ed evento 'data' sono più facili da capire dell'evento 'readable'. Tuttavia, la gestione di 'readable' potrebbe comportare un aumento della produttività.

Se vengono utilizzati contemporaneamente sia 'readable' che 'data', 'readable' ha la precedenza nel controllo del flusso, ovvero 'data' verrà emesso solo quando viene chiamato stream.read(). La proprietà readableFlowing diventerà false. Se ci sono ascoltatori 'data' quando 'readable' viene rimosso, lo stream inizierà a fluire, ovvero gli eventi 'data' verranno emessi senza chiamare .resume().

Evento: 'resume'

Aggiunto in: v0.9.4

L'evento 'resume' viene emesso quando viene chiamato stream.resume() e readableFlowing non è true.

readable.destroy([error])

[Cronologia]

VersioneModifiche
v14.0.0Funziona come no-op su un flusso che è già stato distrutto.
v8.0.0Aggiunto in: v8.0.0
  • error <Error> Errore che verrà passato come payload nell'evento 'error'
  • Restituisce: <this>

Distrugge il flusso. Facoltativamente emette un evento 'error' e un evento 'close' (a meno che emitClose non sia impostato su false). Dopo questa chiamata, il flusso leggibile rilascerà tutte le risorse interne e le chiamate successive a push() verranno ignorate.

Una volta chiamato destroy(), ulteriori chiamate saranno no-op e non saranno emessi ulteriori errori, eccetto quelli da _destroy(), come 'error'.

Gli implementatori non dovrebbero sovrascrivere questo metodo, ma implementare invece readable._destroy().

readable.closed

Aggiunto in: v18.0.0

È true dopo che 'close' è stato emesso.

readable.destroyed

Aggiunto in: v8.0.0

È true dopo che readable.destroy() è stato chiamato.

readable.isPaused()

Aggiunto in: v0.11.14

Il metodo readable.isPaused() restituisce lo stato operativo corrente del Readable. Questo viene utilizzato principalmente dal meccanismo che sta alla base del metodo readable.pipe(). Nella maggior parte dei casi tipici, non ci sarà motivo di utilizzare questo metodo direttamente.

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

Aggiunto in: v0.9.4

Il metodo readable.pause() farà sì che uno stream in modalità flowing smetta di emettere eventi 'data', uscendo dalla modalità flowing. Qualsiasi dato che diventi disponibile rimarrà nel buffer interno.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Ricevuti ${chunk.length} byte di dati.`)
  readable.pause()
  console.log('Non ci saranno ulteriori dati per 1 secondo.')
  setTimeout(() => {
    console.log('Ora i dati riprenderanno a fluire.')
    readable.resume()
  }, 1000)
})

Il metodo readable.pause() non ha effetto se è presente un listener dell'evento 'readable'.

readable.pipe(destination[, options])

Aggiunto in: v0.9.4

Il metodo readable.pipe() aggancia uno stream Writable al readable, facendolo passare automaticamente in modalità flowing e spingendo tutti i suoi dati nello stream Writable agganciato. Il flusso di dati sarà gestito automaticamente in modo che lo stream Writable di destinazione non sia sopraffatto da uno stream Readable più veloce.

L'esempio seguente invia tutti i dati da readable in un file chiamato file.txt:

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Tutti i dati da readable vanno in 'file.txt'.
readable.pipe(writable)

È possibile agganciare più stream Writable a un singolo stream Readable.

Il metodo readable.pipe() restituisce un riferimento allo stream destinazione, rendendo possibile impostare catene di stream in pipe:

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

Per impostazione predefinita, stream.end() viene chiamato sullo stream Writable di destinazione quando lo stream Readable di origine emette 'end', in modo che la destinazione non sia più scrivibile. Per disabilitare questo comportamento predefinito, l'opzione end può essere passata come false, facendo sì che lo stream di destinazione rimanga aperto:

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Arrivederci\n')
})

Un'importante avvertenza è che se lo stream Readable emette un errore durante l'elaborazione, la destinazione Writable non viene chiusa automaticamente. Se si verifica un errore, sarà necessario chiudere manualmente ogni stream per evitare perdite di memoria.

Gli stream Writable process.stderr e process.stdout non vengono mai chiusi fino all'uscita del processo Node.js, indipendentemente dalle opzioni specificate.

readable.read([size])

Aggiunto in: v0.9.4

Il metodo readable.read() legge i dati dal buffer interno e li restituisce. Se non sono disponibili dati da leggere, viene restituito null. Per impostazione predefinita, i dati vengono restituiti come oggetto Buffer a meno che non sia stata specificata una codifica usando il metodo readable.setEncoding() o lo stream stia operando in modalità oggetto.

L'argomento opzionale size specifica un numero specifico di byte da leggere. Se non sono disponibili size byte da leggere, verrà restituito null a meno che lo stream non sia terminato, nel qual caso verranno restituiti tutti i dati rimanenti nel buffer interno.

Se l'argomento size non è specificato, verranno restituiti tutti i dati contenuti nel buffer interno.

L'argomento size deve essere minore o uguale a 1 GiB.

Il metodo readable.read() dovrebbe essere chiamato solo su stream Readable che operano in modalità in pausa. In modalità flowing, readable.read() viene chiamato automaticamente fino a quando il buffer interno non è completamente svuotato.

js
const readable = getReadableStreamSomehow()

// 'readable' può essere attivato più volte mentre i dati vengono memorizzati nel buffer
readable.on('readable', () => {
  let chunk
  console.log('Lo stream è leggibile (nuovi dati ricevuti nel buffer)')
  // Usa un ciclo per assicurarti di leggere tutti i dati attualmente disponibili
  while (null !== (chunk = readable.read())) {
    console.log(`Letti ${chunk.length} byte di dati...`)
  }
})

// 'end' verrà attivato una volta quando non ci saranno più dati disponibili
readable.on('end', () => {
  console.log('Raggiunto la fine dello stream.')
})

Ogni chiamata a readable.read() restituisce un blocco di dati o null, a significare che non ci sono più dati da leggere in quel momento. Questi blocchi non vengono concatenati automaticamente. Poiché una singola chiamata read() non restituisce tutti i dati, potrebbe essere necessario utilizzare un ciclo while per leggere continuamente i blocchi fino a quando non vengono recuperati tutti i dati. Quando si legge un file di grandi dimensioni, .read() potrebbe restituire temporaneamente null, indicando che ha consumato tutto il contenuto in buffer ma potrebbero esserci altri dati ancora da memorizzare nel buffer. In tali casi, viene emesso un nuovo evento 'readable' una volta che ci sono più dati nel buffer, e l'evento 'end' segnala la fine della trasmissione dei dati.

Pertanto, per leggere l'intero contenuto di un file da un readable, è necessario raccogliere i blocchi attraverso più eventi 'readable':

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

Uno stream Readable in modalità oggetto restituirà sempre un singolo elemento da una chiamata a readable.read(size), indipendentemente dal valore dell'argomento size.

Se il metodo readable.read() restituisce un blocco di dati, verrà emesso anche un evento 'data'.

Chiamare stream.read([size]) dopo che l'evento 'end' è stato emesso restituirà null. Non verrà sollevato alcun errore di runtime.

readable.readable

Aggiunto in: v11.4.0

È true se è sicuro chiamare readable.read(), il che significa che il flusso non è stato distrutto o ha emesso 'error' o 'end'.

readable.readableAborted

Aggiunto in: v16.8.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Restituisce se il flusso è stato distrutto o ha generato un errore prima di emettere 'end'.

readable.readableDidRead

Aggiunto in: v16.7.0, v14.18.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Restituisce se 'data' è stato emesso.

readable.readableEncoding

Aggiunto in: v12.7.0

Getter per la proprietà encoding di un dato flusso Readable. La proprietà encoding può essere impostata usando il metodo readable.setEncoding().

readable.readableEnded

Aggiunto in: v12.9.0

Diventa true quando viene emesso l'evento 'end'.

readable.errored

Aggiunto in: v18.0.0

Restituisce un errore se il flusso è stato distrutto con un errore.

readable.readableFlowing

Aggiunto in: v9.4.0

Questa proprietà riflette lo stato corrente di un flusso Readable come descritto nella sezione Tre stati.

readable.readableHighWaterMark

Aggiunto in: v9.3.0

Restituisce il valore di highWaterMark passato durante la creazione di questo Readable.

readable.readableLength

Aggiunto in: v9.4.0

Questa proprietà contiene il numero di byte (o oggetti) nella coda pronti per essere letti. Il valore fornisce dati di introspezione sullo stato di highWaterMark.

readable.readableObjectMode

Aggiunto in: v12.3.0

Getter per la proprietà objectMode di un dato stream Readable.

readable.resume()

[Cronologia]

VersioneModifiche
v10.0.0Il metodo resume() non ha effetto se è presente un listener per l'evento 'readable'.
v0.9.4Aggiunto in: v0.9.4

Il metodo readable.resume() fa sì che uno stream Readable esplicitamente in pausa riprenda l'emissione di eventi 'data', passando lo stream in modalità flowing.

Il metodo readable.resume() può essere utilizzato per consumare completamente i dati da uno stream senza elaborare effettivamente nessuno di quei dati:

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Raggiunto la fine, ma non è stato letto nulla.')
  })

Il metodo readable.resume() non ha effetto se è presente un listener per l'evento 'readable'.

readable.setEncoding(encoding)

Aggiunto in: v0.9.4

Il metodo readable.setEncoding() imposta la codifica dei caratteri per i dati letti dallo stream Readable.

Per impostazione predefinita, non viene assegnata alcuna codifica e i dati dello stream saranno restituiti come oggetti Buffer. Impostando una codifica, i dati dello stream vengono restituiti come stringhe della codifica specificata anziché come oggetti Buffer. Ad esempio, chiamare readable.setEncoding('utf8') farà sì che i dati di output vengano interpretati come dati UTF-8 e passati come stringhe. Chiamare readable.setEncoding('hex') farà sì che i dati vengano codificati in formato stringa esadecimale.

Lo stream Readable gestirà correttamente i caratteri multi-byte forniti tramite lo stream che altrimenti verrebbero decodificati in modo errato se semplicemente estratti dallo stream come oggetti Buffer.

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('Ricevuti %d caratteri di dati stringa:', chunk.length)
})
readable.unpipe([destination])

Aggiunto in: v0.9.4

Il metodo readable.unpipe() stacca uno stream Writable precedentemente collegato usando il metodo stream.pipe().

Se destination non è specificato, vengono staccati tutti i pipe.

Se destination è specificato, ma nessun pipe è impostato per esso, il metodo non fa nulla.

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Tutti i dati da readable vanno in 'file.txt',
// ma solo per il primo secondo.
readable.pipe(writable)
setTimeout(() => {
  console.log('Stop writing to file.txt.')
  readable.unpipe(writable)
  console.log('Manually close the file stream.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[Cronologia]

VersioneModifiche
v22.0.0, v20.13.0L'argomento chunk può ora essere un'istanza di TypedArray o DataView.
v8.0.0L'argomento chunk può ora essere un'istanza di Uint8Array.
v0.9.11Aggiunto in: v0.9.11

Passare chunk come null segnala la fine dello stream (EOF) e si comporta allo stesso modo di readable.push(null), dopo di che non è possibile scrivere più dati. Il segnale EOF viene inserito alla fine del buffer e tutti i dati in buffer verranno comunque svuotati.

Il metodo readable.unshift() inserisce un frammento di dati nel buffer interno. Questo è utile in determinate situazioni in cui uno stream viene consumato da codice che deve "dis-consumare" una certa quantità di dati che ha estratto ottimisticamente dalla sorgente, in modo che i dati possano essere trasmessi ad un'altra parte.

Il metodo stream.unshift(chunk) non può essere chiamato dopo che l'evento 'end' è stato emesso, altrimenti verrà generato un errore di runtime.

Gli sviluppatori che usano stream.unshift() dovrebbero spesso considerare di passare all'utilizzo di uno stream Transform invece. Vedi la sezione API per gli sviluppatori di stream per maggiori informazioni.

js
// Estrai un'intestazione delimitata da \n\n.
// Usa unshift() se ne ottieni troppo.
// Chiama la callback con (errore, intestazione, stream).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // Trovata la delimitazione dell'intestazione.
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // Rimuovi l'ascoltatore 'readable' prima di unshifting.
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // Ora il corpo del messaggio può essere letto dallo stream.
        callback(null, header, stream)
        return
      }
      // Lettura ancora dell'intestazione.
      header += str
    }
  }
}

A differenza di stream.push(chunk), stream.unshift(chunk) non terminerà il processo di lettura reimpostando lo stato di lettura interno dello stream. Ciò può causare risultati imprevisti se readable.unshift() viene chiamato durante una lettura (ad esempio, dall'interno di un'implementazione stream._read() su uno stream personalizzato). Seguire la chiamata a readable.unshift() con un immediato stream.push('') reimposterà correttamente lo stato di lettura, tuttavia è meglio evitare semplicemente di chiamare readable.unshift() durante l'esecuzione di una lettura.

readable.wrap(stream)

Aggiunto in: v0.9.4

  • stream <Stream> Un flusso leggibile "vecchio stile"
  • Restituisce: <this>

Prima di Node.js 0.10, i flussi non implementavano l'intera API del modulo node:stream come attualmente definita. (Vedi Compatibilità per maggiori informazioni.)

Quando si utilizza una libreria Node.js più vecchia che emette eventi 'data' e ha un metodo stream.pause() che è solo consigliativo, il metodo readable.wrap() può essere utilizzato per creare un flusso Readable che utilizza il vecchio flusso come sua sorgente dati.

Sarà raramente necessario utilizzare readable.wrap(), ma il metodo è stato fornito per comodità nell'interazione con applicazioni e librerie Node.js più vecchie.

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // ecc.
})
readable[Symbol.asyncIterator]()

[Cronologia]

VersioneModifiche
v11.14.0Il supporto di Symbol.asyncIterator non è più sperimentale.
v10.0.0Aggiunto in: v10.0.0
js
const fs = require('node:fs')

async function print(readable) {
  readable.setEncoding('utf8')
  let data = ''
  for await (const chunk of readable) {
    data += chunk
  }
  console.log(data)
}

print(fs.createReadStream('file')).catch(console.error)

Se il ciclo termina con un break, return o un throw, il flusso verrà distrutto. In altri termini, iterare su un flusso consumerà completamente il flusso. Il flusso verrà letto in chunk di dimensioni pari all'opzione highWaterMark. Nell'esempio di codice sopra, i dati saranno in un singolo chunk se il file ha meno di 64 KiB di dati perché non viene fornita alcuna opzione highWaterMark a fs.createReadStream().

readable[Symbol.asyncDispose]()

Aggiunto in: v20.4.0, v18.18.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Chiama readable.destroy() con un AbortError e restituisce una promise che si completa quando lo stream è terminato.

readable.compose(stream[, options])

Aggiunto in: v19.1.0, v18.13.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

js
import { Readable } from 'node:stream'

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ')

    for (const word of words) {
      yield word
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()

console.log(words) // stampa ['this', 'is', 'compose', 'as', 'operator']

Vedi stream.compose per maggiori informazioni.

readable.iterator([options])

Aggiunto in: v16.3.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • options <Object>

    • destroyOnReturn <boolean> Quando impostato su false, chiamare return sull'iteratore asincrono, o uscire da un ciclo for await...of usando break, return o throw non distruggerà lo stream. Default: true.
  • Restituisce: <AsyncIterator> per consumare lo stream.

L'iteratore creato da questo metodo offre agli utenti la possibilità di annullare la distruzione dello stream se il ciclo for await...of viene lasciato con return, break o throw, o se l'iteratore dovrebbe distruggere lo stream se lo stream ha emesso un errore durante l'iterazione.

js
const { Readable } = require('node:stream')

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // Stamperà 2 e poi 3
  }

  console.log(readable.destroyed) // True, lo stream è stato completamente consumato
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]))
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}

showBoth()
readable.map(fn[, options])

[Cronologia]

VersioneModifiche
v20.7.0, v18.19.0Aggiunto highWaterMark nelle opzioni.
v17.4.0, v16.14.0Aggiunto in: v17.4.0, v16.14.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • fn <Funzione> | <AsyncFunction> una funzione da mappare su ogni chunk nel flusso.

    • data <qualsiasi> un chunk di dati dal flusso.
    • options <Oggetto>
    • signal <AbortSignal> interrotto se il flusso viene distrutto, consentendo di interrompere precocemente la chiamata fn.
  • options <Oggetto>

    • concurrency <numero> il numero massimo di invocazioni concorrenti di fn da chiamare contemporaneamente sul flusso. Default: 1.
    • highWaterMark <numero> quanti elementi mettere in buffer in attesa del consumo da parte dell'utente degli elementi mappati. Default: concurrency * 2 - 1.
    • signal <AbortSignal> permette di distruggere il flusso se il segnale viene interrotto.
  • Restituisce: <Readable> un flusso mappato con la funzione fn.

Questo metodo permette di mappare il flusso. La funzione fn verrà chiamata per ogni chunk nel flusso. Se la funzione fn restituisce una promise, tale promise verrà awaitata prima di essere passata al flusso di risultato.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Con un mapper sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
  console.log(chunk) // 2, 4, 6, 8
}
// Con un mapper asincrono, effettuando al massimo 2 query contemporaneamente.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  domain => resolver.resolve4(domain),
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  console.log(result) // Stampa il risultato DNS di resolver.resolve4.
}
readable.filter(fn[, options])

[Cronologia]

VersioneModifiche
v20.7.0, v18.19.0Aggiunto highWaterMark nelle opzioni.
v17.4.0, v16.14.0Aggiunto in: v17.4.0, v16.14.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • fn <Funzione> | <AsyncFunction> una funzione per filtrare i chunk dal flusso.

    • data <qualsiasi> un chunk di dati dal flusso.
    • options <Oggetto>
    • signal <AbortSignal> interrotto se il flusso viene distrutto, consentendo di interrompere la chiamata fn in anticipo.
  • options <Oggetto>

    • concurrency <numero> il numero massimo di invocazioni concorrenti di fn da chiamare sul flusso contemporaneamente. Predefinito: 1.
    • highWaterMark <numero> quanti elementi bufferizzare mentre si attende il consumo degli elementi filtrati da parte dell'utente. Predefinito: concurrency * 2 - 1.
    • signal <AbortSignal> consente di distruggere il flusso se il segnale viene interrotto.
  • Restituisce: <Readable> un flusso filtrato con il predicato fn.

Questo metodo consente di filtrare il flusso. Per ogni chunk nel flusso verrà chiamata la funzione fn e, se restituisce un valore truthy, il chunk verrà passato al flusso di risultato. Se la funzione fn restituisce una promise, tale promise verrà awaitata.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Con un predicato sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// Con un predicato asincrono, effettuando al massimo 2 query contemporaneamente.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address.ttl > 60
  },
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  // Registra i domini con più di 60 secondi sul record DNS risolto.
  console.log(result)
}
readable.forEach(fn[, options])

Aggiunto in: v17.5.0, v16.15.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • fn <Function> | <AsyncFunction> una funzione da chiamare su ogni chunk del flusso.

    • data <any> un chunk di dati dal flusso.
    • options <Object>
    • signal <AbortSignal> interrotto se il flusso viene distrutto, consentendo di interrompere precocemente la chiamata fn.
  • options <Object>

    • concurrency <number> il massimo numero di invocazioni concorrenti di fn da chiamare sul flusso contemporaneamente. Predefinito: 1.
    • signal <AbortSignal> consente di distruggere il flusso se il segnale viene interrotto.
  • Restituisce: <Promise> una promise per quando il flusso è terminato.

Questo metodo consente di iterare un flusso. Per ogni chunk nel flusso verrà chiamata la funzione fn. Se la funzione fn restituisce una promise, quella promise verrà awaited.

Questo metodo è diverso dai cicli for await...of in quanto può elaborare i chunk in modo concorrente. Inoltre, un'iterazione forEach può essere interrotta solo passando un'opzione signal e interrompendo il relativo AbortController, mentre for await...of può essere interrotto con break o return. In entrambi i casi il flusso verrà distrutto.

Questo metodo è diverso dall'ascolto dell'evento 'data' in quanto utilizza l'evento readable nel meccanismo sottostante e può limitare il numero di chiamate concorrenti fn.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Con un predicato sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// Con un predicato asincrono, effettuando al massimo 2 query contemporaneamente.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address
  },
  { concurrency: 2 }
)
await dnsResults.forEach(result => {
  // Registra il risultato, simile a `for await (const result of dnsResults)`
  console.log(result)
})
console.log('done') // Il flusso è terminato
readable.toArray([options])

Aggiunto in: v17.5.0, v16.15.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • options <Object>

    • signal <AbortSignal> permette di annullare l'operazione toArray se il segnale viene interrotto.
  • Restituisce: <Promise> una promise contenente un array con il contenuto dello stream.

Questo metodo permette di ottenere facilmente il contenuto di uno stream.

Poiché questo metodo legge l'intero stream in memoria, annulla i vantaggi degli stream. È destinato all'interoperabilità e alla convenienza, non come modo principale per consumare gli stream.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]

// Effettua query DNS in concorrenza usando .map e raccoglie
// i risultati in un array usando toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
  .map(
    async domain => {
      const { address } = await resolver.resolve4(domain, { ttl: true })
      return address
    },
    { concurrency: 2 }
  )
  .toArray()
readable.some(fn[, options])

Aggiunto in: v17.5.0, v16.15.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • fn <Function> | <AsyncFunction> una funzione da chiamare su ogni chunk dello stream.

    • data <any> un chunk di dati dallo stream.
    • options <Object>
    • signal <AbortSignal> interrotto se lo stream viene distrutto permettendo di interrompere la chiamata fn anticipatamente.
  • options <Object>

    • concurrency <number> il massimo numero di invocazioni concorrenti di fn da chiamare sullo stream contemporaneamente. Default: 1.
    • signal <AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
  • Restituisce: <Promise> una promise che restituisce true se fn ha restituito un valore truthy per almeno uno dei chunk.

Questo metodo è simile a Array.prototype.some e chiama fn su ogni chunk nello stream finché il valore restituito atteso non è true (o qualsiasi valore truthy). Una volta che il valore restituito atteso di una chiamata fn su un chunk è truthy, lo stream viene distrutto e la promise viene completata con true. Se nessuna delle chiamate fn sui chunk restituisce un valore truthy, la promise viene completata con false.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false

// Con un predicato asincrono, effettuando al massimo 2 controlli file alla volta.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(anyBigFile) // `true` se qualsiasi file nell'elenco è più grande di 1MB
console.log('fatto') // Lo stream è terminato
readable.find(fn[, options])

Aggiunto in: v17.5.0, v16.17.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • fn <Function> | <AsyncFunction> una funzione da chiamare su ogni chunk del flusso.

    • data <any> un chunk di dati dal flusso.
    • options <Object>
    • signal <AbortSignal> abortito se il flusso viene distrutto, consentendo di abortire precocemente la chiamata fn.
  • options <Object>

    • concurrency <number> il numero massimo di invocazioni concorrenti di fn da chiamare sul flusso contemporaneamente. Default: 1.
    • signal <AbortSignal> permette di distruggere il flusso se il segnale viene abortito.
  • Restituisce: <Promise> una promise che restituisce il primo chunk per cui fn ha restituito un valore truthy, oppure undefined se nessun elemento è stato trovato.

Questo metodo è simile a Array.prototype.find e chiama fn su ogni chunk nel flusso per trovare un chunk con un valore truthy per fn. Una volta che il valore di ritorno atteso di una chiamata fn è truthy, il flusso viene distrutto e la promise viene completata con il valore per cui fn ha restituito un valore truthy. Se tutte le chiamate fn sui chunk restituiscono un valore falsy, la promise viene completata con undefined.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined

// Con un predicato asincrono, effettuando al massimo 2 controlli file contemporaneamente.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(foundBigFile) // Nome del file di grandi dimensioni, se presente un file nell'elenco di dimensioni superiori a 1 MB
console.log('fatto') // Il flusso è terminato
readable.every(fn[, options])

Aggiunto in: v17.5.0, v16.15.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • fn <Function> | <AsyncFunction> una funzione da chiamare su ogni chunk del flusso.

    • data <any> un chunk di dati dal flusso.
    • options <Object>
    • signal <AbortSignal> abortito se il flusso viene distrutto, consentendo di abortire la chiamata fn in anticipo.
  • options <Object>

    • concurrency <number> il massimo numero di invocazioni concorrenti di fn da chiamare sul flusso contemporaneamente. Default: 1.
    • signal <AbortSignal> permette di distruggere il flusso se il segnale viene abortito.
  • Restituisce: <Promise> una promise che restituisce true se fn ha restituito un valore truthy per tutti i chunk.

Questo metodo è simile a Array.prototype.every e chiama fn su ogni chunk nel flusso per verificare se tutti i valori restituiti attesi sono valori truthy per fn. Una volta che un valore restituito atteso da una chiamata fn su un chunk è falsy, il flusso viene distrutto e la promise viene completata con false. Se tutte le chiamate fn sui chunk restituiscono un valore truthy, la promise viene completata con true.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Con un predicato sincrono.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true

// Con un predicato asincrono, effettuando al massimo 2 controlli file contemporaneamente.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
// `true` se tutti i file nell'elenco sono più grandi di 1MiB
console.log(allBigFiles)
console.log('fatto') // Il flusso è terminato
readable.flatMap(fn[, options])

Aggiunto in: v17.5.0, v16.15.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Questo metodo restituisce un nuovo flusso applicando la callback data a ciascun chunk del flusso e quindi appiattendone il risultato.

È possibile restituire un flusso o un altro iterable o async iterable da fn e i flussi risultanti verranno uniti (appiattiti) nel flusso restituito.

js
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'

// Con un mapper sincrono.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
  console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// Con un mapper asincrono, combina il contenuto di 4 file
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
  createReadStream(fileName)
)
for await (const result of concatResult) {
  // Questo conterrà il contenuto (tutti i chunk) di tutti e 4 i file
  console.log(result)
}
readable.drop(limit[, options])

Aggiunto in: v17.5.0, v16.15.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • limit <number> il numero di chunk da rimuovere dal readable.

  • options <Object>

    • signal <AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
  • Restituisce: <Readable> uno stream con limit chunk rimossi.

Questo metodo restituisce un nuovo stream con i primi limit chunk rimossi.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])

Aggiunto in: v17.5.0, v16.15.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • limit <number> il numero di chunk da prendere dal readable.

  • options <Object>

    • signal <AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
  • Restituisce: <Readable> uno stream con limit chunk presi.

Questo metodo restituisce un nuovo stream con i primi limit chunk.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])

Aggiunto in: v17.5.0, v16.15.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • fn <Function> | <AsyncFunction> una funzione di riduzione da chiamare su ogni chunk nello stream.

    • previous <any> il valore ottenuto dall'ultima chiamata a fn o il valore initial se specificato o il primo chunk dello stream altrimenti.
    • data <any> un chunk di dati dallo stream.
    • options <Object>
    • signal <AbortSignal> interrotto se lo stream viene distrutto, permettendo di interrompere precocemente la chiamata fn.
  • initial <any> il valore iniziale da usare nella riduzione.

  • options <Object>

    • signal <AbortSignal> permette di distruggere lo stream se il segnale viene interrotto.
  • Restituisce: <Promise> una promise per il valore finale della riduzione.

Questo metodo chiama fn su ogni chunk dello stream in ordine, passandogli il risultato del calcolo sull'elemento precedente. Restituisce una promise per il valore finale della riduzione.

Se non viene fornito alcun valore initial, il primo chunk dello stream viene usato come valore iniziale. Se lo stream è vuoto, la promise viene rifiutata con un TypeError con la proprietà del codice ERR_INVALID_ARGS.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
  const { size } = await stat(join(directoryPath, file))
  return totalSize + size
}, 0)

console.log(folderSize)

La funzione di riduzione itera l'elemento dello stream uno per uno, il che significa che non c'è alcun parametro concurrency o parallelismo. Per eseguire una reduce in modo concorrente, è possibile estrarre la funzione async nel metodo readable.map.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir)
  .map(file => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0)

console.log(folderSize)

Stream Duplex e Transform

Classe: stream.Duplex

[Cronologia]

VersioneModifiche
v6.8.0Le istanze di Duplex ora restituiscono true quando si verifica instanceof stream.Writable.
v0.9.4Aggiunto in: v0.9.4

Gli stream Duplex sono stream che implementano sia l'interfaccia Readable che Writable.

Esempi di stream Duplex includono:

duplex.allowHalfOpen

Aggiunto in: v0.9.4

Se false, lo stream terminerà automaticamente il lato scrivibile quando termina il lato leggibile. Impostato inizialmente dall'opzione del costruttore allowHalfOpen, che di default è true.

Questo può essere modificato manualmente per cambiare il comportamento half-open di un'istanza di stream Duplex esistente, ma deve essere modificato prima che venga emesso l'evento 'end'.

Classe: stream.Transform

Aggiunto in: v0.9.4

Gli stream Transform sono stream Duplex in cui l'output è in qualche modo correlato all'input. Come tutti gli stream Duplex, gli stream Transform implementano sia l'interfaccia Readable che Writable.

Esempi di stream Transform includono:

transform.destroy([error])

[Cronologia]

VersioneModifiche
v14.0.0Funziona come no-op su uno stream che è già stato distrutto.
v8.0.0Aggiunto in: v8.0.0

Distrugge lo stream e, facoltativamente, emette un evento 'error'. Dopo questa chiamata, lo stream di trasformazione rilascerà tutte le risorse interne. Gli implementatori non dovrebbero sovrascrivere questo metodo, ma invece implementare readable._destroy(). L'implementazione predefinita di _destroy() per Transform emette anche 'close' a meno che emitClose non sia impostato su false.

Una volta che destroy() è stato chiamato, ulteriori chiamate saranno un no-op e non saranno emessi ulteriori errori, eccetto quelli provenienti da _destroy(), come 'error'.

stream.duplexPair([options])

Aggiunto in: v22.6.0, v20.17.0

  • options <Object> Un valore da passare a entrambi i costruttori Duplex, per impostare opzioni come il buffering.
  • Restituisce: <Array> di due istanze Duplex.

La funzione di utilità duplexPair restituisce un Array con due elementi, ognuno dei quali è un flusso Duplex connesso all'altro lato:

js
const [sideA, sideB] = duplexPair()

Qualsiasi cosa venga scritta in un flusso diventa leggibile nell'altro. Fornisce un comportamento analogo a una connessione di rete, in cui i dati scritti dal client diventano leggibili dal server e viceversa.

I flussi Duplex sono simmetrici; uno o l'altro può essere utilizzato senza alcuna differenza di comportamento.

stream.finished(stream[, options], callback)

[Cronologia]

VersioneModifiche
v19.5.0Aggiunto supporto per ReadableStream e WritableStream.
v15.11.0È stata aggiunta l'opzione signal.
v14.0.0finished(stream, cb) attenderà l'evento 'close' prima di invocare la callback. L'implementazione tenta di rilevare i flussi legacy e applica questo comportamento solo ai flussi che si prevede emettano 'close'.
v14.0.0L'emissione di 'close' prima di 'end' su un flusso Readable causerà un errore ERR_STREAM_PREMATURE_CLOSE.
v14.0.0La callback verrà invocata sui flussi che hanno già terminato prima della chiamata a finished(stream, cb).
v10.0.0Aggiunto in: v10.0.0
  • stream <Stream> | <ReadableStream> | <WritableStream> Un flusso/webstream leggibile e/o scrivibile.

  • options <Object>

    • error <boolean> Se impostato su false, una chiamata a emit('error', err) non è trattata come terminata. Default: true.
    • readable <boolean> Quando impostato su false, la callback verrà chiamata quando il flusso termina anche se il flusso potrebbe essere ancora leggibile. Default: true.
    • writable <boolean> Quando impostato su false, la callback verrà chiamata quando il flusso termina anche se il flusso potrebbe essere ancora scrivibile. Default: true.
    • signal <AbortSignal> permette di interrompere l'attesa della fine del flusso. Il flusso sottostante non verrà interrotto se il segnale viene interrotto. La callback verrà chiamata con un AbortError. Tutti gli ascoltatori registrati aggiunti da questa funzione verranno rimossi.
  • callback <Function> Una funzione di callback che accetta un argomento di errore opzionale.

  • Restituisce: <Function> Una funzione di pulizia che rimuove tutti gli ascoltatori registrati.

Una funzione per ricevere una notifica quando un flusso non è più leggibile, scrivibile o ha riscontrato un errore o un evento di chiusura prematura.

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('Stream fallito.', err)
  } else {
    console.log('Stream ha finito di leggere.')
  }
})

rs.resume() // Svuota il flusso.

Particolarmente utile negli scenari di gestione degli errori in cui un flusso viene distrutto prematuramente (come una richiesta HTTP interrotta) e non emetterà 'end' o 'finish'.

L'API finished fornisce una versione promise.

stream.finished() lascia gli ascoltatori di eventi in sospeso (in particolare 'error', 'end', 'finish' e 'close') dopo che la callback è stata invocata. La ragione di ciò è che eventi 'error' inaspettati (a causa di implementazioni di flusso errate) non causano arresti anomali imprevisti. Se questo è un comportamento indesiderato, è necessario invocare la funzione di pulizia restituita nella callback:

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[Cronologia]

VersioneModifiche
v19.7.0, v18.16.0Aggiunto supporto per webstreams.
v18.0.0Il passaggio di un callback non valido all'argomento callback ora genera ERR_INVALID_ARG_TYPE invece di ERR_INVALID_CALLBACK.
v14.0.0pipeline(..., cb) attenderà l'evento 'close' prima di invocare il callback. L'implementazione tenta di rilevare i flussi legacy e applica questo comportamento solo ai flussi che si prevede emettano 'close'.
v13.10.0Aggiunto supporto per i generatori asincroni.
v10.0.0Aggiunto in: v10.0.0

Un metodo del modulo per effettuare il piping tra flussi e generatori, inoltrando gli errori e pulendo correttamente, e fornendo un callback al completamento della pipeline.

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// Utilizzare l'API pipeline per collegare facilmente una serie di flussi
// insieme e ricevere una notifica quando la pipeline è completamente terminata.

// Una pipeline per comprimere in gzip un file tar potenzialmente enorme in modo efficiente:

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('Pipeline fallita.', err)
  } else {
    console.log('Pipeline completata.')
  }
})

L'API pipeline fornisce una versione promise.

stream.pipeline() chiamerà stream.destroy(err) su tutti i flussi tranne:

  • Flussi Readable che hanno emesso 'end' o 'close'.
  • Flussi Writable che hanno emesso 'finish' o 'close'.

stream.pipeline() lascia listener di eventi in sospeso sui flussi dopo che il callback è stato invocato. Nel caso di riutilizzo dei flussi dopo un errore, questo può causare perdite di listener di eventi ed errori soppressi. Se l'ultimo flusso è leggibile, i listener di eventi in sospeso verranno rimossi in modo che l'ultimo flusso possa essere consumato in seguito.

stream.pipeline() chiude tutti i flussi quando viene sollevato un errore. L'utilizzo di IncomingRequest con pipeline potrebbe portare a un comportamento inaspettato una volta che distrugge il socket senza inviare la risposta prevista. Vedi l'esempio seguente:

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // File non trovato
      // questo messaggio non può essere inviato una volta che `pipeline` ha già distrutto il socket
      return res.end('error!!!')
    }
  })
})

stream.compose(...streams)

[Cronologia]

VersioneModifiche
v21.1.0, v20.10.0Aggiunto supporto per la classe stream.
v19.8.0, v18.16.0Aggiunto supporto per webstreams.
v16.9.0Aggiunto in: v16.9.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - stream.compose è sperimentale.

Combina due o più stream in un flusso Duplex che scrive sul primo stream e legge dall'ultimo. Ogni stream fornito viene inserito nel successivo, usando stream.pipeline. Se uno qualsiasi degli stream genera un errore, tutti vengono distrutti, incluso lo stream Duplex esterno.

Poiché stream.compose restituisce un nuovo stream che a sua volta può (e dovrebbe) essere inserito in altri stream, abilita la composizione. Al contrario, quando si passano stream a stream.pipeline, in genere il primo stream è un flusso leggibile e l'ultimo un flusso scrivibile, formando un circuito chiuso.

Se viene passata una Function, questa deve essere un metodo factory che accetta un source Iterable.

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // stampa 'HELLOWORLD'

stream.compose può essere usato per convertire iterabili asincroni, generatori e funzioni in stream.

  • AsyncIterable si converte in un Duplex leggibile. Non può generare null.
  • AsyncGeneratorFunction si converte in un Duplex trasformante leggibile/scrivibile. Deve accettare un source AsyncIterable come primo parametro. Non può generare null.
  • AsyncFunction si converte in un Duplex scrivibile. Deve restituire null o undefined.
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// Converte AsyncIterable in Duplex leggibile.
const s1 = compose(
  (async function* () {
    yield 'Hello'
    yield 'World'
  })()
)

// Converte AsyncGenerator in Duplex trasformante.
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// Converte AsyncFunction in Duplex scrivibile.
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // stampa 'HELLOWORLD'

Vedi readable.compose(stream) per stream.compose come operatore.

stream.Readable.from(iterable[, options])

Aggiunto in: v12.3.0, v10.17.0

  • iterable <Iterable> Oggetto che implementa il protocollo iterabile Symbol.asyncIterator o Symbol.iterator. Emette un evento 'error' se viene passato un valore null.
  • options <Object> Opzioni fornite a new stream.Readable([options]). Per impostazione predefinita, Readable.from() imposterà options.objectMode su true, a meno che ciò non venga esplicitamente escluso impostando options.objectMode su false.
  • Restituisce: <stream.Readable>

Un metodo di utilità per creare stream leggibili da iteratori.

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

La chiamata Readable.from(string) o Readable.from(buffer) non itererà le stringhe o i buffer per motivi di prestazioni.

Se viene passato come argomento un oggetto Iterable contenente promise, potrebbe verificarsi un'eccezione non gestita.

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Eccezione non gestita
])

stream.Readable.fromWeb(readableStream[, options])

Aggiunto in: v17.0.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

stream.Readable.isDisturbed(stream)

Aggiunto in: v16.8.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Restituisce se il flusso è stato letto o annullato.

stream.isErrored(stream)

Aggiunto in: v17.3.0, v16.14.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Restituisce se il flusso ha incontrato un errore.

stream.isReadable(stream)

Aggiunto in: v17.4.0, v16.14.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

Restituisce se il flusso è leggibile.

stream.Readable.toWeb(streamReadable[, options])

Aggiunto in: v17.0.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

  • streamReadable <stream.Readable>

  • options <Object>

    • strategy <Object>
    • highWaterMark <number> La dimensione massima della coda interna (del ReadableStream creato) prima che venga applicata la contro-pressione nella lettura dal dato stream.Readable. Se non viene fornito alcun valore, verrà preso dal dato stream.Readable.
    • size <Function> Una funzione che indica la dimensione del dato chunk di dati. Se non viene fornito alcun valore, la dimensione sarà 1 per tutti i chunk.
    • chunk <any>
    • Restituisce: <number>
  • Restituisce: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

Aggiunto in: v17.0.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

stream.Writable.toWeb(streamWritable)

Aggiunto in: v17.0.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

stream.Duplex.from(src)

[Cronologia]

VersioneModifiche
v19.5.0, v18.17.0L'argomento src può ora essere un ReadableStream o un WritableStream.
v16.8.0Aggiunto in: v16.8.0

Un metodo di utilità per la creazione di stream duplex.

  • Stream converte lo stream scrivibile in Duplex scrivibile e lo stream leggibile in Duplex.
  • Blob converte in Duplex leggibile.
  • string converte in Duplex leggibile.
  • ArrayBuffer converte in Duplex leggibile.
  • AsyncIterable converte in un Duplex leggibile. Non può restituire null.
  • AsyncGeneratorFunction converte in un Duplex di trasformazione leggibile/scrivibile. Deve accettare un AsyncIterable di origine come primo parametro. Non può restituire null.
  • AsyncFunction converte in un Duplex scrivibile. Deve restituire null o undefined.
  • Object ({ writable, readable }) converte readable e writable in Stream e poi li combina in Duplex dove Duplex scriverà su writable e leggerà da readable.
  • Promise converte in Duplex leggibile. Il valore null viene ignorato.
  • ReadableStream converte in Duplex leggibile.
  • WritableStream converte in Duplex scrivibile.
  • Restituisce: <stream.Duplex>

Se viene passato come argomento un oggetto Iterable contenente promesse, potrebbe causare un'eccezione non gestita.

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Eccezione non gestita
])

stream.Duplex.fromWeb(pair[, options])

Aggiunto in: v17.0.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

Aggiunto in: v17.0.0

[Stabile: 1 - Sperimentale]

Stabile: 1 Stabilità: 1 - Sperimentale

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[Cronologia]

VersioneModifiche
v19.7.0, v18.16.0Aggiunto supporto per ReadableStream e WritableStream.
v15.4.0Aggiunto in: v15.4.0

Allega un AbortSignal a un flusso leggibile o scrivibile. Questo permette al codice di controllare la distruzione del flusso usando un AbortController.

Chiamare abort sull' AbortController corrispondente all' AbortSignal passato si comporterà nello stesso modo di chiamare .destroy(new AbortError()) sul flusso, e controller.error(new AbortError()) per i webstreams.

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Successivamente, interrompe l'operazione chiudendo il flusso
controller.abort()

Oppure usando un AbortSignal con un flusso leggibile come iterabile asincrono:

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // imposta un timeout
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // L'operazione è stata annullata
    } else {
      throw e
    }
  }
})()

Oppure usando un AbortSignal con un ReadableStream:

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // L'operazione è stata annullata
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hello
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

Aggiunto in: v19.9.0, v18.17.0

Restituisce il valore highWaterMark predefinito utilizzato dagli stream. Il valore predefinito è 65536 (64 KiB), oppure 16 per objectMode.

stream.setDefaultHighWaterMark(objectMode, value)

Aggiunto in: v19.9.0, v18.17.0

Imposta il valore highWaterMark predefinito utilizzato dagli stream.

API per gli sviluppatori di stream

L'API del modulo node:stream è stata progettata per rendere semplice l'implementazione di stream utilizzando il modello di ereditarietà prototipale di JavaScript.

Innanzitutto, uno sviluppatore di stream dichiarerebbe una nuova classe JavaScript che estende una delle quattro classi di stream di base (stream.Writable, stream.Readable, stream.Duplex o stream.Transform), assicurandosi di chiamare il costruttore della classe padre appropriato:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

Quando si estendono gli stream, tenere presente quali opzioni l'utente può e deve fornire prima di inoltrarle al costruttore di base. Ad esempio, se l'implementazione fa ipotesi riguardo alle opzioni autoDestroy ed emitClose, non consentire all'utente di sovrascriverle. Essere espliciti su quali opzioni vengono inoltrate invece di inoltrare implicitamente tutte le opzioni.

La nuova classe stream deve quindi implementare uno o più metodi specifici, a seconda del tipo di stream che viene creato, come dettagliato nella tabella seguente:

Caso d'usoClasseMetodo(i) da implementare
Sola letturaReadable_read()
Sola scritturaWritable_write() , _writev() , _final()
Lettura e scritturaDuplex_read() , _write() , _writev() , _final()
Operare sui dati scritti, quindi leggere il risultatoTransform_transform() , _flush() , _final()

Il codice di implementazione di uno stream non deve mai chiamare i metodi "pubblici" di uno stream che sono destinati all'uso da parte dei consumatori (come descritto nella sezione API per i consumatori di stream). In tal caso, potrebbero verificarsi effetti collaterali negativi nel codice dell'applicazione che utilizza lo stream.

Evitare di sovrascrivere metodi pubblici come write(), end(), cork(), uncork(), read() e destroy(), o di emettere eventi interni come 'error', 'data', 'end', 'finish' e 'close' tramite .emit(). In tal caso, si potrebbero violare gli invarianti correnti e futuri dello stream, causando problemi di comportamento e/o compatibilità con altri stream, utilità di stream e aspettative dell'utente.

Costruzione semplificata

Aggiunto in: v1.2.0

Per molti casi semplici, è possibile creare un flusso senza ricorrere all'ereditarietà. Questo può essere ottenuto creando direttamente istanze degli oggetti stream.Writable, stream.Readable, stream.Duplex o stream.Transform e passando metodi appropriati come opzioni del costruttore.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  construct(callback) {
    // Inizializza lo stato e carica le risorse...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Libera le risorse...
  },
})

Implementare un flusso scrivibile

La classe stream.Writable viene estesa per implementare un flusso Writable.

I flussi Writable personalizzati devono chiamare il costruttore new stream.Writable([options]) e implementare il metodo writable._write() e/o writable._writev().

new stream.Writable([options])

[Cronologia]

VersioneModifiche
v22.0.0Aumenta il valore predefinito di highWaterMark.
v15.5.0Supporta il passaggio di un AbortSignal.
v14.0.0Cambia il valore predefinito dell'opzione autoDestroy in true.
v11.2.0, v10.16.0Aggiunge l'opzione autoDestroy per distruggere automaticamente il flusso quando emette 'finish' o errori.
v10.0.0Aggiunge l'opzione emitClose per specificare se 'close' viene emesso alla distruzione.
js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor(options) {
    // Chiama il costruttore stream.Writable().
    super(options)
    // ...
  }
}

Oppure, quando si utilizzano costruttori in stile pre-ES6:

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options)
  Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

Oppure, usando l'approccio del costruttore semplificato:

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

Chiamare abort sull'oggetto AbortController corrispondente all'oggetto AbortSignal passato si comporterà allo stesso modo di chiamare .destroy(new AbortError()) sul flusso scrivibile.

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// Più tardi, interrompi l'operazione chiudendo il flusso
controller.abort()

writable._construct(callback)

Aggiunto in: v15.0.0

  • callback <Function> Chiama questa funzione (facoltativamente con un argomento di errore) quando lo stream ha terminato l'inizializzazione.

Il metodo _construct() NON DEVE essere chiamato direttamente. Può essere implementato dalle classi figlie e, in tal caso, verrà chiamato solo dai metodi interni della classe Writable.

Questa funzione opzionale verrà chiamata in un tick dopo che il costruttore dello stream è ritornato, ritardando qualsiasi chiamata a _write(), _final() e _destroy() fino a quando callback non viene chiamato. Questo è utile per inizializzare lo stato o inizializzare asincronamente le risorse prima che lo stream possa essere utilizzato.

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[Cronologia]

VersioneModifiche
v12.11.0_write() è facoltativo quando si fornisce _writev().
  • chunk <Buffer> | <string> | <any> Il Buffer da scrivere, convertito dalla string passata a stream.write(). Se l'opzione decodeStrings dello stream è false o lo stream sta operando in modalità oggetto, il chunk non verrà convertito e sarà ciò che è stato passato a stream.write().
  • encoding <string> Se il chunk è una stringa, allora encoding è la codifica dei caratteri di quella stringa. Se il chunk è un Buffer, o se lo stream sta operando in modalità oggetto, encoding può essere ignorato.
  • callback <Function> Chiama questa funzione (facoltativamente con un argomento di errore) quando l'elaborazione è completa per il chunk fornito.

Tutte le implementazioni dello stream Writable devono fornire un metodo writable._write() e/o writable._writev() per inviare dati alla risorsa sottostante.

Gli stream Transform forniscono la propria implementazione di writable._write().

Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlie e chiamata solo dai metodi interni della classe Writable.

La funzione callback deve essere chiamata sincronicamente all'interno di writable._write() o asincronicamente (cioè tick diverso) per segnalare che la scrittura è stata completata correttamente o è fallita con un errore. Il primo argomento passato al callback deve essere l'oggetto Error se la chiamata è fallita o null se la scrittura è riuscita.

Tutte le chiamate a writable.write() che si verificano tra il momento in cui viene chiamato writable._write() e il momento in cui viene chiamato callback faranno sì che i dati scritti vengano memorizzati nel buffer. Quando viene richiamato callback, lo stream potrebbe emettere un evento 'drain'. Se un'implementazione di stream è in grado di elaborare più chunk di dati contemporaneamente, il metodo writable._writev() dovrebbe essere implementato.

Se la proprietà decodeStrings è impostata esplicitamente su false nelle opzioni del costruttore, allora chunk rimarrà lo stesso oggetto passato a .write(), e potrebbe essere una stringa piuttosto che un Buffer. Questo per supportare implementazioni che hanno una gestione ottimizzata per alcune codifiche di dati stringa. In tal caso, l'argomento encoding indicherà la codifica dei caratteri della stringa. Altrimenti, l'argomento encoding può essere ignorato in sicurezza.

Il metodo writable._write() è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.

writable._writev(chunks, callback)

  • chunks <Object[]> I dati da scrivere. Il valore è un array di <Object> che rappresentano ciascuno un blocco discreto di dati da scrivere. Le proprietà di questi oggetti sono:

    • chunk <Buffer> | <string> Un'istanza di buffer o una stringa contenente i dati da scrivere. chunk sarà una stringa se Writable è stato creato con l'opzione decodeStrings impostata su false ed è stata passata una stringa a write().
    • encoding <string> La codifica a caratteri di chunk. Se chunk è un Buffer, encoding sarà 'buffer'.
  • callback <Function> Una funzione di callback (facoltativamente con un argomento di errore) da invocare al completamento dell'elaborazione per i blocchi forniti.

Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlio e chiamata solo dai metodi interni della classe Writable.

Il metodo writable._writev() può essere implementato in aggiunta o in alternativa a writable._write() nelle implementazioni di stream in grado di elaborare più blocchi di dati contemporaneamente. Se implementato e se ci sono dati in buffer da scritture precedenti, verrà chiamato _writev() invece di _write().

Il metodo writable._writev() è prefissato con un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.

writable._destroy(err, callback)

Aggiunto in: v8.0.0

  • err <Error> Un possibile errore.
  • callback <Function> Una funzione di callback che accetta un argomento di errore facoltativo.

Il metodo _destroy() viene chiamato da writable.destroy(). Può essere sovrascritto dalle classi figlio, ma non deve essere chiamato direttamente.

writable._final(callback)

Aggiunto in: v8.0.0

  • callback <Function> Chiama questa funzione (facoltativamente con un argomento di errore) al termine della scrittura di tutti i dati rimanenti.

Il metodo _final() non deve essere chiamato direttamente. Può essere implementato dalle classi figlio e, in tal caso, verrà chiamato solo dai metodi interni della classe Writable.

Questa funzione facoltativa verrà chiamata prima della chiusura del flusso, ritardando l'evento 'finish' fino a quando callback non viene chiamato. Questo è utile per chiudere le risorse o scrivere i dati in buffer prima della fine di un flusso.

Errori durante la scrittura

Gli errori che si verificano durante l'elaborazione dei metodi writable._write(), writable._writev() e writable._final() devono essere propagati invocando la callback e passando l'errore come primo argomento. Generare un Error all'interno di questi metodi o emettere manualmente un evento 'error' comporta un comportamento indefinito.

Se un flusso Readable viene inserito in un flusso Writable quando Writable emette un errore, il flusso Readable verrà disinserito.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  },
})

Un esempio di flusso scrivibile

Di seguito è illustrata un'implementazione di flusso Writable personalizzato piuttosto semplicistica (e in qualche modo inutile). Sebbene questa specifica istanza di flusso Writable non sia di alcuna particolare utilità reale, l'esempio illustra ciascuno degli elementi necessari di un'istanza di flusso Writable personalizzata:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  }
}

Decodifica dei buffer in uno stream scrivibile

La decodifica dei buffer è un'operazione comune, ad esempio quando si utilizzano trasformatori il cui input è una stringa. Questo non è un processo banale quando si utilizza una codifica a caratteri multi-byte, come UTF-8. L'esempio seguente mostra come decodificare stringhe multi-byte usando StringDecoder e Writable.

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

Implementare uno stream leggibile

La classe stream.Readable viene estesa per implementare uno stream Readable.

Gli stream Readable personalizzati devono chiamare il costruttore new stream.Readable([options]) e implementare il metodo readable._read().

new stream.Readable([options])

[Cronologia]

VersioneModifiche
v22.0.0aumento del valore predefinito di highWaterMark.
v15.5.0supporto per il passaggio di un AbortSignal.
v14.0.0modifica del valore predefinito dell'opzione autoDestroy a true.
v11.2.0, v10.16.0aggiunta dell'opzione autoDestroy per distruggere automaticamente lo stream quando emette 'end' o errori.
  • options <Oggetto>
    • highWaterMark <numero> Il numero massimo di byte da memorizzare nel buffer interno prima di smettere di leggere dalla risorsa sottostante. Predefinito: 65536 (64 KiB), o 16 per gli stream objectMode.
    • encoding <stringa> Se specificato, i buffer verranno decodificati in stringhe usando la codifica specificata. Predefinito: null.
    • objectMode <booleano> Se questo stream dovrebbe comportarsi come uno stream di oggetti. Ciò significa che stream.read(n) restituisce un singolo valore invece di un Buffer di dimensione n. Predefinito: false.
    • emitClose <booleano> Se lo stream dovrebbe emettere 'close' dopo essere stato distrutto. Predefinito: true.
    • read <Funzione> Implementazione per il metodo stream._read().
    • destroy <Funzione> Implementazione per il metodo stream._destroy().
    • construct <Funzione> Implementazione per il metodo stream._construct().
    • autoDestroy <booleano> Se questo stream dovrebbe chiamare automaticamente .destroy() su se stesso dopo la terminazione. Predefinito: true.
    • signal <AbortSignal> Un segnale che rappresenta una possibile cancellazione.
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // Chiama il costruttore stream.Readable(options).
    super(options)
    // ...
  }
}

Oppure, quando si usano costruttori in stile pre-ES6:

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

Oppure, usando l'approccio del costruttore semplificato:

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

Chiamare abort sull' AbortController corrispondente all' AbortSignal passato si comporterà nello stesso modo di chiamare .destroy(new AbortError()) sul leggibile creato.

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// Più tardi, interrompi l'operazione chiudendo lo stream
controller.abort()

readable._construct(callback)

Aggiunto in: v15.0.0

  • callback <Function> Chiama questa funzione (facoltativamente con un argomento di errore) quando lo stream ha terminato l'inizializzazione.

Il metodo _construct() NON DEVE essere chiamato direttamente. Può essere implementato dalle classi figlie e, in tal caso, sarà chiamato solo dai metodi interni della classe Readable.

Questa funzione opzionale sarà pianificata nel prossimo tick dal costruttore dello stream, ritardando qualsiasi chiamata _read() e _destroy() fino a quando callback non verrà chiamato. Questo è utile per inizializzare lo stato o inizializzare asincronicamente le risorse prima che lo stream possa essere utilizzato.

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

Aggiunto in: v0.9.4

  • size <number> Numero di byte da leggere asincronicamente

Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlie e chiamata solo dai metodi interni della classe Readable.

Tutte le implementazioni dello stream Readable devono fornire un'implementazione del metodo readable._read() per recuperare i dati dalla risorsa sottostante.

Quando viene chiamato readable._read(), se i dati sono disponibili dalla risorsa, l'implementazione dovrebbe iniziare a inserire quei dati nella coda di lettura usando il metodo this.push(dataChunk). _read() verrà chiamato di nuovo dopo ogni chiamata a this.push(dataChunk) una volta che lo stream è pronto per accettare più dati. _read() può continuare a leggere dalla risorsa e inserire dati finché readable.push() non restituisce false. Solo quando _read() viene chiamato di nuovo dopo che si è fermato, dovrebbe riprendere a inserire dati aggiuntivi nella coda.

Una volta che il metodo readable._read() è stato chiamato, non verrà chiamato di nuovo fino a quando non verranno inseriti più dati tramite il metodo readable.push(). I dati vuoti come buffer e stringhe vuoti non faranno sì che readable._read() venga chiamato.

L'argomento size è indicativo. Per le implementazioni in cui una "lettura" è una singola operazione che restituisce dati, è possibile utilizzare l'argomento size per determinare quanti dati recuperare. Altre implementazioni possono ignorare questo argomento e semplicemente fornire dati quando diventano disponibili. Non è necessario "attendere" fino a quando non sono disponibili size byte prima di chiamare stream.push(chunk).

Il metodo readable._read() è preceduto da un trattino basso perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.

readable._destroy(err, callback)

Aggiunto in: v8.0.0

  • err <Error> Un possibile errore.
  • callback <Function> Una funzione di callback che accetta un argomento di errore opzionale.

Il metodo _destroy() viene chiamato da readable.destroy(). Può essere sovrascritto dalle classi figlie, ma non deve essere chiamato direttamente.

readable.push(chunk[, encoding])

[Cronologia]

VersioneModifiche
v22.0.0, v20.13.0L'argomento chunk può ora essere un'istanza di TypedArray o DataView.
v8.0.0L'argomento chunk può ora essere un'istanza di Uint8Array.
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Frammento di dati da inserire nella coda di lettura. Per i flussi che non operano in modalità oggetto, chunk deve essere una <string>, <Buffer>, <TypedArray> o <DataView>. Per i flussi in modalità oggetto, chunk può essere qualsiasi valore JavaScript.
  • encoding <string> Codifica dei frammenti di stringa. Deve essere una codifica Buffer valida, come 'utf8' o 'ascii'.
  • Restituisce: <boolean> true se è possibile continuare a inserire ulteriori frammenti di dati; false altrimenti.

Quando chunk è un <Buffer>, <TypedArray>, <DataView> o <string>, il frammento di dati verrà aggiunto alla coda interna per i consumatori del flusso. Passando chunk come null si segnala la fine del flusso (EOF), dopo di che non è più possibile scrivere altri dati.

Quando il Readable opera in modalità sospesa, i dati aggiunti con readable.push() possono essere letti chiamando il metodo readable.read() quando viene emesso l'evento 'readable'.

Quando il Readable opera in modalità di flusso, i dati aggiunti con readable.push() verranno consegnati emettendo un evento 'data'.

Il metodo readable.push() è progettato per essere il più flessibile possibile. Ad esempio, quando si incapsula una sorgente di livello inferiore che fornisce una qualche forma di meccanismo di pausa/riepilogo e una callback di dati, la sorgente di livello inferiore può essere incapsulata dall'istanza Readable personalizzata:

js
// `_source` è un oggetto con i metodi readStop() e readStart(),
// e un membro `ondata` che viene chiamato quando ha dati, e
// un membro `onend` che viene chiamato quando i dati sono finiti.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // Ogni volta che ci sono dati, inserirli nel buffer interno.
    this._source.ondata = chunk => {
      // Se push() restituisce false, allora smetti di leggere dalla sorgente.
      if (!this.push(chunk)) this._source.readStop()
    }

    // Quando la sorgente termina, inserisci il frammento `null` che segnala la fine del flusso.
    this._source.onend = () => {
      this.push(null)
    }
  }
  // _read() verrà chiamato quando il flusso desidera estrarre più dati.
  // L'argomento size consigliato viene ignorato in questo caso.
  _read(size) {
    this._source.readStart()
  }
}

Il metodo readable.push() viene utilizzato per inserire il contenuto nel buffer interno. Può essere guidato dal metodo readable._read().

Per i flussi che non operano in modalità oggetto, se il parametro chunk di readable.push() è undefined, verrà trattato come una stringa o un buffer vuoto. Vedere readable.push('') per maggiori informazioni.

Errori durante la lettura

Gli errori che si verificano durante l'elaborazione di readable._read() devono essere propagati attraverso il metodo readable.destroy(err). Sollevare un Error all'interno di readable._read() o emettere manualmente un evento 'error' comporta un comportamento indefinito.

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // Esegui alcune operazioni.
    }
  },
})

Esempio di stream di conteggio

Segue un esempio di base di uno stream Readable che emette i numeri da 1 a 1.000.000 in ordine ascendente, e poi termina.

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

Implementare uno stream duplex

Uno stream Duplex è uno stream che implementa sia Readable che Writable, come ad esempio una connessione socket TCP.

Poiché JavaScript non supporta l'ereditarietà multipla, la classe stream.Duplex viene estesa per implementare uno stream Duplex (invece di estendere le classi stream.Readable e stream.Writable).

La classe stream.Duplex eredita prototipicamente da stream.Readable e parassitariamente da stream.Writable, ma instanceof funzionerà correttamente per entrambe le classi di base grazie alla sovrascrittura di Symbol.hasInstance su stream.Writable.

Gli stream Duplex personalizzati devono chiamare il costruttore new stream.Duplex([options]) e implementare sia il metodo readable._read() che il metodo writable._write().

new stream.Duplex(options)

[Cronologia]

VersioneModifiche
v8.4.0Le opzioni readableHighWaterMark e writableHighWaterMark sono ora supportate.
  • options <Oggetto> Passato sia ai costruttori Writable che Readable. Ha anche i seguenti campi:
    • allowHalfOpen <booleano> Se impostato su false, il flusso terminerà automaticamente il lato scrivibile quando termina il lato leggibile. Predefinito: true.
    • readable <booleano> Imposta se il Duplex deve essere leggibile. Predefinito: true.
    • writable <booleano> Imposta se il Duplex deve essere scrivibile. Predefinito: true.
    • readableObjectMode <booleano> Imposta objectMode per il lato leggibile del flusso. Non ha effetto se objectMode è true. Predefinito: false.
    • writableObjectMode <booleano> Imposta objectMode per il lato scrivibile del flusso. Non ha effetto se objectMode è true. Predefinito: false.
    • readableHighWaterMark <numero> Imposta highWaterMark per il lato leggibile del flusso. Non ha effetto se è fornito highWaterMark.
    • writableHighWaterMark <numero> Imposta highWaterMark per il lato scrivibile del flusso. Non ha effetto se è fornito highWaterMark.
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

Oppure, quando si utilizzano costruttori in stile pre-ES6:

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

Oppure, utilizzando l'approccio del costruttore semplificato:

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

Quando si utilizza la pipeline:

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Accetta input stringa invece di buffer
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // Assicurarsi che sia un JSON valido.
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('errore', err)
    } else {
      console.log('completato')
    }
  }
)

Esempio di stream duplex

L'esempio seguente illustra un semplice esempio di stream Duplex che incapsula un ipotetico oggetto sorgente di livello inferiore in cui è possibile scrivere dati e da cui è possibile leggere dati, sebbene utilizzando un'API non compatibile con gli stream di Node.js. L'esempio seguente illustra un semplice esempio di stream Duplex che mette in buffer i dati in ingresso scritti tramite l'interfaccia Writable che viene letta tramite l'interfaccia Readable.

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // La sorgente sottostante gestisce solo le stringhe.
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

L'aspetto più importante di uno stream Duplex è che i lati Readable e Writable operano indipendentemente l'uno dall'altro nonostante coesistano all'interno di una singola istanza di oggetto.

Stream duplex in modalità oggetto

Per gli stream Duplex, objectMode può essere impostato esclusivamente per il lato Readable o Writable utilizzando rispettivamente le opzioni readableObjectMode e writableObjectMode.

Nell'esempio seguente, ad esempio, viene creato un nuovo stream Transform (che è un tipo di stream Duplex) che ha un lato Writable in modalità oggetto che accetta numeri JavaScript che vengono convertiti in stringhe esadecimali sul lato Readable.

js
const { Transform } = require('node:stream')

// Tutti gli stream Transform sono anche stream Duplex.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Costringe il chunk a un numero se necessario.
    chunk |= 0

    // Trasforma il chunk in qualcos'altro.
    const data = chunk.toString(16)

    // Inserisce i dati nella coda leggibile.
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// Stampa: 01
myTransform.write(10)
// Stampa: 0a
myTransform.write(100)
// Stampa: 64

Implementare un flusso di trasformazione

Un flusso Transform è un flusso Duplex in cui l'output viene calcolato in qualche modo dall'input. Esempi includono flussi zlib o flussi crypto che comprimono, crittografano o decrittografano i dati.

Non è richiesto che l'output abbia le stesse dimensioni dell'input, lo stesso numero di chunk o arrivi allo stesso tempo. Ad esempio, un flusso Hash avrà solo un singolo chunk di output fornito quando l'input è terminato. Un flusso zlib produrrà un output che è molto più piccolo o molto più grande del suo input.

La classe stream.Transform viene estesa per implementare un flusso Transform.

La classe stream.Transform eredita prototipicamente da stream.Duplex e implementa le proprie versioni dei metodi writable._write() e readable._read(). Le implementazioni personalizzate di Transform devono implementare il metodo transform._transform() e possono anche implementare il metodo transform._flush().

È necessario prestare attenzione quando si utilizzano flussi Transform in quanto i dati scritti nel flusso possono causare la sospensione del lato Writable del flusso se l'output sul lato Readable non viene consumato.

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

Oppure, quando si utilizzano costruttori in stile pre-ES6:

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

Oppure, utilizzando l'approccio del costruttore semplificato:

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

Evento: 'end'

L'evento 'end' proviene dalla classe stream.Readable. L'evento 'end' viene emesso dopo che tutti i dati sono stati elaborati, il che avviene dopo che la callback in transform._flush() è stata chiamata. In caso di errore, 'end' non dovrebbe essere emesso.

Evento: 'finish'

L'evento 'finish' proviene dalla classe stream.Writable. L'evento 'finish' viene emesso dopo che stream.end() è stato chiamato e tutti i chunk sono stati elaborati da stream._transform(). In caso di errore, 'finish' non dovrebbe essere emesso.

transform._flush(callback)

  • callback <Funzione> Una funzione di callback (facoltativamente con un argomento di errore e dati) da chiamare quando i dati rimanenti sono stati svuotati.

Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlio e chiamata solo dai metodi interni della classe Readable.

In alcuni casi, un'operazione di trasformazione potrebbe aver bisogno di emettere un bit aggiuntivo di dati alla fine del flusso. Ad esempio, un flusso di compressione zlib memorizzerà una quantità di stato interno utilizzato per comprimere in modo ottimale l'output. Quando il flusso termina, tuttavia, quei dati aggiuntivi devono essere svuotati in modo che i dati compressi siano completi.

Le implementazioni personalizzate di Transform possono implementare il metodo transform._flush(). Questo verrà chiamato quando non ci sono più dati scritti da consumare, ma prima che l'evento 'end' venga emesso segnalando la fine del flusso Readable.

All'interno dell'implementazione di transform._flush(), il metodo transform.push() può essere chiamato zero o più volte, a seconda dei casi. La funzione callback deve essere chiamata quando l'operazione di svuotamento è completa.

Il metodo transform._flush() è prefissato con un underscore perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> Il Buffer da trasformare, convertito dalla string passata a stream.write(). Se l'opzione decodeStrings dello stream è false o lo stream opera in modalità oggetto, il chunk non verrà convertito e sarà quello passato a stream.write().
  • encoding <string> Se il chunk è una stringa, questo è il tipo di codifica. Se il chunk è un buffer, questo è il valore speciale 'buffer'. Ignorarlo in quel caso.
  • callback <Function> Una funzione di callback (facoltativamente con un argomento di errore e dati) da chiamare dopo che il chunk fornito è stato elaborato.

Questa funzione NON DEVE essere chiamata direttamente dal codice dell'applicazione. Dovrebbe essere implementata dalle classi figlie e chiamata solo dai metodi interni della classe Readable.

Tutte le implementazioni dello stream Transform devono fornire un metodo _transform() per accettare input e produrre output. L'implementazione di transform._transform() gestisce i byte in scrittura, calcola un output, quindi passa quell'output alla parte leggibile usando il metodo transform.push().

Il metodo transform.push() può essere chiamato zero o più volte per generare output da un singolo chunk di input, a seconda di quanto deve essere output come risultato del chunk.

È possibile che nessun output venga generato da un dato chunk di dati di input.

La funzione callback deve essere chiamata solo quando il chunk corrente è completamente consumato. Il primo argomento passato al callback deve essere un oggetto Error se si è verificato un errore durante l'elaborazione dell'input o null altrimenti. Se viene passato un secondo argomento al callback, verrà inoltrato al metodo transform.push(), ma solo se il primo argomento è falso. In altre parole, i seguenti sono equivalenti:

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

Il metodo transform._transform() è prefissato con un underscore perché è interno alla classe che lo definisce e non dovrebbe mai essere chiamato direttamente dai programmi utente.

transform._transform() non viene mai chiamato in parallelo; gli stream implementano un meccanismo di coda e per ricevere il chunk successivo, callback deve essere chiamato, sincronicamente o asincronicamente.

Classe: stream.PassThrough

La classe stream.PassThrough è un'implementazione banale di uno stream Transform che semplicemente passa i byte di input all'output. Il suo scopo è principalmente quello di fornire esempi e test, ma ci sono alcuni casi d'uso in cui stream.PassThrough è utile come componente base per nuovi tipi di stream.

Note aggiuntive

Compatibilità degli stream con generatori asincroni e iteratori asincroni

Con il supporto di generatori e iteratori asincroni in JavaScript, i generatori asincroni sono a questo punto effettivamente una struttura di stream di primo livello a livello di linguaggio.

Di seguito sono riportati alcuni casi comuni di interoperabilità tra gli stream di Node.js e i generatori e gli iteratori asincroni.

Consumare stream leggibili con iteratori asincroni

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

Gli iteratori asincroni registrano un gestore di errori permanente sullo stream per prevenire eventuali errori non gestiti post-distruzione.

Creare stream leggibili con generatori asincroni

Uno stream leggibile di Node.js può essere creato da un generatore asincrono usando il metodo di utilità Readable.from():

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

Piping verso stream scrivibili da iteratori asincroni

Quando si scrive su uno stream scrivibile da un iteratore asincrono, assicurarsi di gestire correttamente la backpressure e gli errori. stream.pipeline() astrae la gestione della backpressure e degli errori correlati alla backpressure:

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// Modello Callback
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'value returned')
  }
}).on('close', () => {
  ac.abort()
})

// Modello Promise
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'value returned')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

Compatibilità con versioni precedenti di Node.js

Prima di Node.js 0.10, l'interfaccia della stream Readable era più semplice, ma anche meno potente e meno utile.

  • Invece di attendere le chiamate al metodo stream.read(), gli eventi 'data' iniziavano a essere emessi immediatamente. Le applicazioni che necessitavano di eseguire una certa quantità di lavoro per decidere come gestire i dati erano obbligate a memorizzare i dati letti in buffer, in modo che i dati non andassero persi.
  • Il metodo stream.pause() era consigliativo, piuttosto che garantito. Ciò significava che era comunque necessario essere preparati a ricevere eventi 'data' anche quando la stream era in stato di pausa.

In Node.js 0.10, è stata aggiunta la classe Readable. Per la compatibilità con i programmi Node.js precedenti, le stream Readable passano alla "modalità flowing" quando viene aggiunto un gestore eventi 'data', o quando viene chiamato il metodo stream.resume(). L'effetto è che, anche senza utilizzare il nuovo metodo stream.read() e l'evento 'readable', non è più necessario preoccuparsi di perdere chunk 'data'.

Sebbene la maggior parte delle applicazioni continui a funzionare normalmente, questo introduce un caso limite nelle seguenti condizioni:

  • Non viene aggiunto alcun listener di eventi 'data'.
  • Il metodo stream.resume() non viene mai chiamato.
  • La stream non viene inviata a nessuna destinazione scrivibile.

Ad esempio, considera il seguente codice:

js
// ATTENZIONE!  NON FUNZIONA!
net
  .createServer(socket => {
    // Aggiungiamo un listener 'end', ma non consumiamo mai i dati.
    socket.on('end', () => {
      // Non arriverà mai qui.
      socket.end('Il messaggio è stato ricevuto ma non è stato elaborato.\n')
    })
  })
  .listen(1337)

Prima di Node.js 0.10, i dati del messaggio in arrivo sarebbero stati semplicemente scartati. Tuttavia, in Node.js 0.10 e successive, il socket rimane in pausa per sempre.

La soluzione in questa situazione è chiamare il metodo stream.resume() per iniziare il flusso di dati:

js
// Soluzione.
net
  .createServer(socket => {
    socket.on('end', () => {
      socket.end('Il messaggio è stato ricevuto ma non è stato elaborato.\n')
    })

    // Avvia il flusso di dati, scartandolo.
    socket.resume()
  })
  .listen(1337)

Oltre al passaggio delle nuove stream Readable alla modalità flowing, le stream in stile pre-0.10 possono essere racchiuse in una classe Readable usando il metodo readable.wrap().

readable.read(0)

Ci sono alcuni casi in cui è necessario innescare un aggiornamento dei meccanismi sottostanti dello stream leggibile, senza effettivamente consumare alcun dato. In questi casi, è possibile chiamare readable.read(0), che restituirà sempre null.

Se il buffer di lettura interno è inferiore a highWaterMark, e lo stream non sta attualmente leggendo, allora chiamare stream.read(0) innescherà una chiamata di basso livello a stream._read().

Sebbene la maggior parte delle applicazioni non abbia quasi mai bisogno di farlo, ci sono situazioni all'interno di Node.js in cui questo viene fatto, in particolare negli interni della classe stream Readable.

readable.push('')

L'utilizzo di readable.push('') non è raccomandato.

Inserire uno <string>, <Buffer>, <TypedArray> o <DataView> a zero byte in uno stream che non è in modalità oggetto ha un effetto collaterale interessante. Poiché è una chiamata a readable.push(), la chiamata terminerà il processo di lettura. Tuttavia, poiché l'argomento è una stringa vuota, nessun dato viene aggiunto al buffer leggibile, quindi non c'è nulla che un utente possa consumare.

Discrepanza di highWaterMark dopo aver chiamato readable.setEncoding()

L'utilizzo di readable.setEncoding() cambierà il comportamento del funzionamento di highWaterMark in modalità non oggetto.

In genere, la dimensione del buffer corrente viene misurata rispetto a highWaterMark in byte. Tuttavia, dopo che setEncoding() è stato chiamato, la funzione di confronto inizierà a misurare la dimensione del buffer in caratteri.

Questo non è un problema nei casi comuni con latin1 o ascii. Ma si consiglia di essere consapevoli di questo comportamento quando si lavora con stringhe che potrebbero contenere caratteri multibyte.