Skip to content

Stream

[Stable: 2 - Stable]

Stable: 2 Stabilität: 2 - Stabil

Quellcode: lib/stream.js

Ein Stream ist eine abstrakte Schnittstelle für die Arbeit mit Streaming-Daten in Node.js. Das Modul node:stream bietet eine API zur Implementierung der Stream-Schnittstelle.

Es gibt viele Stream-Objekte, die von Node.js bereitgestellt werden. Beispielsweise sind eine Anfrage an einen HTTP-Server und process.stdout beides Stream-Instanzen.

Streams können lesbar, schreibbar oder beides sein. Alle Streams sind Instanzen von EventEmitter.

Um auf das Modul node:stream zuzugreifen:

js
const stream = require('node:stream')

Das Modul node:stream ist nützlich, um neue Arten von Stream-Instanzen zu erstellen. Es ist in der Regel nicht erforderlich, das Modul node:stream zu verwenden, um Streams zu konsumieren.

Gliederung dieses Dokuments

Dieses Dokument enthält zwei Hauptabschnitte und einen dritten Abschnitt für Anmerkungen. Der erste Abschnitt erklärt, wie man vorhandene Streams innerhalb einer Anwendung verwendet. Der zweite Abschnitt erklärt, wie man neue Arten von Streams erstellt.

Arten von Streams

Es gibt vier grundlegende Stream-Typen in Node.js:

Zusätzlich enthält dieses Modul die Hilfsfunktionen stream.duplexPair(), stream.pipeline(), stream.finished(), stream.Readable.from() und stream.addAbortSignal().

Streams Promises API

Hinzugefügt in: v15.0.0

Die stream/promises API bietet einen alternativen Satz von asynchronen Hilfsfunktionen für Streams, die Promise-Objekte zurückgeben, anstatt Callbacks zu verwenden. Die API ist über require('node:stream/promises') oder require('node:stream').promises zugänglich.

stream.pipeline(source[, ...transforms], destination[, options])

stream.pipeline(streams[, options])

[Historie]

VersionÄnderungen
v18.0.0, v17.2.0, v16.14.0Füge die end-Option hinzu, die auf false gesetzt werden kann, um zu verhindern, dass der Ziel-Stream automatisch geschlossen wird, wenn die Quelle endet.
v15.0.0Hinzugefügt in: v15.0.0
js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'))
  console.log('Pipeline erfolgreich.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'))
console.log('Pipeline erfolgreich.')

Um ein AbortSignal zu verwenden, übergeben Sie es als letztes Argument innerhalb eines Options-Objekts. Wenn das Signal abgebrochen wird, wird destroy auf der zugrunde liegenden Pipeline mit einem AbortError aufgerufen.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), {
    signal,
  })
}

run().catch(console.error) // AbortError
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'
import { createGzip } from 'node:zlib'

const ac = new AbortController()
const { signal } = ac
setImmediate(() => ac.abort())
try {
  await pipeline(createReadStream('archive.tar'), createGzip(), createWriteStream('archive.tar.gz'), { signal })
} catch (err) {
  console.error(err) // AbortError
}

Die pipeline API unterstützt auch asynchrone Generatoren:

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8') // Arbeite mit Strings anstelle von `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal })
      }
    },
    fs.createWriteStream('uppercase.txt')
  )
  console.log('Pipeline erfolgreich.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import { createReadStream, createWriteStream } from 'node:fs'

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8') // Arbeite mit Strings anstelle von `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal })
    }
  },
  createWriteStream('uppercase.txt')
)
console.log('Pipeline erfolgreich.')

Denken Sie daran, das signal-Argument zu behandeln, das an den asynchronen Generator übergeben wird. Insbesondere in dem Fall, in dem der asynchrone Generator die Quelle für die Pipeline ist (d.h. das erste Argument), oder die Pipeline wird niemals abgeschlossen.

js
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')

async function run() {
  await pipeline(async function* ({ signal }) {
    await someLongRunningfn({ signal })
    yield 'asd'
  }, fs.createWriteStream('uppercase.txt'))
  console.log('Pipeline erfolgreich.')
}

run().catch(console.error)
js
import { pipeline } from 'node:stream/promises'
import fs from 'node:fs'
await pipeline(async function* ({ signal }) {
  await someLongRunningfn({ signal })
  yield 'asd'
}, fs.createWriteStream('uppercase.txt'))
console.log('Pipeline erfolgreich.')

Die pipeline API bietet eine Callback-Version:

stream.finished(stream[, options])

[Verlauf]

VersionÄnderungen
v19.5.0, v18.14.0Unterstützung für ReadableStream und WritableStream hinzugefügt.
v19.1.0, v18.13.0Die Option cleanup wurde hinzugefügt.
v15.0.0Hinzugefügt in: v15.0.0
js
const { finished } = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream ist mit dem Lesen fertig.')
}

run().catch(console.error)
rs.resume() // Den Stream leeren.
js
import { finished } from 'node:stream/promises'
import { createReadStream } from 'node:fs'

const rs = createReadStream('archive.tar')

async function run() {
  await finished(rs)
  console.log('Stream ist mit dem Lesen fertig.')
}

run().catch(console.error)
rs.resume() // Den Stream leeren.

Die finished-API bietet auch eine Callback-Version.

stream.finished() hinterlässt hängende Event-Listener (insbesondere 'error', 'end', 'finish' und 'close'), nachdem das zurückgegebene Promise aufgelöst oder abgelehnt wurde. Der Grund dafür ist, dass unerwartete 'error'-Ereignisse (aufgrund fehlerhafter Stream-Implementierungen) keine unerwarteten Abstürze verursachen. Wenn dieses Verhalten unerwünscht ist, sollte options.cleanup auf true gesetzt werden:

js
await finished(rs, { cleanup: true })

Objektmodus

Alle von Node.js-APIs erstellten Streams arbeiten ausschließlich mit Strings, <Buffer>, <TypedArray> und <DataView>-Objekten:

  • Strings und Buffers sind die am häufigsten verwendeten Typen mit Streams.
  • TypedArray und DataView ermöglichen es Ihnen, binäre Daten mit Typen wie Int32Array oder Uint8Array zu verarbeiten. Wenn Sie ein TypedArray oder DataView in einen Stream schreiben, verarbeitet Node.js die rohen Bytes.

Es ist jedoch möglich, dass Stream-Implementierungen mit anderen Arten von JavaScript-Werten arbeiten (mit Ausnahme von null, das innerhalb von Streams einen besonderen Zweck erfüllt). Solche Streams gelten als im "Objektmodus" betrieben.

Stream-Instanzen werden in den Objektmodus versetzt, indem die Option objectMode verwendet wird, wenn der Stream erstellt wird. Der Versuch, einen bestehenden Stream in den Objektmodus zu versetzen, ist nicht sicher.

Pufferung

Sowohl Writable als auch Readable-Streams speichern Daten in einem internen Puffer.

Die Menge der potenziell gepufferten Daten hängt von der Option highWaterMark ab, die dem Konstruktor des Streams übergeben wird. Bei normalen Streams gibt die Option highWaterMark eine Gesamtanzahl von Bytes an. Für Streams, die im Objektmodus arbeiten, gibt highWaterMark eine Gesamtzahl von Objekten an. Für Streams, die mit (aber nicht dekodierenden) Strings arbeiten, gibt highWaterMark eine Gesamtzahl von UTF-16-Codeeinheiten an.

Daten werden in Readable-Streams gepuffert, wenn die Implementierung stream.push(chunk) aufruft. Wenn der Konsument des Streams nicht stream.read() aufruft, verbleiben die Daten in der internen Warteschlange, bis sie konsumiert werden.

Sobald die Gesamtgröße des internen Lesepuffers den durch highWaterMark angegebenen Schwellenwert erreicht, stoppt der Stream vorübergehend das Lesen von Daten aus der zugrunde liegenden Ressource, bis die aktuell gepufferten Daten konsumiert werden können (d. h. der Stream stoppt den Aufruf der internen readable._read()-Methode, die zum Füllen des Lesepuffers verwendet wird).

Daten werden in Writable-Streams gepuffert, wenn die Methode writable.write(chunk) wiederholt aufgerufen wird. Solange die Gesamtgröße des internen Schreibpuffers unterhalb des von highWaterMark festgelegten Schwellenwerts liegt, geben Aufrufe von writable.write() true zurück. Sobald die Größe des internen Puffers highWaterMark erreicht oder überschreitet, wird false zurückgegeben.

Ein Hauptziel der stream-API, insbesondere der Methode stream.pipe(), ist es, die Pufferung von Daten auf akzeptable Werte zu begrenzen, so dass Quellen und Ziele mit unterschiedlichen Geschwindigkeiten den verfügbaren Speicher nicht überfordern.

Die Option highWaterMark ist ein Schwellenwert, keine Grenze: Sie gibt die Datenmenge an, die ein Stream puffert, bevor er aufhört, weitere Daten anzufordern. Sie erzwingt im Allgemeinen keine strenge Speicherbegrenzung. Bestimmte Stream-Implementierungen können strengere Grenzen erzwingen, aber dies ist optional.

Da Duplex und Transform-Streams sowohl Readable als auch Writable sind, verwalten beide zwei separate interne Puffer, die zum Lesen und Schreiben verwendet werden, so dass jede Seite unabhängig voneinander arbeiten kann, während ein angemessener und effizienter Datenfluss aufrechterhalten wird. Beispielsweise sind net.Socket-Instanzen Duplex-Streams, deren Readable-Seite den Verbrauch von Daten ermöglicht, die vom Socket empfangen werden, und deren Writable-Seite das Schreiben von Daten an den Socket ermöglicht. Da Daten mit einer schnelleren oder langsameren Rate an den Socket geschrieben werden können als Daten empfangen werden, sollte jede Seite unabhängig voneinander arbeiten (und puffern).

Die Mechanik der internen Pufferung ist ein internes Implementierungsdetail und kann jederzeit geändert werden. Für bestimmte erweiterte Implementierungen können die internen Puffer jedoch mit writable.writableBuffer oder readable.readableBuffer abgerufen werden. Die Verwendung dieser undokumentierten Eigenschaften wird nicht empfohlen.

API für Stream-Konsumenten

Fast alle Node.js-Anwendungen, egal wie einfach, verwenden Streams in irgendeiner Form. Das folgende Beispiel zeigt die Verwendung von Streams in einer Node.js-Anwendung, die einen HTTP-Server implementiert:

js
const http = require('node:http')

const server = http.createServer((req, res) => {
  // `req` ist eine http.IncomingMessage, die ein lesbarer Stream ist.
  // `res` ist eine http.ServerResponse, die ein schreibbarer Stream ist.

  let body = ''
  // Die Daten als UTF8-Strings abrufen.
  // Wenn keine Kodierung festgelegt ist, werden Buffer-Objekte empfangen.
  req.setEncoding('utf8')

  // Lesbare Streams senden 'data'-Ereignisse aus, sobald ein Listener hinzugefügt wurde.
  req.on('data', chunk => {
    body += chunk
  })

  // Das 'end'-Ereignis zeigt an, dass der gesamte Body empfangen wurde.
  req.on('end', () => {
    try {
      const data = JSON.parse(body)
      // Etwas Interessantes an den Benutzer zurückschreiben:
      res.write(typeof data)
      res.end()
    } catch (er) {
      // oh oh! schlechtes JSON!
      res.statusCode = 400
      return res.end(`error: ${er.message}`)
    }
  })
})

server.listen(1337)

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" ist kein gültiges JSON

Writable-Streams (wie res im Beispiel) stellen Methoden wie write() und end() bereit, die verwendet werden, um Daten auf den Stream zu schreiben.

Readable-Streams verwenden die EventEmitter-API, um Anwendungscode zu benachrichtigen, wenn Daten zum Auslesen aus dem Stream verfügbar sind. Diese verfügbaren Daten können auf verschiedene Arten aus dem Stream gelesen werden.

Sowohl Writable- als auch Readable-Streams verwenden die EventEmitter-API auf verschiedene Arten, um den aktuellen Zustand des Streams zu kommunizieren.

Duplex- und Transform-Streams sind sowohl Writable als auch Readable.

Anwendungen, die entweder Daten in einen Stream schreiben oder Daten aus einem Stream konsumieren, müssen die Stream-Schnittstellen nicht direkt implementieren und haben im Allgemeinen keinen Grund, require('node:stream') aufzurufen.

Entwickler, die neue Arten von Streams implementieren möchten, sollten den Abschnitt API für Stream-Implementierer konsultieren.

Beschreibbare Streams

Beschreibbare Streams sind eine Abstraktion für ein Ziel, in das Daten geschrieben werden.

Beispiele für Writable-Streams sind:

Einige dieser Beispiele sind tatsächlich Duplex-Streams, die die Writable-Schnittstelle implementieren.

Alle Writable-Streams implementieren die durch die stream.Writable-Klasse definierte Schnittstelle.

Obwohl sich spezifische Instanzen von Writable-Streams in verschiedener Hinsicht unterscheiden können, folgen alle Writable-Streams dem gleichen grundlegenden Nutzungsmuster, wie im folgenden Beispiel veranschaulicht:

js
const myStream = getWritableStreamSomehow()
myStream.write('einige Daten')
myStream.write('noch mehr Daten')
myStream.end('Daten schreiben beendet')

Klasse: stream.Writable

Hinzugefügt in: v0.9.4

Event: 'close'

[Verlauf]

VersionÄnderungen
v10.0.0Füge die Option emitClose hinzu, um anzugeben, ob 'close' beim Zerstören ausgegeben wird.
v0.9.4Hinzugefügt in: v0.9.4

Das 'close'-Ereignis wird ausgelöst, wenn der Stream und alle seine zugrunde liegenden Ressourcen (z. B. ein Dateideskriptor) geschlossen wurden. Das Ereignis zeigt an, dass keine weiteren Ereignisse ausgelöst werden und keine weiteren Berechnungen erfolgen.

Ein Writable-Stream löst immer das 'close'-Ereignis aus, wenn er mit der Option emitClose erstellt wird.

Event: 'drain'

Hinzugefügt in: v0.9.4

Wenn ein Aufruf von stream.write(chunk) false zurückgibt, wird das 'drain'-Ereignis ausgelöst, wenn es angebracht ist, das Schreiben von Daten in den Stream fortzusetzen.

js
// Schreibt die Daten eine Million Mal in den bereitgestellten beschreibbaren Stream.
// Achten Sie auf den Gegendruck.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000
  write()
  function write() {
    let ok = true
    do {
      i--
      if (i === 0) {
        // Letztes Mal!
        writer.write(data, encoding, callback)
      } else {
        // Prüfen, ob wir fortfahren oder warten sollen.
        // Übergeben Sie den Callback nicht, da wir noch nicht fertig sind.
        ok = writer.write(data, encoding)
      }
    } while (i > 0 && ok)
    if (i > 0) {
      // Musste frühzeitig anhalten!
      // Schreiben Sie noch etwas, sobald es abgelaufen ist.
      writer.once('drain', write)
    }
  }
}
Ereignis: 'error'

Hinzugefügt in: v0.9.4

Das Ereignis 'error' wird ausgelöst, wenn beim Schreiben oder Weiterleiten von Daten ein Fehler aufgetreten ist. Der Listener-Callback erhält beim Aufruf ein einzelnes Error-Argument.

Der Stream wird geschlossen, wenn das Ereignis 'error' ausgelöst wird, es sei denn, die Option autoDestroy wurde beim Erstellen des Streams auf false gesetzt.

Nach 'error' sollten keine weiteren Ereignisse außer 'close' ausgelöst werden (einschließlich 'error'-Ereignisse).

Ereignis: 'finish'

Hinzugefügt in: v0.9.4

Das Ereignis 'finish' wird ausgelöst, nachdem die Methode stream.end() aufgerufen wurde und alle Daten in das zugrunde liegende System geschrieben wurden.

js
const writer = getWritableStreamSomehow()
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`)
}
writer.on('finish', () => {
  console.log('Alle Schreibvorgänge sind nun abgeschlossen.')
})
writer.end('Das ist das Ende\n')
Ereignis: 'pipe'

Hinzugefügt in: v0.9.4

Das Ereignis 'pipe' wird ausgelöst, wenn die Methode stream.pipe() auf einem lesbaren Stream aufgerufen wird und dieser beschreibbare Stream zu seinen Zielen hinzugefügt wird.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('pipe', src => {
  console.log('Etwas wird in den Writer geleitet.')
  assert.equal(src, reader)
})
reader.pipe(writer)
Ereignis: 'unpipe'

Hinzugefügt in: v0.9.4

Das Ereignis 'unpipe' wird ausgelöst, wenn die Methode stream.unpipe() auf einem Readable-Stream aufgerufen wird und dieser Writable-Stream aus seinen Zielen entfernt wird.

Dies wird auch ausgelöst, falls dieser Writable-Stream einen Fehler ausgibt, wenn ein Readable-Stream in ihn geleitet wird.

js
const writer = getWritableStreamSomehow()
const reader = getReadableStreamSomehow()
writer.on('unpipe', src => {
  console.log('Etwas hat aufgehört, in den Writer zu leiten.')
  assert.equal(src, reader)
})
reader.pipe(writer)
reader.unpipe(writer)
writable.cork()

Hinzugefügt in: v0.11.2

Die Methode writable.cork() zwingt alle geschriebenen Daten, im Speicher zwischengespeichert zu werden. Die zwischengespeicherten Daten werden geleert, wenn entweder die Methoden stream.uncork() oder stream.end() aufgerufen werden.

Die Hauptabsicht von writable.cork() ist es, eine Situation zu berücksichtigen, in der mehrere kleine Datenblöcke in schneller Folge in den Stream geschrieben werden. Anstatt sie sofort an das zugrunde liegende Ziel weiterzuleiten, puffert writable.cork() alle Datenblöcke, bis writable.uncork() aufgerufen wird. Diese werden dann alle an writable._writev() übergeben, falls vorhanden. Dies verhindert eine Head-of-Line-Blocking-Situation, bei der Daten zwischengespeichert werden, während darauf gewartet wird, dass der erste kleine Datenblock verarbeitet wird. Die Verwendung von writable.cork() ohne Implementierung von writable._writev() kann jedoch die Durchsatzleistung beeinträchtigen.

Siehe auch: writable.uncork(), writable._writev().

writable.destroy([error])

[Verlauf]

VersionÄnderungen
v14.0.0Wirkt als No-Op bei einem Stream, der bereits zerstört wurde.
v8.0.0Hinzugefügt in: v8.0.0
  • error <Error> Optional, ein Fehler, der mit dem Ereignis 'error' ausgelöst werden soll.
  • Gibt zurück: <this>

Zerstört den Stream. Löst optional ein 'error'-Ereignis und ein 'close'-Ereignis aus (es sei denn, emitClose ist auf false gesetzt). Nach diesem Aufruf ist der beschreibbare Stream beendet, und nachfolgende Aufrufe von write() oder end() führen zu einem ERR_STREAM_DESTROYED-Fehler. Dies ist eine destruktive und sofortige Möglichkeit, einen Stream zu zerstören. Vorherige Aufrufe von write() wurden möglicherweise nicht geleert und können einen ERR_STREAM_DESTROYED-Fehler auslösen. Verwenden Sie end() anstelle von destroy, wenn Daten vor dem Schließen geleert werden sollen, oder warten Sie auf das 'drain'-Ereignis, bevor Sie den Stream zerstören.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

const fooErr = new Error('foo error')
myStream.destroy(fooErr)
myStream.on('error', fooErr => console.error(fooErr.message)) // foo error
js
const { Writable } = require('node:stream')

const myStream = new Writable()

myStream.destroy()
myStream.on('error', function wontHappen() {})
js
const { Writable } = require('node:stream')

const myStream = new Writable()
myStream.destroy()

myStream.write('foo', error => console.error(error.code))
// ERR_STREAM_DESTROYED

Sobald destroy() aufgerufen wurde, sind alle weiteren Aufrufe ein No-Op und es können keine weiteren Fehler außer von _destroy() als 'error' ausgelöst werden.

Implementierer sollten diese Methode nicht überschreiben, sondern stattdessen writable._destroy() implementieren.

writable.closed

Hinzugefügt in: v18.0.0

Ist true, nachdem 'close' emittiert wurde.

writable.destroyed

Hinzugefügt in: v8.0.0

Ist true, nachdem writable.destroy() aufgerufen wurde.

js
const { Writable } = require('node:stream')

const myStream = new Writable()

console.log(myStream.destroyed) // false
myStream.destroy()
console.log(myStream.destroyed) // true
writable.end([chunk[, encoding]][, callback])

[Verlauf]

VersionÄnderungen
v22.0.0, v20.13.0Das chunk-Argument kann jetzt eine TypedArray- oder DataView-Instanz sein.
v15.0.0Die callback wird vor 'finish' oder bei einem Fehler aufgerufen.
v14.0.0Die callback wird aufgerufen, wenn 'finish' oder 'error' emittiert wird.
v10.0.0Diese Methode gibt jetzt eine Referenz auf writable zurück.
v8.0.0Das chunk-Argument kann jetzt eine Uint8Array-Instanz sein.
v0.9.4Hinzugefügt in: v0.9.4

Der Aufruf der writable.end()-Methode signalisiert, dass keine weiteren Daten in das Writable geschrieben werden. Die optionalen Argumente chunk und encoding ermöglichen es, einen letzten zusätzlichen Datenblock zu schreiben, unmittelbar bevor der Stream geschlossen wird.

Der Aufruf der Methode stream.write() nach dem Aufruf von stream.end() löst einen Fehler aus.

js
// Schreibe 'hallo, ' und beende dann mit 'welt!'.
const fs = require('node:fs')
const file = fs.createWriteStream('example.txt')
file.write('hallo, ')
file.end('welt!')
// Jetzt mehr zu schreiben ist nicht erlaubt!
writable.setDefaultEncoding(encoding)

[Verlauf]

VersionÄnderungen
v6.1.0Diese Methode gibt jetzt eine Referenz auf writable zurück.
v0.11.15Hinzugefügt in: v0.11.15

Die Methode writable.setDefaultEncoding() setzt die Standard encoding für einen Writable Stream.

writable.uncork()

Hinzugefügt in: v0.11.2

Die Methode writable.uncork() leert alle Daten, die seit dem Aufruf von stream.cork() gepuffert wurden.

Wenn writable.cork() und writable.uncork() verwendet werden, um die Pufferung von Schreibvorgängen in einen Stream zu verwalten, verschieben Sie Aufrufe von writable.uncork() mithilfe von process.nextTick(). Dies ermöglicht das Batching aller writable.write()-Aufrufe, die innerhalb einer bestimmten Node.js-Ereignisschleifenphase auftreten.

js
stream.cork()
stream.write('einige ')
stream.write('daten ')
process.nextTick(() => stream.uncork())

Wenn die Methode writable.cork() mehrmals für einen Stream aufgerufen wird, muss die gleiche Anzahl von Aufrufen von writable.uncork() aufgerufen werden, um die gepufferten Daten zu leeren.

js
stream.cork()
stream.write('einige ')
stream.cork()
stream.write('daten ')
process.nextTick(() => {
  stream.uncork()
  // Die Daten werden erst geleert, wenn uncork() ein zweites Mal aufgerufen wird.
  stream.uncork()
})

Siehe auch: writable.cork().

writable.writable

Hinzugefügt in: v11.4.0

Ist true, wenn es sicher ist, writable.write() aufzurufen, was bedeutet, dass der Stream nicht zerstört, fehlerhaft oder beendet wurde.

writable.writableAborted

Hinzugefügt in: v18.0.0, v16.17.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Gibt zurück, ob der Stream zerstört oder ein Fehler aufgetreten ist, bevor 'finish' ausgegeben wurde.

writable.writableEnded

Hinzugefügt in: v12.9.0

Ist true, nachdem writable.end() aufgerufen wurde. Diese Eigenschaft gibt nicht an, ob die Daten geleert wurden. Verwenden Sie stattdessen writable.writableFinished.

writable.writableCorked

Hinzugefügt in: v13.2.0, v12.16.0

Anzahl der Aufrufe von writable.uncork(), die erforderlich sind, um den Stream vollständig zu entkorken.

writable.errored

Hinzugefügt in: v18.0.0

Gibt einen Fehler zurück, wenn der Stream mit einem Fehler zerstört wurde.

writable.writableFinished

Hinzugefügt in: v12.6.0

Wird unmittelbar vor dem Ausgeben des Ereignisses 'finish' auf true gesetzt.

writable.writableHighWaterMark

Hinzugefügt in: v9.3.0

Gibt den Wert von highWaterMark zurück, der bei der Erstellung dieses Writable übergeben wurde.

writable.writableLength

Hinzugefügt in: v9.4.0

Diese Eigenschaft enthält die Anzahl der Bytes (oder Objekte) in der Warteschlange, die zum Schreiben bereit sind. Der Wert liefert Introspektionsdaten zum Status von highWaterMark.

writable.writableNeedDrain

Hinzugefügt in: v15.2.0, v14.17.0

Ist true, wenn der Puffer des Streams voll ist und der Stream 'drain' ausgibt.

writable.writableObjectMode

Hinzugefügt in: v12.3.0

Getter für die Eigenschaft objectMode eines gegebenen Writable-Streams.

writable[Symbol.asyncDispose]()

Hinzugefügt in: v22.4.0, v20.16.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Ruft writable.destroy() mit einem AbortError auf und gibt eine Zusage zurück, die erfüllt wird, wenn der Stream beendet ist.

writable.write(chunk[, encoding][, callback])

[Verlauf]

VersionÄnderungen
v22.0.0, v20.13.0Das chunk-Argument kann jetzt eine TypedArray- oder DataView-Instanz sein.
v8.0.0Das chunk-Argument kann jetzt eine Uint8Array-Instanz sein.
v6.0.0Das Übergeben von null als chunk-Parameter wird jetzt immer als ungültig betrachtet, auch im Objektmodus.
v0.9.4Hinzugefügt in: v0.9.4
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> Optionale Daten zum Schreiben. Für Streams, die nicht im Objektmodus arbeiten, muss chunk ein <string>, <Buffer>, <TypedArray> oder <DataView> sein. Für Streams im Objektmodus kann chunk ein beliebiger JavaScript-Wert außer null sein.
  • encoding <string> | <null> Die Kodierung, wenn chunk eine Zeichenkette ist. Standard: 'utf8'
  • callback <Function> Callback für wenn dieser Datenblock geleert wurde.
  • Gibt zurück: <boolean> false, wenn der Stream möchte, dass der aufrufende Code wartet, bis das 'drain'-Ereignis ausgelöst wird, bevor er mit dem Schreiben zusätzlicher Daten fortfährt; andernfalls true.

Die Methode writable.write() schreibt einige Daten in den Stream und ruft den bereitgestellten callback auf, sobald die Daten vollständig verarbeitet wurden. Wenn ein Fehler auftritt, wird der callback mit dem Fehler als erstem Argument aufgerufen. Der callback wird asynchron und vor der Auslösung von 'error' aufgerufen.

Der Rückgabewert ist true, wenn der interne Puffer kleiner ist als das highWaterMark, das bei der Erstellung des Streams nach der Aufnahme von chunk konfiguriert wurde. Wenn false zurückgegeben wird, sollten weitere Versuche, Daten in den Stream zu schreiben, gestoppt werden, bis das Ereignis 'drain' ausgelöst wird.

Während ein Stream nicht geleert wird, werden Aufrufe von write() chunk puffern und false zurückgeben. Sobald alle aktuell gepufferten Blöcke geleert sind (vom Betriebssystem zur Lieferung angenommen), wird das 'drain'-Ereignis ausgelöst. Sobald write() false zurückgibt, schreiben Sie keine weiteren Blöcke, bis das 'drain'-Ereignis ausgelöst wird. Während das Aufrufen von write() auf einem Stream, der nicht geleert wird, erlaubt ist, puffert Node.js alle geschriebenen Blöcke, bis die maximale Speichernutzung eintritt, an der Stelle wird bedingungslos abgebrochen. Selbst bevor es abbricht, verursacht eine hohe Speichernutzung eine schlechte Leistung des Garbage Collectors und einen hohen RSS (der normalerweise nicht an das System zurückgegeben wird, auch wenn der Speicher nicht mehr benötigt wird). Da TCP-Sockets möglicherweise niemals geleert werden, wenn der Remote-Peer die Daten nicht liest, kann das Schreiben eines Sockets, der nicht geleert wird, zu einer aus der Ferne ausnutzbaren Schwachstelle führen.

Das Schreiben von Daten, während der Stream nicht geleert wird, ist besonders problematisch für einen Transform, da die Transform-Streams standardmäßig angehalten werden, bis sie per Pipe verbunden werden oder ein 'data'- oder 'readable'-Ereignishandler hinzugefügt wird.

Wenn die zu schreibenden Daten bei Bedarf generiert oder abgerufen werden können, wird empfohlen, die Logik in einen Readable zu kapseln und stream.pipe() zu verwenden. Wenn es jedoch bevorzugt wird, write() aufzurufen, ist es möglich, den Gegendruck zu berücksichtigen und Speicherprobleme mithilfe des Ereignisses 'drain' zu vermeiden:

js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Warten Sie, bis cb aufgerufen wird, bevor Sie weitere Schreibvorgänge durchführen.
write('hallo', () => {
  console.log('Schreiben abgeschlossen, jetzt weitere Schreibvorgänge durchführen.')
})

Ein Writable-Stream im Objektmodus ignoriert immer das Argument encoding.

Lesbare Streams

Lesbare Streams sind eine Abstraktion für eine Quelle, aus der Daten konsumiert werden.

Beispiele für Readable-Streams sind:

Alle Readable-Streams implementieren die Schnittstelle, die von der stream.Readable-Klasse definiert wird.

Zwei Lesemodi

Readable-Streams arbeiten effektiv in einem von zwei Modi: fließend und pausiert. Diese Modi sind unabhängig vom Objektmodus. Ein Readable-Stream kann im Objektmodus sein oder nicht, unabhängig davon, ob er sich im fließenden Modus oder im pausierten Modus befindet.

  • Im fließenden Modus werden Daten automatisch aus dem zugrunde liegenden System gelesen und einer Anwendung so schnell wie möglich über Ereignisse über die EventEmitter-Schnittstelle bereitgestellt.
  • Im pausierten Modus muss die stream.read()-Methode explizit aufgerufen werden, um Datenblöcke aus dem Stream zu lesen.

Alle Readable-Streams beginnen im pausierten Modus, können aber auf eine der folgenden Arten in den fließenden Modus geschaltet werden:

Der Readable kann mit einer der folgenden Methoden wieder in den pausierten Modus geschaltet werden:

  • Wenn es keine Pipe-Ziele gibt, durch Aufrufen der stream.pause()-Methode.
  • Wenn es Pipe-Ziele gibt, durch Entfernen aller Pipe-Ziele. Mehrere Pipe-Ziele können durch Aufrufen der stream.unpipe()-Methode entfernt werden.

Das wichtigste Konzept, an das man sich erinnern sollte, ist, dass ein Readable keine Daten erzeugt, bis ein Mechanismus zur Verfügung gestellt wird, der diese Daten entweder konsumiert oder ignoriert. Wenn der Konsummechanismus deaktiviert oder entfernt wird, wird der Readable versuchen, die Datenerzeugung zu stoppen.

Aus Gründen der Abwärtskompatibilität werden 'data'-Ereignishandler nicht automatisch den Stream pausieren. Wenn es außerdem Pipe-Ziele gibt, garantiert der Aufruf von stream.pause() nicht, dass der Stream pausiert bleibt, sobald diese Ziele geleert sind und nach weiteren Daten fragen.

Wenn ein Readable in den fließenden Modus geschaltet wird und keine Konsumenten zur Verfügung stehen, um die Daten zu verarbeiten, gehen diese Daten verloren. Dies kann z. B. auftreten, wenn die Methode readable.resume() aufgerufen wird, ohne dass ein Listener an das 'data'-Ereignis angehängt ist, oder wenn ein 'data'-Ereignishandler aus dem Stream entfernt wird.

Das Hinzufügen eines 'readable'-Ereignishandlers bewirkt automatisch, dass der Stream nicht mehr fließt, und die Daten müssen über readable.read() konsumiert werden. Wenn der 'readable'-Ereignishandler entfernt wird, beginnt der Stream wieder zu fließen, wenn ein 'data'-Ereignishandler vorhanden ist.

Drei Zustände

Die „zwei Modi“ des Betriebs für einen Readable-Stream sind eine vereinfachte Abstraktion für die kompliziertere interne Zustandsverwaltung, die innerhalb der Readable-Stream-Implementierung stattfindet.

Genauer gesagt, befindet sich jeder Readable-Stream zu einem bestimmten Zeitpunkt in einem von drei möglichen Zuständen:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

Wenn readable.readableFlowing null ist, wird kein Mechanismus zum Verbrauch der Daten des Streams bereitgestellt. Daher erzeugt der Stream keine Daten. In diesem Zustand wechselt das Anhängen eines Listeners für das 'data'-Ereignis, das Aufrufen der readable.pipe()-Methode oder das Aufrufen der readable.resume()-Methode readable.readableFlowing auf true, wodurch der Readable-Stream beginnt, aktiv Ereignisse auszugeben, wenn Daten generiert werden.

Das Aufrufen von readable.pause(), readable.unpipe() oder das Empfangen von Gegendruck führt dazu, dass readable.readableFlowing auf false gesetzt wird, wodurch der Fluss von Ereignissen vorübergehend angehalten wird, aber nicht die Generierung von Daten. In diesem Zustand wechselt das Anhängen eines Listeners für das 'data'-Ereignis nicht readable.readableFlowing auf true.

js
const { PassThrough, Writable } = require('node:stream')
const pass = new PassThrough()
const writable = new Writable()

pass.pipe(writable)
pass.unpipe(writable)
// readableFlowing ist jetzt false.

pass.on('data', chunk => {
  console.log(chunk.toString())
})
// readableFlowing ist immer noch false.
pass.write('ok') // Gibt kein 'data' aus.
pass.resume() // Muss aufgerufen werden, damit der Stream 'data' ausgibt.
// readableFlowing ist jetzt true.

Während readable.readableFlowing false ist, können sich Daten im internen Puffer des Streams ansammeln.

Wählen Sie einen API-Stil

Die Readable-Stream-API hat sich über mehrere Node.js-Versionen hinweg entwickelt und bietet mehrere Methoden zum Verbrauch von Stream-Daten. Im Allgemeinen sollten Entwickler eine der Methoden zum Verbrauch von Daten wählen und sollten niemals mehrere Methoden verwenden, um Daten aus einem einzelnen Stream zu verbrauchen. Insbesondere die Verwendung einer Kombination aus on('data'), on('readable'), pipe() oder asynchronen Iteratoren kann zu unintuitivem Verhalten führen.

Klasse: stream.Readable

Hinzugefügt in: v0.9.4

Ereignis: 'close'

[Verlauf]

VersionÄnderungen
v10.0.0Option emitClose hinzugefügt, um anzugeben, ob 'close' bei der Zerstörung emittiert wird.
v0.9.4Hinzugefügt in: v0.9.4

Das Ereignis 'close' wird ausgelöst, wenn der Stream und alle seine zugrunde liegenden Ressourcen (z. B. ein Dateideskriptor) geschlossen wurden. Das Ereignis signalisiert, dass keine weiteren Ereignisse ausgelöst und keine weiteren Berechnungen durchgeführt werden.

Ein Readable-Stream löst immer das Ereignis 'close' aus, wenn er mit der Option emitClose erstellt wird.

Ereignis: 'data'

Hinzugefügt in: v0.9.4

  • chunk <Buffer> | <string> | <any> Der Datenchunk. Für Streams, die nicht im Objektmodus arbeiten, ist der Chunk entweder ein String oder ein Buffer. Für Streams, die sich im Objektmodus befinden, kann der Chunk ein beliebiger JavaScript-Wert außer null sein.

Das Ereignis 'data' wird jedes Mal ausgelöst, wenn der Stream die Eigentümerschaft an einem Datenchunk an einen Consumer abgibt. Dies kann geschehen, wenn der Stream durch Aufrufen von readable.pipe(), readable.resume() in den Fließmodus geschaltet wird oder durch Anhängen einer Listener-Callback-Funktion an das Ereignis 'data'. Das Ereignis 'data' wird auch ausgelöst, wenn die Methode readable.read() aufgerufen wird und ein Datenchunk zurückgegeben werden kann.

Das Anhängen eines 'data'-Ereignis-Listeners an einen Stream, der nicht explizit pausiert wurde, schaltet den Stream in den Fließmodus. Daten werden dann übergeben, sobald sie verfügbar sind.

Die Listener-Callback-Funktion erhält den Datenchunk als String, wenn eine Standardkodierung für den Stream mithilfe der Methode readable.setEncoding() angegeben wurde; andernfalls werden die Daten als Buffer übergeben.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Empfangen ${chunk.length} Bytes an Daten.`)
})
Ereignis: 'end'

Hinzugefügt in: v0.9.4

Das Ereignis 'end' wird ausgelöst, wenn keine weiteren Daten mehr aus dem Stream konsumiert werden können.

Das 'end'-Ereignis wird nicht ausgelöst, solange die Daten nicht vollständig konsumiert sind. Dies kann erreicht werden, indem der Stream in den fließenden Modus geschaltet wird oder indem stream.read() wiederholt aufgerufen wird, bis alle Daten konsumiert wurden.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Empfangene ${chunk.length} Bytes an Daten.`)
})
readable.on('end', () => {
  console.log('Es werden keine weiteren Daten mehr folgen.')
})
Ereignis: 'error'

Hinzugefügt in: v0.9.4

Das Ereignis 'error' kann jederzeit von einer Readable-Implementierung ausgelöst werden. Dies kann in der Regel auftreten, wenn der zugrunde liegende Stream aufgrund eines internen Fehlers keine Daten generieren kann oder wenn eine Stream-Implementierung versucht, einen ungültigen Datenblock zu übertragen.

Der Listener-Callback erhält ein einzelnes Error-Objekt.

Ereignis: 'pause'

Hinzugefügt in: v0.9.4

Das Ereignis 'pause' wird ausgelöst, wenn stream.pause() aufgerufen wird und readableFlowing nicht false ist.

Ereignis: 'readable'

[Verlauf]

VersionÄnderungen
v10.0.0Das 'readable' wird immer im nächsten Tick nach dem Aufruf von .push() ausgelöst.
v10.0.0Die Verwendung von 'readable' erfordert das Aufrufen von .read().
v0.9.4Hinzugefügt in: v0.9.4

Das 'readable'-Ereignis wird ausgelöst, wenn Daten zum Lesen aus dem Stream zur Verfügung stehen, bis zum konfigurierten High-Water-Mark (state.highWaterMark). Es signalisiert effektiv, dass der Stream neue Informationen innerhalb des Puffers hat. Wenn Daten innerhalb dieses Puffers verfügbar sind, kann stream.read() aufgerufen werden, um diese Daten abzurufen. Darüber hinaus kann das 'readable'-Ereignis auch ausgelöst werden, wenn das Ende des Streams erreicht wurde.

js
const readable = getReadableStreamSomehow()
readable.on('readable', function () {
  // Es gibt jetzt Daten zum Lesen.
  let data

  while ((data = this.read()) !== null) {
    console.log(data)
  }
})

Wenn das Ende des Streams erreicht wurde, gibt der Aufruf von stream.read() null zurück und löst das 'end'-Ereignis aus. Dies gilt auch, wenn es nie Daten zum Lesen gab. Im folgenden Beispiel ist foo.txt beispielsweise eine leere Datei:

js
const fs = require('node:fs')
const rr = fs.createReadStream('foo.txt')
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`)
})
rr.on('end', () => {
  console.log('end')
})

Die Ausgabe beim Ausführen dieses Skripts lautet:

bash
$ node test.js
readable: null
end

In einigen Fällen bewirkt das Anhängen eines Listeners für das 'readable'-Ereignis, dass eine gewisse Menge an Daten in einen internen Puffer gelesen wird.

Im Allgemeinen sind die Mechanismen readable.pipe() und 'data'-Ereignis einfacher zu verstehen als das 'readable'-Ereignis. Die Behandlung von 'readable' kann jedoch zu einem höheren Durchsatz führen.

Wenn sowohl 'readable' als auch 'data' gleichzeitig verwendet werden, hat 'readable' Vorrang bei der Steuerung des Flusses, d. h. 'data' wird nur dann ausgelöst, wenn stream.read() aufgerufen wird. Die Eigenschaft readableFlowing würde false werden. Wenn 'data'-Listener vorhanden sind, wenn 'readable' entfernt wird, beginnt der Stream zu fließen, d. h. 'data'-Ereignisse werden ohne Aufruf von .resume() ausgelöst.

Ereignis: 'resume'

Hinzugefügt in: v0.9.4

Das 'resume'-Ereignis wird ausgelöst, wenn stream.resume() aufgerufen wird und readableFlowing nicht true ist.

readable.destroy([error])

[Verlauf]

VersionÄnderungen
v14.0.0Fungiert als No-Op für einen Stream, der bereits zerstört wurde.
v8.0.0Hinzugefügt in: v8.0.0
  • error <Error> Fehler, der als Payload im 'error'-Ereignis übergeben wird
  • Gibt zurück: <this>

Zerstört den Stream. Löst optional ein 'error'-Ereignis und ein 'close'-Ereignis aus (es sei denn, emitClose ist auf false gesetzt). Nach diesem Aufruf gibt der lesbare Stream alle internen Ressourcen frei und nachfolgende Aufrufe von push() werden ignoriert.

Sobald destroy() aufgerufen wurde, sind alle weiteren Aufrufe ein No-Op und es werden keine weiteren Fehler als 'error' ausgegeben, außer von _destroy().

Implementierer sollten diese Methode nicht überschreiben, sondern stattdessen readable._destroy() implementieren.

readable.closed

Hinzugefügt in: v18.0.0

Ist true, nachdem 'close' ausgelöst wurde.

readable.destroyed

Hinzugefügt in: v8.0.0

Ist true, nachdem readable.destroy() aufgerufen wurde.

readable.isPaused()

Hinzugefügt in: v0.11.14

Die readable.isPaused()-Methode gibt den aktuellen Betriebszustand des Readable zurück. Dies wird hauptsächlich vom Mechanismus verwendet, der der readable.pipe()-Methode zugrunde liegt. In den meisten typischen Fällen gibt es keinen Grund, diese Methode direkt zu verwenden.

js
const readable = new stream.Readable()

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()

Hinzugefügt in: v0.9.4

Die Methode readable.pause() bewirkt, dass ein Stream im Flussmodus keine 'data'-Ereignisse mehr ausgibt und aus dem Flussmodus wechselt. Alle Daten, die verfügbar werden, bleiben im internen Puffer.

js
const readable = getReadableStreamSomehow()
readable.on('data', chunk => {
  console.log(`Empfangen ${chunk.length} Byte an Daten.`)
  readable.pause()
  console.log('Es werden für 1 Sekunde keine weiteren Daten vorhanden sein.')
  setTimeout(() => {
    console.log('Jetzt werden die Daten wieder fließen.')
    readable.resume()
  }, 1000)
})

Die Methode readable.pause() hat keine Auswirkung, wenn ein 'readable'-Ereignis-Listener vorhanden ist.

readable.pipe(destination[, options])

Hinzugefügt in: v0.9.4

Die Methode readable.pipe() hängt einen Writable-Stream an den readable an, wodurch dieser automatisch in den Flussmodus wechselt und alle seine Daten an den angehängten Writable weiterleitet. Der Datenfluss wird automatisch so verwaltet, dass der Ziel-Writable-Stream nicht von einem schnelleren Readable-Stream überlastet wird.

Das folgende Beispiel leitet alle Daten vom readable in eine Datei namens file.txt:

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Alle Daten von readable gehen in 'file.txt'.
readable.pipe(writable)

Es ist möglich, mehrere Writable-Streams an einen einzelnen Readable-Stream anzuhängen.

Die Methode readable.pipe() gibt eine Referenz auf den Ziel-Stream zurück, wodurch es möglich ist, Ketten von gepipten Streams einzurichten:

js
const fs = require('node:fs')
const zlib = require('node:zlib')
const r = fs.createReadStream('file.txt')
const z = zlib.createGzip()
const w = fs.createWriteStream('file.txt.gz')
r.pipe(z).pipe(w)

Standardmäßig wird stream.end() auf dem Ziel-Writable-Stream aufgerufen, wenn der Quell-Readable-Stream 'end' ausgibt, sodass das Ziel nicht mehr beschreibbar ist. Um dieses Standardverhalten zu deaktivieren, kann die Option end als false übergeben werden, wodurch der Zielstream geöffnet bleibt:

js
reader.pipe(writer, { end: false })
reader.on('end', () => {
  writer.end('Goodbye\n')
})

Eine wichtige Einschränkung ist, dass, wenn der Readable-Stream während der Verarbeitung einen Fehler ausgibt, das Writable-Ziel nicht automatisch geschlossen wird. Wenn ein Fehler auftritt, ist es notwendig, jeden Stream manuell zu schließen, um Speicherlecks zu verhindern.

Die Writable-Streams process.stderr und process.stdout werden niemals geschlossen, bis der Node.js-Prozess beendet wird, unabhängig von den angegebenen Optionen.

readable.read([size])

Hinzugefügt in: v0.9.4

Die readable.read()-Methode liest Daten aus dem internen Puffer und gibt sie zurück. Wenn keine Daten zum Lesen verfügbar sind, wird null zurückgegeben. Standardmäßig werden die Daten als Buffer-Objekt zurückgegeben, es sei denn, eine Kodierung wurde mit der Methode readable.setEncoding() angegeben oder der Stream arbeitet im Objektmodus.

Das optionale Argument size gibt eine bestimmte Anzahl von zu lesenden Bytes an. Wenn nicht size Bytes zum Lesen verfügbar sind, wird null zurückgegeben, es sei denn, der Stream ist beendet. In diesem Fall werden alle im internen Puffer verbleibenden Daten zurückgegeben.

Wenn das Argument size nicht angegeben wird, werden alle im internen Puffer enthaltenen Daten zurückgegeben.

Das Argument size muss kleiner oder gleich 1 GiB sein.

Die readable.read()-Methode sollte nur auf Readable-Streams aufgerufen werden, die im pausierten Modus arbeiten. Im Fließmodus wird readable.read() automatisch aufgerufen, bis der interne Puffer vollständig entleert ist.

js
const readable = getReadableStreamSomehow()

// 'readable' kann mehrmals ausgelöst werden, wenn Daten zwischengespeichert werden
readable.on('readable', () => {
  let chunk
  console.log('Stream ist lesbar (neue Daten im Puffer empfangen)')
  // Verwenden Sie eine Schleife, um sicherzustellen, dass wir alle derzeit verfügbaren Daten lesen
  while (null !== (chunk = readable.read())) {
    console.log(`Lese ${chunk.length} Bytes an Daten...`)
  }
})

// 'end' wird einmal ausgelöst, wenn keine Daten mehr verfügbar sind
readable.on('end', () => {
  console.log('Ende des Streams erreicht.')
})

Jeder Aufruf von readable.read() gibt einen Datenblock oder null zurück, was bedeutet, dass im Moment keine weiteren Daten gelesen werden können. Diese Blöcke werden nicht automatisch verkettet. Da ein einzelner read()-Aufruf nicht alle Daten zurückgibt, kann die Verwendung einer While-Schleife erforderlich sein, um Blöcke kontinuierlich zu lesen, bis alle Daten abgerufen wurden. Beim Lesen einer großen Datei kann .read() vorübergehend null zurückgeben, was darauf hindeutet, dass der gesamte zwischengespeicherte Inhalt verbraucht wurde, aber möglicherweise noch weitere Daten zwischengespeichert werden müssen. In solchen Fällen wird ein neues 'readable'-Ereignis ausgelöst, sobald sich mehr Daten im Puffer befinden, und das Ereignis 'end' signalisiert das Ende der Datenübertragung.

Daher ist es zum Lesen des gesamten Inhalts einer Datei von einem readable erforderlich, Blöcke über mehrere 'readable'-Ereignisse hinweg zu sammeln:

js
const chunks = []

readable.on('readable', () => {
  let chunk
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk)
  }
})

readable.on('end', () => {
  const content = chunks.join('')
})

Ein Readable-Stream im Objektmodus gibt immer ein einzelnes Element von einem Aufruf von readable.read(size) zurück, unabhängig vom Wert des Arguments size.

Wenn die Methode readable.read() einen Datenblock zurückgibt, wird auch ein 'data'-Ereignis ausgelöst.

Der Aufruf von stream.read([size]) nach dem Auslösen des Ereignisses 'end' gibt null zurück. Es wird kein Laufzeitfehler ausgelöst.

readable.readable

Hinzugefügt in: v11.4.0

Ist true, wenn es sicher ist, readable.read() aufzurufen, was bedeutet, dass der Stream nicht zerstört wurde oder 'error' oder 'end' ausgegeben hat.

readable.readableAborted

Hinzugefügt in: v16.8.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Gibt zurück, ob der Stream zerstört oder mit einem Fehler behaftet wurde, bevor 'end' ausgegeben wurde.

readable.readableDidRead

Hinzugefügt in: v16.7.0, v14.18.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Gibt zurück, ob 'data' ausgegeben wurde.

readable.readableEncoding

Hinzugefügt in: v12.7.0

Getter für die Eigenschaft encoding eines bestimmten Readable-Streams. Die Eigenschaft encoding kann mit der Methode readable.setEncoding() gesetzt werden.

readable.readableEnded

Hinzugefügt in: v12.9.0

Wird true, wenn das 'end'-Ereignis ausgegeben wird.

readable.errored

Hinzugefügt in: v18.0.0

Gibt einen Fehler zurück, wenn der Stream mit einem Fehler zerstört wurde.

readable.readableFlowing

Hinzugefügt in: v9.4.0

Diese Eigenschaft spiegelt den aktuellen Zustand eines Readable-Streams wider, wie im Abschnitt Drei Zustände beschrieben.

readable.readableHighWaterMark

Hinzugefügt in: v9.3.0

Gibt den Wert von highWaterMark zurück, der beim Erstellen dieses Readable übergeben wurde.

readable.readableLength

Hinzugefügt in: v9.4.0

Diese Eigenschaft enthält die Anzahl der Bytes (oder Objekte) in der Warteschlange, die zum Lesen bereitstehen. Der Wert liefert Introspektionsdaten zum Status von highWaterMark.

readable.readableObjectMode

Hinzugefügt in: v12.3.0

Getter für die Eigenschaft objectMode eines gegebenen Readable-Streams.

readable.resume()

[Verlauf]

VersionÄnderungen
v10.0.0resume() hat keine Auswirkung, wenn ein 'readable'-Event abgehört wird.
v0.9.4Hinzugefügt in: v0.9.4

Die Methode readable.resume() bewirkt, dass ein explizit pausierter Readable-Stream wieder 'data'-Events ausgibt und den Stream in den fließenden Modus schaltet.

Die Methode readable.resume() kann verwendet werden, um die Daten aus einem Stream vollständig zu konsumieren, ohne diese Daten tatsächlich zu verarbeiten:

js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Das Ende wurde erreicht, aber es wurde nichts gelesen.')
  })

Die Methode readable.resume() hat keine Auswirkung, wenn ein 'readable'-Event-Listener vorhanden ist.

readable.setEncoding(encoding)

Hinzugefügt in: v0.9.4

Die Methode readable.setEncoding() setzt die Zeichenkodierung für Daten, die aus dem Readable-Stream gelesen werden.

Standardmäßig wird keine Kodierung zugewiesen und Stream-Daten werden als Buffer-Objekte zurückgegeben. Durch das Setzen einer Kodierung werden die Stream-Daten als Strings der angegebenen Kodierung zurückgegeben, anstatt als Buffer-Objekte. Wenn Sie beispielsweise readable.setEncoding('utf8') aufrufen, werden die Ausgabedaten als UTF-8-Daten interpretiert und als Strings übergeben. Wenn Sie readable.setEncoding('hex') aufrufen, werden die Daten im hexadezimalen String-Format kodiert.

Der Readable-Stream verarbeitet Multibyte-Zeichen, die über den Stream geliefert werden, ordnungsgemäß, die andernfalls falsch dekodiert würden, wenn sie einfach als Buffer-Objekte aus dem Stream gezogen würden.

js
const readable = getReadableStreamSomehow()
readable.setEncoding('utf8')
readable.on('data', chunk => {
  assert.equal(typeof chunk, 'string')
  console.log('Es wurden %d Zeichen Stringdaten erhalten:', chunk.length)
})
readable.unpipe([destination])

Hinzugefügt in: v0.9.4

  • destination <stream.Writable> Optionaler spezifischer Stream, von dem die Pipe getrennt werden soll
  • Gibt zurück: <this>

Die Methode readable.unpipe() trennt einen Writable-Stream, der zuvor mit der Methode stream.pipe() verbunden wurde.

Wenn destination nicht angegeben ist, werden alle Pipes getrennt.

Wenn destination angegeben ist, aber keine Pipe dafür eingerichtet ist, tut die Methode nichts.

js
const fs = require('node:fs')
const readable = getReadableStreamSomehow()
const writable = fs.createWriteStream('file.txt')
// Alle Daten von readable gehen in 'file.txt',
// aber nur für die erste Sekunde.
readable.pipe(writable)
setTimeout(() => {
  console.log('Schreibe nicht mehr in file.txt.')
  readable.unpipe(writable)
  console.log('Dateistream manuell schließen.')
  writable.end()
}, 1000)
readable.unshift(chunk[, encoding])

[Verlauf]

VersionÄnderungen
v22.0.0, v20.13.0Das Argument chunk kann jetzt eine TypedArray- oder DataView-Instanz sein.
v8.0.0Das Argument chunk kann jetzt eine Uint8Array-Instanz sein.
v0.9.11Hinzugefügt in: v0.9.11

Das Übergeben von chunk als null signalisiert das Ende des Streams (EOF) und verhält sich gleich wie readable.push(null), wonach keine Daten mehr geschrieben werden können. Das EOF-Signal wird am Ende des Puffers platziert und alle gepufferten Daten werden weiterhin geleert.

Die Methode readable.unshift() schiebt einen Datenchunk zurück in den internen Puffer. Dies ist in bestimmten Situationen nützlich, in denen ein Stream von Code verarbeitet wird, der eine gewisse Datenmenge "nicht verbrauchen" muss, die er optimistisch aus der Quelle gezogen hat, sodass die Daten an eine andere Partei weitergegeben werden können.

Die Methode stream.unshift(chunk) kann nicht aufgerufen werden, nachdem das Ereignis 'end' ausgelöst wurde, da sonst ein Laufzeitfehler ausgelöst wird.

Entwickler, die stream.unshift() verwenden, sollten häufig den Wechsel zur Verwendung eines Transform-Streams in Betracht ziehen. Weitere Informationen finden Sie im Abschnitt API für Stream-Implementierer.

js
// Header abziehen, der durch \n\n begrenzt ist.
// unshift() verwenden, wenn wir zu viel bekommen.
// Den Callback mit (error, header, stream) aufrufen.
const { StringDecoder } = require('node:string_decoder')
function parseHeader(stream, callback) {
  stream.on('error', callback)
  stream.on('readable', onReadable)
  const decoder = new StringDecoder('utf8')
  let header = ''
  function onReadable() {
    let chunk
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk)
      if (str.includes('\n\n')) {
        // Headergrenze gefunden.
        const split = str.split(/\n\n/)
        header += split.shift()
        const remaining = split.join('\n\n')
        const buf = Buffer.from(remaining, 'utf8')
        stream.removeListener('error', callback)
        // Den 'readable'-Listener vor dem Unshifting entfernen.
        stream.removeListener('readable', onReadable)
        if (buf.length) stream.unshift(buf)
        // Jetzt kann der Nachrichtentext aus dem Stream gelesen werden.
        callback(null, header, stream)
        return
      }
      // Header wird noch gelesen.
      header += str
    }
  }
}

Im Gegensatz zu stream.push(chunk) beendet stream.unshift(chunk) den Leseprozess nicht, indem der interne Lesezustand des Streams zurückgesetzt wird. Dies kann zu unerwarteten Ergebnissen führen, wenn readable.unshift() während eines Lesevorgangs aufgerufen wird (d. h. innerhalb einer stream._read()-Implementierung in einem benutzerdefinierten Stream). Wenn dem Aufruf von readable.unshift() ein sofortiges stream.push('') folgt, wird der Lesezustand entsprechend zurückgesetzt. Es ist jedoch am besten, den Aufruf von readable.unshift() während der Durchführung eines Lesevorgangs zu vermeiden.

readable.wrap(stream)

Hinzugefügt in: v0.9.4

Vor Node.js 0.10 implementierten Streams nicht die gesamte node:stream Modul-API, wie sie derzeit definiert ist. (Siehe Kompatibilität für weitere Informationen.)

Wenn eine ältere Node.js-Bibliothek verwendet wird, die 'data' Ereignisse ausgibt und eine stream.pause() Methode hat, die nur beratend ist, kann die readable.wrap() Methode verwendet werden, um einen Readable Stream zu erstellen, der den alten Stream als Datenquelle verwendet.

Es wird selten notwendig sein, readable.wrap() zu verwenden, aber die Methode wurde als Komfortfunktion für die Interaktion mit älteren Node.js Anwendungen und Bibliotheken bereitgestellt.

js
const { OldReader } = require('./old-api-module.js')
const { Readable } = require('node:stream')
const oreader = new OldReader()
const myReader = new Readable().wrap(oreader)

myReader.on('readable', () => {
  myReader.read() // usw.
})
readable[Symbol.asyncIterator]()

[Verlauf]

VersionÄnderungen
v11.14.0Symbol.asyncIterator Unterstützung ist nicht mehr experimentell.
v10.0.0Hinzugefügt in: v10.0.0
js
const fs = require('node:fs')

async function print(readable) {
  readable.setEncoding('utf8')
  let data = ''
  for await (const chunk of readable) {
    data += chunk
  }
  console.log(data)
}

print(fs.createReadStream('file')).catch(console.error)

Wenn die Schleife mit einem break, return oder einem throw beendet wird, wird der Stream zerstört. Mit anderen Worten, das Iterieren über einen Stream verbraucht den Stream vollständig. Der Stream wird in Blöcken mit einer Größe, die der Option highWaterMark entspricht, gelesen. Im obigen Codebeispiel werden die Daten in einem einzigen Block sein, wenn die Datei weniger als 64 KiB Daten enthält, da keine highWaterMark Option für fs.createReadStream() bereitgestellt wird.

readable[Symbol.asyncDispose]()

Hinzugefügt in: v20.4.0, v18.18.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Ruft readable.destroy() mit einem AbortError auf und gibt ein Promise zurück, das erfüllt wird, wenn der Stream beendet ist.

readable.compose(stream[, options])

Hinzugefügt in: v19.1.0, v18.13.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

js
import { Readable } from 'node:stream'

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ')

    for (const word of words) {
      yield word
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords)
const words = await wordsStream.toArray()

console.log(words) // gibt ['this', 'is', 'compose', 'as', 'operator'] aus

Siehe stream.compose für weitere Informationen.

readable.iterator([options])

Hinzugefügt in: v16.3.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • options <Object>

    • destroyOnReturn <boolean> Wenn false gesetzt ist, wird durch den Aufruf von return auf dem asynchronen Iterator oder durch das Beenden einer for await...of-Iteration mit einem break, return oder throw der Stream nicht zerstört. Standard: true.
  • Gibt zurück: <AsyncIterator> um den Stream zu verarbeiten.

Der von dieser Methode erstellte Iterator gibt Benutzern die Möglichkeit, die Zerstörung des Streams abzubrechen, wenn die for await...of-Schleife durch return, break oder throw verlassen wird, oder ob der Iterator den Stream zerstören soll, wenn der Stream während der Iteration einen Fehler ausgibt.

js
const { Readable } = require('node:stream')

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk) // Gibt 2 und dann 3 aus
  }

  console.log(readable.destroyed) // True, Stream wurde vollständig verbraucht
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk) // 1
    break
  }

  console.log(readable.destroyed) // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]))
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]))
}

showBoth()
readable.map(fn[, options])

[Historie]

VersionÄnderungen
v20.7.0, v18.19.0highWaterMark in Optionen hinzugefügt.
v17.4.0, v16.14.0Hinzugefügt in: v17.4.0, v16.14.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Funktion> | <AsyncFunction> Eine Funktion zum Mappen über jeden Chunk im Stream.

    • data <any> Ein Daten-Chunk aus dem Stream.
    • options <Objekt>
    • signal <AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, wodurch der fn-Aufruf frühzeitig abgebrochen werden kann.
  • options <Objekt>

    • concurrency <number> Die maximale Anzahl gleichzeitiger Aufrufe von fn im Stream. Standard: 1.
    • highWaterMark <number> Wie viele Elemente zwischengespeichert werden sollen, während auf die Konsumierung der gemappten Elemente durch den Benutzer gewartet wird. Standard: concurrency * 2 - 1.
    • signal <AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Readable> ein Stream, der mit der Funktion fn gemappt wurde.

Diese Methode ermöglicht das Mappen über den Stream. Die Funktion fn wird für jeden Chunk im Stream aufgerufen. Wenn die Funktion fn ein Promise zurückgibt, wird dieses Promise awaited, bevor es an den Ergebnisstream weitergegeben wird.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Mit einem synchronen Mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map(x => x * 2)) {
  console.log(chunk) // 2, 4, 6, 8
}
// Mit einem asynchronen Mapper, der maximal 2 Abfragen gleichzeitig durchführt.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  domain => resolver.resolve4(domain),
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  console.log(result) // Protokolliert das DNS-Ergebnis von resolver.resolve4.
}
readable.filter(fn[, options])

[Verlauf]

VersionÄnderungen
v20.7.0, v18.19.0highWaterMark in Optionen hinzugefügt.
v17.4.0, v16.14.0Hinzugefügt in: v17.4.0, v16.14.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Funktion> | <AsyncFunction> Eine Funktion zum Filtern von Chunks aus dem Stream.

    • data <any> Ein Chunk von Daten aus dem Stream.
    • options <Object>
    • signal <AbortSignal> abgebrochen, wenn der Stream zerstört wird, was es ermöglicht, den fn-Aufruf frühzeitig abzubrechen.
  • options <Object>

    • concurrency <number> Die maximale Anzahl von gleichzeitigen Aufrufen von fn im Stream auf einmal. Standard: 1.
    • highWaterMark <number> Wie viele Elemente gepuffert werden sollen, während auf die Nutzung der gefilterten Elemente durch den Benutzer gewartet wird. Standard: concurrency * 2 - 1.
    • signal <AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Readable> einen Stream, der mit dem Prädikat fn gefiltert wurde.

Diese Methode ermöglicht das Filtern des Streams. Für jeden Chunk im Stream wird die fn-Funktion aufgerufen, und wenn sie einen Truthy-Wert zurückgibt, wird der Chunk an den Ergebnisstream weitergegeben. Wenn die fn-Funktion ein Promise zurückgibt, wird auf dieses Promise gewartet (await).

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Mit einem synchronen Prädikat.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// Mit einem asynchronen Prädikat, das maximal 2 Abfragen gleichzeitig durchführt.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).filter(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address.ttl > 60
  },
  { concurrency: 2 }
)
for await (const result of dnsResults) {
  // Protokolliert Domains mit mehr als 60 Sekunden im aufgelösten DNS-Eintrag.
  console.log(result)
}
readable.forEach(fn[, options])

Hinzugefügt in: v17.5.0, v16.15.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Funktion> | <AsyncFunction> eine Funktion, die für jeden Chunk des Streams aufgerufen wird.

    • data <any> ein Daten-Chunk aus dem Stream.
    • options <Object>
    • signal <AbortSignal> abgebrochen, wenn der Stream zerstört wird, wodurch der fn-Aufruf frühzeitig abgebrochen werden kann.
  • options <Object>

    • concurrency <number> die maximale Anzahl gleichzeitiger Aufrufe von fn, die gleichzeitig im Stream aufgerufen werden. Standard: 1.
    • signal <AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Promise> ein Promise, wenn der Stream abgeschlossen ist.

Diese Methode ermöglicht die Iteration eines Streams. Für jeden Chunk im Stream wird die Funktion fn aufgerufen. Wenn die Funktion fn ein Promise zurückgibt, wird dieses Promise awaited.

Diese Methode unterscheidet sich von for await...of-Schleifen darin, dass sie optional Chunks gleichzeitig verarbeiten kann. Darüber hinaus kann eine forEach-Iteration nur durch Übergabe einer signal-Option und das Abbrechen des zugehörigen AbortController gestoppt werden, während for await...of mit break oder return gestoppt werden kann. In beiden Fällen wird der Stream zerstört.

Diese Methode unterscheidet sich vom Abhören des Ereignisses 'data' dadurch, dass sie das readable-Ereignis im zugrunde liegenden Mechanismus verwendet und die Anzahl gleichzeitiger fn-Aufrufe begrenzen kann.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

// Mit einem synchronen Prädikat.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter(x => x > 2)) {
  console.log(chunk) // 3, 4
}
// Mit einem asynchronen Prädikat, wobei maximal 2 Abfragen gleichzeitig durchgeführt werden.
const resolver = new Resolver()
const dnsResults = Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org']).map(
  async domain => {
    const { address } = await resolver.resolve4(domain, { ttl: true })
    return address
  },
  { concurrency: 2 }
)
await dnsResults.forEach(result => {
  // Protokolliert das Ergebnis, ähnlich wie `for await (const result of dnsResults)`
  console.log(result)
})
console.log('fertig') // Stream ist abgeschlossen
readable.toArray([options])

Hinzugefügt in: v17.5.0, v16.15.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • options <Object>

    • signal <AbortSignal> erlaubt das Abbrechen der toArray-Operation, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Promise> ein Promise, das ein Array mit dem Inhalt des Streams enthält.

Diese Methode ermöglicht es, den Inhalt eines Streams einfach zu erhalten.

Da diese Methode den gesamten Stream in den Speicher liest, werden die Vorteile von Streams negiert. Sie ist für die Interoperabilität und Bequemlichkeit gedacht, nicht als die primäre Methode zum Verbrauchen von Streams.

js
import { Readable } from 'node:stream'
import { Resolver } from 'node:dns/promises'

await Readable.from([1, 2, 3, 4]).toArray() // [1, 2, 3, 4]

// DNS-Abfragen gleichzeitig mit .map durchführen und
// die Ergebnisse mit toArray in einem Array sammeln
const dnsResults = await Readable.from(['nodejs.org', 'openjsf.org', 'www.linuxfoundation.org'])
  .map(
    async domain => {
      const { address } = await resolver.resolve4(domain, { ttl: true })
      return address
    },
    { concurrency: 2 }
  )
  .toArray()
readable.some(fn[, options])

Hinzugefügt in: v17.5.0, v16.15.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Function> | <AsyncFunction> eine Funktion, die auf jedem Chunk des Streams aufgerufen wird.

    • data <any> ein Chunk von Daten aus dem Stream.
    • options <Object>
    • signal <AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, wodurch der fn-Aufruf frühzeitig abgebrochen werden kann.
  • options <Object>

    • concurrency <number> die maximale Anzahl gleichzeitiger Aufrufe von fn auf den Stream auf einmal. Standard: 1.
    • signal <AbortSignal> erlaubt das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Promise> ein Promise, das zu true ausgewertet wird, wenn fn für mindestens einen der Chunks einen Truthy-Wert zurückgegeben hat.

Diese Methode ähnelt Array.prototype.some und ruft fn für jeden Chunk im Stream auf, bis der erwartete Rückgabewert true ist (oder ein beliebiger Truthy-Wert). Sobald ein fn-Aufruf für einen Chunk als erwarteter Rückgabewert Truthy ist, wird der Stream zerstört und das Promise mit true erfüllt. Wenn keiner der fn-Aufrufe für die Chunks einen Truthy-Wert zurückgibt, wird das Promise mit false erfüllt.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Mit einem synchronen Prädikat.
await Readable.from([1, 2, 3, 4]).some(x => x > 2) // true
await Readable.from([1, 2, 3, 4]).some(x => x < 0) // false

// Mit einem asynchronen Prädikat, das maximal 2 Dateiprüfungen gleichzeitig durchführt.
const anyBigFile = await Readable.from(['file1', 'file2', 'file3']).some(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(anyBigFile) // `true`, wenn eine Datei in der Liste größer als 1 MB ist
console.log('fertig') // Stream ist beendet
readable.find(fn[, options])

Hinzugefügt in: v17.5.0, v16.17.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Funktion> | <AsyncFunction> Eine Funktion, die für jeden Chunk des Streams aufgerufen wird.

    • data <any> Ein Daten-Chunk aus dem Stream.
    • options <Object>
    • signal <AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, was einen frühen Abbruch des fn-Aufrufs ermöglicht.
  • options <Object>

    • concurrency <number> Die maximale Anzahl gleichzeitiger Aufrufe von fn im Stream. Standard: 1.
    • signal <AbortSignal> ermöglicht die Zerstörung des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Promise> Ein Promise, das zu dem ersten Chunk ausgewertet wird, für den fn einen "truthy" Wert ergibt, oder undefined, wenn kein Element gefunden wurde.

Diese Methode ist ähnlich wie Array.prototype.find und ruft fn für jeden Chunk im Stream auf, um einen Chunk mit einem "truthy" Wert für fn zu finden. Sobald der erwartete Rückgabewert eines fn-Aufrufs "truthy" ist, wird der Stream zerstört und das Promise wird mit dem Wert erfüllt, für den fn einen "truthy" Wert zurückgegeben hat. Wenn alle fn-Aufrufe für die Chunks einen "falsy" Wert zurückgeben, wird das Promise mit undefined erfüllt.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Mit einem synchronen Prädikat.
await Readable.from([1, 2, 3, 4]).find(x => x > 2) // 3
await Readable.from([1, 2, 3, 4]).find(x => x > 0) // 1
await Readable.from([1, 2, 3, 4]).find(x => x > 10) // undefined

// Mit einem asynchronen Prädikat, wobei maximal 2 Dateiüberprüfungen gleichzeitig durchgeführt werden.
const foundBigFile = await Readable.from(['file1', 'file2', 'file3']).find(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
console.log(foundBigFile) // Dateiname einer großen Datei, falls eine Datei in der Liste größer als 1 MB ist
console.log('done') // Stream ist beendet
readable.every(fn[, options])

Hinzugefügt in: v17.5.0, v16.15.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Funktion> | <AsyncFunction> Eine Funktion, die für jeden Chunk des Streams aufgerufen wird.

    • data <beliebig> Ein Chunk von Daten aus dem Stream.
    • options <Objekt>
    • signal <AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, wodurch der fn-Aufruf frühzeitig abgebrochen werden kann.
  • options <Objekt>

    • concurrency <number> Die maximale Anzahl gleichzeitiger Aufrufe von fn, die gleichzeitig für den Stream aufgerufen werden sollen. Standard: 1.
    • signal <AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Promise> Eine Promise, die zu true ausgewertet wird, wenn fn für alle Chunks einen Truthy-Wert zurückgegeben hat.

Diese Methode ähnelt Array.prototype.every und ruft fn für jeden Chunk im Stream auf, um zu überprüfen, ob alle erwarteten Rückgabewerte für fn einen Truthy-Wert haben. Sobald ein fn-Aufruf für einen Chunk einen Falsy-Wert zurückgibt, wird der Stream zerstört und die Promise wird mit false erfüllt. Wenn alle fn-Aufrufe für die Chunks einen Truthy-Wert zurückgeben, wird die Promise mit true erfüllt.

js
import { Readable } from 'node:stream'
import { stat } from 'node:fs/promises'

// Mit einem synchronen Prädikat.
await Readable.from([1, 2, 3, 4]).every(x => x > 2) // false
await Readable.from([1, 2, 3, 4]).every(x => x > 0) // true

// Mit einem asynchronen Prädikat, das maximal 2 Dateiüberprüfungen gleichzeitig durchführt.
const allBigFiles = await Readable.from(['file1', 'file2', 'file3']).every(
  async fileName => {
    const stats = await stat(fileName)
    return stats.size > 1024 * 1024
  },
  { concurrency: 2 }
)
// `true`, wenn alle Dateien in der Liste größer als 1 MiB sind
console.log(allBigFiles)
console.log('done') // Stream wurde beendet
readable.flatMap(fn[, options])

Hinzugefügt in: v17.5.0, v16.15.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Funktion> | <AsyncGeneratorFunction> | <AsyncFunction> Eine Funktion, die auf jeden Chunk im Stream angewendet wird.

    • data <any> Ein Datenchunk aus dem Stream.
    • options <Object>
    • signal <AbortSignal> wird abgebrochen, wenn der Stream zerstört wird, sodass der fn-Aufruf frühzeitig abgebrochen werden kann.
  • options <Object>

    • concurrency <number> Die maximale Anzahl gleichzeitiger Aufrufe von fn auf dem Stream. Standard: 1.
    • signal <AbortSignal> ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Readable> Ein Stream, der mit der Funktion fn flat-mapped wird.

Diese Methode gibt einen neuen Stream zurück, indem der angegebene Callback auf jeden Chunk des Streams angewendet und das Ergebnis dann vereinfacht wird.

Es ist möglich, einen Stream oder ein anderes iterierbares oder asynchron iterierbares Objekt von fn zurückzugeben, und die resultierenden Streams werden in den zurückgegebenen Stream zusammengeführt (vereinfacht).

js
import { Readable } from 'node:stream'
import { createReadStream } from 'node:fs'

// Mit einem synchronen Mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap(x => [x, x])) {
  console.log(chunk) // 1, 1, 2, 2, 3, 3, 4, 4
}
// Mit einem asynchronen Mapper, kombiniere den Inhalt von 4 Dateien
const concatResult = Readable.from(['./1.mjs', './2.mjs', './3.mjs', './4.mjs']).flatMap(fileName =>
  createReadStream(fileName)
)
for await (const result of concatResult) {
  // Dies enthält den Inhalt (alle Chunks) aller 4 Dateien
  console.log(result)
}
readable.drop(limit[, options])

Hinzugefügt in: v17.5.0, v16.15.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • limit <number> Die Anzahl der Chunks, die vom Readable entfernt werden sollen.

  • options <Object>

    • signal <AbortSignal> Ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Readable> Ein Stream, bei dem limit Chunks entfernt wurden.

Diese Methode gibt einen neuen Stream zurück, bei dem die ersten limit Chunks entfernt wurden.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).drop(2).toArray() // [3, 4]
readable.take(limit[, options])

Hinzugefügt in: v17.5.0, v16.15.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • limit <number> Die Anzahl der Chunks, die vom Readable genommen werden sollen.

  • options <Object>

    • signal <AbortSignal> Ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Readable> Ein Stream, bei dem limit Chunks genommen wurden.

Diese Methode gibt einen neuen Stream zurück, der die ersten limit Chunks enthält.

js
import { Readable } from 'node:stream'

await Readable.from([1, 2, 3, 4]).take(2).toArray() // [1, 2]
readable.reduce(fn[, initial[, options]])

Hinzugefügt in: v17.5.0, v16.15.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • fn <Function> | <AsyncFunction> Eine Reduzierfunktion, die über jeden Chunk im Stream aufgerufen wird.

    • previous <any> Der Wert, der vom letzten Aufruf von fn erhalten wurde, oder der initial-Wert, falls angegeben, oder der erste Chunk des Streams ansonsten.
    • data <any> Ein Daten-Chunk aus dem Stream.
    • options <Object>
    • signal <AbortSignal> Wird abgebrochen, wenn der Stream zerstört wird, wodurch der fn-Aufruf frühzeitig abgebrochen werden kann.
  • initial <any> Der Anfangswert, der bei der Reduzierung verwendet werden soll.

  • options <Object>

    • signal <AbortSignal> Ermöglicht das Zerstören des Streams, wenn das Signal abgebrochen wird.
  • Gibt zurück: <Promise> Ein Promise für den Endwert der Reduzierung.

Diese Methode ruft fn für jeden Chunk des Streams der Reihe nach auf und übergibt das Ergebnis der Berechnung für das vorherige Element. Sie gibt ein Promise für den Endwert der Reduzierung zurück.

Wenn kein initial-Wert angegeben wird, wird der erste Chunk des Streams als Anfangswert verwendet. Wenn der Stream leer ist, wird das Promise mit einem TypeError mit der Code-Eigenschaft ERR_INVALID_ARGS abgelehnt.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir).reduce(async (totalSize, file) => {
  const { size } = await stat(join(directoryPath, file))
  return totalSize + size
}, 0)

console.log(folderSize)

Die Reduzierfunktion iteriert die Stream-Elemente einzeln, was bedeutet, dass es keinen concurrency-Parameter oder Parallelismus gibt. Um ein reduce gleichzeitig auszuführen, können Sie die Async-Funktion in die readable.map Methode extrahieren.

js
import { Readable } from 'node:stream'
import { readdir, stat } from 'node:fs/promises'
import { join } from 'node:path'

const directoryPath = './src'
const filesInDir = await readdir(directoryPath)

const folderSize = await Readable.from(filesInDir)
  .map(file => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0)

console.log(folderSize)

Duplex- und Transformations-Streams

Klasse: stream.Duplex

[Verlauf]

VersionÄnderungen
v6.8.0Instanzen von Duplex geben jetzt true zurück, wenn instanceof stream.Writable überprüft wird.
v0.9.4Hinzugefügt in: v0.9.4

Duplex-Streams sind Streams, die sowohl die Readable- als auch die Writable-Schnittstelle implementieren.

Beispiele für Duplex-Streams sind:

duplex.allowHalfOpen

Hinzugefügt in: v0.9.4

Wenn false, dann beendet der Stream automatisch die beschreibbare Seite, wenn die lesbare Seite endet. Wird anfänglich durch die allowHalfOpen-Konstruktoroption gesetzt, die standardmäßig true ist.

Dies kann manuell geändert werden, um das Halb-Offen-Verhalten einer bestehenden Duplex-Stream-Instanz zu ändern, muss aber geändert werden, bevor das 'end'-Ereignis ausgegeben wird.

Klasse: stream.Transform

Hinzugefügt in: v0.9.4

Transformations-Streams sind Duplex-Streams, bei denen die Ausgabe in irgendeiner Weise mit der Eingabe zusammenhängt. Wie alle Duplex-Streams implementieren Transform-Streams sowohl die Readable- als auch die Writable-Schnittstelle.

Beispiele für Transform-Streams sind:

transform.destroy([error])

[Verlauf]

VersionÄnderungen
v14.0.0Funktioniert als No-Op bei einem Stream, der bereits zerstört wurde.
v8.0.0Hinzugefügt in: v8.0.0

Zerstört den Stream und gibt optional ein 'error'-Ereignis aus. Nach diesem Aufruf würde der Transformationsstream alle internen Ressourcen freigeben. Implementierer sollten diese Methode nicht überschreiben, sondern stattdessen readable._destroy() implementieren. Die Standardimplementierung von _destroy() für Transform gibt auch 'close' aus, es sei denn, emitClose ist auf false gesetzt.

Sobald destroy() aufgerufen wurde, sind alle weiteren Aufrufe ein No-Op, und es können keine weiteren Fehler außer von _destroy() als 'error' ausgegeben werden.

stream.duplexPair([options])

Hinzugefügt in: v22.6.0, v20.17.0

  • options <Object> Ein Wert, der an beide Duplex-Konstruktoren übergeben wird, um Optionen wie die Pufferung festzulegen.
  • Gibt zurück: <Array> mit zwei Duplex-Instanzen.

Die Hilfsfunktion duplexPair gibt ein Array mit zwei Elementen zurück, wobei jedes ein Duplex-Stream ist, der mit der anderen Seite verbunden ist:

js
const [sideA, sideB] = duplexPair()

Was auch immer in einen Stream geschrieben wird, wird auf dem anderen lesbar gemacht. Es bietet ein Verhalten analog zu einer Netzwerkverbindung, bei der die vom Client geschriebenen Daten für den Server lesbar werden und umgekehrt.

Die Duplex-Streams sind symmetrisch; der eine oder der andere kann ohne Unterschied im Verhalten verwendet werden.

stream.finished(stream[, options], callback)

[Verlauf]

VersionÄnderungen
v19.5.0Unterstützung für ReadableStream und WritableStream hinzugefügt.
v15.11.0Die Option signal wurde hinzugefügt.
v14.0.0finished(stream, cb) wartet auf das Ereignis 'close', bevor der Callback aufgerufen wird. Die Implementierung versucht, Legacy-Streams zu erkennen und dieses Verhalten nur auf Streams anzuwenden, von denen erwartet wird, dass sie 'close' ausgeben.
v14.0.0Das Ausgeben von 'close' vor 'end' auf einem Readable-Stream führt zu einem ERR_STREAM_PREMATURE_CLOSE-Fehler.
v14.0.0Der Callback wird auf Streams aufgerufen, die bereits vor dem Aufruf von finished(stream, cb) beendet wurden.
v10.0.0Hinzugefügt in: v10.0.0
  • stream <Stream> | <ReadableStream> | <WritableStream> Ein lesbarer und/oder schreibbarer Stream/Webstream.
  • options <Object>
    • error <boolean> Wenn auf false gesetzt, wird ein Aufruf von emit('error', err) nicht als beendet betrachtet. Standard: true.
    • readable <boolean> Wenn auf false gesetzt, wird der Callback aufgerufen, wenn der Stream endet, auch wenn der Stream möglicherweise noch lesbar ist. Standard: true.
    • writable <boolean> Wenn auf false gesetzt, wird der Callback aufgerufen, wenn der Stream endet, auch wenn der Stream möglicherweise noch beschreibbar ist. Standard: true.
    • signal <AbortSignal> ermöglicht das Abbrechen des Wartens auf das Ende des Streams. Der zugrunde liegende Stream wird nicht abgebrochen, wenn das Signal abgebrochen wird. Der Callback wird mit einem AbortError aufgerufen. Alle registrierten Listener, die von dieser Funktion hinzugefügt wurden, werden ebenfalls entfernt.
  • callback <Function> Eine Callback-Funktion, die ein optionales Fehlerargument entgegennimmt.
  • Gibt zurück: <Function> Eine Bereinigungsfunktion, die alle registrierten Listener entfernt.

Eine Funktion, um benachrichtigt zu werden, wenn ein Stream nicht mehr lesbar, beschreibbar ist oder einen Fehler oder ein vorzeitiges Close-Ereignis erfahren hat.

js
const { finished } = require('node:stream')
const fs = require('node:fs')

const rs = fs.createReadStream('archive.tar')

finished(rs, err => {
  if (err) {
    console.error('Stream fehlgeschlagen.', err)
  } else {
    console.log('Stream ist mit dem Lesen fertig.')
  }
})

rs.resume() // Den Stream entleeren.

Besonders nützlich in Fehlerbehandlungsszenarien, in denen ein Stream vorzeitig zerstört wird (wie eine abgebrochene HTTP-Anfrage) und nicht 'end' oder 'finish' ausgibt.

Die finished-API bietet eine Promise-Version.

stream.finished() hinterlässt hängende Event-Listener (insbesondere 'error', 'end', 'finish' und 'close'), nachdem callback aufgerufen wurde. Der Grund dafür ist, dass unerwartete 'error'-Ereignisse (aufgrund falscher Stream-Implementierungen) keine unerwarteten Abstürze verursachen. Wenn dies unerwünschtes Verhalten ist, muss die zurückgegebene Bereinigungsfunktion im Callback aufgerufen werden:

js
const cleanup = finished(rs, err => {
  cleanup()
  // ...
})

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

[Verlauf]

VersionÄnderungen
v19.7.0, v18.16.0Unterstützung für Webstreams hinzugefügt.
v18.0.0Das Übergeben eines ungültigen Callbacks an das callback-Argument wirft nun ERR_INVALID_ARG_TYPE anstelle von ERR_INVALID_CALLBACK.
v14.0.0Der pipeline(..., cb) wartet auf das 'close'-Ereignis, bevor der Callback aufgerufen wird. Die Implementierung versucht, Legacy-Streams zu erkennen und dieses Verhalten nur auf Streams anzuwenden, von denen erwartet wird, dass sie 'close' ausgeben.
v13.10.0Unterstützung für asynchrone Generatoren hinzugefügt.
v10.0.0Hinzugefügt in: v10.0.0

Eine Modulmethode zum Pipe-Verbinden zwischen Streams und Generatoren, die Fehler weiterleitet und ordnungsgemäß bereinigt und einen Callback bereitstellt, wenn die Pipeline abgeschlossen ist.

js
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')

// Verwenden Sie die Pipeline-API, um auf einfache Weise eine Reihe von Streams
// zusammen zu verbinden und benachrichtigt zu werden, wenn die Pipeline vollständig abgeschlossen ist.

// Eine Pipeline zum effizienten Gzippen einer potenziell riesigen Tar-Datei:

pipeline(fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), err => {
  if (err) {
    console.error('Pipeline fehlgeschlagen.', err)
  } else {
    console.log('Pipeline erfolgreich.')
  }
})

Die pipeline-API bietet eine Promise-Version.

stream.pipeline() ruft stream.destroy(err) auf allen Streams auf, außer:

  • Readable-Streams, die 'end' oder 'close' ausgegeben haben.
  • Writable-Streams, die 'finish' oder 'close' ausgegeben haben.

stream.pipeline() hinterlässt hängende Event-Listener auf den Streams, nachdem der callback aufgerufen wurde. Im Falle der Wiederverwendung von Streams nach einem Fehler kann dies zu Event-Listener-Lecks und verschluckten Fehlern führen. Wenn der letzte Stream lesbar ist, werden hängende Event-Listener entfernt, sodass der letzte Stream später verbraucht werden kann.

stream.pipeline() schließt alle Streams, wenn ein Fehler auftritt. Die IncomingRequest-Nutzung mit pipeline könnte zu einem unerwarteten Verhalten führen, da der Socket zerstört würde, ohne die erwartete Antwort zu senden. Siehe das Beispiel unten:

js
const fs = require('node:fs')
const http = require('node:http')
const { pipeline } = require('node:stream')

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt')
  pipeline(fileStream, res, err => {
    if (err) {
      console.log(err) // Keine solche Datei
      // Diese Meldung kann nicht gesendet werden, da `pipeline` den Socket bereits zerstört hat.
      return res.end('Fehler!!!')
    }
  })
})

stream.compose(...streams)

[History]

VersionÄnderungen
v21.1.0, v20.10.0Unterstützung für die Stream-Klasse hinzugefügt.
v19.8.0, v18.16.0Unterstützung für Webstreams hinzugefügt.
v16.9.0Hinzugefügt in: v16.9.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - stream.compose ist experimentell.

Kombiniert zwei oder mehr Streams zu einem Duplex-Stream, der in den ersten Stream schreibt und aus dem letzten liest. Jeder bereitgestellte Stream wird mit stream.pipeline in den nächsten geleitet. Wenn einer der Streams einen Fehler aufweist, werden alle zerstört, einschließlich des äußeren Duplex-Streams.

Da stream.compose einen neuen Stream zurückgibt, der wiederum in andere Streams geleitet werden kann (und sollte), ermöglicht er die Komposition. Im Gegensatz dazu bilden die Streams, wenn sie an stream.pipeline übergeben werden, typischerweise einen geschlossenen Kreislauf, wobei der erste Stream ein lesbarer Stream und der letzte ein beschreibbarer Stream ist.

Wenn eine Function übergeben wird, muss es sich um eine Factory-Methode handeln, die eine source Iterable entgegennimmt.

js
import { compose, Transform } from 'node:stream'

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''))
  },
})

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
}

let res = ''
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf
}

console.log(res) // gibt 'HELLOWORLD' aus

stream.compose kann verwendet werden, um asynchrone Iterables, Generatoren und Funktionen in Streams zu konvertieren.

  • AsyncIterable wandelt sich in einen lesbaren Duplex um. Darf nicht null liefern.
  • AsyncGeneratorFunction wandelt sich in einen lesbaren/beschreibbaren Transform Duplex um. Muss eine AsyncIterable-Quelle als ersten Parameter entgegennehmen. Darf nicht null liefern.
  • AsyncFunction wandelt sich in einen beschreibbaren Duplex um. Muss entweder null oder undefined zurückgeben.
js
import { compose } from 'node:stream'
import { finished } from 'node:stream/promises'

// Konvertiert AsyncIterable in lesbaren Duplex.
const s1 = compose(
  (async function* () {
    yield 'Hallo'
    yield 'Welt'
  })()
)

// Konvertiert AsyncGenerator in Transform Duplex.
const s2 = compose(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase()
  }
})

let res = ''

// Konvertiert AsyncFunction in beschreibbaren Duplex.
const s3 = compose(async function (source) {
  for await (const chunk of source) {
    res += chunk
  }
})

await finished(compose(s1, s2, s3))

console.log(res) // gibt 'HELLOWORLD' aus

Siehe readable.compose(stream) für stream.compose als Operator.

stream.Readable.from(iterable[, options])

Hinzugefügt in: v12.3.0, v10.17.0

  • iterable <Iterable> Objekt, das das Iterable-Protokoll Symbol.asyncIterator oder Symbol.iterator implementiert. Gibt ein 'error'-Ereignis aus, wenn ein Nullwert übergeben wird.
  • options <Object> Optionen, die an new stream.Readable([options]) übergeben werden. Standardmäßig setzt Readable.from() options.objectMode auf true, es sei denn, dies wird explizit durch Setzen von options.objectMode auf false abgewählt.
  • Gibt zurück: <stream.Readable>

Eine Hilfsmethode zum Erstellen von lesbaren Streams aus Iteratoren.

js
const { Readable } = require('node:stream')

async function* generate() {
  yield 'hallo'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})

Das Aufrufen von Readable.from(string) oder Readable.from(buffer) führt nicht dazu, dass die Strings oder Puffer iteriert werden, um aus Leistungsgründen der Semantik der anderen Streams zu entsprechen.

Wenn ein Iterable-Objekt, das Promises enthält, als Argument übergeben wird, kann dies zu einer unbehandelten Ablehnung führen.

js
const { Readable } = require('node:stream')

Readable.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unbehandelte Ablehnung
])

stream.Readable.fromWeb(readableStream[, options])

Hinzugefügt in: v17.0.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

stream.Readable.isDisturbed(stream)

Hinzugefügt in: v16.8.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Gibt zurück, ob von dem Stream gelesen oder er abgebrochen wurde.

stream.isErrored(stream)

Hinzugefügt in: v17.3.0, v16.14.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Gibt zurück, ob im Stream ein Fehler aufgetreten ist.

stream.isReadable(stream)

Hinzugefügt in: v17.4.0, v16.14.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

Gibt zurück, ob der Stream lesbar ist.

stream.Readable.toWeb(streamReadable[, options])

Hinzugefügt in: v17.0.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

  • streamReadable <stream.Readable>

  • options <Object>

    • strategy <Object>
    • highWaterMark <number> Die maximale interne Warteschlangengröße (des erstellten ReadableStream), bevor bei Lesevorgängen aus dem angegebenen stream.Readable Gegendruck angewendet wird. Wenn kein Wert angegeben wird, wird er von dem angegebenen stream.Readable übernommen.
    • size <Function> Eine Funktion, welche die Größe des angegebenen Datenchunks ermittelt. Wenn kein Wert angegeben wird, ist die Größe für alle Chunks 1.
    • chunk <any>
    • Gibt zurück: <number>
  • Gibt zurück: <ReadableStream>

stream.Writable.fromWeb(writableStream[, options])

Hinzugefügt in: v17.0.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

stream.Writable.toWeb(streamWritable)

Hinzugefügt in: v17.0.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

stream.Duplex.from(src)

[Verlauf]

VersionÄnderungen
v19.5.0, v18.17.0Das Argument src kann jetzt ein ReadableStream oder WritableStream sein.
v16.8.0Hinzugefügt in: v16.8.0

Eine Hilfsmethode zum Erstellen von Duplex-Streams.

  • Stream konvertiert einen beschreibbaren Stream in einen beschreibbaren Duplex und einen lesbaren Stream in einen Duplex.
  • Blob konvertiert in einen lesbaren Duplex.
  • string konvertiert in einen lesbaren Duplex.
  • ArrayBuffer konvertiert in einen lesbaren Duplex.
  • AsyncIterable konvertiert in einen lesbaren Duplex. Kann nicht null ergeben.
  • AsyncGeneratorFunction konvertiert in einen lesbaren/beschreibbaren transformierenden Duplex. Muss ein AsyncIterable als erste Quelle verwenden. Kann nicht null ergeben.
  • AsyncFunction konvertiert in einen beschreibbaren Duplex. Muss entweder null oder undefined zurückgeben.
  • Object ({ writable, readable }) konvertiert readable und writable in Stream und kombiniert sie dann zu Duplex, wobei Duplex in writable schreibt und von readable liest.
  • Promise konvertiert in einen lesbaren Duplex. Der Wert null wird ignoriert.
  • ReadableStream konvertiert in einen lesbaren Duplex.
  • WritableStream konvertiert in einen beschreibbaren Duplex.
  • Gibt zurück: <stream.Duplex>

Wenn ein Iterable-Objekt, das Promises enthält, als Argument übergeben wird, kann dies zu einer nicht behandelten Ablehnung führen.

js
const { Duplex } = require('node:stream')

Duplex.from([
  new Promise(resolve => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Nicht behandelte Ablehnung
])

stream.Duplex.fromWeb(pair[, options])

Hinzugefügt in: v17.0.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

js
import { Duplex } from 'node:stream'
import { ReadableStream, WritableStream } from 'node:stream/web'

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')

for await (const chunk of duplex) {
  console.log('readable', chunk)
}
js
const { Duplex } = require('node:stream')
const { ReadableStream, WritableStream } = require('node:stream/web')

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world')
  },
})

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk)
  },
})

const pair = {
  readable,
  writable,
}
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true })

duplex.write('hello')
duplex.once('readable', () => console.log('readable', duplex.read()))

stream.Duplex.toWeb(streamDuplex)

Hinzugefügt in: v17.0.0

[Stabil: 1 - Experimentell]

Stabil: 1 Stabilität: 1 - Experimentell

js
import { Duplex } from 'node:stream'

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

const { value } = await readable.getReader().read()
console.log('readable', value)
js
const { Duplex } = require('node:stream')

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk)
    callback()
  },
})

const { readable, writable } = Duplex.toWeb(duplex)
writable.getWriter().write('hello')

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value)
  })

stream.addAbortSignal(signal, stream)

[Verlauf]

VersionÄnderungen
v19.7.0, v18.16.0Unterstützung für ReadableStream und WritableStream hinzugefügt.
v15.4.0Hinzugefügt in: v15.4.0

Hängt ein AbortSignal an einen lesbaren oder beschreibbaren Stream an. Dies ermöglicht es dem Code, die Stream-Zerstörung mithilfe eines AbortController zu steuern.

Das Aufrufen von abort auf dem AbortController, das dem übergebenen AbortSignal entspricht, verhält sich auf die gleiche Weise wie das Aufrufen von .destroy(new AbortError()) auf dem Stream und controller.error(new AbortError()) für Webstreams.

js
const fs = require('node:fs')

const controller = new AbortController()
const read = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
// Später, brechen Sie den Vorgang ab, indem Sie den Stream schließen
controller.abort()

Oder die Verwendung eines AbortSignal mit einem lesbaren Stream als asynchrone Iterable:

js
const controller = new AbortController()
setTimeout(() => controller.abort(), 10_000) // ein Timeout setzen
const stream = addAbortSignal(controller.signal, fs.createReadStream('object.json'))
;(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk)
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // Der Vorgang wurde abgebrochen
    } else {
      throw e
    }
  }
})()

Oder die Verwendung eines AbortSignal mit einem ReadableStream:

js
const controller = new AbortController()
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  },
})

addAbortSignal(controller.signal, rs)

finished(rs, err => {
  if (err) {
    if (err.name === 'AbortError') {
      // Der Vorgang wurde abgebrochen
    }
  }
})

const reader = rs.getReader()

reader.read().then(({ value, done }) => {
  console.log(value) // hello
  console.log(done) // false
  controller.abort()
})

stream.getDefaultHighWaterMark(objectMode)

Hinzugefügt in: v19.9.0, v18.17.0

Gibt den Standardwert für highWaterMark zurück, der von Streams verwendet wird. Standardmäßig ist dies 65536 (64 KiB) oder 16 für objectMode.

stream.setDefaultHighWaterMark(objectMode, value)

Hinzugefügt in: v19.9.0, v18.17.0

Setzt den Standardwert für highWaterMark, der von Streams verwendet wird.

API für Stream-Implementierer

Die node:stream-Modul-API wurde so konzipiert, dass es einfach ist, Streams mit dem prototypischen Vererbungsmodell von JavaScript zu implementieren.

Zuerst deklariert ein Stream-Entwickler eine neue JavaScript-Klasse, die eine der vier grundlegenden Stream-Klassen (stream.Writable, stream.Readable, stream.Duplex oder stream.Transform) erweitert und stellt sicher, dass er den entsprechenden Konstruktor der übergeordneten Klasse aufruft:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark })
    // ...
  }
}

Denken Sie beim Erweitern von Streams daran, welche Optionen der Benutzer bereitstellen kann und sollte, bevor Sie diese an den Basiskonstruktor weiterleiten. Wenn die Implementierung beispielsweise Annahmen in Bezug auf die Optionen autoDestroy und emitClose trifft, erlauben Sie dem Benutzer nicht, diese zu überschreiben. Seien Sie explizit, welche Optionen weitergeleitet werden, anstatt implizit alle Optionen weiterzuleiten.

Die neue Stream-Klasse muss dann eine oder mehrere spezifische Methoden implementieren, abhängig vom Typ des zu erstellenden Streams, wie in der folgenden Tabelle aufgeführt:

AnwendungsfallKlasseZu implementierende Methode(n)
Nur lesenReadable_read()
Nur schreibenWritable_write() , _writev() , _final()
Lesen und SchreibenDuplex_read() , _write() , _writev() , _final()
Auf geschriebene Daten einwirken und dann das Ergebnis lesenTransform_transform() , _flush() , _final()

Der Implementierungscode für einen Stream sollte niemals die "öffentlichen" Methoden eines Streams aufrufen, die für die Verwendung durch Konsumenten vorgesehen sind (wie im Abschnitt API für Stream-Konsumenten beschrieben). Dies kann zu unerwünschten Nebenwirkungen im Anwendungscode führen, der den Stream konsumiert.

Vermeiden Sie das Überschreiben öffentlicher Methoden wie write(), end(), cork(), uncork(), read() und destroy() oder das Ausgeben interner Ereignisse wie 'error', 'data', 'end', 'finish' und 'close' über .emit(). Dies kann aktuelle und zukünftige Stream-Invarianten aufbrechen, was zu Verhaltens- und/oder Kompatibilitätsproblemen mit anderen Streams, Stream-Dienstprogrammen und Benutzererwartungen führt.

Vereinfachte Konstruktion

Hinzugefügt in: v1.2.0

In vielen einfachen Fällen ist es möglich, einen Stream zu erstellen, ohne auf Vererbung angewiesen zu sein. Dies kann erreicht werden, indem direkt Instanzen der stream.Writable-, stream.Readable-, stream.Duplex- oder stream.Transform-Objekte erstellt und entsprechende Methoden als Konstruktoroptionen übergeben werden.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  construct(callback) {
    // Zustand initialisieren und Ressourcen laden...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Ressourcen freigeben...
  },
})

Implementierung eines beschreibbaren Streams

Die Klasse stream.Writable wird erweitert, um einen Writable-Stream zu implementieren.

Benutzerdefinierte Writable-Streams müssen den Konstruktor new stream.Writable([options]) aufrufen und die Methode writable._write() und/oder writable._writev() implementieren.

new stream.Writable([options])

[Historie]

VersionÄnderungen
v22.0.0Standard highWaterMark erhöht.
v15.5.0Unterstützung für die Übergabe eines AbortSignals.
v14.0.0Die Standardeinstellung der Option autoDestroy wurde auf true geändert.
v11.2.0, v10.16.0Die Option autoDestroy hinzugefügt, um den Stream automatisch destroy() auszuführen, wenn er 'finish' ausgibt oder Fehler auftreten.
v10.0.0Die Option emitClose hinzugefügt, um anzugeben, ob beim Zerstören 'close' ausgegeben wird.
  • options <Object>
    • highWaterMark <number> Pufferpegel, wenn stream.write() beginnt, false zurückzugeben. Standard: 65536 (64 KiB) oder 16 für objectMode-Streams.
    • decodeStrings <boolean> Gibt an, ob an stream.write() übergebene strings in Buffers (mit der im stream.write()-Aufruf angegebenen Kodierung) kodiert werden sollen, bevor sie an stream._write() übergeben werden. Andere Datentypen werden nicht konvertiert (d. h. Buffers werden nicht in strings dekodiert). Wenn dieser Wert auf false gesetzt wird, wird die Konvertierung von strings verhindert. Standard: true.
    • defaultEncoding <string> Die Standardkodierung, die verwendet wird, wenn keine Kodierung als Argument für stream.write() angegeben wird. Standard: 'utf8'.
    • objectMode <boolean> Gibt an, ob stream.write(anyObj) eine gültige Operation ist. Wenn dies festgelegt ist, wird es möglich, andere JavaScript-Werte als String, <Buffer>, <TypedArray> oder <DataView> zu schreiben, wenn dies von der Stream-Implementierung unterstützt wird. Standard: false.
    • emitClose <boolean> Gibt an, ob der Stream 'close' ausgeben soll, nachdem er zerstört wurde. Standard: true.
    • write <Function> Implementierung für die Methode stream._write().
    • writev <Function> Implementierung für die Methode stream._writev().
    • destroy <Function> Implementierung für die Methode stream._destroy().
    • final <Function> Implementierung für die Methode stream._final().
    • construct <Function> Implementierung für die Methode stream._construct().
    • autoDestroy <boolean> Gibt an, ob dieser Stream nach dem Beenden automatisch .destroy() für sich selbst aufrufen soll. Standard: true.
    • signal <AbortSignal> Ein Signal, das eine mögliche Abbrechen darstellt.
js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  constructor(options) {
    // Ruft den Konstruktor stream.Writable() auf.
    super(options)
    // ...
  }
}

Oder bei Verwendung von Konstruktoren im Pre-ES6-Stil:

js
const { Writable } = require('node:stream')
const util = require('node:util')

function MyWritable(options) {
  if (!(this instanceof MyWritable)) return new MyWritable(options)
  Writable.call(this, options)
}
util.inherits(MyWritable, Writable)

Oder unter Verwendung des vereinfachten Konstruktoransatzes:

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
})

Das Aufrufen von abort auf dem AbortController, der dem übergebenen AbortSignal entspricht, verhält sich genauso wie der Aufruf von .destroy(new AbortError()) auf dem beschreibbaren Stream.

js
const { Writable } = require('node:stream')

const controller = new AbortController()
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
})
// Später wird der Vorgang abgebrochen und der Stream geschlossen
controller.abort()

writable._construct(callback)

Hinzugefügt in: v15.0.0

  • callback <Funktion> Rufen Sie diese Funktion (optional mit einem Fehlerargument) auf, wenn der Stream die Initialisierung abgeschlossen hat.

Die Methode _construct() DARF NICHT direkt aufgerufen werden. Sie kann von Kindklassen implementiert werden und wird, falls dies der Fall ist, nur von den internen Methoden der Klasse Writable aufgerufen.

Diese optionale Funktion wird in einem Tick aufgerufen, nachdem der Stream-Konstruktor zurückgekehrt ist, und verzögert alle Aufrufe von _write(), _final() und _destroy(), bis callback aufgerufen wird. Dies ist nützlich, um den Zustand zu initialisieren oder Ressourcen asynchron zu initialisieren, bevor der Stream verwendet werden kann.

js
const { Writable } = require('node:stream')
const fs = require('node:fs')

class WriteStream extends Writable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback)
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

writable._write(chunk, encoding, callback)

[Verlauf]

VersionÄnderungen
v12.11.0_write() ist optional, wenn _writev() bereitgestellt wird.
  • chunk <Buffer> | <string> | <any> Der zu schreibende Buffer, der aus dem an stream.write() übergebenen string konvertiert wurde. Wenn die Option decodeStrings des Streams false ist oder der Stream im Objektmodus arbeitet, wird der Chunk nicht konvertiert und entspricht dem, was an stream.write() übergeben wurde.
  • encoding <string> Wenn der Chunk ein String ist, ist encoding die Zeichenkodierung dieses Strings. Wenn der Chunk ein Buffer ist oder der Stream im Objektmodus arbeitet, kann encoding ignoriert werden.
  • callback <Funktion> Rufen Sie diese Funktion (optional mit einem Fehlerargument) auf, wenn die Verarbeitung für den angegebenen Chunk abgeschlossen ist.

Alle Writable-Stream-Implementierungen müssen eine writable._write() - und/oder writable._writev()-Methode bereitstellen, um Daten an die zugrunde liegende Ressource zu senden.

Transform-Streams stellen ihre eigene Implementierung von writable._write() bereit.

Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von Kindklassen implementiert und nur von den internen Methoden der Writable-Klasse aufgerufen werden.

Die callback-Funktion muss synchron innerhalb von writable._write() oder asynchron (d. h. unterschiedlicher Tick) aufgerufen werden, um entweder zu signalisieren, dass der Schreibvorgang erfolgreich abgeschlossen wurde oder mit einem Fehler fehlgeschlagen ist. Das erste Argument, das an den callback übergeben wird, muss das Error-Objekt sein, wenn der Aufruf fehlgeschlagen ist, oder null, wenn der Schreibvorgang erfolgreich war.

Alle Aufrufe von writable.write(), die zwischen dem Zeitpunkt des Aufrufs von writable._write() und dem Zeitpunkt des Aufrufs von callback auftreten, führen dazu, dass die geschriebenen Daten gepuffert werden. Wenn callback aufgerufen wird, kann der Stream ein 'drain'-Ereignis ausgeben. Wenn eine Stream-Implementierung in der Lage ist, mehrere Datenchunks gleichzeitig zu verarbeiten, sollte die Methode writable._writev() implementiert werden.

Wenn die Eigenschaft decodeStrings in den Konstruktoroptionen explizit auf false gesetzt wird, bleibt chunk dasselbe Objekt, das an .write() übergeben wird, und kann ein String anstatt eines Buffer sein. Dies dient zur Unterstützung von Implementierungen, die eine optimierte Behandlung für bestimmte String-Datenkodierungen haben. In diesem Fall gibt das Argument encoding die Zeichenkodierung des Strings an. Andernfalls kann das Argument encoding sicher ignoriert werden.

Die Methode writable._write() wird mit einem Unterstrich versehen, da sie intern für die Klasse ist, die sie definiert, und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.

writable._writev(chunks, callback)

  • chunks <Object[]> Die zu schreibenden Daten. Der Wert ist ein Array von <Object>, die jeweils einen diskreten Datenblock darstellen, der geschrieben werden soll. Die Eigenschaften dieser Objekte sind:

    • chunk <Buffer> | <string> Eine Buffer-Instanz oder ein String, der die zu schreibenden Daten enthält. Der chunk ist ein String, wenn das Writable mit der Option decodeStrings auf false gesetzt wurde und ein String an write() übergeben wurde.
    • encoding <string> Die Zeichenkodierung des chunk. Wenn chunk ein Buffer ist, ist encoding 'buffer'.
  • callback <Function> Eine Callback-Funktion (optional mit einem Fehlerargument), die aufgerufen wird, wenn die Verarbeitung für die bereitgestellten Chunks abgeschlossen ist.

Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von untergeordneten Klassen implementiert und nur von den internen Writable-Klassenmethoden aufgerufen werden.

Die Methode writable._writev() kann zusätzlich oder alternativ zu writable._write() in Stream-Implementierungen implementiert werden, die in der Lage sind, mehrere Datenblöcke gleichzeitig zu verarbeiten. Wenn implementiert und wenn gepufferte Daten aus vorherigen Schreibvorgängen vorhanden sind, wird _writev() anstelle von _write() aufgerufen.

Der Methode writable._writev() ist ein Unterstrich vorangestellt, da sie intern für die Klasse ist, die sie definiert, und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.

writable._destroy(err, callback)

Hinzugefügt in: v8.0.0

  • err <Error> Ein möglicher Fehler.
  • callback <Function> Eine Callback-Funktion, die ein optionales Fehlerargument entgegennimmt.

Die Methode _destroy() wird von writable.destroy() aufgerufen. Sie kann von untergeordneten Klassen überschrieben werden, darf aber nicht direkt aufgerufen werden.

writable._final(callback)

Hinzugefügt in: v8.0.0

  • callback <Funktion> Ruft diese Funktion (optional mit einem Fehlerargument) auf, wenn das Schreiben verbleibender Daten abgeschlossen ist.

Die Methode _final() darf nicht direkt aufgerufen werden. Sie kann von untergeordneten Klassen implementiert werden und wird, falls dies der Fall ist, nur von den internen Methoden der Writable-Klasse aufgerufen.

Diese optionale Funktion wird aufgerufen, bevor der Stream geschlossen wird, wodurch das 'finish'-Ereignis verzögert wird, bis callback aufgerufen wird. Dies ist nützlich, um Ressourcen zu schließen oder gepufferte Daten zu schreiben, bevor ein Stream endet.

Fehler beim Schreiben

Fehler, die bei der Verarbeitung der Methoden writable._write(), writable._writev() und writable._final() auftreten, müssen durch Aufrufen des Callbacks und Übergeben des Fehlers als erstes Argument weitergegeben werden. Das Werfen eines Error innerhalb dieser Methoden oder das manuelle Auslösen eines 'error'-Ereignisses führt zu undefiniertem Verhalten.

Wenn ein Readable-Stream in einen Writable-Stream geleitet wird, wenn Writable einen Fehler ausgibt, wird der Readable-Stream entkoppelt.

js
const { Writable } = require('node:stream')

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk ist ungültig'))
    } else {
      callback()
    }
  },
})

Ein Beispiel für einen schreibbaren Stream

Das Folgende veranschaulicht eine eher simple (und etwas sinnlose) benutzerdefinierte Writable-Stream-Implementierung. Obwohl diese spezielle Writable-Stream-Instanz keinen besonderen Nutzen hat, veranschaulicht das Beispiel jedes der erforderlichen Elemente einer benutzerdefinierten Writable-Stream-Instanz:

js
const { Writable } = require('node:stream')

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk ist ungültig'))
    } else {
      callback()
    }
  }
}

Dekodieren von Puffern in einem beschreibbaren Stream

Das Dekodieren von Puffern ist eine häufige Aufgabe, z. B. bei der Verwendung von Transformatoren, deren Eingabe eine Zeichenkette ist. Dies ist kein trivialer Prozess bei der Verwendung von Mehrbyte-Zeichenkodierungen wie UTF-8. Das folgende Beispiel zeigt, wie man Mehrbyte-Zeichenketten mit StringDecoder und Writable dekodiert.

js
const { Writable } = require('node:stream')
const { StringDecoder } = require('node:string_decoder')

class StringWritable extends Writable {
  constructor(options) {
    super(options)
    this._decoder = new StringDecoder(options?.defaultEncoding)
    this.data = ''
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk)
    }
    this.data += chunk
    callback()
  }
  _final(callback) {
    this.data += this._decoder.end()
    callback()
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from)
const w = new StringWritable()

w.write('currency: ')
w.write(euro[0])
w.end(euro[1])

console.log(w.data) // currency: €

Implementierung eines lesbaren Streams

Die Klasse stream.Readable wird erweitert, um einen Readable-Stream zu implementieren.

Benutzerdefinierte Readable-Streams müssen den Konstruktor new stream.Readable([options]) aufrufen und die Methode readable._read() implementieren.

new stream.Readable([options])

[Verlauf]

VersionÄnderungen
v22.0.0Standard-HighWaterMark erhöht.
v15.5.0Unterstützung für das Übergeben eines AbortSignals.
v14.0.0Standardwert der Option autoDestroy auf true geändert.
v11.2.0, v10.16.0Option autoDestroy hinzugefügt, um den Stream automatisch destroy() zu beenden, wenn er 'end' ausgibt oder Fehler auftreten.
  • options <Object>
    • highWaterMark <number> Die maximale Anzahl von Bytes, die im internen Puffer gespeichert werden sollen, bevor das Lesen von der zugrunde liegenden Ressource eingestellt wird. Standard: 65536 (64 KiB) oder 16 für objectMode-Streams.
    • encoding <string> Wenn angegeben, werden Puffer mit der angegebenen Kodierung in Zeichenketten dekodiert. Standard: null.
    • objectMode <boolean> Gibt an, ob dieser Stream sich wie ein Stream von Objekten verhalten soll. Das bedeutet, dass stream.read(n) einen einzelnen Wert anstelle eines Buffer der Größe n zurückgibt. Standard: false.
    • emitClose <boolean> Gibt an, ob der Stream 'close' ausgeben soll, nachdem er zerstört wurde. Standard: true.
    • read <Function> Implementierung für die Methode stream._read().
    • destroy <Function> Implementierung für die Methode stream._destroy().
    • construct <Function> Implementierung für die Methode stream._construct().
    • autoDestroy <boolean> Gibt an, ob dieser Stream automatisch .destroy() für sich selbst aufrufen soll, nachdem er beendet wurde. Standard: true.
    • signal <AbortSignal> Ein Signal, das eine mögliche Abbrechung darstellt.
js
const { Readable } = require('node:stream')

class MyReadable extends Readable {
  constructor(options) {
    // Ruft den Konstruktor stream.Readable(options) auf.
    super(options)
    // ...
  }
}

Oder bei Verwendung von Konstruktoren im Pre-ES6-Stil:

js
const { Readable } = require('node:stream')
const util = require('node:util')

function MyReadable(options) {
  if (!(this instanceof MyReadable)) return new MyReadable(options)
  Readable.call(this, options)
}
util.inherits(MyReadable, Readable)

Oder bei Verwendung des vereinfachten Konstruktoransatzes:

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    // ...
  },
})

Das Aufrufen von abort auf dem AbortController, der dem übergebenen AbortSignal entspricht, verhält sich genauso wie das Aufrufen von .destroy(new AbortError()) auf dem erzeugten lesbaren Stream.

js
const { Readable } = require('node:stream')
const controller = new AbortController()
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
})
// Später die Operation abbrechen und den Stream schließen
controller.abort()

readable._construct(callback)

Hinzugefügt in: v15.0.0

  • callback <Funktion> Rufe diese Funktion auf (optional mit einem Fehlerargument), wenn der Stream die Initialisierung abgeschlossen hat.

Die _construct()-Methode DARF NICHT direkt aufgerufen werden. Sie kann von untergeordneten Klassen implementiert werden und wird, falls dies der Fall ist, nur von den internen Readable-Klassenmethoden aufgerufen.

Diese optionale Funktion wird im nächsten Tick vom Stream-Konstruktor geplant, wodurch alle _read()- und _destroy()-Aufrufe verzögert werden, bis callback aufgerufen wird. Dies ist nützlich, um den Status zu initialisieren oder Ressourcen asynchron zu initialisieren, bevor der Stream verwendet werden kann.

js
const { Readable } = require('node:stream')
const fs = require('node:fs')

class ReadStream extends Readable {
  constructor(filename) {
    super()
    this.filename = filename
    this.fd = null
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err)
      } else {
        this.fd = fd
        callback()
      }
    })
  }
  _read(n) {
    const buf = Buffer.alloc(n)
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err)
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null)
      }
    })
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, er => callback(er || err))
    } else {
      callback(err)
    }
  }
}

readable._read(size)

Hinzugefügt in: v0.9.4

  • size <Zahl> Anzahl der asynchron zu lesenden Bytes

Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von untergeordneten Klassen implementiert und nur von den internen Readable-Klassenmethoden aufgerufen werden.

Alle Readable-Stream-Implementierungen müssen eine Implementierung der [readable._read()]-Methode bereitstellen, um Daten aus der zugrunde liegenden Ressource abzurufen.

Wenn readable._read() aufgerufen wird und Daten aus der Ressource verfügbar sind, sollte die Implementierung beginnen, diese Daten mithilfe der [this.push(dataChunk)]-Methode in die Lesewarteschlange zu schieben. _read() wird nach jedem Aufruf von this.push(dataChunk) erneut aufgerufen, sobald der Stream bereit ist, weitere Daten zu akzeptieren. _read() kann weiterhin aus der Ressource lesen und Daten schieben, bis readable.push() false zurückgibt. Nur wenn _read() erneut aufgerufen wird, nachdem es gestoppt wurde, sollte es mit dem Schieben weiterer Daten in die Warteschlange fortfahren.

Sobald die [readable._read()]-Methode aufgerufen wurde, wird sie erst wieder aufgerufen, wenn weitere Daten über die [readable.push()]-Methode geschoben werden. Leere Daten wie leere Puffer und Strings führen nicht dazu, dass readable._read() aufgerufen wird.

Das Argument size ist beratend. Für Implementierungen, bei denen ein "Lesen" eine einzelne Operation ist, die Daten zurückgibt, kann das Argument size verwendet werden, um zu bestimmen, wie viele Daten abgerufen werden sollen. Andere Implementierungen können dieses Argument ignorieren und einfach Daten bereitstellen, sobald sie verfügbar sind. Es ist nicht erforderlich zu "warten", bis size Bytes verfügbar sind, bevor stream.push(chunk) aufgerufen wird.

Die [readable._read()]-Methode ist mit einem Unterstrich versehen, da sie intern für die Klasse ist, die sie definiert, und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.

readable._destroy(err, callback)

Hinzugefügt in: v8.0.0

  • err <Error> Ein möglicher Fehler.
  • callback <Function> Eine Callback-Funktion, die ein optionales Fehlerargument entgegennimmt.

Die Methode _destroy() wird von readable.destroy() aufgerufen. Sie kann von untergeordneten Klassen überschrieben werden, darf aber nicht direkt aufgerufen werden.

readable.push(chunk[, encoding])

[Verlauf]

VersionÄnderungen
v22.0.0, v20.13.0Das chunk-Argument kann nun eine TypedArray- oder DataView-Instanz sein.
v8.0.0Das chunk-Argument kann nun eine Uint8Array-Instanz sein.
  • chunk <Buffer> | <TypedArray> | <DataView> | <string> | <null> | <any> Datenchunk, der in die Lesewarteschlange geschoben werden soll. Für Streams, die nicht im Objektmodus betrieben werden, muss chunk ein <string>, <Buffer>, <TypedArray> oder <DataView> sein. Für Streams im Objektmodus kann chunk ein beliebiger JavaScript-Wert sein.
  • encoding <string> Kodierung von String-Chunks. Muss eine gültige Buffer-Kodierung sein, wie z. B. 'utf8' oder 'ascii'.
  • Gibt zurück: <boolean> true, wenn weitere Daten-Chunks weitergeschoben werden können; ansonsten false.

Wenn chunk ein <Buffer>, <TypedArray>, <DataView> oder <string> ist, wird der chunk an Daten der internen Warteschlange hinzugefügt, damit Benutzer des Streams ihn verbrauchen können. Das Übergeben von chunk als null signalisiert das Ende des Streams (EOF), nach dem keine Daten mehr geschrieben werden können.

Wenn der Readable im pausierten Modus arbeitet, können die mit readable.push() hinzugefügten Daten durch Aufrufen der Methode readable.read() gelesen werden, wenn das Ereignis 'readable' ausgelöst wird.

Wenn der Readable im fließenden Modus arbeitet, werden die mit readable.push() hinzugefügten Daten durch Auslösen eines 'data'-Ereignisses zugestellt.

Die Methode readable.push() ist so flexibel wie möglich gestaltet. Wenn beispielsweise eine Quelle auf niedrigerer Ebene mit einer Art Pause/Resume-Mechanismus und einem Daten-Callback umschlossen wird, kann die Quelle auf niedrigerer Ebene von der benutzerdefinierten Readable-Instanz umschlossen werden:

js
// `_source` ist ein Objekt mit den Methoden readStop() und readStart()
// und einem `ondata`-Element, das aufgerufen wird, wenn es Daten hat, und
// einem `onend`-Element, das aufgerufen wird, wenn die Daten vorbei sind.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options)

    this._source = getLowLevelSourceObject()

    // Jedes Mal, wenn Daten vorhanden sind, werden diese in den internen Puffer geschoben.
    this._source.ondata = chunk => {
      // Wenn push() false zurückgibt, wird das Lesen aus der Quelle beendet.
      if (!this.push(chunk)) this._source.readStop()
    }

    // Wenn die Quelle endet, wird der EOF-signal-`null`-Chunk geschoben.
    this._source.onend = () => {
      this.push(null)
    }
  }
  // _read() wird aufgerufen, wenn der Stream weitere Daten abrufen möchte.
  // Das beratende Größenargument wird in diesem Fall ignoriert.
  _read(size) {
    this._source.readStart()
  }
}

Die Methode readable.push() wird verwendet, um den Inhalt in den internen Puffer zu schieben. Sie kann von der Methode readable._read() gesteuert werden.

Für Streams, die nicht im Objektmodus arbeiten, wird der chunk-Parameter von readable.push() als leerer String oder Puffer behandelt, wenn er undefined ist. Weitere Informationen finden Sie unter readable.push('').

Fehler beim Lesen

Fehler, die während der Verarbeitung von readable._read() auftreten, müssen über die Methode readable.destroy(err) weitergegeben werden. Das Auslösen eines Error innerhalb von readable._read() oder das manuelle Ausgeben eines 'error'-Ereignisses führt zu undefiniertem Verhalten.

js
const { Readable } = require('node:stream')

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition()
    if (err) {
      this.destroy(err)
    } else {
      // Do some work.
    }
  },
})

Ein Beispiel für einen Zählstream

Das Folgende ist ein einfaches Beispiel für einen Readable-Stream, der die Ziffern von 1 bis 1.000.000 in aufsteigender Reihenfolge ausgibt und dann endet.

js
const { Readable } = require('node:stream')

class Counter extends Readable {
  constructor(opt) {
    super(opt)
    this._max = 1000000
    this._index = 1
  }

  _read() {
    const i = this._index++
    if (i > this._max) this.push(null)
    else {
      const str = String(i)
      const buf = Buffer.from(str, 'ascii')
      this.push(buf)
    }
  }
}

Implementieren eines Duplex-Streams

Ein Duplex-Stream ist ein Stream, der sowohl Readable als auch Writable implementiert, wie z. B. eine TCP-Socket-Verbindung.

Da JavaScript keine Unterstützung für Mehrfachvererbung bietet, wird die Klasse stream.Duplex erweitert, um einen Duplex-Stream zu implementieren (im Gegensatz zur Erweiterung der Klassen stream.Readable und stream.Writable).

Die Klasse stream.Duplex erbt prototypisch von stream.Readable und parasitär von stream.Writable, aber instanceof funktioniert aufgrund der Überschreibung von Symbol.hasInstance in stream.Writable für beide Basisklassen korrekt.

Benutzerdefinierte Duplex-Streams müssen den Konstruktor new stream.Duplex([options]) aufrufen und sowohl die Methoden readable._read() als auch writable._write() implementieren.

new stream.Duplex(options)

[Historie]

VersionÄnderungen
v8.4.0Die Optionen readableHighWaterMark und writableHighWaterMark werden jetzt unterstützt.
  • options <Object> Wird sowohl an die Konstruktoren Writable als auch Readable übergeben. Hat auch die folgenden Felder:
    • allowHalfOpen <boolean> Wenn auf false gesetzt, beendet der Stream automatisch die beschreibbare Seite, wenn die lesbare Seite endet. Standard: true.
    • readable <boolean> Legt fest, ob der Duplex lesbar sein soll. Standard: true.
    • writable <boolean> Legt fest, ob der Duplex beschreibbar sein soll. Standard: true.
    • readableObjectMode <boolean> Legt objectMode für die lesbare Seite des Streams fest. Hat keine Auswirkung, wenn objectMode true ist. Standard: false.
    • writableObjectMode <boolean> Legt objectMode für die beschreibbare Seite des Streams fest. Hat keine Auswirkung, wenn objectMode true ist. Standard: false.
    • readableHighWaterMark <number> Legt highWaterMark für die lesbare Seite des Streams fest. Hat keine Auswirkung, wenn highWaterMark angegeben wird.
    • writableHighWaterMark <number> Legt highWaterMark für die beschreibbare Seite des Streams fest. Hat keine Auswirkung, wenn highWaterMark angegeben wird.
js
const { Duplex } = require('node:stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    // ...
  }
}

Oder bei Verwendung von Konstruktoren im Pre-ES6-Stil:

js
const { Duplex } = require('node:stream')
const util = require('node:util')

function MyDuplex(options) {
  if (!(this instanceof MyDuplex)) return new MyDuplex(options)
  Duplex.call(this, options)
}
util.inherits(MyDuplex, Duplex)

Oder unter Verwendung des vereinfachten Konstruktoransatzes:

js
const { Duplex } = require('node:stream')

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
})

Bei Verwendung von Pipeline:

js
const { Transform, pipeline } = require('node:stream')
const fs = require('node:fs')

pipeline(
  fs.createReadStream('object.json').setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Akzeptiert String-Eingabe anstelle von Buffers
    construct(callback) {
      this.data = ''
      callback()
    },
    transform(chunk, encoding, callback) {
      this.data += chunk
      callback()
    },
    flush(callback) {
      try {
        // Sicherstellen, dass es sich um gültiges JSON handelt.
        JSON.parse(this.data)
        this.push(this.data)
        callback()
      } catch (err) {
        callback(err)
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  err => {
    if (err) {
      console.error('fehlgeschlagen', err)
    } else {
      console.log('abgeschlossen')
    }
  }
)

Ein Beispiel für einen Duplex-Stream

Das Folgende veranschaulicht ein einfaches Beispiel für einen Duplex-Stream, der ein hypothetisches Low-Level-Quellobjekt umschließt, in das Daten geschrieben und aus dem Daten gelesen werden können, wenn auch mit einer API, die nicht mit Node.js-Streams kompatibel ist. Das Folgende veranschaulicht ein einfaches Beispiel für einen Duplex-Stream, der eingehende geschriebene Daten über die Writable-Schnittstelle puffert, die über die Readable-Schnittstelle wieder ausgelesen werden.

js
const { Duplex } = require('node:stream')
const kSource = Symbol('source')

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options)
    this[kSource] = source
  }

  _write(chunk, encoding, callback) {
    // Die zugrunde liegende Quelle verarbeitet nur Strings.
    if (Buffer.isBuffer(chunk)) chunk = chunk.toString()
    this[kSource].writeSomeData(chunk)
    callback()
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding))
    })
  }
}

Der wichtigste Aspekt eines Duplex-Streams ist, dass die Readable- und Writable-Seiten unabhängig voneinander arbeiten, obwohl sie innerhalb einer einzigen Objektinstanz koexistieren.

Object-Mode-Duplex-Streams

Für Duplex-Streams kann objectMode ausschließlich für die Readable- oder Writable-Seite mithilfe der Optionen readableObjectMode bzw. writableObjectMode eingestellt werden.

Im folgenden Beispiel wird beispielsweise ein neuer Transform-Stream (der eine Art von Duplex-Stream ist) erstellt, der eine Writable-Seite im Objektmodus hat, die JavaScript-Zahlen akzeptiert, die auf der Readable-Seite in hexadezimale Strings konvertiert werden.

js
const { Transform } = require('node:stream')

// Alle Transform-Streams sind auch Duplex-Streams.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Zwinge den Chunk bei Bedarf zu einer Zahl.
    chunk |= 0

    // Wandle den Chunk in etwas anderes um.
    const data = chunk.toString(16)

    // Füge die Daten der lesbaren Warteschlange hinzu.
    callback(null, '0'.repeat(data.length % 2) + data)
  },
})

myTransform.setEncoding('ascii')
myTransform.on('data', chunk => console.log(chunk))

myTransform.write(1)
// Gibt aus: 01
myTransform.write(10)
// Gibt aus: 0a
myTransform.write(100)
// Gibt aus: 64

Implementierung eines Transformations-Streams

Ein Transform-Stream ist ein Duplex-Stream, bei dem die Ausgabe auf irgendeine Weise aus der Eingabe berechnet wird. Beispiele hierfür sind zlib-Streams oder crypto-Streams, die Daten komprimieren, verschlüsseln oder entschlüsseln.

Es gibt keine Vorschrift, dass die Ausgabe die gleiche Größe wie die Eingabe haben muss, die gleiche Anzahl von Chunks oder zur gleichen Zeit eintreffen muss. Zum Beispiel wird ein Hash-Stream immer nur einen einzigen Ausgabechunk haben, der bereitgestellt wird, wenn die Eingabe beendet ist. Ein zlib-Stream erzeugt eine Ausgabe, die entweder viel kleiner oder viel größer als seine Eingabe ist.

Die stream.Transform-Klasse wird erweitert, um einen Transform-Stream zu implementieren.

Die stream.Transform-Klasse erbt prototypisch von stream.Duplex und implementiert ihre eigenen Versionen der Methoden writable._write() und readable._read(). Benutzerdefinierte Transform-Implementierungen müssen die Methode transform._transform() implementieren und können auch die Methode transform._flush() implementieren.

Bei der Verwendung von Transform-Streams ist Vorsicht geboten, da Daten, die in den Stream geschrieben werden, dazu führen können, dass die Writable-Seite des Streams angehalten wird, wenn die Ausgabe auf der Readable-Seite nicht verbraucht wird.

new stream.Transform([options])

js
const { Transform } = require('node:stream')

class MyTransform extends Transform {
  constructor(options) {
    super(options)
    // ...
  }
}

Oder bei Verwendung von Konstruktoren im Pre-ES6-Stil:

js
const { Transform } = require('node:stream')
const util = require('node:util')

function MyTransform(options) {
  if (!(this instanceof MyTransform)) return new MyTransform(options)
  Transform.call(this, options)
}
util.inherits(MyTransform, Transform)

Oder mit dem vereinfachten Konstruktoransatz:

js
const { Transform } = require('node:stream')

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
})

Ereignis: 'end'

Das 'end'-Ereignis stammt von der stream.Readable-Klasse. Das 'end'-Ereignis wird ausgelöst, nachdem alle Daten ausgegeben wurden, was nach dem Aufruf des Callbacks in transform._flush() erfolgt. Im Fehlerfall sollte 'end' nicht ausgelöst werden.

Ereignis: 'finish'

Das 'finish'-Ereignis stammt von der stream.Writable-Klasse. Das 'finish'-Ereignis wird ausgelöst, nachdem stream.end() aufgerufen wurde und alle Chunks von stream._transform() verarbeitet wurden. Im Fehlerfall sollte 'finish' nicht ausgelöst werden.

transform._flush(callback)

  • callback <Funktion> Eine Callback-Funktion (optional mit einem Fehlerargument und Daten), die aufgerufen wird, wenn verbleibende Daten geleert wurden.

Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von Kindklassen implementiert und nur von den internen Readable-Klassenmethoden aufgerufen werden.

In einigen Fällen kann eine Transformationsoperation am Ende des Streams ein zusätzliches Datenbit ausgeben müssen. Beispielsweise speichert ein zlib-Komprimierungsstream einen Teil des internen Zustands, der verwendet wird, um die Ausgabe optimal zu komprimieren. Wenn der Stream endet, müssen diese zusätzlichen Daten jedoch geleert werden, damit die komprimierten Daten vollständig sind.

Benutzerdefinierte Transform-Implementierungen können die transform._flush()-Methode implementieren. Diese wird aufgerufen, wenn keine weiteren geschriebenen Daten zu verbrauchen sind, jedoch bevor das 'end'-Ereignis ausgelöst wird, das das Ende des Readable-Streams signalisiert.

Innerhalb der transform._flush()-Implementierung kann die transform.push()-Methode gegebenenfalls null oder mehrmals aufgerufen werden. Die callback-Funktion muss aufgerufen werden, wenn die Flush-Operation abgeschlossen ist.

Die transform._flush()-Methode ist mit einem Unterstrich versehen, da sie intern für die Klasse ist, die sie definiert, und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.

transform._transform(chunk, encoding, callback)

  • chunk <Buffer> | <string> | <any> Der zu transformierende Buffer, der von dem an stream.write() übergebenen string konvertiert wurde. Wenn die Option decodeStrings des Streams false ist oder der Stream im Objektmodus arbeitet, wird der Chunk nicht konvertiert und ist das, was an stream.write() übergeben wurde.
  • encoding <string> Wenn der Chunk ein String ist, dann ist dies der Kodierungstyp. Wenn der Chunk ein Buffer ist, dann ist dies der spezielle Wert 'buffer'. Ignorieren Sie ihn in diesem Fall.
  • callback <Function> Eine Callback-Funktion (optional mit einem Fehlerargument und Daten), die aufgerufen werden soll, nachdem der bereitgestellte chunk verarbeitet wurde.

Diese Funktion DARF NICHT direkt vom Anwendungscode aufgerufen werden. Sie sollte von Kindklassen implementiert und nur von den internen Methoden der Readable-Klasse aufgerufen werden.

Alle Transform-Stream-Implementierungen müssen eine _transform()-Methode bereitstellen, um Eingaben zu akzeptieren und Ausgaben zu erzeugen. Die transform._transform()-Implementierung behandelt die geschriebenen Bytes, berechnet eine Ausgabe und übergibt diese Ausgabe dann über die transform.push()-Methode an den lesbaren Teil.

Die transform.push()-Methode kann null oder mehrmals aufgerufen werden, um aus einem einzelnen Eingabe-Chunk eine Ausgabe zu erzeugen, je nachdem, wie viel als Ergebnis des Chunks ausgegeben werden soll.

Es ist möglich, dass aus einem bestimmten Chunk von Eingabedaten keine Ausgabe erzeugt wird.

Die callback-Funktion muss nur aufgerufen werden, wenn der aktuelle Chunk vollständig verarbeitet wurde. Das erste Argument, das an den callback übergeben wird, muss ein Error-Objekt sein, wenn bei der Verarbeitung der Eingabe ein Fehler aufgetreten ist, oder null, wenn kein Fehler aufgetreten ist. Wenn ein zweites Argument an den callback übergeben wird, wird es an die transform.push()-Methode weitergeleitet, aber nur, wenn das erste Argument falsch ist. Mit anderen Worten, die folgenden sind äquivalent:

js
transform.prototype._transform = function (data, encoding, callback) {
  this.push(data)
  callback()
}

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data)
}

Die transform._transform()-Methode ist mit einem Unterstrich präfixiert, da sie intern zur definierenden Klasse gehört und niemals direkt von Benutzerprogrammen aufgerufen werden sollte.

transform._transform() wird niemals parallel aufgerufen; Streams implementieren einen Warteschlangenmechanismus, und um den nächsten Chunk zu empfangen, muss callback entweder synchron oder asynchron aufgerufen werden.

Klasse: stream.PassThrough

Die Klasse stream.PassThrough ist eine triviale Implementierung eines Transform-Streams, der die Eingabe-Bytes einfach an die Ausgabe weiterleitet. Ihr Zweck liegt hauptsächlich in Beispielen und Tests, aber es gibt einige Anwendungsfälle, in denen stream.PassThrough als Baustein für neuartige Arten von Streams nützlich ist.

Zusätzliche Hinweise

Stream-Kompatibilität mit asynchronen Generatoren und asynchronen Iteratoren

Mit der Unterstützung von asynchronen Generatoren und Iteratoren in JavaScript sind asynchrone Generatoren an diesem Punkt effektiv ein erstklassiges Sprach-Level-Stream-Konstrukt.

Einige gängige Interop-Fälle der Verwendung von Node.js-Streams mit asynchronen Generatoren und asynchronen Iteratoren sind unten aufgeführt.

Konsumieren von lesbaren Streams mit asynchronen Iteratoren

js
;(async function () {
  for await (const chunk of readable) {
    console.log(chunk)
  }
})()

Asynchrone Iteratoren registrieren einen permanenten Fehlerhandler auf dem Stream, um unberücksichtigte Fehler nach der Zerstörung zu verhindern.

Erstellen von lesbaren Streams mit asynchronen Generatoren

Ein Node.js-lesbarer Stream kann aus einem asynchronen Generator mit der Utility-Methode Readable.from() erstellt werden:

js
const { Readable } = require('node:stream')

const ac = new AbortController()
const signal = ac.signal

async function* generate() {
  yield 'a'
  await someLongRunningFn({ signal })
  yield 'b'
  yield 'c'
}

const readable = Readable.from(generate())
readable.on('close', () => {
  ac.abort()
})

readable.on('data', chunk => {
  console.log(chunk)
})

Weiterleiten an beschreibbare Streams von asynchronen Iteratoren

Wenn Sie aus einem asynchronen Iterator in einen beschreibbaren Stream schreiben, stellen Sie sicher, dass der Gegendruck und Fehler korrekt behandelt werden. stream.pipeline() abstrahiert die Behandlung von Gegendruck und Gegendruck-bezogenen Fehlern:

js
const fs = require('node:fs')
const { pipeline } = require('node:stream')
const { pipeline: pipelinePromise } = require('node:stream/promises')

const writable = fs.createWriteStream('./file')

const ac = new AbortController()
const signal = ac.signal

const iterator = createIterator({ signal })

// Callback-Muster
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err)
  } else {
    console.log(value, 'Wert zurückgegeben')
  }
}).on('close', () => {
  ac.abort()
})

// Promise-Muster
pipelinePromise(iterator, writable)
  .then(value => {
    console.log(value, 'Wert zurückgegeben')
  })
  .catch(err => {
    console.error(err)
    ac.abort()
  })

Kompatibilität mit älteren Node.js-Versionen

Vor Node.js 0.10 war die Readable-Stream-Schnittstelle einfacher, aber auch weniger leistungsfähig und weniger nützlich.

  • Anstatt auf Aufrufe der Methode stream.read() zu warten, wurden 'data'-Ereignisse sofort ausgelöst. Anwendungen, die einen gewissen Arbeitsaufwand benötigten, um zu entscheiden, wie mit Daten umzugehen ist, mussten gelesene Daten in Puffern speichern, damit die Daten nicht verloren gingen.
  • Die Methode stream.pause() war eher empfehlend als garantiert. Das bedeutete, dass es immer noch notwendig war, darauf vorbereitet zu sein, 'data'-Ereignisse zu empfangen, auch wenn sich der Stream in einem pausierten Zustand befand.

In Node.js 0.10 wurde die Klasse Readable hinzugefügt. Aus Gründen der Abwärtskompatibilität mit älteren Node.js-Programmen wechseln Readable-Streams in den "Flussmodus", wenn ein 'data'-Ereignishandler hinzugefügt wird oder wenn die Methode stream.resume() aufgerufen wird. Der Effekt ist, dass es selbst dann, wenn man die neue Methode stream.read() und das Ereignis 'readable' nicht verwendet, nicht mehr nötig ist, sich um den Verlust von 'data'-Chunks zu sorgen.

Während die meisten Anwendungen weiterhin normal funktionieren, führt dies zu einem Sonderfall unter den folgenden Bedingungen:

  • Es wird kein 'data'-Ereignis-Listener hinzugefügt.
  • Die Methode stream.resume() wird nie aufgerufen.
  • Der Stream wird nicht zu einem beschreibbaren Ziel weitergeleitet.

Betrachten Sie zum Beispiel den folgenden Code:

js
// WARNUNG!  DEFEKT!
net
  .createServer(socket => {
    // Wir fügen einen 'end'-Listener hinzu, konsumieren aber nie die Daten.
    socket.on('end', () => {
      // Es wird niemals hierher kommen.
      socket.end('Die Nachricht wurde empfangen, aber nicht verarbeitet.\n')
    })
  })
  .listen(1337)

Vor Node.js 0.10 wurden die eingehenden Nachrichtendaten einfach verworfen. In Node.js 0.10 und höher bleibt der Socket jedoch für immer pausiert.

Die Problemumgehung in dieser Situation besteht darin, die Methode stream.resume() aufzurufen, um den Datenfluss zu starten:

js
// Problemumgehung.
net
  .createServer(socket => {
    socket.on('end', () => {
      socket.end('Die Nachricht wurde empfangen, aber nicht verarbeitet.\n')
    })

    // Starten Sie den Datenfluss und verwerfen Sie ihn.
    socket.resume()
  })
  .listen(1337)

Zusätzlich zum Umschalten neuer Readable-Streams in den Flussmodus können Streams im Pre-0.10-Stil mit der Methode readable.wrap() in eine Readable-Klasse verpackt werden.

readable.read(0)

Es gibt einige Fälle, in denen es notwendig ist, eine Aktualisierung der zugrunde liegenden lesbaren Stream-Mechanismen auszulösen, ohne tatsächlich Daten zu konsumieren. In solchen Fällen ist es möglich, readable.read(0) aufzurufen, was immer null zurückgibt.

Wenn der interne Lesepuffer unterhalb des highWaterMark liegt und der Stream gerade nicht liest, löst der Aufruf von stream.read(0) einen Low-Level-Aufruf stream._read() aus.

Während die meisten Anwendungen dies fast nie tun müssen, gibt es in Node.js Situationen, in denen dies geschieht, insbesondere in den Interna der Readable-Stream-Klasse.

readable.push('')

Die Verwendung von readable.push('') wird nicht empfohlen.

Das Pushen eines Null-Byte-<string>, <Buffer>, <TypedArray> oder <DataView> in einen Stream, der sich nicht im Objektmodus befindet, hat einen interessanten Nebeneffekt. Da es sich um einen Aufruf von readable.push() handelt, beendet der Aufruf den Leseprozess. Da das Argument jedoch eine leere Zeichenkette ist, werden dem lesbaren Puffer keine Daten hinzugefügt, sodass ein Benutzer nichts verbrauchen kann.

highWaterMark-Diskrepanz nach dem Aufruf von readable.setEncoding()

Die Verwendung von readable.setEncoding() ändert das Verhalten von highWaterMark im Nicht-Objektmodus.

Normalerweise wird die Größe des aktuellen Puffers mit dem highWaterMark in Bytes gemessen. Nach dem Aufruf von setEncoding() beginnt die Vergleichsfunktion jedoch, die Größe des Puffers in Zeichen zu messen.

Dies ist in häufigen Fällen mit latin1 oder ascii kein Problem. Es ist jedoch ratsam, dieses Verhalten bei der Arbeit mit Strings zu beachten, die Multibyte-Zeichen enthalten könnten.