Stream
[Stable: 2 - Stable]
Stable: 2 Stabilité : 2 - Stable
Code Source : lib/stream.js
Un flux est une interface abstraite pour travailler avec des données en flux dans Node.js. Le module node:stream
fournit une API pour implémenter l'interface de flux.
Il existe de nombreux objets flux fournis par Node.js. Par exemple, une requête vers un serveur HTTP et process.stdout
sont tous deux des instances de flux.
Les flux peuvent être lisibles, inscriptibles, ou les deux. Tous les flux sont des instances de EventEmitter
.
Pour accéder au module node:stream
:
const stream = require('node:stream')
Le module node:stream
est utile pour créer de nouveaux types d'instances de flux. Il n'est généralement pas nécessaire d'utiliser le module node:stream
pour consommer des flux.
Organisation de ce document
Ce document contient deux sections principales et une troisième section pour les notes. La première section explique comment utiliser les flux existants au sein d'une application. La deuxième section explique comment créer de nouveaux types de flux.
Types de flux
Il existe quatre types de flux fondamentaux dans Node.js :
Writable
: flux dans lesquels des données peuvent être écrites (par exemple,fs.createWriteStream()
).Readable
: flux à partir desquels des données peuvent être lues (par exemple,fs.createReadStream()
).Duplex
: flux qui sont à la foisReadable
etWritable
(par exemple,net.Socket
).Transform
: fluxDuplex
qui peuvent modifier ou transformer les données lorsqu'elles sont écrites et lues (par exemple,zlib.createDeflate()
).
De plus, ce module comprend les fonctions utilitaires stream.duplexPair()
, stream.pipeline()
, stream.finished()
stream.Readable.from()
et stream.addAbortSignal()
.
API Streams Promises
Ajouté dans : v15.0.0
L'API stream/promises
fournit un ensemble alternatif de fonctions utilitaires asynchrones pour les streams qui retournent des objets Promise
plutôt que d'utiliser des rappels. L'API est accessible via require('node:stream/promises')
ou require('node:stream').promises
.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[Historique]
Version | Changements |
---|---|
v18.0.0, v17.2.0, v16.14.0 | Ajout de l'option end , qui peut être définie à false pour empêcher la fermeture automatique du stream de destination lorsque la source se termine. |
v15.0.0 | Ajouté dans : v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- Retourne: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- Retourne: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- Retourne: <Promise> | <AsyncIterable>
options
<Object> Options de Pipelinesignal
<AbortSignal>end
<boolean> Terminer le stream de destination lorsque le stream source se termine. Les streams de transformation sont toujours terminés, même si cette valeur estfalse
. Par défaut :true
.
Retourne: <Promise> Est résolue lorsque le pipeline est terminé.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Pipeline réussi.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline réussi.')
Pour utiliser un AbortSignal
, passez-le dans un objet options, comme dernier argument. Lorsque le signal est annulé, destroy
sera appelé sur le pipeline sous-jacent, avec une AbortError
.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
L'API pipeline
prend également en charge les générateurs asynchrones :
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Travailler avec des chaînes plutôt que des `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Pipeline réussi.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Travailler avec des chaînes plutôt que des `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Pipeline réussi.')
N'oubliez pas de gérer l'argument signal
passé au générateur asynchrone. En particulier dans le cas où le générateur asynchrone est la source du pipeline (c.-à-d. le premier argument) ou le pipeline ne sera jamais terminé.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline réussi.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline réussi.')
L'API pipeline
fournit une version avec callback :
stream.finished(stream[, options])
[Historique]
Version | Modifications |
---|---|
v19.5.0, v18.14.0 | Ajout de la prise en charge de ReadableStream et WritableStream . |
v19.1.0, v18.13.0 | L'option cleanup a été ajoutée. |
v15.0.0 | Ajouté dans : v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Un flux/flux web lisible et/ou inscriptible.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> Sitrue
, supprime les écouteurs enregistrés par cette fonction avant que la promesse ne soit remplie. Par défaut :false
.
Retourne : <Promise> Est satisfaite lorsque le flux n’est plus lisible ou inscriptible.
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Le flux a fini de lire.')
}
run().catch(console.error)
rs.resume() // Vidanger le flux.
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Le flux a fini de lire.')
}
run().catch(console.error)
rs.resume() // Vidanger le flux.
L’API finished
fournit également une version de rappel.
stream.finished()
laisse des écouteurs d’événements en suspens (en particulier 'error'
, 'end'
, 'finish'
et 'close'
) après que la promesse retournée a été résolue ou rejetée. La raison en est qu’il est possible que des événements 'error'
inattendus (en raison d’implémentations de flux incorrectes) ne provoquent pas de plantages inattendus. Si ce comportement n’est pas souhaité, alors options.cleanup
doit être défini sur true
:
await finished(rs, { cleanup: true })
Mode objet
Tous les flux créés par les API de Node.js fonctionnent exclusivement sur des chaînes, des objets <Buffer>, <TypedArray> et <DataView> :
- Les
Strings
et lesBuffers
sont les types les plus couramment utilisés avec les flux. TypedArray
etDataView
vous permettent de gérer les données binaires avec des types tels queInt32Array
ouUint8Array
. Lorsque vous écrivez un TypedArray ou un DataView dans un flux, Node.js traite les octets bruts.
Il est cependant possible que les implémentations de flux fonctionnent avec d'autres types de valeurs JavaScript (à l'exception de null
, qui a une fonction spéciale dans les flux). On considère que ces flux fonctionnent en "mode objet".
Les instances de flux sont basculées en mode objet à l'aide de l'option objectMode
lors de la création du flux. Il n'est pas sûr de tenter de basculer un flux existant en mode objet.
Mise en mémoire tampon
Les flux Writable
et Readable
stockent les données dans une mémoire tampon interne.
La quantité de données potentiellement mises en mémoire tampon dépend de l'option highWaterMark
transmise au constructeur du flux. Pour les flux normaux, l'option highWaterMark
spécifie un nombre total d'octets. Pour les flux fonctionnant en mode objet, la valeur highWaterMark
spécifie un nombre total d'objets. Pour les flux fonctionnant sur des chaînes (mais ne les décodant pas), highWaterMark
spécifie un nombre total d'unités de code UTF-16.
Les données sont mises en mémoire tampon dans les flux Readable
lorsque l'implémentation appelle stream.push(chunk)
. Si le consommateur du flux n'appelle pas stream.read()
, les données restent dans la file d'attente interne jusqu'à ce qu'elles soient consommées.
Une fois que la taille totale de la mémoire tampon de lecture interne atteint le seuil spécifié par highWaterMark
, le flux arrête temporairement de lire des données de la ressource sous-jacente jusqu'à ce que les données actuellement mises en mémoire tampon puissent être consommées (c'est-à-dire que le flux cesse d'appeler la méthode interne readable._read()
utilisée pour remplir la mémoire tampon de lecture).
Les données sont mises en mémoire tampon dans les flux Writable
lorsque la méthode writable.write(chunk)
est appelée de manière répétée. Tant que la taille totale de la mémoire tampon d'écriture interne est inférieure au seuil défini par highWaterMark
, les appels à writable.write()
renvoient true
. Une fois que la taille de la mémoire tampon interne atteint ou dépasse highWaterMark
, false
est renvoyé.
Un objectif clé de l'API stream
, en particulier de la méthode stream.pipe()
, est de limiter la mise en mémoire tampon des données à des niveaux acceptables, de sorte que les sources et les destinations de vitesses différentes ne surchargent pas la mémoire disponible.
L'option highWaterMark
est un seuil, pas une limite : elle détermine la quantité de données qu'un flux met en mémoire tampon avant d'arrêter de demander davantage de données. Elle n'impose pas de limitation stricte de la mémoire en général. Les implémentations de flux spécifiques peuvent choisir d'imposer des limites plus strictes, mais cela est facultatif.
Étant donné que les flux Duplex
et Transform
sont à la fois Readable
et Writable
, chacun maintient deux mémoires tampons internes distinctes utilisées pour la lecture et l'écriture, ce qui permet à chaque côté de fonctionner indépendamment de l'autre tout en maintenant un flux de données approprié et efficace. Par exemple, les instances de net.Socket
sont des flux Duplex
dont le côté Readable
permet de consommer les données reçues du socket et dont le côté Writable
permet d'écrire des données dans le socket. Étant donné que les données peuvent être écrites dans le socket à un débit plus rapide ou plus lent que les données reçues, chaque côté doit fonctionner (et mettre en mémoire tampon) indépendamment de l'autre.
Les mécanismes de la mise en mémoire tampon interne sont un détail d'implémentation interne et peuvent être modifiés à tout moment. Cependant, pour certaines implémentations avancées, les mémoires tampons internes peuvent être récupérées à l'aide de writable.writableBuffer
ou readable.readableBuffer
. L'utilisation de ces propriétés non documentées est déconseillée.
API pour les consommateurs de flux
Presque toutes les applications Node.js, même les plus simples, utilisent des flux d'une manière ou d'une autre. Voici un exemple d'utilisation de flux dans une application Node.js qui implémente un serveur HTTP :
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` est un http.IncomingMessage, qui est un flux lisible.
// `res` est un http.ServerResponse, qui est un flux écrivable.
let body = ''
// Obtenez les données sous forme de chaînes utf8.
// Si un encodage n'est pas défini, des objets Buffer seront reçus.
req.setEncoding('utf8')
// Les flux lisibles émettent des événements 'data' une fois qu'un écouteur est ajouté.
req.on('data', chunk => {
body += chunk
})
// L'événement 'end' indique que l'ensemble du corps a été reçu.
req.on('end', () => {
try {
const data = JSON.parse(body)
// Réécrivez quelque chose d'intéressant pour l'utilisateur :
res.write(typeof data)
res.end()
} catch (er) {
// oups ! mauvais json !
res.statusCode = 400
return res.end(`erreur : ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// erreur : Unexpected token 'o', "not json" is not valid JSON
Les flux Writable
(tels que res
dans l'exemple) exposent des méthodes telles que write()
et end()
qui sont utilisées pour écrire des données dans le flux.
Les flux Readable
utilisent l'API EventEmitter
pour notifier le code d'application lorsque des données sont disponibles pour être lues à partir du flux. Ces données disponibles peuvent être lues à partir du flux de plusieurs manières.
Les flux Writable
et Readable
utilisent l'API EventEmitter
de diverses manières pour communiquer l'état actuel du flux.
Les flux Duplex
et Transform
sont à la fois Writable
et Readable
.
Les applications qui écrivent des données dans un flux ou qui consomment des données à partir d'un flux ne sont pas tenues d'implémenter directement les interfaces de flux et n'auront généralement aucune raison d'appeler require('node:stream')
.
Les développeurs souhaitant implémenter de nouveaux types de flux doivent se référer à la section API pour les implémenteurs de flux.
Flux d'écriture
Les flux d'écriture sont une abstraction pour une destination vers laquelle les données sont écrites.
Les exemples de flux Writable
incluent :
- Requêtes HTTP, sur le client
- Réponses HTTP, sur le serveur
- Flux d'écriture fs
- Flux zlib
- Flux crypto
- Sockets TCP
- Entrée standard du processus enfant
process.stdout
,process.stderr
Certains de ces exemples sont en fait des flux Duplex
qui implémentent l'interface Writable
.
Tous les flux Writable
implémentent l'interface définie par la classe stream.Writable
.
Bien que des instances spécifiques de flux Writable
puissent différer de diverses manières, tous les flux Writable
suivent le même modèle d'utilisation fondamental, comme illustré dans l'exemple ci-dessous :
const myStream = getWritableStreamSomehow()
myStream.write('some data')
myStream.write('some more data')
myStream.end('done writing data')
Classe : stream.Writable
Ajouté dans : v0.9.4
Événement : 'close'
[Historique]
Version | Changements |
---|---|
v10.0.0 | Ajout de l'option emitClose pour spécifier si 'close' est émis lors de la destruction. |
v0.9.4 | Ajouté dans : v0.9.4 |
L'événement 'close'
est émis lorsque le flux et l'une de ses ressources sous-jacentes (un descripteur de fichier, par exemple) ont été fermés. L'événement indique qu'il n'y aura plus d'événements émis et qu'aucun autre calcul n'aura lieu.
Un flux Writable
émettra toujours l'événement 'close'
s'il est créé avec l'option emitClose
.
Événement : 'drain'
Ajouté dans : v0.9.4
Si un appel à stream.write(chunk)
renvoie false
, l'événement 'drain'
sera émis lorsqu'il sera approprié de reprendre l'écriture de données dans le flux.
// Écrire les données dans le flux d'écriture fourni un million de fois.
// Soyez attentif à la contre-pression.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// Dernière fois !
writer.write(data, encoding, callback)
} else {
// Voyons si nous devons continuer ou attendre.
// Ne passez pas le rappel, car nous n'avons pas encore terminé.
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// Il a fallu s'arrêter tôt !
// Écrire un peu plus une fois qu'il est vidé.
writer.once('drain', write)
}
}
}
Événement : 'error'
Ajouté dans : v0.9.4
L’événement 'error'
est émis si une erreur s’est produite lors de l’écriture ou de la transmission de données. La fonction de rappel de l’écouteur reçoit un seul argument Error
lorsqu’elle est appelée.
Le flux est fermé lorsque l’événement 'error'
est émis, sauf si l’option autoDestroy
a été définie sur false
lors de la création du flux.
Après 'error'
, aucun autre événement que 'close'
ne devrait être émis (y compris les événements 'error'
).
Événement : 'finish'
Ajouté dans : v0.9.4
L’événement 'finish'
est émis après que la méthode stream.end()
a été appelée et que toutes les données ont été vidées vers le système sous-jacent.
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('Toutes les écritures sont maintenant terminées.')
})
writer.end('C’est la fin\n')
Événement : 'pipe'
Ajouté dans : v0.9.4
src
<stream.Readable> flux source qui est redirigé vers cet élément inscriptible
L’événement 'pipe'
est émis lorsque la méthode stream.pipe()
est appelée sur un flux lisible, ajoutant cet élément inscriptible à son ensemble de destinations.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('Quelque chose est en cours de redirection vers l’enregistreur.')
assert.equal(src, reader)
})
reader.pipe(writer)
Événement : 'unpipe'
Ajouté dans : v0.9.4
src
<stream.Readable> Le flux source qui a annulé la redirection de cet élément inscriptible
L’événement 'unpipe'
est émis lorsque la méthode stream.unpipe()
est appelée sur un flux Readable
, en supprimant cet élément Writable
de son ensemble de destinations.
Il est également émis au cas où ce flux Writable
émet une erreur lorsqu’un flux Readable
est redirigé vers lui.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('Quelque chose a cessé d’être redirigé vers l’enregistreur.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
Ajouté dans la version : v0.11.2
La méthode writable.cork()
force toutes les données écrites à être mises en mémoire tampon. Les données mises en mémoire tampon sont vidées lorsque les méthodes stream.uncork()
ou stream.end()
sont appelées.
L'objectif principal de writable.cork()
est de prendre en compte une situation dans laquelle plusieurs petits morceaux sont écrits dans le flux en succession rapide. Au lieu de les transférer immédiatement vers la destination sous-jacente, writable.cork()
met en mémoire tampon tous les morceaux jusqu'à ce que writable.uncork()
soit appelé, ce qui les transmettra tous à writable._writev()
, s'il est présent. Cela empêche une situation de blocage en tête de ligne où les données sont mises en mémoire tampon en attendant que le premier petit morceau soit traité. Toutefois, l'utilisation de writable.cork()
sans implémenter writable._writev()
peut avoir un effet négatif sur le débit.
Voir aussi : writable.uncork()
, writable._writev()
.
writable.destroy([error])
[Historique]
Version | Modifications |
---|---|
v14.0.0 | Fonctionne comme une opération sans effet sur un flux qui a déjà été détruit. |
v8.0.0 | Ajouté dans la version : v8.0.0 |
Détruit le flux. Émet éventuellement un événement 'error'
et émet un événement 'close'
(sauf si emitClose
est défini sur false
). Après cet appel, le flux accessible en écriture est terminé et les appels suivants à write()
ou end()
génèrent une erreur ERR_STREAM_DESTROYED
. Il s'agit d'une manière destructrice et immédiate de détruire un flux. Les appels précédents à write()
peuvent ne pas avoir été vidés et peuvent déclencher une erreur ERR_STREAM_DESTROYED
. Utilisez end()
au lieu de destroy
si les données doivent être vidées avant la fermeture, ou attendez l'événement 'drain'
avant de détruire le flux.
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
Une fois que destroy()
a été appelé, tous les appels suivants seront des opérations sans effet et aucune autre erreur que celle de _destroy()
ne peut être émise en tant que 'error'
.
Les implémenteurs ne doivent pas remplacer cette méthode, mais plutôt implémenter writable._destroy()
.
writable.closed
Ajouté dans : v18.0.0
Est true
après l’émission de 'close'
.
writable.destroyed
Ajouté dans : v8.0.0
Est true
après l’appel à writable.destroy()
.
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[Historique]
Version | Modifications |
---|---|
v22.0.0, v20.13.0 | L’argument chunk peut maintenant être une instance TypedArray ou DataView . |
v15.0.0 | Le callback est invoqué avant 'finish' ou en cas d’erreur. |
v14.0.0 | Le callback est invoqué si 'finish' ou 'error' est émis. |
v10.0.0 | Cette méthode renvoie maintenant une référence à writable . |
v8.0.0 | L’argument chunk peut maintenant être une instance Uint8Array . |
v0.9.4 | Ajouté dans : v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Données optionnelles à écrire. Pour les flux ne fonctionnant pas en mode objet,chunk
doit être un <string>, un <Buffer>, un <TypedArray> ou un <DataView>. Pour les flux en mode objet,chunk
peut être n’importe quelle valeur JavaScript autre quenull
.encoding
<string> L’encodage sichunk
est une chaîne de caractèrescallback
<Function> Rappel lorsque le flux est terminé.- Retourne : <this>
L’appel de la méthode writable.end()
signale qu’il n’y aura plus de données écrites dans le Writable
. Les arguments optionnels chunk
et encoding
permettent d’écrire un dernier bloc de données supplémentaire juste avant de fermer le flux.
L’appel de la méthode stream.write()
après avoir appelé stream.end()
lèvera une erreur.
// Écrire 'hello, ' puis terminer par 'world!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hello, ')
file.end('world!')
// Il n’est plus autorisé d’écrire maintenant !
writable.setDefaultEncoding(encoding)
[Historique]
Version | Modifications |
---|---|
v6.1.0 | Cette méthode renvoie désormais une référence à writable . |
v0.11.15 | Ajoutée dans : v0.11.15 |
La méthode writable.setDefaultEncoding()
définit l' encoding
par défaut pour un flux Writable
.
writable.uncork()
Ajoutée dans : v0.11.2
La méthode writable.uncork()
vide toutes les données mises en mémoire tampon depuis l'appel à stream.cork()
.
Lors de l'utilisation de writable.cork()
et writable.uncork()
pour gérer la mise en mémoire tampon des écritures dans un flux, différez les appels à writable.uncork()
en utilisant process.nextTick()
. Cela permet le traitement par lots de tous les appels writable.write()
qui se produisent au cours d'une phase donnée de la boucle d'événements de Node.js.
stream.cork()
stream.write('quelques ')
stream.write('données ')
process.nextTick(() => stream.uncork())
Si la méthode writable.cork()
est appelée plusieurs fois sur un flux, le même nombre d'appels à writable.uncork()
doit être effectué pour vider les données mises en mémoire tampon.
stream.cork()
stream.write('quelques ')
stream.cork()
stream.write('données ')
process.nextTick(() => {
stream.uncork()
// Les données ne seront pas vidées tant que uncork() n'est pas appelé une seconde fois.
stream.uncork()
})
Voir également : writable.cork()
.
writable.writable
Ajoutée dans : v11.4.0
Est true
s'il est sûr d'appeler writable.write()
, ce qui signifie que le flux n'a pas été détruit, n'a pas généré d'erreur ou n'est pas terminé.
writable.writableAborted
Ajoutée dans : v18.0.0, v16.17.0
[Stable : 1 - Expérimental]
Stable : 1 Stabilité : 1 - Expérimental
Retourne si le flux a été détruit ou a généré une erreur avant d'émettre 'finish'
.
writable.writableEnded
Ajouté dans : v12.9.0
Est true
après que writable.end()
a été appelé. Cette propriété n'indique pas si les données ont été vidées, pour cela utilisez plutôt writable.writableFinished
.
writable.writableCorked
Ajouté dans : v13.2.0, v12.16.0
Nombre de fois que writable.uncork()
doit être appelé pour décorker entièrement le flux.
writable.errored
Ajouté dans : v18.0.0
Retourne une erreur si le flux a été détruit avec une erreur.
writable.writableFinished
Ajouté dans : v12.6.0
Est mis à true
immédiatement avant que l'événement 'finish'
soit émis.
writable.writableHighWaterMark
Ajouté dans : v9.3.0
Retourne la valeur de highWaterMark
passée lors de la création de ce Writable
.
writable.writableLength
Ajouté dans : v9.4.0
Cette propriété contient le nombre d'octets (ou d'objets) dans la file d'attente prêts à être écrits. La valeur fournit des données d'introspection concernant l'état de highWaterMark
.
writable.writableNeedDrain
Ajouté dans : v15.2.0, v14.17.0
Est true
si le tampon du flux est plein et que le flux émettra 'drain'
.
writable.writableObjectMode
Ajouté dans : v12.3.0
Accesseur pour la propriété objectMode
d’un flux Writable
donné.
writable[Symbol.asyncDispose]()
Ajouté dans : v22.4.0, v20.16.0
[Stable: 1 - Experimental]
Stable: 1 Stabilité : 1 - Expérimental
Appelle writable.destroy()
avec une AbortError
et renvoie une promesse qui est tenue lorsque le flux est terminé.
writable.write(chunk[, encoding][, callback])
[Historique]
Version | Changements |
---|---|
v22.0.0, v20.13.0 | L’argument chunk peut maintenant être une instance TypedArray ou DataView . |
v8.0.0 | L’argument chunk peut maintenant être une instance Uint8Array . |
v6.0.0 | Le passage de null comme paramètre chunk sera toujours considéré comme invalide maintenant, même en mode objet. |
v0.9.4 | Ajouté dans : v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Données facultatives à écrire. Pour les flux ne fonctionnant pas en mode objet,chunk
doit être un <string>, un <Buffer>, un <TypedArray> ou une <DataView>. Pour les flux en mode objet,chunk
peut être n’importe quelle valeur JavaScript autre quenull
.encoding
<string> | <null> L’encodage, sichunk
est une chaîne. Par défaut :'utf8'
callback
<Function> Fonction de rappel lorsque ce bloc de données est vidé.- Retourne : <boolean>
false
si le flux souhaite que le code appelant attende que l’événement'drain'
soit émis avant de continuer à écrire des données supplémentaires ; sinontrue
.
La méthode writable.write()
écrit des données dans le flux et appelle la callback
fournie une fois que les données ont été entièrement traitées. Si une erreur se produit, la callback
sera appelée avec l’erreur comme premier argument. La callback
est appelée de manière asynchrone et avant que 'error'
ne soit émis.
La valeur de retour est true
si la mémoire tampon interne est inférieure à la highWaterMark
configurée lors de la création du flux après avoir admis chunk
. Si false
est retourné, d’autres tentatives d’écriture de données dans le flux doivent s’arrêter jusqu’à ce que l’événement 'drain'
soit émis.
Tant qu’un flux ne se vide pas, les appels à write()
mettront chunk
en mémoire tampon et retourneront false. Une fois que tous les blocs actuellement mis en mémoire tampon sont vidés (acceptés pour la livraison par le système d’exploitation), l’événement 'drain'
sera émis. Une fois que write()
retourne false, n’écrivez plus de blocs jusqu’à ce que l’événement 'drain'
soit émis. Bien qu’il soit permis d’appeler write()
sur un flux qui ne se vide pas, Node.js mettra en mémoire tampon tous les blocs écrits jusqu’à ce que l’utilisation maximale de la mémoire se produise, auquel cas il abandonnera inconditionnellement. Même avant qu’il n’abandonne, une forte utilisation de la mémoire entraînera de mauvaises performances du collecteur de déchets et un RSS élevé (qui n’est généralement pas restitué au système, même après que la mémoire n’est plus nécessaire). Étant donné que les sockets TCP peuvent ne jamais se vider si le pair distant ne lit pas les données, l’écriture dans un socket qui ne se vide pas peut entraîner une vulnérabilité exploitable à distance.
L’écriture de données pendant que le flux ne se vide pas est particulièrement problématique pour un Transform
, car les flux Transform
sont mis en pause par défaut jusqu’à ce qu’ils soient connectés ou qu’un gestionnaire d’événements 'data'
ou 'readable'
soit ajouté.
Si les données à écrire peuvent être générées ou récupérées à la demande, il est recommandé d’encapsuler la logique dans un Readable
et d’utiliser stream.pipe()
. Toutefois, si l’appel à write()
est préféré, il est possible de respecter la contre-pression et d’éviter les problèmes de mémoire en utilisant l’événement 'drain'
:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// Attendre que cb soit appelée avant de faire d’autres écritures.
write('hello', () => {
console.log('Écriture terminée, faire plus d’écritures maintenant.')
})
Un flux Writable
en mode objet ignorera toujours l’argument encoding
.
Flux de lecture
Les flux de lecture sont une abstraction pour une source à partir de laquelle les données sont consommées.
Voici des exemples de flux Readable
:
- Réponses HTTP, côté client
- Requêtes HTTP, côté serveur
- Flux de lecture fs
- Flux zlib
- Flux crypto
- Sockets TCP
- stdout et stderr des processus enfants
process.stdin
Tous les flux Readable
implémentent l’interface définie par la classe stream.Readable
.
Deux modes de lecture
Les flux Readable
fonctionnent de fait dans l’un des deux modes suivants : flux continu et pause. Ces modes sont distincts du mode objet. Un flux Readable
peut être en mode objet ou non, qu’il soit en mode flux continu ou en mode pause.
- En mode flux continu, les données sont lues automatiquement à partir du système sous-jacent et fournies à une application aussi rapidement que possible en utilisant des événements via l’interface
EventEmitter
. - En mode pause, la méthode
stream.read()
doit être appelée explicitement pour lire des fragments de données depuis le flux.
Tous les flux Readable
commencent en mode pause, mais peuvent être basculés en mode flux continu de l’une des manières suivantes :
- En ajoutant un gestionnaire d’événements
'data'
. - En appelant la méthode
stream.resume()
. - En appelant la méthode
stream.pipe()
pour envoyer les données vers un fluxWritable
.
Le flux Readable
peut revenir en mode pause en utilisant l’une des méthodes suivantes :
- S’il n’y a pas de destinations de flux, en appelant la méthode
stream.pause()
. - S’il existe des destinations de flux, en supprimant toutes les destinations de flux. Plusieurs destinations de flux peuvent être supprimées en appelant la méthode
stream.unpipe()
.
Le concept important à retenir est qu’un flux Readable
ne générera pas de données tant qu’un mécanisme permettant de consommer ou d’ignorer ces données n’est pas fourni. Si le mécanisme de consommation est désactivé ou supprimé, le flux Readable
tentera d’arrêter la génération de données.
Pour des raisons de rétrocompatibilité, la suppression des gestionnaires d’événements 'data'
ne mettra pas automatiquement le flux en pause. De plus, s’il existe des destinations de flux, l’appel de la fonction stream.pause()
ne garantira pas que le flux restera en pause une fois que ces destinations seront vidées et demanderont plus de données.
Si un flux Readable
est basculé en mode flux continu et qu’il n’y a pas de consommateurs disponibles pour gérer les données, ces données seront perdues. Cela peut se produire, par exemple, lorsque la méthode readable.resume()
est appelée sans qu’un écouteur soit attaché à l’événement 'data'
, ou lorsqu’un gestionnaire d’événements 'data'
est supprimé du flux.
L’ajout d’un gestionnaire d’événements 'readable'
interrompt automatiquement le flux du flux, et les données doivent être consommées via readable.read()
. Si le gestionnaire d’événements 'readable'
est supprimé, le flux recommencera à circuler s’il existe un gestionnaire d’événements 'data'
.
Trois états
Les "deux modes" de fonctionnement d'un flux Readable
sont une abstraction simplifiée de la gestion d'état interne plus complexe qui se produit au sein de l'implémentation du flux Readable
.
Plus précisément, à tout moment donné, chaque Readable
est dans l'un des trois états possibles :
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
Lorsque readable.readableFlowing
est null
, aucun mécanisme de consommation des données du flux n'est fourni. Par conséquent, le flux ne générera pas de données. Dans cet état, l'attachement d'un écouteur pour l'événement 'data'
, l'appel de la méthode readable.pipe()
ou l'appel de la méthode readable.resume()
fera passer readable.readableFlowing
à true
, ce qui fera que le Readable
commence à émettre activement des événements au fur et à mesure que les données sont générées.
L'appel de readable.pause()
, readable.unpipe()
ou la réception d'une contre-pression fera passer readable.readableFlowing
à false
, ce qui interrompra temporairement le flux d'événements, mais n'interrompra pas la génération de données. Dans cet état, l'attachement d'un écouteur pour l'événement 'data'
ne fera pas passer readable.readableFlowing
à true
.
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing est maintenant false.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing est toujours false.
pass.write('ok') // N'émettra pas 'data'.
pass.resume() // Doit être appelé pour que le flux émette 'data'.
// readableFlowing est maintenant true.
Lorsque readable.readableFlowing
est false
, des données peuvent s'accumuler dans le tampon interne du flux.
Choisir un seul style d'API
L'API de flux Readable
a évolué à travers plusieurs versions de Node.js et fournit plusieurs méthodes de consommation des données de flux. En général, les développeurs doivent choisir l'une des méthodes de consommation des données et ne doivent jamais utiliser plusieurs méthodes pour consommer les données d'un seul flux. Plus précisément, l'utilisation d'une combinaison de on('data')
, on('readable')
, pipe()
ou d'itérateurs asynchrones pourrait conduire à un comportement non intuitif.
Classe : stream.Readable
Ajouté dans : v0.9.4
Événement : 'close'
[Historique]
Version | Modifications |
---|---|
v10.0.0 | Ajout de l’option emitClose pour spécifier si 'close' est émis lors de la destruction. |
v0.9.4 | Ajouté dans : v0.9.4 |
L’événement 'close'
est émis lorsque le flux et toutes ses ressources sous-jacentes (un descripteur de fichier, par exemple) ont été fermés. L’événement indique qu’aucun autre événement ne sera émis et qu’aucun autre calcul n’aura lieu.
Un flux Readable
émettra toujours l’événement 'close'
s’il est créé avec l’option emitClose
.
Événement : 'data'
Ajouté dans : v0.9.4
chunk
<Buffer> | <string> | <any> Le bloc de données. Pour les flux qui ne fonctionnent pas en mode objet, le bloc sera une chaîne ou unBuffer
. Pour les flux qui sont en mode objet, le bloc peut être n’importe quelle valeur JavaScript autre quenull
.
L’événement 'data'
est émis chaque fois que le flux cède la propriété d’un bloc de données à un consommateur. Cela peut se produire chaque fois que le flux est commuté en mode fluide en appelant readable.pipe()
, readable.resume()
ou en attachant une fonction de rappel d’écouteur à l’événement 'data'
. L’événement 'data'
sera également émis chaque fois que la méthode readable.read()
est appelée et qu’un bloc de données est disponible pour être retourné.
L’attache d’un écouteur d’événements 'data'
à un flux qui n’a pas été explicitement mis en pause commutera le flux en mode fluide. Les données seront alors transmises dès qu’elles seront disponibles.
La fonction de rappel d’écouteur se verra transmettre le bloc de données sous forme de chaîne si un codage par défaut a été spécifié pour le flux à l’aide de la méthode readable.setEncoding()
; sinon, les données seront transmises sous forme de Buffer
.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Reçu ${chunk.length} octets de données.`)
})
Événement : 'end'
Ajouté dans : v0.9.4
L’événement 'end'
est émis lorsqu’il n’y a plus de données à consommer dans le flux.
L’événement 'end'
ne sera pas émis tant que les données ne seront pas complètement consommées. Cela peut être réalisé en faisant passer le flux en mode flux continu, ou en appelant stream.read()
à plusieurs reprises jusqu’à ce que toutes les données soient consommées.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Reçu ${chunk.length} octets de données.`)
})
readable.on('end', () => {
console.log('Il n’y aura plus de données.')
})
Événement : 'error'
Ajouté dans : v0.9.4
L’événement 'error'
peut être émis par une implémentation Readable
à tout moment. En général, cela peut se produire si le flux sous-jacent n’est pas en mesure de générer des données en raison d’une défaillance interne sous-jacente ou lorsqu’une implémentation de flux tente de transmettre un bloc de données non valide.
Le rappel du listener recevra un seul objet Error
.
Événement : 'pause'
Ajouté dans : v0.9.4
L’événement 'pause'
est émis lorsque stream.pause()
est appelé et que readableFlowing
n’est pas false
.
Événement : 'readable'
[Historique]
Version | Modifications |
---|---|
v10.0.0 | 'readable' est toujours émis dans le prochain tick après l’appel de .push() . |
v10.0.0 | L’utilisation de 'readable' nécessite l’appel de .read() . |
v0.9.4 | Ajouté dans : v0.9.4 |
L’événement 'readable'
est émis lorsqu’il y a des données disponibles à lire à partir du flux, jusqu’à la limite maximale configurée (state.highWaterMark
). En fait, il indique que le flux contient de nouvelles informations dans la mémoire tampon. Si des données sont disponibles dans cette mémoire tampon, stream.read()
peut être appelé pour récupérer ces données. De plus, l’événement 'readable'
peut également être émis lorsque la fin du flux a été atteinte.
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// Il y a maintenant des données à lire.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
Si la fin du flux a été atteinte, l’appel de stream.read()
renverra null
et déclenchera l’événement 'end'
. Cela est également vrai s’il n’y a jamais eu de données à lire. Par exemple, dans l’exemple suivant, foo.txt
est un fichier vide :
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
Le résultat de l’exécution de ce script est :
$ node test.js
readable: null
end
Dans certains cas, la connexion d’un listener pour l’événement 'readable'
entraînera la lecture d’une certaine quantité de données dans une mémoire tampon interne.
En général, les mécanismes readable.pipe()
et d’événement 'data'
sont plus faciles à comprendre que l’événement 'readable'
. Cependant, la gestion de 'readable'
pourrait entraîner une augmentation du débit.
Si 'readable'
et 'data'
sont utilisés en même temps, 'readable'
est prioritaire dans le contrôle du flux, c’est-à-dire que 'data'
ne sera émis que lorsque stream.read()
sera appelé. La propriété readableFlowing
deviendrait false
. S’il existe des listeners 'data'
lorsque 'readable'
est supprimé, le flux commencera à circuler, c’est-à-dire que les événements 'data'
seront émis sans appeler .resume()
.
Événement : 'resume'
Ajouté dans : v0.9.4
L’événement 'resume'
est émis lorsque stream.resume()
est appelé et que readableFlowing
n’est pas true
.
readable.destroy([error])
[Historique]
Version | Modifications |
---|---|
v14.0.0 | Fonctionne comme une opération sans effet sur un flux déjà détruit. |
v8.0.0 | Ajouté dans : v8.0.0 |
Détruit le flux. Émet éventuellement un événement 'error'
, et émet un événement 'close'
(sauf si emitClose
est défini sur false
). Après cet appel, le flux lisible libérera toutes les ressources internes et les appels suivants à push()
seront ignorés.
Une fois que destroy()
a été appelé, tous les appels ultérieurs seront une opération sans effet et aucune autre erreur que celles de _destroy()
ne pourra être émise comme 'error'
.
Les implémenteurs ne doivent pas remplacer cette méthode, mais plutôt implémenter readable._destroy()
.
readable.closed
Ajouté dans : v18.0.0
Est true
après l’émission de 'close'
.
readable.destroyed
Ajouté dans : v8.0.0
Est true
après l’appel de readable.destroy()
.
readable.isPaused()
Ajouté dans : v0.11.14
- Retourne : <boolean>
La méthode readable.isPaused()
renvoie l’état de fonctionnement actuel du Readable
. Ceci est principalement utilisé par le mécanisme qui sous-tend la méthode readable.pipe()
. Dans la plupart des cas typiques, il n’y aura aucune raison d’utiliser cette méthode directement.
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
Ajouté dans : v0.9.4
- Retourne : <this>
La méthode readable.pause()
amènera un flux en mode fluide à arrêter l'émission d'événements 'data'
, ce qui le fera sortir du mode fluide. Toutes les données qui deviennent disponibles resteront dans le tampon interne.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Reçu ${chunk.length} octets de données.`)
readable.pause()
console.log('Il n’y aura pas de données supplémentaires pendant 1 seconde.')
setTimeout(() => {
console.log('Maintenant, les données vont recommencer à circuler.')
readable.resume()
}, 1000)
})
La méthode readable.pause()
n’a aucun effet s’il existe un écouteur d’événement 'readable'
.
readable.pipe(destination[, options])
Ajouté dans : v0.9.4
destination
<stream.Writable> La destination pour l’écriture des donnéesoptions
<Object> Options de pipeend
<boolean> Mettre fin à l’écriture lorsque la lecture est terminée. Par défaut :true
.
Retourne : <stream.Writable> La destination, ce qui permet une chaîne de pipes si elle s’agit d’un flux
Duplex
ou d’un fluxTransform
La méthode readable.pipe()
rattache un flux Writable
au readable
, ce qui l’amène à basculer automatiquement en mode fluide et à envoyer toutes ses données vers le Writable
rattaché. Le flux de données sera géré automatiquement afin que le flux Writable
de destination ne soit pas submergé par un flux Readable
plus rapide.
L’exemple suivant dirige toutes les données du readable
vers un fichier nommé file.txt
:
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Toutes les données de readable sont dirigées vers 'file.txt'.
readable.pipe(writable)
Il est possible de rattacher plusieurs flux Writable
à un seul flux Readable
.
La méthode readable.pipe()
retourne une référence vers le flux de destination, ce qui permet de configurer des chaînes de flux mis en pipeline :
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
Par défaut, stream.end()
est appelé sur le flux Writable
de destination lorsque le flux Readable
source émet 'end'
, de sorte que la destination n’est plus accessible en écriture. Pour désactiver ce comportement par défaut, l’option end
peut être passée avec la valeur false
, ce qui a pour effet de laisser le flux de destination ouvert :
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Au revoir\n')
})
Un avertissement important est que si le flux Readable
émet une erreur durant le traitement, la destination Writable
n’est pas fermée automatiquement. En cas d’erreur, il sera nécessaire de fermer manuellement chaque flux afin d’éviter les fuites de mémoire.
Les flux Writable
process.stderr
et process.stdout
ne sont jamais fermés tant que le processus Node.js n’est pas terminé, quelles que soient les options spécifiées.
readable.read([size])
Ajouté dans : v0.9.4
size
<number> Argument optionnel pour spécifier la quantité de données à lire.- Retourne : <string> | <Buffer> | <null> | <any>
La méthode readable.read()
lit les données du tampon interne et les renvoie. Si aucune donnée n’est disponible pour être lue, null
est retourné. Par défaut, les données sont retournées sous forme d’objet Buffer
, sauf si un encodage a été spécifié à l’aide de la méthode readable.setEncoding()
ou si le flux fonctionne en mode objet.
L’argument optionnel size
spécifie un nombre spécifique d’octets à lire. Si size
octets ne sont pas disponibles pour être lus, null
sera retourné sauf si le flux s’est terminé, auquel cas toutes les données restantes dans le tampon interne seront retournées.
Si l’argument size
n’est pas spécifié, toutes les données contenues dans le tampon interne seront retournées.
L’argument size
doit être inférieur ou égal à 1 Gio.
La méthode readable.read()
doit être appelée uniquement sur les flux Readable
fonctionnant en mode pause. En mode flux, readable.read()
est appelé automatiquement jusqu’à ce que le tampon interne soit entièrement vidé.
const readable = getReadableStreamSomehow()
// 'readable' peut être déclenché plusieurs fois car les données sont mises en mémoire tampon
readable.on('readable', () => {
let chunk
console.log('Le flux est lisible (nouvelles données reçues dans le tampon)')
// Utiliser une boucle pour s'assurer de lire toutes les données actuellement disponibles
while (null !== (chunk = readable.read())) {
console.log(`Lecture de ${chunk.length} octets de données...`)
}
})
// 'end' sera déclenché une fois lorsqu'il n'y aura plus de données disponibles
readable.on('end', () => {
console.log('Fin du flux atteinte.')
})
Chaque appel à readable.read()
renvoie un bloc de données ou null
, ce qui signifie qu’il n’y a plus de données à lire à ce moment-là. Ces blocs ne sont pas automatiquement concaténés. Étant donné qu’un seul appel à read()
ne renvoie pas toutes les données, l’utilisation d’une boucle while peut être nécessaire pour lire continuellement les blocs jusqu’à ce que toutes les données soient récupérées. Lors de la lecture d’un fichier volumineux, .read()
peut renvoyer temporairement null
, ce qui indique qu’il a consommé tout le contenu mis en mémoire tampon, mais qu’il peut y avoir d’autres données à mettre en mémoire tampon. Dans de tels cas, un nouvel événement 'readable'
est émis une fois qu’il y a plus de données dans le tampon, et l’événement 'end'
signifie la fin de la transmission des données.
Par conséquent, pour lire l’intégralité du contenu d’un fichier à partir d’un readable
, il est nécessaire de collecter des blocs sur plusieurs événements 'readable'
:
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
Un flux Readable
en mode objet renverra toujours un seul élément d’un appel à readable.read(size)
, quelle que soit la valeur de l’argument size
.
Si la méthode readable.read()
renvoie un bloc de données, un événement 'data'
sera également émis.
L’appel à stream.read([size])
après l’émission de l’événement 'end'
renverra null
. Aucune erreur d’exécution ne sera levée.
readable.readable
Ajouté dans : v11.4.0
Est true
s'il est sûr d'appeler readable.read()
, ce qui signifie que le flux n'a pas été détruit ou n'a pas émis 'error'
ou 'end'
.
readable.readableAborted
Ajouté dans : v16.8.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
Renvoie si le flux a été détruit ou a généré une erreur avant d'émettre 'end'
.
readable.readableDidRead
Ajouté dans : v16.7.0, v14.18.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
Renvoie si 'data'
a été émis.
readable.readableEncoding
Ajouté dans : v12.7.0
Getter pour la propriété encoding
d'un flux Readable
donné. La propriété encoding
peut être définie en utilisant la méthode readable.setEncoding()
.
readable.readableEnded
Ajouté dans : v12.9.0
Devient true
lorsque l'événement 'end'
est émis.
readable.errored
Ajouté dans : v18.0.0
Renvoie une erreur si le flux a été détruit avec une erreur.
readable.readableFlowing
Ajouté dans : v9.4.0
Cette propriété reflète l'état actuel d'un flux Readable
tel que décrit dans la section Trois états.
readable.readableHighWaterMark
Ajouté dans : v9.3.0
Retourne la valeur de highWaterMark
passée lors de la création de ce Readable
.
readable.readableLength
Ajouté dans : v9.4.0
Cette propriété contient le nombre d’octets (ou d’objets) dans la file d’attente prêts à être lus. La valeur fournit des données d’introspection concernant l’état du highWaterMark
.
readable.readableObjectMode
Ajouté dans : v12.3.0
Getter pour la propriété objectMode
d’un flux Readable
donné.
readable.resume()
[Historique]
Version | Modifications |
---|---|
v10.0.0 | resume() n’a aucun effet s’il y a un écouteur d’événement 'readable' . |
v0.9.4 | Ajouté dans : v0.9.4 |
- Retourne : <this>
La méthode readable.resume()
amène un flux Readable
explicitement mis en pause à reprendre l’émission d’événements 'data'
, en basculant le flux en mode de flux.
La méthode readable.resume()
peut être utilisée pour consommer entièrement les données d’un flux sans réellement traiter aucune de ces données :
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Atteint la fin, mais n’a rien lu.')
})
La méthode readable.resume()
n’a aucun effet s’il y a un écouteur d’événement 'readable'
.
readable.setEncoding(encoding)
Ajouté dans : v0.9.4
La méthode readable.setEncoding()
définit l’encodage de caractères pour les données lues à partir du flux Readable
.
Par défaut, aucun encodage n’est attribué et les données du flux sont retournées sous forme d’objets Buffer
. La définition d’un encodage entraîne le retour des données du flux sous forme de chaînes de l’encodage spécifié plutôt que d’objets Buffer
. Par exemple, l’appel de readable.setEncoding('utf8')
entraînera l’interprétation des données de sortie comme des données UTF-8 et leur transmission sous forme de chaînes. L’appel de readable.setEncoding('hex')
entraînera l’encodage des données au format de chaîne hexadécimal.
Le flux Readable
gérera correctement les caractères multi-octets délivrés par le flux qui deviendraient autrement incorrectement décodés s’ils étaient simplement extraits du flux sous forme d’objets Buffer
.
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('Obtenu %d caractères de données de chaîne :', chunk.length)
})
readable.unpipe([destination])
Ajouté dans : v0.9.4
destination
<stream.Writable> Flux spécifique optionnel à déconnecter- Retourne : <this>
La méthode readable.unpipe()
détache un flux Writable
précédemment attaché en utilisant la méthode stream.pipe()
.
Si destination
n’est pas spécifié, alors tous les pipes sont détachés.
Si destination
est spécifié, mais qu’aucun pipe n’est configuré pour celui-ci, alors la méthode ne fait rien.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Toutes les données de readable vont dans 'file.txt',
// mais seulement pendant la première seconde.
readable.pipe(writable)
setTimeout(() => {
console.log('Arrêter d’écrire dans file.txt.')
readable.unpipe(writable)
console.log('Fermer manuellement le flux de fichier.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[Historique]
Version | Modifications |
---|---|
v22.0.0, v20.13.0 | L’argument chunk peut maintenant être une instance TypedArray ou DataView . |
v8.0.0 | L’argument chunk peut maintenant être une instance Uint8Array . |
v0.9.11 | Ajouté dans : v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Bloc de données à décaler vers la file d’attente de lecture. Pour les flux ne fonctionnant pas en mode objet,chunk
doit être une <string>, <Buffer>, <TypedArray>, <DataView> ounull
. Pour les flux en mode objet,chunk
peut être n’importe quelle valeur JavaScript.encoding
<string> Encodage des blocs de chaîne. Doit être un encodageBuffer
valide, tel que'utf8'
ou'ascii'
.
Le fait de transmettre chunk
comme null
signale la fin du flux (EOF) et se comporte de la même manière que readable.push(null)
, après quoi plus aucune donnée ne peut être écrite. Le signal EOF est placé à la fin du tampon et toutes les données mises en mémoire tampon seront toujours vidées.
La méthode readable.unshift()
repousse un bloc de données dans le tampon interne. Ceci est utile dans certaines situations où un flux est consommé par un code qui a besoin de « ne pas consommer » une certaine quantité de données qu’il a extraites de manière optimiste de la source, afin que les données puissent être transmises à une autre partie.
La méthode stream.unshift(chunk)
ne peut pas être appelée après que l’événement 'end'
a été émis, sinon une erreur d’exécution sera levée.
Les développeurs qui utilisent stream.unshift()
devraient souvent envisager de passer à l’utilisation d’un flux Transform
. Consultez la section API pour les implémenteurs de flux pour plus d’informations.
// Extraire un en-tête délimité par \n\n.
// Utiliser unshift() si nous en obtenons trop.
// Appeler le rappel avec (erreur, en-tête, flux).
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// En-tête trouvée.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// Supprimer l’écouteur 'readable' avant de faire unshift.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// Maintenant, le corps du message peut être lu depuis le flux.
callback(null, header, stream)
return
}
// Lecture de l’en-tête en cours.
header += str
}
}
}
Contrairement à stream.push(chunk)
, stream.unshift(chunk)
ne terminera pas le processus de lecture en réinitialisant l’état de lecture interne du flux. Cela peut provoquer des résultats inattendus si readable.unshift()
est appelé pendant une lecture (c’est-à-dire à partir d’une implémentation de stream._read()
sur un flux personnalisé). Suivre l’appel à readable.unshift()
avec un stream.push('')
immédiat réinitialisera l’état de lecture de manière appropriée, cependant, il est préférable d’éviter d’appeler readable.unshift()
pendant le processus d’exécution d’une lecture.
readable.wrap(stream)
Ajouté dans : v0.9.4
Avant Node.js 0.10, les flux n'implémentaient pas l'intégralité de l'API du module node:stream
telle qu'elle est définie actuellement. (Voir Compatibilité pour plus d'informations).
Lors de l'utilisation d'une ancienne bibliothèque Node.js qui émet des événements 'data'
et qui possède une méthode stream.pause()
qui n'est qu'informative, la méthode readable.wrap()
peut être utilisée pour créer un flux Readable
qui utilise l'ancien flux comme source de données.
Il sera rarement nécessaire d'utiliser readable.wrap()
, mais la méthode a été fournie pour faciliter l'interaction avec les anciennes applications et bibliothèques Node.js.
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // etc.
})
readable[Symbol.asyncIterator]()
[Historique]
Version | Modifications |
---|---|
v11.14.0 | La prise en charge de Symbol.asyncIterator n'est plus expérimentale. |
v10.0.0 | Ajouté dans : v10.0.0 |
- Retourne : <AsyncIterator> pour consommer entièrement le flux.
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
Si la boucle se termine par un break
, un return
ou un throw
, le flux sera détruit. En d'autres termes, itérer sur un flux consommera entièrement le flux. Le flux sera lu en blocs de taille égale à l'option highWaterMark
. Dans l'exemple de code ci-dessus, les données seront dans un seul bloc si le fichier contient moins de 64 Kio de données, car aucune option highWaterMark
n'est fournie à fs.createReadStream()
.
readable[Symbol.asyncDispose]()
Ajouté dans : v20.4.0, v18.18.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
Appelle readable.destroy()
avec une AbortError
et renvoie une promesse qui se réalise lorsque le flux est terminé.
readable.compose(stream[, options])
Ajouté dans : v19.1.0, v18.13.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> permet de détruire le flux si le signal est interrompu.
Retourne : <Duplex> un flux composé du flux
stream
.
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // affiche ['this', 'is', 'compose', 'as', 'operator']
Voir stream.compose
pour plus d'informations.
readable.iterator([options])
Ajouté dans : v16.3.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
options
<Object>destroyOnReturn
<boolean> Lorsque défini surfalse
, appelerreturn
sur l'itérateur asynchrone, ou quitter une itérationfor await...of
en utilisant unbreak
,return
outhrow
ne détruira pas le flux. Par défaut :true
.
Retourne : <AsyncIterator> pour consommer le flux.
L'itérateur créé par cette méthode donne aux utilisateurs la possibilité d'annuler la destruction du flux si la boucle for await...of
est quittée par return
, break
, ou throw
, ou si l'itérateur doit détruire le flux si le flux a émis une erreur pendant l'itération.
const { Readable } = require('node:stream')
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // Affichera 2 puis 3
}
console.log(readable.destroyed) // True, le flux a été totalement consommé
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]))
await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}
showBoth()
readable.map(fn[, options])
[Historique]
Version | Modifications |
---|---|
v20.7.0, v18.19.0 | Ajout de highWaterMark dans les options. |
v17.4.0, v16.14.0 | Ajouté dans : v17.4.0, v16.14.0 |
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
fn
<Function> | <AsyncFunction> une fonction à appliquer sur chaque morceau du flux.data
<any> un morceau de données provenant du flux.options
<Object>signal
<AbortSignal> abandonné si le flux est détruit permettant d’abandonner l’appelfn
plus tôt.
options
<Object>concurrency
<number> le nombre maximal d’appels simultanés defn
à faire sur le flux en même temps. Par défaut :1
.highWaterMark
<number> le nombre d’éléments à mettre en mémoire tampon en attendant la consommation par l’utilisateur des éléments mappés. Par défaut :concurrency * 2 - 1
.signal
<AbortSignal> permet de détruire le flux si le signal est abandonné.
Retourne : <Readable> un flux mappé avec la fonction
fn
.
Cette méthode permet de faire un mapping sur le flux. La fonction fn
sera appelée pour chaque morceau du flux. Si la fonction fn
retourne une promesse, cette promesse sera attendue
avant d’être transmise au flux de résultat.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Avec un mapper synchrone.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
console.log(chunk) // 2, 4, 6, 8
}
// Avec un mapper asynchrone, effectuant au maximum 2 requêtes à la fois.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
domain => resolver.resolve4(domain),
{ concurrency: 2 }
)
for await (const result of dnsResults) {
console.log(result) // Affiche le résultat DNS de resolver.resolve4.
}
readable.filter(fn[, options])
[Historique]
Version | Modifications |
---|---|
v20.7.0, v18.19.0 | ajout de highWaterMark dans les options. |
v17.4.0, v16.14.0 | Ajouté dans : v17.4.0, v16.14.0 |
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
fn
<Function> | <AsyncFunction> une fonction pour filtrer les blocs du flux.data
<any> un bloc de données du flux.options
<Object>signal
<AbortSignal> annulé si le flux est détruit, ce qui permet d'annuler l'appel defn
plus tôt.
options
<Object>concurrency
<number> le nombre maximal d'invocations simultanées defn
à appeler sur le flux à la fois. Par défaut :1
.highWaterMark
<number> combien d'éléments mettre en mémoire tampon en attendant la consommation par l'utilisateur des éléments filtrés. Par défaut :concurrency * 2 - 1
.signal
<AbortSignal> permet de détruire le flux si le signal est annulé.
Retourne : <Readable> un flux filtré avec le prédicat
fn
.
Cette méthode permet de filtrer le flux. Pour chaque bloc du flux, la fonction fn
sera appelée et si elle retourne une valeur « truthy », le bloc sera passé au flux résultant. Si la fonction fn
retourne une promesse, cette promesse sera « awaitée ».
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Avec un prédicat synchrone.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// Avec un prédicat asynchrone, effectuant au plus 2 requêtes à la fois.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address.ttl > 60
},
{ concurrency: 2 }
)
for await (const result of dnsResults) {
// Affiche les domaines avec plus de 60 secondes sur l'enregistrement DNS résolu.
console.log(result)
}
readable.forEach(fn[, options])
Ajouté dans : v17.5.0, v16.15.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
fn
<Fonction> | <AsyncFunction> une fonction à appeler sur chaque bloc du flux.data
<any> un bloc de données provenant du flux.options
<Object>signal
<AbortSignal> abandonné si le flux est détruit, ce qui permet d'abandonner l'appel defn
plus tôt.
options
<Object>concurrency
<number> le nombre maximal d'invocations simultanées defn
à appeler sur le flux en une seule fois. Par défaut :1
.signal
<AbortSignal> permet de détruire le flux si le signal est abandonné.
Retourne : <Promise> une promesse indiquant que le flux est terminé.
Cette méthode permet d’itérer dans un flux. Pour chaque bloc du flux, la fonction fn
est appelée. Si la fonction fn
retourne une promesse, cette promesse sera attendue
.
Cette méthode diffère des boucles for await...of
en ce qu’elle peut éventuellement traiter les blocs simultanément. De plus, une itération forEach
ne peut être arrêtée qu’en ayant passé une option signal
et en abandonnant le AbortController
associé, tandis que for await...of
peut être arrêtée avec break
ou return
. Dans les deux cas, le flux est détruit.
Cette méthode diffère de l’écoute de l’événement 'data'
en ce qu’elle utilise l’événement readable
dans le mécanisme sous-jacent et peut limiter le nombre d’appels fn
simultanés.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Avec un prédicat synchrone.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// Avec un prédicat asynchrone, effectuant au plus 2 requêtes à la fois.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
await dnsResults.forEach(result => {
// Affiche le résultat, similaire à `for await (const result of dnsResults)`
console.log(result)
})
console.log('done') // Le flux est terminé
readable.toArray([options])
Ajouté dans : v17.5.0, v16.15.0
[Stable : 1 - Expérimental]
Stable : 1 Stabilité : 1 - Expérimental
options
<Object>signal
<AbortSignal> permet d'annuler l'opération toArray si le signal est abandonné.
Retourne : <Promise> une promesse contenant un tableau avec le contenu du flux.
Cette méthode permet d'obtenir facilement le contenu d'un flux.
Comme cette méthode lit l'ensemble du flux en mémoire, elle annule les avantages des flux. Elle est destinée à l'interopérabilité et à la commodité, et non comme moyen principal de consommer des flux.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]
// Effectuer des requêtes DNS simultanément à l'aide de .map et collecter
// les résultats dans un tableau à l'aide de toArray
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
.map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
.toArray()
readable.some(fn[, options])
Ajouté dans : v17.5.0, v16.15.0
[Stable : 1 - Expérimental]
Stable : 1 Stabilité : 1 - Expérimental
fn
<Function> | <AsyncFunction> une fonction à appeler sur chaque bloc du flux.data
<any> un bloc de données du flux.options
<Object>signal
<AbortSignal> abandonné si le flux est détruit, ce qui permet d'interrompre l'appel defn
plus tôt.
options
<Object>concurrency
<number> le nombre maximal d'appels simultanés defn
à appeler sur le flux à la fois. Par défaut :1
.signal
<AbortSignal> permet de détruire le flux si le signal est abandonné.
Retourne : <Promise> une promesse évaluant à
true
sifn
a retourné une valeur truthy pour au moins l'un des blocs.
Cette méthode est similaire à Array.prototype.some
et appelle fn
sur chaque bloc du flux jusqu'à ce que la valeur de retour attendue soit true
(ou toute valeur truthy). Une fois qu'un appel de fn
sur un bloc a une valeur de retour attendue truthy, le flux est détruit et la promesse est remplie avec true
. Si aucun des appels de fn
sur les blocs ne retourne de valeur truthy, la promesse est remplie avec false
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Avec un prédicat synchrone.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false
// Avec un prédicat asynchrone, effectuant au maximum 2 vérifications de fichiers à la fois.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(anyBigFile) // `true` si un fichier de la liste est plus grand que 1 Mo
console.log('done') // Le flux est terminé
readable.find(fn[, options])
Ajouté dans : v17.5.0, v16.17.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité : 1 - Expérimental
fn
<Function> | <AsyncFunction> une fonction à appeler sur chaque chunk du flux.data
<any> un chunk de données provenant du flux.options
<Object>signal
<AbortSignal> interrompu si le flux est détruit, ce qui permet d'interrompre l'appelfn
plus tôt.
options
<Object>concurrency
<number> le nombre maximal d'invocations simultanées defn
à appeler sur le flux à la fois. Par défaut :1
.signal
<AbortSignal> permet de détruire le flux si le signal est interrompu.
Retourne : <Promise> une promesse évaluant le premier chunk pour lequel
fn
a évalué avec une valeur truthy, ouundefined
si aucun élément n'a été trouvé.
Cette méthode est similaire à Array.prototype.find
et appelle fn
sur chaque chunk du flux pour trouver un chunk avec une valeur truthy pour fn
. Une fois que la valeur de retour attendue d'un appel fn
est truthy, le flux est détruit et la promesse est résolue avec la valeur pour laquelle fn
a retourné une valeur truthy. Si tous les appels fn
sur les chunks renvoient une valeur falsy, la promesse est résolue avec undefined
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Avec un prédicat synchrone.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined
// Avec un prédicat asynchrone, effectuant au maximum 2 vérifications de fichiers à la fois.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(foundBigFile) // Nom du fichier volumineux, si un fichier de la liste est supérieur à 1 Mo
console.log('done') // Le flux est terminé
readable.every(fn[, options])
Ajouté dans : v17.5.0, v16.15.0
[Stable: 1 - Expérimental]
Stable : 1 Stabilité : 1 - Expérimental
fn
<Function> | <AsyncFunction> une fonction à appeler sur chaque bloc du flux.data
<any> un bloc de données du flux.options
<Object>signal
<AbortSignal> abandonné si le flux est détruit permettant d’abandonner l’appelfn
plus tôt.
options
<Object>concurrency
<number> le nombre maximal d’appels simultanés defn
à effectuer sur le flux en une seule fois. Par défaut :1
.signal
<AbortSignal> permet de détruire le flux si le signal est abandonné.
Retourne : <Promise> une promesse dont l’évaluation est
true
sifn
a renvoyé une valeur « truthy » pour tous les blocs.
Cette méthode est similaire à Array.prototype.every
et appelle fn
sur chaque bloc du flux pour vérifier si toutes les valeurs de retour attendues sont des valeurs « truthy » pour fn
. Une fois qu’un appel fn
sur une valeur de retour attendue de bloc est « falsy », le flux est détruit et la promesse est réalisée avec false
. Si tous les appels fn
sur les blocs renvoient une valeur « truthy », la promesse est réalisée avec true
.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Avec un prédicat synchrone.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true
// Avec un prédicat asynchrone, effectuant au maximum 2 vérifications de fichier à la fois.
const allBigFiles = await Readable.from(['fichier1', 'fichier2', 'fichier3']).every(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
// `true` si tous les fichiers de la liste sont supérieurs à 1 Mio
console.log(allBigFiles)
console.log('terminé') // Le flux est terminé
readable.flatMap(fn[, options])
Ajouté dans : v17.5.0, v16.15.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
fn
<Function> | <AsyncGeneratorFunction> | <AsyncFunction> une fonction à appliquer sur chaque morceau du flux.data
<any> un morceau de données provenant du flux.options
<Object>signal
<AbortSignal> abandonné si le flux est détruit, permettant d'abandonner l'appelfn
prématurément.
options
<Object>concurrency
<number> le nombre maximal d'appels simultanés defn
à effectuer sur le flux en une seule fois. Par défaut :1
.signal
<AbortSignal> permet de détruire le flux si le signal est abandonné.
Retourne : <Readable> un flux aplati avec la fonction
fn
.
Cette méthode retourne un nouveau flux en appliquant le rappel donné à chaque morceau du flux, puis en aplatissant le résultat.
Il est possible de retourner un flux ou un autre itérable ou itérable asynchrone à partir de fn
et les flux de résultat seront fusionnés (aplatis) dans le flux retourné.
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'
// Avec un mapper synchrone.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// Avec un mapper asynchrone, combine le contenu de 4 fichiers
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
createReadStream(fileName)
)
for await (const result of concatResult) {
// Cela contiendra le contenu (tous les morceaux) des 4 fichiers
console.log(result)
}
readable.drop(limit[, options])
Ajouté dans : v17.5.0, v16.15.0
[Stable : 1 - Expérimental]
Stable : 1 Stabilité : 1 - Expérimental
limit
<number> le nombre de blocs à supprimer du flux lisible.options
<Object>signal
<AbortSignal> permet de détruire le flux si le signal est interrompu.
Retourne : <Readable> un flux avec
limit
blocs supprimés.
Cette méthode retourne un nouveau flux avec les limit
premiers blocs supprimés.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])
Ajouté dans : v17.5.0, v16.15.0
[Stable : 1 - Expérimental]
Stable : 1 Stabilité : 1 - Expérimental
limit
<number> le nombre de blocs à prendre du flux lisible.options
<Object>signal
<AbortSignal> permet de détruire le flux si le signal est interrompu.
Retourne : <Readable> un flux avec
limit
blocs pris.
Cette méthode retourne un nouveau flux avec les limit
premiers blocs.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])
Ajouté dans : v17.5.0, v16.15.0
[Stable : 1 - Expérimental]
Stable : 1 Stabilité : 1 - Expérimental
fn
<Function> | <AsyncFunction> une fonction réductrice à appeler sur chaque bloc du flux.previous
<any> la valeur obtenue à partir du dernier appel àfn
ou la valeurinitial
si elle est spécifiée ou le premier bloc du flux sinon.data
<any> un bloc de données provenant du flux.options
<Object>signal
<AbortSignal> abandonné si le flux est détruit, permettant d’abandonner l’appel àfn
plus tôt.
initial
<any> la valeur initiale à utiliser dans la réduction.options
<Object>signal
<AbortSignal> permet de détruire le flux si le signal est interrompu.
Retourne : <Promise> une promesse pour la valeur finale de la réduction.
Cette méthode appelle fn
sur chaque bloc du flux dans l’ordre, en lui passant le résultat du calcul sur l’élément précédent. Elle retourne une promesse pour la valeur finale de la réduction.
Si aucune valeur initial
n’est fournie, le premier bloc du flux est utilisé comme valeur initiale. Si le flux est vide, la promesse est rejetée avec un TypeError
avec la propriété de code ERR_INVALID_ARGS
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file))
return totalSize + size
}, 0)
console.log(folderSize)
La fonction réductrice itère le flux élément par élément, ce qui signifie qu’il n’y a pas de paramètre de concurrency
ou de parallélisme. Pour effectuer une reduce
simultanément, vous pouvez extraire la fonction asynchrone vers la méthode readable.map
.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir)
.map(file => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0)
console.log(folderSize)
Flux duplex et flux de transformation
Classe : stream.Duplex
[Historique]
Version | Modifications |
---|---|
v6.8.0 | Les instances de Duplex renvoient désormais true lors de la vérification instanceof stream.Writable . |
v0.9.4 | Ajouté dans : v0.9.4 |
Les flux duplex sont des flux qui implémentent à la fois les interfaces Readable
et Writable
.
Voici des exemples de flux Duplex
:
duplex.allowHalfOpen
Ajouté dans : v0.9.4
Si false
, le flux mettra automatiquement fin au côté accessible en écriture lorsque le côté accessible en lecture se termine. Définie initialement par l’option de constructeur allowHalfOpen
, qui est par défaut à true
.
Cela peut être changé manuellement pour modifier le comportement semi-ouvert d’une instance de flux Duplex
existante, mais doit être changé avant que l’événement 'end'
soit émis.
Classe : stream.Transform
Ajouté dans : v0.9.4
Les flux de transformation sont des flux Duplex
où la sortie est liée d’une manière ou d’une autre à l’entrée. Comme tous les flux Duplex
, les flux Transform
implémentent à la fois les interfaces Readable
et Writable
.
Voici des exemples de flux Transform
:
transform.destroy([error])
[Historique]
Version | Modifications |
---|---|
v14.0.0 | Fonctionne comme une opération sans effet sur un flux qui a déjà été détruit. |
v8.0.0 | Ajouté dans : v8.0.0 |
Détruit le flux et émet éventuellement un événement 'error'
. Après cet appel, le flux de transformation libère toutes les ressources internes. Les implémenteurs ne doivent pas remplacer cette méthode, mais plutôt implémenter readable._destroy()
. L’implémentation par défaut de _destroy()
pour Transform
émet également 'close'
sauf si emitClose
est défini sur false.
Une fois que destroy()
a été appelé, tout appel ultérieur sera sans effet et aucune autre erreur, sauf provenant de _destroy()
, ne peut être émise en tant que 'error'
.
stream.duplexPair([options])
Ajouté dans la version : v22.6.0, v20.17.0
options
<Objet> Une valeur à transmettre aux deux constructeursDuplex
, pour définir des options telles que la mise en mémoire tampon.- Retourne : <Tableau> de deux instances
Duplex
.
La fonction utilitaire duplexPair
retourne un tableau avec deux éléments, chacun étant un flux Duplex
connecté à l’autre côté :
const [sideA, sideB] = duplexPair()
Tout ce qui est écrit dans un flux est rendu lisible sur l’autre. Il fournit un comportement analogue à une connexion réseau, où les données écrites par le client deviennent lisibles par le serveur, et vice-versa.
Les flux Duplex sont symétriques ; l’un ou l’autre peut être utilisé sans aucune différence de comportement.
stream.finished(stream[, options], callback)
[Historique]
Version | Modifications |
---|---|
v19.5.0 | Ajout de la prise en charge de ReadableStream et WritableStream . |
v15.11.0 | L’option signal a été ajoutée. |
v14.0.0 | La fonction finished(stream, cb) attendra l’événement 'close' avant d’invoquer le rappel. L’implémentation essaie de détecter les flux hérités et d’appliquer ce comportement uniquement aux flux qui sont censés émettre 'close' . |
v14.0.0 | L’émission de 'close' avant 'end' sur un flux Readable provoquera une erreur ERR_STREAM_PREMATURE_CLOSE . |
v14.0.0 | Le rappel sera invoqué sur les flux qui ont déjà été terminés avant l’appel à finished(stream, cb) . |
v10.0.0 | Ajouté dans la version : v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Un flux/flux Web lisible et/ou inscriptible.options
<Objet>error
<booléen> Si la valeur estfalse
, un appel àemit('error', err)
n’est pas considéré comme terminé. Par défaut :true
.readable
<booléen> Lorsque la valeur estfalse
, le rappel est appelé lorsque le flux se termine, même si le flux peut encore être lisible. Par défaut :true
.writable
<booléen> Lorsque la valeur estfalse
, le rappel est appelé lorsque le flux se termine, même si le flux peut encore être inscriptible. Par défaut :true
.signal
<AbortSignal> permet d’abandonner l’attente de la fin du flux. Le flux sous-jacent ne sera pas abandonné si le signal est abandonné. Le rappel sera appelé avec uneAbortError
. Tous les écouteurs enregistrés ajoutés par cette fonction seront également supprimés.
callback
<Fonction> Une fonction de rappel qui prend un argument d’erreur facultatif.- Retourne : <Fonction> Une fonction de nettoyage qui supprime tous les écouteurs enregistrés.
Une fonction pour être averti lorsqu’un flux n’est plus lisible, inscriptible ou a rencontré une erreur ou un événement de fermeture prématurée.
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('Le flux a échoué.', err)
} else {
console.log('Le flux a fini de lire.')
}
})
rs.resume() // Purger le flux.
Particulièrement utile dans les scénarios de gestion des erreurs où un flux est détruit prématurément (comme une requête HTTP abandonnée) et n’émettra pas 'end'
ou 'finish'
.
L’API finished
fournit une version de promesse.
stream.finished()
laisse des écouteurs d’événements en suspens (en particulier 'error'
, 'end'
, 'finish'
et 'close'
) après que callback
a été invoqué. La raison en est que les événements 'error'
inattendus (en raison d’implémentations de flux incorrectes) ne provoquent pas de plantages inattendus. Si ce comportement n’est pas souhaité, la fonction de nettoyage retournée doit être invoquée dans le rappel :
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[Historique]
Version | Modifications |
---|---|
v19.7.0, v18.16.0 | Ajout de la prise en charge des webstreams. |
v18.0.0 | Le passage d'un callback invalide à l'argument callback lève maintenant ERR_INVALID_ARG_TYPE au lieu de ERR_INVALID_CALLBACK . |
v14.0.0 | pipeline(..., cb) attendra l'événement 'close' avant d'invoquer le callback. L'implémentation essaie de détecter les anciens flux et applique ce comportement uniquement aux flux qui sont censés émettre 'close' . |
v13.10.0 | Ajout de la prise en charge des générateurs asynchrones. |
v10.0.0 | Ajouté dans : v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- Retourne : <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- Retourne : <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- Retourne : <AsyncIterable> | <Promise>
callback
<Function> Appelé lorsque le pipeline est entièrement terminé.err
<Error>val
Valeur résolue dePromise
retournée pardestination
.
Retourne : <Stream>
Une méthode de module pour connecter des flux et des générateurs, transmettre les erreurs et nettoyer correctement et fournir un callback lorsque le pipeline est terminé.
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Utilisez l'API pipeline pour connecter facilement une série de flux
// ensemble et être notifié lorsque le pipeline est entièrement terminé.
// Un pipeline pour compresser efficacement un fichier tar potentiellement énorme :
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('Échec du pipeline.', err)
} else {
console.log('Pipeline réussi.')
}
})
L'API pipeline
fournit une version promise.
stream.pipeline()
appellera stream.destroy(err)
sur tous les flux sauf :
- Les flux
Readable
qui ont émis'end'
ou'close'
. - Les flux
Writable
qui ont émis'finish'
ou'close'
.
stream.pipeline()
laisse des écouteurs d'événements en suspens sur les flux après l'invocation du callback
. En cas de réutilisation de flux après un échec, cela peut provoquer des fuites d'écouteurs d'événements et des erreurs étouffées. Si le dernier flux est lisible, les écouteurs d'événements en suspens seront supprimés afin que le dernier flux puisse être consommé ultérieurement.
stream.pipeline()
ferme tous les flux lorsqu'une erreur est déclenchée. L'utilisation de IncomingRequest
avec pipeline
peut entraîner un comportement inattendu une fois qu'il aurait détruit le socket sans envoyer la réponse attendue. Voir l'exemple ci-dessous :
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // Aucun fichier de ce type
// ce message ne peut pas être envoyé une fois que `pipeline` a déjà détruit le socket
return res.end('erreur !!!')
}
})
})
stream.compose(...streams)
[Historique]
Version | Modifications |
---|---|
v21.1.0, v20.10.0 | Ajout de la prise en charge de la classe stream. |
v19.8.0, v18.16.0 | Ajout de la prise en charge des flux web. |
v16.9.0 | Ajouté dans : v16.9.0 |
[Stable : 1 - Expérimental]
Stable : 1 Stabilité : 1 - stream.compose
est expérimental.
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- Retourne : <stream.Duplex>
Combine deux flux ou plus en un flux Duplex
qui écrit dans le premier flux et lit à partir du dernier. Chaque flux fourni est dirigé vers le suivant, en utilisant stream.pipeline
. Si l’un des flux génère une erreur, tous sont détruits, y compris le flux Duplex
externe.
Comme stream.compose
retourne un nouveau flux qui peut (et doit) à son tour être dirigé vers d’autres flux, il permet la composition. En revanche, lors de la transmission de flux à stream.pipeline
, en général, le premier flux est un flux lisible et le dernier un flux accessible en écriture, formant un circuit fermé.
S’il reçoit une Function
, elle doit être une méthode de fabrique prenant une source
Iterable
.
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // affiche 'HELLOWORLD'
stream.compose
peut être utilisé pour convertir des itérables asynchrones, des générateurs et des fonctions en flux.
AsyncIterable
convertit en unDuplex
lisible. Ne peut pas générernull
.AsyncGeneratorFunction
convertit en unDuplex
de transformation lisible/accessible en écriture. Doit prendre uneAsyncIterable
source comme premier paramètre. Ne peut pas générernull
.AsyncFunction
convertit en unDuplex
accessible en écriture. Doit retourner soitnull
, soitundefined
.
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// Convertir AsyncIterable en Duplex lisible.
const s1 = compose(
(async function* () {
yield 'Hello'
yield 'World'
})()
)
// Convertir AsyncGenerator en Duplex de transformation.
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// Convertir AsyncFunction en Duplex accessible en écriture.
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // affiche 'HELLOWORLD'
Voir readable.compose(stream)
pour stream.compose
en tant qu’opérateur.
stream.Readable.from(iterable[, options])
Ajouté dans : v12.3.0, v10.17.0
iterable
<Iterable> Objet implémentant le protocole itérableSymbol.asyncIterator
ouSymbol.iterator
. Émet un événement 'error' si une valeur null est passée.options
<Object> Options fournies ànew stream.Readable([options])
. Par défaut,Readable.from()
définitoptions.objectMode
surtrue
, sauf si cela est explicitement désactivé en définissantoptions.objectMode
surfalse
.- Renvoie : <stream.Readable>
Une méthode utilitaire pour créer des flux lisibles à partir d'itérateurs.
const { Readable } = require('node:stream')
async function* generate() {
yield 'hello'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
L'appel à Readable.from(string)
ou Readable.from(buffer)
n'itérera pas sur les chaînes ou les tampons pour correspondre à la sémantique des autres flux pour des raisons de performance.
Si un objet Iterable
contenant des promesses est passé comme argument, cela pourrait entraîner un rejet non traité.
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rejet non traité
])
stream.Readable.fromWeb(readableStream[, options])
Ajouté dans : v17.0.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité : 1 - Expérimental
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Renvoie : <stream.Readable>
stream.Readable.isDisturbed(stream)
Ajouté dans : v16.8.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité : 1 - Expérimental
stream
<stream.Readable> | <ReadableStream>- Retourne :
boolean
Indique si le flux a été lu ou annulé.
stream.isErrored(stream)
Ajouté dans : v17.3.0, v16.14.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité : 1 - Expérimental
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- Retourne : <boolean>
Indique si le flux a rencontré une erreur.
stream.isReadable(stream)
Ajouté dans : v17.4.0, v16.14.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité : 1 - Expérimental
stream
<Readable> | <Duplex> | <ReadableStream>- Retourne : <boolean>
Indique si le flux est lisible.
stream.Readable.toWeb(streamReadable[, options])
Ajouté dans : v17.0.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité : 1 - Expérimental
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> La taille maximale de la file d’attente interne (duReadableStream
créé) avant que la contre-pression ne soit appliquée lors de la lecture à partir dustream.Readable
donné. Si aucune valeur n’est fournie, elle sera prise à partir dustream.Readable
donné.size
<Function> Une fonction qui donne la taille du bloc de données donné. Si aucune valeur n’est fournie, la taille sera1
pour tous les blocs.chunk
<any>- Retourne : <number>
Retourne : <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
Ajouté dans : v17.0.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Retourne : <stream.Writable>
stream.Writable.toWeb(streamWritable)
Ajouté dans : v17.0.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité: 1 - Expérimental
streamWritable
<stream.Writable>- Retourne : <WritableStream>
stream.Duplex.from(src)
[Historique]
Version | Modifications |
---|---|
v19.5.0, v18.17.0 | L’argument src peut désormais être un ReadableStream ou WritableStream . |
v16.8.0 | Ajouté dans : v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
Une méthode utilitaire pour créer des flux duplex.
Stream
convertit un flux d'écriture enDuplex
accessible en écriture et un flux de lecture enDuplex
.Blob
convertit enDuplex
accessible en lecture.string
convertit enDuplex
accessible en lecture.ArrayBuffer
convertit enDuplex
accessible en lecture.AsyncIterable
convertit enDuplex
accessible en lecture. Ne peut pas générernull
.AsyncGeneratorFunction
convertit en unDuplex
de transformation accessible en lecture/écriture. Doit prendre une sourceAsyncIterable
comme premier paramètre. Ne peut pas générernull
.AsyncFunction
convertit enDuplex
accessible en écriture. Doit retournernull
ouundefined
.Object ({ writable, readable })
convertitreadable
etwritable
enStream
, puis les combine enDuplex
où leDuplex
écrira dans lewritable
et lira à partir dureadable
.Promise
convertit enDuplex
accessible en lecture. La valeurnull
est ignorée.ReadableStream
convertit enDuplex
accessible en lecture.WritableStream
convertit enDuplex
accessible en écriture.- Retourne : <stream.Duplex>
Si un objet Iterable
contenant des promesses est transmis comme argument, cela peut entraîner un rejet non géré.
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Rejet non géré
])
stream.Duplex.fromWeb(pair[, options])
Ajouté dans : v17.0.0
[Stable: 1 - Expérimental]
Stable: 1 Stabilité : 1 - Expérimental
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>Retourne : <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
Ajouté dans : v17.0.0
[Stable : 1 - Expérimental]
Stable : 1 Stabilité : 1 - Expérimental
streamDuplex
<stream.Duplex>- Retourne : <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[Historique]
Version | Modifications |
---|---|
v19.7.0, v18.16.0 | Ajout du support pour ReadableStream et WritableStream . |
v15.4.0 | Ajouté dans : v15.4.0 |
signal
<AbortSignal> Un signal représentant une possible annulationstream
<Stream> | <ReadableStream> | <WritableStream> Un flux auquel attacher un signal.
Attache un AbortSignal à un flux lisible ou inscriptible. Cela permet au code de contrôler la destruction du flux en utilisant un AbortController
.
L'appel de abort
sur l'AbortController
correspondant à l'AbortSignal
transmis se comportera de la même manière que l'appel de .destroy(new AbortError())
sur le flux, et controller.error(new AbortError())
pour les flux web.
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Plus tard, interrompre l'opération en fermant le flux
controller.abort()
Ou en utilisant un AbortSignal
avec un flux lisible comme itérable asynchrone :
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // définir un délai d'expiration
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// L'opération a été annulée
} else {
throw e
}
}
})()
Ou en utilisant un AbortSignal
avec un ReadableStream :
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// L'opération a été annulée
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
Ajouté dans : v19.9.0, v18.17.0
Retourne le highWaterMark
par défaut utilisé par les flux. La valeur par défaut est 65536
(64 kio) ou 16
pour objectMode
.
stream.setDefaultHighWaterMark(objectMode, value)
Ajouté dans : v19.9.0, v18.17.0
Définit le highWaterMark
par défaut utilisé par les flux.
API pour les implémenteurs de flux
L'API du module node:stream
a été conçue pour faciliter la mise en œuvre de flux en utilisant le modèle d'héritage prototypique de JavaScript.
Tout d'abord, un développeur de flux déclarerait une nouvelle classe JavaScript qui étend l'une des quatre classes de flux de base (stream.Writable
, stream.Readable
, stream.Duplex
ou stream.Transform
), en s'assurant d'appeler le constructeur de la classe parent appropriée :
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
Lorsque vous étendez des flux, gardez à l'esprit les options que l'utilisateur peut et doit fournir avant de les transmettre au constructeur de base. Par exemple, si l'implémentation fait des hypothèses concernant les options autoDestroy
et emitClose
, ne permettez pas à l'utilisateur de les remplacer. Soyez explicite sur les options qui sont transmises au lieu de transmettre implicitement toutes les options.
La nouvelle classe de flux doit ensuite implémenter une ou plusieurs méthodes spécifiques, selon le type de flux créé, comme indiqué dans le tableau ci-dessous :
Cas d'utilisation | Classe | Méthode(s) à implémenter |
---|---|---|
Lecture seule | Readable | _read() |
Écriture seule | Writable | _write() , _writev() , _final() |
Lecture et écriture | Duplex | _read() , _write() , _writev() , _final() |
Traiter les données écrites, puis lire le résultat | Transform | _transform() , _flush() , _final() |
Le code d'implémentation d'un flux ne doit jamais appeler les méthodes "publiques" d'un flux qui sont destinées à être utilisées par les consommateurs (comme décrit dans la section API pour les consommateurs de flux). Cela pourrait entraîner des effets secondaires indésirables dans le code d'application qui consomme le flux.
Évitez de remplacer des méthodes publiques telles que write()
, end()
, cork()
, uncork()
, read()
et destroy()
, ou d'émettre des événements internes tels que 'error'
, 'data'
, 'end'
, 'finish'
et 'close'
via .emit()
. Cela peut casser les invariants de flux actuels et futurs, entraînant des problèmes de comportement et/ou de compatibilité avec d'autres flux, utilitaires de flux et attentes des utilisateurs.
Construction simplifiée
Ajouté dans la version : v1.2.0
Dans de nombreux cas simples, il est possible de créer un flux sans avoir recours à l'héritage. Cela peut être réalisé en créant directement des instances des objets stream.Writable
, stream.Readable
, stream.Duplex
ou stream.Transform
et en passant les méthodes appropriées en tant qu'options du constructeur.
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// Initialiser l'état et charger les ressources...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Libérer les ressources...
},
})
Mise en œuvre d'un flux inscriptible
La classe stream.Writable
est étendue pour mettre en œuvre un flux Writable
.
Les flux Writable
personnalisés doivent appeler le constructeur new stream.Writable([options])
et mettre en œuvre la méthode writable._write()
et/ou writable._writev()
.
new stream.Writable([options])
[Historique]
Version | Modifications |
---|---|
v22.0.0 | Augmentation de la valeur par défaut de highWaterMark. |
v15.5.0 | Prise en charge du passage d'un AbortSignal. |
v14.0.0 | Modification de la valeur par défaut de l'option autoDestroy à true . |
v11.2.0, v10.16.0 | Ajout de l'option autoDestroy pour appeler automatiquement destroy() le flux lorsqu'il émet 'finish' ou des erreurs. |
v10.0.0 | Ajout de l'option emitClose pour spécifier si 'close' est émis lors de la destruction. |
options
<Object>highWaterMark
<number> Niveau du tampon lorsquestream.write()
commence à renvoyerfalse
. Par défaut :65536
(64 Kio), ou16
pour les fluxobjectMode
.decodeStrings
<boolean> Indique s'il faut encoder lesstring
transmises àstream.write()
enBuffer
(avec l'encodage spécifié dans l'appelstream.write()
) avant de les transmettre àstream._write()
. Les autres types de données ne sont pas convertis (c.-à-d. que lesBuffer
ne sont pas décodés enstring
). La valeur false empêchera la conversion desstring
. Par défaut :true
.defaultEncoding
<string> L'encodage par défaut qui est utilisé lorsqu'aucun encodage n'est spécifié comme argument àstream.write()
. Par défaut :'utf8'
.objectMode
<boolean> Indique sistream.write(anyObj)
est une opération valide ou non. Lorsqu'il est défini, il devient possible d'écrire des valeurs JavaScript autres que string, <Buffer>, <TypedArray> ou <DataView> si elles sont prises en charge par l'implémentation du flux. Par défaut :false
.emitClose
<boolean> Indique si le flux doit émettre'close'
après avoir été détruit ou non. Par défaut :true
.write
<Function> Mise en œuvre de la méthodestream._write()
.writev
<Function> Mise en œuvre de la méthodestream._writev()
.destroy
<Function> Mise en œuvre de la méthodestream._destroy()
.final
<Function> Mise en œuvre de la méthodestream._final()
.construct
<Function> Mise en œuvre de la méthodestream._construct()
.autoDestroy
<boolean> Indique si ce flux doit appeler automatiquement.destroy()
sur lui-même après la fin. Par défaut :true
.signal
<AbortSignal> Un signal représentant une possible annulation.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// Appelle le constructeur stream.Writable().
super(options)
// ...
}
}
Ou, lors de l'utilisation de constructeurs de style antérieur à ES6 :
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
Ou, en utilisant l'approche simplifiée du constructeur :
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
L'appel de abort
sur AbortController
correspondant au AbortSignal
passé se comportera de la même manière que l'appel de .destroy(new AbortError())
sur le flux inscriptible.
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// Plus tard, annuler l'opération en fermant le flux
controller.abort()
writable._construct(callback)
Ajouté dans : v15.0.0
callback
<Fonction> Appelez cette fonction (éventuellement avec un argument d’erreur) lorsque le flux a terminé son initialisation.
La méthode _construct()
ne DOIT PAS être appelée directement. Elle peut être implémentée par les classes enfants et, si c’est le cas, elle sera appelée uniquement par les méthodes internes de la classe Writable
.
Cette fonction optionnelle sera appelée dans un tic après le retour du constructeur du flux, ce qui retardera tout appel à _write()
, _final()
et _destroy()
jusqu’à ce que callback
soit appelée. Ceci est utile pour initialiser l’état ou pour initialiser de manière asynchrone des ressources avant que le flux puisse être utilisé.
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[Historique]
Version | Modifications |
---|---|
v12.11.0 | _write() est optionnel lorsque vous fournissez _writev() . |
chunk
<Buffer> | <string> | <any> LeBuffer
à écrire, converti à partir de lastring
passée àstream.write()
. Si l’optiondecodeStrings
du flux estfalse
ou si le flux fonctionne en mode objet, le chunk ne sera pas converti et sera ce qui a été passé àstream.write()
.encoding
<string> Si le chunk est une chaîne, alorsencoding
est l’encodage de caractères de cette chaîne. Si le chunk est unBuffer
, ou si le flux fonctionne en mode objet,encoding
peut être ignoré.callback
<Fonction> Appelez cette fonction (éventuellement avec un argument d’erreur) lorsque le traitement du chunk fourni est terminé.
Toutes les implémentations de flux Writable
doivent fournir une méthode writable._write()
et/ou writable._writev()
pour envoyer les données à la ressource sous-jacente.
Les flux Transform
fournissent leur propre implémentation de writable._write()
.
Cette fonction ne DOIT PAS être appelée directement par le code de l’application. Elle doit être implémentée par les classes enfants et appelée uniquement par les méthodes internes de la classe Writable
.
La fonction callback
doit être appelée de manière synchrone dans writable._write()
ou de manière asynchrone (c.-à-d. un tic différent) pour signaler soit que l’écriture a réussi, soit qu’elle a échoué avec une erreur. Le premier argument passé à callback
doit être l’objet Error
si l’appel a échoué ou null
si l’écriture a réussi.
Tous les appels à writable.write()
qui se produisent entre le moment où writable._write()
est appelé et le moment où callback
est appelé entraîneront la mise en mémoire tampon des données écrites. Lorsque callback
est invoqué, le flux peut émettre un événement 'drain'
. Si une implémentation de flux est capable de traiter plusieurs chunks de données à la fois, la méthode writable._writev()
doit être implémentée.
Si la propriété decodeStrings
est explicitement définie sur false
dans les options du constructeur, alors chunk
restera le même objet qui est passé à .write()
, et peut être une chaîne plutôt qu’un Buffer
. Cela permet de prendre en charge les implémentations qui ont une gestion optimisée pour certains encodages de données de chaîne. Dans ce cas, l’argument encoding
indiquera l’encodage de caractères de la chaîne. Sinon, l’argument encoding
peut être ignoré en toute sécurité.
La méthode writable._write()
est préfixée d’un trait de soulignement, car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateur.
writable._writev(chunks, callback)
chunks
<Object[]> Les données à écrire. La valeur est un tableau de <Object> qui représentent chacun un morceau de données discret à écrire. Les propriétés de ces objets sont :chunk
<Buffer> | <string> Une instance de tampon ou une chaîne contenant les données à écrire. Lechunk
sera une chaîne si leWritable
a été créé avec l’optiondecodeStrings
définie surfalse
et qu’une chaîne a été transmise àwrite()
.encoding
<string> L’encodage des caractères duchunk
. Sichunk
est unBuffer
, l’encodage sera'buffer'
.
callback
<Function> Une fonction de rappel (éventuellement avec un argument d’erreur) à invoquer lorsque le traitement est terminé pour les morceaux fournis.
Cette fonction NE DOIT PAS être appelée directement par le code de l’application. Elle doit être implémentée par les classes enfants et appelée uniquement par les méthodes internes de la classe Writable
.
La méthode writable._writev()
peut être implémentée en plus ou en alternative à writable._write()
dans les implémentations de flux capables de traiter plusieurs morceaux de données à la fois. Si elle est implémentée et s’il existe des données mises en mémoire tampon provenant d’écritures précédentes, _writev()
sera appelée au lieu de _write()
.
La méthode writable._writev()
est préfixée d’un trait de soulignement car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateur.
writable._destroy(err, callback)
Ajoutée dans : v8.0.0
err
<Error> Une erreur possible.callback
<Function> Une fonction de rappel qui prend un argument d’erreur facultatif.
La méthode _destroy()
est appelée par writable.destroy()
. Elle peut être remplacée par des classes enfants, mais elle ne doit pas être appelée directement.
writable._final(callback)
Ajouté dans : v8.0.0
callback
<Function> Appeler cette fonction (éventuellement avec un argument d'erreur) lorsque l'écriture des données restantes est terminée.
La méthode _final()
ne doit pas être appelée directement. Elle peut être implémentée par des classes enfants, et si c'est le cas, elle sera appelée uniquement par les méthodes internes de la classe Writable
.
Cette fonction optionnelle sera appelée avant la fermeture du flux, retardant l'événement 'finish'
jusqu'à ce que callback
soit appelée. Cela est utile pour fermer des ressources ou écrire des données mises en mémoire tampon avant la fin d'un flux.
Erreurs lors de l'écriture
Les erreurs survenant lors du traitement des méthodes writable._write()
, writable._writev()
et writable._final()
doivent être propagées en invoquant le rappel et en transmettant l'erreur comme premier argument. Lever une Error
à partir de ces méthodes ou émettre manuellement un événement 'error'
entraîne un comportement indéfini.
Si un flux Readable
est relié à un flux Writable
lorsque Writable
émet une erreur, le flux Readable
sera déconnecté.
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
},
})
Un exemple de flux inscriptible
Ce qui suit illustre une implémentation de flux Writable
personnalisé assez simpliste (et quelque peu inutile). Bien que cette instance de flux Writable
spécifique ne soit pas d'une utilité réelle particulière, l'exemple illustre chacun des éléments requis d'une instance de flux Writable
personnalisée :
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk is invalid'))
} else {
callback()
}
}
}
Décodage des tampons dans un flux accessible en écriture
Le décodage des tampons est une tâche courante, par exemple, lors de l'utilisation de transformateurs dont l'entrée est une chaîne de caractères. Ce n'est pas un processus trivial lors de l'utilisation de l'encodage de caractères multi-octets, tel que UTF-8. L'exemple suivant montre comment décoder des chaînes multi-octets en utilisant StringDecoder
et Writable
.
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
Implémentation d'un flux lisible
La classe stream.Readable
est étendue pour implémenter un flux Readable
.
Les flux Readable
personnalisés doivent appeler le constructeur new stream.Readable([options])
et implémenter la méthode readable._read()
.
new stream.Readable([options])
[Historique]
Version | Modifications |
---|---|
v22.0.0 | Augmentation de la valeur par défaut de highWaterMark. |
v15.5.0 | Prise en charge du passage d'un AbortSignal. |
v14.0.0 | Changement de la valeur par défaut de l'option autoDestroy à true . |
v11.2.0, v10.16.0 | Ajout de l'option autoDestroy pour destroy() automatiquement le flux lorsqu'il émet 'end' ou des erreurs. |
options
<Object>highWaterMark
<number> Le nombre maximal d'octets à stocker dans le tampon interne avant de cesser de lire la ressource sous-jacente. Par défaut :65536
(64 Ko), ou16
pour les flux enobjectMode
.encoding
<string> Si spécifié, les tampons seront décodés en chaînes à l'aide de l'encodage spécifié. Par défaut :null
.objectMode
<boolean> Indique si ce flux doit se comporter comme un flux d'objets. Cela signifie questream.read(n)
renvoie une seule valeur au lieu d'unBuffer
de taillen
. Par défaut :false
.emitClose
<boolean> Indique si le flux doit émettre'close'
après avoir été détruit. Par défaut :true
.read
<Function> Implémentation de la méthodestream._read()
.destroy
<Function> Implémentation de la méthodestream._destroy()
.construct
<Function> Implémentation de la méthodestream._construct()
.autoDestroy
<boolean> Indique si ce flux doit appeler automatiquement.destroy()
sur lui-même après sa fin. Par défaut :true
.signal
<AbortSignal> Un signal représentant une possible annulation.
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// Appelle le constructeur stream.Readable(options).
super(options)
// ...
}
}
Ou, lors de l'utilisation de constructeurs de style pré-ES6 :
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
Ou, en utilisant l'approche de constructeur simplifiée :
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
L'appel de abort
sur le AbortController
correspondant à la AbortSignal
passée se comportera de la même manière que l'appel de .destroy(new AbortError())
sur le flux lisible créé.
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// Plus tard, annulez l'opération en fermant le flux.
controller.abort()
readable._construct(callback)
Ajouté dans : v15.0.0
callback
<Function> Appelez cette fonction (éventuellement avec un argument d'erreur) lorsque le flux a terminé son initialisation.
La méthode _construct()
NE DOIT PAS être appelée directement. Elle peut être implémentée par des classes enfants et, si c'est le cas, elle sera appelée uniquement par les méthodes internes de la classe Readable
.
Cette fonction optionnelle sera planifiée lors de la prochaine itération par le constructeur du flux, ce qui retardera tout appel de _read()
et _destroy()
jusqu'à ce que callback
soit appelée. Ceci est utile pour initialiser l'état ou initialiser de manière asynchrone des ressources avant que le flux ne puisse être utilisé.
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
Ajouté dans : v0.9.4
size
<number> Nombre d’octets à lire de façon asynchrone
Cette fonction NE DOIT PAS être appelée directement par le code de l’application. Elle doit être implémentée par les classes enfants, et être appelée uniquement par les méthodes internes de la classe Readable
.
Toutes les implémentations de flux Readable
doivent fournir une implémentation de la méthode readable._read()
pour récupérer des données depuis la ressource sous-jacente.
Lorsque readable._read()
est appelée, si des données sont disponibles à partir de la ressource, l’implémentation doit commencer à pousser ces données dans la file d’attente de lecture en utilisant la méthode this.push(dataChunk)
. _read()
sera appelée à nouveau après chaque appel à this.push(dataChunk)
une fois que le flux est prêt à accepter plus de données. _read()
peut continuer à lire depuis la ressource et à pousser des données jusqu’à ce que readable.push()
renvoie false
. Ce n’est que lorsque _read()
est appelée à nouveau après qu’elle s’est arrêtée qu’elle doit reprendre le transfert de données supplémentaires dans la file d’attente.
Une fois que la méthode readable._read()
a été appelée, elle ne sera plus appelée jusqu’à ce que plus de données soient poussées via la méthode readable.push()
. Les données vides telles que les tampons et les chaînes vides n’entraîneront pas l’appel de readable._read()
.
L’argument size
est consultatif. Pour les implémentations où une "lecture" est une opération unique qui renvoie des données, l’argument size
peut être utilisé pour déterminer la quantité de données à extraire. D’autres implémentations peuvent ignorer cet argument et simplement fournir des données chaque fois qu’elles deviennent disponibles. Il n’est pas nécessaire d’"attendre" que size
octets soient disponibles avant d’appeler stream.push(chunk)
.
La méthode readable._read()
est précédée d’un trait de soulignement, car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateurs.
readable._destroy(err, callback)
Ajouté dans : v8.0.0
err
<Error> Une erreur possible.callback
<Function> Une fonction de rappel qui prend un argument d’erreur facultatif.
La méthode _destroy()
est appelée par readable.destroy()
. Elle peut être remplacée par les classes enfants mais elle ne doit pas être appelée directement.
readable.push(chunk[, encoding])
[Historique]
Version | Modifications |
---|---|
v22.0.0, v20.13.0 | L’argument chunk peut maintenant être une instance TypedArray ou DataView . |
v8.0.0 | L’argument chunk peut maintenant être une instance Uint8Array . |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Portion de données à ajouter à la file d’attente de lecture. Pour les flux ne fonctionnant pas en mode objet,chunk
doit être une <string>, un <Buffer>, un <TypedArray> ou une <DataView>. Pour les flux en mode objet,chunk
peut être n’importe quelle valeur JavaScript.encoding
<string> Encodage des portions de chaînes. Doit être un encodageBuffer
valide, tel que'utf8'
ou'ascii'
.- Retourne : <boolean>
true
si des portions de données supplémentaires peuvent continuer à être ajoutées ;false
sinon.
Lorsque chunk
est un <Buffer>, un <TypedArray>, une <DataView> ou une <string>, la portion
de données sera ajoutée à la file d’attente interne pour que les utilisateurs du flux puissent la consommer. Le fait de transmettre chunk
en tant que null
signale la fin du flux (EOF), après quoi aucune autre donnée ne peut être écrite.
Lorsque le Readable
fonctionne en mode suspendu, les données ajoutées avec readable.push()
peuvent être lues en appelant la méthode readable.read()
lorsque l’événement 'readable'
est émis.
Lorsque le Readable
fonctionne en mode flux, les données ajoutées avec readable.push()
seront distribuées en émettant un événement 'data'
.
La méthode readable.push()
est conçue pour être aussi flexible que possible. Par exemple, lors de l’encapsulation d’une source de niveau inférieur qui fournit une forme quelconque de mécanisme de pause/reprise, et un rappel de données, la source de niveau inférieur peut être encapsulée par l’instance Readable
personnalisée :
// `_source` est un objet avec des méthodes readStop() et readStart(),
// et un membre `ondata` qui est appelé lorsqu’il a des données, et
// un membre `onend` qui est appelé lorsque les données sont terminées.
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// Chaque fois qu’il y a des données, les ajouter à la mémoire tampon interne.
this._source.ondata = chunk => {
// Si push() retourne false, alors arrêtez la lecture depuis la source.
if (!this.push(chunk)) this._source.readStop()
}
// Lorsque la source se termine, ajoutez la portion `null` signalant la fin de fichier.
this._source.onend = () => {
this.push(null)
}
}
// _read() sera appelé lorsque le flux veut récupérer plus de données.
// L’argument de taille indicatif est ignoré dans ce cas.
_read(size) {
this._source.readStart()
}
}
La méthode readable.push()
est utilisée pour ajouter le contenu à la mémoire tampon interne. Elle peut être pilotée par la méthode readable._read()
.
Pour les flux ne fonctionnant pas en mode objet, si le paramètre chunk
de readable.push()
est undefined
, il sera traité comme une chaîne ou une mémoire tampon vide. Voir readable.push('')
pour plus d’informations.
Erreurs lors de la lecture
Les erreurs survenant lors du traitement de readable._read()
doivent être propagées via la méthode readable.destroy(err)
. Lancer une Error
depuis readable._read()
ou émettre manuellement un événement 'error'
entraîne un comportement indéfini.
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// Effectuer un travail.
}
},
})
Exemple de flux de comptage
L'exemple suivant est un exemple basique d'un flux Readable
qui émet les chiffres de 1 à 1 000 000 par ordre croissant, puis se termine.
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
Implémentation d'un flux duplex
Un flux Duplex
est un flux qui implémente à la fois Readable
et Writable
, comme une connexion de socket TCP.
Comme JavaScript ne prend pas en charge l'héritage multiple, la classe stream.Duplex
est étendue pour implémenter un flux Duplex
(au lieu d'étendre les classes stream.Readable
et stream.Writable
).
La classe stream.Duplex
hérite de manière prototypique de stream.Readable
et de manière parasitaire de stream.Writable
, mais instanceof
fonctionnera correctement pour les deux classes de base grâce à la surcharge de Symbol.hasInstance
sur stream.Writable
.
Les flux Duplex
personnalisés doivent appeler le constructeur new stream.Duplex([options])
et implémenter à la fois les méthodes readable._read()
et writable._write()
.
new stream.Duplex(options)
[Historique]
Version | Modifications |
---|---|
v8.4.0 | Les options readableHighWaterMark et writableHighWaterMark sont désormais prises en charge. |
options
<Objet> Transmis aux constructeursWritable
etReadable
. Possède également les champs suivants :allowHalfOpen
<boolean> S'il est défini surfalse
, le flux terminera automatiquement le côté accessible en écriture lorsque le côté accessible en lecture se terminera. Par défaut :true
.readable
<boolean> Détermine si leDuplex
doit être accessible en lecture. Par défaut :true
.writable
<boolean> Détermine si leDuplex
doit être accessible en écriture. Par défaut :true
.readableObjectMode
<boolean> DéfinitobjectMode
pour le côté accessible en lecture du flux. N'a aucun effet siobjectMode
esttrue
. Par défaut :false
.writableObjectMode
<boolean> DéfinitobjectMode
pour le côté accessible en écriture du flux. N'a aucun effet siobjectMode
esttrue
. Par défaut :false
.readableHighWaterMark
<number> DéfinithighWaterMark
pour le côté accessible en lecture du flux. N'a aucun effet sihighWaterMark
est fourni.writableHighWaterMark
<number> DéfinithighWaterMark
pour le côté accessible en écriture du flux. N'a aucun effet sihighWaterMark
est fourni.
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
Ou, lors de l'utilisation de constructeurs de style pré-ES6 :
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
Ou, en utilisant l'approche du constructeur simplifié :
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
Lors de l'utilisation du pipeline :
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // Accepter les entrées de type chaîne plutôt que les tampons
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// S'assurer qu'il s'agit d'un JSON valide.
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('failed', err)
} else {
console.log('completed')
}
}
)
Exemple de flux duplex
L'exemple suivant illustre un exemple simple de flux Duplex
qui encapsule un objet source hypothétique de niveau inférieur dans lequel des données peuvent être écrites, et à partir duquel des données peuvent être lues, bien qu'en utilisant une API qui n'est pas compatible avec les flux Node.js. L'exemple suivant illustre un exemple simple de flux Duplex
qui met en mémoire tampon les données écrites entrantes via l'interface Writable
qui est relue via l'interface Readable
.
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// La source sous-jacente ne traite que les chaînes de caractères.
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
L'aspect le plus important d'un flux Duplex
est que les côtés Readable
et Writable
fonctionnent indépendamment l'un de l'autre, bien qu'ils coexistent au sein d'une même instance d'objet.
Flux duplex en mode objet
Pour les flux Duplex
, objectMode
peut être défini exclusivement pour le côté Readable
ou Writable
en utilisant respectivement les options readableObjectMode
et writableObjectMode
.
Dans l'exemple suivant, par exemple, un nouveau flux Transform
(qui est un type de flux Duplex
) est créé avec un côté Writable
en mode objet qui accepte les nombres JavaScript qui sont convertis en chaînes hexadécimales du côté Readable
.
const { Transform } = require('node:stream')
// Tous les flux Transform sont également des flux Duplex.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Convertir le chunk en nombre si nécessaire.
chunk |= 0
// Transformer le chunk en autre chose.
const data = chunk.toString(16)
// Pousser les données dans la file d'attente lisible.
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// Affiche : 01
myTransform.write(10)
// Affiche : 0a
myTransform.write(100)
// Affiche : 64
Implémentation d'un flux de transformation
Un flux Transform
est un flux Duplex
où la sortie est calculée d'une certaine manière à partir de l'entrée. Les exemples incluent les flux zlib ou les flux crypto qui compressent, cryptent ou décryptent des données.
Il n'y a aucune obligation que la sortie ait la même taille que l'entrée, le même nombre de morceaux ou arrive au même moment. Par exemple, un flux Hash
n'aura jamais qu'un seul morceau de sortie qui est fourni lorsque l'entrée est terminée. Un flux zlib
produira une sortie qui est soit beaucoup plus petite, soit beaucoup plus grande que son entrée.
La classe stream.Transform
est étendue pour implémenter un flux Transform
.
La classe stream.Transform
hérite prototypiquement de stream.Duplex
et implémente ses propres versions des méthodes writable._write()
et readable._read()
. Les implémentations Transform
personnalisées doivent implémenter la méthode transform._transform()
et peuvent également implémenter la méthode transform._flush()
.
Il faut être prudent lors de l'utilisation de flux Transform
, car les données écrites dans le flux peuvent entraîner la mise en pause du côté Writable
du flux si la sortie du côté Readable
n'est pas consommée.
new stream.Transform([options])
options
<Object> Transmis aux constructeursWritable
etReadable
. A également les champs suivants :transform
<Function> Implémentation de la méthodestream._transform()
.flush
<Function> Implémentation de la méthodestream._flush()
.
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
Ou, lors de l'utilisation de constructeurs de style pré-ES6 :
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
Ou, en utilisant l'approche de constructeur simplifiée :
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
Événement : 'end'
L’événement 'end'
provient de la classe stream.Readable
. L’événement 'end'
est émis une fois que toutes les données ont été produites, ce qui se produit après l’appel du rappel dans transform._flush()
. En cas d’erreur, 'end'
ne doit pas être émis.
Événement : 'finish'
L’événement 'finish'
provient de la classe stream.Writable
. L’événement 'finish'
est émis après que stream.end()
est appelé et que tous les blocs ont été traités par stream._transform()
. En cas d’erreur, 'finish'
ne doit pas être émis.
transform._flush(callback)
callback
<Fonction> Une fonction de rappel (avec éventuellement un argument d’erreur et des données) à appeler lorsque les données restantes ont été vidées.
Cette fonction NE DOIT PAS être appelée directement par le code de l’application. Elle doit être implémentée par les classes enfants et appelée uniquement par les méthodes internes de la classe Readable
.
Dans certains cas, une opération de transformation peut avoir besoin d’émettre un bit de données supplémentaire à la fin du flux. Par exemple, un flux de compression zlib
stockera une quantité d’état interne utilisée pour compresser de manière optimale la sortie. Lorsque le flux se termine, cependant, ces données supplémentaires doivent être vidées afin que les données compressées soient complètes.
Les implémentations personnalisées de Transform
peuvent implémenter la méthode transform._flush()
. Celle-ci sera appelée lorsqu’il n’y aura plus de données écrites à consommer, mais avant que l’événement 'end'
ne soit émis signalant la fin du flux Readable
.
Au sein de l’implémentation de transform._flush()
, la méthode transform.push()
peut être appelée zéro ou plusieurs fois, selon le cas. La fonction callback
doit être appelée lorsque l’opération de vidage est terminée.
La méthode transform._flush()
est préfixée par un caractère de soulignement car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateur.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> LeBuffer
à transformer, converti à partir de lastring
passée àstream.write()
. Si l'optiondecodeStrings
du flux estfalse
ou si le flux fonctionne en mode objet, le chunk ne sera pas converti et sera ce qui a été passé àstream.write()
.encoding
<string> Si le chunk est une chaîne, alors il s'agit du type d'encodage. Si le chunk est un buffer, alors il s'agit de la valeur spéciale'buffer'
. Ignorez-le dans ce cas.callback
<Function> Une fonction de rappel (éventuellement avec un argument d'erreur et des données) à appeler une fois que lechunk
fourni a été traité.
Cette fonction NE DOIT PAS être appelée directement par le code de l'application. Elle doit être implémentée par les classes enfants et appelée uniquement par les méthodes internes de la classe Readable
.
Toutes les implémentations de flux Transform
doivent fournir une méthode _transform()
pour accepter l'entrée et produire la sortie. L'implémentation transform._transform()
gère les octets écrits, calcule une sortie, puis transmet cette sortie à la partie lisible en utilisant la méthode transform.push()
.
La méthode transform.push()
peut être appelée zéro ou plusieurs fois pour générer une sortie à partir d'un seul chunk d'entrée, en fonction de la quantité à sortir en raison du chunk.
Il est possible qu'aucune sortie ne soit générée à partir d'un chunk de données d'entrée donné.
La fonction callback
doit être appelée uniquement lorsque le chunk actuel est complètement consommé. Le premier argument passé au callback
doit être un objet Error
si une erreur s'est produite lors du traitement de l'entrée, ou null
sinon. Si un deuxième argument est passé au callback
, il sera transmis à la méthode transform.push()
, mais seulement si le premier argument est faux. En d'autres termes, ce qui suit est équivalent :
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
La méthode transform._transform()
est préfixée par un trait de soulignement car elle est interne à la classe qui la définit et ne doit jamais être appelée directement par les programmes utilisateurs.
transform._transform()
n'est jamais appelée en parallèle ; les flux implémentent un mécanisme de file d'attente, et pour recevoir le prochain chunk, callback
doit être appelée, soit de manière synchrone, soit de manière asynchrone.
Classe : stream.PassThrough
La classe stream.PassThrough
est une implémentation triviale d’un flux Transform
qui transmet simplement les octets d’entrée vers la sortie. Son objectif est principalement de servir d’exemples et de tests, mais il existe certains cas d’utilisation où stream.PassThrough
est utile comme élément de base pour de nouveaux types de flux.
Notes supplémentaires
Compatibilité des flux avec les générateurs et itérateurs asynchrones
Avec la prise en charge des générateurs et itérateurs asynchrones en JavaScript, les générateurs asynchrones sont désormais effectivement une construction de flux de premier ordre au niveau du langage.
Voici quelques cas courants d’interopérabilité d’utilisation de flux Node.js avec des générateurs asynchrones et des itérateurs asynchrones.
Consommer des flux lisibles avec des itérateurs asynchrones
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
Les itérateurs asynchrones enregistrent un gestionnaire d’erreurs permanent sur le flux pour éviter toute erreur post-destruction non gérée.
Créer des flux lisibles avec des générateurs asynchrones
Un flux lisible Node.js peut être créé à partir d’un générateur asynchrone en utilisant la méthode utilitaire Readable.from()
:
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
Envoyer des itérateurs asynchrones vers des flux accessibles en écriture
Lorsque vous écrivez dans un flux accessible en écriture à partir d’un itérateur asynchrone, assurez-vous d’une gestion correcte de la contre-pression et des erreurs. stream.pipeline()
fait abstraction de la gestion de la contre-pression et des erreurs liées à la contre-pression :
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// Modèle de rappel
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'value returned')
}
}).on('close', () => {
ac.abort()
})
// Modèle de promesse
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'value returned')
})
.catch(err => {
console.error(err)
ac.abort()
})
Compatibilité avec les anciennes versions de Node.js
Avant Node.js 0.10, l'interface de flux Readable
était plus simple, mais aussi moins puissante et moins utile.
- Plutôt que d'attendre les appels à la méthode
stream.read()
, les événements'data'
commençaient à émettre immédiatement. Les applications qui devaient effectuer un certain travail pour décider comment gérer les données étaient obligées de stocker les données lues dans des tampons afin que les données ne soient pas perdues. - La méthode
stream.pause()
était consultative, plutôt que garantie. Cela signifiait qu'il était toujours nécessaire d'être prêt à recevoir des événements'data'
même lorsque le flux était dans un état en pause.
Dans Node.js 0.10, la classe Readable
a été ajoutée. Pour assurer la rétrocompatibilité avec les anciens programmes Node.js, les flux Readable
passent en "mode fluide" lorsqu'un gestionnaire d'événements 'data'
est ajouté, ou lorsque la méthode stream.resume()
est appelée. L'effet est que, même en n'utilisant pas la nouvelle méthode stream.read()
et l'événement 'readable'
, il n'est plus nécessaire de s'inquiéter de la perte de blocs 'data'
.
Bien que la plupart des applications continuent de fonctionner normalement, cela introduit un cas limite dans les conditions suivantes :
- Aucun écouteur d'événement
'data'
n'est ajouté. - La méthode
stream.resume()
n'est jamais appelée. - Le flux n'est pas redirigé vers une destination inscriptible.
Par exemple, considérez le code suivant :
// ATTENTION ! CASSÉ !
net.createServer((socket) => {
// Nous ajoutons un écouteur 'end', mais ne consommons jamais les données.
socket.on('end', () => {
// Il n'arrivera jamais ici.
socket.end('Le message a été reçu mais n'a pas été traité.\n');
});
}).listen(1337);
Avant Node.js 0.10, les données du message entrant seraient simplement ignorées. Cependant, dans Node.js 0.10 et au-delà, le socket reste en pause indéfiniment.
La solution de contournement dans cette situation consiste à appeler la méthode stream.resume()
pour démarrer le flux de données :
// Solution de contournement.
net.createServer((socket) => {
socket.on('end', () => {
socket.end('Le message a été reçu mais n'a pas été traité.\n');
});
// Démarrer le flux de données, en l'ignorant.
socket.resume();
}).listen(1337);
En plus des nouveaux flux Readable
qui passent en mode fluide, les flux de style pré-0.10 peuvent être encapsulés dans une classe Readable
en utilisant la méthode readable.wrap()
.
readable.read(0)
Dans certains cas, il est nécessaire de déclencher une actualisation des mécanismes de flux lisibles sous-jacents, sans consommer réellement de données. Dans de tels cas, il est possible d'appeler readable.read(0)
, qui retournera toujours null
.
Si le tampon de lecture interne est inférieur à highWaterMark
, et que le flux n'est pas en cours de lecture, alors l'appel à stream.read(0)
déclenchera un appel de bas niveau stream._read()
.
Bien que la plupart des applications n'aient presque jamais besoin de faire cela, il existe des situations dans Node.js où cela est fait, en particulier dans les mécanismes internes de la classe de flux Readable
.
readable.push('')
L'utilisation de readable.push('')
n'est pas recommandée.
Le fait de pousser une <string>, un <Buffer>, un <TypedArray> ou une <DataView> de zéro octet dans un flux qui n'est pas en mode objet a un effet secondaire intéressant. Parce qu'il s'agit bien d'un appel à readable.push()
, l'appel mettra fin au processus de lecture. Cependant, comme l'argument est une chaîne vide, aucune donnée n'est ajoutée au tampon lisible, il n'y a donc rien à consommer pour l'utilisateur.
Discrépance highWaterMark
après l'appel de readable.setEncoding()
L'utilisation de readable.setEncoding()
modifiera le comportement de la façon dont highWaterMark
fonctionne en mode non-objet.
Généralement, la taille du tampon actuel est mesurée par rapport à highWaterMark
en octets. Cependant, après l'appel de setEncoding()
, la fonction de comparaison commencera à mesurer la taille du tampon en caractères.
Ce n'est pas un problème dans les cas courants avec latin1
ou ascii
. Mais il est conseillé d'être attentif à ce comportement lorsque l'on travaille avec des chaînes de caractères qui pourraient contenir des caractères multi-octets.