Stream
[Stable: 2 - Stable]
Stable: 2 Stabilität: 2 - Stabil
Quellcode: lib/stream.js
Ein Stream ist eine abstrakte Schnittstelle für die Arbeit mit Streaming-Daten in Node.js. Das Modul node:stream
bietet eine API zur Implementierung der Stream-Schnittstelle.
Es gibt viele Stream-Objekte, die von Node.js bereitgestellt werden. Beispielsweise sind eine Anfrage an einen HTTP-Server und process.stdout
beides Stream-Instanzen.
Streams können lesbar, schreibbar oder beides sein. Alle Streams sind Instanzen von EventEmitter
.
Um auf das Modul node:stream
zuzugreifen:
const stream = require('node:stream')
Das Modul node:stream
ist nützlich, um neue Arten von Stream-Instanzen zu erstellen. Es ist in der Regel nicht erforderlich, das Modul node:stream
zu verwenden, um Streams zu konsumieren.
Gliederung dieses Dokuments
Dieses Dokument enthält zwei Hauptabschnitte und einen dritten Abschnitt für Anmerkungen. Der erste Abschnitt erklärt, wie man vorhandene Streams innerhalb einer Anwendung verwendet. Der zweite Abschnitt erklärt, wie man neue Arten von Streams erstellt.
Arten von Streams
Es gibt vier grundlegende Stream-Typen in Node.js:
Writable
: Streams, in die Daten geschrieben werden können (z. B.fs.createWriteStream()
).Readable
: Streams, aus denen Daten gelesen werden können (z. B.fs.createReadStream()
).Duplex
: Streams, die sowohlReadable
als auchWritable
sind (z. B.net.Socket
).Transform
:Duplex
-Streams, die die Daten beim Schreiben und Lesen verändern oder transformieren können (z. B.zlib.createDeflate()
).
Zusätzlich enthält dieses Modul die Hilfsfunktionen stream.duplexPair()
, stream.pipeline()
, stream.finished()
, stream.Readable.from()
und stream.addAbortSignal()
.
Streams Promises API
Hinzugefügt in: v15.0.0
Die stream/promises
API bietet einen alternativen Satz von asynchronen Hilfsfunktionen für Streams, die Promise
-Objekte zurückgeben, anstatt Callbacks zu verwenden. Die API ist über require('node:stream/promises')
oder require('node:stream').promises
zugänglich.
stream.pipeline(source[, ...transforms], destination[, options])
stream.pipeline(streams[, options])
[Historie]
Version | Änderungen |
---|---|
v18.0.0, v17.2.0, v16.14.0 | Füge die end -Option hinzu, die auf false gesetzt werden kann, um zu verhindern, dass der Ziel-Stream automatisch geschlossen wird, wenn die Quelle endet. |
v15.0.0 | Hinzugefügt in: v15.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function>- Gibt zurück: <Promise> | <AsyncIterable>
...transforms
<Stream> | <Function>source
<AsyncIterable>- Gibt zurück: <Promise> | <AsyncIterable>
destination
<Stream> | <Function>source
<AsyncIterable>- Gibt zurück: <Promise> | <AsyncIterable>
options
<Object> Pipeline-Optionensignal
<AbortSignal>end
<boolean> Beende den Zielstream, wenn der Quellstream endet. Transform-Streams werden immer beendet, auch wenn dieser Wertfalse
ist. Standard:true
.
Gibt zurück: <Promise> Wird erfüllt, wenn die Pipeline abgeschlossen ist.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
console.log('Pipeline erfolgreich.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline erfolgreich.')
Um ein AbortSignal
zu verwenden, übergeben Sie es als letztes Argument innerhalb eines Options-Objekts. Wenn das Signal abgebrochen wird, wird destroy
auf der zugrunde liegenden Pipeline mit einem AbortError
aufgerufen.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
signal,
})
}
run().catch(console.error) // AbortError
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'
const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
console.error(err) // AbortError
}
Die pipeline
API unterstützt auch asynchrone Generatoren:
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Arbeite mit Strings anstelle von `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
fs.createWriteStream('uppercase.txt')
)
console.log('Pipeline erfolgreich.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8') // Arbeite mit Strings anstelle von `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal })
}
},
createWriteStream('uppercase.txt')
)
console.log('Pipeline erfolgreich.')
Denken Sie daran, das signal
-Argument zu behandeln, das an den asynchronen Generator übergeben wird. Insbesondere in dem Fall, in dem der asynchrone Generator die Quelle für die Pipeline ist (d.h. das erste Argument), oder die Pipeline wird niemals abgeschlossen.
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
async function run() {
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline erfolgreich.')
}
run().catch(console.error)
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
await someLongRunningfn({ signal })
yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline erfolgreich.')
Die pipeline
API bietet eine Callback-Version:
stream.finished(stream[, options])
[Verlauf]
Version | Änderungen |
---|---|
v19.5.0, v18.14.0 | Unterstützung für ReadableStream und WritableStream hinzugefügt. |
v19.1.0, v18.13.0 | Die Option cleanup wurde hinzugefügt. |
v15.0.0 | Hinzugefügt in: v15.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Ein lesbarer und/oder beschreibbarer Stream/Webstream.options
<Object>error
<boolean> | <undefined>readable
<boolean> | <undefined>writable
<boolean> | <undefined>signal
<AbortSignal> | <undefined>cleanup
<boolean> | <undefined> Wenntrue
, werden die von dieser Funktion registrierten Listener entfernt, bevor das Promise erfüllt wird. Standard:false
.
Gibt zurück: <Promise> Wird erfüllt, wenn der Stream nicht mehr lesbar oder beschreibbar ist.
const { finished } = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream ist mit dem Lesen fertig.')
}
run().catch(console.error)
rs.resume() // Den Stream leeren.
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'
const rs = createReadStream('archive.tar')
async function run() {
await finished(rs)
console.log('Stream ist mit dem Lesen fertig.')
}
run().catch(console.error)
rs.resume() // Den Stream leeren.
Die finished
-API bietet auch eine Callback-Version.
stream.finished()
hinterlässt hängende Event-Listener (insbesondere 'error'
, 'end'
, 'finish'
und 'close'
), nachdem das zurückgegebene Promise aufgelöst oder abgelehnt wurde. Der Grund dafür ist, dass unerwartete 'error'
-Ereignisse (aufgrund fehlerhafter Stream-Implementierungen) keine unerwarteten Abstürze verursachen. Wenn dieses Verhalten unerwünscht ist, sollte options.cleanup
auf true
gesetzt werden:
await finished(rs, { cleanup: true })
Objektmodus
Alle von Node.js-APIs erstellten Streams arbeiten ausschließlich mit Strings, <Buffer>, <TypedArray> und <DataView>-Objekten:
Strings
undBuffers
sind die am häufigsten verwendeten Typen mit Streams.TypedArray
undDataView
ermöglichen es Ihnen, binäre Daten mit Typen wieInt32Array
oderUint8Array
zu verarbeiten. Wenn Sie ein TypedArray oder DataView in einen Stream schreiben, verarbeitet Node.js die rohen Bytes.
Es ist jedoch möglich, dass Stream-Implementierungen mit anderen Arten von JavaScript-Werten arbeiten (mit Ausnahme von null
, das innerhalb von Streams einen besonderen Zweck erfüllt). Solche Streams gelten als im "Objektmodus" betrieben.
Stream-Instanzen werden in den Objektmodus versetzt, indem die Option objectMode
verwendet wird, wenn der Stream erstellt wird. Der Versuch, einen bestehenden Stream in den Objektmodus zu versetzen, ist nicht sicher.
Pufferung
Sowohl Writable
als auch Readable
-Streams speichern Daten in einem internen Puffer.
Die Menge der potenziell gepufferten Daten hängt von der Option highWaterMark
ab, die dem Konstruktor des Streams übergeben wird. Bei normalen Streams gibt die Option highWaterMark
eine Gesamtanzahl von Bytes an. Für Streams, die im Objektmodus arbeiten, gibt highWaterMark
eine Gesamtzahl von Objekten an. Für Streams, die mit (aber nicht dekodierenden) Strings arbeiten, gibt highWaterMark
eine Gesamtzahl von UTF-16-Codeeinheiten an.
Daten werden in Readable
-Streams gepuffert, wenn die Implementierung stream.push(chunk)
aufruft. Wenn der Konsument des Streams nicht stream.read()
aufruft, verbleiben die Daten in der internen Warteschlange, bis sie konsumiert werden.
Sobald die Gesamtgröße des internen Lesepuffers den durch highWaterMark
angegebenen Schwellenwert erreicht, stoppt der Stream vorübergehend das Lesen von Daten aus der zugrunde liegenden Ressource, bis die aktuell gepufferten Daten konsumiert werden können (d. h. der Stream stoppt den Aufruf der internen readable._read()
-Methode, die zum Füllen des Lesepuffers verwendet wird).
Daten werden in Writable
-Streams gepuffert, wenn die Methode writable.write(chunk)
wiederholt aufgerufen wird. Solange die Gesamtgröße des internen Schreibpuffers unterhalb des von highWaterMark
festgelegten Schwellenwerts liegt, geben Aufrufe von writable.write()
true
zurück. Sobald die Größe des internen Puffers highWaterMark
erreicht oder überschreitet, wird false
zurückgegeben.
Ein Hauptziel der stream
-API, insbesondere der Methode stream.pipe()
, ist es, die Pufferung von Daten auf akzeptable Werte zu begrenzen, so dass Quellen und Ziele mit unterschiedlichen Geschwindigkeiten den verfügbaren Speicher nicht überfordern.
Die Option highWaterMark
ist ein Schwellenwert, keine Grenze: Sie gibt die Datenmenge an, die ein Stream puffert, bevor er aufhört, weitere Daten anzufordern. Sie erzwingt im Allgemeinen keine strenge Speicherbegrenzung. Bestimmte Stream-Implementierungen können strengere Grenzen erzwingen, aber dies ist optional.
Da Duplex
und Transform
-Streams sowohl Readable
als auch Writable
sind, verwalten beide zwei separate interne Puffer, die zum Lesen und Schreiben verwendet werden, so dass jede Seite unabhängig voneinander arbeiten kann, während ein angemessener und effizienter Datenfluss aufrechterhalten wird. Beispielsweise sind net.Socket
-Instanzen Duplex
-Streams, deren Readable
-Seite den Verbrauch von Daten ermöglicht, die vom Socket empfangen werden, und deren Writable
-Seite das Schreiben von Daten an den Socket ermöglicht. Da Daten mit einer schnelleren oder langsameren Rate an den Socket geschrieben werden können als Daten empfangen werden, sollte jede Seite unabhängig voneinander arbeiten (und puffern).
Die Mechanik der internen Pufferung ist ein internes Implementierungsdetail und kann jederzeit geändert werden. Für bestimmte erweiterte Implementierungen können die internen Puffer jedoch mit writable.writableBuffer
oder readable.readableBuffer
abgerufen werden. Die Verwendung dieser undokumentierten Eigenschaften wird nicht empfohlen.
API für Stream-Konsumenten
Fast alle Node.js-Anwendungen, egal wie einfach, verwenden Streams in irgendeiner Form. Das folgende Beispiel zeigt die Verwendung von Streams in einer Node.js-Anwendung, die einen HTTP-Server implementiert:
const http = require('node:http')
const server = http.createServer((req, res) => {
// `req` ist eine http.IncomingMessage, die ein lesbarer Stream ist.
// `res` ist eine http.ServerResponse, die ein schreibbarer Stream ist.
let body = ''
// Die Daten als UTF8-Strings abrufen.
// Wenn keine Kodierung festgelegt ist, werden Buffer-Objekte empfangen.
req.setEncoding('utf8')
// Lesbare Streams senden 'data'-Ereignisse aus, sobald ein Listener hinzugefügt wurde.
req.on('data', chunk => {
body += chunk
})
// Das 'end'-Ereignis zeigt an, dass der gesamte Body empfangen wurde.
req.on('end', () => {
try {
const data = JSON.parse(body)
// Etwas Interessantes an den Benutzer zurückschreiben:
res.write(typeof data)
res.end()
} catch (er) {
// oh oh! schlechtes JSON!
res.statusCode = 400
return res.end(`error: ${er.message}`)
}
})
})
server.listen(1337)
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" ist kein gültiges JSON
Writable
-Streams (wie res
im Beispiel) stellen Methoden wie write()
und end()
bereit, die verwendet werden, um Daten auf den Stream zu schreiben.
Readable
-Streams verwenden die EventEmitter
-API, um Anwendungscode zu benachrichtigen, wenn Daten zum Auslesen aus dem Stream verfügbar sind. Diese verfügbaren Daten können auf verschiedene Arten aus dem Stream gelesen werden.
Sowohl Writable
- als auch Readable
-Streams verwenden die EventEmitter
-API auf verschiedene Arten, um den aktuellen Zustand des Streams zu kommunizieren.
Duplex
- und Transform
-Streams sind sowohl Writable
als auch Readable
.
Anwendungen, die entweder Daten in einen Stream schreiben oder Daten aus einem Stream konsumieren, müssen die Stream-Schnittstellen nicht direkt implementieren und haben im Allgemeinen keinen Grund, require('node:stream')
aufzurufen.
Entwickler, die neue Arten von Streams implementieren möchten, sollten den Abschnitt API für Stream-Implementierer konsultieren.
Beschreibbare Streams
Beschreibbare Streams sind eine Abstraktion für ein Ziel, in das Daten geschrieben werden.
Beispiele für Writable
-Streams sind:
- HTTP-Anfragen auf dem Client
- HTTP-Antworten auf dem Server
- fs-Schreibstreams
- zlib-Streams
- crypto-Streams
- TCP-Sockets
- stdin von Kindprozessen
process.stdout
,process.stderr
Einige dieser Beispiele sind tatsächlich Duplex
-Streams, die die Writable
-Schnittstelle implementieren.
Alle Writable
-Streams implementieren die durch die stream.Writable
-Klasse definierte Schnittstelle.
Obwohl sich spezifische Instanzen von Writable
-Streams in verschiedener Hinsicht unterscheiden können, folgen alle Writable
-Streams dem gleichen grundlegenden Nutzungsmuster, wie im folgenden Beispiel veranschaulicht:
const myStream = getWritableStreamSomehow()
myStream.write('einige Daten')
myStream.write('noch mehr Daten')
myStream.end('Daten schreiben beendet')
Klasse: stream.Writable
Hinzugefügt in: v0.9.4
Event: 'close'
[Verlauf]
Version | Änderungen |
---|---|
v10.0.0 | Füge die Option emitClose hinzu, um anzugeben, ob 'close' beim Zerstören ausgegeben wird. |
v0.9.4 | Hinzugefügt in: v0.9.4 |
Das 'close'
-Ereignis wird ausgelöst, wenn der Stream und alle seine zugrunde liegenden Ressourcen (z. B. ein Dateideskriptor) geschlossen wurden. Das Ereignis zeigt an, dass keine weiteren Ereignisse ausgelöst werden und keine weiteren Berechnungen erfolgen.
Ein Writable
-Stream löst immer das 'close'
-Ereignis aus, wenn er mit der Option emitClose
erstellt wird.
Event: 'drain'
Hinzugefügt in: v0.9.4
Wenn ein Aufruf von stream.write(chunk)
false
zurückgibt, wird das 'drain'
-Ereignis ausgelöst, wenn es angebracht ist, das Schreiben von Daten in den Stream fortzusetzen.
// Schreibt die Daten eine Million Mal in den bereitgestellten beschreibbaren Stream.
// Achten Sie auf den Gegendruck.
function writeOneMillionTimes(writer, data, encoding, callback) {
let i = 1000000
write()
function write() {
let ok = true
do {
i--
if (i === 0) {
// Letztes Mal!
writer.write(data, encoding, callback)
} else {
// Prüfen, ob wir fortfahren oder warten sollen.
// Übergeben Sie den Callback nicht, da wir noch nicht fertig sind.
ok = writer.write(data, encoding)
}
} while (i > 0 && ok)
if (i > 0) {
// Musste frühzeitig anhalten!
// Schreiben Sie noch etwas, sobald es abgelaufen ist.
writer.once('drain', write)
}
}
}
Ereignis: 'error'
Hinzugefügt in: v0.9.4
Das Ereignis 'error'
wird ausgelöst, wenn beim Schreiben oder Weiterleiten von Daten ein Fehler aufgetreten ist. Der Listener-Callback erhält beim Aufruf ein einzelnes Error
-Argument.
Der Stream wird geschlossen, wenn das Ereignis 'error'
ausgelöst wird, es sei denn, die Option autoDestroy
wurde beim Erstellen des Streams auf false
gesetzt.
Nach 'error'
sollten keine weiteren Ereignisse außer 'close'
ausgelöst werden (einschließlich 'error'
-Ereignisse).
Ereignis: 'finish'
Hinzugefügt in: v0.9.4
Das Ereignis 'finish'
wird ausgelöst, nachdem die Methode stream.end()
aufgerufen wurde und alle Daten in das zugrunde liegende System geschrieben wurden.
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
console.log('Alle Schreibvorgänge sind nun abgeschlossen.')
})
writer.end('Das ist das Ende\n')
Ereignis: 'pipe'
Hinzugefügt in: v0.9.4
src
<stream.Readable> Quellstream, der an diesen beschreibbaren Stream weiterleitet
Das Ereignis 'pipe'
wird ausgelöst, wenn die Methode stream.pipe()
auf einem lesbaren Stream aufgerufen wird und dieser beschreibbare Stream zu seinen Zielen hinzugefügt wird.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
console.log('Etwas wird in den Writer geleitet.')
assert.equal(src, reader)
})
reader.pipe(writer)
Ereignis: 'unpipe'
Hinzugefügt in: v0.9.4
src
<stream.Readable> Der Quellstream, der unpiped diesen beschreibbaren Stream
Das Ereignis 'unpipe'
wird ausgelöst, wenn die Methode stream.unpipe()
auf einem Readable
-Stream aufgerufen wird und dieser Writable
-Stream aus seinen Zielen entfernt wird.
Dies wird auch ausgelöst, falls dieser Writable
-Stream einen Fehler ausgibt, wenn ein Readable
-Stream in ihn geleitet wird.
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
console.log('Etwas hat aufgehört, in den Writer zu leiten.')
assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()
Hinzugefügt in: v0.11.2
Die Methode writable.cork()
zwingt alle geschriebenen Daten, im Speicher zwischengespeichert zu werden. Die zwischengespeicherten Daten werden geleert, wenn entweder die Methoden stream.uncork()
oder stream.end()
aufgerufen werden.
Die Hauptabsicht von writable.cork()
ist es, eine Situation zu berücksichtigen, in der mehrere kleine Datenblöcke in schneller Folge in den Stream geschrieben werden. Anstatt sie sofort an das zugrunde liegende Ziel weiterzuleiten, puffert writable.cork()
alle Datenblöcke, bis writable.uncork()
aufgerufen wird. Diese werden dann alle an writable._writev()
übergeben, falls vorhanden. Dies verhindert eine Head-of-Line-Blocking-Situation, bei der Daten zwischengespeichert werden, während darauf gewartet wird, dass der erste kleine Datenblock verarbeitet wird. Die Verwendung von writable.cork()
ohne Implementierung von writable._writev()
kann jedoch die Durchsatzleistung beeinträchtigen.
Siehe auch: writable.uncork()
, writable._writev()
.
writable.destroy([error])
[Verlauf]
Version | Änderungen |
---|---|
v14.0.0 | Wirkt als No-Op bei einem Stream, der bereits zerstört wurde. |
v8.0.0 | Hinzugefügt in: v8.0.0 |
error
<Error> Optional, ein Fehler, der mit dem Ereignis'error'
ausgelöst werden soll.- Gibt zurück: <this>
Zerstört den Stream. Löst optional ein 'error'
-Ereignis und ein 'close'
-Ereignis aus (es sei denn, emitClose
ist auf false
gesetzt). Nach diesem Aufruf ist der beschreibbare Stream beendet, und nachfolgende Aufrufe von write()
oder end()
führen zu einem ERR_STREAM_DESTROYED
-Fehler. Dies ist eine destruktive und sofortige Möglichkeit, einen Stream zu zerstören. Vorherige Aufrufe von write()
wurden möglicherweise nicht geleert und können einen ERR_STREAM_DESTROYED
-Fehler auslösen. Verwenden Sie end()
anstelle von destroy
, wenn Daten vor dem Schließen geleert werden sollen, oder warten Sie auf das 'drain'
-Ereignis, bevor Sie den Stream zerstören.
const { Writable } = require('node:stream')
const myStream = new Writable()
const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.on('error', function wontHappen() {})
const { Writable } = require('node:stream')
const myStream = new Writable()
myStream.destroy()
myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED
Sobald destroy()
aufgerufen wurde, sind alle weiteren Aufrufe ein No-Op und es können keine weiteren Fehler außer von _destroy()
als 'error'
ausgelöst werden.
Implementierer sollten diese Methode nicht überschreiben, sondern stattdessen writable._destroy()
implementieren.
writable.closed
Hinzugefügt in: v18.0.0
Ist true
, nachdem 'close'
emittiert wurde.
writable.destroyed
Hinzugefügt in: v8.0.0
Ist true
, nachdem writable.destroy()
aufgerufen wurde.
const { Writable } = require('node:stream')
const myStream = new Writable()
console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])
[Verlauf]
Version | Änderungen |
---|---|
v22.0.0, v20.13.0 | Das chunk -Argument kann jetzt eine TypedArray - oder DataView -Instanz sein. |
v15.0.0 | Die callback wird vor 'finish' oder bei einem Fehler aufgerufen. |
v14.0.0 | Die callback wird aufgerufen, wenn 'finish' oder 'error' emittiert wird. |
v10.0.0 | Diese Methode gibt jetzt eine Referenz auf writable zurück. |
v8.0.0 | Das chunk -Argument kann jetzt eine Uint8Array -Instanz sein. |
v0.9.4 | Hinzugefügt in: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Optionale Daten zum Schreiben. Für Streams, die nicht im Objektmodus arbeiten, musschunk
ein <string>, <Buffer>, <TypedArray> oder <DataView> sein. Für Streams im Objektmodus kannchunk
ein beliebiger JavaScript-Wert außernull
sein.encoding
<string> Die Kodierung, wennchunk
ein String istcallback
<Function> Callback für den Zeitpunkt, wenn der Stream beendet ist.- Gibt zurück: <this>
Der Aufruf der writable.end()
-Methode signalisiert, dass keine weiteren Daten in das Writable
geschrieben werden. Die optionalen Argumente chunk
und encoding
ermöglichen es, einen letzten zusätzlichen Datenblock zu schreiben, unmittelbar bevor der Stream geschlossen wird.
Der Aufruf der Methode stream.write()
nach dem Aufruf von stream.end()
löst einen Fehler aus.
// Schreibe 'hallo, ' und beende dann mit 'welt!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hallo, ')
file.end('welt!')
// Jetzt mehr zu schreiben ist nicht erlaubt!
writable.setDefaultEncoding(encoding)
[Verlauf]
Version | Änderungen |
---|---|
v6.1.0 | Diese Methode gibt jetzt eine Referenz auf writable zurück. |
v0.11.15 | Hinzugefügt in: v0.11.15 |
Die Methode writable.setDefaultEncoding()
setzt die Standard encoding
für einen Writable
Stream.
writable.uncork()
Hinzugefügt in: v0.11.2
Die Methode writable.uncork()
leert alle Daten, die seit dem Aufruf von stream.cork()
gepuffert wurden.
Wenn writable.cork()
und writable.uncork()
verwendet werden, um die Pufferung von Schreibvorgängen in einen Stream zu verwalten, verschieben Sie Aufrufe von writable.uncork()
mithilfe von process.nextTick()
. Dies ermöglicht das Batching aller writable.write()
-Aufrufe, die innerhalb einer bestimmten Node.js-Ereignisschleifenphase auftreten.
stream.cork()
stream.write('einige ')
stream.write('daten ')
process.nextTick(() => stream.uncork())
Wenn die Methode writable.cork()
mehrmals für einen Stream aufgerufen wird, muss die gleiche Anzahl von Aufrufen von writable.uncork()
aufgerufen werden, um die gepufferten Daten zu leeren.
stream.cork()
stream.write('einige ')
stream.cork()
stream.write('daten ')
process.nextTick(() => {
stream.uncork()
// Die Daten werden erst geleert, wenn uncork() ein zweites Mal aufgerufen wird.
stream.uncork()
})
Siehe auch: writable.cork()
.
writable.writable
Hinzugefügt in: v11.4.0
Ist true
, wenn es sicher ist, writable.write()
aufzurufen, was bedeutet, dass der Stream nicht zerstört, fehlerhaft oder beendet wurde.
writable.writableAborted
Hinzugefügt in: v18.0.0, v16.17.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
Gibt zurück, ob der Stream zerstört oder ein Fehler aufgetreten ist, bevor 'finish'
ausgegeben wurde.
writable.writableEnded
Hinzugefügt in: v12.9.0
Ist true
, nachdem writable.end()
aufgerufen wurde. Diese Eigenschaft gibt nicht an, ob die Daten geleert wurden. Verwenden Sie stattdessen writable.writableFinished
.
writable.writableCorked
Hinzugefügt in: v13.2.0, v12.16.0
Anzahl der Aufrufe von writable.uncork()
, die erforderlich sind, um den Stream vollständig zu entkorken.
writable.errored
Hinzugefügt in: v18.0.0
Gibt einen Fehler zurück, wenn der Stream mit einem Fehler zerstört wurde.
writable.writableFinished
Hinzugefügt in: v12.6.0
Wird unmittelbar vor dem Ausgeben des Ereignisses 'finish'
auf true
gesetzt.
writable.writableHighWaterMark
Hinzugefügt in: v9.3.0
Gibt den Wert von highWaterMark
zurück, der bei der Erstellung dieses Writable
übergeben wurde.
writable.writableLength
Hinzugefügt in: v9.4.0
Diese Eigenschaft enthält die Anzahl der Bytes (oder Objekte) in der Warteschlange, die zum Schreiben bereit sind. Der Wert liefert Introspektionsdaten zum Status von highWaterMark
.
writable.writableNeedDrain
Hinzugefügt in: v15.2.0, v14.17.0
Ist true
, wenn der Puffer des Streams voll ist und der Stream 'drain'
ausgibt.
writable.writableObjectMode
Hinzugefügt in: v12.3.0
Getter für die Eigenschaft objectMode
eines gegebenen Writable
-Streams.
writable[Symbol.asyncDispose]()
Hinzugefügt in: v22.4.0, v20.16.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
Ruft writable.destroy()
mit einem AbortError
auf und gibt eine Zusage zurück, die erfüllt wird, wenn der Stream beendet ist.
writable.write(chunk[, encoding][, callback])
[Verlauf]
Version | Änderungen |
---|---|
v22.0.0, v20.13.0 | Das chunk -Argument kann jetzt eine TypedArray - oder DataView -Instanz sein. |
v8.0.0 | Das chunk -Argument kann jetzt eine Uint8Array -Instanz sein. |
v6.0.0 | Das Übergeben von null als chunk -Parameter wird jetzt immer als ungültig betrachtet, auch im Objektmodus. |
v0.9.4 | Hinzugefügt in: v0.9.4 |
chunk
<string> | <Buffer> | <TypedArray> | <DataView> | <any> Optionale Daten zum Schreiben. Für Streams, die nicht im Objektmodus arbeiten, musschunk
ein <string>, <Buffer>, <TypedArray> oder <DataView> sein. Für Streams im Objektmodus kannchunk
ein beliebiger JavaScript-Wert außernull
sein.encoding
<string> | <null> Die Kodierung, wennchunk
eine Zeichenkette ist. Standard:'utf8'
callback
<Function> Callback für wenn dieser Datenblock geleert wurde.- Gibt zurück: <boolean>
false
, wenn der Stream möchte, dass der aufrufende Code wartet, bis das'drain'
-Ereignis ausgelöst wird, bevor er mit dem Schreiben zusätzlicher Daten fortfährt; andernfallstrue
.
Die Methode writable.write()
schreibt einige Daten in den Stream und ruft den bereitgestellten callback
auf, sobald die Daten vollständig verarbeitet wurden. Wenn ein Fehler auftritt, wird der callback
mit dem Fehler als erstem Argument aufgerufen. Der callback
wird asynchron und vor der Auslösung von 'error'
aufgerufen.
Der Rückgabewert ist true
, wenn der interne Puffer kleiner ist als das highWaterMark
, das bei der Erstellung des Streams nach der Aufnahme von chunk
konfiguriert wurde. Wenn false
zurückgegeben wird, sollten weitere Versuche, Daten in den Stream zu schreiben, gestoppt werden, bis das Ereignis 'drain'
ausgelöst wird.
Während ein Stream nicht geleert wird, werden Aufrufe von write()
chunk
puffern und false zurückgeben. Sobald alle aktuell gepufferten Blöcke geleert sind (vom Betriebssystem zur Lieferung angenommen), wird das 'drain'
-Ereignis ausgelöst. Sobald write()
false zurückgibt, schreiben Sie keine weiteren Blöcke, bis das 'drain'
-Ereignis ausgelöst wird. Während das Aufrufen von write()
auf einem Stream, der nicht geleert wird, erlaubt ist, puffert Node.js alle geschriebenen Blöcke, bis die maximale Speichernutzung eintritt, an der Stelle wird bedingungslos abgebrochen. Selbst bevor es abbricht, verursacht eine hohe Speichernutzung eine schlechte Leistung des Garbage Collectors und einen hohen RSS (der normalerweise nicht an das System zurückgegeben wird, auch wenn der Speicher nicht mehr benötigt wird). Da TCP-Sockets möglicherweise niemals geleert werden, wenn der Remote-Peer die Daten nicht liest, kann das Schreiben eines Sockets, der nicht geleert wird, zu einer aus der Ferne ausnutzbaren Schwachstelle führen.
Das Schreiben von Daten, während der Stream nicht geleert wird, ist besonders problematisch für einen Transform
, da die Transform
-Streams standardmäßig angehalten werden, bis sie per Pipe verbunden werden oder ein 'data'
- oder 'readable'
-Ereignishandler hinzugefügt wird.
Wenn die zu schreibenden Daten bei Bedarf generiert oder abgerufen werden können, wird empfohlen, die Logik in einen Readable
zu kapseln und stream.pipe()
zu verwenden. Wenn es jedoch bevorzugt wird, write()
aufzurufen, ist es möglich, den Gegendruck zu berücksichtigen und Speicherprobleme mithilfe des Ereignisses 'drain'
zu vermeiden:
function write(data, cb) {
if (!stream.write(data)) {
stream.once('drain', cb)
} else {
process.nextTick(cb)
}
}
// Warten Sie, bis cb aufgerufen wird, bevor Sie weitere Schreibvorgänge durchführen.
write('hallo', () => {
console.log('Schreiben abgeschlossen, jetzt weitere Schreibvorgänge durchführen.')
})
Ein Writable
-Stream im Objektmodus ignoriert immer das Argument encoding
.
Lesbare Streams
Lesbare Streams sind eine Abstraktion für eine Quelle, aus der Daten konsumiert werden.
Beispiele für Readable
-Streams sind:
- HTTP-Antworten auf dem Client
- HTTP-Anfragen auf dem Server
- fs-Lesestreams
- zlib-Streams
- crypto-Streams
- TCP-Sockets
- stdout und stderr von Kindprozessen
process.stdin
Alle Readable
-Streams implementieren die Schnittstelle, die von der stream.Readable
-Klasse definiert wird.
Zwei Lesemodi
Readable
-Streams arbeiten effektiv in einem von zwei Modi: fließend und pausiert. Diese Modi sind unabhängig vom Objektmodus. Ein Readable
-Stream kann im Objektmodus sein oder nicht, unabhängig davon, ob er sich im fließenden Modus oder im pausierten Modus befindet.
- Im fließenden Modus werden Daten automatisch aus dem zugrunde liegenden System gelesen und einer Anwendung so schnell wie möglich über Ereignisse über die
EventEmitter
-Schnittstelle bereitgestellt. - Im pausierten Modus muss die
stream.read()
-Methode explizit aufgerufen werden, um Datenblöcke aus dem Stream zu lesen.
Alle Readable
-Streams beginnen im pausierten Modus, können aber auf eine der folgenden Arten in den fließenden Modus geschaltet werden:
- Hinzufügen eines
'data'
-Ereignishandlers. - Aufrufen der
stream.resume()
-Methode. - Aufrufen der
stream.pipe()
-Methode, um die Daten an einWritable
zu senden.
Der Readable
kann mit einer der folgenden Methoden wieder in den pausierten Modus geschaltet werden:
- Wenn es keine Pipe-Ziele gibt, durch Aufrufen der
stream.pause()
-Methode. - Wenn es Pipe-Ziele gibt, durch Entfernen aller Pipe-Ziele. Mehrere Pipe-Ziele können durch Aufrufen der
stream.unpipe()
-Methode entfernt werden.
Das wichtigste Konzept, an das man sich erinnern sollte, ist, dass ein Readable
keine Daten erzeugt, bis ein Mechanismus zur Verfügung gestellt wird, der diese Daten entweder konsumiert oder ignoriert. Wenn der Konsummechanismus deaktiviert oder entfernt wird, wird der Readable
versuchen, die Datenerzeugung zu stoppen.
Aus Gründen der Abwärtskompatibilität werden 'data'
-Ereignishandler nicht automatisch den Stream pausieren. Wenn es außerdem Pipe-Ziele gibt, garantiert der Aufruf von stream.pause()
nicht, dass der Stream pausiert bleibt, sobald diese Ziele geleert sind und nach weiteren Daten fragen.
Wenn ein Readable
in den fließenden Modus geschaltet wird und keine Konsumenten zur Verfügung stehen, um die Daten zu verarbeiten, gehen diese Daten verloren. Dies kann z. B. auftreten, wenn die Methode readable.resume()
aufgerufen wird, ohne dass ein Listener an das 'data'
-Ereignis angehängt ist, oder wenn ein 'data'
-Ereignishandler aus dem Stream entfernt wird.
Das Hinzufügen eines 'readable'
-Ereignishandlers bewirkt automatisch, dass der Stream nicht mehr fließt, und die Daten müssen über readable.read()
konsumiert werden. Wenn der 'readable'
-Ereignishandler entfernt wird, beginnt der Stream wieder zu fließen, wenn ein 'data'
-Ereignishandler vorhanden ist.
Drei Zustände
Die „zwei Modi“ des Betriebs für einen Readable
-Stream sind eine vereinfachte Abstraktion für die kompliziertere interne Zustandsverwaltung, die innerhalb der Readable
-Stream-Implementierung stattfindet.
Genauer gesagt, befindet sich jeder Readable
-Stream zu einem bestimmten Zeitpunkt in einem von drei möglichen Zuständen:
readable.readableFlowing === null
readable.readableFlowing === false
readable.readableFlowing === true
Wenn readable.readableFlowing
null
ist, wird kein Mechanismus zum Verbrauch der Daten des Streams bereitgestellt. Daher erzeugt der Stream keine Daten. In diesem Zustand wechselt das Anhängen eines Listeners für das 'data'
-Ereignis, das Aufrufen der readable.pipe()
-Methode oder das Aufrufen der readable.resume()
-Methode readable.readableFlowing
auf true
, wodurch der Readable
-Stream beginnt, aktiv Ereignisse auszugeben, wenn Daten generiert werden.
Das Aufrufen von readable.pause()
, readable.unpipe()
oder das Empfangen von Gegendruck führt dazu, dass readable.readableFlowing
auf false
gesetzt wird, wodurch der Fluss von Ereignissen vorübergehend angehalten wird, aber nicht die Generierung von Daten. In diesem Zustand wechselt das Anhängen eines Listeners für das 'data'
-Ereignis nicht readable.readableFlowing
auf true
.
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()
pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing ist jetzt false.
pass.on('data', chunk => {
console.log(chunk.toString())
})
// readableFlowing ist immer noch false.
pass.write('ok') // Gibt kein 'data' aus.
pass.resume() // Muss aufgerufen werden, damit der Stream 'data' ausgibt.
// readableFlowing ist jetzt true.
Während readable.readableFlowing
false
ist, können sich Daten im internen Puffer des Streams ansammeln.
Wählen Sie einen API-Stil
Die Readable
-Stream-API hat sich über mehrere Node.js-Versionen hinweg entwickelt und bietet mehrere Methoden zum Verbrauch von Stream-Daten. Im Allgemeinen sollten Entwickler eine der Methoden zum Verbrauch von Daten wählen und sollten niemals mehrere Methoden verwenden, um Daten aus einem einzelnen Stream zu verbrauchen. Insbesondere die Verwendung einer Kombination aus on('data')
, on('readable')
, pipe()
oder asynchronen Iteratoren kann zu unintuitivem Verhalten führen.
Klasse: stream.Readable
Hinzugefügt in: v0.9.4
Ereignis: 'close'
[Verlauf]
Version | Änderungen |
---|---|
v10.0.0 | Option emitClose hinzugefügt, um anzugeben, ob 'close' bei der Zerstörung emittiert wird. |
v0.9.4 | Hinzugefügt in: v0.9.4 |
Das Ereignis 'close'
wird ausgelöst, wenn der Stream und alle seine zugrunde liegenden Ressourcen (z. B. ein Dateideskriptor) geschlossen wurden. Das Ereignis signalisiert, dass keine weiteren Ereignisse ausgelöst und keine weiteren Berechnungen durchgeführt werden.
Ein Readable
-Stream löst immer das Ereignis 'close'
aus, wenn er mit der Option emitClose
erstellt wird.
Ereignis: 'data'
Hinzugefügt in: v0.9.4
chunk
<Buffer> | <string> | <any> Der Datenchunk. Für Streams, die nicht im Objektmodus arbeiten, ist der Chunk entweder ein String oder einBuffer
. Für Streams, die sich im Objektmodus befinden, kann der Chunk ein beliebiger JavaScript-Wert außernull
sein.
Das Ereignis 'data'
wird jedes Mal ausgelöst, wenn der Stream die Eigentümerschaft an einem Datenchunk an einen Consumer abgibt. Dies kann geschehen, wenn der Stream durch Aufrufen von readable.pipe()
, readable.resume()
in den Fließmodus geschaltet wird oder durch Anhängen einer Listener-Callback-Funktion an das Ereignis 'data'
. Das Ereignis 'data'
wird auch ausgelöst, wenn die Methode readable.read()
aufgerufen wird und ein Datenchunk zurückgegeben werden kann.
Das Anhängen eines 'data'
-Ereignis-Listeners an einen Stream, der nicht explizit pausiert wurde, schaltet den Stream in den Fließmodus. Daten werden dann übergeben, sobald sie verfügbar sind.
Die Listener-Callback-Funktion erhält den Datenchunk als String, wenn eine Standardkodierung für den Stream mithilfe der Methode readable.setEncoding()
angegeben wurde; andernfalls werden die Daten als Buffer
übergeben.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Empfangen ${chunk.length} Bytes an Daten.`)
})
Ereignis: 'end'
Hinzugefügt in: v0.9.4
Das Ereignis 'end'
wird ausgelöst, wenn keine weiteren Daten mehr aus dem Stream konsumiert werden können.
Das 'end'
-Ereignis wird nicht ausgelöst, solange die Daten nicht vollständig konsumiert sind. Dies kann erreicht werden, indem der Stream in den fließenden Modus geschaltet wird oder indem stream.read()
wiederholt aufgerufen wird, bis alle Daten konsumiert wurden.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Empfangene ${chunk.length} Bytes an Daten.`)
})
readable.on('end', () => {
console.log('Es werden keine weiteren Daten mehr folgen.')
})
Ereignis: 'error'
Hinzugefügt in: v0.9.4
Das Ereignis 'error'
kann jederzeit von einer Readable
-Implementierung ausgelöst werden. Dies kann in der Regel auftreten, wenn der zugrunde liegende Stream aufgrund eines internen Fehlers keine Daten generieren kann oder wenn eine Stream-Implementierung versucht, einen ungültigen Datenblock zu übertragen.
Der Listener-Callback erhält ein einzelnes Error
-Objekt.
Ereignis: 'pause'
Hinzugefügt in: v0.9.4
Das Ereignis 'pause'
wird ausgelöst, wenn stream.pause()
aufgerufen wird und readableFlowing
nicht false
ist.
Ereignis: 'readable'
[Verlauf]
Version | Änderungen |
---|---|
v10.0.0 | Das 'readable' wird immer im nächsten Tick nach dem Aufruf von .push() ausgelöst. |
v10.0.0 | Die Verwendung von 'readable' erfordert das Aufrufen von .read() . |
v0.9.4 | Hinzugefügt in: v0.9.4 |
Das 'readable'
-Ereignis wird ausgelöst, wenn Daten zum Lesen aus dem Stream zur Verfügung stehen, bis zum konfigurierten High-Water-Mark (state.highWaterMark
). Es signalisiert effektiv, dass der Stream neue Informationen innerhalb des Puffers hat. Wenn Daten innerhalb dieses Puffers verfügbar sind, kann stream.read()
aufgerufen werden, um diese Daten abzurufen. Darüber hinaus kann das 'readable'
-Ereignis auch ausgelöst werden, wenn das Ende des Streams erreicht wurde.
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
// Es gibt jetzt Daten zum Lesen.
let data
while ((data = this.read()) !== null) {
console.log(data)
}
})
Wenn das Ende des Streams erreicht wurde, gibt der Aufruf von stream.read()
null
zurück und löst das 'end'
-Ereignis aus. Dies gilt auch, wenn es nie Daten zum Lesen gab. Im folgenden Beispiel ist foo.txt
beispielsweise eine leere Datei:
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
console.log('end')
})
Die Ausgabe beim Ausführen dieses Skripts lautet:
$ node test.js
readable: null
end
In einigen Fällen bewirkt das Anhängen eines Listeners für das 'readable'
-Ereignis, dass eine gewisse Menge an Daten in einen internen Puffer gelesen wird.
Im Allgemeinen sind die Mechanismen readable.pipe()
und 'data'
-Ereignis einfacher zu verstehen als das 'readable'
-Ereignis. Die Behandlung von 'readable'
kann jedoch zu einem höheren Durchsatz führen.
Wenn sowohl 'readable'
als auch 'data'
gleichzeitig verwendet werden, hat 'readable'
Vorrang bei der Steuerung des Flusses, d. h. 'data'
wird nur dann ausgelöst, wenn stream.read()
aufgerufen wird. Die Eigenschaft readableFlowing
würde false
werden. Wenn 'data'
-Listener vorhanden sind, wenn 'readable'
entfernt wird, beginnt der Stream zu fließen, d. h. 'data'
-Ereignisse werden ohne Aufruf von .resume()
ausgelöst.
Ereignis: 'resume'
Hinzugefügt in: v0.9.4
Das 'resume'
-Ereignis wird ausgelöst, wenn stream.resume()
aufgerufen wird und readableFlowing
nicht true
ist.
readable.destroy([error])
[Verlauf]
Version | Änderungen |
---|---|
v14.0.0 | Fungiert als No-Op für einen Stream, der bereits zerstört wurde. |
v8.0.0 | Hinzugefügt in: v8.0.0 |
Zerstört den Stream. Löst optional ein 'error'
-Ereignis und ein 'close'
-Ereignis aus (es sei denn, emitClose
ist auf false
gesetzt). Nach diesem Aufruf gibt der lesbare Stream alle internen Ressourcen frei und nachfolgende Aufrufe von push()
werden ignoriert.
Sobald destroy()
aufgerufen wurde, sind alle weiteren Aufrufe ein No-Op und es werden keine weiteren Fehler als 'error'
ausgegeben, außer von _destroy()
.
Implementierer sollten diese Methode nicht überschreiben, sondern stattdessen readable._destroy()
implementieren.
readable.closed
Hinzugefügt in: v18.0.0
Ist true
, nachdem 'close'
ausgelöst wurde.
readable.destroyed
Hinzugefügt in: v8.0.0
Ist true
, nachdem readable.destroy()
aufgerufen wurde.
readable.isPaused()
Hinzugefügt in: v0.11.14
- Gibt zurück: <boolean>
Die readable.isPaused()
-Methode gibt den aktuellen Betriebszustand des Readable
zurück. Dies wird hauptsächlich vom Mechanismus verwendet, der der readable.pipe()
-Methode zugrunde liegt. In den meisten typischen Fällen gibt es keinen Grund, diese Methode direkt zu verwenden.
const readable = new stream.Readable()
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()
Hinzugefügt in: v0.9.4
- Gibt zurück: <this>
Die Methode readable.pause()
bewirkt, dass ein Stream im Flussmodus keine 'data'
-Ereignisse mehr ausgibt und aus dem Flussmodus wechselt. Alle Daten, die verfügbar werden, bleiben im internen Puffer.
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
console.log(`Empfangen ${chunk.length} Byte an Daten.`)
readable.pause()
console.log('Es werden für 1 Sekunde keine weiteren Daten vorhanden sein.')
setTimeout(() => {
console.log('Jetzt werden die Daten wieder fließen.')
readable.resume()
}, 1000)
})
Die Methode readable.pause()
hat keine Auswirkung, wenn ein 'readable'
-Ereignis-Listener vorhanden ist.
readable.pipe(destination[, options])
Hinzugefügt in: v0.9.4
destination
<stream.Writable> Das Ziel zum Schreiben von Datenoptions
<Object> Pipe-Optionenend
<boolean> Beendet den Writer, wenn der Reader endet. Standard:true
.
Gibt zurück: <stream.Writable> Das Ziel, das eine Kette von Pipes ermöglicht, wenn es sich um einen
Duplex
- oder einenTransform
-Stream handelt
Die Methode readable.pipe()
hängt einen Writable
-Stream an den readable
an, wodurch dieser automatisch in den Flussmodus wechselt und alle seine Daten an den angehängten Writable
weiterleitet. Der Datenfluss wird automatisch so verwaltet, dass der Ziel-Writable
-Stream nicht von einem schnelleren Readable
-Stream überlastet wird.
Das folgende Beispiel leitet alle Daten vom readable
in eine Datei namens file.txt
:
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Alle Daten von readable gehen in 'file.txt'.
readable.pipe(writable)
Es ist möglich, mehrere Writable
-Streams an einen einzelnen Readable
-Stream anzuhängen.
Die Methode readable.pipe()
gibt eine Referenz auf den Ziel-Stream zurück, wodurch es möglich ist, Ketten von gepipten Streams einzurichten:
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)
Standardmäßig wird stream.end()
auf dem Ziel-Writable
-Stream aufgerufen, wenn der Quell-Readable
-Stream 'end'
ausgibt, sodass das Ziel nicht mehr beschreibbar ist. Um dieses Standardverhalten zu deaktivieren, kann die Option end
als false
übergeben werden, wodurch der Zielstream geöffnet bleibt:
reader.pipe(writer, { end: false })
reader.on('end', () => {
writer.end('Goodbye\n')
})
Eine wichtige Einschränkung ist, dass, wenn der Readable
-Stream während der Verarbeitung einen Fehler ausgibt, das Writable
-Ziel nicht automatisch geschlossen wird. Wenn ein Fehler auftritt, ist es notwendig, jeden Stream manuell zu schließen, um Speicherlecks zu verhindern.
Die Writable
-Streams process.stderr
und process.stdout
werden niemals geschlossen, bis der Node.js-Prozess beendet wird, unabhängig von den angegebenen Optionen.
readable.read([size])
Hinzugefügt in: v0.9.4
size
<number> Optionales Argument, um anzugeben, wie viele Daten gelesen werden sollen.- Gibt zurück: <string> | <Buffer> | <null> | <any>
Die readable.read()
-Methode liest Daten aus dem internen Puffer und gibt sie zurück. Wenn keine Daten zum Lesen verfügbar sind, wird null
zurückgegeben. Standardmäßig werden die Daten als Buffer
-Objekt zurückgegeben, es sei denn, eine Kodierung wurde mit der Methode readable.setEncoding()
angegeben oder der Stream arbeitet im Objektmodus.
Das optionale Argument size
gibt eine bestimmte Anzahl von zu lesenden Bytes an. Wenn nicht size
Bytes zum Lesen verfügbar sind, wird null
zurückgegeben, es sei denn, der Stream ist beendet. In diesem Fall werden alle im internen Puffer verbleibenden Daten zurückgegeben.
Wenn das Argument size
nicht angegeben wird, werden alle im internen Puffer enthaltenen Daten zurückgegeben.
Das Argument size
muss kleiner oder gleich 1 GiB sein.
Die readable.read()
-Methode sollte nur auf Readable
-Streams aufgerufen werden, die im pausierten Modus arbeiten. Im Fließmodus wird readable.read()
automatisch aufgerufen, bis der interne Puffer vollständig entleert ist.
const readable = getReadableStreamSomehow()
// 'readable' kann mehrmals ausgelöst werden, wenn Daten zwischengespeichert werden
readable.on('readable', () => {
let chunk
console.log('Stream ist lesbar (neue Daten im Puffer empfangen)')
// Verwenden Sie eine Schleife, um sicherzustellen, dass wir alle derzeit verfügbaren Daten lesen
while (null !== (chunk = readable.read())) {
console.log(`Lese ${chunk.length} Bytes an Daten...`)
}
})
// 'end' wird einmal ausgelöst, wenn keine Daten mehr verfügbar sind
readable.on('end', () => {
console.log('Ende des Streams erreicht.')
})
Jeder Aufruf von readable.read()
gibt einen Datenblock oder null
zurück, was bedeutet, dass im Moment keine weiteren Daten gelesen werden können. Diese Blöcke werden nicht automatisch verkettet. Da ein einzelner read()
-Aufruf nicht alle Daten zurückgibt, kann die Verwendung einer While-Schleife erforderlich sein, um Blöcke kontinuierlich zu lesen, bis alle Daten abgerufen wurden. Beim Lesen einer großen Datei kann .read()
vorübergehend null
zurückgeben, was darauf hindeutet, dass der gesamte zwischengespeicherte Inhalt verbraucht wurde, aber möglicherweise noch weitere Daten zwischengespeichert werden müssen. In solchen Fällen wird ein neues 'readable'
-Ereignis ausgelöst, sobald sich mehr Daten im Puffer befinden, und das Ereignis 'end'
signalisiert das Ende der Datenübertragung.
Daher ist es zum Lesen des gesamten Inhalts einer Datei von einem readable
erforderlich, Blöcke über mehrere 'readable'
-Ereignisse hinweg zu sammeln:
const chunks = []
readable.on('readable', () => {
let chunk
while (null !== (chunk = readable.read())) {
chunks.push(chunk)
}
})
readable.on('end', () => {
const content = chunks.join('')
})
Ein Readable
-Stream im Objektmodus gibt immer ein einzelnes Element von einem Aufruf von readable.read(size)
zurück, unabhängig vom Wert des Arguments size
.
Wenn die Methode readable.read()
einen Datenblock zurückgibt, wird auch ein 'data'
-Ereignis ausgelöst.
Der Aufruf von stream.read([size])
nach dem Auslösen des Ereignisses 'end'
gibt null
zurück. Es wird kein Laufzeitfehler ausgelöst.
readable.readable
Hinzugefügt in: v11.4.0
Ist true
, wenn es sicher ist, readable.read()
aufzurufen, was bedeutet, dass der Stream nicht zerstört wurde oder 'error'
oder 'end'
ausgegeben hat.
readable.readableAborted
Hinzugefügt in: v16.8.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
Gibt zurück, ob der Stream zerstört oder mit einem Fehler behaftet wurde, bevor 'end'
ausgegeben wurde.
readable.readableDidRead
Hinzugefügt in: v16.7.0, v14.18.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
Gibt zurück, ob 'data'
ausgegeben wurde.
readable.readableEncoding
Hinzugefügt in: v12.7.0
Getter für die Eigenschaft encoding
eines bestimmten Readable
-Streams. Die Eigenschaft encoding
kann mit der Methode readable.setEncoding()
gesetzt werden.
readable.readableEnded
Hinzugefügt in: v12.9.0
Wird true
, wenn das 'end'
-Ereignis ausgegeben wird.
readable.errored
Hinzugefügt in: v18.0.0
Gibt einen Fehler zurück, wenn der Stream mit einem Fehler zerstört wurde.
readable.readableFlowing
Hinzugefügt in: v9.4.0
Diese Eigenschaft spiegelt den aktuellen Zustand eines Readable
-Streams wider, wie im Abschnitt Drei Zustände beschrieben.
readable.readableHighWaterMark
Hinzugefügt in: v9.3.0
Gibt den Wert von highWaterMark
zurück, der beim Erstellen dieses Readable
übergeben wurde.
readable.readableLength
Hinzugefügt in: v9.4.0
Diese Eigenschaft enthält die Anzahl der Bytes (oder Objekte) in der Warteschlange, die zum Lesen bereitstehen. Der Wert liefert Introspektionsdaten zum Status von highWaterMark
.
readable.readableObjectMode
Hinzugefügt in: v12.3.0
Getter für die Eigenschaft objectMode
eines gegebenen Readable
-Streams.
readable.resume()
[Verlauf]
Version | Änderungen |
---|---|
v10.0.0 | resume() hat keine Auswirkung, wenn ein 'readable' -Event abgehört wird. |
v0.9.4 | Hinzugefügt in: v0.9.4 |
- Gibt zurück: <this>
Die Methode readable.resume()
bewirkt, dass ein explizit pausierter Readable
-Stream wieder 'data'
-Events ausgibt und den Stream in den fließenden Modus schaltet.
Die Methode readable.resume()
kann verwendet werden, um die Daten aus einem Stream vollständig zu konsumieren, ohne diese Daten tatsächlich zu verarbeiten:
getReadableStreamSomehow()
.resume()
.on('end', () => {
console.log('Das Ende wurde erreicht, aber es wurde nichts gelesen.')
})
Die Methode readable.resume()
hat keine Auswirkung, wenn ein 'readable'
-Event-Listener vorhanden ist.
readable.setEncoding(encoding)
Hinzugefügt in: v0.9.4
Die Methode readable.setEncoding()
setzt die Zeichenkodierung für Daten, die aus dem Readable
-Stream gelesen werden.
Standardmäßig wird keine Kodierung zugewiesen und Stream-Daten werden als Buffer
-Objekte zurückgegeben. Durch das Setzen einer Kodierung werden die Stream-Daten als Strings der angegebenen Kodierung zurückgegeben, anstatt als Buffer
-Objekte. Wenn Sie beispielsweise readable.setEncoding('utf8')
aufrufen, werden die Ausgabedaten als UTF-8-Daten interpretiert und als Strings übergeben. Wenn Sie readable.setEncoding('hex')
aufrufen, werden die Daten im hexadezimalen String-Format kodiert.
Der Readable
-Stream verarbeitet Multibyte-Zeichen, die über den Stream geliefert werden, ordnungsgemäß, die andernfalls falsch dekodiert würden, wenn sie einfach als Buffer
-Objekte aus dem Stream gezogen würden.
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
assert.equal(typeof chunk, 'string')
console.log('Es wurden %d Zeichen Stringdaten erhalten:', chunk.length)
})
readable.unpipe([destination])
Hinzugefügt in: v0.9.4
destination
<stream.Writable> Optionaler spezifischer Stream, von dem die Pipe getrennt werden soll- Gibt zurück: <this>
Die Methode readable.unpipe()
trennt einen Writable
-Stream, der zuvor mit der Methode stream.pipe()
verbunden wurde.
Wenn destination
nicht angegeben ist, werden alle Pipes getrennt.
Wenn destination
angegeben ist, aber keine Pipe dafür eingerichtet ist, tut die Methode nichts.
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Alle Daten von readable gehen in 'file.txt',
// aber nur für die erste Sekunde.
readable.pipe(writable)
setTimeout(() => {
console.log('Schreibe nicht mehr in file.txt.')
readable.unpipe(writable)
console.log('Dateistream manuell schließen.')
writable.end()
}, 1000)
readable.unshift(chunk[, encoding])
[Verlauf]
Version | Änderungen |
---|---|
v22.0.0, v20.13.0 | Das Argument chunk kann jetzt eine TypedArray - oder DataView -Instanz sein. |
v8.0.0 | Das Argument chunk kann jetzt eine Uint8Array -Instanz sein. |
v0.9.11 | Hinzugefügt in: v0.9.11 |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Datenchunk, der in die Lesewarteschlange zurückgeschoben werden soll. Für Streams, die nicht im Objektmodus arbeiten, musschunk
ein <string>, <Buffer>, <TypedArray>, <DataView> odernull
sein. Für Objektmodus-Streams kannchunk
ein beliebiger JavaScript-Wert sein.encoding
<string> Kodierung von String-Chunks. Muss eine gültigeBuffer
-Kodierung sein, z. B.'utf8'
oder'ascii'
.
Das Übergeben von chunk
als null
signalisiert das Ende des Streams (EOF) und verhält sich gleich wie readable.push(null)
, wonach keine Daten mehr geschrieben werden können. Das EOF-Signal wird am Ende des Puffers platziert und alle gepufferten Daten werden weiterhin geleert.
Die Methode readable.unshift()
schiebt einen Datenchunk zurück in den internen Puffer. Dies ist in bestimmten Situationen nützlich, in denen ein Stream von Code verarbeitet wird, der eine gewisse Datenmenge "nicht verbrauchen" muss, die er optimistisch aus der Quelle gezogen hat, sodass die Daten an eine andere Partei weitergegeben werden können.
Die Methode stream.unshift(chunk)
kann nicht aufgerufen werden, nachdem das Ereignis 'end'
ausgelöst wurde, da sonst ein Laufzeitfehler ausgelöst wird.
Entwickler, die stream.unshift()
verwenden, sollten häufig den Wechsel zur Verwendung eines Transform
-Streams in Betracht ziehen. Weitere Informationen finden Sie im Abschnitt API für Stream-Implementierer.
// Header abziehen, der durch \n\n begrenzt ist.
// unshift() verwenden, wenn wir zu viel bekommen.
// Den Callback mit (error, header, stream) aufrufen.
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
stream.on('error', callback)
stream.on('readable', onReadable)
const decoder = new StringDecoder('utf8')
let header = ''
function onReadable() {
let chunk
while (null !== (chunk = stream.read())) {
const str = decoder.write(chunk)
if (str.includes('\n\n')) {
// Headergrenze gefunden.
const split = str.split(/\n\n/)
header += split.shift()
const remaining = split.join('\n\n')
const buf = Buffer.from(remaining, 'utf8')
stream.removeListener('error', callback)
// Den 'readable'-Listener vor dem Unshifting entfernen.
stream.removeListener('readable', onReadable)
if (buf.length) stream.unshift(buf)
// Jetzt kann der Nachrichtentext aus dem Stream gelesen werden.
callback(null, header, stream)
return
}
// Header wird noch gelesen.
header += str
}
}
}
Im Gegensatz zu stream.push(chunk)
beendet stream.unshift(chunk)
den Leseprozess nicht, indem der interne Lesezustand des Streams zurückgesetzt wird. Dies kann zu unerwarteten Ergebnissen führen, wenn readable.unshift()
während eines Lesevorgangs aufgerufen wird (d. h. innerhalb einer stream._read()
-Implementierung in einem benutzerdefinierten Stream). Wenn dem Aufruf von readable.unshift()
ein sofortiges stream.push('')
folgt, wird der Lesezustand entsprechend zurückgesetzt. Es ist jedoch am besten, den Aufruf von readable.unshift()
während der Durchführung eines Lesevorgangs zu vermeiden.
readable.wrap(stream)
Hinzugefügt in: v0.9.4
Vor Node.js 0.10 implementierten Streams nicht die gesamte node:stream
Modul-API, wie sie derzeit definiert ist. (Siehe Kompatibilität für weitere Informationen.)
Wenn eine ältere Node.js-Bibliothek verwendet wird, die 'data'
Ereignisse ausgibt und eine stream.pause()
Methode hat, die nur beratend ist, kann die readable.wrap()
Methode verwendet werden, um einen Readable
Stream zu erstellen, der den alten Stream als Datenquelle verwendet.
Es wird selten notwendig sein, readable.wrap()
zu verwenden, aber die Methode wurde als Komfortfunktion für die Interaktion mit älteren Node.js Anwendungen und Bibliotheken bereitgestellt.
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)
myReader.on('readable', () => {
myReader.read() // usw.
})
readable[Symbol.asyncIterator]()
[Verlauf]
Version | Änderungen |
---|---|
v11.14.0 | Symbol.asyncIterator Unterstützung ist nicht mehr experimentell. |
v10.0.0 | Hinzugefügt in: v10.0.0 |
- Gibt zurück: <AsyncIterator> um den Stream vollständig zu konsumieren.
const fs = require('node:fs')
async function print(readable) {
readable.setEncoding('utf8')
let data = ''
for await (const chunk of readable) {
data += chunk
}
console.log(data)
}
print(fs.createReadStream('file')).catch(console.error)
Wenn die Schleife mit einem break
, return
oder einem throw
beendet wird, wird der Stream zerstört. Mit anderen Worten, das Iterieren über einen Stream verbraucht den Stream vollständig. Der Stream wird in Blöcken mit einer Größe, die der Option highWaterMark
entspricht, gelesen. Im obigen Codebeispiel werden die Daten in einem einzigen Block sein, wenn die Datei weniger als 64 KiB Daten enthält, da keine highWaterMark
Option für fs.createReadStream()
bereitgestellt wird.
readable[Symbol.asyncDispose]()
Hinzugefügt in: v20.4.0, v18.18.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
Ruft readable.destroy()
mit einem AbortError
auf und gibt ein Promise zurück, das erfüllt wird, wenn der Stream beendet ist.
readable.compose(stream[, options])
Hinzugefügt in: v19.1.0, v18.13.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
stream
<Stream> | <Iterable> | <AsyncIterable> | <Function>options
<Object>signal
<AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Duplex> ein Stream, der mit dem Stream
stream
zusammengesetzt wurde.
import { Readable } from 'node:stream'
async function* splitToWords(source) {
for await (const chunk of source) {
const words = String(chunk).split(' ')
for (const word of words) {
yield word
}
}
}
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()
console.log(words) // gibt ['this', 'is', 'compose', 'as', 'operator'] aus
Siehe stream.compose
für weitere Informationen.
readable.iterator([options])
Hinzugefügt in: v16.3.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
options
<Object>destroyOnReturn
<boolean> Wennfalse
gesetzt ist, wird durch den Aufruf vonreturn
auf dem asynchronen Iterator oder durch das Beenden einerfor await...of
-Iteration mit einembreak
,return
oderthrow
der Stream nicht zerstört. Standard:true
.
Gibt zurück: <AsyncIterator> um den Stream zu verarbeiten.
Der von dieser Methode erstellte Iterator gibt Benutzern die Möglichkeit, die Zerstörung des Streams abzubrechen, wenn die for await...of
-Schleife durch return
, break
oder throw
verlassen wird, oder ob der Iterator den Stream zerstören soll, wenn der Stream während der Iteration einen Fehler ausgibt.
const { Readable } = require('node:stream')
async function printIterator(readable) {
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // false
for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
console.log(chunk) // Gibt 2 und dann 3 aus
}
console.log(readable.destroyed) // True, Stream wurde vollständig verbraucht
}
async function printSymbolAsyncIterator(readable) {
for await (const chunk of readable) {
console.log(chunk) // 1
break
}
console.log(readable.destroyed) // true
}
async function showBoth() {
await printIterator(Readable.from([1, 2, 3]))
await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}
showBoth()
readable.map(fn[, options])
[Historie]
Version | Änderungen |
---|---|
v20.7.0, v18.19.0 | highWaterMark in Optionen hinzugefügt. |
v17.4.0, v16.14.0 | Hinzugefügt in: v17.4.0, v16.14.0 |
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Funktion> | <AsyncFunction> Eine Funktion zum Mappen über jeden Chunk im Stream.data
<any> Ein Daten-Chunk aus dem Stream.options
<Objekt>signal
<AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, wodurch derfn
-Aufruf frühzeitig abgebrochen werden kann.
options
<Objekt>concurrency
<number> Die maximale Anzahl gleichzeitiger Aufrufe vonfn
im Stream. Standard:1
.highWaterMark
<number> Wie viele Elemente zwischengespeichert werden sollen, während auf die Konsumierung der gemappten Elemente durch den Benutzer gewartet wird. Standard:concurrency * 2 - 1
.signal
<AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Readable> ein Stream, der mit der Funktion
fn
gemappt wurde.
Diese Methode ermöglicht das Mappen über den Stream. Die Funktion fn
wird für jeden Chunk im Stream aufgerufen. Wenn die Funktion fn
ein Promise zurückgibt, wird dieses Promise await
ed, bevor es an den Ergebnisstream weitergegeben wird.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Mit einem synchronen Mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
console.log(chunk) // 2, 4, 6, 8
}
// Mit einem asynchronen Mapper, der maximal 2 Abfragen gleichzeitig durchführt.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
domain => resolver.resolve4(domain),
{ concurrency: 2 }
)
for await (const result of dnsResults) {
console.log(result) // Protokolliert das DNS-Ergebnis von resolver.resolve4.
}
readable.filter(fn[, options])
[Verlauf]
Version | Änderungen |
---|---|
v20.7.0, v18.19.0 | highWaterMark in Optionen hinzugefügt. |
v17.4.0, v16.14.0 | Hinzugefügt in: v17.4.0, v16.14.0 |
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Funktion> | <AsyncFunction> Eine Funktion zum Filtern von Chunks aus dem Stream.data
<any> Ein Chunk von Daten aus dem Stream.options
<Object>signal
<AbortSignal> abgebrochen, wenn der Stream zerstört wird, was es ermöglicht, denfn
-Aufruf frühzeitig abzubrechen.
options
<Object>concurrency
<number> Die maximale Anzahl von gleichzeitigen Aufrufen vonfn
im Stream auf einmal. Standard:1
.highWaterMark
<number> Wie viele Elemente gepuffert werden sollen, während auf die Nutzung der gefilterten Elemente durch den Benutzer gewartet wird. Standard:concurrency * 2 - 1
.signal
<AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Readable> einen Stream, der mit dem Prädikat
fn
gefiltert wurde.
Diese Methode ermöglicht das Filtern des Streams. Für jeden Chunk im Stream wird die fn
-Funktion aufgerufen, und wenn sie einen Truthy-Wert zurückgibt, wird der Chunk an den Ergebnisstream weitergegeben. Wenn die fn
-Funktion ein Promise zurückgibt, wird auf dieses Promise gewartet (await
).
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Mit einem synchronen Prädikat.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// Mit einem asynchronen Prädikat, das maximal 2 Abfragen gleichzeitig durchführt.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address.ttl > 60
},
{ concurrency: 2 }
)
for await (const result of dnsResults) {
// Protokolliert Domains mit mehr als 60 Sekunden im aufgelösten DNS-Eintrag.
console.log(result)
}
readable.forEach(fn[, options])
Hinzugefügt in: v17.5.0, v16.15.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Funktion> | <AsyncFunction> eine Funktion, die für jeden Chunk des Streams aufgerufen wird.data
<any> ein Daten-Chunk aus dem Stream.options
<Object>signal
<AbortSignal> abgebrochen, wenn der Stream zerstört wird, wodurch derfn
-Aufruf frühzeitig abgebrochen werden kann.
options
<Object>concurrency
<number> die maximale Anzahl gleichzeitiger Aufrufe vonfn
, die gleichzeitig im Stream aufgerufen werden. Standard:1
.signal
<AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Promise> ein Promise, wenn der Stream abgeschlossen ist.
Diese Methode ermöglicht die Iteration eines Streams. Für jeden Chunk im Stream wird die Funktion fn
aufgerufen. Wenn die Funktion fn
ein Promise zurückgibt, wird dieses Promise await
ed.
Diese Methode unterscheidet sich von for await...of
-Schleifen darin, dass sie optional Chunks gleichzeitig verarbeiten kann. Darüber hinaus kann eine forEach
-Iteration nur durch Übergabe einer signal
-Option und das Abbrechen des zugehörigen AbortController
gestoppt werden, während for await...of
mit break
oder return
gestoppt werden kann. In beiden Fällen wird der Stream zerstört.
Diese Methode unterscheidet sich vom Abhören des Ereignisses 'data'
dadurch, dass sie das readable
-Ereignis im zugrunde liegenden Mechanismus verwendet und die Anzahl gleichzeitiger fn
-Aufrufe begrenzen kann.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
// Mit einem synchronen Prädikat.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
console.log(chunk) // 3, 4
}
// Mit einem asynchronen Prädikat, wobei maximal 2 Abfragen gleichzeitig durchgeführt werden.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
await dnsResults.forEach(result => {
// Protokolliert das Ergebnis, ähnlich wie `for await (const result of dnsResults)`
console.log(result)
})
console.log('fertig') // Stream ist abgeschlossen
readable.toArray([options])
Hinzugefügt in: v17.5.0, v16.15.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
options
<Object>signal
<AbortSignal> erlaubt das Abbrechen der toArray-Operation, wenn das Signal abgebrochen wird.
Gibt zurück: <Promise> ein Promise, das ein Array mit dem Inhalt des Streams enthält.
Diese Methode ermöglicht es, den Inhalt eines Streams einfach zu erhalten.
Da diese Methode den gesamten Stream in den Speicher liest, werden die Vorteile von Streams negiert. Sie ist für die Interoperabilität und Bequemlichkeit gedacht, nicht als die primäre Methode zum Verbrauchen von Streams.
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'
await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]
// DNS-Abfragen gleichzeitig mit .map durchführen und
// die Ergebnisse mit toArray in einem Array sammeln
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
.map(
async domain => {
const { address } = await resolver.resolve4(domain, { ttl: true })
return address
},
{ concurrency: 2 }
)
.toArray()
readable.some(fn[, options])
Hinzugefügt in: v17.5.0, v16.15.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Function> | <AsyncFunction> eine Funktion, die auf jedem Chunk des Streams aufgerufen wird.data
<any> ein Chunk von Daten aus dem Stream.options
<Object>signal
<AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, wodurch derfn
-Aufruf frühzeitig abgebrochen werden kann.
options
<Object>concurrency
<number> die maximale Anzahl gleichzeitiger Aufrufe vonfn
auf den Stream auf einmal. Standard:1
.signal
<AbortSignal> erlaubt das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Promise> ein Promise, das zu
true
ausgewertet wird, wennfn
für mindestens einen der Chunks einen Truthy-Wert zurückgegeben hat.
Diese Methode ähnelt Array.prototype.some
und ruft fn
für jeden Chunk im Stream auf, bis der erwartete Rückgabewert true
ist (oder ein beliebiger Truthy-Wert). Sobald ein fn
-Aufruf für einen Chunk als erwarteter Rückgabewert Truthy ist, wird der Stream zerstört und das Promise mit true
erfüllt. Wenn keiner der fn
-Aufrufe für die Chunks einen Truthy-Wert zurückgibt, wird das Promise mit false
erfüllt.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Mit einem synchronen Prädikat.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false
// Mit einem asynchronen Prädikat, das maximal 2 Dateiprüfungen gleichzeitig durchführt.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(anyBigFile) // `true`, wenn eine Datei in der Liste größer als 1 MB ist
console.log('fertig') // Stream ist beendet
readable.find(fn[, options])
Hinzugefügt in: v17.5.0, v16.17.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Funktion> | <AsyncFunction> Eine Funktion, die für jeden Chunk des Streams aufgerufen wird.data
<any> Ein Daten-Chunk aus dem Stream.options
<Object>signal
<AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, was einen frühen Abbruch desfn
-Aufrufs ermöglicht.
options
<Object>concurrency
<number> Die maximale Anzahl gleichzeitiger Aufrufe vonfn
im Stream. Standard:1
.signal
<AbortSignal> ermöglicht die Zerstörung des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Promise> Ein Promise, das zu dem ersten Chunk ausgewertet wird, für den
fn
einen "truthy" Wert ergibt, oderundefined
, wenn kein Element gefunden wurde.
Diese Methode ist ähnlich wie Array.prototype.find
und ruft fn
für jeden Chunk im Stream auf, um einen Chunk mit einem "truthy" Wert für fn
zu finden. Sobald der erwartete Rückgabewert eines fn
-Aufrufs "truthy" ist, wird der Stream zerstört und das Promise wird mit dem Wert erfüllt, für den fn
einen "truthy" Wert zurückgegeben hat. Wenn alle fn
-Aufrufe für die Chunks einen "falsy" Wert zurückgeben, wird das Promise mit undefined
erfüllt.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Mit einem synchronen Prädikat.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined
// Mit einem asynchronen Prädikat, wobei maximal 2 Dateiüberprüfungen gleichzeitig durchgeführt werden.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
console.log(foundBigFile) // Dateiname einer großen Datei, falls eine Datei in der Liste größer als 1 MB ist
console.log('done') // Stream ist beendet
readable.every(fn[, options])
Hinzugefügt in: v17.5.0, v16.15.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Funktion> | <AsyncFunction> Eine Funktion, die für jeden Chunk des Streams aufgerufen wird.data
<beliebig> Ein Chunk von Daten aus dem Stream.options
<Objekt>signal
<AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, wodurch derfn
-Aufruf frühzeitig abgebrochen werden kann.
options
<Objekt>concurrency
<number> Die maximale Anzahl gleichzeitiger Aufrufe vonfn
, die gleichzeitig für den Stream aufgerufen werden sollen. Standard:1
.signal
<AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Promise> Eine Promise, die zu
true
ausgewertet wird, wennfn
für alle Chunks einen Truthy-Wert zurückgegeben hat.
Diese Methode ähnelt Array.prototype.every
und ruft fn
für jeden Chunk im Stream auf, um zu überprüfen, ob alle erwarteten Rückgabewerte für fn
einen Truthy-Wert haben. Sobald ein fn
-Aufruf für einen Chunk einen Falsy-Wert zurückgibt, wird der Stream zerstört und die Promise wird mit false
erfüllt. Wenn alle fn
-Aufrufe für die Chunks einen Truthy-Wert zurückgeben, wird die Promise mit true
erfüllt.
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'
// Mit einem synchronen Prädikat.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true
// Mit einem asynchronen Prädikat, das maximal 2 Dateiüberprüfungen gleichzeitig durchführt.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
async fileName => {
const stats = await stat(fileName)
return stats.size > 1024 * 1024
},
{ concurrency: 2 }
)
// `true`, wenn alle Dateien in der Liste größer als 1 MiB sind
console.log(allBigFiles)
console.log('done') // Stream wurde beendet
readable.flatMap(fn[, options])
Hinzugefügt in: v17.5.0, v16.15.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Funktion> | <AsyncGeneratorFunction> | <AsyncFunction> Eine Funktion, die auf jeden Chunk im Stream angewendet wird.data
<any> Ein Datenchunk aus dem Stream.options
<Object>signal
<AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, sodass derfn
-Aufruf frühzeitig abgebrochen werden kann.
options
<Object>concurrency
<number> Die maximale Anzahl gleichzeitiger Aufrufe vonfn
auf dem Stream. Standard:1
.signal
<AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Readable> Ein Stream, der mit der Funktion
fn
flat-mapped wird.
Diese Methode gibt einen neuen Stream zurück, indem der angegebene Callback auf jeden Chunk des Streams angewendet und das Ergebnis dann vereinfacht wird.
Es ist möglich, einen Stream oder ein anderes iterierbares oder asynchron iterierbares Objekt von fn
zurückzugeben, und die resultierenden Streams werden in den zurückgegebenen Stream zusammengeführt (vereinfacht).
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'
// Mit einem synchronen Mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// Mit einem asynchronen Mapper, kombiniere den Inhalt von 4 Dateien
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
createReadStream(fileName)
)
for await (const result of concatResult) {
// Dies enthält den Inhalt (alle Chunks) aller 4 Dateien
console.log(result)
}
readable.drop(limit[, options])
Hinzugefügt in: v17.5.0, v16.15.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
limit
<number> Die Anzahl der Chunks, die vom Readable entfernt werden sollen.options
<Object>signal
<AbortSignal> Ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Readable> Ein Stream, bei dem
limit
Chunks entfernt wurden.
Diese Methode gibt einen neuen Stream zurück, bei dem die ersten limit
Chunks entfernt wurden.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])
Hinzugefügt in: v17.5.0, v16.15.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
limit
<number> Die Anzahl der Chunks, die vom Readable genommen werden sollen.options
<Object>signal
<AbortSignal> Ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Readable> Ein Stream, bei dem
limit
Chunks genommen wurden.
Diese Methode gibt einen neuen Stream zurück, der die ersten limit
Chunks enthält.
import { Readable } from 'node:stream'
await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])
Hinzugefügt in: v17.5.0, v16.15.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
fn
<Function> | <AsyncFunction> Eine Reduzierfunktion, die über jeden Chunk im Stream aufgerufen wird.previous
<any> Der Wert, der vom letzten Aufruf vonfn
erhalten wurde, oder derinitial
-Wert, falls angegeben, oder der erste Chunk des Streams ansonsten.data
<any> Ein Daten-Chunk aus dem Stream.options
<Object>signal
<AbortSignal> Wird abgebrochen, wenn der Stream zerstört wird, wodurch derfn
-Aufruf frühzeitig abgebrochen werden kann.
initial
<any> Der Anfangswert, der bei der Reduzierung verwendet werden soll.options
<Object>signal
<AbortSignal> Ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
Gibt zurück: <Promise> Ein Promise für den Endwert der Reduzierung.
Diese Methode ruft fn
für jeden Chunk des Streams der Reihe nach auf und übergibt das Ergebnis der Berechnung für das vorherige Element. Sie gibt ein Promise für den Endwert der Reduzierung zurück.
Wenn kein initial
-Wert angegeben wird, wird der erste Chunk des Streams als Anfangswert verwendet. Wenn der Stream leer ist, wird das Promise mit einem TypeError
mit der Code-Eigenschaft ERR_INVALID_ARGS
abgelehnt.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file))
return totalSize + size
}, 0)
console.log(folderSize)
Die Reduzierfunktion iteriert die Stream-Elemente einzeln, was bedeutet, dass es keinen concurrency
-Parameter oder Parallelismus gibt. Um ein reduce
gleichzeitig auszuführen, können Sie die Async-Funktion in die readable.map
Methode extrahieren.
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'
const directoryPath = './src'
const filesInDir = await readdir(directoryPath)
const folderSize = await Readable.from(filesInDir)
.map(file => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0)
console.log(folderSize)
Duplex- und Transformations-Streams
Klasse: stream.Duplex
[Verlauf]
Version | Änderungen |
---|---|
v6.8.0 | Instanzen von Duplex geben jetzt true zurück, wenn instanceof stream.Writable überprüft wird. |
v0.9.4 | Hinzugefügt in: v0.9.4 |
Duplex-Streams sind Streams, die sowohl die Readable
- als auch die Writable
-Schnittstelle implementieren.
Beispiele für Duplex
-Streams sind:
duplex.allowHalfOpen
Hinzugefügt in: v0.9.4
Wenn false
, dann beendet der Stream automatisch die beschreibbare Seite, wenn die lesbare Seite endet. Wird anfänglich durch die allowHalfOpen
-Konstruktoroption gesetzt, die standardmäßig true
ist.
Dies kann manuell geändert werden, um das Halb-Offen-Verhalten einer bestehenden Duplex
-Stream-Instanz zu ändern, muss aber geändert werden, bevor das 'end'
-Ereignis ausgegeben wird.
Klasse: stream.Transform
Hinzugefügt in: v0.9.4
Transformations-Streams sind Duplex
-Streams, bei denen die Ausgabe in irgendeiner Weise mit der Eingabe zusammenhängt. Wie alle Duplex
-Streams implementieren Transform
-Streams sowohl die Readable
- als auch die Writable
-Schnittstelle.
Beispiele für Transform
-Streams sind:
transform.destroy([error])
[Verlauf]
Version | Änderungen |
---|---|
v14.0.0 | Funktioniert als No-Op bei einem Stream, der bereits zerstört wurde. |
v8.0.0 | Hinzugefügt in: v8.0.0 |
Zerstört den Stream und gibt optional ein 'error'
-Ereignis aus. Nach diesem Aufruf würde der Transformationsstream alle internen Ressourcen freigeben. Implementierer sollten diese Methode nicht überschreiben, sondern stattdessen readable._destroy()
implementieren. Die Standardimplementierung von _destroy()
für Transform
gibt auch 'close'
aus, es sei denn, emitClose
ist auf false gesetzt.
Sobald destroy()
aufgerufen wurde, sind alle weiteren Aufrufe ein No-Op, und es können keine weiteren Fehler außer von _destroy()
als 'error'
ausgegeben werden.
stream.duplexPair([options])
Hinzugefügt in: v22.6.0, v20.17.0
options
<Object> Ein Wert, der an beideDuplex
-Konstruktoren übergeben wird, um Optionen wie die Pufferung festzulegen.- Gibt zurück: <Array> mit zwei
Duplex
-Instanzen.
Die Hilfsfunktion duplexPair
gibt ein Array mit zwei Elementen zurück, wobei jedes ein Duplex
-Stream ist, der mit der anderen Seite verbunden ist:
const [sideA, sideB] = duplexPair()
Was auch immer in einen Stream geschrieben wird, wird auf dem anderen lesbar gemacht. Es bietet ein Verhalten analog zu einer Netzwerkverbindung, bei der die vom Client geschriebenen Daten für den Server lesbar werden und umgekehrt.
Die Duplex-Streams sind symmetrisch; der eine oder der andere kann ohne Unterschied im Verhalten verwendet werden.
stream.finished(stream[, options], callback)
[Verlauf]
Version | Änderungen |
---|---|
v19.5.0 | Unterstützung für ReadableStream und WritableStream hinzugefügt. |
v15.11.0 | Die Option signal wurde hinzugefügt. |
v14.0.0 | finished(stream, cb) wartet auf das Ereignis 'close' , bevor der Callback aufgerufen wird. Die Implementierung versucht, Legacy-Streams zu erkennen und dieses Verhalten nur auf Streams anzuwenden, von denen erwartet wird, dass sie 'close' ausgeben. |
v14.0.0 | Das Ausgeben von 'close' vor 'end' auf einem Readable -Stream führt zu einem ERR_STREAM_PREMATURE_CLOSE -Fehler. |
v14.0.0 | Der Callback wird auf Streams aufgerufen, die bereits vor dem Aufruf von finished(stream, cb) beendet wurden. |
v10.0.0 | Hinzugefügt in: v10.0.0 |
stream
<Stream> | <ReadableStream> | <WritableStream> Ein lesbarer und/oder schreibbarer Stream/Webstream.options
<Object>error
<boolean> Wenn auffalse
gesetzt, wird ein Aufruf vonemit('error', err)
nicht als beendet betrachtet. Standard:true
.readable
<boolean> Wenn auffalse
gesetzt, wird der Callback aufgerufen, wenn der Stream endet, auch wenn der Stream möglicherweise noch lesbar ist. Standard:true
.writable
<boolean> Wenn auffalse
gesetzt, wird der Callback aufgerufen, wenn der Stream endet, auch wenn der Stream möglicherweise noch beschreibbar ist. Standard:true
.signal
<AbortSignal> ermöglicht das Abbrechen des Wartens auf das Ende des Streams. Der zugrunde liegende Stream wird nicht abgebrochen, wenn das Signal abgebrochen wird. Der Callback wird mit einemAbortError
aufgerufen. Alle registrierten Listener, die von dieser Funktion hinzugefügt wurden, werden ebenfalls entfernt.
callback
<Function> Eine Callback-Funktion, die ein optionales Fehlerargument entgegennimmt.- Gibt zurück: <Function> Eine Bereinigungsfunktion, die alle registrierten Listener entfernt.
Eine Funktion, um benachrichtigt zu werden, wenn ein Stream nicht mehr lesbar, beschreibbar ist oder einen Fehler oder ein vorzeitiges Close-Ereignis erfahren hat.
const { finished } = require('node:stream')
const fs = require('node:fs')
const rs = fs.createReadStream('archive.tar')
finished(rs, err => {
if (err) {
console.error('Stream fehlgeschlagen.', err)
} else {
console.log('Stream ist mit dem Lesen fertig.')
}
})
rs.resume() // Den Stream entleeren.
Besonders nützlich in Fehlerbehandlungsszenarien, in denen ein Stream vorzeitig zerstört wird (wie eine abgebrochene HTTP-Anfrage) und nicht 'end'
oder 'finish'
ausgibt.
Die finished
-API bietet eine Promise-Version.
stream.finished()
hinterlässt hängende Event-Listener (insbesondere 'error'
, 'end'
, 'finish'
und 'close'
), nachdem callback
aufgerufen wurde. Der Grund dafür ist, dass unerwartete 'error'
-Ereignisse (aufgrund falscher Stream-Implementierungen) keine unerwarteten Abstürze verursachen. Wenn dies unerwünschtes Verhalten ist, muss die zurückgegebene Bereinigungsfunktion im Callback aufgerufen werden:
const cleanup = finished(rs, err => {
cleanup()
// ...
})
stream.pipeline(source[, ...transforms], destination, callback)
stream.pipeline(streams, callback)
[Verlauf]
Version | Änderungen |
---|---|
v19.7.0, v18.16.0 | Unterstützung für Webstreams hinzugefügt. |
v18.0.0 | Das Übergeben eines ungültigen Callbacks an das callback -Argument wirft nun ERR_INVALID_ARG_TYPE anstelle von ERR_INVALID_CALLBACK . |
v14.0.0 | Der pipeline(..., cb) wartet auf das 'close' -Ereignis, bevor der Callback aufgerufen wird. Die Implementierung versucht, Legacy-Streams zu erkennen und dieses Verhalten nur auf Streams anzuwenden, von denen erwartet wird, dass sie 'close' ausgeben. |
v13.10.0 | Unterstützung für asynchrone Generatoren hinzugefügt. |
v10.0.0 | Hinzugefügt in: v10.0.0 |
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]>source
<Stream> | <Iterable> | <AsyncIterable> | <Function> | <ReadableStream>- Gibt zurück: <Iterable> | <AsyncIterable>
...transforms
<Stream> | <Function> | <TransformStream>source
<AsyncIterable>- Gibt zurück: <AsyncIterable>
destination
<Stream> | <Function> | <WritableStream>source
<AsyncIterable>- Gibt zurück: <AsyncIterable> | <Promise>
callback
<Function> Wird aufgerufen, wenn die Pipeline vollständig abgeschlossen ist.err
<Error>val
Aufgelöster Wert vonPromise
, der vondestination
zurückgegeben wird.
Gibt zurück: <Stream>
Eine Modulmethode zum Pipe-Verbinden zwischen Streams und Generatoren, die Fehler weiterleitet und ordnungsgemäß bereinigt und einen Callback bereitstellt, wenn die Pipeline abgeschlossen ist.
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Verwenden Sie die Pipeline-API, um auf einfache Weise eine Reihe von Streams
// zusammen zu verbinden und benachrichtigt zu werden, wenn die Pipeline vollständig abgeschlossen ist.
// Eine Pipeline zum effizienten Gzippen einer potenziell riesigen Tar-Datei:
pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
if (err) {
console.error('Pipeline fehlgeschlagen.', err)
} else {
console.log('Pipeline erfolgreich.')
}
})
Die pipeline
-API bietet eine Promise-Version.
stream.pipeline()
ruft stream.destroy(err)
auf allen Streams auf, außer:
Readable
-Streams, die'end'
oder'close'
ausgegeben haben.Writable
-Streams, die'finish'
oder'close'
ausgegeben haben.
stream.pipeline()
hinterlässt hängende Event-Listener auf den Streams, nachdem der callback
aufgerufen wurde. Im Falle der Wiederverwendung von Streams nach einem Fehler kann dies zu Event-Listener-Lecks und verschluckten Fehlern führen. Wenn der letzte Stream lesbar ist, werden hängende Event-Listener entfernt, sodass der letzte Stream später verbraucht werden kann.
stream.pipeline()
schließt alle Streams, wenn ein Fehler auftritt. Die IncomingRequest
-Nutzung mit pipeline
könnte zu einem unerwarteten Verhalten führen, da der Socket zerstört würde, ohne die erwartete Antwort zu senden. Siehe das Beispiel unten:
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt')
pipeline(fileStream, res, err => {
if (err) {
console.log(err) // Keine solche Datei
// Diese Meldung kann nicht gesendet werden, da `pipeline` den Socket bereits zerstört hat.
return res.end('Fehler!!!')
}
})
})
stream.compose(...streams)
[History]
Version | Änderungen |
---|---|
v21.1.0, v20.10.0 | Unterstützung für die Stream-Klasse hinzugefügt. |
v19.8.0, v18.16.0 | Unterstützung für Webstreams hinzugefügt. |
v16.9.0 | Hinzugefügt in: v16.9.0 |
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - stream.compose
ist experimentell.
streams
<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> | <Duplex[]> | <Function>- Gibt zurück: <stream.Duplex>
Kombiniert zwei oder mehr Streams zu einem Duplex
-Stream, der in den ersten Stream schreibt und aus dem letzten liest. Jeder bereitgestellte Stream wird mit stream.pipeline
in den nächsten geleitet. Wenn einer der Streams einen Fehler aufweist, werden alle zerstört, einschließlich des äußeren Duplex
-Streams.
Da stream.compose
einen neuen Stream zurückgibt, der wiederum in andere Streams geleitet werden kann (und sollte), ermöglicht er die Komposition. Im Gegensatz dazu bilden die Streams, wenn sie an stream.pipeline
übergeben werden, typischerweise einen geschlossenen Kreislauf, wobei der erste Stream ein lesbarer Stream und der letzte ein beschreibbarer Stream ist.
Wenn eine Function
übergeben wird, muss es sich um eine Factory-Methode handeln, die eine source
Iterable
entgegennimmt.
import { compose, Transform } from 'node:stream'
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''))
},
})
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
}
let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf
}
console.log(res) // gibt 'HELLOWORLD' aus
stream.compose
kann verwendet werden, um asynchrone Iterables, Generatoren und Funktionen in Streams zu konvertieren.
AsyncIterable
wandelt sich in einen lesbarenDuplex
um. Darf nichtnull
liefern.AsyncGeneratorFunction
wandelt sich in einen lesbaren/beschreibbaren TransformDuplex
um. Muss eineAsyncIterable
-Quelle als ersten Parameter entgegennehmen. Darf nichtnull
liefern.AsyncFunction
wandelt sich in einen beschreibbarenDuplex
um. Muss entwedernull
oderundefined
zurückgeben.
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'
// Konvertiert AsyncIterable in lesbaren Duplex.
const s1 = compose(
(async function* () {
yield 'Hallo'
yield 'Welt'
})()
)
// Konvertiert AsyncGenerator in Transform Duplex.
const s2 = compose(async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase()
}
})
let res = ''
// Konvertiert AsyncFunction in beschreibbaren Duplex.
const s3 = compose(async function (source) {
for await (const chunk of source) {
res += chunk
}
})
await finished(compose(s1, s2, s3))
console.log(res) // gibt 'HELLOWORLD' aus
Siehe readable.compose(stream)
für stream.compose
als Operator.
stream.Readable.from(iterable[, options])
Hinzugefügt in: v12.3.0, v10.17.0
iterable
<Iterable> Objekt, das das Iterable-ProtokollSymbol.asyncIterator
oderSymbol.iterator
implementiert. Gibt ein 'error'-Ereignis aus, wenn ein Nullwert übergeben wird.options
<Object> Optionen, die annew stream.Readable([options])
übergeben werden. Standardmäßig setztReadable.from()
options.objectMode
auftrue
, es sei denn, dies wird explizit durch Setzen vonoptions.objectMode
auffalse
abgewählt.- Gibt zurück: <stream.Readable>
Eine Hilfsmethode zum Erstellen von lesbaren Streams aus Iteratoren.
const { Readable } = require('node:stream')
async function* generate() {
yield 'hallo'
yield 'streams'
}
const readable = Readable.from(generate())
readable.on('data', chunk => {
console.log(chunk)
})
Das Aufrufen von Readable.from(string)
oder Readable.from(buffer)
führt nicht dazu, dass die Strings oder Puffer iteriert werden, um aus Leistungsgründen der Semantik der anderen Streams zu entsprechen.
Wenn ein Iterable
-Objekt, das Promises enthält, als Argument übergeben wird, kann dies zu einer unbehandelten Ablehnung führen.
const { Readable } = require('node:stream')
Readable.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unbehandelte Ablehnung
])
stream.Readable.fromWeb(readableStream[, options])
Hinzugefügt in: v17.0.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
readableStream
<ReadableStream>options
<Object>encoding
<string>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Gibt zurück: <stream.Readable>
stream.Readable.isDisturbed(stream)
Hinzugefügt in: v16.8.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
stream
<stream.Readable> | <ReadableStream>- Gibt zurück:
boolean
Gibt zurück, ob von dem Stream gelesen oder er abgebrochen wurde.
stream.isErrored(stream)
Hinzugefügt in: v17.3.0, v16.14.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
stream
<Readable> | <Writable> | <Duplex> | <WritableStream> | <ReadableStream>- Gibt zurück: <boolean>
Gibt zurück, ob im Stream ein Fehler aufgetreten ist.
stream.isReadable(stream)
Hinzugefügt in: v17.4.0, v16.14.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
stream
<Readable> | <Duplex> | <ReadableStream>- Gibt zurück: <boolean>
Gibt zurück, ob der Stream lesbar ist.
stream.Readable.toWeb(streamReadable[, options])
Hinzugefügt in: v17.0.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
streamReadable
<stream.Readable>options
<Object>strategy
<Object>highWaterMark
<number> Die maximale interne Warteschlangengröße (des erstelltenReadableStream
), bevor bei Lesevorgängen aus dem angegebenenstream.Readable
Gegendruck angewendet wird. Wenn kein Wert angegeben wird, wird er von dem angegebenenstream.Readable
übernommen.size
<Function> Eine Funktion, welche die Größe des angegebenen Datenchunks ermittelt. Wenn kein Wert angegeben wird, ist die Größe für alle Chunks1
.chunk
<any>- Gibt zurück: <number>
Gibt zurück: <ReadableStream>
stream.Writable.fromWeb(writableStream[, options])
Hinzugefügt in: v17.0.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
writableStream
<WritableStream>options
<Object>decodeStrings
<boolean>highWaterMark
<number>objectMode
<boolean>signal
<AbortSignal>
Gibt zurück: <stream.Writable>
stream.Writable.toWeb(streamWritable)
Hinzugefügt in: v17.0.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
streamWritable
<stream.Writable>- Gibt zurück: <WritableStream>
stream.Duplex.from(src)
[Verlauf]
Version | Änderungen |
---|---|
v19.5.0, v18.17.0 | Das Argument src kann jetzt ein ReadableStream oder WritableStream sein. |
v16.8.0 | Hinzugefügt in: v16.8.0 |
src
<Stream> | <Blob> | <ArrayBuffer> | <string> | <Iterable> | <AsyncIterable> | <AsyncGeneratorFunction> | <AsyncFunction> | <Promise> | <Object> | <ReadableStream> | <WritableStream>
Eine Hilfsmethode zum Erstellen von Duplex-Streams.
Stream
konvertiert einen beschreibbaren Stream in einen beschreibbarenDuplex
und einen lesbaren Stream in einenDuplex
.Blob
konvertiert in einen lesbarenDuplex
.string
konvertiert in einen lesbarenDuplex
.ArrayBuffer
konvertiert in einen lesbarenDuplex
.AsyncIterable
konvertiert in einen lesbarenDuplex
. Kann nichtnull
ergeben.AsyncGeneratorFunction
konvertiert in einen lesbaren/beschreibbaren transformierendenDuplex
. Muss einAsyncIterable
als erste Quelle verwenden. Kann nichtnull
ergeben.AsyncFunction
konvertiert in einen beschreibbarenDuplex
. Muss entwedernull
oderundefined
zurückgeben.Object ({ writable, readable })
konvertiertreadable
undwritable
inStream
und kombiniert sie dann zuDuplex
, wobeiDuplex
inwritable
schreibt und vonreadable
liest.Promise
konvertiert in einen lesbarenDuplex
. Der Wertnull
wird ignoriert.ReadableStream
konvertiert in einen lesbarenDuplex
.WritableStream
konvertiert in einen beschreibbarenDuplex
.- Gibt zurück: <stream.Duplex>
Wenn ein Iterable
-Objekt, das Promises enthält, als Argument übergeben wird, kann dies zu einer nicht behandelten Ablehnung führen.
const { Duplex } = require('node:stream')
Duplex.from([
new Promise(resolve => setTimeout(resolve('1'), 1500)),
new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Nicht behandelte Ablehnung
])
stream.Duplex.fromWeb(pair[, options])
Hinzugefügt in: v17.0.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
pair
<Object>readable
<ReadableStream>writable
<WritableStream>
options
<Object>Gibt zurück: <stream.Duplex>
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
for await (const chunk of duplex) {
console.log('readable', chunk)
}
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')
const readable = new ReadableStream({
start(controller) {
controller.enqueue('world')
},
})
const writable = new WritableStream({
write(chunk) {
console.log('writable', chunk)
},
})
const pair = {
readable,
writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })
duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))
stream.Duplex.toWeb(streamDuplex)
Hinzugefügt in: v17.0.0
[Stabil: 1 - Experimentell]
Stabil: 1 Stabilität: 1 - Experimentell
streamDuplex
<stream.Duplex>- Gibt zurück: <Object>
readable
<ReadableStream>writable
<WritableStream>
import { Duplex } from 'node:stream'
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
const { value } = await readable.getReader().read()
console.log('readable', value)
const { Duplex } = require('node:stream')
const duplex = Duplex({
objectMode: true,
read() {
this.push('world')
this.push(null)
},
write(chunk, encoding, callback) {
console.log('writable', chunk)
callback()
},
})
const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')
readable
.getReader()
.read()
.then(result => {
console.log('readable', result.value)
})
stream.addAbortSignal(signal, stream)
[Verlauf]
Version | Änderungen |
---|---|
v19.7.0, v18.16.0 | Unterstützung für ReadableStream und WritableStream hinzugefügt. |
v15.4.0 | Hinzugefügt in: v15.4.0 |
signal
<AbortSignal> Ein Signal, das eine mögliche Abbrechung darstelltstream
<Stream> | <ReadableStream> | <WritableStream> Ein Stream, an den ein Signal angehängt werden soll.
Hängt ein AbortSignal an einen lesbaren oder beschreibbaren Stream an. Dies ermöglicht es dem Code, die Stream-Zerstörung mithilfe eines AbortController
zu steuern.
Das Aufrufen von abort
auf dem AbortController
, das dem übergebenen AbortSignal
entspricht, verhält sich auf die gleiche Weise wie das Aufrufen von .destroy(new AbortError())
auf dem Stream und controller.error(new AbortError())
für Webstreams.
const fs = require('node:fs')
const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Später, brechen Sie den Vorgang ab, indem Sie den Stream schließen
controller.abort()
Oder die Verwendung eines AbortSignal
mit einem lesbaren Stream als asynchrone Iterable:
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // ein Timeout setzen
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
try {
for await (const chunk of stream) {
await process(chunk)
}
} catch (e) {
if (e.name === 'AbortError') {
// Der Vorgang wurde abgebrochen
} else {
throw e
}
}
})()
Oder die Verwendung eines AbortSignal
mit einem ReadableStream:
const controller = new AbortController()
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello')
controller.enqueue('world')
controller.close()
},
})
addAbortSignal(controller.signal, rs)
finished(rs, err => {
if (err) {
if (err.name === 'AbortError') {
// Der Vorgang wurde abgebrochen
}
}
})
const reader = rs.getReader()
reader.read().then(({ value, done }) => {
console.log(value) // hello
console.log(done) // false
controller.abort()
})
stream.getDefaultHighWaterMark(objectMode)
Hinzugefügt in: v19.9.0, v18.17.0
Gibt den Standardwert für highWaterMark
zurück, der von Streams verwendet wird. Standardmäßig ist dies 65536
(64 KiB) oder 16
für objectMode
.
stream.setDefaultHighWaterMark(objectMode, value)
Hinzugefügt in: v19.9.0, v18.17.0
Setzt den Standardwert für highWaterMark
, der von Streams verwendet wird.
API für Stream-Implementierer
Die node:stream
-Modul-API wurde so konzipiert, dass es einfach ist, Streams mit dem prototypischen Vererbungsmodell von JavaScript zu implementieren.
Zuerst deklariert ein Stream-Entwickler eine neue JavaScript-Klasse, die eine der vier grundlegenden Stream-Klassen (stream.Writable
, stream.Readable
, stream.Duplex
oder stream.Transform
) erweitert und stellt sicher, dass er den entsprechenden Konstruktor der übergeordneten Klasse aufruft:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor({ highWaterMark, ...options }) {
super({ highWaterMark })
// ...
}
}
Denken Sie beim Erweitern von Streams daran, welche Optionen der Benutzer bereitstellen kann und sollte, bevor Sie diese an den Basiskonstruktor weiterleiten. Wenn die Implementierung beispielsweise Annahmen in Bezug auf die Optionen autoDestroy
und emitClose
trifft, erlauben Sie dem Benutzer nicht, diese zu überschreiben. Seien Sie explizit, welche Optionen weitergeleitet werden, anstatt implizit alle Optionen weiterzuleiten.
Die neue Stream-Klasse muss dann eine oder mehrere spezifische Methoden implementieren, abhängig vom Typ des zu erstellenden Streams, wie in der folgenden Tabelle aufgeführt:
Anwendungsfall | Klasse | Zu implementierende Methode(n) |
---|---|---|
Nur lesen | Readable | _read() |
Nur schreiben | Writable | _write() , _writev() , _final() |
Lesen und Schreiben | Duplex | _read() , _write() , _writev() , _final() |
Auf geschriebene Daten einwirken und dann das Ergebnis lesen | Transform | _transform() , _flush() , _final() |
Der Implementierungscode für einen Stream sollte niemals die "öffentlichen" Methoden eines Streams aufrufen, die für die Verwendung durch Konsumenten vorgesehen sind (wie im Abschnitt API für Stream-Konsumenten beschrieben). Dies kann zu unerwünschten Nebenwirkungen im Anwendungscode führen, der den Stream konsumiert.
Vermeiden Sie das Überschreiben öffentlicher Methoden wie write()
, end()
, cork()
, uncork()
, read()
und destroy()
oder das Ausgeben interner Ereignisse wie 'error'
, 'data'
, 'end'
, 'finish'
und 'close'
über .emit()
. Dies kann aktuelle und zukünftige Stream-Invarianten aufbrechen, was zu Verhaltens- und/oder Kompatibilitätsproblemen mit anderen Streams, Stream-Dienstprogrammen und Benutzererwartungen führt.
Vereinfachte Konstruktion
Hinzugefügt in: v1.2.0
In vielen einfachen Fällen ist es möglich, einen Stream zu erstellen, ohne auf Vererbung angewiesen zu sein. Dies kann erreicht werden, indem direkt Instanzen der stream.Writable
-, stream.Readable
-, stream.Duplex
- oder stream.Transform
-Objekte erstellt und entsprechende Methoden als Konstruktoroptionen übergeben werden.
const { Writable } = require('node:stream')
const myWritable = new Writable({
construct(callback) {
// Zustand initialisieren und Ressourcen laden...
},
write(chunk, encoding, callback) {
// ...
},
destroy() {
// Ressourcen freigeben...
},
})
Implementierung eines beschreibbaren Streams
Die Klasse stream.Writable
wird erweitert, um einen Writable
-Stream zu implementieren.
Benutzerdefinierte Writable
-Streams müssen den Konstruktor new stream.Writable([options])
aufrufen und die Methode writable._write()
und/oder writable._writev()
implementieren.
new stream.Writable([options])
[Historie]
Version | Änderungen |
---|---|
v22.0.0 | Standard highWaterMark erhöht. |
v15.5.0 | Unterstützung für die Übergabe eines AbortSignals. |
v14.0.0 | Die Standardeinstellung der Option autoDestroy wurde auf true geändert. |
v11.2.0, v10.16.0 | Die Option autoDestroy hinzugefügt, um den Stream automatisch destroy() auszuführen, wenn er 'finish' ausgibt oder Fehler auftreten. |
v10.0.0 | Die Option emitClose hinzugefügt, um anzugeben, ob beim Zerstören 'close' ausgegeben wird. |
options
<Object>highWaterMark
<number> Pufferpegel, wennstream.write()
beginnt,false
zurückzugeben. Standard:65536
(64 KiB) oder16
fürobjectMode
-Streams.decodeStrings
<boolean> Gibt an, ob anstream.write()
übergebenestring
s inBuffer
s (mit der imstream.write()
-Aufruf angegebenen Kodierung) kodiert werden sollen, bevor sie anstream._write()
übergeben werden. Andere Datentypen werden nicht konvertiert (d. h.Buffer
s werden nicht instring
s dekodiert). Wenn dieser Wert auf false gesetzt wird, wird die Konvertierung vonstring
s verhindert. Standard:true
.defaultEncoding
<string> Die Standardkodierung, die verwendet wird, wenn keine Kodierung als Argument fürstream.write()
angegeben wird. Standard:'utf8'
.objectMode
<boolean> Gibt an, obstream.write(anyObj)
eine gültige Operation ist. Wenn dies festgelegt ist, wird es möglich, andere JavaScript-Werte als String, <Buffer>, <TypedArray> oder <DataView> zu schreiben, wenn dies von der Stream-Implementierung unterstützt wird. Standard:false
.emitClose
<boolean> Gibt an, ob der Stream'close'
ausgeben soll, nachdem er zerstört wurde. Standard:true
.write
<Function> Implementierung für die Methodestream._write()
.writev
<Function> Implementierung für die Methodestream._writev()
.destroy
<Function> Implementierung für die Methodestream._destroy()
.final
<Function> Implementierung für die Methodestream._final()
.construct
<Function> Implementierung für die Methodestream._construct()
.autoDestroy
<boolean> Gibt an, ob dieser Stream nach dem Beenden automatisch.destroy()
für sich selbst aufrufen soll. Standard:true
.signal
<AbortSignal> Ein Signal, das eine mögliche Abbrechen darstellt.
const { Writable } = require('node:stream')
class MyWritable extends Writable {
constructor(options) {
// Ruft den Konstruktor stream.Writable() auf.
super(options)
// ...
}
}
Oder bei Verwendung von Konstruktoren im Pre-ES6-Stil:
const { Writable } = require('node:stream')
const util = require('node:util')
function MyWritable(options) {
if (!(this instanceof MyWritable)) return new MyWritable(options)
Writable.call(this, options)
}
util.inherits(MyWritable, Writable)
Oder unter Verwendung des vereinfachten Konstruktoransatzes:
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
})
Das Aufrufen von abort
auf dem AbortController
, der dem übergebenen AbortSignal
entspricht, verhält sich genauso wie der Aufruf von .destroy(new AbortError())
auf dem beschreibbaren Stream.
const { Writable } = require('node:stream')
const controller = new AbortController()
const myWritable = new Writable({
write(chunk, encoding, callback) {
// ...
},
writev(chunks, callback) {
// ...
},
signal: controller.signal,
})
// Später wird der Vorgang abgebrochen und der Stream geschlossen
controller.abort()
writable._construct(callback)
Hinzugefügt in: v15.0.0
callback
<Funktion> Rufen Sie diese Funktion (optional mit einem Fehlerargument) auf, wenn der Stream die Initialisierung abgeschlossen hat.
Die Methode _construct()
DARF NICHT direkt aufgerufen werden. Sie kann von Kindklassen implementiert werden und wird, falls dies der Fall ist, nur von den internen Methoden der Klasse Writable
aufgerufen.
Diese optionale Funktion wird in einem Tick aufgerufen, nachdem der Stream-Konstruktor zurückgekehrt ist, und verzögert alle Aufrufe von _write()
, _final()
und _destroy()
, bis callback
aufgerufen wird. Dies ist nützlich, um den Zustand zu initialisieren oder Ressourcen asynchron zu initialisieren, bevor der Stream verwendet werden kann.
const { Writable } = require('node:stream')
const fs = require('node:fs')
class WriteStream extends Writable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, 'w', (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_write(chunk, encoding, callback) {
fs.write(this.fd, chunk, callback)
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
writable._write(chunk, encoding, callback)
[Verlauf]
Version | Änderungen |
---|---|
v12.11.0 | _write() ist optional, wenn _writev() bereitgestellt wird. |
chunk
<Buffer> | <string> | <any> Der zu schreibendeBuffer
, der aus dem anstream.write()
übergebenenstring
konvertiert wurde. Wenn die OptiondecodeStrings
des Streamsfalse
ist oder der Stream im Objektmodus arbeitet, wird der Chunk nicht konvertiert und entspricht dem, was anstream.write()
übergeben wurde.encoding
<string> Wenn der Chunk ein String ist, istencoding
die Zeichenkodierung dieses Strings. Wenn der Chunk einBuffer
ist oder der Stream im Objektmodus arbeitet, kannencoding
ignoriert werden.callback
<Funktion> Rufen Sie diese Funktion (optional mit einem Fehlerargument) auf, wenn die Verarbeitung für den angegebenen Chunk abgeschlossen ist.
Alle Writable
-Stream-Implementierungen müssen eine writable._write()
- und/oder writable._writev()
-Methode bereitstellen, um Daten an die zugrunde liegende Ressource zu senden.
Transform
-Streams stellen ihre eigene Implementierung von writable._write()
bereit.
Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von Kindklassen implementiert und nur von den internen Methoden der Writable
-Klasse aufgerufen werden.
Die callback
-Funktion muss synchron innerhalb von writable._write()
oder asynchron (d. h. unterschiedlicher Tick) aufgerufen werden, um entweder zu signalisieren, dass der Schreibvorgang erfolgreich abgeschlossen wurde oder mit einem Fehler fehlgeschlagen ist. Das erste Argument, das an den callback
übergeben wird, muss das Error
-Objekt sein, wenn der Aufruf fehlgeschlagen ist, oder null
, wenn der Schreibvorgang erfolgreich war.
Alle Aufrufe von writable.write()
, die zwischen dem Zeitpunkt des Aufrufs von writable._write()
und dem Zeitpunkt des Aufrufs von callback
auftreten, führen dazu, dass die geschriebenen Daten gepuffert werden. Wenn callback
aufgerufen wird, kann der Stream ein 'drain'
-Ereignis ausgeben. Wenn eine Stream-Implementierung in der Lage ist, mehrere Datenchunks gleichzeitig zu verarbeiten, sollte die Methode writable._writev()
implementiert werden.
Wenn die Eigenschaft decodeStrings
in den Konstruktoroptionen explizit auf false
gesetzt wird, bleibt chunk
dasselbe Objekt, das an .write()
übergeben wird, und kann ein String anstatt eines Buffer
sein. Dies dient zur Unterstützung von Implementierungen, die eine optimierte Behandlung für bestimmte String-Datenkodierungen haben. In diesem Fall gibt das Argument encoding
die Zeichenkodierung des Strings an. Andernfalls kann das Argument encoding
sicher ignoriert werden.
Die Methode writable._write()
wird mit einem Unterstrich versehen, da sie intern für die Klasse ist, die sie definiert, und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.
writable._writev(chunks, callback)
chunks
<Object[]> Die zu schreibenden Daten. Der Wert ist ein Array von <Object>, die jeweils einen diskreten Datenblock darstellen, der geschrieben werden soll. Die Eigenschaften dieser Objekte sind:chunk
<Buffer> | <string> Eine Buffer-Instanz oder ein String, der die zu schreibenden Daten enthält. Derchunk
ist ein String, wenn dasWritable
mit der OptiondecodeStrings
auffalse
gesetzt wurde und ein String anwrite()
übergeben wurde.encoding
<string> Die Zeichenkodierung deschunk
. Wennchunk
einBuffer
ist, istencoding
'buffer'
.
callback
<Function> Eine Callback-Funktion (optional mit einem Fehlerargument), die aufgerufen wird, wenn die Verarbeitung für die bereitgestellten Chunks abgeschlossen ist.
Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von untergeordneten Klassen implementiert und nur von den internen Writable
-Klassenmethoden aufgerufen werden.
Die Methode writable._writev()
kann zusätzlich oder alternativ zu writable._write()
in Stream-Implementierungen implementiert werden, die in der Lage sind, mehrere Datenblöcke gleichzeitig zu verarbeiten. Wenn implementiert und wenn gepufferte Daten aus vorherigen Schreibvorgängen vorhanden sind, wird _writev()
anstelle von _write()
aufgerufen.
Der Methode writable._writev()
ist ein Unterstrich vorangestellt, da sie intern für die Klasse ist, die sie definiert, und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.
writable._destroy(err, callback)
Hinzugefügt in: v8.0.0
err
<Error> Ein möglicher Fehler.callback
<Function> Eine Callback-Funktion, die ein optionales Fehlerargument entgegennimmt.
Die Methode _destroy()
wird von writable.destroy()
aufgerufen. Sie kann von untergeordneten Klassen überschrieben werden, darf aber nicht direkt aufgerufen werden.
writable._final(callback)
Hinzugefügt in: v8.0.0
callback
<Funktion> Ruft diese Funktion (optional mit einem Fehlerargument) auf, wenn das Schreiben verbleibender Daten abgeschlossen ist.
Die Methode _final()
darf nicht direkt aufgerufen werden. Sie kann von untergeordneten Klassen implementiert werden und wird, falls dies der Fall ist, nur von den internen Methoden der Writable
-Klasse aufgerufen.
Diese optionale Funktion wird aufgerufen, bevor der Stream geschlossen wird, wodurch das 'finish'
-Ereignis verzögert wird, bis callback
aufgerufen wird. Dies ist nützlich, um Ressourcen zu schließen oder gepufferte Daten zu schreiben, bevor ein Stream endet.
Fehler beim Schreiben
Fehler, die bei der Verarbeitung der Methoden writable._write()
, writable._writev()
und writable._final()
auftreten, müssen durch Aufrufen des Callbacks und Übergeben des Fehlers als erstes Argument weitergegeben werden. Das Werfen eines Error
innerhalb dieser Methoden oder das manuelle Auslösen eines 'error'
-Ereignisses führt zu undefiniertem Verhalten.
Wenn ein Readable
-Stream in einen Writable
-Stream geleitet wird, wenn Writable
einen Fehler ausgibt, wird der Readable
-Stream entkoppelt.
const { Writable } = require('node:stream')
const myWritable = new Writable({
write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk ist ungültig'))
} else {
callback()
}
},
})
Ein Beispiel für einen schreibbaren Stream
Das Folgende veranschaulicht eine eher simple (und etwas sinnlose) benutzerdefinierte Writable
-Stream-Implementierung. Obwohl diese spezielle Writable
-Stream-Instanz keinen besonderen Nutzen hat, veranschaulicht das Beispiel jedes der erforderlichen Elemente einer benutzerdefinierten Writable
-Stream-Instanz:
const { Writable } = require('node:stream')
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) {
callback(new Error('chunk ist ungültig'))
} else {
callback()
}
}
}
Dekodieren von Puffern in einem beschreibbaren Stream
Das Dekodieren von Puffern ist eine häufige Aufgabe, z. B. bei der Verwendung von Transformatoren, deren Eingabe eine Zeichenkette ist. Dies ist kein trivialer Prozess bei der Verwendung von Mehrbyte-Zeichenkodierungen wie UTF-8. Das folgende Beispiel zeigt, wie man Mehrbyte-Zeichenketten mit StringDecoder
und Writable
dekodiert.
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')
class StringWritable extends Writable {
constructor(options) {
super(options)
this._decoder = new StringDecoder(options?.defaultEncoding)
this.data = ''
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk)
}
this.data += chunk
callback()
}
_final(callback) {
this.data += this._decoder.end()
callback()
}
}
const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()
w.write('currency: ')
w.write(euro[0])
w.end(euro[1])
console.log(w.data) // currency: €
Implementierung eines lesbaren Streams
Die Klasse stream.Readable
wird erweitert, um einen Readable
-Stream zu implementieren.
Benutzerdefinierte Readable
-Streams müssen den Konstruktor new stream.Readable([options])
aufrufen und die Methode readable._read()
implementieren.
new stream.Readable([options])
[Verlauf]
Version | Änderungen |
---|---|
v22.0.0 | Standard-HighWaterMark erhöht. |
v15.5.0 | Unterstützung für das Übergeben eines AbortSignals. |
v14.0.0 | Standardwert der Option autoDestroy auf true geändert. |
v11.2.0, v10.16.0 | Option autoDestroy hinzugefügt, um den Stream automatisch destroy() zu beenden, wenn er 'end' ausgibt oder Fehler auftreten. |
options
<Object>highWaterMark
<number> Die maximale Anzahl von Bytes, die im internen Puffer gespeichert werden sollen, bevor das Lesen von der zugrunde liegenden Ressource eingestellt wird. Standard:65536
(64 KiB) oder16
fürobjectMode
-Streams.encoding
<string> Wenn angegeben, werden Puffer mit der angegebenen Kodierung in Zeichenketten dekodiert. Standard:null
.objectMode
<boolean> Gibt an, ob dieser Stream sich wie ein Stream von Objekten verhalten soll. Das bedeutet, dassstream.read(n)
einen einzelnen Wert anstelle einesBuffer
der Größen
zurückgibt. Standard:false
.emitClose
<boolean> Gibt an, ob der Stream'close'
ausgeben soll, nachdem er zerstört wurde. Standard:true
.read
<Function> Implementierung für die Methodestream._read()
.destroy
<Function> Implementierung für die Methodestream._destroy()
.construct
<Function> Implementierung für die Methodestream._construct()
.autoDestroy
<boolean> Gibt an, ob dieser Stream automatisch.destroy()
für sich selbst aufrufen soll, nachdem er beendet wurde. Standard:true
.signal
<AbortSignal> Ein Signal, das eine mögliche Abbrechung darstellt.
const { Readable } = require('node:stream')
class MyReadable extends Readable {
constructor(options) {
// Ruft den Konstruktor stream.Readable(options) auf.
super(options)
// ...
}
}
Oder bei Verwendung von Konstruktoren im Pre-ES6-Stil:
const { Readable } = require('node:stream')
const util = require('node:util')
function MyReadable(options) {
if (!(this instanceof MyReadable)) return new MyReadable(options)
Readable.call(this, options)
}
util.inherits(MyReadable, Readable)
Oder bei Verwendung des vereinfachten Konstruktoransatzes:
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
// ...
},
})
Das Aufrufen von abort
auf dem AbortController
, der dem übergebenen AbortSignal
entspricht, verhält sich genauso wie das Aufrufen von .destroy(new AbortError())
auf dem erzeugten lesbaren Stream.
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
read(size) {
// ...
},
signal: controller.signal,
})
// Später die Operation abbrechen und den Stream schließen
controller.abort()
readable._construct(callback)
Hinzugefügt in: v15.0.0
callback
<Funktion> Rufe diese Funktion auf (optional mit einem Fehlerargument), wenn der Stream die Initialisierung abgeschlossen hat.
Die _construct()
-Methode DARF NICHT direkt aufgerufen werden. Sie kann von untergeordneten Klassen implementiert werden und wird, falls dies der Fall ist, nur von den internen Readable
-Klassenmethoden aufgerufen.
Diese optionale Funktion wird im nächsten Tick vom Stream-Konstruktor geplant, wodurch alle _read()
- und _destroy()
-Aufrufe verzögert werden, bis callback
aufgerufen wird. Dies ist nützlich, um den Status zu initialisieren oder Ressourcen asynchron zu initialisieren, bevor der Stream verwendet werden kann.
const { Readable } = require('node:stream')
const fs = require('node:fs')
class ReadStream extends Readable {
constructor(filename) {
super()
this.filename = filename
this.fd = null
}
_construct(callback) {
fs.open(this.filename, (err, fd) => {
if (err) {
callback(err)
} else {
this.fd = fd
callback()
}
})
}
_read(n) {
const buf = Buffer.alloc(n)
fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
if (err) {
this.destroy(err)
} else {
this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
}
})
}
_destroy(err, callback) {
if (this.fd) {
fs.close(this.fd, er => callback(er || err))
} else {
callback(err)
}
}
}
readable._read(size)
Hinzugefügt in: v0.9.4
size
<Zahl> Anzahl der asynchron zu lesenden Bytes
Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von untergeordneten Klassen implementiert und nur von den internen Readable
-Klassenmethoden aufgerufen werden.
Alle Readable
-Stream-Implementierungen müssen eine Implementierung der [readable._read()
]-Methode bereitstellen, um Daten aus der zugrunde liegenden Ressource abzurufen.
Wenn readable._read()
aufgerufen wird und Daten aus der Ressource verfügbar sind, sollte die Implementierung beginnen, diese Daten mithilfe der [this.push(dataChunk)
]-Methode in die Lesewarteschlange zu schieben. _read()
wird nach jedem Aufruf von this.push(dataChunk)
erneut aufgerufen, sobald der Stream bereit ist, weitere Daten zu akzeptieren. _read()
kann weiterhin aus der Ressource lesen und Daten schieben, bis readable.push()
false
zurückgibt. Nur wenn _read()
erneut aufgerufen wird, nachdem es gestoppt wurde, sollte es mit dem Schieben weiterer Daten in die Warteschlange fortfahren.
Sobald die [readable._read()
]-Methode aufgerufen wurde, wird sie erst wieder aufgerufen, wenn weitere Daten über die [readable.push()
]-Methode geschoben werden. Leere Daten wie leere Puffer und Strings führen nicht dazu, dass readable._read()
aufgerufen wird.
Das Argument size
ist beratend. Für Implementierungen, bei denen ein "Lesen" eine einzelne Operation ist, die Daten zurückgibt, kann das Argument size
verwendet werden, um zu bestimmen, wie viele Daten abgerufen werden sollen. Andere Implementierungen können dieses Argument ignorieren und einfach Daten bereitstellen, sobald sie verfügbar sind. Es ist nicht erforderlich zu "warten", bis size
Bytes verfügbar sind, bevor stream.push(chunk)
aufgerufen wird.
Die [readable._read()
]-Methode ist mit einem Unterstrich versehen, da sie intern für die Klasse ist, die sie definiert, und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.
readable._destroy(err, callback)
Hinzugefügt in: v8.0.0
err
<Error> Ein möglicher Fehler.callback
<Function> Eine Callback-Funktion, die ein optionales Fehlerargument entgegennimmt.
Die Methode _destroy()
wird von readable.destroy()
aufgerufen. Sie kann von untergeordneten Klassen überschrieben werden, darf aber nicht direkt aufgerufen werden.
readable.push(chunk[, encoding])
[Verlauf]
Version | Änderungen |
---|---|
v22.0.0, v20.13.0 | Das chunk -Argument kann nun eine TypedArray - oder DataView -Instanz sein. |
v8.0.0 | Das chunk -Argument kann nun eine Uint8Array -Instanz sein. |
chunk
<Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Datenchunk, der in die Lesewarteschlange geschoben werden soll. Für Streams, die nicht im Objektmodus betrieben werden, musschunk
ein <string>, <Buffer>, <TypedArray> oder <DataView> sein. Für Streams im Objektmodus kannchunk
ein beliebiger JavaScript-Wert sein.encoding
<string> Kodierung von String-Chunks. Muss eine gültigeBuffer
-Kodierung sein, wie z. B.'utf8'
oder'ascii'
.- Gibt zurück: <boolean>
true
, wenn weitere Daten-Chunks weitergeschoben werden können; ansonstenfalse
.
Wenn chunk
ein <Buffer>, <TypedArray>, <DataView> oder <string> ist, wird der chunk
an Daten der internen Warteschlange hinzugefügt, damit Benutzer des Streams ihn verbrauchen können. Das Übergeben von chunk
als null
signalisiert das Ende des Streams (EOF), nach dem keine Daten mehr geschrieben werden können.
Wenn der Readable
im pausierten Modus arbeitet, können die mit readable.push()
hinzugefügten Daten durch Aufrufen der Methode readable.read()
gelesen werden, wenn das Ereignis 'readable'
ausgelöst wird.
Wenn der Readable
im fließenden Modus arbeitet, werden die mit readable.push()
hinzugefügten Daten durch Auslösen eines 'data'
-Ereignisses zugestellt.
Die Methode readable.push()
ist so flexibel wie möglich gestaltet. Wenn beispielsweise eine Quelle auf niedrigerer Ebene mit einer Art Pause/Resume-Mechanismus und einem Daten-Callback umschlossen wird, kann die Quelle auf niedrigerer Ebene von der benutzerdefinierten Readable
-Instanz umschlossen werden:
// `_source` ist ein Objekt mit den Methoden readStop() und readStart()
// und einem `ondata`-Element, das aufgerufen wird, wenn es Daten hat, und
// einem `onend`-Element, das aufgerufen wird, wenn die Daten vorbei sind.
class SourceWrapper extends Readable {
constructor(options) {
super(options)
this._source = getLowLevelSourceObject()
// Jedes Mal, wenn Daten vorhanden sind, werden diese in den internen Puffer geschoben.
this._source.ondata = chunk => {
// Wenn push() false zurückgibt, wird das Lesen aus der Quelle beendet.
if (!this.push(chunk)) this._source.readStop()
}
// Wenn die Quelle endet, wird der EOF-signal-`null`-Chunk geschoben.
this._source.onend = () => {
this.push(null)
}
}
// _read() wird aufgerufen, wenn der Stream weitere Daten abrufen möchte.
// Das beratende Größenargument wird in diesem Fall ignoriert.
_read(size) {
this._source.readStart()
}
}
Die Methode readable.push()
wird verwendet, um den Inhalt in den internen Puffer zu schieben. Sie kann von der Methode readable._read()
gesteuert werden.
Für Streams, die nicht im Objektmodus arbeiten, wird der chunk
-Parameter von readable.push()
als leerer String oder Puffer behandelt, wenn er undefined
ist. Weitere Informationen finden Sie unter readable.push('')
.
Fehler beim Lesen
Fehler, die während der Verarbeitung von readable._read()
auftreten, müssen über die Methode readable.destroy(err)
weitergegeben werden. Das Auslösen eines Error
innerhalb von readable._read()
oder das manuelle Ausgeben eines 'error'
-Ereignisses führt zu undefiniertem Verhalten.
const { Readable } = require('node:stream')
const myReadable = new Readable({
read(size) {
const err = checkSomeErrorCondition()
if (err) {
this.destroy(err)
} else {
// Do some work.
}
},
})
Ein Beispiel für einen Zählstream
Das Folgende ist ein einfaches Beispiel für einen Readable
-Stream, der die Ziffern von 1 bis 1.000.000 in aufsteigender Reihenfolge ausgibt und dann endet.
const { Readable } = require('node:stream')
class Counter extends Readable {
constructor(opt) {
super(opt)
this._max = 1000000
this._index = 1
}
_read() {
const i = this._index++
if (i > this._max) this.push(null)
else {
const str = String(i)
const buf = Buffer.from(str, 'ascii')
this.push(buf)
}
}
}
Implementieren eines Duplex-Streams
Ein Duplex
-Stream ist ein Stream, der sowohl Readable
als auch Writable
implementiert, wie z. B. eine TCP-Socket-Verbindung.
Da JavaScript keine Unterstützung für Mehrfachvererbung bietet, wird die Klasse stream.Duplex
erweitert, um einen Duplex
-Stream zu implementieren (im Gegensatz zur Erweiterung der Klassen stream.Readable
und stream.Writable
).
Die Klasse stream.Duplex
erbt prototypisch von stream.Readable
und parasitär von stream.Writable
, aber instanceof
funktioniert aufgrund der Überschreibung von Symbol.hasInstance
in stream.Writable
für beide Basisklassen korrekt.
Benutzerdefinierte Duplex
-Streams müssen den Konstruktor new stream.Duplex([options])
aufrufen und sowohl die Methoden readable._read()
als auch writable._write()
implementieren.
new stream.Duplex(options)
[Historie]
Version | Änderungen |
---|---|
v8.4.0 | Die Optionen readableHighWaterMark und writableHighWaterMark werden jetzt unterstützt. |
options
<Object> Wird sowohl an die KonstruktorenWritable
als auchReadable
übergeben. Hat auch die folgenden Felder:allowHalfOpen
<boolean> Wenn auffalse
gesetzt, beendet der Stream automatisch die beschreibbare Seite, wenn die lesbare Seite endet. Standard:true
.readable
<boolean> Legt fest, ob derDuplex
lesbar sein soll. Standard:true
.writable
<boolean> Legt fest, ob derDuplex
beschreibbar sein soll. Standard:true
.readableObjectMode
<boolean> LegtobjectMode
für die lesbare Seite des Streams fest. Hat keine Auswirkung, wennobjectMode
true
ist. Standard:false
.writableObjectMode
<boolean> LegtobjectMode
für die beschreibbare Seite des Streams fest. Hat keine Auswirkung, wennobjectMode
true
ist. Standard:false
.readableHighWaterMark
<number> LegthighWaterMark
für die lesbare Seite des Streams fest. Hat keine Auswirkung, wennhighWaterMark
angegeben wird.writableHighWaterMark
<number> LegthighWaterMark
für die beschreibbare Seite des Streams fest. Hat keine Auswirkung, wennhighWaterMark
angegeben wird.
const { Duplex } = require('node:stream')
class MyDuplex extends Duplex {
constructor(options) {
super(options)
// ...
}
}
Oder bei Verwendung von Konstruktoren im Pre-ES6-Stil:
const { Duplex } = require('node:stream')
const util = require('node:util')
function MyDuplex(options) {
if (!(this instanceof MyDuplex)) return new MyDuplex(options)
Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)
Oder unter Verwendung des vereinfachten Konstruktoransatzes:
const { Duplex } = require('node:stream')
const myDuplex = new Duplex({
read(size) {
// ...
},
write(chunk, encoding, callback) {
// ...
},
})
Bei Verwendung von Pipeline:
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')
pipeline(
fs.createReadStream('object.json').setEncoding('utf8'),
new Transform({
decodeStrings: false, // Akzeptiert String-Eingabe anstelle von Buffers
construct(callback) {
this.data = ''
callback()
},
transform(chunk, encoding, callback) {
this.data += chunk
callback()
},
flush(callback) {
try {
// Sicherstellen, dass es sich um gültiges JSON handelt.
JSON.parse(this.data)
this.push(this.data)
callback()
} catch (err) {
callback(err)
}
},
}),
fs.createWriteStream('valid-object.json'),
err => {
if (err) {
console.error('fehlgeschlagen', err)
} else {
console.log('abgeschlossen')
}
}
)
Ein Beispiel für einen Duplex-Stream
Das Folgende veranschaulicht ein einfaches Beispiel für einen Duplex
-Stream, der ein hypothetisches Low-Level-Quellobjekt umschließt, in das Daten geschrieben und aus dem Daten gelesen werden können, wenn auch mit einer API, die nicht mit Node.js-Streams kompatibel ist. Das Folgende veranschaulicht ein einfaches Beispiel für einen Duplex
-Stream, der eingehende geschriebene Daten über die Writable
-Schnittstelle puffert, die über die Readable
-Schnittstelle wieder ausgelesen werden.
const { Duplex } = require('node:stream')
const kSource = Symbol('source')
class MyDuplex extends Duplex {
constructor(source, options) {
super(options)
this[kSource] = source
}
_write(chunk, encoding, callback) {
// Die zugrunde liegende Quelle verarbeitet nur Strings.
if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
this[kSource].writeSomeData(chunk)
callback()
}
_read(size) {
this[kSource].fetchSomeData(size, (data, encoding) => {
this.push(Buffer.from(data, encoding))
})
}
}
Der wichtigste Aspekt eines Duplex
-Streams ist, dass die Readable
- und Writable
-Seiten unabhängig voneinander arbeiten, obwohl sie innerhalb einer einzigen Objektinstanz koexistieren.
Object-Mode-Duplex-Streams
Für Duplex
-Streams kann objectMode
ausschließlich für die Readable
- oder Writable
-Seite mithilfe der Optionen readableObjectMode
bzw. writableObjectMode
eingestellt werden.
Im folgenden Beispiel wird beispielsweise ein neuer Transform
-Stream (der eine Art von Duplex
-Stream ist) erstellt, der eine Writable
-Seite im Objektmodus hat, die JavaScript-Zahlen akzeptiert, die auf der Readable
-Seite in hexadezimale Strings konvertiert werden.
const { Transform } = require('node:stream')
// Alle Transform-Streams sind auch Duplex-Streams.
const myTransform = new Transform({
writableObjectMode: true,
transform(chunk, encoding, callback) {
// Zwinge den Chunk bei Bedarf zu einer Zahl.
chunk |= 0
// Wandle den Chunk in etwas anderes um.
const data = chunk.toString(16)
// Füge die Daten der lesbaren Warteschlange hinzu.
callback(null, '0'.repeat(data.length % 2) + data)
},
})
myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))
myTransform.write(1)
// Gibt aus: 01
myTransform.write(10)
// Gibt aus: 0a
myTransform.write(100)
// Gibt aus: 64
Implementierung eines Transformations-Streams
Ein Transform
-Stream ist ein Duplex
-Stream, bei dem die Ausgabe auf irgendeine Weise aus der Eingabe berechnet wird. Beispiele hierfür sind zlib-Streams oder crypto-Streams, die Daten komprimieren, verschlüsseln oder entschlüsseln.
Es gibt keine Vorschrift, dass die Ausgabe die gleiche Größe wie die Eingabe haben muss, die gleiche Anzahl von Chunks oder zur gleichen Zeit eintreffen muss. Zum Beispiel wird ein Hash
-Stream immer nur einen einzigen Ausgabechunk haben, der bereitgestellt wird, wenn die Eingabe beendet ist. Ein zlib
-Stream erzeugt eine Ausgabe, die entweder viel kleiner oder viel größer als seine Eingabe ist.
Die stream.Transform
-Klasse wird erweitert, um einen Transform
-Stream zu implementieren.
Die stream.Transform
-Klasse erbt prototypisch von stream.Duplex
und implementiert ihre eigenen Versionen der Methoden writable._write()
und readable._read()
. Benutzerdefinierte Transform
-Implementierungen müssen die Methode transform._transform()
implementieren und können auch die Methode transform._flush()
implementieren.
Bei der Verwendung von Transform
-Streams ist Vorsicht geboten, da Daten, die in den Stream geschrieben werden, dazu führen können, dass die Writable
-Seite des Streams angehalten wird, wenn die Ausgabe auf der Readable
-Seite nicht verbraucht wird.
new stream.Transform([options])
options
<Object> Wird sowohl anWritable
- als auch anReadable
-Konstruktoren übergeben. Hat auch folgende Felder:transform
<Function> Implementierung für die Methodestream._transform()
.flush
<Function> Implementierung für die Methodestream._flush()
.
const { Transform } = require('node:stream')
class MyTransform extends Transform {
constructor(options) {
super(options)
// ...
}
}
Oder bei Verwendung von Konstruktoren im Pre-ES6-Stil:
const { Transform } = require('node:stream')
const util = require('node:util')
function MyTransform(options) {
if (!(this instanceof MyTransform)) return new MyTransform(options)
Transform.call(this, options)
}
util.inherits(MyTransform, Transform)
Oder mit dem vereinfachten Konstruktoransatz:
const { Transform } = require('node:stream')
const myTransform = new Transform({
transform(chunk, encoding, callback) {
// ...
},
})
Ereignis: 'end'
Das 'end'
-Ereignis stammt von der stream.Readable
-Klasse. Das 'end'
-Ereignis wird ausgelöst, nachdem alle Daten ausgegeben wurden, was nach dem Aufruf des Callbacks in transform._flush()
erfolgt. Im Fehlerfall sollte 'end'
nicht ausgelöst werden.
Ereignis: 'finish'
Das 'finish'
-Ereignis stammt von der stream.Writable
-Klasse. Das 'finish'
-Ereignis wird ausgelöst, nachdem stream.end()
aufgerufen wurde und alle Chunks von stream._transform()
verarbeitet wurden. Im Fehlerfall sollte 'finish'
nicht ausgelöst werden.
transform._flush(callback)
callback
<Funktion> Eine Callback-Funktion (optional mit einem Fehlerargument und Daten), die aufgerufen wird, wenn verbleibende Daten geleert wurden.
Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von Kindklassen implementiert und nur von den internen Readable
-Klassenmethoden aufgerufen werden.
In einigen Fällen kann eine Transformationsoperation am Ende des Streams ein zusätzliches Datenbit ausgeben müssen. Beispielsweise speichert ein zlib
-Komprimierungsstream einen Teil des internen Zustands, der verwendet wird, um die Ausgabe optimal zu komprimieren. Wenn der Stream endet, müssen diese zusätzlichen Daten jedoch geleert werden, damit die komprimierten Daten vollständig sind.
Benutzerdefinierte Transform
-Implementierungen können die transform._flush()
-Methode implementieren. Diese wird aufgerufen, wenn keine weiteren geschriebenen Daten zu verbrauchen sind, jedoch bevor das 'end'
-Ereignis ausgelöst wird, das das Ende des Readable
-Streams signalisiert.
Innerhalb der transform._flush()
-Implementierung kann die transform.push()
-Methode gegebenenfalls null oder mehrmals aufgerufen werden. Die callback
-Funktion muss aufgerufen werden, wenn die Flush-Operation abgeschlossen ist.
Die transform._flush()
-Methode ist mit einem Unterstrich versehen, da sie intern für die Klasse ist, die sie definiert, und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.
transform._transform(chunk, encoding, callback)
chunk
<Buffer> | <string> | <any> Der zu transformierendeBuffer
, der von dem anstream.write()
übergebenenstring
konvertiert wurde. Wenn die OptiondecodeStrings
des Streamsfalse
ist oder der Stream im Objektmodus arbeitet, wird der Chunk nicht konvertiert und ist das, was anstream.write()
übergeben wurde.encoding
<string> Wenn der Chunk ein String ist, dann ist dies der Kodierungstyp. Wenn der Chunk ein Buffer ist, dann ist dies der spezielle Wert'buffer'
. Ignorieren Sie ihn in diesem Fall.callback
<Function> Eine Callback-Funktion (optional mit einem Fehlerargument und Daten), die aufgerufen werden soll, nachdem der bereitgestelltechunk
verarbeitet wurde.
Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von Kindklassen implementiert und nur von den internen Methoden der Readable
-Klasse aufgerufen werden.
Alle Transform
-Stream-Implementierungen müssen eine _transform()
-Methode bereitstellen, um Eingaben zu akzeptieren und Ausgaben zu erzeugen. Die transform._transform()
-Implementierung behandelt die geschriebenen Bytes, berechnet eine Ausgabe und übergibt diese Ausgabe dann über die transform.push()
-Methode an den lesbaren Teil.
Die transform.push()
-Methode kann null oder mehrmals aufgerufen werden, um aus einem einzelnen Eingabe-Chunk eine Ausgabe zu erzeugen, je nachdem, wie viel als Ergebnis des Chunks ausgegeben werden soll.
Es ist möglich, dass aus einem bestimmten Chunk von Eingabedaten keine Ausgabe erzeugt wird.
Die callback
-Funktion muss nur aufgerufen werden, wenn der aktuelle Chunk vollständig verarbeitet wurde. Das erste Argument, das an den callback
übergeben wird, muss ein Error
-Objekt sein, wenn bei der Verarbeitung der Eingabe ein Fehler aufgetreten ist, oder null
, wenn kein Fehler aufgetreten ist. Wenn ein zweites Argument an den callback
übergeben wird, wird es an die transform.push()
-Methode weitergeleitet, aber nur, wenn das erste Argument falsch ist. Mit anderen Worten, die folgenden sind äquivalent:
transform.prototype._transform = function (data, encoding, callback) {
this.push(data)
callback()
}
transform.prototype._transform = function (data, encoding, callback) {
callback(null, data)
}
Die transform._transform()
-Methode ist mit einem Unterstrich präfixiert, da sie intern zur definierenden Klasse gehört und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.
transform._transform()
wird niemals parallel aufgerufen; Streams implementieren einen Warteschlangenmechanismus, und um den nächsten Chunk zu empfangen, muss callback
entweder synchron oder asynchron aufgerufen werden.
Klasse: stream.PassThrough
Die Klasse stream.PassThrough
ist eine triviale Implementierung eines Transform
-Streams, der die Eingabe-Bytes einfach an die Ausgabe weiterleitet. Ihr Zweck liegt hauptsächlich in Beispielen und Tests, aber es gibt einige Anwendungsfälle, in denen stream.PassThrough
als Baustein für neuartige Arten von Streams nützlich ist.
Zusätzliche Hinweise
Stream-Kompatibilität mit asynchronen Generatoren und asynchronen Iteratoren
Mit der Unterstützung von asynchronen Generatoren und Iteratoren in JavaScript sind asynchrone Generatoren an diesem Punkt effektiv ein erstklassiges Sprach-Level-Stream-Konstrukt.
Einige gängige Interop-Fälle der Verwendung von Node.js-Streams mit asynchronen Generatoren und asynchronen Iteratoren sind unten aufgeführt.
Konsumieren von lesbaren Streams mit asynchronen Iteratoren
;(async function () {
for await (const chunk of readable) {
console.log(chunk)
}
})()
Asynchrone Iteratoren registrieren einen permanenten Fehlerhandler auf dem Stream, um unberücksichtigte Fehler nach der Zerstörung zu verhindern.
Erstellen von lesbaren Streams mit asynchronen Generatoren
Ein Node.js-lesbarer Stream kann aus einem asynchronen Generator mit der Utility-Methode Readable.from()
erstellt werden:
const { Readable } = require('node:stream')
const ac = new AbortController()
const signal = ac.signal
async function* generate() {
yield 'a'
await someLongRunningFn({ signal })
yield 'b'
yield 'c'
}
const readable = Readable.from(generate())
readable.on('close', () => {
ac.abort()
})
readable.on('data', chunk => {
console.log(chunk)
})
Weiterleiten an beschreibbare Streams von asynchronen Iteratoren
Wenn Sie aus einem asynchronen Iterator in einen beschreibbaren Stream schreiben, stellen Sie sicher, dass der Gegendruck und Fehler korrekt behandelt werden. stream.pipeline()
abstrahiert die Behandlung von Gegendruck und Gegendruck-bezogenen Fehlern:
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')
const writable = fs.createWriteStream('./file')
const ac = new AbortController()
const signal = ac.signal
const iterator = createIterator({ signal })
// Callback-Muster
pipeline(iterator, writable, (err, value) => {
if (err) {
console.error(err)
} else {
console.log(value, 'Wert zurückgegeben')
}
}).on('close', () => {
ac.abort()
})
// Promise-Muster
pipelinePromise(iterator, writable)
.then(value => {
console.log(value, 'Wert zurückgegeben')
})
.catch(err => {
console.error(err)
ac.abort()
})
Kompatibilität mit älteren Node.js-Versionen
Vor Node.js 0.10 war die Readable
-Stream-Schnittstelle einfacher, aber auch weniger leistungsfähig und weniger nützlich.
- Anstatt auf Aufrufe der Methode
stream.read()
zu warten, wurden'data'
-Ereignisse sofort ausgelöst. Anwendungen, die einen gewissen Arbeitsaufwand benötigten, um zu entscheiden, wie mit Daten umzugehen ist, mussten gelesene Daten in Puffern speichern, damit die Daten nicht verloren gingen. - Die Methode
stream.pause()
war eher empfehlend als garantiert. Das bedeutete, dass es immer noch notwendig war, darauf vorbereitet zu sein,'data'
-Ereignisse zu empfangen, auch wenn sich der Stream in einem pausierten Zustand befand.
In Node.js 0.10 wurde die Klasse Readable
hinzugefügt. Aus Gründen der Abwärtskompatibilität mit älteren Node.js-Programmen wechseln Readable
-Streams in den "Flussmodus", wenn ein 'data'
-Ereignishandler hinzugefügt wird oder wenn die Methode stream.resume()
aufgerufen wird. Der Effekt ist, dass es selbst dann, wenn man die neue Methode stream.read()
und das Ereignis 'readable'
nicht verwendet, nicht mehr nötig ist, sich um den Verlust von 'data'
-Chunks zu sorgen.
Während die meisten Anwendungen weiterhin normal funktionieren, führt dies zu einem Sonderfall unter den folgenden Bedingungen:
- Es wird kein
'data'
-Ereignis-Listener hinzugefügt. - Die Methode
stream.resume()
wird nie aufgerufen. - Der Stream wird nicht zu einem beschreibbaren Ziel weitergeleitet.
Betrachten Sie zum Beispiel den folgenden Code:
// WARNUNG! DEFEKT!
net
.createServer(socket => {
// Wir fügen einen 'end'-Listener hinzu, konsumieren aber nie die Daten.
socket.on('end', () => {
// Es wird niemals hierher kommen.
socket.end('Die Nachricht wurde empfangen, aber nicht verarbeitet.\n')
})
})
.listen(1337)
Vor Node.js 0.10 wurden die eingehenden Nachrichtendaten einfach verworfen. In Node.js 0.10 und höher bleibt der Socket jedoch für immer pausiert.
Die Problemumgehung in dieser Situation besteht darin, die Methode stream.resume()
aufzurufen, um den Datenfluss zu starten:
// Problemumgehung.
net
.createServer(socket => {
socket.on('end', () => {
socket.end('Die Nachricht wurde empfangen, aber nicht verarbeitet.\n')
})
// Starten Sie den Datenfluss und verwerfen Sie ihn.
socket.resume()
})
.listen(1337)
Zusätzlich zum Umschalten neuer Readable
-Streams in den Flussmodus können Streams im Pre-0.10-Stil mit der Methode readable.wrap()
in eine Readable
-Klasse verpackt werden.
readable.read(0)
Es gibt einige Fälle, in denen es notwendig ist, eine Aktualisierung der zugrunde liegenden lesbaren Stream-Mechanismen auszulösen, ohne tatsächlich Daten zu konsumieren. In solchen Fällen ist es möglich, readable.read(0)
aufzurufen, was immer null
zurückgibt.
Wenn der interne Lesepuffer unterhalb des highWaterMark
liegt und der Stream gerade nicht liest, löst der Aufruf von stream.read(0)
einen Low-Level-Aufruf stream._read()
aus.
Während die meisten Anwendungen dies fast nie tun müssen, gibt es in Node.js Situationen, in denen dies geschieht, insbesondere in den Interna der Readable
-Stream-Klasse.
readable.push('')
Die Verwendung von readable.push('')
wird nicht empfohlen.
Das Pushen eines Null-Byte-<string>, <Buffer>, <TypedArray> oder <DataView> in einen Stream, der sich nicht im Objektmodus befindet, hat einen interessanten Nebeneffekt. Da es sich um einen Aufruf von readable.push()
handelt, beendet der Aufruf den Leseprozess. Da das Argument jedoch eine leere Zeichenkette ist, werden dem lesbaren Puffer keine Daten hinzugefügt, sodass ein Benutzer nichts verbrauchen kann.
highWaterMark
-Diskrepanz nach dem Aufruf von readable.setEncoding()
Die Verwendung von readable.setEncoding()
ändert das Verhalten von highWaterMark
im Nicht-Objektmodus.
Normalerweise wird die Größe des aktuellen Puffers mit dem highWaterMark
in Bytes gemessen. Nach dem Aufruf von setEncoding()
beginnt die Vergleichsfunktion jedoch, die Größe des Puffers in Zeichen zu messen.
Dies ist in häufigen Fällen mit latin1
oder ascii
kein Problem. Es ist jedoch ratsam, dieses Verhalten bei der Arbeit mit Strings zu beachten, die Multibyte-Zeichen enthalten könnten.