Skip to content

La contre-pression dans les flux

Un problème général se produit lors du traitement des données, appelé contre-pression (backpressure), et décrit une accumulation de données derrière un tampon lors du transfert de données. Lorsque l'extrémité réceptrice du transfert effectue des opérations complexes ou est plus lente pour une raison quelconque, les données de la source entrante ont tendance à s'accumuler, comme un bouchon.

Pour résoudre ce problème, un système de délégation doit être mis en place pour assurer un flux de données fluide d'une source à une autre. Différentes communautés ont résolu ce problème de manière unique pour leurs programmes ; les pipes Unix et les sockets TCP en sont de bons exemples et sont souvent appelés contrôle de flux. Dans Node.js, les flux ont été la solution adoptée.

L'objectif de ce guide est de détailler davantage ce qu'est la contre-pression et comment les flux y répondent exactement dans le code source de Node.js. La deuxième partie du guide présentera les meilleures pratiques suggérées pour garantir que le code de votre application est sûr et optimisé lors de l'implémentation des flux.

Nous supposons une certaine familiarité avec la définition générale de backpressure, Buffer et EventEmitters dans Node.js, ainsi qu'une certaine expérience avec Stream. Si vous n'avez pas lu ces documents, il n'est pas mauvais de consulter la documentation de l'API en premier, car cela vous aidera à approfondir votre compréhension tout en lisant ce guide.

Le problème du traitement des données

Dans un système informatique, les données sont transférées d'un processus à un autre via des pipes, des sockets et des signaux. Dans Node.js, nous trouvons un mécanisme similaire appelé Stream. Les flux sont géniaux ! Ils font tellement pour Node.js et presque toutes les parties de la base de code interne utilisent ce module. En tant que développeur, vous êtes plus qu'encouragé à les utiliser également !

javascript
const readline = require('node:readline')

const rl = readline.createInterface({
  output: process.stdout,
  input: process.stdin,
})

rl.question('Pourquoi devriez-vous utiliser les flux ? ', answer => {
  console.log(`Peut-être que c'est ${answer}, peut-être que c'est parce qu'ils sont géniaux !`)
})

rl.close()

Un bon exemple de la raison pour laquelle le mécanisme de contre-pression implémenté via les flux est une excellente optimisation peut être démontré en comparant les outils système internes de l'implémentation Stream de Node.js.

Dans un scénario, nous prendrons un fichier volumineux (environ -9 Go) et nous le compresserons à l'aide de l'outil familier zip(1).

bash
zip The.Matrix.1080p.mkv

Bien que cela prenne quelques minutes, dans un autre shell, nous pouvons exécuter un script qui utilise le module zlib de Node.js, qui encapsule un autre outil de compression, gzip(1).

javascript
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')

const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')

inp.pipe(gzip).pipe(out)

Pour tester les résultats, essayez d'ouvrir chaque fichier compressé. Le fichier compressé par l'outil zip(1) vous informera que le fichier est corrompu, tandis que la compression terminée par Stream se décompressera sans erreur.

Note

Dans cet exemple, nous utilisons .pipe() pour obtenir la source de données d'une extrémité à l'autre. Cependant, notez qu'aucun gestionnaire d'erreurs approprié n'est attaché. Si un bloc de données devait ne pas être correctement reçu, la source Readable ou le flux gzip ne seront pas détruits. pump est un outil utilitaire qui détruirait correctement tous les flux dans un pipeline si l'un d'eux échoue ou se ferme, et est indispensable dans ce cas !

pump n'est nécessaire que pour Node.js 8.x ou antérieur, car pour Node.js 10.x ou version ultérieure, pipeline est introduit pour remplacer pump. Il s'agit d'une méthode de module pour acheminer entre les flux en transférant les erreurs et en nettoyant correctement et en fournissant un rappel lorsque le pipeline est terminé.

Voici un exemple d'utilisation de pipeline :

javascript
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Utilisez l'API pipeline pour facilement acheminer une série de flux
// ensemble et être averti lorsque le pipeline est complètement terminé.
// Un pipeline pour compresser efficacement un fichier vidéo potentiellement énorme :
pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline failed', err)
    } else {
      console.log('Pipeline succeeded')
    }
  }
)

Vous pouvez également utiliser le module stream/promises pour utiliser pipeline avec async / await :

javascript
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    )
    console.log('Pipeline succeeded')
  } catch (err) {
    console.error('Pipeline failed', err)
  }
}

Trop de données, trop rapidement

Il existe des cas où un flux Readable peut fournir des données au flux Writable beaucoup trop rapidement – beaucoup plus que ce que le consommateur peut gérer !

Lorsque cela se produit, le consommateur commence à mettre en file d'attente tous les blocs de données pour une consommation ultérieure. La file d'attente d'écriture devient de plus en plus longue, et de ce fait, davantage de données doivent être conservées en mémoire jusqu'à la fin du processus.

L'écriture sur un disque est beaucoup plus lente que la lecture sur un disque. Ainsi, lorsque nous essayons de compresser un fichier et de l'écrire sur notre disque dur, une contre-pression se produira car l'écriture sur le disque ne pourra pas suivre la vitesse de lecture.

javascript
// En secret, le flux dit : « Whoa, whoa ! attends, c'est beaucoup trop ! »
// Les données commenceront à s'accumuler du côté lecture de la mémoire tampon de données lorsque
// l'écriture essaie de suivre le flux de données entrant.
inp.pipe(gzip).pipe(outputFile)

C'est pourquoi un mécanisme de contre-pression est important. Sans système de contre-pression, le processus épuiserait la mémoire de votre système, ralentissant efficacement les autres processus et monopolisant une grande partie de votre système jusqu'à son achèvement.

Cela entraîne plusieurs conséquences :

  • Ralentissement de tous les autres processus en cours
  • Un garbage collector très surchargé
  • Épuisement de la mémoire

Dans les exemples suivants, nous supprimerons la valeur de retour de la fonction .write() et la remplacerons par true, ce qui désactive efficacement le support de la contre-pression dans le cœur de Node.js. Toute référence au binaire 'modified' désigne l'exécution du binaire node sans la ligne return ret;, et avec return true; à la place.

Surcharge excessive du garbage collector

Examinons un benchmark rapide. En utilisant le même exemple que ci-dessus, nous avons effectué quelques essais chronométrés pour obtenir un temps médian pour les deux binaires.

bash
   essai (#)  | binaire `node` (ms) | binaire `node` modifié (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
temps moyen : |      55299         |           55975

Les deux prennent environ une minute à s'exécuter, il n'y a donc pas beaucoup de différence, mais examinons de plus près pour confirmer si nos soupçons sont corrects. Nous utilisons l'outil Linux dtrace pour évaluer ce qui se passe avec le garbage collector V8.

Le temps mesuré du GC (garbage collector) indique les intervalles d'un cycle complet d'un seul balayage effectué par le garbage collector :

bash
temps approx. (ms) | GC (ms) | GC modifié (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1
         *             *           *
         *             *           *
         *             *           *
      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

Alors que les deux processus commencent de la même manière et semblent fonctionner le GC au même rythme, il devient évident qu'après quelques secondes avec un système de contre-pression correctement en place, il répartit la charge du GC sur des intervalles constants de 4 à 8 millisecondes jusqu'à la fin du transfert de données.

Cependant, lorsqu'un système de contre-pression n'est pas en place, le garbage collection V8 commence à se traîner. Le binaire normal appelé GC se déclenche environ 75 fois par minute, tandis que le binaire modifié ne se déclenche que 36 fois.

Il s'agit de la dette lente et progressive qui s'accumule à partir de l'utilisation croissante de la mémoire. Au fur et à mesure que les données sont transférées, sans système de contre-pression en place, plus de mémoire est utilisée pour chaque transfert de bloc.

Plus la mémoire allouée est importante, plus le GC doit s'occuper d'un seul balayage. Plus le balayage est important, plus le GC doit décider ce qui peut être libéré, et la recherche de pointeurs détachés dans un espace mémoire plus grand consommera plus de puissance de calcul.

Épuisement de la mémoire

Pour déterminer la consommation mémoire de chaque binaire, nous avons chronométré chaque processus individuellement avec /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js.

Voici la sortie sur le binaire normal :

bash
Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

La taille maximale en octets occupée par la mémoire virtuelle s'avère être d'environ 87,81 Mo.

Et maintenant, en modifiant la valeur de retour de la fonction .write(), nous obtenons :

bash
Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

La taille maximale en octets occupée par la mémoire virtuelle s'avère être d'environ 1,52 Go.

Sans les flux en place pour déléguer la contre-pression, il y a un ordre de grandeur supérieur d'espace mémoire alloué – une énorme différence entre le même processus !

Cette expérience montre à quel point le mécanisme de contre-pression de Node.js est optimisé et économique pour votre système informatique. Maintenant, décortiquons son fonctionnement !

Comment la contre-pression résout-elle ces problèmes ?

Il existe différentes fonctions pour transférer des données d'un processus à un autre. Dans Node.js, il existe une fonction interne intégrée appelée .pipe(). Il existe également d'autres packages que vous pouvez utiliser ! En fin de compte, au niveau fondamental de ce processus, nous avons deux composants distincts : la source des données et le consommateur.

Lorsque .pipe() est appelé depuis la source, il signale au consommateur qu'il y a des données à transférer. La fonction pipe permet de mettre en place les fermetures de contre-pression appropriées pour les déclencheurs d'événements.

Dans Node.js, la source est un flux Readable et le consommateur est le flux Writable (ces deux éléments peuvent être interchangés avec un flux Duplex ou Transform, mais cela dépasse le cadre de ce guide).

Le moment où la contre-pression est déclenchée peut être exactement réduit à la valeur de retour de la fonction .write() d'un Writable. Cette valeur de retour est déterminée par quelques conditions, bien sûr.

Dans tous les scénarios où la mémoire tampon de données a dépassé la valeur highwaterMark ou où la file d'attente d'écriture est actuellement occupée, .write() renverra false.

Lorsqu'une valeur false est renvoyée, le système de contre-pression se déclenche. Il mettra en pause le flux Readable entrant pour empêcher l'envoi de données et attendra que le consommateur soit à nouveau prêt. Une fois la mémoire tampon de données vidée, un événement 'drain' sera émis et reprendra le flux de données entrant.

Une fois la file d'attente terminée, la contre-pression permettra à nouveau l'envoi de données. L'espace mémoire utilisé se libérera et se préparera pour le prochain lot de données.

Cela permet effectivement d'utiliser une quantité de mémoire fixe à tout moment pour une fonction .pipe(). Il n'y aura pas de fuite de mémoire, ni de mise en mémoire tampon infinie, et le garbage collector n'aura qu'à gérer une seule zone en mémoire !

Donc, si la contre-pression est si importante, pourquoi ne l'avez-vous (probablement) pas entendue ? Eh bien, la réponse est simple : Node.js fait tout cela automatiquement pour vous.

C'est tellement bien ! Mais aussi pas si bien lorsque nous essayons de comprendre comment implémenter nos propres flux personnalisés.

NOTE

Sur la plupart des machines, il existe une taille en octets qui détermine quand une mémoire tampon est pleine (ce qui variera selon les machines). Node.js vous permet de définir votre propre highWaterMark, mais généralement, la valeur par défaut est définie sur 16 ko (16384, ou 16 pour les flux objectMode). Dans les cas où vous souhaiteriez augmenter cette valeur, faites-le, mais avec prudence !

Cycle de vie de .pipe()

Pour mieux comprendre la contre-pression, voici un organigramme du cycle de vie d'un flux Readable injecté dans un flux Writable :

bash
                                                     +===================+
                         x-->  Fonctions d'injection   +-->   src.pipe(dest)  |
                         x     sont configurées pendant     |===================|
                         x     la méthode .pipe.     |  Gestionnaires d'événements  |
  +===============+      x                           |-------------------|
  |   Vos Données   |      x     Elles existent en dehors    | .on('close', cb)  |
  +=======+=======+      x     du flux de données, mais    | .on('data', cb)   |
          |              x     attachent des événements    | .on('drain', cb)  |
          |              x     et leurs gestionnaires      | .on('unpipe', cb) |
+---------v---------+    x     respectivement.          | .on('error', cb)  |
|  Flux Readable  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Flux Writable  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Ce chunk est-il trop volumineux ?  |
  ^       |       |     émettre .end();             |    La file d'attente est-elle occupée ?      |
  |       |       +-> sinon                       +-------+----------------+---+
  |       ^       |     émettre .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  Non  |        |  Oui  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               émettre .pause();          +=================+     |
  |       ^---------------^-----------------------+  retourner false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            lorsque la file d'attente est vide     +============+                         |
  ^------------^-----------------------<  Mise en mémoire tampon |                         |
               |                       |============|                         |
               +> émettre .drain();       |  ^Tampon^  |                         |
               +> émettre .resume();      +------------+                         |
                                       |  ^Tampon^  |                         |
                                       +------------+   ajouter le chunk à la file d'attente    |
                                       |            <---^---------------------<
                                       +============+

REMARQUE

Si vous configurez un pipeline pour chaîner plusieurs flux afin de manipuler vos données, vous implémenterez très probablement un flux Transform.

Dans ce cas, votre sortie de votre flux Readable entrera dans le Transform et sera injectée dans le Writable.

javascript
Readable.pipe(Transformable).pipe(Writable)

La contre-pression sera appliquée automatiquement, mais notez que les highwaterMark entrants et sortants du flux Transform peuvent être manipulés et affecteront le système de contre-pression.

Directives relatives à la contre-pression

Depuis Node.js v0.10, la classe Stream offre la possibilité de modifier le comportement de .read() ou .write() en utilisant la version underscore de ces fonctions respectives (._read() et ._write()).

Des directives sont documentées pour l'implémentation des flux Readable et l'implémentation des flux Writable. Nous supposerons que vous les avez lues, et la section suivante approfondira un peu plus le sujet.

Règles à respecter lors de l'implémentation de flux personnalisés

La règle d'or des flux est de toujours respecter la contre-pression. Ce qui constitue une meilleure pratique est une pratique non contradictoire. Tant que vous veillez à éviter les comportements qui entrent en conflit avec le support interne de la contre-pression, vous pouvez être sûr de suivre les bonnes pratiques.

En général,

  1. Ne jamais .push() si on ne vous le demande pas.
  2. Ne jamais appeler .write() après qu'il renvoie false, mais attendre 'drain' à la place.
  3. Les flux changent entre les différentes versions de Node.js et la bibliothèque que vous utilisez. Soyez prudent et testez les choses.

NOTE

En ce qui concerne le point 3, un package incroyablement utile pour construire des flux de navigateur est readable-stream. Rodd Vagg a écrit un excellent article de blog décrivant l'utilité de cette bibliothèque. En bref, elle fournit un type de dégradation gracieuse automatisée pour les flux Readable, et supporte les anciennes versions des navigateurs et de Node.js.

Règles spécifiques aux flux Readable

Jusqu'à présent, nous avons examiné comment .write() affecte la contre-pression et nous nous sommes concentrés sur le flux Writable. En raison des fonctionnalités de Node.js, les données circulent techniquement en aval de Readable vers Writable. Cependant, comme nous pouvons l'observer dans toute transmission de données, de matière ou d'énergie, la source est tout aussi importante que la destination, et le flux Readable est vital pour la gestion de la contre-pression.

Ces deux processus dépendent l'un de l'autre pour communiquer efficacement. Si le Readable ignore la demande du flux Writable de cesser d'envoyer des données, cela peut être tout aussi problématique que lorsque la valeur de retour de .write() est incorrecte.

Ainsi, en plus de respecter la valeur de retour de .write(), nous devons également respecter la valeur de retour de .push() utilisée dans la méthode ._read(). Si .push() renvoie une valeur false, le flux cessera de lire à partir de la source. Sinon, il continuera sans pause.

Voici un exemple de mauvaise pratique utilisant .push() :

javascript
// Ceci est problématique car il ignore complètement la valeur de retour de push
// qui peut être un signal de contre-pression du flux de destination !
class MyReadable extends Readable {
  _read(size) {
    let chunk
    while (null == (chunk = getNextChunk())) {
      this.push(chunk)
    }
  }
}

De plus, en dehors du flux personnalisé, il y a des pièges à ignorer la contre-pression. Dans ce contre-exemple de bonne pratique, le code de l'application force les données à passer chaque fois qu'elles sont disponibles (signalées par l'événement 'data') :

javascript
// Ceci ignore les mécanismes de contre-pression mis en place par Node.js,
// et pousse inconditionnellement les données, indépendamment du fait que
// le flux de destination soit prêt ou non.
readable.on('data', data => writable.write(data))

Voici un exemple d'utilisation de .push() avec un flux Readable.

javascript
const { Readable } = require('node:stream')

// Créer un flux Readable personnalisé
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Pousser des données dans le flux
    this.push({ message: 'Hello, world!' })
    this.push(null) // Marquer la fin du flux
  },
})

// Consommer le flux
myReadableStream.on('data', chunk => {
  console.log(chunk)
})

// Sortie :
// { message: 'Hello, world!' }

Règles spécifiques aux flux Writable

Rappelons qu'une méthode .write() peut renvoyer true ou false en fonction de certaines conditions. Heureusement pour nous, lorsque nous construisons notre propre flux Writable, la machine à états du flux gérera nos callbacks et déterminera quand gérer la contre-pression et optimiser le flux de données pour nous. Cependant, lorsque nous voulons utiliser un Writable directement, nous devons respecter la valeur de retour de .write() et prêter une attention particulière à ces conditions :

  • Si la file d'attente d'écriture est occupée, .write() renverra false.
  • Si le bloc de données est trop volumineux, .write() renverra false (la limite est indiquée par la variable highWaterMark).

Dans cet exemple, nous créons un flux Readable personnalisé qui pousse un seul objet sur le flux en utilisant .push(). La méthode ._read() est appelée lorsque le flux est prêt à consommer des données, et dans ce cas, nous poussons immédiatement des données sur le flux et marquons la fin du flux en poussant null.

javascript
const stream = require('stream')

class MyReadable extends stream.Readable {
  constructor() {
    super()
  }

  _read() {
    const data = { message: 'Hello, world!' }
    this.push(data)
    this.push(null)
  }
}

const readableStream = new MyReadable()

readableStream.pipe(process.stdout)

Nous consommons ensuite le flux en écoutant l'événement 'data' et en enregistrant chaque bloc de données qui est poussé sur le flux. Dans ce cas, nous ne poussons qu'un seul bloc de données sur le flux, nous ne voyons donc qu'un seul message de journal.

Règles spécifiques aux flux Writable

Rappelons qu'une méthode .write() peut renvoyer true ou false en fonction de certaines conditions. Heureusement pour nous, lorsque nous construisons notre propre flux Writable, la machine à états du flux gérera nos callbacks et déterminera quand gérer la contre-pression et optimiser le flux de données pour nous.

Cependant, lorsque nous voulons utiliser un Writable directement, nous devons respecter la valeur de retour de .write() et prêter une attention particulière à ces conditions :

  • Si la file d'attente d'écriture est occupée, .write() renverra false.
  • Si le bloc de données est trop volumineux, .write() renverra false (la limite est indiquée par la variable highWaterMark).
javascript
class MyWritable extends Writable {
  // Ce writable est invalide en raison de la nature asynchrone des callbacks JavaScript.
  // Sans instruction return pour chaque callback avant le dernier,
  // il y a de fortes chances que plusieurs callbacks soient appelés.
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback()
    else if (chunk.toString().indexOf('b') >= 0) callback()
    callback()
  }
}

Il y a aussi certaines choses à surveiller lors de l'implémentation de ._writev(). La fonction est couplée à .cork(), mais il y a une erreur courante lors de l'écriture :

javascript
// Utiliser .uncork() deux fois ici effectue deux appels sur la couche C++, rendant la
// technique cork/uncork inutile.
ws.cork()
ws.write('hello ')
ws.write('world ')
ws.uncork()

ws.cork()
ws.write('from ')
ws.write('Matteo')
ws.uncork()

// La bonne façon d'écrire ceci est d'utiliser process.nextTick(), qui se déclenche
// lors de la prochaine boucle d'événements.
ws.cork()
ws.write('hello ')
ws.write('world ')
process.nextTick(doUncork, ws)

ws.cork()
ws.write('from ')
ws.write('Matteo')
process.nextTick(doUncork, ws)

// En tant que fonction globale.
function doUncork(stream) {
  stream.uncork()
}

.cork() peut être appelé autant de fois que nous le voulons, nous devons simplement faire attention à appeler .uncork() le même nombre de fois pour le faire refluer.

Conclusion

Les streams sont un module fréquemment utilisé dans Node.js. Ils sont importants pour la structure interne et, pour les développeurs, pour étendre et connecter l'écosystème des modules Node.js.

Espérons que vous serez maintenant capable de déboguer et de coder en toute sécurité vos propres streams Writable et Readable en gardant la contre-pression à l'esprit, et de partager vos connaissances avec vos collègues et amis.

N'oubliez pas de vous renseigner davantage sur Stream pour d'autres fonctions API afin d'améliorer et de libérer vos capacités de streaming lors de la création d'une application avec Node.js.