Skip to content

Stream

[Stable: 2 - Stable]

Stable: 2 Stabilité : 2 - Stable

Code Source : lib/stream.js

Un flux est une interface abstraite pour travailler avec des données en flux dans Node.js. Le module node:stream fournit une API pour implémenter l'interface de flux.

Il existe de nombreux objets flux fournis par Node.js. Par exemple, une requête vers un serveur HTTP et process.stdout sont tous deux des instances de flux.

Les flux peuvent être lisibles, inscriptibles, ou les deux. Tous les flux sont des instances de EventEmitter.

Pour accéder au module node:stream :

js
const stream = require('node:stream')

Le module node:stream est utile pour créer de nouveaux types d'instances de flux. Il n'est généralement pas nécessaire d'utiliser le module node:stream pour consommer des flux.

Organisation de ce document

Ce document contient deux sections principales et une troisième section pour les notes. La première section explique comment utiliser les flux existants au sein d'une application. La deuxième section explique comment créer de nouveaux types de flux.

Types de flux

Il existe quatre types de flux fondamentaux dans Node.js :

De plus, ce module comprend les fonctions utilitaires stream.duplexPair(), stream.pipeline(), stream.finished() stream.Readable.from() et stream.addAbortSignal().

API Streams Promises

Ajouté dans : v15.0.0

L'API stream/promises fournit un ensemble alternatif de fonctions utilitaires asynchrones pour les streams qui retournent des objets Promise plutôt que d'utiliser des rappels. L'API est accessible via require('node:stream/promises') ou require('node:stream').promises.

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[Historique]

VersionChangements
v18.0.0, v17.2.0, v16.14.0Ajout de l'option end, qui peut être définie à false pour empêcher la fermeture automatique du stream de destination lorsque la source se termine.
v15.0.0Ajouté dans : v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Pipeline réussi.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline réussi.')

Pour utiliser un AbortSignal, passez-le dans un objet options, comme dernier argument. Lorsque le signal est annulé, destroy sera appelé sur le pipeline sous-jacent, avec une AbortError.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

L'API pipeline prend également en charge les générateurs asynchrones :

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // Travailler avec des chaînes plutôt que des `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Pipeline réussi.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // Travailler avec des chaînes plutôt que des `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Pipeline réussi.')

N'oubliez pas de gérer l'argument signal passé au générateur asynchrone. En particulier dans le cas où le générateur asynchrone est la source du pipeline (c.-à-d. le premier argument) ou le pipeline ne sera jamais terminé.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Pipeline réussi.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline réussi.')

L'API pipeline fournit une version avec callback :

stream.finished(stream[, options])

[Historique]

VersionModifications
v19.5.0, v18.14.0Ajout de la prise en charge de ReadableStream et WritableStream.
v19.1.0, v18.13.0L'option cleanup a été ajoutée.
v15.0.0Ajouté dans : v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Le flux a fini de lire.')
}

run().catch(console.error)
rs.resume() // Vidanger le flux.
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Le flux a fini de lire.')
}

run().catch(console.error)
rs.resume() // Vidanger le flux.

L’API finished fournit également une version de rappel.

stream.finished() laisse des écouteurs d’événements en suspens (en particulier 'error', 'end', 'finish' et 'close') après que la promesse retournée a été résolue ou rejetée. La raison en est qu’il est possible que des événements 'error' inattendus (en raison d’implémentations de flux incorrectes) ne provoquent pas de plantages inattendus. Si ce comportement n’est pas souhaité, alors options.cleanup doit être défini sur true :

js
await finished(rs, { cleanup: true })

Mode objet

Tous les flux créés par les API de Node.js fonctionnent exclusivement sur des chaînes, des objets <Buffer>, <TypedArray> et <DataView> :

  • Les Strings et les Buffers sont les types les plus couramment utilisés avec les flux.
  • TypedArray et DataView vous permettent de gérer les données binaires avec des types tels que Int32Array ou Uint8Array. Lorsque vous écrivez un TypedArray ou un DataView dans un flux, Node.js traite les octets bruts.

Il est cependant possible que les implémentations de flux fonctionnent avec d'autres types de valeurs JavaScript (à l'exception de null, qui a une fonction spéciale dans les flux). On considère que ces flux fonctionnent en "mode objet".

Les instances de flux sont basculées en mode objet à l'aide de l'option objectMode lors de la création du flux. Il n'est pas sûr de tenter de basculer un flux existant en mode objet.

Mise en mémoire tampon

Les flux Writable et Readable stockent les données dans une mémoire tampon interne.

La quantité de données potentiellement mises en mémoire tampon dépend de l'option highWaterMark transmise au constructeur du flux. Pour les flux normaux, l'option highWaterMark spécifie un nombre total d'octets. Pour les flux fonctionnant en mode objet, la valeur highWaterMark spécifie un nombre total d'objets. Pour les flux fonctionnant sur des chaînes (mais ne les décodant pas), highWaterMark spécifie un nombre total d'unités de code UTF-16.

Les données sont mises en mémoire tampon dans les flux Readable lorsque l'implémentation appelle stream.push(chunk). Si le consommateur du flux n'appelle pas stream.read(), les données restent dans la file d'attente interne jusqu'à ce qu'elles soient consommées.

Une fois que la taille totale de la mémoire tampon de lecture interne atteint le seuil spécifié par highWaterMark, le flux arrête temporairement de lire des données de la ressource sous-jacente jusqu'à ce que les données actuellement mises en mémoire tampon puissent être consommées (c'est-à-dire que le flux cesse d'appeler la méthode interne readable._read() utilisée pour remplir la mémoire tampon de lecture).

Les données sont mises en mémoire tampon dans les flux Writable lorsque la méthode writable.write(chunk) est appelée de manière répétée. Tant que la taille totale de la mémoire tampon d'écriture interne est inférieure au seuil défini par highWaterMark, les appels à writable.write() renvoient true. Une fois que la taille de la mémoire tampon interne atteint ou dépasse highWaterMark, false est renvoyé.

Un objectif clé de l'API stream, en particulier de la méthode stream.pipe(), est de limiter la mise en mémoire tampon des données à des niveaux acceptables, de sorte que les sources et les destinations de vitesses différentes ne surchargent pas la mémoire disponible.

L'option highWaterMark est un seuil, pas une limite : elle détermine la quantité de données qu'un flux met en mémoire tampon avant d'arrêter de demander davantage de données. Elle n'impose pas de limitation stricte de la mémoire en général. Les implémentations de flux spécifiques peuvent choisir d'imposer des limites plus strictes, mais cela est facultatif.

Étant donné que les flux Duplex et Transform sont à la fois Readable et Writable, chacun maintient deux mémoires tampons internes distinctes utilisées pour la lecture et l'écriture, ce qui permet à chaque côté de fonctionner indépendamment de l'autre tout en maintenant un flux de données approprié et efficace. Par exemple, les instances de net.Socket sont des flux Duplex dont le côté Readable permet de consommer les données reçues du socket et dont le côté Writable permet d'écrire des données dans le socket. Étant donné que les données peuvent être écrites dans le socket à un débit plus rapide ou plus lent que les données reçues, chaque côté doit fonctionner (et mettre en mémoire tampon) indépendamment de l'autre.

Les mécanismes de la mise en mémoire tampon interne sont un détail d'implémentation interne et peuvent être modifiés à tout moment. Cependant, pour certaines implémentations avancées, les mémoires tampons internes peuvent être récupérées à l'aide de writable.writableBuffer ou readable.readableBuffer. L'utilisation de ces propriétés non documentées est déconseillée.

API pour les consommateurs de flux

Presque toutes les applications Node.js, même les plus simples, utilisent des flux d'une manière ou d'une autre. Voici un exemple d'utilisation de flux dans une application Node.js qui implémente un serveur HTTP :

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` est un http.IncomingMessage, qui est un flux lisible.
  // `res` est un http.ServerResponse, qui est un flux écrivable.

  let body = ''
  // Obtenez les données sous forme de chaînes utf8.
  // Si un encodage n'est pas défini, des objets Buffer seront reçus.
  req.setEncoding('utf8')

  // Les flux lisibles émettent des événements 'data' une fois qu'un écouteur est ajouté.
  req.on('data', chunk => {
    body += chunk
  })

  // L'événement 'end' indique que l'ensemble du corps a été reçu.
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // Réécrivez quelque chose d'intéressant pour l'utilisateur :
      res.write(typeof data)
      res.end()
    } catch (er) {
      // oups ! mauvais json !
      res.statusCode = 400
      return res.end(`erreur : ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// erreur : Unexpected token 'o', "not json" is not valid JSON

Les flux Writable (tels que res dans l'exemple) exposent des méthodes telles que write() et end() qui sont utilisées pour écrire des données dans le flux.

Les flux Readable utilisent l'API EventEmitter pour notifier le code d'application lorsque des données sont disponibles pour être lues à partir du flux. Ces données disponibles peuvent être lues à partir du flux de plusieurs manières.

Les flux Writable et Readable utilisent l'API EventEmitter de diverses manières pour communiquer l'état actuel du flux.

Les flux Duplex et Transform sont à la fois Writable et Readable.

Les applications qui écrivent des données dans un flux ou qui consomment des données à partir d'un flux ne sont pas tenues d'implémenter directement les interfaces de flux et n'auront généralement aucune raison d'appeler require('node:stream').

Les développeurs souhaitant implémenter de nouveaux types de flux doivent se référer à la section API pour les implémenteurs de flux.

Flux d'écriture

Les flux d'écriture sont une abstraction pour une destination vers laquelle les données sont écrites.

Les exemples de flux Writable incluent :

Certains de ces exemples sont en fait des flux Duplex qui implémentent l'interface Writable.

Tous les flux Writable implémentent l'interface définie par la classe stream.Writable.

Bien que des instances spécifiques de flux Writable puissent différer de diverses manières, tous les flux Writable suivent le même modèle d'utilisation fondamental, comme illustré dans l'exemple ci-dessous :

js
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')

Classe : stream.Writable

Ajouté dans : v0.9.4

Événement : 'close'

[Historique]

VersionChangements
v10.0.0Ajout de l'option emitClose pour spécifier si 'close' est émis lors de la destruction.
v0.9.4Ajouté dans : v0.9.4

L'événement 'close' est émis lorsque le flux et l'une de ses ressources sous-jacentes (un descripteur de fichier, par exemple) ont été fermés. L'événement indique qu'il n'y aura plus d'événements émis et qu'aucun autre calcul n'aura lieu.

Un flux Writable émettra toujours l'événement 'close' s'il est créé avec l'option emitClose.

Événement : 'drain'

Ajouté dans : v0.9.4

Si un appel à stream.write(chunk) renvoie false, l'événement 'drain' sera émis lorsqu'il sera approprié de reprendre l'écriture de données dans le flux.

js
// Écrire les données dans le flux d'écriture fourni un million de fois.
// Soyez attentif à la contre-pression.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // Dernière fois !
        writer.write(data, encoding, callback)
      } else {
        // Voyons si nous devons continuer ou attendre.
        // Ne passez pas le rappel, car nous n'avons pas encore terminé.
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // Il a fallu s'arrêter tôt !
      // Écrire un peu plus une fois qu'il est vidé.
      writer.once('drain', write)
    }
  }
}
Événement : 'error'

Ajouté dans : v0.9.4

L’événement 'error' est émis si une erreur s’est produite lors de l’écriture ou de la transmission de données. La fonction de rappel de l’écouteur reçoit un seul argument Error lorsqu’elle est appelée.

Le flux est fermé lorsque l’événement 'error' est émis, sauf si l’option autoDestroy a été définie sur false lors de la création du flux.

Après 'error', aucun autre événement que 'close' ne devrait être émis (y compris les événements 'error').

Événement : 'finish'

Ajouté dans : v0.9.4

L’événement 'finish' est émis après que la méthode stream.end() a été appelée et que toutes les données ont été vidées vers le système sous-jacent.

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('Toutes les écritures sont maintenant terminées.')
})
writer.end('C’est la fin\n')
Événement : 'pipe'

Ajouté dans : v0.9.4

L’événement 'pipe' est émis lorsque la méthode stream.pipe() est appelée sur un flux lisible, ajoutant cet élément inscriptible à son ensemble de destinations.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('Quelque chose est en cours de redirection vers l’enregistreur.')
  assert.equal(src, reader)
})
reader.pipe(writer)
Événement : 'unpipe'

Ajouté dans : v0.9.4

L’événement 'unpipe' est émis lorsque la méthode stream.unpipe() est appelée sur un flux Readable, en supprimant cet élément Writable de son ensemble de destinations.

Il est également émis au cas où ce flux Writable émet une erreur lorsqu’un flux Readable est redirigé vers lui.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('Quelque chose a cessé d’être redirigé vers l’enregistreur.')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

Ajouté dans la version : v0.11.2

La méthode writable.cork() force toutes les données écrites à être mises en mémoire tampon. Les données mises en mémoire tampon sont vidées lorsque les méthodes stream.uncork() ou stream.end() sont appelées.

L'objectif principal de writable.cork() est de prendre en compte une situation dans laquelle plusieurs petits morceaux sont écrits dans le flux en succession rapide. Au lieu de les transférer immédiatement vers la destination sous-jacente, writable.cork() met en mémoire tampon tous les morceaux jusqu'à ce que writable.uncork() soit appelé, ce qui les transmettra tous à writable._writev(), s'il est présent. Cela empêche une situation de blocage en tête de ligne où les données sont mises en mémoire tampon en attendant que le premier petit morceau soit traité. Toutefois, l'utilisation de writable.cork() sans implémenter writable._writev() peut avoir un effet négatif sur le débit.

Voir aussi : writable.uncork(), writable._writev().

writable.destroy([error])

[Historique]

VersionModifications
v14.0.0Fonctionne comme une opération sans effet sur un flux qui a déjà été détruit.
v8.0.0Ajouté dans la version : v8.0.0
  • error <Error> Facultatif, une erreur à émettre avec l'événement 'error'.
  • Retourne : <this>

Détruit le flux. Émet éventuellement un événement 'error' et émet un événement 'close' (sauf si emitClose est défini sur false). Après cet appel, le flux accessible en écriture est terminé et les appels suivants à write() ou end() génèrent une erreur ERR_STREAM_DESTROYED. Il s'agit d'une manière destructrice et immédiate de détruire un flux. Les appels précédents à write() peuvent ne pas avoir été vidés et peuvent déclencher une erreur ERR_STREAM_DESTROYED. Utilisez end() au lieu de destroy si les données doivent être vidées avant la fermeture, ou attendez l'événement 'drain' avant de détruire le flux.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

Une fois que destroy() a été appelé, tous les appels suivants seront des opérations sans effet et aucune autre erreur que celle de _destroy() ne peut être émise en tant que 'error'.

Les implémenteurs ne doivent pas remplacer cette méthode, mais plutôt implémenter writable._destroy().

writable.closed

Ajouté dans : v18.0.0

Est true après l’émission de 'close'.

writable.destroyed

Ajouté dans : v8.0.0

Est true après l’appel à writable.destroy().

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[Historique]

VersionModifications
v22.0.0, v20.13.0L’argument chunk peut maintenant être une instance TypedArray ou DataView.
v15.0.0Le callback est invoqué avant 'finish' ou en cas d’erreur.
v14.0.0Le callback est invoqué si 'finish' ou 'error' est émis.
v10.0.0Cette méthode renvoie maintenant une référence à writable.
v8.0.0L’argument chunk peut maintenant être une instance Uint8Array.
v0.9.4Ajouté dans : v0.9.4

L’appel de la méthode writable.end() signale qu’il n’y aura plus de données écrites dans le Writable. Les arguments optionnels chunk et encoding permettent d’écrire un dernier bloc de données supplémentaire juste avant de fermer le flux.

L’appel de la méthode stream.write() après avoir appelé stream.end() lèvera une erreur.

js
// Écrire 'hello, ' puis terminer par 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// Il n’est plus autorisé d’écrire maintenant !
writable.setDefaultEncoding(encoding)

[Historique]

VersionModifications
v6.1.0Cette méthode renvoie désormais une référence à writable.
v0.11.15Ajoutée dans : v0.11.15

La méthode writable.setDefaultEncoding() définit l' encoding par défaut pour un flux Writable.

writable.uncork()

Ajoutée dans : v0.11.2

La méthode writable.uncork() vide toutes les données mises en mémoire tampon depuis l'appel à stream.cork().

Lors de l'utilisation de writable.cork() et writable.uncork() pour gérer la mise en mémoire tampon des écritures dans un flux, différez les appels à writable.uncork() en utilisant process.nextTick(). Cela permet le traitement par lots de tous les appels writable.write() qui se produisent au cours d'une phase donnée de la boucle d'événements de Node.js.

js
stream.cork()
stream.write('quelques ')
stream.write('données ')
process.nextTick(() => stream.uncork())

Si la méthode writable.cork() est appelée plusieurs fois sur un flux, le même nombre d'appels à writable.uncork() doit être effectué pour vider les données mises en mémoire tampon.

js
stream.cork()
stream.write('quelques ')
stream.cork()
stream.write('données ')
process.nextTick(() => {
  stream.uncork()
  // Les données ne seront pas vidées tant que uncork() n'est pas appelé une seconde fois.
  stream.uncork()
})

Voir également : writable.cork().

writable.writable

Ajoutée dans : v11.4.0

Est true s'il est sûr d'appeler writable.write(), ce qui signifie que le flux n'a pas été détruit, n'a pas généré d'erreur ou n'est pas terminé.

writable.writableAborted

Ajoutée dans : v18.0.0, v16.17.0

[Stable : 1 - Expérimental]

Stable : 1 Stabilité : 1 - Expérimental

Retourne si le flux a été détruit ou a généré une erreur avant d'émettre 'finish'.

writable.writableEnded

Ajouté dans : v12.9.0

Est true après que writable.end() a été appelé. Cette propriété n'indique pas si les données ont été vidées, pour cela utilisez plutôt writable.writableFinished.

writable.writableCorked

Ajouté dans : v13.2.0, v12.16.0

Nombre de fois que writable.uncork() doit être appelé pour décorker entièrement le flux.

writable.errored

Ajouté dans : v18.0.0

Retourne une erreur si le flux a été détruit avec une erreur.

writable.writableFinished

Ajouté dans : v12.6.0

Est mis à true immédiatement avant que l'événement 'finish' soit émis.

writable.writableHighWaterMark

Ajouté dans : v9.3.0

Retourne la valeur de highWaterMark passée lors de la création de ce Writable.

writable.writableLength

Ajouté dans : v9.4.0

Cette propriété contient le nombre d'octets (ou d'objets) dans la file d'attente prêts à être écrits. La valeur fournit des données d'introspection concernant l'état de highWaterMark.

writable.writableNeedDrain

Ajouté dans : v15.2.0, v14.17.0

Est true si le tampon du flux est plein et que le flux émettra 'drain'.

writable.writableObjectMode

Ajouté dans : v12.3.0

Accesseur pour la propriété objectMode d’un flux Writable donné.

writable[Symbol.asyncDispose]()

Ajouté dans : v22.4.0, v20.16.0

[Stable: 1 - Experimental]

Stable: 1 Stabilité : 1 - Expérimental

Appelle writable.destroy() avec une AbortError et renvoie une promesse qui est tenue lorsque le flux est terminé.

writable.write(chunk[, encoding][, callback])

[Historique]

VersionChangements
v22.0.0, v20.13.0L’argument chunk peut maintenant être une instance TypedArray ou DataView.
v8.0.0L’argument chunk peut maintenant être une instance Uint8Array.
v6.0.0Le passage de null comme paramètre chunk sera toujours considéré comme invalide maintenant, même en mode objet.
v0.9.4Ajouté dans : v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> Données facultatives à écrire. Pour les flux ne fonctionnant pas en mode objet, chunk doit être un <string>, un <Buffer>, un <TypedArray> ou une <DataView>. Pour les flux en mode objet, chunk peut être n’importe quelle valeur JavaScript autre que null.
  • encoding <string> | <null> L’encodage, si chunk est une chaîne. Par défaut : 'utf8'
  • callback <Function> Fonction de rappel lorsque ce bloc de données est vidé.
  • Retourne : <boolean> false si le flux souhaite que le code appelant attende que l’événement 'drain' soit émis avant de continuer à écrire des données supplémentaires ; sinon true.

La méthode writable.write() écrit des données dans le flux et appelle la callback fournie une fois que les données ont été entièrement traitées. Si une erreur se produit, la callback sera appelée avec l’erreur comme premier argument. La callback est appelée de manière asynchrone et avant que 'error' ne soit émis.

La valeur de retour est true si la mémoire tampon interne est inférieure à la highWaterMark configurée lors de la création du flux après avoir admis chunk. Si false est retourné, d’autres tentatives d’écriture de données dans le flux doivent s’arrêter jusqu’à ce que l’événement 'drain' soit émis.

Tant qu’un flux ne se vide pas, les appels à write() mettront chunk en mémoire tampon et retourneront false. Une fois que tous les blocs actuellement mis en mémoire tampon sont vidés (acceptés pour la livraison par le système d’exploitation), l’événement 'drain' sera émis. Une fois que write() retourne false, n’écrivez plus de blocs jusqu’à ce que l’événement 'drain' soit émis. Bien qu’il soit permis d’appeler write() sur un flux qui ne se vide pas, Node.js mettra en mémoire tampon tous les blocs écrits jusqu’à ce que l’utilisation maximale de la mémoire se produise, auquel cas il abandonnera inconditionnellement. Même avant qu’il n’abandonne, une forte utilisation de la mémoire entraînera de mauvaises performances du collecteur de déchets et un RSS élevé (qui n’est généralement pas restitué au système, même après que la mémoire n’est plus nécessaire). Étant donné que les sockets TCP peuvent ne jamais se vider si le pair distant ne lit pas les données, l’écriture dans un socket qui ne se vide pas peut entraîner une vulnérabilité exploitable à distance.

L’écriture de données pendant que le flux ne se vide pas est particulièrement problématique pour un Transform, car les flux Transform sont mis en pause par défaut jusqu’à ce qu’ils soient connectés ou qu’un gestionnaire d’événements 'data' ou 'readable' soit ajouté.

Si les données à écrire peuvent être générées ou récupérées à la demande, il est recommandé d’encapsuler la logique dans un Readable et d’utiliser stream.pipe(). Toutefois, si l’appel à write() est préféré, il est possible de respecter la contre-pression et d’éviter les problèmes de mémoire en utilisant l’événement 'drain' :

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Attendre que cb soit appelée avant de faire d’autres écritures.
write('hello', () => {
  console.log('Écriture terminée, faire plus d’écritures maintenant.')
})

Un flux Writable en mode objet ignorera toujours l’argument encoding.

Flux de lecture

Les flux de lecture sont une abstraction pour une source à partir de laquelle les données sont consommées.

Voici des exemples de flux Readable :

Tous les flux Readable implémentent l’interface définie par la classe stream.Readable.

Deux modes de lecture

Les flux Readable fonctionnent de fait dans l’un des deux modes suivants : flux continu et pause. Ces modes sont distincts du mode objet. Un flux Readable peut être en mode objet ou non, qu’il soit en mode flux continu ou en mode pause.

  • En mode flux continu, les données sont lues automatiquement à partir du système sous-jacent et fournies à une application aussi rapidement que possible en utilisant des événements via l’interface EventEmitter.
  • En mode pause, la méthode stream.read() doit être appelée explicitement pour lire des fragments de données depuis le flux.

Tous les flux Readable commencent en mode pause, mais peuvent être basculés en mode flux continu de l’une des manières suivantes :

Le flux Readable peut revenir en mode pause en utilisant l’une des méthodes suivantes :

  • S’il n’y a pas de destinations de flux, en appelant la méthode stream.pause().
  • S’il existe des destinations de flux, en supprimant toutes les destinations de flux. Plusieurs destinations de flux peuvent être supprimées en appelant la méthode stream.unpipe().

Le concept important à retenir est qu’un flux Readable ne générera pas de données tant qu’un mécanisme permettant de consommer ou d’ignorer ces données n’est pas fourni. Si le mécanisme de consommation est désactivé ou supprimé, le flux Readable tentera d’arrêter la génération de données.

Pour des raisons de rétrocompatibilité, la suppression des gestionnaires d’événements 'data' ne mettra pas automatiquement le flux en pause. De plus, s’il existe des destinations de flux, l’appel de la fonction stream.pause() ne garantira pas que le flux restera en pause une fois que ces destinations seront vidées et demanderont plus de données.

Si un flux Readable est basculé en mode flux continu et qu’il n’y a pas de consommateurs disponibles pour gérer les données, ces données seront perdues. Cela peut se produire, par exemple, lorsque la méthode readable.resume() est appelée sans qu’un écouteur soit attaché à l’événement 'data', ou lorsqu’un gestionnaire d’événements 'data' est supprimé du flux.

L’ajout d’un gestionnaire d’événements 'readable' interrompt automatiquement le flux du flux, et les données doivent être consommées via readable.read(). Si le gestionnaire d’événements 'readable' est supprimé, le flux recommencera à circuler s’il existe un gestionnaire d’événements 'data'.

Trois états

Les "deux modes" de fonctionnement d'un flux Readable sont une abstraction simplifiée de la gestion d'état interne plus complexe qui se produit au sein de l'implémentation du flux Readable.

Plus précisément, à tout moment donné, chaque Readable est dans l'un des trois états possibles :

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

Lorsque readable.readableFlowing est null, aucun mécanisme de consommation des données du flux n'est fourni. Par conséquent, le flux ne générera pas de données. Dans cet état, l'attachement d'un écouteur pour l'événement 'data', l'appel de la méthode readable.pipe() ou l'appel de la méthode readable.resume() fera passer readable.readableFlowing à true, ce qui fera que le Readable commence à émettre activement des événements au fur et à mesure que les données sont générées.

L'appel de readable.pause(), readable.unpipe() ou la réception d'une contre-pression fera passer readable.readableFlowing à false, ce qui interrompra temporairement le flux d'événements, mais n'interrompra pas la génération de données. Dans cet état, l'attachement d'un écouteur pour l'événement 'data' ne fera pas passer readable.readableFlowing à true.

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing est maintenant false.

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing est toujours false.
pass.write('ok') // N'émettra pas 'data'.
pass.resume() // Doit être appelé pour que le flux émette 'data'.
// readableFlowing est maintenant true.

Lorsque readable.readableFlowing est false, des données peuvent s'accumuler dans le tampon interne du flux.

Choisir un seul style d'API

L'API de flux Readable a évolué à travers plusieurs versions de Node.js et fournit plusieurs méthodes de consommation des données de flux. En général, les développeurs doivent choisir l'une des méthodes de consommation des données et ne doivent jamais utiliser plusieurs méthodes pour consommer les données d'un seul flux. Plus précisément, l'utilisation d'une combinaison de on('data'), on('readable'), pipe() ou d'itérateurs asynchrones pourrait conduire à un comportement non intuitif.

Classe : stream.Readable

Ajouté dans : v0.9.4

Événement : 'close'

[Historique]

VersionModifications
v10.0.0Ajout de l’option emitClose pour spécifier si 'close' est émis lors de la destruction.
v0.9.4Ajouté dans : v0.9.4

L’événement 'close' est émis lorsque le flux et toutes ses ressources sous-jacentes (un descripteur de fichier, par exemple) ont été fermés. L’événement indique qu’aucun autre événement ne sera émis et qu’aucun autre calcul n’aura lieu.

Un flux Readable émettra toujours l’événement 'close' s’il est créé avec l’option emitClose.

Événement : 'data'

Ajouté dans : v0.9.4

  • chunk <Buffer> | <string> | <any> Le bloc de données. Pour les flux qui ne fonctionnent pas en mode objet, le bloc sera une chaîne ou un Buffer. Pour les flux qui sont en mode objet, le bloc peut être n’importe quelle valeur JavaScript autre que null.

L’événement 'data' est émis chaque fois que le flux cède la propriété d’un bloc de données à un consommateur. Cela peut se produire chaque fois que le flux est commuté en mode fluide en appelant readable.pipe(), readable.resume() ou en attachant une fonction de rappel d’écouteur à l’événement 'data'. L’événement 'data' sera également émis chaque fois que la méthode readable.read() est appelée et qu’un bloc de données est disponible pour être retourné.

L’attache d’un écouteur d’événements 'data' à un flux qui n’a pas été explicitement mis en pause commutera le flux en mode fluide. Les données seront alors transmises dès qu’elles seront disponibles.

La fonction de rappel d’écouteur se verra transmettre le bloc de données sous forme de chaîne si un codage par défaut a été spécifié pour le flux à l’aide de la méthode readable.setEncoding(); sinon, les données seront transmises sous forme de Buffer.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Reçu ${chunk.length} octets de données.`)
})
Événement : 'end'

Ajouté dans : v0.9.4

L’événement 'end' est émis lorsqu’il n’y a plus de données à consommer dans le flux.

L’événement 'end' ne sera pas émis tant que les données ne seront pas complètement consommées. Cela peut être réalisé en faisant passer le flux en mode flux continu, ou en appelant stream.read() à plusieurs reprises jusqu’à ce que toutes les données soient consommées.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Reçu ${chunk.length} octets de données.`)
})
readable.on('end', () => {
  console.log('Il n’y aura plus de données.')
})
Événement : 'error'

Ajouté dans : v0.9.4

L’événement 'error' peut être émis par une implémentation Readable à tout moment. En général, cela peut se produire si le flux sous-jacent n’est pas en mesure de générer des données en raison d’une défaillance interne sous-jacente ou lorsqu’une implémentation de flux tente de transmettre un bloc de données non valide.

Le rappel du listener recevra un seul objet Error.

Événement : 'pause'

Ajouté dans : v0.9.4

L’événement 'pause' est émis lorsque stream.pause() est appelé et que readableFlowing n’est pas false.

Événement : 'readable'

[Historique]

VersionModifications
v10.0.0'readable' est toujours émis dans le prochain tick après l’appel de .push().
v10.0.0L’utilisation de 'readable' nécessite l’appel de .read().
v0.9.4Ajouté dans : v0.9.4

L’événement 'readable' est émis lorsqu’il y a des données disponibles à lire à partir du flux, jusqu’à la limite maximale configurée (state.highWaterMark). En fait, il indique que le flux contient de nouvelles informations dans la mémoire tampon. Si des données sont disponibles dans cette mémoire tampon, stream.read() peut être appelé pour récupérer ces données. De plus, l’événement 'readable' peut également être émis lorsque la fin du flux a été atteinte.

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // Il y a maintenant des données à lire.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

Si la fin du flux a été atteinte, l’appel de stream.read() renverra null et déclenchera l’événement 'end'. Cela est également vrai s’il n’y a jamais eu de données à lire. Par exemple, dans l’exemple suivant, foo.txt est un fichier vide :

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

Le résultat de l’exécution de ce script est :

bash
$ node test.js
readable: null
end

Dans certains cas, la connexion d’un listener pour l’événement 'readable' entraînera la lecture d’une certaine quantité de données dans une mémoire tampon interne.

En général, les mécanismes readable.pipe() et d’événement 'data' sont plus faciles à comprendre que l’événement 'readable'. Cependant, la gestion de 'readable' pourrait entraîner une augmentation du débit.

Si 'readable' et 'data' sont utilisés en même temps, 'readable' est prioritaire dans le contrôle du flux, c’est-à-dire que 'data' ne sera émis que lorsque stream.read() sera appelé. La propriété readableFlowing deviendrait false. S’il existe des listeners 'data' lorsque 'readable' est supprimé, le flux commencera à circuler, c’est-à-dire que les événements 'data' seront émis sans appeler .resume().

Événement : 'resume'

Ajouté dans : v0.9.4

L’événement 'resume' est émis lorsque stream.resume() est appelé et que readableFlowing n’est pas true.

readable.destroy([error])

[Historique]

VersionModifications
v14.0.0Fonctionne comme une opération sans effet sur un flux déjà détruit.
v8.0.0Ajouté dans : v8.0.0
  • error <Error> Erreur qui sera passée comme charge utile dans l’événement 'error'
  • Retourne : <this>

Détruit le flux. Émet éventuellement un événement 'error', et émet un événement 'close' (sauf si emitClose est défini sur false). Après cet appel, le flux lisible libérera toutes les ressources internes et les appels suivants à push() seront ignorés.

Une fois que destroy() a été appelé, tous les appels ultérieurs seront une opération sans effet et aucune autre erreur que celles de _destroy() ne pourra être émise comme 'error'.

Les implémenteurs ne doivent pas remplacer cette méthode, mais plutôt implémenter readable._destroy().

readable.closed

Ajouté dans : v18.0.0

Est true après l’émission de 'close'.

readable.destroyed

Ajouté dans : v8.0.0

Est true après l’appel de readable.destroy().

readable.isPaused()

Ajouté dans : v0.11.14

La méthode readable.isPaused() renvoie l’état de fonctionnement actuel du Readable. Ceci est principalement utilisé par le mécanisme qui sous-tend la méthode readable.pipe(). Dans la plupart des cas typiques, il n’y aura aucune raison d’utiliser cette méthode directement.

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

Ajouté dans : v0.9.4

La méthode readable.pause() amènera un flux en mode fluide à arrêter l'émission d'événements 'data', ce qui le fera sortir du mode fluide. Toutes les données qui deviennent disponibles resteront dans le tampon interne.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Reçu ${chunk.length} octets de données.`)
  readable.pause()
  console.log('Il n’y aura pas de données supplémentaires pendant 1 seconde.')
  setTimeout(() => {
    console.log('Maintenant, les données vont recommencer à circuler.')
    readable.resume()
  }, 1000)
})

La méthode readable.pause() n’a aucun effet s’il existe un écouteur d’événement 'readable'.

readable.pipe(destination[, options])

Ajouté dans : v0.9.4

  • destination <stream.Writable> La destination pour l’écriture des données

  • options <Object> Options de pipe

    • end <boolean> Mettre fin à l’écriture lorsque la lecture est terminée. Par défaut : true.
  • Retourne : <stream.Writable> La destination, ce qui permet une chaîne de pipes si elle s’agit d’un flux Duplex ou d’un flux Transform

La méthode readable.pipe() rattache un flux Writable au readable, ce qui l’amène à basculer automatiquement en mode fluide et à envoyer toutes ses données vers le Writable rattaché. Le flux de données sera géré automatiquement afin que le flux Writable de destination ne soit pas submergé par un flux Readable plus rapide.

L’exemple suivant dirige toutes les données du readable vers un fichier nommé file.txt :

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Toutes les données de readable sont dirigées vers 'file.txt'.
readable.pipe(writable)

Il est possible de rattacher plusieurs flux Writable à un seul flux Readable.

La méthode readable.pipe() retourne une référence vers le flux de destination, ce qui permet de configurer des chaînes de flux mis en pipeline :

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

Par défaut, stream.end() est appelé sur le flux Writable de destination lorsque le flux Readable source émet 'end', de sorte que la destination n’est plus accessible en écriture. Pour désactiver ce comportement par défaut, l’option end peut être passée avec la valeur false, ce qui a pour effet de laisser le flux de destination ouvert :

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Au revoir\n')
})

Un avertissement important est que si le flux Readable émet une erreur durant le traitement, la destination Writable n’est pas fermée automatiquement. En cas d’erreur, il sera nécessaire de fermer manuellement chaque flux afin d’éviter les fuites de mémoire.

Les flux Writable process.stderr et process.stdout ne sont jamais fermés tant que le processus Node.js n’est pas terminé, quelles que soient les options spécifiées.

readable.read([size])

Ajouté dans : v0.9.4

La méthode readable.read() lit les données du tampon interne et les renvoie. Si aucune donnée n’est disponible pour être lue, null est retourné. Par défaut, les données sont retournées sous forme d’objet Buffer, sauf si un encodage a été spécifié à l’aide de la méthode readable.setEncoding() ou si le flux fonctionne en mode objet.

L’argument optionnel size spécifie un nombre spécifique d’octets à lire. Si size octets ne sont pas disponibles pour être lus, null sera retourné sauf si le flux s’est terminé, auquel cas toutes les données restantes dans le tampon interne seront retournées.

Si l’argument size n’est pas spécifié, toutes les données contenues dans le tampon interne seront retournées.

L’argument size doit être inférieur ou égal à 1 Gio.

La méthode readable.read() doit être appelée uniquement sur les flux Readable fonctionnant en mode pause. En mode flux, readable.read() est appelé automatiquement jusqu’à ce que le tampon interne soit entièrement vidé.

js
const readable = getReadableStreamSomehow()

// 'readable' peut être déclenché plusieurs fois car les données sont mises en mémoire tampon
readable.on('readable', () => {
  let chunk
  console.log('Le flux est lisible (nouvelles données reçues dans le tampon)')
  // Utiliser une boucle pour s'assurer de lire toutes les données actuellement disponibles
  while (null !== (chunk = readable.read())) {
    console.log(`Lecture de ${chunk.length} octets de données...`)
  }
})

// 'end' sera déclenché une fois lorsqu'il n'y aura plus de données disponibles
readable.on('end', () => {
  console.log('Fin du flux atteinte.')
})

Chaque appel à readable.read() renvoie un bloc de données ou null, ce qui signifie qu’il n’y a plus de données à lire à ce moment-là. Ces blocs ne sont pas automatiquement concaténés. Étant donné qu’un seul appel à read() ne renvoie pas toutes les données, l’utilisation d’une boucle while peut être nécessaire pour lire continuellement les blocs jusqu’à ce que toutes les données soient récupérées. Lors de la lecture d’un fichier volumineux, .read() peut renvoyer temporairement null, ce qui indique qu’il a consommé tout le contenu mis en mémoire tampon, mais qu’il peut y avoir d’autres données à mettre en mémoire tampon. Dans de tels cas, un nouvel événement 'readable' est émis une fois qu’il y a plus de données dans le tampon, et l’événement 'end' signifie la fin de la transmission des données.

Par conséquent, pour lire l’intégralité du contenu d’un fichier à partir d’un readable, il est nécessaire de collecter des blocs sur plusieurs événements 'readable' :

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

Un flux Readable en mode objet renverra toujours un seul élément d’un appel à readable.read(size), quelle que soit la valeur de l’argument size.

Si la méthode readable.read() renvoie un bloc de données, un événement 'data' sera également émis.

L’appel à stream.read([size]) après l’émission de l’événement 'end' renverra null. Aucune erreur d’exécution ne sera levée.

readable.readable

Ajouté dans : v11.4.0

Est true s'il est sûr d'appeler readable.read(), ce qui signifie que le flux n'a pas été détruit ou n'a pas émis 'error' ou 'end'.

readable.readableAborted

Ajouté dans : v16.8.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

Renvoie si le flux a été détruit ou a généré une erreur avant d'émettre 'end'.

readable.readableDidRead

Ajouté dans : v16.7.0, v14.18.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

Renvoie si 'data' a été émis.

readable.readableEncoding

Ajouté dans : v12.7.0

Getter pour la propriété encoding d'un flux Readable donné. La propriété encoding peut être définie en utilisant la méthode readable.setEncoding().

readable.readableEnded

Ajouté dans : v12.9.0

Devient true lorsque l'événement 'end' est émis.

readable.errored

Ajouté dans : v18.0.0

Renvoie une erreur si le flux a été détruit avec une erreur.

readable.readableFlowing

Ajouté dans : v9.4.0

Cette propriété reflète l'état actuel d'un flux Readable tel que décrit dans la section Trois états.

readable.readableHighWaterMark

Ajouté dans : v9.3.0

Retourne la valeur de highWaterMark passée lors de la création de ce Readable.

readable.readableLength

Ajouté dans : v9.4.0

Cette propriété contient le nombre d’octets (ou d’objets) dans la file d’attente prêts à être lus. La valeur fournit des données d’introspection concernant l’état du highWaterMark.

readable.readableObjectMode

Ajouté dans : v12.3.0

Getter pour la propriété objectMode d’un flux Readable donné.

readable.resume()

[Historique]

VersionModifications
v10.0.0resume() n’a aucun effet s’il y a un écouteur d’événement 'readable'.
v0.9.4Ajouté dans : v0.9.4

La méthode readable.resume() amène un flux Readable explicitement mis en pause à reprendre l’émission d’événements 'data', en basculant le flux en mode de flux.

La méthode readable.resume() peut être utilisée pour consommer entièrement les données d’un flux sans réellement traiter aucune de ces données :

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Atteint la fin, mais n’a rien lu.')
  })

La méthode readable.resume() n’a aucun effet s’il y a un écouteur d’événement 'readable'.

readable.setEncoding(encoding)

Ajouté dans : v0.9.4

La méthode readable.setEncoding() définit l’encodage de caractères pour les données lues à partir du flux Readable.

Par défaut, aucun encodage n’est attribué et les données du flux sont retournées sous forme d’objets Buffer. La définition d’un encodage entraîne le retour des données du flux sous forme de chaînes de l’encodage spécifié plutôt que d’objets Buffer. Par exemple, l’appel de readable.setEncoding('utf8') entraînera l’interprétation des données de sortie comme des données UTF-8 et leur transmission sous forme de chaînes. L’appel de readable.setEncoding('hex') entraînera l’encodage des données au format de chaîne hexadécimal.

Le flux Readable gérera correctement les caractères multi-octets délivrés par le flux qui deviendraient autrement incorrectement décodés s’ils étaient simplement extraits du flux sous forme d’objets Buffer.

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('Obtenu %d caractères de données de chaîne :', chunk.length)
})
readable.unpipe([destination])

Ajouté dans : v0.9.4

La méthode readable.unpipe() détache un flux Writable précédemment attaché en utilisant la méthode stream.pipe().

Si destination n’est pas spécifié, alors tous les pipes sont détachés.

Si destination est spécifié, mais qu’aucun pipe n’est configuré pour celui-ci, alors la méthode ne fait rien.

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Toutes les données de readable vont dans 'file.txt',
// mais seulement pendant la première seconde.
readable.pipe(writable)
setTimeout(() => {
  console.log('Arrêter d’écrire dans file.txt.')
  readable.unpipe(writable)
  console.log('Fermer manuellement le flux de fichier.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[Historique]

VersionModifications
v22.0.0, v20.13.0L’argument chunk peut maintenant être une instance TypedArray ou DataView.
v8.0.0L’argument chunk peut maintenant être une instance Uint8Array.
v0.9.11Ajouté dans : v0.9.11

Le fait de transmettre chunk comme null signale la fin du flux (EOF) et se comporte de la même manière que readable.push(null), après quoi plus aucune donnée ne peut être écrite. Le signal EOF est placé à la fin du tampon et toutes les données mises en mémoire tampon seront toujours vidées.

La méthode readable.unshift() repousse un bloc de données dans le tampon interne. Ceci est utile dans certaines situations où un flux est consommé par un code qui a besoin de « ne pas consommer » une certaine quantité de données qu’il a extraites de manière optimiste de la source, afin que les données puissent être transmises à une autre partie.

La méthode stream.unshift(chunk) ne peut pas être appelée après que l’événement 'end' a été émis, sinon une erreur d’exécution sera levée.

Les développeurs qui utilisent stream.unshift() devraient souvent envisager de passer à l’utilisation d’un flux Transform. Consultez la section API pour les implémenteurs de flux pour plus d’informations.

js
// Extraire un en-tête délimité par \n\n.
// Utiliser unshift() si nous en obtenons trop.
// Appeler le rappel avec (erreur, en-tête, flux).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // En-tête trouvée.
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // Supprimer l’écouteur 'readable' avant de faire unshift.
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // Maintenant, le corps du message peut être lu depuis le flux.
        callback(null, header, stream)
        return
      }
      // Lecture de l’en-tête en cours.
      header += str
    }
  }
}

Contrairement à stream.push(chunk), stream.unshift(chunk) ne terminera pas le processus de lecture en réinitialisant l’état de lecture interne du flux. Cela peut provoquer des résultats inattendus si readable.unshift() est appelé pendant une lecture (c’est-à-dire à partir d’une implémentation de stream._read() sur un flux personnalisé). Suivre l’appel à readable.unshift() avec un stream.push('') immédiat réinitialisera l’état de lecture de manière appropriée, cependant, il est préférable d’éviter d’appeler readable.unshift() pendant le processus d’exécution d’une lecture.

readable.wrap(stream)

Ajouté dans : v0.9.4

Avant Node.js 0.10, les flux n'implémentaient pas l'intégralité de l'API du module node:stream telle qu'elle est définie actuellement. (Voir Compatibilité pour plus d'informations).

Lors de l'utilisation d'une ancienne bibliothèque Node.js qui émet des événements 'data' et qui possède une méthode stream.pause() qui n'est qu'informative, la méthode readable.wrap() peut être utilisée pour créer un flux Readable qui utilise l'ancien flux comme source de données.

Il sera rarement nécessaire d'utiliser readable.wrap(), mais la méthode a été fournie pour faciliter l'interaction avec les anciennes applications et bibliothèques Node.js.

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // etc.
})
readable[Symbol.asyncIterator]()

[Historique]

VersionModifications
v11.14.0La prise en charge de Symbol.asyncIterator n'est plus expérimentale.
v10.0.0Ajouté dans : v10.0.0
js
const fs = require('node:fs')

async function print(readable) {
  readable.setEncoding('utf8')
  let data = ''
  for await (const chunk of readable) {
    data += chunk
  }
  console.log(data)
}

print(fs.createReadStream('file')).catch(console.error)

Si la boucle se termine par un break, un return ou un throw, le flux sera détruit. En d'autres termes, itérer sur un flux consommera entièrement le flux. Le flux sera lu en blocs de taille égale à l'option highWaterMark. Dans l'exemple de code ci-dessus, les données seront dans un seul bloc si le fichier contient moins de 64 Kio de données, car aucune option highWaterMark n'est fournie à fs.createReadStream().

readable[Symbol.asyncDispose]()

Ajouté dans : v20.4.0, v18.18.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

Appelle readable.destroy() avec une AbortError et renvoie une promesse qui se réalise lorsque le flux est terminé.

readable.compose(stream[, options])

Ajouté dans : v19.1.0, v18.13.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

js
import { Readable } from 'node:stream'

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ')

    for (const word of words) {
      yield word
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()

console.log(words) // affiche ['this', 'is', 'compose', 'as', 'operator']

Voir stream.compose pour plus d'informations.

readable.iterator([options])

Ajouté dans : v16.3.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

  • options <Object>

    • destroyOnReturn <boolean> Lorsque défini sur false, appeler return sur l'itérateur asynchrone, ou quitter une itération for await...of en utilisant un break, return ou throw ne détruira pas le flux. Par défaut : true.
  • Retourne : <AsyncIterator> pour consommer le flux.

L'itérateur créé par cette méthode donne aux utilisateurs la possibilité d'annuler la destruction du flux si la boucle for await...of est quittée par return, break, ou throw, ou si l'itérateur doit détruire le flux si le flux a émis une erreur pendant l'itération.

js
const { Readable } = require('node:stream')

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // Affichera 2 puis 3
  }

  console.log(readable.destroyed) // True, le flux a été totalement consommé
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]))
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}

showBoth()
readable.map(fn[, options])

[Historique]

VersionModifications
v20.7.0, v18.19.0Ajout de highWaterMark dans les options.
v17.4.0, v16.14.0Ajouté dans : v17.4.0, v16.14.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

  • fn <Function> | <AsyncFunction> une fonction à appliquer sur chaque morceau du flux.

    • data <any> un morceau de données provenant du flux.
    • options <Object>
    • signal <AbortSignal> abandonné si le flux est détruit permettant d’abandonner l’appel fn plus tôt.
  • options <Object>

    • concurrency <number> le nombre maximal d’appels simultanés de fn à faire sur le flux en même temps. Par défaut : 1.
    • highWaterMark <number> le nombre d’éléments à mettre en mémoire tampon en attendant la consommation par l’utilisateur des éléments mappés. Par défaut : concurrency * 2 - 1.
    • signal <AbortSignal> permet de détruire le flux si le signal est abandonné.
  • Retourne : <Readable> un flux mappé avec la fonction fn.

Cette méthode permet de faire un mapping sur le flux. La fonction fn sera appelée pour chaque morceau du flux. Si la fonction fn retourne une promesse, cette promesse sera attendue avant d’être transmise au flux de résultat.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Avec un mapper synchrone.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
  console.log(chunk) // 2, 4, 6, 8
}
// Avec un mapper asynchrone, effectuant au maximum 2 requêtes à la fois.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  domain => resolver.resolve4(domain),
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  console.log(result) // Affiche le résultat DNS de resolver.resolve4.
}
readable.filter(fn[, options])

[Historique]

VersionModifications
v20.7.0, v18.19.0ajout de highWaterMark dans les options.
v17.4.0, v16.14.0Ajouté dans : v17.4.0, v16.14.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

  • fn <Function> | <AsyncFunction> une fonction pour filtrer les blocs du flux.

    • data <any> un bloc de données du flux.
    • options <Object>
    • signal <AbortSignal> annulé si le flux est détruit, ce qui permet d'annuler l'appel de fn plus tôt.
  • options <Object>

    • concurrency <number> le nombre maximal d'invocations simultanées de fn à appeler sur le flux à la fois. Par défaut : 1.
    • highWaterMark <number> combien d'éléments mettre en mémoire tampon en attendant la consommation par l'utilisateur des éléments filtrés. Par défaut : concurrency * 2 - 1.
    • signal <AbortSignal> permet de détruire le flux si le signal est annulé.
  • Retourne : <Readable> un flux filtré avec le prédicat fn.

Cette méthode permet de filtrer le flux. Pour chaque bloc du flux, la fonction fn sera appelée et si elle retourne une valeur « truthy », le bloc sera passé au flux résultant. Si la fonction fn retourne une promesse, cette promesse sera « awaitée ».

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Avec un prédicat synchrone.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// Avec un prédicat asynchrone, effectuant au plus 2 requêtes à la fois.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address.ttl > 60
  },
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  // Affiche les domaines avec plus de 60 secondes sur l'enregistrement DNS résolu.
  console.log(result)
}
readable.forEach(fn[, options])

Ajouté dans : v17.5.0, v16.15.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

  • fn <Fonction> | <AsyncFunction> une fonction à appeler sur chaque bloc du flux.

    • data <any> un bloc de données provenant du flux.
    • options <Object>
    • signal <AbortSignal> abandonné si le flux est détruit, ce qui permet d'abandonner l'appel de fn plus tôt.
  • options <Object>

    • concurrency <number> le nombre maximal d'invocations simultanées de fn à appeler sur le flux en une seule fois. Par défaut : 1.
    • signal <AbortSignal> permet de détruire le flux si le signal est abandonné.
  • Retourne : <Promise> une promesse indiquant que le flux est terminé.

Cette méthode permet d’itérer dans un flux. Pour chaque bloc du flux, la fonction fn est appelée. Si la fonction fn retourne une promesse, cette promesse sera attendue.

Cette méthode diffère des boucles for await...of en ce qu’elle peut éventuellement traiter les blocs simultanément. De plus, une itération forEach ne peut être arrêtée qu’en ayant passé une option signal et en abandonnant le AbortController associé, tandis que for await...of peut être arrêtée avec break ou return. Dans les deux cas, le flux est détruit.

Cette méthode diffère de l’écoute de l’événement 'data' en ce qu’elle utilise l’événement readable dans le mécanisme sous-jacent et peut limiter le nombre d’appels fn simultanés.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Avec un prédicat synchrone.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// Avec un prédicat asynchrone, effectuant au plus 2 requêtes à la fois.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address
  },
  { concurrency: 2 }
)
await dnsResults.forEach(result => {
  // Affiche le résultat, similaire à `for await (const result of dnsResults)`
  console.log(result)
})
console.log('done') // Le flux est terminé
readable.toArray([options])

Ajouté dans : v17.5.0, v16.15.0

[Stable : 1 - Expérimental]

Stable : 1 Stabilité : 1 - Expérimental

  • options <Object>

    • signal <AbortSignal> permet d'annuler l'opération toArray si le signal est abandonné.
  • Retourne : <Promise> une promesse contenant un tableau avec le contenu du flux.

Cette méthode permet d'obtenir facilement le contenu d'un flux.

Comme cette méthode lit l'ensemble du flux en mémoire, elle annule les avantages des flux. Elle est destinée à l'interopérabilité et à la commodité, et non comme moyen principal de consommer des flux.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]

// Effectuer des requêtes DNS simultanément à l'aide de .map et collecter
// les résultats dans un tableau à l'aide de toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
  .map(
    async domain => {
      const { address } = await resolver.resolve4(domain, { ttl: true })
      return address
    },
    { concurrency: 2 }
  )
  .toArray()
readable.some(fn[, options])

Ajouté dans : v17.5.0, v16.15.0

[Stable : 1 - Expérimental]

Stable : 1 Stabilité : 1 - Expérimental

  • fn <Function> | <AsyncFunction> une fonction à appeler sur chaque bloc du flux.

    • data <any> un bloc de données du flux.
    • options <Object>
    • signal <AbortSignal> abandonné si le flux est détruit, ce qui permet d'interrompre l'appel de fn plus tôt.
  • options <Object>

    • concurrency <number> le nombre maximal d'appels simultanés de fn à appeler sur le flux à la fois. Par défaut : 1.
    • signal <AbortSignal> permet de détruire le flux si le signal est abandonné.
  • Retourne : <Promise> une promesse évaluant à true si fn a retourné une valeur truthy pour au moins l'un des blocs.

Cette méthode est similaire à Array.prototype.some et appelle fn sur chaque bloc du flux jusqu'à ce que la valeur de retour attendue soit true (ou toute valeur truthy). Une fois qu'un appel de fn sur un bloc a une valeur de retour attendue truthy, le flux est détruit et la promesse est remplie avec true. Si aucun des appels de fn sur les blocs ne retourne de valeur truthy, la promesse est remplie avec false.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Avec un prédicat synchrone.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false

// Avec un prédicat asynchrone, effectuant au maximum 2 vérifications de fichiers à la fois.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(anyBigFile) // `true` si un fichier de la liste est plus grand que 1 Mo
console.log('done') // Le flux est terminé
readable.find(fn[, options])

Ajouté dans : v17.5.0, v16.17.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité : 1 - Expérimental

  • fn <Function> | <AsyncFunction> une fonction à appeler sur chaque chunk du flux.

    • data <any> un chunk de données provenant du flux.
    • options <Object>
    • signal <AbortSignal> interrompu si le flux est détruit, ce qui permet d'interrompre l'appel fn plus tôt.
  • options <Object>

    • concurrency <number> le nombre maximal d'invocations simultanées de fn à appeler sur le flux à la fois. Par défaut : 1.
    • signal <AbortSignal> permet de détruire le flux si le signal est interrompu.
  • Retourne : <Promise> une promesse évaluant le premier chunk pour lequel fn a évalué avec une valeur truthy, ou undefined si aucun élément n'a été trouvé.

Cette méthode est similaire à Array.prototype.find et appelle fn sur chaque chunk du flux pour trouver un chunk avec une valeur truthy pour fn. Une fois que la valeur de retour attendue d'un appel fn est truthy, le flux est détruit et la promesse est résolue avec la valeur pour laquelle fn a retourné une valeur truthy. Si tous les appels fn sur les chunks renvoient une valeur falsy, la promesse est résolue avec undefined.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Avec un prédicat synchrone.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined

// Avec un prédicat asynchrone, effectuant au maximum 2 vérifications de fichiers à la fois.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(foundBigFile) // Nom du fichier volumineux, si un fichier de la liste est supérieur à 1 Mo
console.log('done') // Le flux est terminé
readable.every(fn[, options])

Ajouté dans : v17.5.0, v16.15.0

[Stable: 1 - Expérimental]

Stable : 1 Stabilité : 1 - Expérimental

  • fn <Function> | <AsyncFunction> une fonction à appeler sur chaque bloc du flux.

    • data <any> un bloc de données du flux.
    • options <Object>
    • signal <AbortSignal> abandonné si le flux est détruit permettant d’abandonner l’appel fn plus tôt.
  • options <Object>

    • concurrency <number> le nombre maximal d’appels simultanés de fn à effectuer sur le flux en une seule fois. Par défaut : 1.
    • signal <AbortSignal> permet de détruire le flux si le signal est abandonné.
  • Retourne : <Promise> une promesse dont l’évaluation est true si fn a renvoyé une valeur « truthy » pour tous les blocs.

Cette méthode est similaire à Array.prototype.every et appelle fn sur chaque bloc du flux pour vérifier si toutes les valeurs de retour attendues sont des valeurs « truthy » pour fn. Une fois qu’un appel fn sur une valeur de retour attendue de bloc est « falsy », le flux est détruit et la promesse est réalisée avec false. Si tous les appels fn sur les blocs renvoient une valeur « truthy », la promesse est réalisée avec true.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Avec un prédicat synchrone.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true

// Avec un prédicat asynchrone, effectuant au maximum 2 vérifications de fichier à la fois.
const allBigFiles = await Readable.from(['fichier1', 'fichier2', 'fichier3']).every(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
// `true` si tous les fichiers de la liste sont supérieurs à 1 Mio
console.log(allBigFiles)
console.log('terminé') // Le flux est terminé
readable.flatMap(fn[, options])

Ajouté dans : v17.5.0, v16.15.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

Cette méthode retourne un nouveau flux en appliquant le rappel donné à chaque morceau du flux, puis en aplatissant le résultat.

Il est possible de retourner un flux ou un autre itérable ou itérable asynchrone à partir de fn et les flux de résultat seront fusionnés (aplatis) dans le flux retourné.

js
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'

// Avec un mapper synchrone.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
  console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// Avec un mapper asynchrone, combine le contenu de 4 fichiers
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
  createReadStream(fileName)
)
for await (const result of concatResult) {
  // Cela contiendra le contenu (tous les morceaux) des 4 fichiers
  console.log(result)
}
readable.drop(limit[, options])

Ajouté dans : v17.5.0, v16.15.0

[Stable : 1 - Expérimental]

Stable : 1 Stabilité : 1 - Expérimental

  • limit <number> le nombre de blocs à supprimer du flux lisible.

  • options <Object>

    • signal <AbortSignal> permet de détruire le flux si le signal est interrompu.
  • Retourne : <Readable> un flux avec limit blocs supprimés.

Cette méthode retourne un nouveau flux avec les limit premiers blocs supprimés.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])

Ajouté dans : v17.5.0, v16.15.0

[Stable : 1 - Expérimental]

Stable : 1 Stabilité : 1 - Expérimental

  • limit <number> le nombre de blocs à prendre du flux lisible.

  • options <Object>

    • signal <AbortSignal> permet de détruire le flux si le signal est interrompu.
  • Retourne : <Readable> un flux avec limit blocs pris.

Cette méthode retourne un nouveau flux avec les limit premiers blocs.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])

Ajouté dans : v17.5.0, v16.15.0

[Stable : 1 - Expérimental]

Stable : 1 Stabilité : 1 - Expérimental

  • fn <Function> | <AsyncFunction> une fonction réductrice à appeler sur chaque bloc du flux.

    • previous <any> la valeur obtenue à partir du dernier appel à fn ou la valeur initial si elle est spécifiée ou le premier bloc du flux sinon.
    • data <any> un bloc de données provenant du flux.
    • options <Object>
    • signal <AbortSignal> abandonné si le flux est détruit, permettant d’abandonner l’appel à fn plus tôt.
  • initial <any> la valeur initiale à utiliser dans la réduction.

  • options <Object>

    • signal <AbortSignal> permet de détruire le flux si le signal est interrompu.
  • Retourne : <Promise> une promesse pour la valeur finale de la réduction.

Cette méthode appelle fn sur chaque bloc du flux dans l’ordre, en lui passant le résultat du calcul sur l’élément précédent. Elle retourne une promesse pour la valeur finale de la réduction.

Si aucune valeur initial n’est fournie, le premier bloc du flux est utilisé comme valeur initiale. Si le flux est vide, la promesse est rejetée avec un TypeError avec la propriété de code ERR_INVALID_ARGS.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
  const { size } = await stat(join(directoryPath, file))
  return totalSize + size
}, 0)

console.log(folderSize)

La fonction réductrice itère le flux élément par élément, ce qui signifie qu’il n’y a pas de paramètre de concurrency ou de parallélisme. Pour effectuer une reduce simultanément, vous pouvez extraire la fonction asynchrone vers la méthode readable.map.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir)
  .map(file => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0)

console.log(folderSize)

Flux duplex et flux de transformation

Classe : stream.Duplex

[Historique]

VersionModifications
v6.8.0Les instances de Duplex renvoient désormais true lors de la vérification instanceof stream.Writable.
v0.9.4Ajouté dans : v0.9.4

Les flux duplex sont des flux qui implémentent à la fois les interfaces Readable et Writable.

Voici des exemples de flux Duplex :

duplex.allowHalfOpen

Ajouté dans : v0.9.4

Si false, le flux mettra automatiquement fin au côté accessible en écriture lorsque le côté accessible en lecture se termine. Définie initialement par l’option de constructeur allowHalfOpen, qui est par défaut à true.

Cela peut être changé manuellement pour modifier le comportement semi-ouvert d’une instance de flux Duplex existante, mais doit être changé avant que l’événement 'end' soit émis.

Classe : stream.Transform

Ajouté dans : v0.9.4

Les flux de transformation sont des flux Duplex où la sortie est liée d’une manière ou d’une autre à l’entrée. Comme tous les flux Duplex, les flux Transform implémentent à la fois les interfaces Readable et Writable.

Voici des exemples de flux Transform :

transform.destroy([error])

[Historique]

VersionModifications
v14.0.0Fonctionne comme une opération sans effet sur un flux qui a déjà été détruit.
v8.0.0Ajouté dans : v8.0.0

Détruit le flux et émet éventuellement un événement 'error'. Après cet appel, le flux de transformation libère toutes les ressources internes. Les implémenteurs ne doivent pas remplacer cette méthode, mais plutôt implémenter readable._destroy(). L’implémentation par défaut de _destroy() pour Transform émet également 'close' sauf si emitClose est défini sur false.

Une fois que destroy() a été appelé, tout appel ultérieur sera sans effet et aucune autre erreur, sauf provenant de _destroy(), ne peut être émise en tant que 'error'.

stream.duplexPair([options])

Ajouté dans la version : v22.6.0, v20.17.0

  • options <Objet> Une valeur à transmettre aux deux constructeurs Duplex, pour définir des options telles que la mise en mémoire tampon.
  • Retourne : <Tableau> de deux instances Duplex.

La fonction utilitaire duplexPair retourne un tableau avec deux éléments, chacun étant un flux Duplex connecté à l’autre côté :

js
const [sideA, sideB] = duplexPair()

Tout ce qui est écrit dans un flux est rendu lisible sur l’autre. Il fournit un comportement analogue à une connexion réseau, où les données écrites par le client deviennent lisibles par le serveur, et vice-versa.

Les flux Duplex sont symétriques ; l’un ou l’autre peut être utilisé sans aucune différence de comportement.

stream.finished(stream[, options], callback)

[Historique]

VersionModifications
v19.5.0Ajout de la prise en charge de ReadableStream et WritableStream.
v15.11.0L’option signal a été ajoutée.
v14.0.0La fonction finished(stream, cb) attendra l’événement 'close' avant d’invoquer le rappel. L’implémentation essaie de détecter les flux hérités et d’appliquer ce comportement uniquement aux flux qui sont censés émettre 'close'.
v14.0.0L’émission de 'close' avant 'end' sur un flux Readable provoquera une erreur ERR_STREAM_PREMATURE_CLOSE.
v14.0.0Le rappel sera invoqué sur les flux qui ont déjà été terminés avant l’appel à finished(stream, cb).
v10.0.0Ajouté dans la version : v10.0.0
  • stream <Stream> | <ReadableStream> | <WritableStream> Un flux/flux Web lisible et/ou inscriptible.
  • options <Objet>
    • error <booléen> Si la valeur est false, un appel à emit('error', err) n’est pas considéré comme terminé. Par défaut : true.
    • readable <booléen> Lorsque la valeur est false, le rappel est appelé lorsque le flux se termine, même si le flux peut encore être lisible. Par défaut : true.
    • writable <booléen> Lorsque la valeur est false, le rappel est appelé lorsque le flux se termine, même si le flux peut encore être inscriptible. Par défaut : true.
    • signal <AbortSignal> permet d’abandonner l’attente de la fin du flux. Le flux sous-jacent ne sera pas abandonné si le signal est abandonné. Le rappel sera appelé avec une AbortError. Tous les écouteurs enregistrés ajoutés par cette fonction seront également supprimés.
  • callback <Fonction> Une fonction de rappel qui prend un argument d’erreur facultatif.
  • Retourne : <Fonction> Une fonction de nettoyage qui supprime tous les écouteurs enregistrés.

Une fonction pour être averti lorsqu’un flux n’est plus lisible, inscriptible ou a rencontré une erreur ou un événement de fermeture prématurée.

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('Le flux a échoué.', err)
  } else {
    console.log('Le flux a fini de lire.')
  }
})

rs.resume() // Purger le flux.

Particulièrement utile dans les scénarios de gestion des erreurs où un flux est détruit prématurément (comme une requête HTTP abandonnée) et n’émettra pas 'end' ou 'finish'.

L’API finished fournit une version de promesse.

stream.finished() laisse des écouteurs d’événements en suspens (en particulier 'error', 'end', 'finish' et 'close') après que callback a été invoqué. La raison en est que les événements 'error' inattendus (en raison d’implémentations de flux incorrectes) ne provoquent pas de plantages inattendus. Si ce comportement n’est pas souhaité, la fonction de nettoyage retournée doit être invoquée dans le rappel :

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[Historique]

VersionModifications
v19.7.0, v18.16.0Ajout de la prise en charge des webstreams.
v18.0.0Le passage d'un callback invalide à l'argument callback lève maintenant ERR_INVALID_ARG_TYPE au lieu de ERR_INVALID_CALLBACK.
v14.0.0pipeline(..., cb) attendra l'événement 'close' avant d'invoquer le callback. L'implémentation essaie de détecter les anciens flux et applique ce comportement uniquement aux flux qui sont censés émettre 'close'.
v13.10.0Ajout de la prise en charge des générateurs asynchrones.
v10.0.0Ajouté dans : v10.0.0

Une méthode de module pour connecter des flux et des générateurs, transmettre les erreurs et nettoyer correctement et fournir un callback lorsque le pipeline est terminé.

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// Utilisez l'API pipeline pour connecter facilement une série de flux
// ensemble et être notifié lorsque le pipeline est entièrement terminé.

// Un pipeline pour compresser efficacement un fichier tar potentiellement énorme :

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('Échec du pipeline.', err)
  } else {
    console.log('Pipeline réussi.')
  }
})

L'API pipeline fournit une version promise.

stream.pipeline() appellera stream.destroy(err) sur tous les flux sauf :

  • Les flux Readable qui ont émis 'end' ou 'close'.
  • Les flux Writable qui ont émis 'finish' ou 'close'.

stream.pipeline() laisse des écouteurs d'événements en suspens sur les flux après l'invocation du callback. En cas de réutilisation de flux après un échec, cela peut provoquer des fuites d'écouteurs d'événements et des erreurs étouffées. Si le dernier flux est lisible, les écouteurs d'événements en suspens seront supprimés afin que le dernier flux puisse être consommé ultérieurement.

stream.pipeline() ferme tous les flux lorsqu'une erreur est déclenchée. L'utilisation de IncomingRequest avec pipeline peut entraîner un comportement inattendu une fois qu'il aurait détruit le socket sans envoyer la réponse attendue. Voir l'exemple ci-dessous :

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // Aucun fichier de ce type
      // ce message ne peut pas être envoyé une fois que `pipeline` a déjà détruit le socket
      return res.end('erreur !!!')
    }
  })
})

stream.compose(...streams)

[Historique]

VersionModifications
v21.1.0, v20.10.0Ajout de la prise en charge de la classe stream.
v19.8.0, v18.16.0Ajout de la prise en charge des flux web.
v16.9.0Ajouté dans : v16.9.0

[Stable : 1 - Expérimental]

Stable : 1 Stabilité : 1 - stream.compose est expérimental.

Combine deux flux ou plus en un flux Duplex qui écrit dans le premier flux et lit à partir du dernier. Chaque flux fourni est dirigé vers le suivant, en utilisant stream.pipeline. Si l’un des flux génère une erreur, tous sont détruits, y compris le flux Duplex externe.

Comme stream.compose retourne un nouveau flux qui peut (et doit) à son tour être dirigé vers d’autres flux, il permet la composition. En revanche, lors de la transmission de flux à stream.pipeline, en général, le premier flux est un flux lisible et le dernier un flux accessible en écriture, formant un circuit fermé.

S’il reçoit une Function, elle doit être une méthode de fabrique prenant une source Iterable.

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // affiche 'HELLOWORLD'

stream.compose peut être utilisé pour convertir des itérables asynchrones, des générateurs et des fonctions en flux.

  • AsyncIterable convertit en un Duplex lisible. Ne peut pas générer null.
  • AsyncGeneratorFunction convertit en un Duplex de transformation lisible/accessible en écriture. Doit prendre une AsyncIterable source comme premier paramètre. Ne peut pas générer null.
  • AsyncFunction convertit en un Duplex accessible en écriture. Doit retourner soit null, soit undefined.
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// Convertir AsyncIterable en Duplex lisible.
const s1 = compose(
  (async function* () {
    yield 'Hello'
    yield 'World'
  })()
)

// Convertir AsyncGenerator en Duplex de transformation.
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// Convertir AsyncFunction en Duplex accessible en écriture.
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // affiche 'HELLOWORLD'

Voir readable.compose(stream) pour stream.compose en tant qu’opérateur.

stream.Readable.from(iterable[, options])

Ajouté dans : v12.3.0, v10.17.0

  • iterable <Iterable> Objet implémentant le protocole itérable Symbol.asyncIterator ou Symbol.iterator. Émet un événement 'error' si une valeur null est passée.
  • options <Object> Options fournies à new stream.Readable([options]). Par défaut, Readable.from() définit options.objectMode sur true, sauf si cela est explicitement désactivé en définissant options.objectMode sur false.
  • Renvoie : <stream.Readable>

Une méthode utilitaire pour créer des flux lisibles à partir d'itérateurs.

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

L'appel à Readable.from(string) ou Readable.from(buffer) n'itérera pas sur les chaînes ou les tampons pour correspondre à la sémantique des autres flux pour des raisons de performance.

Si un objet Iterable contenant des promesses est passé comme argument, cela pourrait entraîner un rejet non traité.

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rejet non traité
])

stream.Readable.fromWeb(readableStream[, options])

Ajouté dans : v17.0.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité : 1 - Expérimental

stream.Readable.isDisturbed(stream)

Ajouté dans : v16.8.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité : 1 - Expérimental

Indique si le flux a été lu ou annulé.

stream.isErrored(stream)

Ajouté dans : v17.3.0, v16.14.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité : 1 - Expérimental

Indique si le flux a rencontré une erreur.

stream.isReadable(stream)

Ajouté dans : v17.4.0, v16.14.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité : 1 - Expérimental

Indique si le flux est lisible.

stream.Readable.toWeb(streamReadable[, options])

Ajouté dans : v17.0.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité : 1 - Expérimental

  • streamReadable <stream.Readable>

  • options <Object>

    • strategy <Object>
    • highWaterMark <number> La taille maximale de la file d’attente interne (du ReadableStream créé) avant que la contre-pression ne soit appliquée lors de la lecture à partir du stream.Readable donné. Si aucune valeur n’est fournie, elle sera prise à partir du stream.Readable donné.
    • size <Function> Une fonction qui donne la taille du bloc de données donné. Si aucune valeur n’est fournie, la taille sera 1 pour tous les blocs.
    • chunk <any>
    • Retourne : <number>
  • Retourne : <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

Ajouté dans : v17.0.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

stream.Writable.toWeb(streamWritable)

Ajouté dans : v17.0.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité: 1 - Expérimental

stream.Duplex.from(src)

[Historique]

VersionModifications
v19.5.0, v18.17.0L’argument src peut désormais être un ReadableStream ou WritableStream.
v16.8.0Ajouté dans : v16.8.0

Une méthode utilitaire pour créer des flux duplex.

  • Stream convertit un flux d'écriture en Duplex accessible en écriture et un flux de lecture en Duplex.
  • Blob convertit en Duplex accessible en lecture.
  • string convertit en Duplex accessible en lecture.
  • ArrayBuffer convertit en Duplex accessible en lecture.
  • AsyncIterable convertit en Duplex accessible en lecture. Ne peut pas générer null.
  • AsyncGeneratorFunction convertit en un Duplex de transformation accessible en lecture/écriture. Doit prendre une source AsyncIterable comme premier paramètre. Ne peut pas générer null.
  • AsyncFunction convertit en Duplex accessible en écriture. Doit retourner null ou undefined.
  • Object ({ writable, readable }) convertit readable et writable en Stream, puis les combine en Duplex où le Duplex écrira dans le writable et lira à partir du readable.
  • Promise convertit en Duplex accessible en lecture. La valeur null est ignorée.
  • ReadableStream convertit en Duplex accessible en lecture.
  • WritableStream convertit en Duplex accessible en écriture.
  • Retourne : <stream.Duplex>

Si un objet Iterable contenant des promesses est transmis comme argument, cela peut entraîner un rejet non géré.

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rejet non géré
])

stream.Duplex.fromWeb(pair[, options])

Ajouté dans : v17.0.0

[Stable: 1 - Expérimental]

Stable: 1 Stabilité : 1 - Expérimental

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

Ajouté dans : v17.0.0

[Stable : 1 - Expérimental]

Stable : 1 Stabilité : 1 - Expérimental

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[Historique]

VersionModifications
v19.7.0, v18.16.0Ajout du support pour ReadableStream et WritableStream.
v15.4.0Ajouté dans : v15.4.0

Attache un AbortSignal à un flux lisible ou inscriptible. Cela permet au code de contrôler la destruction du flux en utilisant un AbortController.

L'appel de abort sur l'AbortController correspondant à l'AbortSignal transmis se comportera de la même manière que l'appel de .destroy(new AbortError()) sur le flux, et controller.error(new AbortError()) pour les flux web.

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Plus tard, interrompre l'opération en fermant le flux
controller.abort()

Ou en utilisant un AbortSignal avec un flux lisible comme itérable asynchrone :

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // définir un délai d'expiration
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // L'opération a été annulée
    } else {
      throw e
    }
  }
})()

Ou en utilisant un AbortSignal avec un ReadableStream :

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // L'opération a été annulée
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hello
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

Ajouté dans : v19.9.0, v18.17.0

Retourne le highWaterMark par défaut utilisé par les flux. La valeur par défaut est 65536 (64 kio) ou 16 pour objectMode.

stream.setDefaultHighWaterMark(objectMode, value)

Ajouté dans : v19.9.0, v18.17.0

Définit le highWaterMark par défaut utilisé par les flux.

API pour les implémenteurs de flux

L'API du module node:stream a été conçue pour faciliter la mise en œuvre de flux en utilisant le modèle d'héritage prototypique de JavaScript.

Tout d'abord, un développeur de flux déclarerait une nouvelle classe JavaScript qui étend l'une des quatre classes de flux de base (stream.Writable, stream.Readable, stream.Duplex ou stream.Transform), en s'assurant d'appeler le constructeur de la classe parent appropriée :

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

Lorsque vous étendez des flux, gardez à l'esprit les options que l'utilisateur peut et doit fournir avant de les transmettre au constructeur de base. Par exemple, si l'implémentation fait des hypothèses concernant les options autoDestroy et emitClose, ne permettez pas à l'utilisateur de les remplacer. Soyez explicite sur les options qui sont transmises au lieu de transmettre implicitement toutes les options.

La nouvelle classe de flux doit ensuite implémenter une ou plusieurs méthodes spécifiques, selon le type de flux créé, comme indiqué dans le tableau ci-dessous :

Cas d'utilisationClasseMéthode(s) à implémenter
Lecture seuleReadable_read()
Écriture seuleWritable_write() , _writev() , _final()
Lecture et écritureDuplex_read() , _write() , _writev() , _final()
Traiter les données écrites, puis lire le résultatTransform_transform() , _flush() , _final()

Le code d'implémentation d'un flux ne doit jamais appeler les méthodes "publiques" d'un flux qui sont destinées à être utilisées par les consommateurs (comme décrit dans la section API pour les consommateurs de flux). Cela pourrait entraîner des effets secondaires indésirables dans le code d'application qui consomme le flux.

Évitez de remplacer des méthodes publiques telles que write(), end(), cork(), uncork(), read() et destroy(), ou d'émettre des événements internes tels que 'error', 'data', 'end', 'finish' et 'close' via .emit(). Cela peut casser les invariants de flux actuels et futurs, entraînant des problèmes de comportement et/ou de compatibilité avec d'autres flux, utilitaires de flux et attentes des utilisateurs.

Construction simplifiée

Ajouté dans la version : v1.2.0

Dans de nombreux cas simples, il est possible de créer un flux sans avoir recours à l'héritage. Cela peut être réalisé en créant directement des instances des objets stream.Writable, stream.Readable, stream.Duplex ou stream.Transform et en passant les méthodes appropriées en tant qu'options du constructeur.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  construct(callback) {
    // Initialiser l'état et charger les ressources...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Libérer les ressources...
  },
})

Mise en œuvre d'un flux inscriptible

La classe stream.Writable est étendue pour mettre en œuvre un flux Writable.

Les flux Writable personnalisés doivent appeler le constructeur new stream.Writable([options]) et mettre en œuvre la méthode writable._write() et/ou writable._writev().

new stream.Writable([options])

[Historique]

VersionModifications
v22.0.0Augmentation de la valeur par défaut de highWaterMark.
v15.5.0Prise en charge du passage d'un AbortSignal.
v14.0.0Modification de la valeur par défaut de l'option autoDestroy à true.
v11.2.0, v10.16.0Ajout de l'option autoDestroy pour appeler automatiquement destroy() le flux lorsqu'il émet 'finish' ou des erreurs.
v10.0.0Ajout de l'option emitClose pour spécifier si 'close' est émis lors de la destruction.
  • options <Object>
    • highWaterMark <number> Niveau du tampon lorsque stream.write() commence à renvoyer false. Par défaut : 65536 (64 Kio), ou 16 pour les flux objectMode.
    • decodeStrings <boolean> Indique s'il faut encoder les string transmises à stream.write() en Buffer (avec l'encodage spécifié dans l'appel stream.write()) avant de les transmettre à stream._write(). Les autres types de données ne sont pas convertis (c.-à-d. que les Buffer ne sont pas décodés en string). La valeur false empêchera la conversion des string. Par défaut : true.
    • defaultEncoding <string> L'encodage par défaut qui est utilisé lorsqu'aucun encodage n'est spécifié comme argument à stream.write(). Par défaut : 'utf8'.
    • objectMode <boolean> Indique si stream.write(anyObj) est une opération valide ou non. Lorsqu'il est défini, il devient possible d'écrire des valeurs JavaScript autres que string, <Buffer>, <TypedArray> ou <DataView> si elles sont prises en charge par l'implémentation du flux. Par défaut : false.
    • emitClose <boolean> Indique si le flux doit émettre 'close' après avoir été détruit ou non. Par défaut : true.
    • write <Function> Mise en œuvre de la méthode stream._write().
    • writev <Function> Mise en œuvre de la méthode stream._writev().
    • destroy <Function> Mise en œuvre de la méthode stream._destroy().
    • final <Function> Mise en œuvre de la méthode stream._final().
    • construct <Function> Mise en œuvre de la méthode stream._construct().
    • autoDestroy <boolean> Indique si ce flux doit appeler automatiquement .destroy() sur lui-même après la fin. Par défaut : true.
    • signal <AbortSignal> Un signal représentant une possible annulation.
js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor(options) {
    // Appelle le constructeur stream.Writable().
    super(options)
    // ...
  }
}

Ou, lors de l'utilisation de constructeurs de style antérieur à ES6 :

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options)
  Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

Ou, en utilisant l'approche simplifiée du constructeur :

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

L'appel de abort sur AbortController correspondant au AbortSignal passé se comportera de la même manière que l'appel de .destroy(new AbortError()) sur le flux inscriptible.

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// Plus tard, annuler l'opération en fermant le flux
controller.abort()

writable._construct(callback)

Ajouté dans : v15.0.0

  • callback <Fonction> Appelez cette fonction (éventuellement avec un argument d’erreur) lorsque le flux a terminé son initialisation.

La méthode _construct() ne DOIT PAS être appelée directement. Elle peut être implémentée par les classes enfants et, si c’est le cas, elle sera appelée uniquement par les méthodes internes de la classe Writable.

Cette fonction optionnelle sera appelée dans un tic après le retour du constructeur du flux, ce qui retardera tout appel à _write(), _final() et _destroy() jusqu’à ce que callback soit appelée. Ceci est utile pour initialiser l’état ou pour initialiser de manière asynchrone des ressources avant que le flux puisse être utilisé.

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[Historique]

VersionModifications
v12.11.0_write() est optionnel lorsque vous fournissez _writev().
  • chunk <Buffer> | <string> | <any> Le Buffer à écrire, converti à partir de la string passée à stream.write(). Si l’option decodeStrings du flux est false ou si le flux fonctionne en mode objet, le chunk ne sera pas converti et sera ce qui a été passé à stream.write().
  • encoding <string> Si le chunk est une chaîne, alors encoding est l’encodage de caractères de cette chaîne. Si le chunk est un Buffer, ou si le flux fonctionne en mode objet, encoding peut être ignoré.
  • callback <Fonction> Appelez cette fonction (éventuellement avec un argument d’erreur) lorsque le traitement du chunk fourni est terminé.

Toutes les implémentations de flux Writable doivent fournir une méthode writable._write() et/ou writable._writev() pour envoyer les données à la ressource sous-jacente.

Les flux Transform fournissent leur propre implémentation de writable._write().

Cette fonction ne DOIT PAS être appelée directement par le code de l’application. Elle doit être implémentée par les classes enfants et appelée uniquement par les méthodes internes de la classe Writable.

La fonction callback doit être appelée de manière synchrone dans writable._write() ou de manière asynchrone (c.-à-d. un tic différent) pour signaler soit que l’écriture a réussi, soit qu’elle a échoué avec une erreur. Le premier argument passé à callback doit être l’objet Error si l’appel a échoué ou null si l’écriture a réussi.

Tous les appels à writable.write() qui se produisent entre le moment où writable._write() est appelé et le moment où callback est appelé entraîneront la mise en mémoire tampon des données écrites. Lorsque callback est invoqué, le flux peut émettre un événement 'drain'. Si une implémentation de flux est capable de traiter plusieurs chunks de données à la fois, la méthode writable._writev() doit être implémentée.

Si la propriété decodeStrings est explicitement définie sur false dans les options du constructeur, alors chunk restera le même objet qui est passé à .write(), et peut être une chaîne plutôt qu’un Buffer. Cela permet de prendre en charge les implémentations qui ont une gestion optimisée pour certains encodages de données de chaîne. Dans ce cas, l’argument encoding indiquera l’encodage de caractères de la chaîne. Sinon, l’argument encoding peut être ignoré en toute sécurité.

La méthode writable._write() est préfixée d’un trait de soulignement, car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateur.

writable._writev(chunks, callback)

  • chunks <Object[]> Les données à écrire. La valeur est un tableau de <Object> qui représentent chacun un morceau de données discret à écrire. Les propriétés de ces objets sont :

    • chunk <Buffer> | <string> Une instance de tampon ou une chaîne contenant les données à écrire. Le chunk sera une chaîne si le Writable a été créé avec l’option decodeStrings définie sur false et qu’une chaîne a été transmise à write().
    • encoding <string> L’encodage des caractères du chunk. Si chunk est un Buffer, l’encodage sera 'buffer'.
  • callback <Function> Une fonction de rappel (éventuellement avec un argument d’erreur) à invoquer lorsque le traitement est terminé pour les morceaux fournis.

Cette fonction NE DOIT PAS être appelée directement par le code de l’application. Elle doit être implémentée par les classes enfants et appelée uniquement par les méthodes internes de la classe Writable.

La méthode writable._writev() peut être implémentée en plus ou en alternative à writable._write() dans les implémentations de flux capables de traiter plusieurs morceaux de données à la fois. Si elle est implémentée et s’il existe des données mises en mémoire tampon provenant d’écritures précédentes, _writev() sera appelée au lieu de _write().

La méthode writable._writev() est préfixée d’un trait de soulignement car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateur.

writable._destroy(err, callback)

Ajoutée dans : v8.0.0

  • err <Error> Une erreur possible.
  • callback <Function> Une fonction de rappel qui prend un argument d’erreur facultatif.

La méthode _destroy() est appelée par writable.destroy(). Elle peut être remplacée par des classes enfants, mais elle ne doit pas être appelée directement.

writable._final(callback)

Ajouté dans : v8.0.0

  • callback <Function> Appeler cette fonction (éventuellement avec un argument d'erreur) lorsque l'écriture des données restantes est terminée.

La méthode _final() ne doit pas être appelée directement. Elle peut être implémentée par des classes enfants, et si c'est le cas, elle sera appelée uniquement par les méthodes internes de la classe Writable.

Cette fonction optionnelle sera appelée avant la fermeture du flux, retardant l'événement 'finish' jusqu'à ce que callback soit appelée. Cela est utile pour fermer des ressources ou écrire des données mises en mémoire tampon avant la fin d'un flux.

Erreurs lors de l'écriture

Les erreurs survenant lors du traitement des méthodes writable._write(), writable._writev() et writable._final() doivent être propagées en invoquant le rappel et en transmettant l'erreur comme premier argument. Lever une Error à partir de ces méthodes ou émettre manuellement un événement 'error' entraîne un comportement indéfini.

Si un flux Readable est relié à un flux Writable lorsque Writable émet une erreur, le flux Readable sera déconnecté.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  },
})

Un exemple de flux inscriptible

Ce qui suit illustre une implémentation de flux Writable personnalisé assez simpliste (et quelque peu inutile). Bien que cette instance de flux Writable spécifique ne soit pas d'une utilité réelle particulière, l'exemple illustre chacun des éléments requis d'une instance de flux Writable personnalisée :

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'))
    } else {
      callback()
    }
  }
}

Décodage des tampons dans un flux accessible en écriture

Le décodage des tampons est une tâche courante, par exemple, lors de l'utilisation de transformateurs dont l'entrée est une chaîne de caractères. Ce n'est pas un processus trivial lors de l'utilisation de l'encodage de caractères multi-octets, tel que UTF-8. L'exemple suivant montre comment décoder des chaînes multi-octets en utilisant StringDecoder et Writable.

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

Implémentation d'un flux lisible

La classe stream.Readable est étendue pour implémenter un flux Readable.

Les flux Readable personnalisés doivent appeler le constructeur new stream.Readable([options]) et implémenter la méthode readable._read().

new stream.Readable([options])

[Historique]

VersionModifications
v22.0.0Augmentation de la valeur par défaut de highWaterMark.
v15.5.0Prise en charge du passage d'un AbortSignal.
v14.0.0Changement de la valeur par défaut de l'option autoDestroy à true.
v11.2.0, v10.16.0Ajout de l'option autoDestroy pour destroy() automatiquement le flux lorsqu'il émet 'end' ou des erreurs.
  • options <Object>
    • highWaterMark <number> Le nombre maximal d'octets à stocker dans le tampon interne avant de cesser de lire la ressource sous-jacente. Par défaut : 65536 (64 Ko), ou 16 pour les flux en objectMode.
    • encoding <string> Si spécifié, les tampons seront décodés en chaînes à l'aide de l'encodage spécifié. Par défaut : null.
    • objectMode <boolean> Indique si ce flux doit se comporter comme un flux d'objets. Cela signifie que stream.read(n) renvoie une seule valeur au lieu d'un Buffer de taille n. Par défaut : false.
    • emitClose <boolean> Indique si le flux doit émettre 'close' après avoir été détruit. Par défaut : true.
    • read <Function> Implémentation de la méthode stream._read().
    • destroy <Function> Implémentation de la méthode stream._destroy().
    • construct <Function> Implémentation de la méthode stream._construct().
    • autoDestroy <boolean> Indique si ce flux doit appeler automatiquement .destroy() sur lui-même après sa fin. Par défaut : true.
    • signal <AbortSignal> Un signal représentant une possible annulation.
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // Appelle le constructeur stream.Readable(options).
    super(options)
    // ...
  }
}

Ou, lors de l'utilisation de constructeurs de style pré-ES6 :

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

Ou, en utilisant l'approche de constructeur simplifiée :

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

L'appel de abort sur le AbortController correspondant à la AbortSignal passée se comportera de la même manière que l'appel de .destroy(new AbortError()) sur le flux lisible créé.

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// Plus tard, annulez l'opération en fermant le flux.
controller.abort()

readable._construct(callback)

Ajouté dans : v15.0.0

  • callback <Function> Appelez cette fonction (éventuellement avec un argument d'erreur) lorsque le flux a terminé son initialisation.

La méthode _construct() NE DOIT PAS être appelée directement. Elle peut être implémentée par des classes enfants et, si c'est le cas, elle sera appelée uniquement par les méthodes internes de la classe Readable.

Cette fonction optionnelle sera planifiée lors de la prochaine itération par le constructeur du flux, ce qui retardera tout appel de _read() et _destroy() jusqu'à ce que callback soit appelée. Ceci est utile pour initialiser l'état ou initialiser de manière asynchrone des ressources avant que le flux ne puisse être utilisé.

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

Ajouté dans : v0.9.4

  • size <number> Nombre d’octets à lire de façon asynchrone

Cette fonction NE DOIT PAS être appelée directement par le code de l’application. Elle doit être implémentée par les classes enfants, et être appelée uniquement par les méthodes internes de la classe Readable.

Toutes les implémentations de flux Readable doivent fournir une implémentation de la méthode readable._read() pour récupérer des données depuis la ressource sous-jacente.

Lorsque readable._read() est appelée, si des données sont disponibles à partir de la ressource, l’implémentation doit commencer à pousser ces données dans la file d’attente de lecture en utilisant la méthode this.push(dataChunk). _read() sera appelée à nouveau après chaque appel à this.push(dataChunk) une fois que le flux est prêt à accepter plus de données. _read() peut continuer à lire depuis la ressource et à pousser des données jusqu’à ce que readable.push() renvoie false. Ce n’est que lorsque _read() est appelée à nouveau après qu’elle s’est arrêtée qu’elle doit reprendre le transfert de données supplémentaires dans la file d’attente.

Une fois que la méthode readable._read() a été appelée, elle ne sera plus appelée jusqu’à ce que plus de données soient poussées via la méthode readable.push(). Les données vides telles que les tampons et les chaînes vides n’entraîneront pas l’appel de readable._read().

L’argument size est consultatif. Pour les implémentations où une "lecture" est une opération unique qui renvoie des données, l’argument size peut être utilisé pour déterminer la quantité de données à extraire. D’autres implémentations peuvent ignorer cet argument et simplement fournir des données chaque fois qu’elles deviennent disponibles. Il n’est pas nécessaire d’"attendre" que size octets soient disponibles avant d’appeler stream.push(chunk).

La méthode readable._read() est précédée d’un trait de soulignement, car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateurs.

readable._destroy(err, callback)

Ajouté dans : v8.0.0

  • err <Error> Une erreur possible.
  • callback <Function> Une fonction de rappel qui prend un argument d’erreur facultatif.

La méthode _destroy() est appelée par readable.destroy(). Elle peut être remplacée par les classes enfants mais elle ne doit pas être appelée directement.

readable.push(chunk[, encoding])

[Historique]

VersionModifications
v22.0.0, v20.13.0L’argument chunk peut maintenant être une instance TypedArray ou DataView.
v8.0.0L’argument chunk peut maintenant être une instance Uint8Array.
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Portion de données à ajouter à la file d’attente de lecture. Pour les flux ne fonctionnant pas en mode objet, chunk doit être une <string>, un <Buffer>, un <TypedArray> ou une <DataView>. Pour les flux en mode objet, chunk peut être n’importe quelle valeur JavaScript.
  • encoding <string> Encodage des portions de chaînes. Doit être un encodage Buffer valide, tel que 'utf8' ou 'ascii'.
  • Retourne : <boolean> true si des portions de données supplémentaires peuvent continuer à être ajoutées ; false sinon.

Lorsque chunk est un <Buffer>, un <TypedArray>, une <DataView> ou une <string>, la portion de données sera ajoutée à la file d’attente interne pour que les utilisateurs du flux puissent la consommer. Le fait de transmettre chunk en tant que null signale la fin du flux (EOF), après quoi aucune autre donnée ne peut être écrite.

Lorsque le Readable fonctionne en mode suspendu, les données ajoutées avec readable.push() peuvent être lues en appelant la méthode readable.read() lorsque l’événement 'readable' est émis.

Lorsque le Readable fonctionne en mode flux, les données ajoutées avec readable.push() seront distribuées en émettant un événement 'data'.

La méthode readable.push() est conçue pour être aussi flexible que possible. Par exemple, lors de l’encapsulation d’une source de niveau inférieur qui fournit une forme quelconque de mécanisme de pause/reprise, et un rappel de données, la source de niveau inférieur peut être encapsulée par l’instance Readable personnalisée :

js
// `_source` est un objet avec des méthodes readStop() et readStart(),
// et un membre `ondata` qui est appelé lorsqu’il a des données, et
// un membre `onend` qui est appelé lorsque les données sont terminées.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // Chaque fois qu’il y a des données, les ajouter à la mémoire tampon interne.
    this._source.ondata = chunk => {
      // Si push() retourne false, alors arrêtez la lecture depuis la source.
      if (!this.push(chunk)) this._source.readStop()
    }

    // Lorsque la source se termine, ajoutez la portion `null` signalant la fin de fichier.
    this._source.onend = () => {
      this.push(null)
    }
  }
  // _read() sera appelé lorsque le flux veut récupérer plus de données.
  // L’argument de taille indicatif est ignoré dans ce cas.
  _read(size) {
    this._source.readStart()
  }
}

La méthode readable.push() est utilisée pour ajouter le contenu à la mémoire tampon interne. Elle peut être pilotée par la méthode readable._read().

Pour les flux ne fonctionnant pas en mode objet, si le paramètre chunk de readable.push() est undefined, il sera traité comme une chaîne ou une mémoire tampon vide. Voir readable.push('') pour plus d’informations.

Erreurs lors de la lecture

Les erreurs survenant lors du traitement de readable._read() doivent être propagées via la méthode readable.destroy(err). Lancer une Error depuis readable._read() ou émettre manuellement un événement 'error' entraîne un comportement indéfini.

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // Effectuer un travail.
    }
  },
})

Exemple de flux de comptage

L'exemple suivant est un exemple basique d'un flux Readable qui émet les chiffres de 1 à 1 000 000 par ordre croissant, puis se termine.

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

Implémentation d'un flux duplex

Un flux Duplex est un flux qui implémente à la fois Readable et Writable, comme une connexion de socket TCP.

Comme JavaScript ne prend pas en charge l'héritage multiple, la classe stream.Duplex est étendue pour implémenter un flux Duplex (au lieu d'étendre les classes stream.Readable et stream.Writable).

La classe stream.Duplex hérite de manière prototypique de stream.Readable et de manière parasitaire de stream.Writable, mais instanceof fonctionnera correctement pour les deux classes de base grâce à la surcharge de Symbol.hasInstance sur stream.Writable.

Les flux Duplex personnalisés doivent appeler le constructeur new stream.Duplex([options]) et implémenter à la fois les méthodes readable._read() et writable._write().

new stream.Duplex(options)

[Historique]

VersionModifications
v8.4.0Les options readableHighWaterMark et writableHighWaterMark sont désormais prises en charge.
  • options <Objet> Transmis aux constructeurs Writable et Readable. Possède également les champs suivants :
    • allowHalfOpen <boolean> S'il est défini sur false, le flux terminera automatiquement le côté accessible en écriture lorsque le côté accessible en lecture se terminera. Par défaut : true.
    • readable <boolean> Détermine si le Duplex doit être accessible en lecture. Par défaut : true.
    • writable <boolean> Détermine si le Duplex doit être accessible en écriture. Par défaut : true.
    • readableObjectMode <boolean> Définit objectMode pour le côté accessible en lecture du flux. N'a aucun effet si objectMode est true. Par défaut : false.
    • writableObjectMode <boolean> Définit objectMode pour le côté accessible en écriture du flux. N'a aucun effet si objectMode est true. Par défaut : false.
    • readableHighWaterMark <number> Définit highWaterMark pour le côté accessible en lecture du flux. N'a aucun effet si highWaterMark est fourni.
    • writableHighWaterMark <number> Définit highWaterMark pour le côté accessible en écriture du flux. N'a aucun effet si highWaterMark est fourni.
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

Ou, lors de l'utilisation de constructeurs de style pré-ES6 :

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

Ou, en utilisant l'approche du constructeur simplifié :

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

Lors de l'utilisation du pipeline :

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Accepter les entrées de type chaîne plutôt que les tampons
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // S'assurer qu'il s'agit d'un JSON valide.
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('failed', err)
    } else {
      console.log('completed')
    }
  }
)

Exemple de flux duplex

L'exemple suivant illustre un exemple simple de flux Duplex qui encapsule un objet source hypothétique de niveau inférieur dans lequel des données peuvent être écrites, et à partir duquel des données peuvent être lues, bien qu'en utilisant une API qui n'est pas compatible avec les flux Node.js. L'exemple suivant illustre un exemple simple de flux Duplex qui met en mémoire tampon les données écrites entrantes via l'interface Writable qui est relue via l'interface Readable.

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // La source sous-jacente ne traite que les chaînes de caractères.
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

L'aspect le plus important d'un flux Duplex est que les côtés Readable et Writable fonctionnent indépendamment l'un de l'autre, bien qu'ils coexistent au sein d'une même instance d'objet.

Flux duplex en mode objet

Pour les flux Duplex, objectMode peut être défini exclusivement pour le côté Readable ou Writable en utilisant respectivement les options readableObjectMode et writableObjectMode.

Dans l'exemple suivant, par exemple, un nouveau flux Transform (qui est un type de flux Duplex) est créé avec un côté Writable en mode objet qui accepte les nombres JavaScript qui sont convertis en chaînes hexadécimales du côté Readable.

js
const { Transform } = require('node:stream')

// Tous les flux Transform sont également des flux Duplex.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Convertir le chunk en nombre si nécessaire.
    chunk |= 0

    // Transformer le chunk en autre chose.
    const data = chunk.toString(16)

    // Pousser les données dans la file d'attente lisible.
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// Affiche : 01
myTransform.write(10)
// Affiche : 0a
myTransform.write(100)
// Affiche : 64

Implémentation d'un flux de transformation

Un flux Transform est un flux Duplex où la sortie est calculée d'une certaine manière à partir de l'entrée. Les exemples incluent les flux zlib ou les flux crypto qui compressent, cryptent ou décryptent des données.

Il n'y a aucune obligation que la sortie ait la même taille que l'entrée, le même nombre de morceaux ou arrive au même moment. Par exemple, un flux Hash n'aura jamais qu'un seul morceau de sortie qui est fourni lorsque l'entrée est terminée. Un flux zlib produira une sortie qui est soit beaucoup plus petite, soit beaucoup plus grande que son entrée.

La classe stream.Transform est étendue pour implémenter un flux Transform.

La classe stream.Transform hérite prototypiquement de stream.Duplex et implémente ses propres versions des méthodes writable._write() et readable._read(). Les implémentations Transform personnalisées doivent implémenter la méthode transform._transform() et peuvent également implémenter la méthode transform._flush().

Il faut être prudent lors de l'utilisation de flux Transform, car les données écrites dans le flux peuvent entraîner la mise en pause du côté Writable du flux si la sortie du côté Readable n'est pas consommée.

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

Ou, lors de l'utilisation de constructeurs de style pré-ES6 :

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

Ou, en utilisant l'approche de constructeur simplifiée :

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

Événement : 'end'

L’événement 'end' provient de la classe stream.Readable. L’événement 'end' est émis une fois que toutes les données ont été produites, ce qui se produit après l’appel du rappel dans transform._flush(). En cas d’erreur, 'end' ne doit pas être émis.

Événement : 'finish'

L’événement 'finish' provient de la classe stream.Writable. L’événement 'finish' est émis après que stream.end() est appelé et que tous les blocs ont été traités par stream._transform(). En cas d’erreur, 'finish' ne doit pas être émis.

transform._flush(callback)

  • callback <Fonction> Une fonction de rappel (avec éventuellement un argument d’erreur et des données) à appeler lorsque les données restantes ont été vidées.

Cette fonction NE DOIT PAS être appelée directement par le code de l’application. Elle doit être implémentée par les classes enfants et appelée uniquement par les méthodes internes de la classe Readable.

Dans certains cas, une opération de transformation peut avoir besoin d’émettre un bit de données supplémentaire à la fin du flux. Par exemple, un flux de compression zlib stockera une quantité d’état interne utilisée pour compresser de manière optimale la sortie. Lorsque le flux se termine, cependant, ces données supplémentaires doivent être vidées afin que les données compressées soient complètes.

Les implémentations personnalisées de Transform peuvent implémenter la méthode transform._flush(). Celle-ci sera appelée lorsqu’il n’y aura plus de données écrites à consommer, mais avant que l’événement 'end' ne soit émis signalant la fin du flux Readable.

Au sein de l’implémentation de transform._flush(), la méthode transform.push() peut être appelée zéro ou plusieurs fois, selon le cas. La fonction callback doit être appelée lorsque l’opération de vidage est terminée.

La méthode transform._flush() est préfixée par un caractère de soulignement car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateur.

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> Le Buffer à transformer, converti à partir de la string passée à stream.write(). Si l'option decodeStrings du flux est false ou si le flux fonctionne en mode objet, le chunk ne sera pas converti et sera ce qui a été passé à stream.write().
  • encoding <string> Si le chunk est une chaîne, alors il s'agit du type d'encodage. Si le chunk est un buffer, alors il s'agit de la valeur spéciale 'buffer'. Ignorez-le dans ce cas.
  • callback <Function> Une fonction de rappel (éventuellement avec un argument d'erreur et des données) à appeler une fois que le chunk fourni a été traité.

Cette fonction NE DOIT PAS être appelée directement par le code de l'application. Elle doit être implémentée par les classes enfants et appelée uniquement par les méthodes internes de la classe Readable.

Toutes les implémentations de flux Transform doivent fournir une méthode _transform() pour accepter l'entrée et produire la sortie. L'implémentation transform._transform() gère les octets écrits, calcule une sortie, puis transmet cette sortie à la partie lisible en utilisant la méthode transform.push().

La méthode transform.push() peut être appelée zéro ou plusieurs fois pour générer une sortie à partir d'un seul chunk d'entrée, en fonction de la quantité à sortir en raison du chunk.

Il est possible qu'aucune sortie ne soit générée à partir d'un chunk de données d'entrée donné.

La fonction callback doit être appelée uniquement lorsque le chunk actuel est complètement consommé. Le premier argument passé au callback doit être un objet Error si une erreur s'est produite lors du traitement de l'entrée, ou null sinon. Si un deuxième argument est passé au callback, il sera transmis à la méthode transform.push(), mais seulement si le premier argument est faux. En d'autres termes, ce qui suit est équivalent :

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

La méthode transform._transform() est préfixée par un trait de soulignement car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateurs.

transform._transform() n'est jamais appelée en parallèle ; les flux implémentent un mécanisme de file d'attente, et pour recevoir le prochain chunk, callback doit être appelée, soit de manière synchrone, soit de manière asynchrone.

Classe : stream.PassThrough

La classe stream.PassThrough est une implémentation triviale d’un flux Transform qui transmet simplement les octets d’entrée vers la sortie. Son objectif est principalement de servir d’exemples et de tests, mais il existe certains cas d’utilisation où stream.PassThrough est utile comme élément de base pour de nouveaux types de flux.

Notes supplémentaires

Compatibilité des flux avec les générateurs et itérateurs asynchrones

Avec la prise en charge des générateurs et itérateurs asynchrones en JavaScript, les générateurs asynchrones sont désormais effectivement une construction de flux de premier ordre au niveau du langage.

Voici quelques cas courants d’interopérabilité d’utilisation de flux Node.js avec des générateurs asynchrones et des itérateurs asynchrones.

Consommer des flux lisibles avec des itérateurs asynchrones

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

Les itérateurs asynchrones enregistrent un gestionnaire d’erreurs permanent sur le flux pour éviter toute erreur post-destruction non gérée.

Créer des flux lisibles avec des générateurs asynchrones

Un flux lisible Node.js peut être créé à partir d’un générateur asynchrone en utilisant la méthode utilitaire Readable.from() :

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

Envoyer des itérateurs asynchrones vers des flux accessibles en écriture

Lorsque vous écrivez dans un flux accessible en écriture à partir d’un itérateur asynchrone, assurez-vous d’une gestion correcte de la contre-pression et des erreurs. stream.pipeline() fait abstraction de la gestion de la contre-pression et des erreurs liées à la contre-pression :

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// Modèle de rappel
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'value returned')
  }
}).on('close', () => {
  ac.abort()
})

// Modèle de promesse
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'value returned')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

Compatibilité avec les anciennes versions de Node.js

Avant Node.js 0.10, l'interface de flux Readable était plus simple, mais aussi moins puissante et moins utile.

  • Plutôt que d'attendre les appels à la méthode stream.read(), les événements 'data' commençaient à émettre immédiatement. Les applications qui devaient effectuer un certain travail pour décider comment gérer les données étaient obligées de stocker les données lues dans des tampons afin que les données ne soient pas perdues.
  • La méthode stream.pause() était consultative, plutôt que garantie. Cela signifiait qu'il était toujours nécessaire d'être prêt à recevoir des événements 'data' même lorsque le flux était dans un état en pause.

Dans Node.js 0.10, la classe Readable a été ajoutée. Pour assurer la rétrocompatibilité avec les anciens programmes Node.js, les flux Readable passent en "mode fluide" lorsqu'un gestionnaire d'événements 'data' est ajouté, ou lorsque la méthode stream.resume() est appelée. L'effet est que, même en n'utilisant pas la nouvelle méthode stream.read() et l'événement 'readable', il n'est plus nécessaire de s'inquiéter de la perte de blocs 'data'.

Bien que la plupart des applications continuent de fonctionner normalement, cela introduit un cas limite dans les conditions suivantes :

  • Aucun écouteur d'événement 'data' n'est ajouté.
  • La méthode stream.resume() n'est jamais appelée.
  • Le flux n'est pas redirigé vers une destination inscriptible.

Par exemple, considérez le code suivant :

js
// ATTENTION !  CASSÉ !
net.createServer((socket) => {

  // Nous ajoutons un écouteur 'end', mais ne consommons jamais les données.
  socket.on('end', () => {
    // Il n'arrivera jamais ici.
    socket.end('Le message a été reçu mais n'a pas été traité.\n');
  });

}).listen(1337);

Avant Node.js 0.10, les données du message entrant seraient simplement ignorées. Cependant, dans Node.js 0.10 et au-delà, le socket reste en pause indéfiniment.

La solution de contournement dans cette situation consiste à appeler la méthode stream.resume() pour démarrer le flux de données :

js
// Solution de contournement.
net.createServer((socket) => {
  socket.on('end', () => {
    socket.end('Le message a été reçu mais n'a pas été traité.\n');
  });

  // Démarrer le flux de données, en l'ignorant.
  socket.resume();
}).listen(1337);

En plus des nouveaux flux Readable qui passent en mode fluide, les flux de style pré-0.10 peuvent être encapsulés dans une classe Readable en utilisant la méthode readable.wrap().

readable.read(0)

Dans certains cas, il est nécessaire de déclencher une actualisation des mécanismes de flux lisibles sous-jacents, sans consommer réellement de données. Dans de tels cas, il est possible d'appeler readable.read(0), qui retournera toujours null.

Si le tampon de lecture interne est inférieur à highWaterMark, et que le flux n'est pas en cours de lecture, alors l'appel à stream.read(0) déclenchera un appel de bas niveau stream._read().

Bien que la plupart des applications n'aient presque jamais besoin de faire cela, il existe des situations dans Node.js où cela est fait, en particulier dans les mécanismes internes de la classe de flux Readable.

readable.push('')

L'utilisation de readable.push('') n'est pas recommandée.

Le fait de pousser une <string>, un <Buffer>, un <TypedArray> ou une <DataView> de zéro octet dans un flux qui n'est pas en mode objet a un effet secondaire intéressant. Parce qu'il s'agit bien d'un appel à readable.push(), l'appel mettra fin au processus de lecture. Cependant, comme l'argument est une chaîne vide, aucune donnée n'est ajoutée au tampon lisible, il n'y a donc rien à consommer pour l'utilisateur.

Discrépance highWaterMark après l'appel de readable.setEncoding()

L'utilisation de readable.setEncoding() modifiera le comportement de la façon dont highWaterMark fonctionne en mode non-objet.

Généralement, la taille du tampon actuel est mesurée par rapport à highWaterMark en octets. Cependant, après l'appel de setEncoding(), la fonction de comparaison commencera à mesurer la taille du tampon en caractères.

Ce n'est pas un problème dans les cas courants avec latin1 ou ascii. Mais il est conseillé d'être attentif à ce comportement lorsque l'on travaille avec des chaînes de caractères qui pourraient contenir des caractères multi-octets.