Skip to content

Contropressione negli Stream

Si verifica un problema generale durante la gestione dei dati chiamato contropressione (backpressure) che descrive un accumulo di dati dietro un buffer durante il trasferimento dati. Quando l'estremità ricevente del trasferimento ha operazioni complesse, o è più lenta per qualsiasi motivo, c'è una tendenza ad accumulare dati dalla sorgente in ingresso, come un'ostruzione.

Per risolvere questo problema, deve essere in atto un sistema di delega per garantire un flusso regolare di dati da una sorgente all'altra. Diverse comunità hanno risolto questo problema in modo univoco per i loro programmi, le pipe Unix e i socket TCP sono buoni esempi di questo, e sono spesso indicati come controllo di flusso. In Node.js, gli stream sono diventati la soluzione adottata.

Lo scopo di questa guida è di dettagliare ulteriormente cos'è la contropressione e come gli stream la gestiscono esattamente nel codice sorgente di Node.js. La seconda parte della guida introdurrà le migliori pratiche suggerite per garantire che il codice della tua applicazione sia sicuro e ottimizzato durante l'implementazione degli stream.

Diamo per scontato una certa familiarità con la definizione generale di backpressure, Buffer e EventEmitters in Node.js, così come un po' di esperienza con Stream. Se non hai letto quei documenti, non è una cattiva idea dare un'occhiata alla documentazione dell'API prima, poiché ti aiuterà ad espandere la tua comprensione mentre leggi questa guida.

Il problema con la gestione dei dati

In un sistema informatico, i dati vengono trasferiti da un processo all'altro tramite pipe, socket e segnali. In Node.js, troviamo un meccanismo simile chiamato Stream. Gli stream sono fantastici! Fanno così tanto per Node.js e quasi ogni parte del codice interno utilizza quel modulo. Come sviluppatore, sei più che incoraggiato ad usarli anche tu!

javascript
const readline = require('node:readline')

const rl = readline.createInterface({
  output: process.stdout,
  input: process.stdin,
})

rl.question('Perché dovresti usare gli stream? ', answer => {
  console.log(`Forse è ${answer}, forse perché sono fantastici!`)
})

rl.close()

Un buon esempio di perché il meccanismo di contropressione implementato tramite stream è una grande ottimizzazione può essere dimostrato confrontando gli strumenti di sistema interni dall'implementazione di Stream di Node.js.

In uno scenario, prenderemo un file di grandi dimensioni (circa -9 GB) e lo comprimeremo usando il familiare strumento zip(1).

bash
zip The.Matrix.1080p.mkv

Mentre questo richiederà alcuni minuti per completarsi, in un altro shell possiamo eseguire uno script che utilizza il modulo zlib di Node.js, che si avvolge attorno a un altro strumento di compressione, gzip(1).

javascript
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')

const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')

inp.pipe(gzip).pipe(out)

Per testare i risultati, prova ad aprire ogni file compresso. Il file compresso dallo strumento zip(1) ti notificherà che il file è corrotto, mentre la compressione completata da Stream si decomprimerà senza errori.

Nota

In questo esempio, usiamo .pipe() per ottenere la sorgente dati da un'estremità all'altra. Tuttavia, nota che non ci sono gestori di errori adeguati collegati. Se un blocco di dati non dovesse essere ricevuto correttamente, la sorgente Readable o lo stream gzip non verrà distrutto. pump è uno strumento di utilità che distruggerebbe correttamente tutti gli stream in una pipeline se uno di essi fallisse o si chiudesse, ed è indispensabile in questo caso!

pump è necessario solo per Node.js 8.x o versioni precedenti, poiché per Node.js 10.x o versioni successive, pipeline è introdotto per sostituire pump. Questo è un metodo del modulo per eseguire il pipe tra stream inoltrando gli errori e pulendo correttamente e fornendo un callback quando la pipeline è completa.

Ecco un esempio di utilizzo di pipeline:

javascript
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Usa l'API pipeline per eseguire facilmente il pipe di una serie di stream
// insieme e ricevi una notifica quando la pipeline è completamente terminata.
// Una pipeline per comprimere in gzip un file video potenzialmente enorme in modo efficiente:
pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline fallita', err)
    } else {
      console.log('Pipeline riuscita')
    }
  }
)

Puoi anche usare il modulo stream/promises per usare pipeline con async / await:

javascript
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    )
    console.log('Pipeline riuscita')
  } catch (err) {
    console.error('Pipeline fallita', err)
  }
}

Dati Eccessivi, Troppo Velocemente

Ci sono casi in cui uno stream Readable potrebbe fornire dati allo stream Writable troppo velocemente, molto più di quanto il consumer possa gestire!

Quando ciò si verifica, il consumer inizierà ad accodare tutti i chunk di dati per un consumo successivo. La coda di scrittura diventerà sempre più lunga e, per questo motivo, una maggiore quantità di dati dovrà essere mantenuta in memoria fino al completamento dell'intero processo.

La scrittura su un disco è molto più lenta della lettura da un disco, quindi, quando cerchiamo di comprimere un file e scriverlo sul nostro disco rigido, si verificherà la backpressure perché il disco di scrittura non sarà in grado di tenere il passo con la velocità di lettura.

javascript
// Segretamente lo stream sta dicendo: "Whoa, whoa! Aspetta, questo è troppo!"
// I dati inizieranno ad accumularsi sul lato di lettura del buffer dati mentre
// la scrittura cerca di tenere il passo con il flusso di dati in arrivo.
inp.pipe(gzip).pipe(outputFile)

Questo è il motivo per cui un meccanismo di backpressure è importante. Se un sistema di backpressure non fosse presente, il processo esaurirebbe la memoria del sistema, rallentando efficacemente altri processi e monopolizzando una grande parte del sistema fino al completamento.

Questo si traduce in alcune cose:

  • Rallentamento di tutti gli altri processi correnti
  • Un garbage collector molto sovraccarico
  • Esaurimento della memoria

Negli esempi seguenti, rimuoveremo il valore di ritorno della funzione .write() e lo cambieremo in true, disabilitando efficacemente il supporto della backpressure nel core di Node.js. In qualsiasi riferimento al binario 'modified', stiamo parlando dell'esecuzione del binario node senza la riga return ret;, e invece con la sostituzione return true;.

Eccesso di Carico sul Garbage Collection

Diamo un'occhiata a un rapido benchmark. Usando lo stesso esempio di cui sopra, abbiamo eseguito alcuni test temporali per ottenere un tempo mediano per entrambi i binari.

bash
   prova (#)  | binario `node` (ms) | binario `node` modificato (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
tempo medio: |      55299         |           55975

Entrambi impiegano circa un minuto per l'esecuzione, quindi non c'è molta differenza, ma diamo un'occhiata più da vicino per confermare se i nostri sospetti sono corretti. Utilizziamo lo strumento Linux dtrace per valutare cosa sta succedendo con il garbage collector V8.

Il tempo misurato del GC (garbage collector) indica gli intervalli di un ciclo completo di una singola scansione eseguita dal garbage collector:

bash
tempo appross. (ms) | GC (ms) | GC modificato (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1
         *             *           *
         *             *           *
         *             *           *
      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

Mentre i due processi iniziano allo stesso modo e sembrano lavorare il GC alla stessa velocità, diventa evidente che dopo alcuni secondi con un sistema di backpressure funzionante correttamente, esso distribuisce il carico del GC su intervalli costanti di 4-8 millisecondi fino alla fine del trasferimento dati.

Tuttavia, quando un sistema di backpressure non è in atto, il garbage collection V8 inizia a rallentare. Il binario normale chiama il GC circa 75 volte al minuto, mentre il binario modificato lo chiama solo 36 volte.

Questo è il lento e graduale debito che si accumula dall'aumento dell'utilizzo della memoria. Man mano che i dati vengono trasferiti, senza un sistema di backpressure in atto, viene utilizzata più memoria per ogni trasferimento di chunk.

Più memoria viene allocata, più il GC deve occuparsi in una singola scansione. Più grande è la scansione, più il GC deve decidere cosa può essere liberato, e la scansione di puntatori staccati in uno spazio di memoria più ampio consumerà più potenza di calcolo.

Esaurimento Memoria

Per determinare il consumo di memoria di ciascun binario, abbiamo cronometrato ogni processo con /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js individualmente.

Questo è l'output sul binario normale:

bash
Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

La dimensione massima in byte occupata dalla memoria virtuale risulta essere approssimativamente 87,81 MB.

E ora, cambiando il valore di ritorno della funzione .write(), otteniamo:

bash
Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

La dimensione massima in byte occupata dalla memoria virtuale risulta essere approssimativamente 1,52 GB.

Senza stream in atto per delegare la backpressure, viene allocato uno spazio di memoria di un ordine di grandezza maggiore - un enorme margine di differenza tra lo stesso processo!

Questo esperimento mostra quanto sia ottimizzato ed economicamente efficiente il meccanismo di backpressure di Node.js per il tuo sistema di elaborazione. Ora, analizziamo come funziona!

Come la Contropressione Risolve Questi Problemi?

Esistono diverse funzioni per trasferire dati da un processo a un altro. In Node.js, c'è una funzione interna integrata chiamata .pipe(). Esistono anche altri pacchetti che puoi utilizzare! In definitiva, però, al livello base di questo processo, abbiamo due componenti separati: la sorgente dei dati e il consumatore.

Quando .pipe() viene chiamato dalla sorgente, segnala al consumatore che ci sono dati da trasferire. La funzione pipe aiuta a impostare le appropriate chiusure di contropressione per i trigger degli eventi.

In Node.js la sorgente è un flusso Readable e il consumatore è il flusso Writable (entrambi questi possono essere scambiati con un flusso Duplex o Transform, ma questo è fuori dallo scopo di questa guida).

Il momento in cui viene attivata la contropressione può essere ristretto esattamente al valore di ritorno della funzione .write() di un Writable. Questo valore di ritorno è determinato, ovviamente, da alcune condizioni.

In qualsiasi scenario in cui il buffer di dati ha superato highwaterMark o la coda di scrittura è attualmente occupata, .write() restituirà false.

Quando viene restituito un valore false, il sistema di contropressione si attiva. Metterà in pausa il flusso Readable in entrata dall'invio di dati e aspetterà che il consumatore sia di nuovo pronto. Una volta svuotato il buffer di dati, verrà emesso un evento 'drain' e verrà ripreso il flusso di dati in entrata.

Una volta terminata la coda, la contropressione consentirà nuovamente l'invio dei dati. Lo spazio di memoria utilizzato si libererà e si preparerà per il prossimo batch di dati.

Questo consente efficacemente l'utilizzo di una quantità fissa di memoria in un dato momento per una funzione .pipe(). Non ci saranno perdite di memoria e nessun buffering infinito, e il garbage collector dovrà occuparsi di una sola area di memoria!

Quindi, se la contropressione è così importante, perché (probabilmente) non ne hai mai sentito parlare? Beh, la risposta è semplice: Node.js fa tutto questo automaticamente per te.

Questo è fantastico! Ma anche non così fantastico quando stiamo cercando di capire come implementare i nostri flussi personalizzati.

NOTA

Nella maggior parte delle macchine, esiste una dimensione in byte che determina quando un buffer è pieno (che varia a seconda delle diverse macchine). Node.js ti consente di impostare il tuo highWaterMark personalizzato, ma comunemente, il valore predefinito è impostato su 16kb (16384, o 16 per i flussi objectMode). Nei casi in cui potresti voler aumentare quel valore, fallo pure, ma fallo con cautela!

Ciclo di vita di .pipe()

Per comprendere meglio il backpressure, ecco un diagramma di flusso sul ciclo di vita di un flusso Readable che viene piped in un flusso Writable:

bash
                                                     +===================+
                         x-->  Funzioni di Piping   +-->   src.pipe(dest)  |
                         x     sono impostate durante     |===================|
                         x     il metodo .pipe.     |  Callback eventi  |
  +===============+      x                           |-------------------|
  |   I tuoi Dati   |      x     Esistono al di fuori    | .on('close', cb)  |
  +=======+=======+      x     del flusso dati, ma    | .on('data', cb)   |
          |              x     importantemente attaccano    | .on('drain', cb)  |
          |              x     eventi e i loro     | .on('unpipe', cb) |
+---------v---------+    x     rispettivi callback. | .on('error', cb)  |
|  Flusso Readable  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Flusso Writable  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Questo chunk è troppo grande?  |
  ^       |       |     emit .end();             |    La coda è occupata?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            quando la coda è vuota     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   aggiungi chunk alla coda    |
                                       |            <---^---------------------<
                                       +============+

NOTA

Se stai impostando una pipeline per concatenare alcuni flussi per manipolare i tuoi dati, molto probabilmente implementerai un flusso Transform.

In questo caso, l'output dal tuo flusso Readable entrerà nel Transform e verrà piped nel Writable.

javascript
Readable.pipe(Transformable).pipe(Writable)

Il backpressure verrà applicato automaticamente, ma si noti che sia l'highwaterMark in entrata che in uscita del flusso Transform possono essere manipolati e influenzeranno il sistema di backpressure.

Linee Guida sulla Contropressione

Dalla versione Node.js v0.10, la classe Stream offre la possibilità di modificare il comportamento di .read() o .write() utilizzando le versioni con underscore di queste funzioni rispettivamente (._read() e ._write()).

Esistono linee guida documentate per l'implementazione di stream leggibili e l'implementazione di stream scrivibili. Daremo per scontato che le abbiate lette, e la sezione successiva approfondirà ulteriormente l'argomento.

Regole da Rispettare Durante l'Implementazione di Stream Personalizzati

La regola d'oro degli stream è quella di rispettare sempre la contropressione. Ciò che costituisce una best practice è una pratica non contraddittoria. Finché si fa attenzione ad evitare comportamenti che confliggono con il supporto interno della contropressione, si può essere certi di seguire una buona pratica.

In generale:

  1. Non chiamare mai .push() se non richiesto.
  2. Non chiamare mai .write() dopo che ha restituito falso, ma attendere invece 'drain'.
  3. Gli stream cambiano tra diverse versioni di Node.js e la libreria che si utilizza. Fate attenzione e testate le cose.

NOTA

Per quanto riguarda il punto 3, un pacchetto incredibilmente utile per la creazione di stream del browser è readable-stream. Rodd Vagg ha scritto un ottimo post sul blog che descrive l'utilità di questa libreria. In breve, fornisce un tipo di degradazione graduale automatizzata per gli stream leggibili e supporta versioni precedenti di browser e Node.js.

Regole specifiche per gli Stream Leggibili

Finora, abbiamo esaminato come .write() influisce sulla contropressione e ci siamo concentrati molto sullo stream scrivibile. A causa delle funzionalità di Node.js, i dati fluiscono tecnicamente a valle dallo stream leggibile a quello scrivibile. Tuttavia, come possiamo osservare in qualsiasi trasmissione di dati, materia o energia, la sorgente è altrettanto importante della destinazione, e lo stream leggibile è fondamentale per la gestione della contropressione.

Entrambi questi processi si basano l'uno sull'altro per comunicare efficacemente; se lo stream leggibile ignora quando lo stream scrivibile gli chiede di smettere di inviare dati, può essere altrettanto problematico di quando il valore di ritorno di .write() è errato.

Quindi, oltre a rispettare il valore di ritorno di .write(), dobbiamo anche rispettare il valore di ritorno di .push() utilizzato nel metodo ._read(). Se .push() restituisce un valore falso, lo stream smetterà di leggere dalla sorgente. Altrimenti, continuerà senza pause.

Ecco un esempio di cattiva pratica usando .push():

javascript
// Questo è problematico perché ignora completamente il valore di ritorno di push
// che potrebbe essere un segnale di contropressione dallo stream di destinazione!
class MyReadable extends Readable {
  _read(size) {
    let chunk
    while (null == (chunk = getNextChunk())) {
      this.push(chunk)
    }
  }
}

Inoltre, dall'esterno dello stream personalizzato, ci sono delle insidie nell'ignorare la contropressione. In questo contro-esempio di buona pratica, il codice dell'applicazione forza i dati attraverso ogni volta che sono disponibili (segnalati dall'evento 'data') :

javascript
// Questo ignora i meccanismi di contropressione implementati da Node.js,
// e spinge incondizionatamente i dati, indipendentemente dal fatto che
// lo stream di destinazione sia pronto o meno.
readable.on('data', data => writable.write(data))

Ecco un esempio di utilizzo di .push() con uno stream leggibile.

javascript
const { Readable } = require('node:stream')

// Crea uno stream Readable personalizzato
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Spingi alcuni dati nello stream
    this.push({ message: 'Hello, world!' })
    this.push(null) // Segna la fine dello stream
  },
})

// Consuma lo stream
myReadableStream.on('data', chunk => {
  console.log(chunk)
})

// Output:
// { message: 'Hello, world!' }

Regole specifiche per gli Stream Scrivibili

Ricorda che un .write() può restituire true o false a seconda di alcune condizioni. Fortunatamente per noi, quando creiamo il nostro stream Writable, la macchina a stati dello stream gestirà le nostre callback e determinerà quando gestire la backpressure e ottimizzare il flusso dei dati per noi. Tuttavia, quando vogliamo usare un Writable direttamente, dobbiamo rispettare il valore di ritorno di .write() e prestare molta attenzione a queste condizioni:

  • Se la coda di scrittura è occupata, .write() restituirà false.
  • Se il chunk di dati è troppo grande, .write() restituirà false (il limite è indicato dalla variabile highWaterMark).

In questo esempio, creiamo un custom stream Readable che spinge un singolo oggetto nello stream usando .push(). Il metodo ._read() viene chiamato quando lo stream è pronto per consumare dati, e in questo caso, spingiamo immediatamente alcuni dati nello stream e segniamo la fine dello stream spingendo null.

javascript
const stream = require('stream')

class MyReadable extends stream.Readable {
  constructor() {
    super()
  }

  _read() {
    const data = { message: 'Hello, world!' }
    this.push(data)
    this.push(null)
  }
}

const readableStream = new MyReadable()

readableStream.pipe(process.stdout)

Quindi consumiamo lo stream ascoltando l'evento 'data' e loggando ogni chunk di dati che viene spinto nello stream. In questo caso, spingiamo solo un singolo chunk di dati nello stream, quindi vediamo solo un messaggio di log.

Regole specifiche per gli Stream Scrivibili

Ricorda che un .write() può restituire true o false a seconda di alcune condizioni. Fortunatamente per noi, quando creiamo il nostro stream Writable, la macchina a stati dello stream gestirà le nostre callback e determinerà quando gestire la backpressure e ottimizzare il flusso dei dati per noi.

Tuttavia, quando vogliamo usare un Writable direttamente, dobbiamo rispettare il valore di ritorno di .write() e prestare molta attenzione a queste condizioni:

  • Se la coda di scrittura è occupata, .write() restituirà false.
  • Se il chunk di dati è troppo grande, .write() restituirà false (il limite è indicato dalla variabile highWaterMark).
javascript
class MyWritable extends Writable {
  // Questo writable è invalido a causa della natura asincrona delle callback JavaScript.
  // Senza un'istruzione di ritorno per ogni callback prima dell'ultima,
  // c'è una grande probabilità che vengano chiamate più callback.
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback()
    else if (chunk.toString().indexOf('b') >= 0) callback()
    callback()
  }
}

Ci sono anche alcune cose da tenere d'occhio quando si implementa ._writev(). La funzione è accoppiata con .cork(), ma c'è un errore comune quando si scrive:

javascript
// Usare .uncork() due volte qui fa due chiamate sul livello C++, rendendo inutile la tecnica cork/uncork.
ws.cork()
ws.write('hello ')
ws.write('world ')
ws.uncork()

ws.cork()
ws.write('from ')
ws.write('Matteo')
ws.uncork()

// Il modo corretto di scrivere questo è utilizzare process.nextTick(), che si attiva nel prossimo ciclo eventi.
ws.cork()
ws.write('hello ')
ws.write('world ')
process.nextTick(doUncork, ws)

ws.cork()
ws.write('from ')
ws.write('Matteo')
process.nextTick(doUncork, ws)

// Come funzione globale.
function doUncork(stream) {
  stream.uncork()
}

.cork() può essere chiamato tutte le volte che vogliamo, dobbiamo solo stare attenti a chiamare .uncork() lo stesso numero di volte per farlo fluire di nuovo.

Conclusione

I flussi sono un modulo spesso utilizzato in Node.js. Sono importanti per la struttura interna e, per gli sviluppatori, per espandere e connettere l'ecosistema dei moduli Node.js.

Si spera che ora sarete in grado di risolvere i problemi e codificare in sicurezza i vostri flussi Writable e Readable tenendo presente la backpressure, e condividere le vostre conoscenze con colleghi e amici.

Assicuratevi di approfondire ulteriormente Stream per altre funzioni API che contribuiranno a migliorare e sbloccare le vostre capacità di streaming durante la creazione di un'applicazione con Node.js.