Skip to content

Rückstau in Streams

Es gibt ein allgemeines Problem, das bei der Datenverarbeitung auftritt und als Rückstau bezeichnet wird. Es beschreibt eine Ansammlung von Daten hinter einem Puffer während der Datenübertragung. Wenn das empfangende Ende der Übertragung komplexe Operationen ausführt oder aus irgendeinem Grund langsamer ist, neigen die Daten aus der eingehenden Quelle dazu, sich anzusammeln, wie eine Verstopfung.

Um dieses Problem zu lösen, muss ein Delegationssystem vorhanden sein, um einen reibungslosen Datenfluss von einer Quelle zur anderen zu gewährleisten. Verschiedene Communitys haben dieses Problem auf ihre Weise für ihre Programme gelöst. Unix-Pipes und TCP-Sockets sind gute Beispiele dafür und werden oft als Flusskontrolle bezeichnet. In Node.js sind Streams die bevorzugte Lösung.

Der Zweck dieser Anleitung ist es, genauer zu erläutern, was Rückstau ist und wie genau Streams dies im Node.js-Quellcode angehen. Der zweite Teil der Anleitung wird empfohlene Best Practices vorstellen, um sicherzustellen, dass der Code Ihrer Anwendung sicher und optimiert ist, wenn Sie Streams implementieren.

Wir setzen eine gewisse Vertrautheit mit der allgemeinen Definition von backpressure, Buffer und EventEmitters in Node.js sowie einige Erfahrung mit Stream voraus. Wenn Sie diese Dokumente noch nicht gelesen haben, ist es keine schlechte Idee, zuerst einen Blick auf die API-Dokumentation zu werfen, da dies Ihnen helfen wird, Ihr Verständnis beim Lesen dieser Anleitung zu erweitern.

Das Problem bei der Datenverarbeitung

In einem Computersystem werden Daten von einem Prozess zu einem anderen über Pipes, Sockets und Signale übertragen. In Node.js finden wir einen ähnlichen Mechanismus namens Stream. Streams sind großartig! Sie leisten so viel für Node.js und fast jeder Teil der internen Codebasis verwendet dieses Modul. Als Entwickler sind Sie mehr als ermutigt, sie auch zu verwenden!

javascript
const readline = require('node:readline')

const rl = readline.createInterface({
  output: process.stdout,
  input: process.stdin,
})

rl.question('Warum sollten Sie Streams verwenden? ', answer => {
  console.log(`Vielleicht ist es ${answer}, vielleicht auch, weil sie toll sind!`)
})

rl.close()

Ein gutes Beispiel dafür, warum der durch Streams implementierte Rückstau-Mechanismus eine großartige Optimierung ist, lässt sich durch den Vergleich der internen Systemwerkzeuge der Stream-Implementierung von Node.js demonstrieren.

In einem Szenario nehmen wir eine große Datei (ungefähr -9 GB) und komprimieren sie mit dem bekannten Tool zip(1).

bash
zip The.Matrix.1080p.mkv

Während dies einige Minuten dauern wird, können wir in einer anderen Shell ein Skript ausführen, das das Node.js-Modul zlib verwendet, das ein weiteres Komprimierungstool, gzip(1), umschließt.

javascript
const gzip = require('node:zlib').createGzip()
const fs = require('node:fs')

const inp = fs.createReadStream('The.Matrix.1080p.mkv')
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz')

inp.pipe(gzip).pipe(out)

Um die Ergebnisse zu testen, versuchen Sie, jede komprimierte Datei zu öffnen. Die mit dem Tool zip(1) komprimierte Datei wird Sie darüber informieren, dass die Datei beschädigt ist, während die von Stream abgeschlossene Komprimierung ohne Fehler dekomprimiert wird.

Hinweis

In diesem Beispiel verwenden wir .pipe(), um die Datenquelle von einem Ende zum anderen zu übertragen. Beachten Sie jedoch, dass keine ordnungsgemäßen Fehlerbehandlungsroutinen angehängt sind. Wenn ein Datenchunk nicht ordnungsgemäß empfangen wird, wird die Readable-Quelle oder der gzip-Stream nicht zerstört. pump ist ein Hilfsprogramm, das alle Streams in einer Pipeline ordnungsgemäß zerstören würde, wenn einer von ihnen fehlschlägt oder sich schließt, und ist in diesem Fall ein Muss!

pump ist nur für Node.js 8.x oder früher erforderlich, da für Node.js 10.x oder höher pipeline eingeführt wurde, um pump zu ersetzen. Dies ist eine Modulmethode, um zwischen Streams zu pipelinen, Fehler weiterzuleiten und ordnungsgemäß aufzuräumen und einen Callback bereitzustellen, wenn die Pipeline abgeschlossen ist.

Hier ist ein Beispiel für die Verwendung von Pipeline:

javascript
const { pipeline } = require('node:stream')
const fs = require('node:fs')
const zlib = require('node:zlib')
// Verwenden Sie die Pipeline-API, um auf einfache Weise eine Reihe von Streams
// zusammenzuleiten und benachrichtigt zu werden, wenn die Pipeline vollständig abgeschlossen ist.
// Eine Pipeline zum effizienten Komprimieren einer potenziell riesigen Videodatei:
pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline fehlgeschlagen', err)
    } else {
      console.log('Pipeline erfolgreich')
    }
  }
)

Sie können auch das Modul stream/promises verwenden, um Pipeline mit async / await zu verwenden:

javascript
const { pipeline } = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    )
    console.log('Pipeline erfolgreich')
  } catch (err) {
    console.error('Pipeline fehlgeschlagen', err)
  }
}

Zu viele Daten, zu schnell

Es gibt Fälle, in denen ein Readable-Stream Daten viel zu schnell an den Writable-Stream weitergeben kann - viel mehr, als der Konsument verarbeiten kann!

Wenn dies geschieht, beginnt der Konsument, alle Datenblöcke für die spätere Verarbeitung in eine Warteschlange zu stellen. Die Warteschlange wird immer länger, und aus diesem Grund müssen mehr Daten im Speicher gehalten werden, bis der gesamte Prozess abgeschlossen ist.

Das Schreiben auf eine Festplatte ist viel langsamer als das Lesen von einer Festplatte. Wenn wir also versuchen, eine Datei zu komprimieren und auf unsere Festplatte zu schreiben, entsteht ein Rückstau, weil die Festplatte nicht mit der Geschwindigkeit des Lesens mithalten kann.

javascript
// Heimlich sagt der Stream: "Hoppla, hoppla! Warte mal, das ist viel zu viel!"
// Daten werden sich auf der Leseseite des Datenpuffers ansammeln, während
// write' versucht, mit dem eingehenden Datenfluss Schritt zu halten.
inp.pipe(gzip).pipe(outputFile)

Deshalb ist ein Rückstaumechanismus wichtig. Wenn es kein Rückstausystem gäbe, würde der Prozess den Speicher Ihres Systems aufbrauchen, wodurch andere Prozesse verlangsamt und ein großer Teil Ihres Systems bis zum Abschluss monopolisiert würde.

Dies führt zu einigen Dingen:

  • Verlangsamung aller anderen aktuellen Prozesse
  • Ein sehr überlasteter Garbage Collector
  • Speichererschöpfung

In den folgenden Beispielen werden wir den Rückgabewert der Funktion .write() herausnehmen und in true ändern, wodurch die Unterstützung für Rückstau im Node.js-Kern effektiv deaktiviert wird. Bei jeder Referenz auf die binäre Datei 'modified' sprechen wir davon, die Node-Binärdatei ohne die Zeile return ret; und stattdessen mit dem ersetzten return true; auszuführen.

Übermäßiger Zug am Garbage Collector

Werfen wir einen kurzen Blick auf einen Benchmark. Mit dem gleichen Beispiel wie oben haben wir einige Zeitmessungen durchgeführt, um eine Medianzeit für beide Binärdateien zu erhalten.

bash
   Testlauf (#)  | `node` binär (ms) | modifizierte `node` binär (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
Durchschnittszeit: |      55299         |           55975

Beide brauchen etwa eine Minute, um zu laufen, es gibt also keinen großen Unterschied, aber sehen wir uns das genauer an, um zu bestätigen, ob unsere Vermutungen richtig sind. Wir verwenden das Linux-Tool dtrace, um zu bewerten, was mit dem V8 Garbage Collector passiert.

Die vom GC (Garbage Collector) gemessene Zeit gibt die Intervalle eines vollständigen Durchlaufs einer einzelnen Bereinigung durch den Garbage Collector an:

bash
ungefähre Zeit (ms) | GC (ms) | modifizierter GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1
         *             *           *
         *             *           *
         *             *           *
      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

Während die beiden Prozesse gleich starten und die GC-Last mit der gleichen Rate zu verarbeiten scheinen, wird nach einigen Sekunden mit einem funktionierenden Rückstaussystem deutlich, dass es die GC-Last auf konsistente Intervalle von 4-8 Millisekunden bis zum Ende der Datenübertragung verteilt.

Wenn jedoch kein Rückstausystem vorhanden ist, beginnt die V8 Garbage Collection sich zu verzögern. Die normale Binärdatei rief den GC etwa 75 Mal in einer Minute auf, während die modifizierte Binärdatei ihn nur 36 Mal aufrief.

Dies ist die langsame und allmähliche Verschuldung, die sich aus dem wachsenden Speicherverbrauch ergibt. Wenn Daten übertragen werden, wird ohne ein Rückstausystem mehr Speicher für jede Chunk-Übertragung verwendet.

Je mehr Speicher zugewiesen wird, desto mehr muss der GC in einem Durchlauf verarbeiten. Je größer der Durchlauf ist, desto mehr muss der GC entscheiden, was freigegeben werden kann, und das Scannen nach losgelösten Zeigern in einem größeren Speicherbereich verbraucht mehr Rechenleistung.

Speicherausschöpfung

Um den Speicherverbrauch jeder Binärdatei zu bestimmen, haben wir jeden Prozess einzeln mit /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js getaktet.

Dies ist die Ausgabe der normalen Binärdatei:

bash
Den Rückgabewert von .write() beachten
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximale Resident Set Size
         0  durchschnittliche Shared Memory Size
         0  durchschnittliche Unshared Data Size
         0  durchschnittliche Unshared Stack Size
     19427  Page Reclaims
      3134  Page Faults
         0  Swaps
         5  Block Input Operations
       194  Block Output Operations
         0  Nachrichten gesendet
         0  Nachrichten empfangen
         1  Signale empfangen
        12  Freiwillige Kontextwechsel
    666037  Unfreiwillige Kontextwechsel

Die maximale Byte-Größe, die vom virtuellen Speicher belegt wird, beträgt ungefähr 87,81 MB.

Und jetzt, wenn wir den Rückgabewert der Funktion .write() ändern, erhalten wir:

bash
Ohne Beachtung des Rückgabewerts von .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximale Resident Set Size
         0  durchschnittliche Shared Memory Size
         0  durchschnittliche Unshared Data Size
         0  durchschnittliche Unshared Stack Size
    373617  Page Reclaims
      3139  Page Faults
         0  Swaps
        18  Block Input Operations
       199  Block Output Operations
         0  Nachrichten gesendet
         0  Nachrichten empfangen
         1  Signale empfangen
        25  Freiwillige Kontextwechsel
    629566  Unfreiwillige Kontextwechsel

Die maximale Byte-Größe, die vom virtuellen Speicher belegt wird, beträgt ungefähr 1,52 GB.

Ohne Streams, die den Gegendruck delegieren, wird eine Größenordnung mehr Speicherplatz zugewiesen - ein enormer Unterschied zwischen demselben Prozess!

Dieses Experiment zeigt, wie optimiert und kosteneffizient der Backpressure-Mechanismus von Node.js für Ihr Computersystem ist. Lassen Sie uns nun eine Aufschlüsselung der Funktionsweise vornehmen!

Wie löst Gegendruck diese Probleme?

Es gibt verschiedene Funktionen, um Daten von einem Prozess zu einem anderen zu übertragen. In Node.js gibt es eine interne, eingebaute Funktion namens .pipe(). Es gibt auch andere Pakete, die Sie verwenden können! Letztendlich haben wir jedoch auf der grundlegenden Ebene dieses Prozesses zwei separate Komponenten: die Quelle der Daten und den Verbraucher.

Wenn .pipe() von der Quelle aufgerufen wird, signalisiert dies dem Verbraucher, dass Daten übertragen werden sollen. Die Pipe-Funktion hilft, die entsprechenden Gegendruck-Closures für die Ereignisauslöser einzurichten.

In Node.js ist die Quelle ein Readable-Stream und der Verbraucher ein Writable-Stream (beide können mit einem Duplex- oder einem Transform-Stream ausgetauscht werden, aber das ist für diesen Leitfaden nicht relevant).

Der Moment, in dem Gegendruck ausgelöst wird, kann genau auf den Rückgabewert der .write()-Funktion eines Writable eingegrenzt werden. Dieser Rückgabewert wird natürlich durch einige Bedingungen bestimmt.

In jedem Szenario, in dem der Datenpuffer den highwaterMark überschritten hat oder die Schreibwarteschlange gerade beschäftigt ist, gibt .write() false zurück.

Wenn ein false-Wert zurückgegeben wird, wird das Gegendrucksystem aktiviert. Es pausiert den eingehenden Readable-Stream, um keine Daten zu senden, und wartet, bis der Verbraucher wieder bereit ist. Sobald der Datenpuffer geleert ist, wird ein 'drain'-Ereignis ausgelöst und der eingehende Datenfluss wird fortgesetzt.

Sobald die Warteschlange abgearbeitet ist, ermöglicht der Gegendruck wieder das Senden von Daten. Der Speicherplatz, der verwendet wurde, wird freigegeben und bereitet sich auf den nächsten Datenblock vor.

Dies ermöglicht effektiv die Verwendung einer festen Menge an Speicher zu einem bestimmten Zeitpunkt für eine .pipe()-Funktion. Es gibt keine Speicherlecks und keine unendliche Pufferung, und der Garbage Collector muss sich nur mit einem Bereich im Speicher befassen!

Wenn Gegendruck also so wichtig ist, warum haben Sie (wahrscheinlich) noch nicht davon gehört? Nun, die Antwort ist einfach: Node.js erledigt das alles automatisch für Sie.

Das ist so toll! Aber auch nicht so toll, wenn wir versuchen zu verstehen, wie wir unsere eigenen benutzerdefinierten Streams implementieren.

HINWEIS

In den meisten Maschinen gibt es eine Byte-Größe, die bestimmt, wann ein Puffer voll ist (was auf verschiedenen Maschinen variiert). Node.js ermöglicht es Ihnen, Ihren eigenen highWaterMark festzulegen, aber üblicherweise ist der Standard auf 16 KB (16384 oder 16 für objectMode-Streams) eingestellt. In Fällen, in denen Sie diesen Wert erhöhen möchten, tun Sie dies, aber mit Vorsicht!

Lebenszyklus von .pipe()

Um ein besseres Verständnis für Gegendruck zu erlangen, hier ein Flussdiagramm zum Lebenszyklus eines Readable-Streams, der in einen Writable-Stream geleitet wird:

bash
                                                     +===================+
                         x-->  Piping-Funktionen +-->   src.pipe(dest)  |
                         x     werden während      |===================|
                         x     der .pipe-Methode    |  Ereignis-Callbacks |
  +===============+      x     eingerichtet.       |-------------------|
  |   Ihre Daten   |      x                           | .on('close', cb)  |
  +=======+=======+      x     Sie existieren       | .on('data', cb)   |
          |              x     außerhalb des        | .on('drain', cb)  |
          |              x     Datenflusses, aber    | .on('unpipe', cb) |
+---------v---------+    x     hängen wichtig       | .on('error', cb)  |
|  Readable Stream  +----+     Ereignisse und       | .on('finish', cb) |
+-^-------^-------^-+    |     ihre jeweiligen      | .on('end', cb)    |
  ^       |       ^      |     Callbacks an.        +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                | Ist dieser Chunk zu groß? |
  ^       |       |     emit .end();             | Ist die Warteschlange belegt? |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  Nein |        |  Ja   |
  |       |                                           +------+        +---v---+
  |       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            wenn Warteschlange leer ist     +============+                         |
  ^------------^-----------------------<  Pufferung |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Puffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Puffer^  |                         |
                                       +------------+   Chunk zur Warteschlange hinzufügen   |
                                       |            <---^---------------------<
                                       +============+

HINWEIS

Wenn Sie eine Pipeline einrichten, um einige Streams zu verketten, um Ihre Daten zu bearbeiten, werden Sie höchstwahrscheinlich einen Transform-Stream implementieren.

In diesem Fall gelangt Ihre Ausgabe von Ihrem Readable-Stream in den Transform und wird in den Writable geleitet.

javascript
Readable.pipe(Transformable).pipe(Writable)

Gegendruck wird automatisch angewendet, aber beachten Sie, dass sowohl die eingehenden als auch die ausgehenden highwaterMark des Transform-Streams manipuliert werden können und das Gegendrucksystem beeinflussen.

Richtlinien für Gegendruck

Seit Node.js v0.10 bietet die Stream-Klasse die Möglichkeit, das Verhalten von .read() oder .write() zu ändern, indem die Unterstrich-Version dieser jeweiligen Funktionen (._read() und ._write()) verwendet wird.

Es gibt dokumentierte Richtlinien für die Implementierung von lesbaren Streams und die Implementierung von schreibbaren Streams. Wir gehen davon aus, dass Sie diese gelesen haben, und der nächste Abschnitt wird etwas tiefer gehen.

Regeln, die bei der Implementierung benutzerdefinierter Streams zu beachten sind

Die goldene Regel für Streams ist, immer den Gegendruck zu respektieren. Was als Best Practice gilt, ist eine widerspruchsfreie Praxis. Solange Sie darauf achten, Verhaltensweisen zu vermeiden, die mit der internen Unterstützung für Gegendruck in Konflikt stehen, können Sie sicher sein, dass Sie eine gute Praxis befolgen.

Im Allgemeinen gilt:

  1. Niemals .push() aufrufen, wenn Sie nicht dazu aufgefordert werden.
  2. Niemals .write() aufrufen, nachdem es false zurückgegeben hat, sondern stattdessen auf 'drain' warten.
  3. Streams ändern sich zwischen verschiedenen Node.js-Versionen und der von Ihnen verwendeten Bibliothek. Seien Sie vorsichtig und testen Sie die Dinge.

HINWEIS

In Bezug auf Punkt 3 ist readable-stream ein unglaublich nützliches Paket zum Erstellen von Browser-Streams. Rodd Vagg hat einen großartigen Blogbeitrag geschrieben, der den Nutzen dieser Bibliothek beschreibt. Kurz gesagt, sie bietet eine Art automatisierte, anmutige Degradierung für lesbare Streams und unterstützt ältere Versionen von Browsern und Node.js.

Regeln, die speziell für lesbare Streams gelten

Bisher haben wir uns angesehen, wie sich .write() auf den Gegendruck auswirkt, und uns hauptsächlich auf den schreibbaren Stream konzentriert. Aufgrund der Funktionalität von Node.js fließen Daten technisch gesehen von lesbar zu schreibbar. Wie wir jedoch bei jeder Übertragung von Daten, Materie oder Energie beobachten können, ist die Quelle genauso wichtig wie das Ziel, und der lesbare Stream ist entscheidend dafür, wie der Gegendruck behandelt wird.

Beide Prozesse sind voneinander abhängig, um effektiv zu kommunizieren. Wenn der lesbare Stream ignoriert, wenn der schreibbare Stream ihn auffordert, das Senden von Daten zu stoppen, kann dies genauso problematisch sein wie wenn der Rückgabewert von .write() falsch ist.

Zusätzlich zur Beachtung des .write()-Rückgabewerts müssen wir auch den Rückgabewert von .push() beachten, der in der ._read()-Methode verwendet wird. Wenn .push() einen falschen Wert zurückgibt, stoppt der Stream das Lesen aus der Quelle. Andernfalls wird er ohne Pause fortgesetzt.

Hier ist ein Beispiel für schlechte Praxis bei der Verwendung von .push():

javascript
// Das ist problematisch, da es den Rückgabewert des Push vollständig ignoriert
// Dies kann ein Signal für Gegendruck vom Zielstream sein!
class MyReadable extends Readable {
  _read(size) {
    let chunk
    while (null == (chunk = getNextChunk())) {
      this.push(chunk)
    }
  }
}

Darüber hinaus gibt es von außerhalb des benutzerdefinierten Streams Fallstricke, wenn der Gegendruck ignoriert wird. In diesem Gegenbeispiel für gute Praxis forciert der Code der Anwendung Daten durch, wann immer sie verfügbar sind (signalisiert durch das Ereignis 'data'):

javascript
// Dies ignoriert die Gegendruckmechanismen, die Node.js eingerichtet hat,
// und drückt bedingungslos Daten durch, unabhängig davon, ob der
// Zielstream bereit dafür ist oder nicht.
readable.on('data', data => writable.write(data))

Hier ist ein Beispiel für die Verwendung von .push() mit einem lesbaren Stream.

javascript
const { Readable } = require('node:stream')

// Erstellen Sie einen benutzerdefinierten lesbaren Stream
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Einige Daten auf den Stream schieben
    this.push({ message: 'Hallo, Welt!' })
    this.push(null) // Markiert das Ende des Streams
  },
})

// Den Stream konsumieren
myReadableStream.on('data', chunk => {
  console.log(chunk)
})

// Ausgabe:
// { message: 'Hallo, Welt!' }

Regeln speziell für Writable Streams

Erinnern wir uns, dass ein .write() in Abhängigkeit von bestimmten Bedingungen true oder false zurückgeben kann. Glücklicherweise kümmert sich die Stream-Zustandsmaschine bei der Erstellung unseres eigenen Writable Streams um unsere Callbacks und entscheidet, wann Backpressure behandelt und der Datenfluss für uns optimiert werden soll. Wenn wir jedoch ein Writable direkt verwenden möchten, müssen wir den Rückgabewert von .write() respektieren und diese Bedingungen genau beachten:

  • Wenn die Schreibwarteschlange ausgelastet ist, gibt .write() false zurück.
  • Wenn der Datenabschnitt zu groß ist, gibt .write() false zurück (die Grenze wird durch die Variable highWaterMark angegeben).

In diesem Beispiel erstellen wir einen benutzerdefinierten Readable Stream, der mit .push() ein einzelnes Objekt in den Stream schiebt. Die ._read()-Methode wird aufgerufen, wenn der Stream bereit ist, Daten zu verarbeiten, und in diesem Fall schieben wir sofort einige Daten in den Stream und markieren das Ende des Streams durch das Schieben von null.

javascript
const stream = require('stream')

class MyReadable extends stream.Readable {
  constructor() {
    super()
  }

  _read() {
    const data = { message: 'Hallo, Welt!' }
    this.push(data)
    this.push(null)
  }
}

const readableStream = new MyReadable()

readableStream.pipe(process.stdout)

Wir verarbeiten den Stream dann, indem wir auf das 'data'-Ereignis lauschen und jeden Datenabschnitt protokollieren, der in den Stream geschoben wird. In diesem Fall schieben wir nur einen einzigen Datenabschnitt in den Stream, daher sehen wir nur eine Protokollmeldung.

Regeln speziell für Writable Streams

Erinnern wir uns, dass ein .write() in Abhängigkeit von bestimmten Bedingungen true oder false zurückgeben kann. Glücklicherweise kümmert sich die Stream-Zustandsmaschine bei der Erstellung unseres eigenen Writable Streams um unsere Callbacks und entscheidet, wann Backpressure behandelt und der Datenfluss für uns optimiert werden soll.

Wenn wir jedoch ein Writable direkt verwenden möchten, müssen wir den Rückgabewert von .write() respektieren und diese Bedingungen genau beachten:

  • Wenn die Schreibwarteschlange ausgelastet ist, gibt .write() false zurück.
  • Wenn der Datenabschnitt zu groß ist, gibt .write() false zurück (die Grenze wird durch die Variable highWaterMark angegeben).
javascript
class MyWritable extends Writable {
  // Dieser Writable ist aufgrund der asynchronen Natur von JavaScript-Callbacks ungültig.
  // Ohne eine return-Anweisung für jeden Callback vor dem letzten,
  // besteht eine große Wahrscheinlichkeit, dass mehrere Callbacks aufgerufen werden.
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback()
    else if (chunk.toString().indexOf('b') >= 0) callback()
    callback()
  }
}

Es gibt auch einige Dinge, auf die Sie bei der Implementierung von ._writev() achten sollten. Die Funktion ist an .cork() gekoppelt, aber es gibt einen häufigen Fehler beim Schreiben:

javascript
// Die zweimalige Verwendung von .uncork() hier führt zu zwei Aufrufen auf der C++-Ebene, wodurch die
// Cork/Uncork-Technik nutzlos wird.
ws.cork()
ws.write('hallo ')
ws.write('welt ')
ws.uncork()

ws.cork()
ws.write('von ')
ws.write('Matteo')
ws.uncork()

// Der richtige Weg, dies zu schreiben, ist die Verwendung von process.nextTick(), das im
// nächsten Event Loop ausgelöst wird.
ws.cork()
ws.write('hallo ')
ws.write('welt ')
process.nextTick(doUncork, ws)

ws.cork()
ws.write('von ')
ws.write('Matteo')
process.nextTick(doUncork, ws)

// Als globale Funktion.
function doUncork(stream) {
  stream.uncork()
}

.cork() kann beliebig oft aufgerufen werden, wir müssen nur darauf achten, .uncork() genauso oft aufzurufen, damit es wieder fließen kann.

Fazit

Streams sind ein häufig verwendetes Modul in Node.js. Sie sind wichtig für die interne Struktur und für Entwickler, um sich im gesamten Node.js-Modul-Ökosystem zu erweitern und zu verbinden.

Hoffentlich sind Sie nun in der Lage, Ihre eigenen Writable- und Readable-Streams unter Berücksichtigung des Gegendrucks zu beheben und sicher zu programmieren und Ihr Wissen mit Kollegen und Freunden zu teilen.

Lesen Sie unbedingt mehr über Stream für andere API-Funktionen, um Ihre Streaming-Fähigkeiten beim Erstellen einer Anwendung mit Node.js zu verbessern und freizusetzen.